diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/controller/jena/RDFUploadController.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/controller/jena/RDFUploadController.java index 2d305c5c5..037b7152d 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/controller/jena/RDFUploadController.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/controller/jena/RDFUploadController.java @@ -225,9 +225,19 @@ public class RDFUploadController extends JenaIngestController { } } + private static final boolean BEGIN = true; + private static final boolean END = !BEGIN; + + private ChangeSet makeChangeSet(RDFService rdfService) { + ChangeSet cs = rdfService.manufactureChangeSet(); + cs.addPreChangeEvent(new BulkUpdateEvent(null, BEGIN)); + cs.addPostChangeEvent(new BulkUpdateEvent(null, END)); + return cs; + } + private void addUsingRDFService(InputStream in, String languageStr, RDFService rdfService) { - ChangeSet changeSet = rdfService.manufactureChangeSet(); + ChangeSet changeSet = makeChangeSet(rdfService); RDFService.ModelSerializationFormat format = ("RDF/XML".equals(languageStr) || "RDF/XML-ABBREV".equals(languageStr)) @@ -333,7 +343,7 @@ public class RDFUploadController extends JenaIngestController { RDFService rdfService = new RDFServiceModel(mainModel); ByteArrayOutputStream out = new ByteArrayOutputStream(); changesModel.write(out, "N-TRIPLE"); - ChangeSet cs = rdfService.manufactureChangeSet(); + ChangeSet cs = makeChangeSet(rdfService); ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); cs.addRemoval(in, RDFService.ModelSerializationFormat.NTRIPLE, null); try { @@ -398,7 +408,7 @@ public class RDFUploadController extends JenaIngestController { private void readIntoModel(InputStream in, String language, RDFService rdfService, String modelName) { - ChangeSet cs = rdfService.manufactureChangeSet(); + ChangeSet cs = makeChangeSet(rdfService); cs.addAddition(in, RDFServiceUtils.getSerializationFormatFromJenaString( language), modelName); try { diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java index 78aa6a4f4..8725bc7f9 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java @@ -5,19 +5,18 @@ package edu.cornell.mannlib.vitro.webapp.reasoner; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.InputStream; +import java.io.StringWriter; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; -import com.hp.hpl.jena.rdf.model.NodeIterator; -import com.hp.hpl.jena.rdf.model.Property; -import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +30,8 @@ import com.hp.hpl.jena.query.ResultSet; import com.hp.hpl.jena.query.ResultSetFactory; import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.ModelFactory; +import com.hp.hpl.jena.rdf.model.NodeIterator; +import com.hp.hpl.jena.rdf.model.Property; import com.hp.hpl.jena.rdf.model.RDFNode; import com.hp.hpl.jena.rdf.model.Resource; import com.hp.hpl.jena.rdf.model.ResourceFactory; @@ -47,6 +48,7 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; +import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; public class ABoxRecomputer { @@ -91,10 +93,15 @@ public class ABoxRecomputer { return recomputing; } + public void recompute() { + recompute(null); + } + /** - * Recompute all inferences. + * Recompute inferences for specified collection of individual URIs, + * or all URIs if parameter is null */ - public void recompute() { + public void recompute(Queue individualURIs) { synchronized (lock1) { if (recomputing) { return; @@ -111,7 +118,7 @@ public class ABoxRecomputer { } // Create a type cache for this execution and pass it to the recompute function // Ensures that caches are only valid for the length of one recompute - recomputeABox(new TypeCaches()); + recomputeABox(individualURIs, new TypeCaches()); } finally { if (searchIndexer != null) { searchIndexer.unpause(); @@ -123,21 +130,26 @@ public class ABoxRecomputer { } /* - * Recompute the entire ABox inference graph. + * Recompute the ABox inference graph for the specified collection of + * individual URIs, or all individuals if the collection is null. */ - protected void recomputeABox(TypeCaches caches) { - log.info("Recomputing ABox inferences."); - log.info("Finding individuals in ABox."); - Collection individuals = this.getAllIndividualURIs(); - log.info("Recomputing inferences for " + individuals.size() + " individuals"); + protected void recomputeABox(Queue individuals, TypeCaches caches) { + boolean printLog = false; + if (individuals == null) { + printLog = true; + log.info("Recomputing ABox inferences."); + log.info("Finding individuals in ABox."); + individuals = this.getAllIndividualURIs(); + log.info("Recomputing inferences for " + individuals.size() + " individuals"); + } long start = System.currentTimeMillis(); int numInds = 0; Model rebuildModel = ModelFactory.createDefaultModel(); Model additionalInferences = ModelFactory.createDefaultModel(); List individualsInBatch = new ArrayList(); - Iterator individualIt = individuals.iterator(); - while (individualIt.hasNext()) { - String individualURI = individualIt.next(); + //Iterator individualIt = individuals.iterator(); + while (!individuals.isEmpty()) { + String individualURI = individuals.poll(); try { additionalInferences.add(recomputeIndividual( individualURI, rebuildModel, caches)); @@ -145,7 +157,7 @@ public class ABoxRecomputer { individualsInBatch.add(individualURI); boolean batchFilled = (numInds % BATCH_SIZE) == 0; boolean reportingInterval = (numInds % REPORTING_INTERVAL) == 0; - if (batchFilled || !individualIt.hasNext()) { + if (batchFilled || individuals.isEmpty()) { log.debug(rebuildModel.size() + " total inferences"); updateInferenceModel(rebuildModel, individualsInBatch); rebuildModel.removeAll(); @@ -175,7 +187,9 @@ public class ABoxRecomputer { log.error("Unable to write additional inferences from reasoner plugins", e); } } - log.info("Finished recomputing inferences"); + if (printLog) { + log.info("Finished recomputing inferences"); + } } private static final boolean RUN_PLUGINS = true; @@ -330,7 +344,6 @@ public class ABoxRecomputer { mostSpecificTypes = getMostSpecificTypes(individual, assertedTypes); caches.cacheMostSpecificTypes(key, mostSpecificTypes); } - return mostSpecificTypes; } @@ -343,6 +356,8 @@ public class ABoxRecomputer { " FILTER NOT EXISTS { \n" + " <" + individual.getURI() + "> a ?type2 . \n" + " ?type2 <" + RDFS.subClassOf.getURI() + "> ?type. \n" + + " FILTER (?type != ?type2) \n" + + " FILTER NOT EXISTS { ?type <" + OWL.equivalentClass + "> ?type2 } \n" + " } \n" + " FILTER NOT EXISTS { \n" + " <" + individual.getURI() + "> <" + VitroVocabulary.MOST_SPECIFIC_TYPE + "> ?type \n" + @@ -395,8 +410,8 @@ public class ABoxRecomputer { /* * Get the URIs for all individuals in the system */ - protected Collection getAllIndividualURIs() { - HashSet individualURIs = new HashSet(); + protected Queue getAllIndividualURIs() { + Queue individualURIs = new ArrayDeque(); List classList = new ArrayList(); tboxModel.enterCriticalSection(Lock.READ); try { @@ -420,7 +435,7 @@ public class ABoxRecomputer { return individualURIs; } - protected void getIndividualURIs(String queryString, Set individuals) { + protected void getIndividualURIs(String queryString, Queue individuals) { int batchSize = 50000; int offset = 0; boolean done = false; @@ -522,11 +537,17 @@ public class ABoxRecomputer { builder.append("SELECT\n") .append(" ?object\n") - .append("WHERE\n") - .append("{\n") - .append(" <" + individualUri + "> <" + OWL.sameAs + "> ?object .\n") + .append("WHERE {\n") + .append(" GRAPH ?g { \n") + .append(" {\n") + .append(" <" + individualUri + "> <" + OWL.sameAs + "> ?object .\n") + .append(" } UNION {\n") + .append(" ?object <" + OWL.sameAs + "> <" + individualUri + "> .\n") + .append(" }\n") + .append(" } \n") + .append(" FILTER (?g != <" + ModelNames.ABOX_INFERENCES + ">)\n") .append("}\n"); - + rdfService.sparqlSelectQuery(builder.toString(), new ResultSetConsumer() { @Override protected void processQuerySolution(QuerySolution qs) { diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasoner.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasoner.java index 524ab7d7f..7a75ba3d0 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasoner.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasoner.java @@ -8,7 +8,9 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.logging.Log; @@ -75,6 +77,8 @@ public class SimpleReasoner extends StatementListener { VitroModelFactory.createOntologyModel()) .createAnnotationProperty(mostSpecificTypePropertyURI); + private Queue individualURIqueue = new ConcurrentLinkedQueue(); + // DeltaComputer private CumulativeDeltaModeler aBoxDeltaModeler1 = null; private CumulativeDeltaModeler aBoxDeltaModeler2 = null; @@ -183,6 +187,31 @@ public class SimpleReasoner extends StatementListener { return this.doSameAs; } + private void listenToStatement(Statement stmt) { + if(stmt.getSubject().isURIResource()) { + if (!individualURIqueue.contains(stmt.getSubject().getURI())) { + individualURIqueue.add(stmt.getSubject().getURI()); + } + } + if(stmt.getObject().isURIResource() && !(RDF.type.equals(stmt.getPredicate()))) { + if (!individualURIqueue.contains(stmt.getObject().asResource().getURI())) { + individualURIqueue.add(stmt.getObject().asResource().getURI()); + } + } + if(!accumulateChanges || individualURIqueue.size() > SAFETY_VALVE) { + recomputeIndividuals(); + } + } + + private static final int SAFETY_VALVE = 1000000; // one million individuals + + private void recomputeIndividuals() { + recomputer.recompute(individualURIqueue); + individualURIqueue.clear(); + } + + private boolean accumulateChanges = false; + /* * Performs incremental ABox reasoning based * on the addition of a new statement @@ -190,21 +219,24 @@ public class SimpleReasoner extends StatementListener { */ @Override public void addedStatement(Statement stmt) { - try { - if (stmt.getPredicate().equals(RDF.type)) { - addedABoxTypeAssertion(stmt, inferenceModel, new HashSet()); - setMostSpecificTypes(stmt.getSubject(), inferenceModel, new HashSet()); - } else if ( doSameAs && stmt.getPredicate().equals(OWL.sameAs)) { - addedABoxSameAsAssertion(stmt, inferenceModel); - } else { - addedABoxAssertion(stmt, inferenceModel); - } - - doPlugins(ModelUpdate.Operation.ADD,stmt); - - } catch (Exception e) { // don't stop the edit if there's an exception - log.error("Exception while computing inferences: " + e.getMessage()); - } + doPlugins(ModelUpdate.Operation.ADD,stmt); + listenToStatement(stmt); + +// try { +// if (stmt.getPredicate().equals(RDF.type)) { +// addedABoxTypeAssertion(stmt, inferenceModel, new HashSet()); +// setMostSpecificTypes(stmt.getSubject(), inferenceModel, new HashSet()); +// } else if ( doSameAs && stmt.getPredicate().equals(OWL.sameAs)) { +// addedABoxSameAsAssertion(stmt, inferenceModel); +// } else { +// addedABoxAssertion(stmt, inferenceModel); +// } +// +// doPlugins(ModelUpdate.Operation.ADD,stmt); +// +// } catch (Exception e) { // don't stop the edit if there's an exception +// log.error("Exception while computing inferences: " + e.getMessage()); +// } } /* @@ -214,11 +246,13 @@ public class SimpleReasoner extends StatementListener { */ @Override public void removedStatement(Statement stmt) { - try { - handleRemovedStatement(stmt); - } catch (Exception e) { // don't stop the edit if there's an exception - log.error("Exception while retracting inferences: ", e); - } + doPlugins(ModelUpdate.Operation.RETRACT,stmt); + listenToStatement(stmt); +// try { +// handleRemovedStatement(stmt); +// } catch (Exception e) { // don't stop the edit if there's an exception +// log.error("Exception while retracting inferences: ", e); +// } } /* @@ -1597,36 +1631,39 @@ public class SimpleReasoner extends StatementListener { if (event instanceof BulkUpdateEvent) { if (((BulkUpdateEvent) event).getBegin()) { + this.accumulateChanges = true; log.info("received a bulk update begin event"); - if (deltaComputerProcessing) { - eventCount++; - log.info("received a bulk update begin event while processing in asynchronous mode. Event count = " + eventCount); - return; - } else { - batchMode = 1; - if (aBoxDeltaModeler1.getRetractions().size() > 0) { - log.warn("Unexpected condition: the aBoxDeltaModeler1 retractions model was not empty when entering batch mode."); - } - - if (aBoxDeltaModeler2.getRetractions().size() > 0) { - log.warn("Unexpected condition: the aBoxDeltaModeler2 retractions model was not empty when entering batch mode."); - } - - log.info("initializing batch mode 1"); - } +// if (deltaComputerProcessing) { +// eventCount++; +// log.info("received a bulk update begin event while processing in asynchronous mode. Event count = " + eventCount); +// return; +// } else { +// batchMode = 1; +// if (aBoxDeltaModeler1.getRetractions().size() > 0) { +// log.warn("Unexpected condition: the aBoxDeltaModeler1 retractions model was not empty when entering batch mode."); +// } +// +// if (aBoxDeltaModeler2.getRetractions().size() > 0) { +// log.warn("Unexpected condition: the aBoxDeltaModeler2 retractions model was not empty when entering batch mode."); +// } +// +// log.info("initializing batch mode 1"); +// } } else { log.info("received a bulk update end event"); - if (!deltaComputerProcessing) { - deltaComputerProcessing = true; - VitroBackgroundThread thread = new VitroBackgroundThread(new DeltaComputer(), - "SimpleReasoner.DeltaComputer"); - thread.setWorkLevel(WORKING); - thread.start(); - } else { - eventCount--; - log.info("received a bulk update end event while currently processing in aynchronous mode. Event count = " + eventCount); - } + this.accumulateChanges = false; + recomputeIndividuals(); +// if (!deltaComputerProcessing) { +// deltaComputerProcessing = true; +// VitroBackgroundThread thread = new VitroBackgroundThread(new DeltaComputer(), +// "SimpleReasoner.DeltaComputer"); +// thread.setWorkLevel(WORKING); +// thread.start(); +// } else { +// eventCount--; +// log.info("received a bulk update end event while currently processing in aynchronous mode. Event count = " + eventCount); +// } } } }