First rough steps at replacing SimpleReasoner's incremental mode with selective recomputing, for major improvement to RDF uploading speed. A few unit tests related to retraction with sameAs aren't passing yet.

This commit is contained in:
brianjlowe 2015-11-23 20:47:40 +02:00
parent 3157b0941c
commit dcfd95ca9d
3 changed files with 143 additions and 75 deletions

View file

@ -225,9 +225,19 @@ public class RDFUploadController extends JenaIngestController {
} }
} }
// Values passed as the second BulkUpdateEvent constructor argument to
// distinguish the opening event of a bulk update from the closing one.
private static final boolean BEGIN = true;
private static final boolean END = !BEGIN;

/**
 * Manufactures a ChangeSet from the supplied RDFService and brackets it
 * with bulk-update events: a BEGIN event registered as a pre-change event
 * and an END event registered as a post-change event.
 *
 * @param rdfService the service used to manufacture the ChangeSet
 * @return the new ChangeSet with both events already attached
 */
private ChangeSet makeChangeSet(RDFService rdfService) {
    ChangeSet changeSet = rdfService.manufactureChangeSet();

    BulkUpdateEvent beginEvent = new BulkUpdateEvent(null, BEGIN);
    changeSet.addPreChangeEvent(beginEvent);

    BulkUpdateEvent endEvent = new BulkUpdateEvent(null, END);
    changeSet.addPostChangeEvent(endEvent);

    return changeSet;
}
private void addUsingRDFService(InputStream in, String languageStr, private void addUsingRDFService(InputStream in, String languageStr,
RDFService rdfService) { RDFService rdfService) {
ChangeSet changeSet = rdfService.manufactureChangeSet(); ChangeSet changeSet = makeChangeSet(rdfService);
RDFService.ModelSerializationFormat format = RDFService.ModelSerializationFormat format =
("RDF/XML".equals(languageStr) ("RDF/XML".equals(languageStr)
|| "RDF/XML-ABBREV".equals(languageStr)) || "RDF/XML-ABBREV".equals(languageStr))
@ -333,7 +343,7 @@ public class RDFUploadController extends JenaIngestController {
RDFService rdfService = new RDFServiceModel(mainModel); RDFService rdfService = new RDFServiceModel(mainModel);
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
changesModel.write(out, "N-TRIPLE"); changesModel.write(out, "N-TRIPLE");
ChangeSet cs = rdfService.manufactureChangeSet(); ChangeSet cs = makeChangeSet(rdfService);
ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
cs.addRemoval(in, RDFService.ModelSerializationFormat.NTRIPLE, null); cs.addRemoval(in, RDFService.ModelSerializationFormat.NTRIPLE, null);
try { try {
@ -398,7 +408,7 @@ public class RDFUploadController extends JenaIngestController {
private void readIntoModel(InputStream in, String language, private void readIntoModel(InputStream in, String language,
RDFService rdfService, String modelName) { RDFService rdfService, String modelName) {
ChangeSet cs = rdfService.manufactureChangeSet(); ChangeSet cs = makeChangeSet(rdfService);
cs.addAddition(in, RDFServiceUtils.getSerializationFormatFromJenaString( cs.addAddition(in, RDFServiceUtils.getSerializationFormatFromJenaString(
language), modelName); language), modelName);
try { try {

View file

@ -5,19 +5,18 @@ package edu.cornell.mannlib.vitro.webapp.reasoner;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.InputStream; import java.io.InputStream;
import java.io.StringWriter;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import java.util.Set; import java.util.Set;
import com.hp.hpl.jena.rdf.model.NodeIterator;
import com.hp.hpl.jena.rdf.model.Property;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -31,6 +30,8 @@ import com.hp.hpl.jena.query.ResultSet;
import com.hp.hpl.jena.query.ResultSetFactory; import com.hp.hpl.jena.query.ResultSetFactory;
import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory; import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.rdf.model.NodeIterator;
import com.hp.hpl.jena.rdf.model.Property;
import com.hp.hpl.jena.rdf.model.RDFNode; import com.hp.hpl.jena.rdf.model.RDFNode;
import com.hp.hpl.jena.rdf.model.Resource; import com.hp.hpl.jena.rdf.model.Resource;
import com.hp.hpl.jena.rdf.model.ResourceFactory; import com.hp.hpl.jena.rdf.model.ResourceFactory;
@ -47,6 +48,7 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
public class ABoxRecomputer { public class ABoxRecomputer {
@ -91,10 +93,15 @@ public class ABoxRecomputer {
return recomputing; return recomputing;
} }
/**
* Recompute all inferences.
*/
public void recompute() { public void recompute() {
recompute(null);
}
/**
* Recompute inferences for specified collection of individual URIs,
* or all URIs if parameter is null
*/
public void recompute(Queue<String> individualURIs) {
synchronized (lock1) { synchronized (lock1) {
if (recomputing) { if (recomputing) {
return; return;
@ -111,7 +118,7 @@ public class ABoxRecomputer {
} }
// Create a type cache for this execution and pass it to the recompute function // Create a type cache for this execution and pass it to the recompute function
// Ensures that caches are only valid for the length of one recompute // Ensures that caches are only valid for the length of one recompute
recomputeABox(new TypeCaches()); recomputeABox(individualURIs, new TypeCaches());
} finally { } finally {
if (searchIndexer != null) { if (searchIndexer != null) {
searchIndexer.unpause(); searchIndexer.unpause();
@ -123,21 +130,26 @@ public class ABoxRecomputer {
} }
/* /*
* Recompute the entire ABox inference graph. * Recompute the ABox inference graph for the specified collection of
* individual URIs, or all individuals if the collection is null.
*/ */
protected void recomputeABox(TypeCaches caches) { protected void recomputeABox(Queue<String> individuals, TypeCaches caches) {
boolean printLog = false;
if (individuals == null) {
printLog = true;
log.info("Recomputing ABox inferences."); log.info("Recomputing ABox inferences.");
log.info("Finding individuals in ABox."); log.info("Finding individuals in ABox.");
Collection<String> individuals = this.getAllIndividualURIs(); individuals = this.getAllIndividualURIs();
log.info("Recomputing inferences for " + individuals.size() + " individuals"); log.info("Recomputing inferences for " + individuals.size() + " individuals");
}
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
int numInds = 0; int numInds = 0;
Model rebuildModel = ModelFactory.createDefaultModel(); Model rebuildModel = ModelFactory.createDefaultModel();
Model additionalInferences = ModelFactory.createDefaultModel(); Model additionalInferences = ModelFactory.createDefaultModel();
List<String> individualsInBatch = new ArrayList<String>(); List<String> individualsInBatch = new ArrayList<String>();
Iterator<String> individualIt = individuals.iterator(); //Iterator<String> individualIt = individuals.iterator();
while (individualIt.hasNext()) { while (!individuals.isEmpty()) {
String individualURI = individualIt.next(); String individualURI = individuals.poll();
try { try {
additionalInferences.add(recomputeIndividual( additionalInferences.add(recomputeIndividual(
individualURI, rebuildModel, caches)); individualURI, rebuildModel, caches));
@ -145,7 +157,7 @@ public class ABoxRecomputer {
individualsInBatch.add(individualURI); individualsInBatch.add(individualURI);
boolean batchFilled = (numInds % BATCH_SIZE) == 0; boolean batchFilled = (numInds % BATCH_SIZE) == 0;
boolean reportingInterval = (numInds % REPORTING_INTERVAL) == 0; boolean reportingInterval = (numInds % REPORTING_INTERVAL) == 0;
if (batchFilled || !individualIt.hasNext()) { if (batchFilled || individuals.isEmpty()) {
log.debug(rebuildModel.size() + " total inferences"); log.debug(rebuildModel.size() + " total inferences");
updateInferenceModel(rebuildModel, individualsInBatch); updateInferenceModel(rebuildModel, individualsInBatch);
rebuildModel.removeAll(); rebuildModel.removeAll();
@ -175,8 +187,10 @@ public class ABoxRecomputer {
log.error("Unable to write additional inferences from reasoner plugins", e); log.error("Unable to write additional inferences from reasoner plugins", e);
} }
} }
if (printLog) {
log.info("Finished recomputing inferences"); log.info("Finished recomputing inferences");
} }
}
private static final boolean RUN_PLUGINS = true; private static final boolean RUN_PLUGINS = true;
private static final boolean SKIP_PLUGINS = !RUN_PLUGINS; private static final boolean SKIP_PLUGINS = !RUN_PLUGINS;
@ -330,7 +344,6 @@ public class ABoxRecomputer {
mostSpecificTypes = getMostSpecificTypes(individual, assertedTypes); mostSpecificTypes = getMostSpecificTypes(individual, assertedTypes);
caches.cacheMostSpecificTypes(key, mostSpecificTypes); caches.cacheMostSpecificTypes(key, mostSpecificTypes);
} }
return mostSpecificTypes; return mostSpecificTypes;
} }
@ -343,6 +356,8 @@ public class ABoxRecomputer {
" FILTER NOT EXISTS { \n" + " FILTER NOT EXISTS { \n" +
" <" + individual.getURI() + "> a ?type2 . \n" + " <" + individual.getURI() + "> a ?type2 . \n" +
" ?type2 <" + RDFS.subClassOf.getURI() + "> ?type. \n" + " ?type2 <" + RDFS.subClassOf.getURI() + "> ?type. \n" +
" FILTER (?type != ?type2) \n" +
" FILTER NOT EXISTS { ?type <" + OWL.equivalentClass + "> ?type2 } \n" +
" } \n" + " } \n" +
" FILTER NOT EXISTS { \n" + " FILTER NOT EXISTS { \n" +
" <" + individual.getURI() + "> <" + VitroVocabulary.MOST_SPECIFIC_TYPE + "> ?type \n" + " <" + individual.getURI() + "> <" + VitroVocabulary.MOST_SPECIFIC_TYPE + "> ?type \n" +
@ -395,8 +410,8 @@ public class ABoxRecomputer {
/* /*
* Get the URIs for all individuals in the system * Get the URIs for all individuals in the system
*/ */
protected Collection<String> getAllIndividualURIs() { protected Queue<String> getAllIndividualURIs() {
HashSet<String> individualURIs = new HashSet<String>(); Queue<String> individualURIs = new ArrayDeque<String>();
List<String> classList = new ArrayList<String>(); List<String> classList = new ArrayList<String>();
tboxModel.enterCriticalSection(Lock.READ); tboxModel.enterCriticalSection(Lock.READ);
try { try {
@ -420,7 +435,7 @@ public class ABoxRecomputer {
return individualURIs; return individualURIs;
} }
protected void getIndividualURIs(String queryString, Set<String> individuals) { protected void getIndividualURIs(String queryString, Queue<String> individuals) {
int batchSize = 50000; int batchSize = 50000;
int offset = 0; int offset = 0;
boolean done = false; boolean done = false;
@ -522,9 +537,15 @@ public class ABoxRecomputer {
builder.append("SELECT\n") builder.append("SELECT\n")
.append(" ?object\n") .append(" ?object\n")
.append("WHERE\n") .append("WHERE {\n")
.append("{\n") .append(" GRAPH ?g { \n")
.append(" {\n")
.append(" <" + individualUri + "> <" + OWL.sameAs + "> ?object .\n") .append(" <" + individualUri + "> <" + OWL.sameAs + "> ?object .\n")
.append(" } UNION {\n")
.append(" ?object <" + OWL.sameAs + "> <" + individualUri + "> .\n")
.append(" }\n")
.append(" } \n")
.append(" FILTER (?g != <" + ModelNames.ABOX_INFERENCES + ">)\n")
.append("}\n"); .append("}\n");
rdfService.sparqlSelectQuery(builder.toString(), new ResultSetConsumer() { rdfService.sparqlSelectQuery(builder.toString(), new ResultSetConsumer() {

View file

@ -8,7 +8,9 @@ import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -75,6 +77,8 @@ public class SimpleReasoner extends StatementListener {
VitroModelFactory.createOntologyModel()) VitroModelFactory.createOntologyModel())
.createAnnotationProperty(mostSpecificTypePropertyURI); .createAnnotationProperty(mostSpecificTypePropertyURI);
private Queue<String> individualURIqueue = new ConcurrentLinkedQueue<String>();
// DeltaComputer // DeltaComputer
private CumulativeDeltaModeler aBoxDeltaModeler1 = null; private CumulativeDeltaModeler aBoxDeltaModeler1 = null;
private CumulativeDeltaModeler aBoxDeltaModeler2 = null; private CumulativeDeltaModeler aBoxDeltaModeler2 = null;
@ -183,6 +187,31 @@ public class SimpleReasoner extends StatementListener {
return this.doSameAs; return this.doSameAs;
} }
/**
 * Records the URIs touched by a changed statement so that their
 * inferences can be recomputed, then triggers the recompute if appropriate.
 *
 * The subject is queued when it is a URI resource. The object is queued
 * when it is a URI resource and the predicate is not rdf:type. A URI that
 * is already in the queue is not added again.
 *
 * The recompute runs immediately unless changes are being accumulated;
 * while accumulating, it runs only once the queue grows beyond SAFETY_VALVE.
 *
 * NOTE(review): the duplicate check uses contains() on the queue itself,
 * which looks like a linear scan per statement -- confirm this is
 * acceptable for large uploads.
 */
private void listenToStatement(Statement stmt) {
    if (stmt.getSubject().isURIResource()) {
        String subjectURI = stmt.getSubject().getURI();
        boolean alreadyQueued = individualURIqueue.contains(subjectURI);
        if (!alreadyQueued) {
            individualURIqueue.add(subjectURI);
        }
    }

    if (stmt.getObject().isURIResource()) {
        // For type assertions the object is a class, so it is not queued.
        if (!RDF.type.equals(stmt.getPredicate())) {
            String objectURI = stmt.getObject().asResource().getURI();
            boolean alreadyQueued = individualURIqueue.contains(objectURI);
            if (!alreadyQueued) {
                individualURIqueue.add(objectURI);
            }
        }
    }

    // Keep accumulating only while in accumulate mode and under the limit.
    if (accumulateChanges && individualURIqueue.size() <= SAFETY_VALVE) {
        return;
    }
    recomputeIndividuals();
}
private static final int SAFETY_VALVE = 1000000; // one million individuals
/**
 * Hands the queue of pending individual URIs to the recomputer.
 *
 * The queue is deliberately NOT cleared afterwards. The recomputer removes
 * each URI from the queue as it processes it (it drains the queue with
 * poll()), so a completed run leaves nothing behind that was present when
 * the run finished. Clearing unconditionally would silently discard work
 * in two situations:
 *  - recompute() returns immediately because another recompute is already
 *    in progress, so none of the queued URIs were processed;
 *  - a listener thread adds a URI after the recomputer's drain loop has
 *    ended but before the clear would have executed.
 * In both cases the URIs now stay queued and are picked up by the next
 * call to this method.
 */
private void recomputeIndividuals() {
    recomputer.recompute(individualURIqueue);
}
private boolean accumulateChanges = false;
/* /*
* Performs incremental ABox reasoning based * Performs incremental ABox reasoning based
* on the addition of a new statement * on the addition of a new statement
@ -190,21 +219,24 @@ public class SimpleReasoner extends StatementListener {
*/ */
@Override @Override
public void addedStatement(Statement stmt) { public void addedStatement(Statement stmt) {
try {
if (stmt.getPredicate().equals(RDF.type)) {
addedABoxTypeAssertion(stmt, inferenceModel, new HashSet<String>());
setMostSpecificTypes(stmt.getSubject(), inferenceModel, new HashSet<String>());
} else if ( doSameAs && stmt.getPredicate().equals(OWL.sameAs)) {
addedABoxSameAsAssertion(stmt, inferenceModel);
} else {
addedABoxAssertion(stmt, inferenceModel);
}
doPlugins(ModelUpdate.Operation.ADD,stmt); doPlugins(ModelUpdate.Operation.ADD,stmt);
listenToStatement(stmt);
} catch (Exception e) { // don't stop the edit if there's an exception // try {
log.error("Exception while computing inferences: " + e.getMessage()); // if (stmt.getPredicate().equals(RDF.type)) {
} // addedABoxTypeAssertion(stmt, inferenceModel, new HashSet<String>());
// setMostSpecificTypes(stmt.getSubject(), inferenceModel, new HashSet<String>());
// } else if ( doSameAs && stmt.getPredicate().equals(OWL.sameAs)) {
// addedABoxSameAsAssertion(stmt, inferenceModel);
// } else {
// addedABoxAssertion(stmt, inferenceModel);
// }
//
// doPlugins(ModelUpdate.Operation.ADD,stmt);
//
// } catch (Exception e) { // don't stop the edit if there's an exception
// log.error("Exception while computing inferences: " + e.getMessage());
// }
} }
/* /*
@ -214,11 +246,13 @@ public class SimpleReasoner extends StatementListener {
*/ */
@Override @Override
public void removedStatement(Statement stmt) { public void removedStatement(Statement stmt) {
try { doPlugins(ModelUpdate.Operation.RETRACT,stmt);
handleRemovedStatement(stmt); listenToStatement(stmt);
} catch (Exception e) { // don't stop the edit if there's an exception // try {
log.error("Exception while retracting inferences: ", e); // handleRemovedStatement(stmt);
} // } catch (Exception e) { // don't stop the edit if there's an exception
// log.error("Exception while retracting inferences: ", e);
// }
} }
/* /*
@ -1597,36 +1631,39 @@ public class SimpleReasoner extends StatementListener {
if (event instanceof BulkUpdateEvent) { if (event instanceof BulkUpdateEvent) {
if (((BulkUpdateEvent) event).getBegin()) { if (((BulkUpdateEvent) event).getBegin()) {
this.accumulateChanges = true;
log.info("received a bulk update begin event"); log.info("received a bulk update begin event");
if (deltaComputerProcessing) { // if (deltaComputerProcessing) {
eventCount++; // eventCount++;
log.info("received a bulk update begin event while processing in asynchronous mode. Event count = " + eventCount); // log.info("received a bulk update begin event while processing in asynchronous mode. Event count = " + eventCount);
return; // return;
} else { // } else {
batchMode = 1; // batchMode = 1;
if (aBoxDeltaModeler1.getRetractions().size() > 0) { // if (aBoxDeltaModeler1.getRetractions().size() > 0) {
log.warn("Unexpected condition: the aBoxDeltaModeler1 retractions model was not empty when entering batch mode."); // log.warn("Unexpected condition: the aBoxDeltaModeler1 retractions model was not empty when entering batch mode.");
} // }
//
if (aBoxDeltaModeler2.getRetractions().size() > 0) { // if (aBoxDeltaModeler2.getRetractions().size() > 0) {
log.warn("Unexpected condition: the aBoxDeltaModeler2 retractions model was not empty when entering batch mode."); // log.warn("Unexpected condition: the aBoxDeltaModeler2 retractions model was not empty when entering batch mode.");
} // }
//
log.info("initializing batch mode 1"); // log.info("initializing batch mode 1");
} // }
} else { } else {
log.info("received a bulk update end event"); log.info("received a bulk update end event");
if (!deltaComputerProcessing) { this.accumulateChanges = false;
deltaComputerProcessing = true; recomputeIndividuals();
VitroBackgroundThread thread = new VitroBackgroundThread(new DeltaComputer(), // if (!deltaComputerProcessing) {
"SimpleReasoner.DeltaComputer"); // deltaComputerProcessing = true;
thread.setWorkLevel(WORKING); // VitroBackgroundThread thread = new VitroBackgroundThread(new DeltaComputer(),
thread.start(); // "SimpleReasoner.DeltaComputer");
} else { // thread.setWorkLevel(WORKING);
eventCount--; // thread.start();
log.info("received a bulk update end event while currently processing in aynchronous mode. Event count = " + eventCount); // } else {
} // eventCount--;
// log.info("received a bulk update end event while currently processing in aynchronous mode. Event count = " + eventCount);
// }
} }
} }
} }