diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiController.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiController.java index 21ba21aea..0be8b50f5 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiController.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiController.java @@ -19,6 +19,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.hp.hpl.jena.query.Dataset; +import com.hp.hpl.jena.query.ReadWrite; import com.hp.hpl.jena.update.GraphStore; import com.hp.hpl.jena.update.GraphStoreFactory; import com.hp.hpl.jena.update.UpdateAction; @@ -96,17 +97,26 @@ public class SparqlUpdateApiController extends VitroApiServlet { } private void executeUpdate(HttpServletRequest req, UpdateRequest parsed) { - ServletContext ctx = req.getSession().getServletContext(); VitroRequest vreq = new VitroRequest(req); - SearchIndexer indexer = ApplicationUtils.instance().getSearchIndexer(); - indexer.pause(); - try { - Dataset ds = new RDFServiceDataset(vreq.getUnfilteredRDFService()); - GraphStore graphStore = GraphStoreFactory.create(ds); + Dataset ds = new RDFServiceDataset(vreq.getUnfilteredRDFService()); + GraphStore graphStore = GraphStoreFactory.create(ds); + try { + if(indexer != null) { + indexer.pause(); + } + if(ds.supportsTransactions()) { + ds.begin(ReadWrite.WRITE); + } UpdateAction.execute(parsed, graphStore); } finally { - indexer.unpause(); + if(ds.supportsTransactions()) { + ds.commit(); + ds.end(); + } + if(indexer != null) { + indexer.unpause(); + } } } diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/controller/jena/JenaIngestController.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/controller/jena/JenaIngestController.java index a843a894e..f334f280a 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/controller/jena/JenaIngestController.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/controller/jena/JenaIngestController.java @@ -73,6 +73,7 @@ import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelAccess.WhichService; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; +import edu.cornell.mannlib.vitro.webapp.rdfservice.adapters.VitroModelFactory; import edu.cornell.mannlib.vitro.webapp.utils.SparqlQueryUtils; import edu.cornell.mannlib.vitro.webapp.utils.jena.JenaIngestUtils; import edu.cornell.mannlib.vitro.webapp.utils.jena.JenaIngestUtils.MergeResult; @@ -706,14 +707,13 @@ public class JenaIngestController extends BaseEditController { private void doClearModel(String modelName, ModelMaker modelMaker) { Model m = modelMaker.getModel(modelName); - OntModel o = ModelFactory.createOntologyModel(OntModelSpec.OWL_MEM,m); + OntModel o = VitroModelFactory.createOntologyModel(m); o.enterCriticalSection(Lock.WRITE); try { - o.removeAll(null,null,null); + o.removeAll(); } finally { o.leaveCriticalSection(); } - // removeAll() doesn't work with the listeners! } private void doLoadRDFData(String modelName, String docLoc, String filePath, String language, ModelMaker modelMaker) { diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/controller/jena/RDFUploadController.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/controller/jena/RDFUploadController.java index 2d305c5c5..bcdc408b2 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/controller/jena/RDFUploadController.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/controller/jena/RDFUploadController.java @@ -44,6 +44,7 @@ import edu.cornell.mannlib.vitro.webapp.dao.jena.event.BulkUpdateEvent; import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent; import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelAccess; import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelAccess.WhichService; +import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelNames; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; @@ -202,9 +203,20 @@ public class RDFUploadController extends JenaIngestController { } if (aboxModel != null) { aboxChangeModel = uploadModel.remove(tboxChangeModel); - aboxstmtCount = operateOnModel(request.getUnfilteredWebappDaoFactory(), - aboxModel, aboxChangeModel, ontModelSelector, - remove, makeClassgroups, loginBean.getUserURI()); + aboxstmtCount = aboxChangeModel.size(); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + aboxChangeModel.write(os, "N3"); + ByteArrayInputStream in = new ByteArrayInputStream(os.toByteArray()); + if(!remove) { + readIntoModel(in, "N3", request.getRDFService(), + ModelNames.ABOX_ASSERTIONS); + } else { + removeFromModel(in, "N3", request.getRDFService(), + ModelNames.ABOX_ASSERTIONS); + } +// operateOnModel(request.getUnfilteredWebappDaoFactory(), +// aboxModel, aboxChangeModel, ontModelSelector, +// remove, makeClassgroups, loginBean.getUserURI()); } request.setAttribute("uploadDesc", uploadDesc + ". " + verb + " " + (tboxstmtCount + aboxstmtCount) + " statements."); @@ -225,9 +237,19 @@ public class RDFUploadController extends JenaIngestController { } } + private static final boolean BEGIN = true; + private static final boolean END = !BEGIN; + + private ChangeSet makeChangeSet(RDFService rdfService) { + ChangeSet cs = rdfService.manufactureChangeSet(); + cs.addPreChangeEvent(new BulkUpdateEvent(null, BEGIN)); + cs.addPostChangeEvent(new BulkUpdateEvent(null, END)); + return cs; + } + private void addUsingRDFService(InputStream in, String languageStr, RDFService rdfService) { - ChangeSet changeSet = rdfService.manufactureChangeSet(); + ChangeSet changeSet = makeChangeSet(rdfService); RDFService.ModelSerializationFormat format = ("RDF/XML".equals(languageStr) || "RDF/XML-ABBREV".equals(languageStr)) @@ -333,7 +355,7 @@ public class RDFUploadController extends JenaIngestController { RDFService rdfService = new RDFServiceModel(mainModel); ByteArrayOutputStream out = new ByteArrayOutputStream(); changesModel.write(out, "N-TRIPLE"); - ChangeSet cs = rdfService.manufactureChangeSet(); + ChangeSet cs = makeChangeSet(rdfService); ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); cs.addRemoval(in, RDFService.ModelSerializationFormat.NTRIPLE, null); try { @@ -398,7 +420,7 @@ public class RDFUploadController extends JenaIngestController { private void readIntoModel(InputStream in, String language, RDFService rdfService, String modelName) { - ChangeSet cs = rdfService.manufactureChangeSet(); + ChangeSet cs = makeChangeSet(rdfService); cs.addAddition(in, RDFServiceUtils.getSerializationFormatFromJenaString( language), modelName); try { @@ -408,6 +430,18 @@ public class RDFUploadController extends JenaIngestController { } } + private void removeFromModel(InputStream in, String language, + RDFService rdfService, String modelName) { + ChangeSet cs = makeChangeSet(rdfService); + cs.addRemoval(in, RDFServiceUtils.getSerializationFormatFromJenaString( + language), modelName); + try { + rdfService.changeSetUpdate(cs); + } catch (RDFServiceException e) { + throw new RuntimeException(e); + } + } + private void forwardToFileUploadError( String errrorMsg , HttpServletRequest req, HttpServletResponse response) diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/ABoxJenaChangeListener.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/ABoxJenaChangeListener.java deleted file mode 100644 index aa34acb91..000000000 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/ABoxJenaChangeListener.java +++ /dev/null @@ -1,40 +0,0 @@ -/* $This file is distributed under the terms of the license in /doc/license.txt$ */ - -package edu.cornell.mannlib.vitro.webapp.dao.jena; - -import com.hp.hpl.jena.rdf.model.ModelChangedListener; - -import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelNames; - -public class ABoxJenaChangeListener extends JenaChangeListener { - - public ABoxJenaChangeListener(ModelChangedListener listener) { - super(listener); - ignoredGraphs.add(ModelNames.ABOX_INFERENCES); - ignoredGraphs.add(ModelNames.TBOX_ASSERTIONS); - ignoredGraphs.add(ModelNames.TBOX_INFERENCES); - } - - @Override - public void addedStatement(String serializedTriple, String graphURI) { - if (isABoxGraph(graphURI)) { - super.addedStatement(serializedTriple, graphURI); - } - } - - @Override - public void removedStatement(String serializedTriple, String graphURI) { - if (isABoxGraph(graphURI)) { - super.removedStatement(serializedTriple, graphURI); - } - } - - private boolean isABoxGraph(String graphURI) { - return (graphURI == null || - ModelNames.ABOX_ASSERTIONS.equals(graphURI) - || (!ignoredGraphs.contains(graphURI) - && !graphURI.contains("filegraph") - && !graphURI.contains("tbox"))); - } - -} diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/JenaChangeListener.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/JenaChangeListener.java deleted file mode 100644 index 9304d99a7..000000000 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/JenaChangeListener.java +++ /dev/null @@ -1,89 +0,0 @@ -/* $This file is distributed under the terms of the license in /doc/license.txt$ */ - -package edu.cornell.mannlib.vitro.webapp.dao.jena; - -import java.io.ByteArrayInputStream; -import java.io.UnsupportedEncodingException; -import java.util.HashSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import com.hp.hpl.jena.rdf.model.Model; -import com.hp.hpl.jena.rdf.model.ModelChangedListener; -import com.hp.hpl.jena.rdf.model.ModelFactory; -import com.hp.hpl.jena.rdf.model.Statement; -import com.hp.hpl.jena.rdf.model.StmtIterator; - -import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; -import edu.cornell.mannlib.vitro.webapp.servlet.setup.SimpleReasonerSetup; - -/** - * A ChangeListener that forwards events to a Jena ModelChangedListener - * @author bjl23 - * - */ -public class JenaChangeListener implements ChangeListener { - - private static final Log log = LogFactory.getLog(JenaChangeListener.class); - - protected HashSet ignoredGraphs = new HashSet(); - - private ModelChangedListener listener; - private Model m = ModelFactory.createDefaultModel(); - - public JenaChangeListener(ModelChangedListener listener) { - this.listener = listener; - ignoredGraphs.add(SimpleReasonerSetup.JENA_INF_MODEL_REBUILD); - ignoredGraphs.add(SimpleReasonerSetup.JENA_INF_MODEL_SCRATCHPAD); - } - - @Override - public void addedStatement(String serializedTriple, String graphURI) { - if (isRelevantGraph(graphURI)) { - listener.addedStatement(parseTriple(serializedTriple)); - } - } - - @Override - public void removedStatement(String serializedTriple, String graphURI) { - if (isRelevantGraph(graphURI)) { - listener.removedStatement(parseTriple(serializedTriple)); - } - } - - private boolean isRelevantGraph(String graphURI) { - return (graphURI == null || !ignoredGraphs.contains(graphURI)); - } - - @Override - public void notifyEvent(String graphURI, Object event) { - log.debug("event: " + event.getClass()); - listener.notifyEvent(m, event); - } - - // TODO avoid overhead of Model - private Statement parseTriple(String serializedTriple) { - try { - Model m = ModelFactory.createDefaultModel(); - m.read(new ByteArrayInputStream( - serializedTriple.getBytes("UTF-8")), null, "N3"); - StmtIterator sit = m.listStatements(); - if (!sit.hasNext()) { - throw new RuntimeException("no triple parsed from change event"); - } else { - Statement s = sit.nextStatement(); - if (sit.hasNext()) { - log.warn("More than one triple parsed from change event"); - } - return s; - } - } catch (RuntimeException riot) { - log.error("Unable to parse triple " + serializedTriple, riot); - throw riot; - } catch (UnsupportedEncodingException uee) { - throw new RuntimeException(uee); - } - } - -} diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/ModelContext.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/ModelContext.java index 120de2b2a..d83aa3438 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/ModelContext.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/ModelContext.java @@ -35,8 +35,7 @@ public class ModelContext { public static void registerListenerForChanges(ServletContext ctx, ModelChangedListener ml){ try { - RDFServiceUtils.getRDFServiceFactory(ctx).registerListener( - new JenaChangeListener(ml)); + RDFServiceUtils.getRDFServiceFactory(ctx).registerJenaModelChangedListener(ml); } catch (RDFServiceException e) { log.error(e,e); } diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceDataset.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceDataset.java index ae2bbfc34..b4f84cc18 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceDataset.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceDataset.java @@ -5,8 +5,13 @@ package edu.cornell.mannlib.vitro.webapp.dao.jena; import java.util.ArrayList; import java.util.Iterator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.hp.hpl.jena.graph.Graph; import com.hp.hpl.jena.graph.Node; import com.hp.hpl.jena.graph.NodeFactory; +import com.hp.hpl.jena.graph.TransactionHandler; import com.hp.hpl.jena.query.Dataset; import com.hp.hpl.jena.query.LabelExistsException; import com.hp.hpl.jena.query.ReadWrite; @@ -21,6 +26,7 @@ import edu.cornell.mannlib.vitro.webapp.utils.logging.ToString; public class RDFServiceDataset implements Dataset { private RDFServiceDatasetGraph g; + private ReadWrite transactionMode; public RDFServiceDataset(RDFServiceDatasetGraph g) { this.g = g; @@ -55,9 +61,13 @@ public class RDFServiceDataset implements Dataset { return g.getLock(); } + private final static Log log = LogFactory.getLog(RDFServiceDataset.class); + @Override public Model getNamedModel(String arg0) { - return RDFServiceGraph.createRDFServiceModel(g.getGraph(NodeFactory.createURI(arg0))); + Model model = RDFServiceGraph.createRDFServiceModel( + g.getGraph(NodeFactory.createURI(arg0))); + return model; } @Override @@ -108,36 +118,60 @@ public class RDFServiceDataset implements Dataset { @Override public boolean supportsTransactions() { - return false; + if (g.getDefaultGraph().getTransactionHandler() == null) { + return false; + } else { + return g.getDefaultGraph().getTransactionHandler().transactionsSupported(); + } } @Override public boolean isInTransaction() { - return false; + return (transactionMode != null); } + + private boolean supportsTransactions(Graph graph) { + return (graph.getTransactionHandler() != null + && graph.getTransactionHandler().transactionsSupported()); + } @Override public void begin(ReadWrite arg0) { - throw new UnsupportedOperationException(this.getClass().getSimpleName() - + " does not support transactions."); + this.transactionMode = arg0; + g.begin(arg0); + for(String graphURI : g.getGraphCache().keySet()) { + Graph graph = g.getGraphCache().get(graphURI); + if (supportsTransactions(graph)) { + graph.getTransactionHandler().begin(); + } + } } @Override public void commit() { - throw new UnsupportedOperationException(this.getClass().getSimpleName() - + " does not support transactions."); + for(String graphURI : g.getGraphCache().keySet()) { + Graph graph = g.getGraphCache().get(graphURI); + if(supportsTransactions(graph)) { + graph.getTransactionHandler().commit(); + } + } } @Override public void abort() { - throw new UnsupportedOperationException(this.getClass().getSimpleName() - + " does not support transactions."); + for(String graphURI : g.getGraphCache().keySet()) { + Graph graph = g.getGraphCache().get(graphURI); + if(supportsTransactions(graph)) { + graph.getTransactionHandler().abort(); + } + } } @Override public void end() { - throw new UnsupportedOperationException(this.getClass().getSimpleName() - + " does not support transactions."); + // the Graph tranaction handlers don't seem to support .end() + this.transactionMode = null; + g.end(); } @Override diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceDatasetGraph.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceDatasetGraph.java index b6a60f6e1..5553a55aa 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceDatasetGraph.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceDatasetGraph.java @@ -6,18 +6,19 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import com.hp.hpl.jena.graph.Graph; import com.hp.hpl.jena.graph.Node; import com.hp.hpl.jena.graph.NodeFactory; import com.hp.hpl.jena.graph.Triple; import com.hp.hpl.jena.query.QuerySolution; -import com.hp.hpl.jena.query.ResultSet; +import com.hp.hpl.jena.query.ReadWrite; import com.hp.hpl.jena.shared.Lock; import com.hp.hpl.jena.shared.LockMRSW; import com.hp.hpl.jena.sparql.core.DatasetGraph; import com.hp.hpl.jena.sparql.core.Quad; -import com.hp.hpl.jena.sparql.resultset.JSONInput; import com.hp.hpl.jena.sparql.util.Context; import com.hp.hpl.jena.util.iterator.SingletonIterator; import com.hp.hpl.jena.util.iterator.WrappedIterator; @@ -32,11 +33,25 @@ public class RDFServiceDatasetGraph implements DatasetGraph { private RDFService rdfService; private Lock lock = new LockMRSW(); private Context context = new Context() ; - + private Map graphCache = new ConcurrentHashMap(); + private ReadWrite transactionMode; + public RDFServiceDatasetGraph(RDFService rdfService) { this.rdfService = rdfService; } + public Map getGraphCache() { + return graphCache; + } + + public void begin(ReadWrite mode) { + this.transactionMode = mode; + } + + public void end() { + this.transactionMode = null; + } + private Graph getGraphFor(Quad q) { return getGraphFor(q.getGraph()); } @@ -44,7 +59,7 @@ public class RDFServiceDatasetGraph implements DatasetGraph { private Graph getGraphFor(Node g) { return (g == Node.ANY) ? new RDFServiceGraph(rdfService) - : new RDFServiceGraph(rdfService, g.getURI()); + : getGraph(g); } @Override @@ -172,10 +187,25 @@ public class RDFServiceDatasetGraph implements DatasetGraph { public RDFServiceGraph getDefaultGraph() { return new RDFServiceGraph(rdfService); } - + @Override public RDFServiceGraph getGraph(Node arg0) { - return new RDFServiceGraph(rdfService, arg0.getURI()); + String graphURI = arg0.getURI(); + if(graphCache.containsKey(graphURI)) { + return graphCache.get(graphURI); + } else { + RDFServiceGraph graph = new RDFServiceGraph(rdfService, arg0.getURI()); + graphCache.put(graphURI, graph); + if(transactionMode != null && supportsTransactions(graph)) { + graph.getTransactionHandler().begin(); + } + return graph; + } + } + + private boolean supportsTransactions(Graph graph) { + return (graph.getTransactionHandler() != null + && graph.getTransactionHandler().transactionsSupported()); } @Override @@ -205,13 +235,11 @@ public class RDFServiceDatasetGraph implements DatasetGraph { @Override public void removeGraph(Node arg0) { // TODO Auto-generated method stub - } @Override public void setDefaultGraph(Graph arg0) { // TODO Auto-generated method stub - } @Override @@ -229,4 +257,5 @@ public class RDFServiceDatasetGraph implements DatasetGraph { return "RDFServiceDatasetGraph[" + ToString.hashHex(this) + ", " + rdfService + "]"; } + } diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java index ff262da04..a8ad33786 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java @@ -5,9 +5,9 @@ package edu.cornell.mannlib.vitro.webapp.dao.jena; import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; -import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -23,14 +23,15 @@ import com.hp.hpl.jena.graph.TripleMatch; import com.hp.hpl.jena.graph.impl.GraphWithPerform; import com.hp.hpl.jena.graph.impl.SimpleEventManager; import com.hp.hpl.jena.query.QuerySolution; -import com.hp.hpl.jena.query.ResultSet; import com.hp.hpl.jena.rdf.listeners.StatementListener; import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelFactory; +import com.hp.hpl.jena.rdf.model.StmtIterator; import com.hp.hpl.jena.shared.AddDeniedException; +import com.hp.hpl.jena.shared.Command; import com.hp.hpl.jena.shared.DeleteDeniedException; import com.hp.hpl.jena.shared.PrefixMapping; import com.hp.hpl.jena.shared.impl.PrefixMappingImpl; -import com.hp.hpl.jena.sparql.resultset.JSONInput; import com.hp.hpl.jena.util.iterator.ExtendedIterator; import com.hp.hpl.jena.util.iterator.SingletonIterator; import com.hp.hpl.jena.util.iterator.WrappedIterator; @@ -38,6 +39,7 @@ import com.hp.hpl.jena.util.iterator.WrappedIterator; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; +import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; import edu.cornell.mannlib.vitro.webapp.rdfservice.adapters.VitroModelFactory; import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceUtils; import edu.cornell.mannlib.vitro.webapp.utils.logging.ToString; @@ -51,6 +53,10 @@ public class RDFServiceGraph implements GraphWithPerform { private BulkUpdateHandler bulkUpdateHandler; private PrefixMapping prefixMapping = new PrefixMappingImpl(); private GraphEventManager eventManager; + + private boolean inTransaction = false; + private Graph additionsGraph = ModelFactory.createDefaultModel().getGraph(); + private Graph removalsGraph = ModelFactory.createDefaultModel().getGraph(); /** * Returns a SparqlGraph for the union of named graphs in a remote repository @@ -90,30 +96,82 @@ public class RDFServiceGraph implements GraphWithPerform { .append(sparqlNodeUpdate(t.getObject(), "")).append(" ."); return sb.toString(); } - - @Override - public void performAdd(Triple t) { + private synchronized void flush() { ChangeSet changeSet = rdfService.manufactureChangeSet(); try { - changeSet.addAddition(RDFServiceUtils.toInputStream(serialize(t)), - RDFService.ModelSerializationFormat.N3, graphURI); + if(!removalsGraph.isEmpty()) { + String removals = serializeGraph(removalsGraph); + changeSet.addRemoval(RDFServiceUtils.toInputStream(removals), + RDFService.ModelSerializationFormat.N3, graphURI); + removalsGraph.clear(); + } + if(!additionsGraph.isEmpty()) { + String additions = serializeGraph(additionsGraph); + changeSet.addAddition(RDFServiceUtils.toInputStream(additions), + RDFService.ModelSerializationFormat.N3, graphURI); + additionsGraph.clear(); + } rdfService.changeSetUpdate(changeSet); } catch (RDFServiceException rdfse) { throw new RuntimeException(rdfse); } - + } + + private synchronized String serializeGraph(Graph graph) { + String triples = ""; + Iterator tripIt = graph.find(null, null, null); + while(tripIt.hasNext()) { + triples += " \n" + serialize(tripIt.next()); + } + return triples; + } + + @Override + public void performAdd(Triple t) { + if(inTransaction) { + stageAddition(t); + } else { + ChangeSet changeSet = rdfService.manufactureChangeSet(); + try { + changeSet.addAddition(RDFServiceUtils.toInputStream(serialize(t)), + RDFService.ModelSerializationFormat.N3, graphURI); + rdfService.changeSetUpdate(changeSet); + } catch (RDFServiceException rdfse) { + throw new RuntimeException(rdfse); + } + } + } + + private void stageAddition(Triple t) { + if(removalsGraph.contains(t)) { + removalsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject()); + } else { + additionsGraph.add(t); + } } @Override public void performDelete(Triple t) { - ChangeSet changeSet = rdfService.manufactureChangeSet(); - try { - changeSet.addRemoval(RDFServiceUtils.toInputStream(serialize(t)), - RDFService.ModelSerializationFormat.N3, graphURI); - rdfService.changeSetUpdate(changeSet); - } catch (RDFServiceException rdfse) { - throw new RuntimeException(rdfse); + if(inTransaction) { + stageDeletion(t); + } else { + ChangeSet changeSet = rdfService.manufactureChangeSet(); + try { + changeSet.addRemoval(RDFServiceUtils.toInputStream(serialize(t)), + RDFService.ModelSerializationFormat.N3, graphURI); + rdfService.changeSetUpdate(changeSet); + } catch (RDFServiceException rdfse) { + throw new RuntimeException(rdfse); + } + } + } + + private synchronized void stageDeletion(Triple t) { + if(additionsGraph.contains(t)) { + additionsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject()); + } else { + removalsGraph.add(t); } } @@ -124,16 +182,30 @@ public class RDFServiceGraph implements GraphWithPerform { } String constructStr = "CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <" + graphURI + "> { ?s ?p ?o } }"; try { - InputStream model = rdfService.sparqlConstructQuery( - constructStr, RDFService.ModelSerializationFormat.N3); - ChangeSet changeSet = rdfService.manufactureChangeSet(); - changeSet.addRemoval(model, RDFService.ModelSerializationFormat.N3, graphURI); - rdfService.changeSetUpdate(changeSet); + if(inTransaction) { + Model model = ModelFactory.createDefaultModel(); + rdfService.sparqlConstructQuery(constructStr, model); + stageRemoveAll(model); + } else { + InputStream model = rdfService.sparqlConstructQuery( + constructStr, RDFService.ModelSerializationFormat.N3); + ChangeSet changeSet = rdfService.manufactureChangeSet(); + changeSet.addRemoval(model, RDFService.ModelSerializationFormat.N3, graphURI); + rdfService.changeSetUpdate(changeSet); + } } catch (RDFServiceException rdfse) { throw new RuntimeException(rdfse); } } + private void stageRemoveAll(Model removals) { + StmtIterator sit = removals.listStatements(); + while (sit.hasNext()) { + Triple t = sit.nextStatement().asTriple(); + stageDeletion(t); + } + } + @Override public void close() { // can't close a remote endpoint @@ -167,7 +239,22 @@ public class RDFServiceGraph implements GraphWithPerform { ResultSetConsumer.HasResult consumer = new ResultSetConsumer.HasResult(); execSelect(containsQuery.toString(), consumer); - return consumer.hasResult(); + boolean initialResult = consumer.hasResult(); + if(!inTransaction) { + return initialResult; + } else { + Triple t = Triple.create(subject, predicate, object); + return (initialResult || additionsGraphContains(t)) + && !removalsGraphContains(t); + } + } + + private synchronized boolean additionsGraphContains(Triple t) { + return additionsGraph.contains(t); + } + + private synchronized boolean removalsGraphContains(Triple t) { + return removalsGraph.contains(t); } @Override @@ -260,7 +347,11 @@ public class RDFServiceGraph implements GraphWithPerform { String queryString = findQuery.toString(); final List triplist = new ArrayList(); - + if(inTransaction) { + addAdditions(triplist, additionsGraph.find(subject, predicate, object)); + subtractRemovals(triplist, removalsGraph.find(subject, predicate, object)); + } + execSelect(queryString, new ResultSetConsumer() { @Override protected void processQuerySolution(QuerySolution qs) { @@ -287,6 +378,24 @@ public class RDFServiceGraph implements GraphWithPerform { return WrappedIterator.create(triplist.iterator()); } + private void addAdditions(List tripList, ExtendedIterator tripIt) { + while(tripIt.hasNext()) { + Triple t = tripIt.next(); + if(!tripList.contains(t)) { + tripList.add(t); + } + } + } + + private void subtractRemovals(List tripList, ExtendedIterator tripIt) { + while(tripIt.hasNext()) { + Triple t = tripIt.next(); + if(tripList.contains(t)) { + tripList.remove(t); + } + } + } + private boolean isVar(Node node) { return (node == null || node.isVariable() || node == Node.ANY); } @@ -321,12 +430,11 @@ public class RDFServiceGraph implements GraphWithPerform { @Override public GraphStatisticsHandler getStatisticsHandler() { return null; - } - + } + @Override public TransactionHandler getTransactionHandler() { - // TODO Auto-generated method stub - return null; + return transactionHandler; } @Override @@ -398,11 +506,42 @@ public class RDFServiceGraph implements GraphWithPerform { public boolean iteratorRemoveAllowed() { return false; } - + @Override public boolean sizeAccurate() { return true; } + }; + + private final TransactionHandler transactionHandler = new TransactionHandler() { + @Override + public synchronized void abort() { + inTransaction = false; + removalsGraph.clear(); + additionsGraph.clear(); + } + + @Override + public synchronized void begin() { + inTransaction = true; + } + + @Override + public synchronized void commit() { + flush(); + inTransaction = false; + } + + @Override + public Object executeInTransaction(Command arg0) { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean transactionsSupported() { + return true; + } }; private void execSelect(String queryStr, ResultSetConsumer consumer) { diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/VClassGroupCache.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/VClassGroupCache.java index 9d2c36bad..baff88223 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/VClassGroupCache.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/VClassGroupCache.java @@ -16,6 +16,7 @@ import org.apache.commons.logging.LogFactory; import com.hp.hpl.jena.ontology.OntModel; import com.hp.hpl.jena.rdf.listeners.StatementListener; +import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.ResourceFactory; import com.hp.hpl.jena.rdf.model.Statement; import com.hp.hpl.jena.shared.Lock; @@ -33,6 +34,7 @@ import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory; import edu.cornell.mannlib.vitro.webapp.dao.filtering.WebappDaoFactoryFiltering; import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilterUtils; import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilters; +import edu.cornell.mannlib.vitro.webapp.dao.jena.event.BulkUpdateEvent; import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelAccess; import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine; import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngineException; @@ -161,9 +163,28 @@ public class VClassGroupCache implements SearchIndexer.Listener { VclassMap = classMap; } + private boolean paused = false; + private boolean updateRequested = false; + + public void pause() { + this.paused = true; + } + + public void unpause() { + this.paused = false; + if(updateRequested) { + updateRequested = false; + requestCacheUpdate(); + } + } + public void requestCacheUpdate() { - log.debug("requesting update"); - _cacheRebuildThread.informOfQueueChange(); + log.debug("requesting update"); + if(paused) { + updateRequested = true; + } else { + _cacheRebuildThread.informOfQueueChange(); + } } protected void requestStop() { @@ -460,20 +481,29 @@ public class VClassGroupCache implements SearchIndexer.Listener { log.debug("subject: " + stmt.getSubject().getURI()); log.debug("predicate: " + stmt.getPredicate().getURI()); } - if (RDF.type.getURI().equals(stmt.getPredicate().getURI())) { + if (RDF.type.equals(stmt.getPredicate())) { requestCacheUpdate(); } else if (VitroVocabulary.IN_CLASSGROUP.equals(stmt.getPredicate().getURI())) { requestCacheUpdate(); } else if(VitroVocabulary.DISPLAY_RANK.equals(stmt.getPredicate().getURI())){ requestCacheUpdate(); - } else { - OntModel jenaOntModel = ModelAccess.on(context).getOntModel(); + } else if (RDFS.label.equals(stmt.getPredicate())){ + OntModel jenaOntModel = ModelAccess.on(context).getOntModelSelector().getTBoxModel(); if( isClassNameChange(stmt, jenaOntModel) ) { requestCacheUpdate(); } } } + public void notifyEvent(Model model, Object event) { + if (event instanceof BulkUpdateEvent) { + if(((BulkUpdateEvent) event).getBegin()) { + pause(); + } else { + unpause(); + } + } + } } diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/ChangeListener.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/ChangeListener.java index 4caed0674..ddd9df5aa 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/ChangeListener.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/ChangeListener.java @@ -9,21 +9,12 @@ package edu.cornell.mannlib.vitro.webapp.rdfservice; public interface ChangeListener { /** - * Override this to listen to all statements added to the RDF store. + * Override this to listen to each model change * - * @param serializedTriple - the added statement in n3 format - * @param graphURI - the graph to which the statement was added + * @param modelChange - the object representing the model change */ - public void addedStatement(String serializedTriple, String graphURI); - - /** - * Override this to listen to all statements removed from the RDF store. - * - * @param serializedTriple - the removed statement in n3 format - * @param graphURI - the graph from which the statement was removed - */ - public void removedStatement(String serializedTriple, String graphURI); - + public void notifyModelChange(ModelChange modelChange); + /** * Override this to listen to events pertaining to the given graphURI. * diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/RDFService.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/RDFService.java index f098bab9a..5aa0b9a15 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/RDFService.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/RDFService.java @@ -2,12 +2,13 @@ package edu.cornell.mannlib.vitro.webapp.rdfservice; -import com.hp.hpl.jena.rdf.model.Model; - import java.io.InputStream; import java.io.OutputStream; import java.util.List; +import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelChangedListener; + /** * Interface for API to write, read, and update Vitro's RDF store, with support * to allow listening, logging and auditing. @@ -208,20 +209,38 @@ public interface RDFService { public boolean isEquivalentGraph(String graphURI, Model graph) throws RDFServiceException; /** - * Registers a listener to listen to changes in any graph in + * Registers a Jena listener to listen to changes in any graph in * the RDF store. * * @param changeListener - the change listener */ - public void registerListener(ChangeListener changeListener) throws RDFServiceException; + public void registerJenaModelChangedListener(ModelChangedListener changeListener) + throws RDFServiceException; /** - * Unregisters a listener from listening to changes in + * Unregisters a Jena listener from listening to changes in * any graph in the RDF store * * @param changeListener - the change listener */ - public void unregisterListener(ChangeListener changeListener) throws RDFServiceException; + public void unregisterJenaModelChangedListener(ModelChangedListener changeListener) + throws RDFServiceException; + + /** + * Registers a listener to listen to changes in any graph in + * the RDF store. + * + * @param changeListener - the change listener + */ + public void registerListener(ChangeListener changeListener) throws RDFServiceException; + + /** + * Unregisters a listener from listening to changes in + * any graph in the RDF store + * + * @param changeListener - the change listener + */ + public void unregisterListener(ChangeListener changeListener) throws RDFServiceException; /** * Creates a ChangeSet object diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/RDFServiceFactory.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/RDFServiceFactory.java index 250e124b5..5497cbfc6 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/RDFServiceFactory.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/RDFServiceFactory.java @@ -2,6 +2,8 @@ package edu.cornell.mannlib.vitro.webapp.rdfservice; +import com.hp.hpl.jena.rdf.model.ModelChangedListener; + public interface RDFServiceFactory { /** @@ -37,11 +39,29 @@ public interface RDFServiceFactory { /** * Unregisters a listener from listening to changes in the RDF store. - * Any RDFService objects returned by this factory should notify + * Any RDFService objects returned by this factory should no longer notify * this listener of changes. * * @param changeListener - the change listener */ public void unregisterListener(ChangeListener changeListener) throws RDFServiceException; + /** + * Registers a Jena ModelChangedListener to listen to changes in any graph in + * the RDF store. Any RDFService objects returned by this factory + * should notify this listener of changes. + * + * @param changeListener - the change listener + */ + public void registerJenaModelChangedListener(ModelChangedListener changeListener) throws RDFServiceException; + + /** + * Unregisters a Jena ModelChangedListener from listening to changes in the RDF store. + * Any RDFService objects returned by this factory should no longer notify + * this listener of changes. + * + * @param changeListener - the change listener + */ + public void unregisterJenaModelChangedListener(ModelChangedListener changeListener) throws RDFServiceException; + } diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/filter/LanguageFilteringRDFService.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/filter/LanguageFilteringRDFService.java index 145e57c9a..d58643e8f 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/filter/LanguageFilteringRDFService.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/filter/LanguageFilteringRDFService.java @@ -12,7 +12,6 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; -import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -22,6 +21,7 @@ import com.hp.hpl.jena.query.ResultSetFactory; import com.hp.hpl.jena.query.ResultSetFormatter; import com.hp.hpl.jena.rdf.model.Literal; import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelChangedListener; import com.hp.hpl.jena.rdf.model.RDFNode; import com.hp.hpl.jena.rdf.model.Statement; import com.hp.hpl.jena.rdf.model.StmtIterator; @@ -30,6 +30,7 @@ import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; +import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceUtils; public class LanguageFilteringRDFService implements RDFService { @@ -440,7 +441,6 @@ public class LanguageFilteringRDFService implements RDFService { @Override public void registerListener(ChangeListener changeListener) throws RDFServiceException { - // TODO Auto-generated method stub s.registerListener(changeListener); } @@ -449,6 +449,19 @@ public class LanguageFilteringRDFService implements RDFService { throws RDFServiceException { s.unregisterListener(changeListener); } + + @Override + public void registerJenaModelChangedListener(ModelChangedListener changeListener) + throws RDFServiceException { + s.registerJenaModelChangedListener(changeListener); + } + + @Override + public void unregisterJenaModelChangedListener(ModelChangedListener changeListener) + throws RDFServiceException { + s.unregisterJenaModelChangedListener(changeListener); + } + @Override public ChangeSet manufactureChangeSet() { diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/filter/SameAsFilteringRDFServiceFactory.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/filter/SameAsFilteringRDFServiceFactory.java index 5f473ad0e..40b9e4666 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/filter/SameAsFilteringRDFServiceFactory.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/filter/SameAsFilteringRDFServiceFactory.java @@ -10,7 +10,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -23,6 +22,7 @@ import com.hp.hpl.jena.query.ResultSet; import com.hp.hpl.jena.query.ResultSetFactory; import com.hp.hpl.jena.query.ResultSetFormatter; import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelChangedListener; import com.hp.hpl.jena.rdf.model.ModelFactory; import com.hp.hpl.jena.rdf.model.RDFNode; import com.hp.hpl.jena.rdf.model.Resource; @@ -36,6 +36,7 @@ import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService.ModelSerializationFormat; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceFactory; +import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceImpl; import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceUtils; @@ -76,6 +77,16 @@ public class SameAsFilteringRDFServiceFactory implements RDFServiceFactory { f.registerListener(changeListener); } + @Override + public void registerJenaModelChangedListener(ModelChangedListener changeListener) throws RDFServiceException { + f.registerJenaModelChangedListener(changeListener); + } + + @Override + public void unregisterJenaModelChangedListener(ModelChangedListener changeListener) throws RDFServiceException { + f.registerJenaModelChangedListener(changeListener); + } + public class SameAsFilteringRDFService extends RDFServiceImpl implements RDFService { private final Log log = LogFactory.getLog(SameAsFilteringRDFService.class); diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/RDFServiceFactorySingle.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/RDFServiceFactorySingle.java index ec159b0be..8c940163f 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/RDFServiceFactorySingle.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/RDFServiceFactorySingle.java @@ -7,6 +7,8 @@ import java.io.OutputStream; import java.util.List; import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelChangedListener; + import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; @@ -48,6 +50,16 @@ public class RDFServiceFactorySingle implements RDFServiceFactory { this.rdfService.unregisterListener(listener); } + @Override + public void registerJenaModelChangedListener(ModelChangedListener listener) throws RDFServiceException { + this.rdfService.registerJenaModelChangedListener(listener); + } + + @Override + public void unregisterJenaModelChangedListener(ModelChangedListener listener) throws RDFServiceException { + this.rdfService.unregisterJenaModelChangedListener(listener); + } + public class UnclosableRDFService implements RDFService { private RDFService s; @@ -162,6 +174,18 @@ public class RDFServiceFactorySingle implements RDFServiceFactory { throws RDFServiceException { s.unregisterListener(changeListener); } + + @Override + public void registerJenaModelChangedListener(ModelChangedListener changeListener) + throws RDFServiceException { + s.registerJenaModelChangedListener(changeListener); + } + + @Override + public void unregisterJenaModelChangedListener(ModelChangedListener changeListener) + throws RDFServiceException { + s.unregisterJenaModelChangedListener(changeListener); + } @Override public ChangeSet manufactureChangeSet() { diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/RDFServiceImpl.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/RDFServiceImpl.java index 5e7c5aed3..0a11d55b7 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/RDFServiceImpl.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/RDFServiceImpl.java @@ -3,12 +3,12 @@ package edu.cornell.mannlib.vitro.webapp.rdfservice.impl; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -18,20 +18,21 @@ import com.hp.hpl.jena.graph.Triple; import com.hp.hpl.jena.query.Query; import com.hp.hpl.jena.query.QueryFactory; import com.hp.hpl.jena.query.QueryParseException; -import com.hp.hpl.jena.query.ResultSet; import com.hp.hpl.jena.query.Syntax; import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelChangedListener; import com.hp.hpl.jena.rdf.model.ModelFactory; import com.hp.hpl.jena.rdf.model.Statement; import com.hp.hpl.jena.rdf.model.StmtIterator; -import com.hp.hpl.jena.sparql.resultset.XMLInput; import com.hp.hpl.jena.vocabulary.RDF; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; import edu.cornell.mannlib.vitro.webapp.rdfservice.ModelChange; +import edu.cornell.mannlib.vitro.webapp.rdfservice.ModelChange.Operation; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; +import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; import edu.cornell.mannlib.vitro.webapp.utils.logging.ToString; public abstract class RDFServiceImpl implements RDFService { @@ -42,6 +43,7 @@ public abstract class RDFServiceImpl implements RDFService { protected String defaultWriteGraphURI; protected List registeredListeners = new CopyOnWriteArrayList(); + protected List registeredJenaListeners = new CopyOnWriteArrayList(); @Override public void newIndividual(String individualURI, @@ -87,7 +89,6 @@ public abstract class RDFServiceImpl implements RDFService { @Override public synchronized void registerListener(ChangeListener changeListener) throws RDFServiceException { - if (!registeredListeners.contains(changeListener)) { registeredListeners.add(changeListener); } @@ -97,41 +98,87 @@ public abstract class RDFServiceImpl implements RDFService { public synchronized void unregisterListener(ChangeListener changeListener) throws RDFServiceException { registeredListeners.remove(changeListener); } + + @Override + public synchronized void registerJenaModelChangedListener(ModelChangedListener changeListener) throws RDFServiceException { + if (!registeredJenaListeners.contains(changeListener)) { + registeredJenaListeners.add(changeListener); + } + } + + @Override + public synchronized void unregisterJenaModelChangedListener(ModelChangedListener changeListener) throws RDFServiceException { + registeredJenaListeners.remove(changeListener); + } public synchronized List getRegisteredListeners() { return this.registeredListeners; } + public synchronized List getRegisteredJenaModelChangedListeners() { + return this.registeredJenaListeners; + } + @Override public ChangeSet manufactureChangeSet() { return new ChangeSetImpl(); } - // I switched the following two methods back to public so they could be - // used by the ListeningGraph, which is common to both implementations. - // This could probably be improved later. BJL + protected void notifyListenersOfChanges(ChangeSet changeSet) + throws IOException { + if (registeredListeners.isEmpty() && registeredJenaListeners.isEmpty()) { + return; + } + for (ModelChange modelChange: changeSet.getModelChanges()) { + notifyListeners(modelChange); + } + } - public void notifyListeners(Triple triple, ModelChange.Operation operation, String graphURI) { + protected void notifyListeners(ModelChange modelChange) throws IOException { + modelChange.getSerializedModel().reset(); Iterator iter = registeredListeners.iterator(); - while (iter.hasNext()) { ChangeListener listener = iter.next(); - if (operation == ModelChange.Operation.ADD) { - listener.addedStatement(sparqlTriple(triple), graphURI); - } else { - listener.removedStatement(sparqlTriple(triple).toString(), graphURI); - } + listener.notifyModelChange(modelChange); + } + log.debug(registeredJenaListeners.size() + " registered Jena listeners"); + if (registeredJenaListeners.isEmpty()) { + return; + } + modelChange.getSerializedModel().reset(); + Model tempModel = ModelFactory.createDefaultModel(); + Iterator jenaIter = registeredJenaListeners.iterator(); + while (jenaIter.hasNext()) { + ModelChangedListener listener = jenaIter.next(); + log.debug("\t" + listener.getClass().getSimpleName()); + tempModel.register(listener); + } + if (Operation.ADD.equals(modelChange.getOperation())) { + tempModel.read(modelChange.getSerializedModel(), null, + RDFServiceUtils.getSerializationFormatString( + modelChange.getSerializationFormat())); + } else if (Operation.REMOVE.equals(modelChange.getOperation())) { + tempModel.remove(RDFServiceUtils.parseModel( + modelChange.getSerializedModel(), + modelChange.getSerializationFormat())); + } + while (jenaIter.hasNext()) { + tempModel.unregister(jenaIter.next()); } } public void notifyListenersOfEvent(Object event) { Iterator iter = registeredListeners.iterator(); - while (iter.hasNext()) { ChangeListener listener = iter.next(); // TODO what is the graphURI parameter for? listener.notifyEvent(null, event); } + Iterator jenaIter = registeredJenaListeners.iterator(); + while (jenaIter.hasNext()) { + ModelChangedListener listener = jenaIter.next(); + listener.notifyEvent(null, event); + } } protected boolean isPreconditionSatisfied(String query, diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/ListeningGraph.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/ListeningGraph.java deleted file mode 100644 index 8fb2752e4..000000000 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/ListeningGraph.java +++ /dev/null @@ -1,236 +0,0 @@ -/* $This file is distributed under the terms of the license in /doc/license.txt$ */ - -package edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import com.hp.hpl.jena.graph.BulkUpdateHandler; -import com.hp.hpl.jena.graph.Capabilities; -import com.hp.hpl.jena.graph.Graph; -import com.hp.hpl.jena.graph.GraphEventManager; -import com.hp.hpl.jena.graph.GraphStatisticsHandler; -import com.hp.hpl.jena.graph.Node; -import com.hp.hpl.jena.graph.TransactionHandler; -import com.hp.hpl.jena.graph.Triple; -import com.hp.hpl.jena.graph.TripleMatch; -import com.hp.hpl.jena.graph.impl.GraphWithPerform; -import com.hp.hpl.jena.graph.impl.SimpleBulkUpdateHandler; -import com.hp.hpl.jena.graph.impl.SimpleEventManager; -import com.hp.hpl.jena.shared.AddDeniedException; -import com.hp.hpl.jena.shared.DeleteDeniedException; -import com.hp.hpl.jena.shared.PrefixMapping; -import com.hp.hpl.jena.shared.impl.PrefixMappingImpl; -import com.hp.hpl.jena.util.iterator.ExtendedIterator; -import com.hp.hpl.jena.util.iterator.WrappedIterator; - -import edu.cornell.mannlib.vitro.webapp.rdfservice.ModelChange; -import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceImpl; -import edu.cornell.mannlib.vitro.webapp.utils.logging.ToString; - -public class ListeningGraph implements GraphWithPerform { - - private static final Log log = LogFactory.getLog(ListeningGraph.class); - - private RDFServiceImpl rdfServiceImpl; - private String graphURI; - - private BulkUpdateHandler bulkUpdateHandler; - private GraphEventManager eventManager; - private PrefixMapping prefixMapping = new PrefixMappingImpl(); - - public ListeningGraph(String graphURI, RDFServiceImpl rdfServiceImpl) { - this.graphURI = graphURI; - this.rdfServiceImpl = rdfServiceImpl; - } - - @Override - public void add(Triple triple) throws AddDeniedException { - performAdd(triple); - } - - @Override - public void performAdd(Triple triple) throws AddDeniedException { - if (log.isDebugEnabled()) { - log.debug("adding " + triple + " to " + graphURI); - } - this.rdfServiceImpl.notifyListeners(triple, ModelChange.Operation.ADD, graphURI); - } - - @Override - public void delete(Triple triple) throws DeleteDeniedException { - performDelete(triple); - } - - @Override - public void performDelete(Triple triple) throws DeleteDeniedException { - if (log.isDebugEnabled()) { - log.debug("deleting " + triple + " from " + graphURI); - } - this.rdfServiceImpl.notifyListeners(triple, ModelChange.Operation.REMOVE, graphURI); - } - - @Override - public void close() { - // Nothing to close. - } - - @Override - public boolean contains(Triple arg0) { - return contains(arg0.getSubject(), arg0.getPredicate(), arg0.getObject()); - } - - @Override - public boolean contains(Node subject, Node predicate, Node object) { - return false; - } - - @Override - public boolean dependsOn(Graph arg0) { - return false; // who knows? - } - - @Override - public ExtendedIterator find(TripleMatch arg0) { - Triple t = arg0.asTriple(); - return find(t.getSubject(), t.getPredicate(), t.getObject()); - } - - @Override - public ExtendedIterator find(Node subject, Node predicate, Node object) { - List triplist = new ArrayList(); - return WrappedIterator.create(triplist.iterator()); - } - - @Override - public void clear() { - for (Triple t: find(null, null, null).toList()) { - delete(t); - } - } - - @Override - public void remove(Node subject, Node predicate, Node object) { - for (Triple t: find(subject, predicate, object).toList()) { - delete(t); - } - } - - @Override - @Deprecated - public BulkUpdateHandler getBulkUpdateHandler() { - if (this.bulkUpdateHandler == null) { - this.bulkUpdateHandler = new SimpleBulkUpdateHandler(this); - } - return this.bulkUpdateHandler; - } - - @Override - public Capabilities getCapabilities() { - return capabilities; - } - - @Override - public GraphEventManager getEventManager() { - if (eventManager == null) { - eventManager = new SimpleEventManager(this); - } - return eventManager; - } - - @Override - public PrefixMapping getPrefixMapping() { - return prefixMapping; - } - - @Override - public GraphStatisticsHandler getStatisticsHandler() { - return null; - } - - @Override - public TransactionHandler getTransactionHandler() { - return null; - } - - @Override - public boolean isClosed() { - return false; - } - - @Override - public boolean isEmpty() { - return !contains(null, null, null); - } - - @Override - public boolean isIsomorphicWith(Graph arg0) { - throw new UnsupportedOperationException("isIsomorphicWith() not supported " + - "by SPARQL graphs"); - } - - @Override - public int size() { - int size = find(null, null, null).toList().size(); - return size; - } - - private final static Capabilities capabilities = new Capabilities() { - - @Override - public boolean addAllowed() { - return false; - } - - @Override - public boolean addAllowed(boolean everyTriple) { - return false; - } - - @Override - public boolean canBeEmpty() { - return true; - } - - @Override - public boolean deleteAllowed() { - return false; - } - - @Override - public boolean deleteAllowed(boolean everyTriple) { - return false; - } - - @Override - public boolean findContractSafe() { - return true; - } - - @Override - public boolean handlesLiteralTyping() { - return true; - } - - @Override - public boolean iteratorRemoveAllowed() { - return false; - } - - @Override - public boolean sizeAccurate() { - return true; - } - }; - - @Override - public String toString() { - return "ListeningGraph["+ToString.hashHex(this) - + ", " + rdfServiceImpl - + ", " + ToString.modelName(graphURI) + "]"; - } - -} diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/RDFServiceJena.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/RDFServiceJena.java index 8e07752fa..f7ec6797a 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/RDFServiceJena.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/RDFServiceJena.java @@ -91,16 +91,6 @@ public abstract class RDFServiceJena extends RDFServiceImpl implements RDFServic } } - protected void notifyListenersOfChanges(ChangeSet changeSet) - throws IOException { - for (ModelChange modelChange: changeSet.getModelChanges()) { - modelChange.getSerializedModel().reset(); - Model model = ModelFactory.createModelForGraph( - new ListeningGraph(modelChange.getGraphURI(), this)); - operateOnModel(model, modelChange, null); - } - } - protected void notifyListenersOfPostChangeEvents(ChangeSet changeSet) { for (Object o : changeSet.getPostChangeEvents()) { this.notifyListenersOfEvent(o); diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/model/RDFServiceModel.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/model/RDFServiceModel.java index 95a0942cb..05bbfa72e 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/model/RDFServiceModel.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/model/RDFServiceModel.java @@ -12,14 +12,12 @@ import org.apache.commons.logging.LogFactory; import com.hp.hpl.jena.query.Dataset; import com.hp.hpl.jena.query.DatasetFactory; import com.hp.hpl.jena.rdf.model.Model; -import com.hp.hpl.jena.rdf.model.ModelFactory; import edu.cornell.mannlib.vitro.webapp.dao.jena.DatasetWrapper; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; import edu.cornell.mannlib.vitro.webapp.rdfservice.ModelChange; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; -import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.ListeningGraph; import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.RDFServiceJena; public class RDFServiceModel extends RDFServiceJena implements RDFService { @@ -102,14 +100,15 @@ public class RDFServiceModel extends RDFServiceJena implements RDFService { } // notify listeners of triple changes - csIt = changeSet.getModelChanges().iterator(); - while (csIt.hasNext()) { - ModelChange modelChange = csIt.next(); - modelChange.getSerializedModel().reset(); - Model model = ModelFactory.createModelForGraph( - new ListeningGraph(modelChange.getGraphURI(), this)); - operateOnModel(model, modelChange, null); - } + notifyListenersOfChanges(changeSet); +// csIt = changeSet.getModelChanges().iterator(); +// while (csIt.hasNext()) { +// ModelChange modelChange = csIt.next(); +// modelChange.getSerializedModel().reset(); +// Model model = ModelFactory.createModelForGraph( +// new ListeningGraph(modelChange.getGraphURI(), this)); +// operateOnModel(model, modelChange, null); +// } for (Object o : changeSet.getPostChangeEvents()) { this.notifyListenersOfEvent(o); diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/sdb/RDFServiceFactorySDB.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/sdb/RDFServiceFactorySDB.java index ad6d7863f..d6a7e761a 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/sdb/RDFServiceFactorySDB.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/sdb/RDFServiceFactorySDB.java @@ -7,6 +7,7 @@ import javax.sql.DataSource; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import com.hp.hpl.jena.rdf.model.ModelChangedListener; import com.hp.hpl.jena.sdb.StoreDesc; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; @@ -41,6 +42,10 @@ public class RDFServiceFactorySDB implements RDFServiceFactory { .getRegisteredListeners() ) { rdfService.registerListener(cl); } + for (ModelChangedListener cl : ((RDFServiceSDB) longTermRDFService) + .getRegisteredJenaModelChangedListeners() ) { + rdfService.registerJenaModelChangedListener(cl); + } return rdfService; } catch (Exception e) { log.error(e,e); @@ -59,5 +64,17 @@ public class RDFServiceFactorySDB implements RDFServiceFactory { throws RDFServiceException { this.longTermRDFService.unregisterListener(changeListener); } + + @Override + public void registerJenaModelChangedListener(ModelChangedListener changeListener) + throws RDFServiceException { + this.longTermRDFService.registerJenaModelChangedListener(changeListener); + } + + @Override + public void unregisterJenaModelChangedListener(ModelChangedListener changeListener) + throws RDFServiceException { + this.longTermRDFService.unregisterJenaModelChangedListener(changeListener); + } } diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/sdb/RDFServiceSDB.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/sdb/RDFServiceSDB.java index 717142695..82fd2a55a 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/sdb/RDFServiceSDB.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/sdb/RDFServiceSDB.java @@ -2,10 +2,9 @@ package edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.sdb; +import java.io.IOException; import java.sql.Connection; import java.sql.SQLException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import javax.sql.DataSource; @@ -23,6 +22,8 @@ import com.hp.hpl.jena.sdb.sql.SDBConnection; import edu.cornell.mannlib.vitro.webapp.dao.jena.DatasetWrapper; import edu.cornell.mannlib.vitro.webapp.dao.jena.StaticDatasetFactory; +import edu.cornell.mannlib.vitro.webapp.dao.jena.event.BulkUpdateEvent; +import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; @@ -74,22 +75,21 @@ public class RDFServiceSDB extends RDFServiceJena implements RDFService { return false; } - SDBConnection sdbConn = getSDBConnection(); + SDBConnection sdbConn = getSDBConnection(); Dataset dataset = getDataset(sdbConn); - - try { - insureThatInputStreamsAreResettable(changeSet); - - beginTransaction(sdbConn); + try { + insureThatInputStreamsAreResettable(changeSet); + } catch (IOException e) { + throw new RuntimeException(e); + } + try { + beginTransaction(sdbConn); notifyListenersOfPreChangeEvents(changeSet); - applyChangeSetToModel(changeSet, dataset); - + applyChangeSetToModel(changeSet, dataset); commitTransaction(sdbConn); - notifyListenersOfChanges(changeSet); - notifyListenersOfPostChangeEvents(changeSet); - + notifyListenersOfPostChangeEvents(changeSet); return true; } catch (Exception e) { log.error(e, e); @@ -139,11 +139,7 @@ public class RDFServiceSDB extends RDFServiceJena implements RDFService { sdbConn.getTransactionHandler().abort(); } } - - private static final Pattern OPTIONAL_PATTERN = Pattern.compile("optional", Pattern.CASE_INSENSITIVE); - - private static final Pattern GRAPH_PATTERN = Pattern.compile("graph", Pattern.CASE_INSENSITIVE); - + @Override protected QueryExecution createQueryExecution(String queryString, Query q, Dataset d) { return QueryExecutionFactory.create(q, d); diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/logging/LoggingRDFService.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/logging/LoggingRDFService.java index 2005b90f3..4d4859571 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/logging/LoggingRDFService.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/logging/LoggingRDFService.java @@ -7,6 +7,8 @@ import java.io.OutputStream; import java.util.List; import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelChangedListener; + import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; @@ -163,6 +165,18 @@ public class LoggingRDFService implements RDFService { innerService.unregisterListener(changeListener); } + @Override + public void registerJenaModelChangedListener(ModelChangedListener changeListener) + throws RDFServiceException { + innerService.registerJenaModelChangedListener(changeListener); + } + + @Override + public void unregisterJenaModelChangedListener(ModelChangedListener changeListener) + throws RDFServiceException { + innerService.unregisterJenaModelChangedListener(changeListener); + } + @Override public ChangeSet manufactureChangeSet() { return innerService.manufactureChangeSet(); diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/logging/LoggingRDFServiceFactory.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/logging/LoggingRDFServiceFactory.java index fec57486d..0e84a447d 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/logging/LoggingRDFServiceFactory.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/logging/LoggingRDFServiceFactory.java @@ -2,6 +2,8 @@ package edu.cornell.mannlib.vitro.webapp.rdfservice.impl.logging; +import com.hp.hpl.jena.rdf.model.ModelChangedListener; + import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; @@ -40,6 +42,18 @@ public class LoggingRDFServiceFactory implements RDFServiceFactory { factory.unregisterListener(changeListener); } + @Override + public void registerJenaModelChangedListener(ModelChangedListener changeListener) + throws RDFServiceException { + factory.registerJenaModelChangedListener(changeListener); + } + + @Override + public void unregisterJenaModelChangedListener(ModelChangedListener changeListener) + throws RDFServiceException { + factory.unregisterJenaModelChangedListener(changeListener); + } + @Override public String toString() { return "LoggingRDFServiceFactory[factory=" + factory + "]"; diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/sparql/RDFServiceSparql.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/sparql/RDFServiceSparql.java index 262c5b611..fa740230e 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/sparql/RDFServiceSparql.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/sparql/RDFServiceSparql.java @@ -16,8 +16,6 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; -import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; -import edu.cornell.mannlib.vitro.webapp.utils.http.HttpClientFactory; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,10 +63,11 @@ import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; import edu.cornell.mannlib.vitro.webapp.rdfservice.ModelChange; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; +import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.ChangeSetImpl; import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceImpl; import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceUtils; -import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.ListeningGraph; +import edu.cornell.mannlib.vitro.webapp.utils.http.HttpClientFactory; import edu.cornell.mannlib.vitro.webapp.utils.sparql.ResultSetIterators.ResultSetQuadsIterator; import edu.cornell.mannlib.vitro.webapp.utils.sparql.ResultSetIterators.ResultSetTriplesIterator; @@ -83,7 +82,7 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService { protected String readEndpointURI; protected String updateEndpointURI; // the number of triples to be - private static final int CHUNK_SIZE = 1000; // added/removed in a single + private static final int CHUNK_SIZE = 5000; // added/removed in a single // SPARQL UPDATE protected HttpClient httpClient; @@ -170,12 +169,13 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService { && !isPreconditionSatisfied( changeSet.getPreconditionQuery(), changeSet.getPreconditionQueryType())) { - return false; + return false; } try { - for (Object o : changeSet.getPreChangeEvents()) { - this.notifyListenersOfEvent(o); + + for (Object o : changeSet.getPreChangeEvents()) { + this.notifyListenersOfEvent(o); } Iterator csIt = changeSet.getModelChanges().iterator(); @@ -188,29 +188,8 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService { modelChange.getSerializedModel().mark(Integer.MAX_VALUE); performChange(modelChange); } - - // notify listeners of triple changes - csIt = changeSet.getModelChanges().iterator(); - while (csIt.hasNext()) { - ModelChange modelChange = csIt.next(); - modelChange.getSerializedModel().reset(); - Model model = ModelFactory.createModelForGraph( - new ListeningGraph(modelChange.getGraphURI(), this)); - if (modelChange.getOperation() == ModelChange.Operation.ADD) { - model.read(modelChange.getSerializedModel(), null, - getSerializationFormatString( - modelChange.getSerializationFormat())); - } else if (modelChange.getOperation() == ModelChange.Operation.REMOVE){ - Model temp = ModelFactory.createDefaultModel(); - temp.read(modelChange.getSerializedModel(), null, - getSerializationFormatString( - modelChange.getSerializationFormat())); - model.remove(temp); - } else { - log.error("Unsupported model change type " + - modelChange.getOperation().getClass().getName()); - } - } + + notifyListenersOfChanges(changeSet); for (Object o : changeSet.getPostChangeEvents()) { this.notifyListenersOfEvent(o); @@ -468,7 +447,7 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService { */ @Override public void getGraphMetadata() throws RDFServiceException { - + throw new UnsupportedOperationException(); } /** @@ -549,7 +528,9 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService { public void addModel(Model model, String graphURI) throws RDFServiceException { try { + long start = System.currentTimeMillis(); verbModel(model, graphURI, "INSERT"); + log.info((System.currentTimeMillis() - start) + " ms to insert " + model.size() + " triples"); } finally { rebuildGraphURICache = true; } @@ -591,45 +572,45 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService { } } - protected void addTriple(Triple t, String graphURI) throws RDFServiceException { - try { - StringBuffer updateString = new StringBuffer(); - updateString.append("INSERT DATA { "); - updateString.append((graphURI != null) ? "GRAPH <" + graphURI + "> { " : ""); - updateString.append(sparqlNodeUpdate(t.getSubject(), "")); - updateString.append(" "); - updateString.append(sparqlNodeUpdate(t.getPredicate(), "")); - updateString.append(" "); - updateString.append(sparqlNodeUpdate(t.getObject(), "")); - updateString.append(" }"); - updateString.append((graphURI != null) ? " } " : ""); - - executeUpdate(updateString.toString()); - notifyListeners(t, ModelChange.Operation.ADD, graphURI); - } finally { - rebuildGraphURICache = true; - } - } - - protected void removeTriple(Triple t, String graphURI) throws RDFServiceException { - try { - StringBuffer updateString = new StringBuffer(); - updateString.append("DELETE DATA { "); - updateString.append((graphURI != null) ? "GRAPH <" + graphURI + "> { " : ""); - updateString.append(sparqlNodeUpdate(t.getSubject(), "")); - updateString.append(" "); - updateString.append(sparqlNodeUpdate(t.getPredicate(), "")); - updateString.append(" "); - updateString.append(sparqlNodeUpdate(t.getObject(), "")); - updateString.append(" }"); - updateString.append((graphURI != null) ? " } " : ""); - - executeUpdate(updateString.toString()); - notifyListeners(t, ModelChange.Operation.REMOVE, graphURI); - } finally { - rebuildGraphURICache = true; - } - } +// protected void addTriple(Triple t, String graphURI) throws RDFServiceException { +// try { +// StringBuffer updateString = new StringBuffer(); +// updateString.append("INSERT DATA { "); +// updateString.append((graphURI != null) ? "GRAPH <" + graphURI + "> { " : ""); +// updateString.append(sparqlNodeUpdate(t.getSubject(), "")); +// updateString.append(" "); +// updateString.append(sparqlNodeUpdate(t.getPredicate(), "")); +// updateString.append(" "); +// updateString.append(sparqlNodeUpdate(t.getObject(), "")); +// updateString.append(" }"); +// updateString.append((graphURI != null) ? " } " : ""); +// +// executeUpdate(updateString.toString()); +// notifyListeners(t, ModelChange.Operation.ADD, graphURI); +// } finally { +// rebuildGraphURICache = true; +// } +// } +// +// protected void removeTriple(Triple t, String graphURI) throws RDFServiceException { +// try { +// StringBuffer updateString = new StringBuffer(); +// updateString.append("DELETE DATA { "); +// updateString.append((graphURI != null) ? "GRAPH <" + graphURI + "> { " : ""); +// updateString.append(sparqlNodeUpdate(t.getSubject(), "")); +// updateString.append(" "); +// updateString.append(sparqlNodeUpdate(t.getPredicate(), "")); +// updateString.append(" "); +// updateString.append(sparqlNodeUpdate(t.getObject(), "")); +// updateString.append(" }"); +// updateString.append((graphURI != null) ? " } " : ""); +// +// executeUpdate(updateString.toString()); +// notifyListeners(t, ModelChange.Operation.REMOVE, graphURI); +// } finally { +// rebuildGraphURICache = true; +// } +// } @Override protected boolean isPreconditionSatisfied(String query, diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java index 78aa6a4f4..13c63ccbc 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java @@ -10,14 +10,11 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; -import com.hp.hpl.jena.rdf.model.NodeIterator; -import com.hp.hpl.jena.rdf.model.Property; -import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +28,8 @@ import com.hp.hpl.jena.query.ResultSet; import com.hp.hpl.jena.query.ResultSetFactory; import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.ModelFactory; +import com.hp.hpl.jena.rdf.model.NodeIterator; +import com.hp.hpl.jena.rdf.model.Property; import com.hp.hpl.jena.rdf.model.RDFNode; import com.hp.hpl.jena.rdf.model.Resource; import com.hp.hpl.jena.rdf.model.ResourceFactory; @@ -47,6 +46,7 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; +import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; public class ABoxRecomputer { @@ -62,7 +62,7 @@ public class ABoxRecomputer { private volatile boolean recomputing = false; private boolean stopRequested = false; - private final int BATCH_SIZE = 100; + private final int BATCH_SIZE = 500; private final int REPORTING_INTERVAL = 1000; /** @@ -92,9 +92,9 @@ public class ABoxRecomputer { } /** - * Recompute all inferences. + * Recompute all individuals */ - public void recompute() { + public void recompute() { synchronized (lock1) { if (recomputing) { return; @@ -103,49 +103,80 @@ public class ABoxRecomputer { } } try { - if (searchIndexer != null) { + if (searchIndexer != null) { searchIndexer.pause(); // Register now that we want to rebuild the index when we unpause // This allows the indexer to optimize behaviour whilst paused searchIndexer.rebuildIndex(); } + log.info("Recomputing ABox inferences."); + log.info("Finding individuals in ABox."); + QueueindividualURIs = this.getAllIndividualURIs(); + log.info("Recomputing inferences for " + individualURIs.size() + " individuals"); // Create a type cache for this execution and pass it to the recompute function // Ensures that caches are only valid for the length of one recompute - recomputeABox(new TypeCaches()); + recomputeIndividuals(individualURIs, new TypeCaches()); + log.info("Finished recomputing inferences"); } finally { - if (searchIndexer != null) { + if(searchIndexer != null) { searchIndexer.unpause(); } synchronized (lock1) { - recomputing = false; + recomputing = false; + } + } + } + + /** + * Recompute inferences for specified collection of individual URIs, + * or all URIs if parameter is null + */ + public void recompute(Queue individualURIs) { + boolean sizableRecompute = (individualURIs.size() > 20); + try { + if(sizableRecompute && searchIndexer != null) { + searchIndexer.pause(); + } + recomputeIndividuals(individualURIs); + } finally { + if (sizableRecompute && searchIndexer != null) { + searchIndexer.unpause(); } } } /* - * Recompute the entire ABox inference graph. + * Recompute the ABox inference graph for the specified collection of + * individual URIs */ - protected void recomputeABox(TypeCaches caches) { - log.info("Recomputing ABox inferences."); - log.info("Finding individuals in ABox."); - Collection individuals = this.getAllIndividualURIs(); - log.info("Recomputing inferences for " + individuals.size() + " individuals"); + private void recomputeIndividuals(Queue individuals) { + recomputeIndividuals(individuals, new TypeCaches()); + } + + /* + * Recompute the ABox inference graph for the specified collection of + * individual URIs + */ + protected void recomputeIndividuals(Queue individuals, TypeCaches caches) { + if (individuals == null) { + return; + } long start = System.currentTimeMillis(); + int size = individuals.size(); int numInds = 0; Model rebuildModel = ModelFactory.createDefaultModel(); Model additionalInferences = ModelFactory.createDefaultModel(); List individualsInBatch = new ArrayList(); - Iterator individualIt = individuals.iterator(); - while (individualIt.hasNext()) { - String individualURI = individualIt.next(); + while (!individuals.isEmpty()) { + String individualURI = individuals.poll(); try { additionalInferences.add(recomputeIndividual( - individualURI, rebuildModel, caches)); + individualURI, rebuildModel, caches, individuals)); numInds++; individualsInBatch.add(individualURI); boolean batchFilled = (numInds % BATCH_SIZE) == 0; boolean reportingInterval = (numInds % REPORTING_INTERVAL) == 0; - if (batchFilled || !individualIt.hasNext()) { + if (batchFilled || individuals.isEmpty()) { log.debug(rebuildModel.size() + " total inferences"); updateInferenceModel(rebuildModel, individualsInBatch); rebuildModel.removeAll(); @@ -153,11 +184,11 @@ public class ABoxRecomputer { } if (reportingInterval) { log.info("Still recomputing inferences (" - + numInds + "/" + individuals.size() + " individuals)"); + + numInds + "/" + size + " individuals)"); log.info((System.currentTimeMillis() - start) / numInds + " ms per individual"); } if (stopRequested) { - log.info("a stopRequested signal was received during recomputeABox. Halting Processing."); + log.info("a stopRequested signal was received during recomputeIndividuals. Halting Processing."); return; } } catch (Exception e) { @@ -165,7 +196,7 @@ public class ABoxRecomputer { } } if(additionalInferences.size() > 0) { - log.info("Writing additional inferences generated by reasoner plugins."); + log.debug("Writing additional inferences generated by reasoner plugins."); ChangeSet change = rdfService.manufactureChangeSet(); change.addAddition(makeN3InputStream(additionalInferences), RDFService.ModelSerializationFormat.N3, ModelNames.ABOX_INFERENCES); @@ -175,20 +206,20 @@ public class ABoxRecomputer { log.error("Unable to write additional inferences from reasoner plugins", e); } } - log.info("Finished recomputing inferences"); } private static final boolean RUN_PLUGINS = true; private static final boolean SKIP_PLUGINS = !RUN_PLUGINS; private Model recomputeIndividual(String individualURI, - Model rebuildModel, TypeCaches caches) throws RDFServiceException { + Model rebuildModel, TypeCaches caches, Collection individualQueue) + throws RDFServiceException { long start = System.currentTimeMillis(); Model assertions = getAssertions(individualURI); - log.trace((System.currentTimeMillis() - start) + " ms to get assertions."); + log.debug((System.currentTimeMillis() - start) + " ms to get assertions."); + long prevRebuildSize = (simpleReasoner.getSameAsEnabled()) ? rebuildModel.size() : 0; Model additionalInferences = recomputeIndividual( individualURI, null, assertions, rebuildModel, caches, RUN_PLUGINS); - if (simpleReasoner.getSameAsEnabled()) { Set sameAsInds = getSameAsIndividuals(individualURI); for (String sameAsInd : sameAsInds) { @@ -201,7 +232,15 @@ public class ABoxRecomputer { Resource indRes = ResourceFactory.createResource(individualURI); Resource sameAsIndRes = ResourceFactory.createResource(sameAsInd); if(!assertions.contains(indRes, OWL.sameAs, sameAsIndRes)) { - rebuildModel.add(indRes, OWL.sameAs, sameAsIndRes); + if(!rebuildModel.contains(indRes, OWL.sameAs, sameAsIndRes)) { + individualQueue.add(sameAsInd); + rebuildModel.add(indRes, OWL.sameAs, sameAsIndRes); + } + } + } + if(rebuildModel.size() - prevRebuildSize > 0) { + for (String sameAsInd : sameAsInds) { + individualQueue.add(sameAsInd); } } } @@ -330,7 +369,6 @@ public class ABoxRecomputer { mostSpecificTypes = getMostSpecificTypes(individual, assertedTypes); caches.cacheMostSpecificTypes(key, mostSpecificTypes); } - return mostSpecificTypes; } @@ -343,6 +381,8 @@ public class ABoxRecomputer { " FILTER NOT EXISTS { \n" + " <" + individual.getURI() + "> a ?type2 . \n" + " ?type2 <" + RDFS.subClassOf.getURI() + "> ?type. \n" + + " FILTER (?type != ?type2) \n" + + " FILTER NOT EXISTS { ?type <" + OWL.equivalentClass + "> ?type2 } \n" + " } \n" + " FILTER NOT EXISTS { \n" + " <" + individual.getURI() + "> <" + VitroVocabulary.MOST_SPECIFIC_TYPE + "> ?type \n" + @@ -395,8 +435,8 @@ public class ABoxRecomputer { /* * Get the URIs for all individuals in the system */ - protected Collection getAllIndividualURIs() { - HashSet individualURIs = new HashSet(); + protected Queue getAllIndividualURIs() { + Queue individualURIs = new IndividualURIQueue(); List classList = new ArrayList(); tboxModel.enterCriticalSection(Lock.READ); try { @@ -420,7 +460,7 @@ public class ABoxRecomputer { return individualURIs; } - protected void getIndividualURIs(String queryString, Set individuals) { + protected void getIndividualURIs(String queryString, Queue individuals) { int batchSize = 50000; int offset = 0; boolean done = false; @@ -507,7 +547,7 @@ public class ABoxRecomputer { return new ByteArrayInputStream(out.toByteArray()); } - private Set getSameAsIndividuals(String individualURI) { + public Set getSameAsIndividuals(String individualURI) { HashSet sameAsInds = new HashSet(); sameAsInds.add(individualURI); getSameAsIndividuals(individualURI, sameAsInds); @@ -519,14 +559,18 @@ public class ABoxRecomputer { try { final List addedURIs = new ArrayList(); StringBuilder builder = new StringBuilder(); - builder.append("SELECT\n") .append(" ?object\n") - .append("WHERE\n") - .append("{\n") - .append(" <" + individualUri + "> <" + OWL.sameAs + "> ?object .\n") + .append("WHERE {\n") + .append(" GRAPH ?g { \n") + .append(" {\n") + .append(" <" + individualUri + "> <" + OWL.sameAs + "> ?object .\n") + .append(" } UNION {\n") + .append(" ?object <" + OWL.sameAs + "> <" + individualUri + "> .\n") + .append(" }\n") + .append(" } \n") + .append(" FILTER (?g != <" + ModelNames.ABOX_INFERENCES + ">)\n") .append("}\n"); - rdfService.sparqlSelectQuery(builder.toString(), new ResultSetConsumer() { @Override protected void processQuerySolution(QuerySolution qs) { @@ -537,37 +581,11 @@ public class ABoxRecomputer { } } }); - - for (String indUri : addedURIs) { - getSameAsIndividuals(indUri, sameAsInds); - } - - addedURIs.clear(); - builder = new StringBuilder(); - - builder.append("SELECT\n") - .append(" ?subject\n") - .append("WHERE\n") - .append("{\n") - .append(" ?subject <" + OWL.sameAs + "> <" + individualUri + "> .\n") - .append("}\n"); - - rdfService.sparqlSelectQuery(builder.toString(), new ResultSetConsumer() { - @Override - protected void processQuerySolution(QuerySolution qs) { - Resource object = qs.getResource("subject"); - if (object != null && !sameAsInds.contains(object.getURI())) { - sameAsInds.add(object.getURI()); - addedURIs.add(object.getURI()); - } - } - }); - for (String indUri : addedURIs) { getSameAsIndividuals(indUri, sameAsInds); } } catch (RDFServiceException e) { - + log.error(e,e); } } diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/reasoner/IndividualURIQueue.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/reasoner/IndividualURIQueue.java new file mode 100644 index 000000000..3c1827bf0 --- /dev/null +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/reasoner/IndividualURIQueue.java @@ -0,0 +1,143 @@ +/* $This file is distributed under the terms of the license in /doc/license.txt$ */ + +package edu.cornell.mannlib.vitro.webapp.reasoner; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class IndividualURIQueue implements Queue { + + private ConcurrentLinkedQueue q = new ConcurrentLinkedQueue(); + private ConcurrentHashMap m = new ConcurrentHashMap(); + + @Override + public synchronized boolean addAll(Collection c) { + boolean changed = false; + for (E e : c) { + if(!m.containsKey(e)) { + m.put(e, Boolean.TRUE); + q.add(e); + changed = true; + } + } + return changed; + } + + @Override + public synchronized void clear() { + m.clear(); + q.clear(); + } + + @Override + public boolean contains(Object o) { + return m.contains(o); + } + + @Override + public boolean containsAll(Collection c) { + boolean contains = true; + for(Object e : c) { + contains |= m.contains(e); + } + return contains; + } + + @Override + public boolean isEmpty() { + return q.isEmpty(); + } + + @Override + public Iterator iterator() { + return q.iterator(); + } + + @Override + public synchronized boolean remove(Object o) { + m.remove(o); + return q.remove(o); + } + + @Override + public synchronized boolean removeAll(Collection c) { + for (Object e : c) { + m.remove(e); + } + return q.removeAll(c); + } + + @Override + public synchronized boolean retainAll(Collection c) { + boolean changed = false; + Iterator it = m.keySet().iterator(); + while(it.hasNext()) { + E e = it.next(); + if(!c.contains(e)) { + m.remove(e); + q.remove(e); + changed = true; + } + } + return changed; + } + + @Override + public int size() { + return m.size(); + } + + @Override + public Object[] toArray() { + return q.toArray(); + } + + @Override + public T[] toArray(T[] a) { + return q.toArray(a); + } + + @Override + public synchronized boolean add(E e) { + if(m.containsKey(e)) { + return false; + } else { + m.put(e, Boolean.TRUE); + q.add(e); + return true; + } + } + + @Override + public E element() { + return q.element(); + } + + @Override + public boolean offer(E e) { + return q.offer(e); + } + + @Override + public E peek() { + return q.peek(); + } + + @Override + public synchronized E poll() { + E e = q.poll(); + m.remove(e); + return e; + } + + @Override + public synchronized E remove() { + E e = q.remove(); + m.remove(e); + return e; + } + +} \ No newline at end of file diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasoner.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasoner.java index 524ab7d7f..e45f1d117 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasoner.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasoner.java @@ -2,12 +2,11 @@ package edu.cornell.mannlib.vitro.webapp.reasoner; -import static edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread.WorkLevel.WORKING; - import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Queue; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; @@ -23,7 +22,9 @@ import com.hp.hpl.jena.query.DatasetFactory; import com.hp.hpl.jena.rdf.listeners.StatementListener; import com.hp.hpl.jena.rdf.model.Literal; import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelChangedListener; import com.hp.hpl.jena.rdf.model.ModelFactory; +import com.hp.hpl.jena.rdf.model.NodeIterator; import com.hp.hpl.jena.rdf.model.Property; import com.hp.hpl.jena.rdf.model.RDFNode; import com.hp.hpl.jena.rdf.model.ResIterator; @@ -37,18 +38,17 @@ import com.hp.hpl.jena.vocabulary.OWL; import com.hp.hpl.jena.vocabulary.RDF; import com.hp.hpl.jena.vocabulary.RDFS; -import edu.cornell.mannlib.vitro.webapp.dao.jena.ABoxJenaChangeListener; -import edu.cornell.mannlib.vitro.webapp.dao.jena.CumulativeDeltaModeler; import edu.cornell.mannlib.vitro.webapp.dao.jena.DifferenceGraph; import edu.cornell.mannlib.vitro.webapp.dao.jena.RDFServiceGraph; -import edu.cornell.mannlib.vitro.webapp.dao.jena.event.BulkUpdateEvent; import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelNames; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; +import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; +import edu.cornell.mannlib.vitro.webapp.rdfservice.ModelChange; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; import edu.cornell.mannlib.vitro.webapp.rdfservice.adapters.VitroModelFactory; +import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceUtils; import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.model.RDFServiceModel; -import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; /** * Allows for real-time incremental materialization or retraction of RDFS- @@ -57,7 +57,8 @@ import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; * @author sjm222 */ -public class SimpleReasoner extends StatementListener { +public class SimpleReasoner extends StatementListener + implements ModelChangedListener, ChangeListener { private static final Log log = LogFactory.getLog(SimpleReasoner.class); @@ -75,18 +76,8 @@ public class SimpleReasoner extends StatementListener { VitroModelFactory.createOntologyModel()) .createAnnotationProperty(mostSpecificTypePropertyURI); - // DeltaComputer - private CumulativeDeltaModeler aBoxDeltaModeler1 = null; - private CumulativeDeltaModeler aBoxDeltaModeler2 = null; - private int batchMode = 0; // values: 0, 1 and 2 - - // Recomputer private ABoxRecomputer recomputer = null; - - private boolean stopRequested = false; - - private List pluginList = new CopyOnWriteArrayList(); - + private List pluginList = new CopyOnWriteArrayList(); private boolean doSameAs = true; /** @@ -122,17 +113,13 @@ public class SimpleReasoner extends StatementListener { tboxModel.getGraph()))); this.inferenceModel = inferenceModel; - this.batchMode = 0; - aBoxDeltaModeler1 = new CumulativeDeltaModeler(); - aBoxDeltaModeler2 = new CumulativeDeltaModeler(); recomputer = new ABoxRecomputer(tboxModel, aboxModel, rdfService, this, searchIndexer); - stopRequested = false; if (rdfService == null) { aboxModel.register(this); } else { try { - rdfService.registerListener(new ABoxJenaChangeListener(this)); + rdfService.registerListener(this); } catch (RDFServiceException e) { throw new RuntimeException("Unable to register change listener", e); } @@ -154,15 +141,10 @@ public class SimpleReasoner extends StatementListener { this.inferenceModel = inferenceModel; this.fullModel = VitroModelFactory.createUnion(aboxModel, VitroModelFactory.createOntologyModel(inferenceModel)); - aBoxDeltaModeler1 = new CumulativeDeltaModeler(); - aBoxDeltaModeler2 = new CumulativeDeltaModeler(); - this.batchMode = 0; - stopRequested = false; Dataset ds = DatasetFactory.createMem(); ds.addNamedModel(ModelNames.ABOX_ASSERTIONS, aboxModel); ds.addNamedModel(ModelNames.ABOX_INFERENCES, inferenceModel); - ds.addNamedModel(ModelNames.TBOX_ASSERTIONS, tboxModel); - + ds.addNamedModel(ModelNames.TBOX_ASSERTIONS, tboxModel); ds.setDefaultModel(ModelFactory.createUnion(fullModel, tboxModel)); recomputer = new ABoxRecomputer(tboxModel, aboxModel, new RDFServiceModel(ds), this, searchIndexer); } @@ -182,66 +164,79 @@ public class SimpleReasoner extends StatementListener { public boolean getSameAsEnabled() { return this.doSameAs; } + + public void notifyModelChange(ModelChange modelChange) { + if(isABoxInferenceGraph(modelChange.getGraphURI()) + || isTBoxGraph(modelChange.getGraphURI())) { + return; + } + Queue individualURIs = new IndividualURIQueue(); + Model m = RDFServiceUtils.parseModel(modelChange.getSerializedModel(), + modelChange.getSerializationFormat()); + StmtIterator sit = m.listStatements(); + while(sit.hasNext()) { + queueRelevantIndividuals(sit.nextStatement(), individualURIs); + } + recomputeIndividuals(individualURIs); + } + + /* + * Performs incremental ABox reasoning based + * on the addition of a new statement + * (aka assertion) to the ABox. + */ + @Override + public void addedStatement(Statement stmt) { + doPlugins(ModelUpdate.Operation.ADD,stmt); + listenToStatement(stmt, new IndividualURIQueue()); + } + + /* + * Performs incremental ABox reasoning based + * on the retraction of a statement (aka assertion) + * from the ABox. + */ + @Override + public void removedStatement(Statement stmt) { + doPlugins(ModelUpdate.Operation.RETRACT,stmt); + Queue individualURIs = new IndividualURIQueue(); + if(doSameAs && OWL.sameAs.equals(stmt.getPredicate())) { + if (stmt.getSubject().isURIResource()) { + individualURIs.addAll(this.recomputer.getSameAsIndividuals( + stmt.getSubject().getURI())); + } + if (stmt.getObject().isURIResource()) { + individualURIs.addAll(this.recomputer.getSameAsIndividuals( + stmt.getObject().asResource().getURI())); + } + } + listenToStatement(stmt, individualURIs); + } + + private void listenToStatement(Statement stmt, Queue individualURIs) { + queueRelevantIndividuals(stmt, individualURIs); + recomputeIndividuals(individualURIs); + } + + private void queueRelevantIndividuals(Statement stmt, Queue individualURIs) { + if(stmt.getSubject().isURIResource()) { + individualURIs.add(stmt.getSubject().getURI()); + } + if(stmt.getObject().isURIResource() && !(RDF.type.equals(stmt.getPredicate()))) { + individualURIs.add(stmt.getObject().asResource().getURI()); + } + } - /* - * Performs incremental ABox reasoning based - * on the addition of a new statement - * (aka assertion) to the ABox. - */ - @Override - public void addedStatement(Statement stmt) { - try { - if (stmt.getPredicate().equals(RDF.type)) { - addedABoxTypeAssertion(stmt, inferenceModel, new HashSet()); - setMostSpecificTypes(stmt.getSubject(), inferenceModel, new HashSet()); - } else if ( doSameAs && stmt.getPredicate().equals(OWL.sameAs)) { - addedABoxSameAsAssertion(stmt, inferenceModel); - } else { - addedABoxAssertion(stmt, inferenceModel); - } - - doPlugins(ModelUpdate.Operation.ADD,stmt); - - } catch (Exception e) { // don't stop the edit if there's an exception - log.error("Exception while computing inferences: " + e.getMessage()); - } - } - - /* - * Performs incremental ABox reasoning based - * on the retraction of a statement (aka assertion) - * from the ABox. - */ - @Override - public void removedStatement(Statement stmt) { - try { - handleRemovedStatement(stmt); - } catch (Exception e) { // don't stop the edit if there's an exception - log.error("Exception while retracting inferences: ", e); - } - } - - /* - * Synchronized part of removedStatement. Interacts with DeltaComputer. - */ - protected synchronized void handleRemovedStatement(Statement stmt) { - if (batchMode == 1) { - aBoxDeltaModeler1.removedStatement(stmt); - } else if (batchMode == 2) { - aBoxDeltaModeler2.removedStatement(stmt); - } else { // batchMode == 0 - if (stmt.getPredicate().equals(RDF.type)) { - removedABoxTypeAssertion(stmt, inferenceModel); - setMostSpecificTypes(stmt.getSubject(), inferenceModel, new HashSet()); - } else if ( doSameAs && stmt.getPredicate().equals(OWL.sameAs)) { - removedABoxSameAsAssertion(stmt, inferenceModel); - } else { - removedABoxAssertion(stmt, inferenceModel); - } - doPlugins(ModelUpdate.Operation.RETRACT,stmt); - } - } - + private void recomputeIndividuals(Queue individualURIs) { + long start = System.currentTimeMillis(); + int size = individualURIs.size(); + recomputer.recompute(individualURIs); + if(size > 2) { + log.info((System.currentTimeMillis() - start) + " ms to recompute " + + size + " individuals"); + } + } + /** * Performs incremental ABox reasoning based * on changes to the class hierarchy. @@ -363,202 +358,7 @@ public class SimpleReasoner extends StatementListener { public void removedTBoxStatement(Statement stmt) { changedTBoxStatement(stmt, false); } - - protected void addedABoxTypeAssertion(Statement stmt, - Model inferenceModel, - HashSet unknownTypes) { - addedABoxTypeAssertion(stmt, inferenceModel, unknownTypes, true); - } - /** - * Performs incremental reasoning based on a new type assertion - * added to the ABox (assertion that an individual is of a certain - * type). - * - * If it is added that B is of type A, then for each superclass of - * A assert that B is of that type. - */ - protected void addedABoxTypeAssertion(Statement stmt, - Model inferenceModel, - HashSet unknownTypes, - boolean checkRedundancy) { - - tboxModel.enterCriticalSection(Lock.READ); - try { - Resource cls = null; - if ( (stmt.getObject().asResource()).getURI() != null ) { - - cls = tboxModel.getResource(stmt.getObject().asResource().getURI()); - if (cls != null) { - List parents = getParents(cls,tboxModel); - - Iterator parentIt = parents.iterator(); - - if (parentIt.hasNext()) { - while (parentIt.hasNext()) { - Resource parentClass = parentIt.next(); - - // VIVO doesn't materialize statements that assert anonymous types - // for individuals. Also, sharing an identical anonymous node is - // not allowed in owl-dl. picklist population code looks at qualities - // of classes not individuals. - if (parentClass.isAnon()) continue; - - Statement infStmt = - ResourceFactory.createStatement(stmt.getSubject(), - RDF.type, parentClass); - addInference(infStmt, inferenceModel, true, checkRedundancy); - } - } - } else { - if ( !(stmt.getObject().asResource().getNameSpace()).equals(OWL.NS)) { - if (!unknownTypes.contains(stmt.getObject().asResource().getURI())) { - unknownTypes.add(stmt.getObject().asResource().getURI()); - log.warn("Didn't find the target class (the object of an added " + - "rdf:type statement) in the TBox: " + - (stmt.getObject().asResource()).getURI() + - ". No class subsumption reasoning will be done " + - "based on type assertions of this type."); - } - } - } - } else { - log.debug("The object of this rdf:type assertion has a null URI, no reasoning" - + " will be done based on this assertion: " + stmtString(stmt)); - return; - } - } finally { - tboxModel.leaveCriticalSection(); - } - - inferenceModel.enterCriticalSection(Lock.WRITE); - try { - if (inferenceModel.contains(stmt)) { - inferenceModel.remove(stmt); - } - } finally { - inferenceModel.leaveCriticalSection(); - } - } - - /** - * If it is removed that B is of type A, then for each superclass of A remove - * the inferred statement that B is of that type UNLESS it is otherwise entailed - * that B is of that type. - * - */ - protected void removedABoxTypeAssertion(Statement stmt, Model inferenceModel) { - removedABoxTypeAssertion(stmt, inferenceModel, null); - } - - /** - * If it is removed that B is of type A, then for each superclass of A remove - * the inferred statement that B is of that type UNLESS it is otherwise entailed - * that B is of that type. - * - * remainingTypeURIs is an optional list of asserted type URIs for the subject of - * stmt, and may be null. Supplying a precompiled list can yield performance - * improvement when this method is called repeatedly for the same subject. - * - */ - protected void removedABoxTypeAssertion(Statement stmt, - Model inferenceModel, - List remainingTypeURIs) { - tboxModel.enterCriticalSection(Lock.READ); - try { - Resource cls = null; - - if ( (stmt.getObject().asResource()).getURI() != null ) { - cls = tboxModel.getResource(stmt.getObject().asResource().getURI()); - - if (cls != null) { - if (entailedType(stmt.getSubject(),cls)) { - addInference(stmt,inferenceModel,true); - } - - List parents = getParents(cls,tboxModel); - - Iterator parentIt = parents.iterator(); - - while (parentIt.hasNext()) { - - Resource parentClass = parentIt.next(); - - // VIVO doesn't materialize statements that assert anonymous types - // for individuals. Also, sharing an identical anonymous node is - // not allowed in owl-dl. picklist population code looks at qualities - // of classes not individuals. - if (parentClass.isAnon()) continue; - - List typeURIs = (remainingTypeURIs == null) - ? getRemainingAssertedTypeURIs(stmt.getSubject()) : remainingTypeURIs; - if (entailedType(stmt.getSubject(),parentClass, typeURIs)) { - continue; // if a type is still entailed without the - } - // removed statement, then don't remove it - // from the inferences - - Statement infStmt = - ResourceFactory.createStatement(stmt.getSubject(), RDF.type, parentClass); - removeInference(infStmt,inferenceModel,true,false); - } - } else { - log.warn("Didn't find target class (the object of the removed rdf:type" - + "statement) in the TBox: " - + ((Resource)stmt.getObject()).getURI() + ". No class subsumption" - +" reasoning will be performed based on the removal of this assertion."); - } - } else { - log.warn("The object of this rdf:type assertion has a null URI: " - + stmtString(stmt)); - } - } catch (Exception e) { - log.warn("exception while removing abox type assertions: " + e.getMessage()); - } finally { - tboxModel.leaveCriticalSection(); - } - } - - /** - * Performs incremental property-based reasoning. - * - * Retracts inferences based on the owl:inverseOf relationship. - * - * If it is removed that x prop1 y, and prop2 is an inverseOf prop1 - * then remove y prop2 x from the inference graph, unless it is - * otherwise entailed by the assertions graph independently of - * this removed statement. - */ - protected void removedABoxAssertion(Statement stmt, Model inferenceModel) { - - if (!stmt.getObject().isLiteral()) { - List inverseProperties = getInverseProperties(stmt); - Iterator inverseIter = inverseProperties.iterator(); - - while (inverseIter.hasNext()) { - OntProperty inverseProp = inverseIter.next(); - Statement infStmt = ResourceFactory.createStatement( - stmt.getObject().asResource(), inverseProp, stmt.getSubject()); - removeInference(infStmt,inferenceModel); - } - } - - if( doSameAs ) - doSameAsForRemovedABoxAssertion( stmt, inferenceModel ); - - // if a statement has been removed that is otherwise entailed, - // add it to the inference graph. - inferenceModel.enterCriticalSection(Lock.WRITE); - try { - if (entailedStatement(stmt) && !inferenceModel.contains(stmt)) { - inferenceModel.add(stmt); - } - } finally { - inferenceModel.leaveCriticalSection(); - } - } - - /** * If it is added that B is a subClass of A, then for each * individual that is typed as B, either in the ABox or in the @@ -717,241 +517,7 @@ public class SimpleReasoner extends StatementListener { } return sameIndividuals; } - - /** - * Materializes inferences based on the owl:sameAs relationship. - * - * If it is added that x owl:sameAs y, then all asserted and inferred - * statements about x will become inferred about y if they are not already - * asserted about y, and vice versa. - */ - protected void addedABoxSameAsAssertion(Statement stmt, Model inferenceModel) { - Resource subject = null; - Resource object = null; - - if (stmt.getSubject().isResource()) { - subject = stmt.getSubject().asResource(); - if (tboxModel.containsResource(subject) || subject.isAnon()) { - log.debug("the subject of this sameAs statement is either in the tbox or an anonymous node, no reasoning will be done: " + stmtString(stmt)); - return; - } - } else { - log.warn("the subject of this sameAs statement is not a resource, no reasoning will be done: " + stmtString(stmt)); - return; - } - - if (stmt.getObject().isResource()) { - object = stmt.getObject().asResource(); - if (tboxModel.containsResource(object) || object.isAnon()) { - log.debug("the object of this sameAs statement is either in the tbox or an anonymous node, no reasoning will be done: " + stmtString(stmt)); - return; - } - } else { - log.warn("the object of this sameAs statement is not a resource, no reasoning will be done: " + stmtString(stmt)); - return; - } - - inferenceModel.enterCriticalSection(Lock.WRITE); - try { - if (inferenceModel.contains(stmt)) { - inferenceModel.remove(stmt); - } - } finally { - inferenceModel.leaveCriticalSection(); - } - - Statement opposite = ResourceFactory.createStatement(object, OWL.sameAs, subject); - addInference(opposite,inferenceModel,true); - - generateSameAsInferences(subject, object, inferenceModel); - generateSameAsInferences(object, subject, inferenceModel); - } - - /** - * Materializes inferences based on the owl:sameAs relationship. - * - * If it is removed that x is sameAs y, then remove y sameAs x from - * the inference graph and then recompute the inferences for x and - * y based on their respective assertions. - * that x owl:sameAs y, then all asserted and inferred - */ - protected void removedABoxSameAsAssertion(Statement stmt, Model inferenceModel) { - Resource subject = null; - Resource object = null; - - if (stmt.getSubject().isResource()) { - subject = stmt.getSubject().asResource(); - if (tboxModel.containsResource(subject) || subject.isAnon()) { - log.debug("the subject of this removed sameAs statement is either in the tbox or an anonymous node, no reasoning will be done: " + stmtString(stmt)); - return; - } - } else { - log.warn("the subject of this removed sameAs statement is not a resource, no reasoning will be done: " + stmtString(stmt)); - return; - } - - if (stmt.getObject().isResource()) { - object = stmt.getObject().asResource(); - if (tboxModel.containsResource(object) || object.isAnon()) { - log.debug("the object of this removed sameAs statement is either in the tbox or an anonymous node, no reasoning will be done: " + stmtString(stmt)); - return; - } - } else { - log.warn("the object of this removed sameAs statement is not a resource, no reasoning will be done: " + stmtString(stmt)); - return; - } - - List sameIndividuals = getSameIndividuals(subject,inferenceModel); - sameIndividuals.addAll(getSameIndividuals(object, inferenceModel)); - - Iterator sIter1 = sameIndividuals.iterator(); - while (sIter1.hasNext()) { - removeInferencesForIndividual(sIter1.next(), inferenceModel); - } - - Iterator sIter2 = sameIndividuals.iterator(); - while (sIter2.hasNext()) { - computeInferencesForIndividual(sIter2.next(), inferenceModel); - } - } - protected void doSameAsForAddedABoxAssertion(Statement stmt, Model inferenceModel){ - List sameIndividuals = - getSameIndividuals(stmt.getSubject().asResource(), inferenceModel); - - Iterator sameIter = sameIndividuals.iterator(); - while (sameIter.hasNext()) { - Resource subject = sameIter.next(); - Statement sameStmt = - ResourceFactory.createStatement(subject,stmt.getPredicate(),stmt.getObject()); - addInference(sameStmt,inferenceModel, doSameAs); - } - } - - - /** - * Materializes inferences based on the owl:inverseOf relationship. - * and owl:sameAs - * - * If it is added that x prop1 y, and prop2 is an inverseOf prop1 - * then add y prop2 x to the inference graph, if it is not already in - * the assertions graph. - * - */ - protected void addedABoxAssertion(Statement stmt, Model inferenceModel) { - - if (!stmt.getObject().isLiteral()) { - List inverseProperties = getInverseProperties(stmt); - Iterator inverseIter = inverseProperties.iterator(); - - while (inverseIter.hasNext()) { - Property inverseProp = inverseIter.next(); - Statement infStmt = ResourceFactory.createStatement( - stmt.getObject().asResource(), inverseProp, stmt.getSubject()); - addInference(infStmt, inferenceModel, true); - } - } - - inferenceModel.enterCriticalSection(Lock.WRITE); - try { - if (inferenceModel.contains(stmt)) { - inferenceModel.remove(stmt); - } - } finally { - inferenceModel.leaveCriticalSection(); - } - - if(doSameAs) { - doSameAsForAddedABoxAssertion( stmt, inferenceModel); - } - } - - void doSameAsForRemovedABoxAssertion(Statement stmt, Model inferenceModel){ - List sameIndividuals = - getSameIndividuals(stmt.getSubject().asResource(), inferenceModel); - Iterator sameIter = sameIndividuals.iterator(); - while (sameIter.hasNext()) { - Statement stmtSame = - ResourceFactory.createStatement(sameIter.next(), - stmt.getPredicate(), - stmt.getObject()); - removeInference(stmtSame,inferenceModel,false,true); - } - } - - protected void generateSameAsInferences(Resource ind1, Resource ind2, Model inferenceModel) { - - OntModel unionModel = VitroModelFactory.createOntologyModel(); - unionModel.addSubModel(aboxModel); - unionModel.addSubModel(inferenceModel); - - aboxModel.enterCriticalSection(Lock.READ); - try { - Iterator iter = - unionModel.listStatements(ind1, (Property) null, (RDFNode) null); - while (iter.hasNext()) { - Statement stmt = iter.next(); - if (stmt.getObject() == null) continue; - Statement infStmt = - ResourceFactory.createStatement(ind2,stmt.getPredicate(),stmt.getObject()); - addInference(infStmt, inferenceModel,true); - } - } finally { - aboxModel.leaveCriticalSection(); - } - - return; - } - - /** - * Remove inferences for individual - */ - protected void removeInferencesForIndividual(Resource ind, Model inferenceModel) { - - Model individualInferences = ModelFactory.createDefaultModel(); - - inferenceModel.enterCriticalSection(Lock.READ); - try { - Iterator iter = - inferenceModel.listStatements(ind, (Property) null, (RDFNode) null); - - while (iter.hasNext()) { - individualInferences.add(iter.next()); - } - } finally { - inferenceModel.leaveCriticalSection(); - } - - inferenceModel.enterCriticalSection(Lock.WRITE); - try { - inferenceModel.remove(individualInferences); - } finally { - inferenceModel.leaveCriticalSection(); - } - - return; - } - - /** - * compute inferences for individual - */ - protected void computeInferencesForIndividual(Resource ind, Model inferenceModel) { - - Iterator iter = null; - aboxModel.enterCriticalSection(Lock.WRITE); - try { - iter = aboxModel.listStatements(ind, (Property) null, (RDFNode) null); - } finally { - aboxModel.leaveCriticalSection(); - } - - while (iter.hasNext()) { - Statement stmt = iter.next(); - addedStatement(stmt); - } - - return; - } - + /** * Returns true if it is entailed by class subsumption that * subject is of type cls; otherwise returns false. @@ -1087,7 +653,6 @@ public class SimpleReasoner extends StatementListener { */ protected boolean entailedStatement(Statement stmt) { //TODO think about checking class subsumption here (for convenience) - // Inverse properties List inverses = getInverseProperties(stmt); Iterator iIter = inverses.iterator(); @@ -1564,211 +1129,36 @@ public class SimpleReasoner extends StatementListener { if (recomputer != null) { recomputer.setStopRequested(); } - - this.stopRequested = true; } - - - // DeltaComputer /** - * Asynchronous reasoning mode (DeltaComputer) is used in the case of batch removals. + * Asynchronous reasoning mode (DeltaComputer) no longer used + * in the case of batch removals. */ public boolean isABoxReasoningAsynchronous() { - if (batchMode > 0) { - return true; - } else { - return false; - } - } - - private volatile boolean deltaComputerProcessing = false; - private int eventCount = 0; - - @Override - public void notifyEvent(Model model, Object event) { - - if (event instanceof BulkUpdateEvent) { - handleBulkUpdateEvent(event); - } - } - - public synchronized void handleBulkUpdateEvent(Object event) { - - if (event instanceof BulkUpdateEvent) { - if (((BulkUpdateEvent) event).getBegin()) { - - log.info("received a bulk update begin event"); - if (deltaComputerProcessing) { - eventCount++; - log.info("received a bulk update begin event while processing in asynchronous mode. Event count = " + eventCount); - return; - } else { - batchMode = 1; - if (aBoxDeltaModeler1.getRetractions().size() > 0) { - log.warn("Unexpected condition: the aBoxDeltaModeler1 retractions model was not empty when entering batch mode."); - } - - if (aBoxDeltaModeler2.getRetractions().size() > 0) { - log.warn("Unexpected condition: the aBoxDeltaModeler2 retractions model was not empty when entering batch mode."); - } - - log.info("initializing batch mode 1"); - } - } else { - log.info("received a bulk update end event"); - if (!deltaComputerProcessing) { - deltaComputerProcessing = true; - VitroBackgroundThread thread = new VitroBackgroundThread(new DeltaComputer(), - "SimpleReasoner.DeltaComputer"); - thread.setWorkLevel(WORKING); - thread.start(); - } else { - eventCount--; - log.info("received a bulk update end event while currently processing in aynchronous mode. Event count = " + eventCount); - } - } - } + return false; } - private synchronized boolean switchBatchModes() { - - if (batchMode == 1) { - aBoxDeltaModeler2.getRetractions().removeAll(); - - if (aBoxDeltaModeler1.getRetractions().size() > 0) { - batchMode = 2; - log.info("entering batch mode " + batchMode); - } else { - deltaComputerProcessing = false; - if (eventCount == 0) { - batchMode = 0; - } - } - } else if (batchMode == 2) { - aBoxDeltaModeler1.getRetractions().removeAll(); - - if (aBoxDeltaModeler2.getRetractions().size() > 0) { - batchMode = 1; - log.info("entering batch mode " + batchMode); - } else { - deltaComputerProcessing = false; - if (eventCount == 0) { - batchMode = 0; - } - } - } else { - log.warn("unexpected condition, invoked when batchMode is neither 1 nor 2. batchMode = " + batchMode); - deltaComputerProcessing = false; - } - - return deltaComputerProcessing; + boolean isABoxInferenceGraph(String graphURI) { + return ModelNames.ABOX_INFERENCES.equals(graphURI); + } + + boolean isTBoxGraph(String graphURI) { + return ( ModelNames.TBOX_ASSERTIONS.equals(graphURI) + || ModelNames.TBOX_INFERENCES.equals(graphURI) + || (graphURI != null && graphURI.contains("tbox")) ); + } + + @Override + public void notifyEvent(String string, Object event) { + // don't care } - - private class DeltaComputer extends Thread { - public DeltaComputer() { - } - - @Override - public void run() { - log.info("starting DeltaComputer.run"); - boolean abort = false; - Model retractions = ModelFactory.createDefaultModel(); - String qualifier = ""; - - while (deltaComputerProcessing && !stopRequested) { - - if (switchBatchModes()) { - if (batchMode == 1) { - qualifier = "2"; - retractions = aBoxDeltaModeler2.getRetractions(); - } else if (batchMode == 2) { - qualifier = "1"; - retractions = aBoxDeltaModeler1.getRetractions(); - } - } else { - break; - } - - retractions.enterCriticalSection(Lock.READ); - int num = 0; - - try { - log.info("started computing inferences for batch " + qualifier + " updates"); - - - ResIterator subIt = retractions.listSubjects(); - while (subIt.hasNext()) { - Resource subj = subIt.nextResource(); - StmtIterator iter = retractions.listStatements( - subj, null, (RDFNode) null); - boolean typesModified = false; - try { - List typeURIs = null; - while (iter.hasNext() && !stopRequested) { - Statement stmt = iter.next(); - num++; - try { - if (stmt.getPredicate().equals(RDF.type)) { - typesModified = true; - if (typeURIs == null) { - typeURIs = getRemainingAssertedTypeURIs(stmt.getSubject()); - } - removedABoxTypeAssertion(stmt, inferenceModel, typeURIs); - } else if (doSameAs && stmt.getPredicate().equals(OWL.sameAs)) { - removedABoxSameAsAssertion(stmt, inferenceModel); - } else { - removedABoxAssertion(stmt, inferenceModel); - } - doPlugins(ModelUpdate.Operation.RETRACT,stmt); - } catch (NullPointerException npe) { - abort = true; - break; - } catch (Exception e) { - log.error("exception in batch mode ",e); - } - - if ((num % 6000) == 0) { - log.info("still computing inferences for batch " + qualifier + " update..."); - } - - if (stopRequested) { - log.info("a stopRequested signal was received during DeltaComputer.run. Halting Processing."); - return; - } - } - } finally { - iter.close(); - if (typesModified) { - setMostSpecificTypes(subj, inferenceModel, new HashSet()); - } - } - } - } finally { - retractions.removeAll(); - retractions.leaveCriticalSection(); - } - - if (stopRequested) { - log.info("a stopRequested signal was received during DeltaComputer.run. Halting Processing."); - deltaComputerProcessing = false; - return; - } - - if (abort) { - log.error("a NullPointerException was received while computing inferences in batch " + qualifier + " mode. Halting inference computation."); - deltaComputerProcessing = false; - return; - } - - log.info("finished computing inferences for batch " + qualifier + " updates"); - log.debug("\t--> processed " + num + " statements"); - } - - log.info("ending DeltaComputer.run. batchMode = " + batchMode); - } - } - + + @Override + public void notifyEvent(Model model, Object event) { + // don't care + } + /** * Utility method for logging */ @@ -1779,5 +1169,5 @@ public class SimpleReasoner extends StatementListener { ? ((Literal)statement.getObject()).getLexicalForm() + " (Literal)" : ((Resource)statement.getObject()).getURI() + " (Resource)") + "]"; } - + } diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java index 42899c419..091aebb0a 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java @@ -2,6 +2,9 @@ package edu.cornell.mannlib.vitro.webapp.searchindex; +import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.REBUILD_REQUESTED; +import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.START_REBUILD; + import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -16,18 +19,17 @@ import org.apache.jena.riot.tokens.Tokenizer; import org.apache.jena.riot.tokens.TokenizerFactory; import com.hp.hpl.jena.graph.Triple; +import com.hp.hpl.jena.rdf.listeners.StatementListener; import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelChangedListener; import com.hp.hpl.jena.rdf.model.ModelFactory; import com.hp.hpl.jena.rdf.model.Statement; import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event; -import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; -import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.*; - /** * When a change is heard, wait for an interval to see if more changes come in. * When changes stop coming in for a specified interval, send what has @@ -52,8 +54,8 @@ import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndex * which is semantically equivalent to the original, and add that to the list * instead. The original statement is released. */ -public class IndexingChangeListener implements ChangeListener, - SearchIndexer.Listener { +public class IndexingChangeListener extends StatementListener + implements ModelChangedListener, SearchIndexer.Listener { private static final Log log = LogFactory .getLog(IndexingChangeListener.class); @@ -105,16 +107,16 @@ public class IndexingChangeListener implements ChangeListener, } @Override - public void addedStatement(String serializedTriple, String graphURI) { + public void addedStatement(Statement stmt) { if (!rebuildScheduled) { - noteChange(parseTriple(serializedTriple)); + noteChange(stmt); } } @Override - public void removedStatement(String serializedTriple, String graphURI) { + public void removedStatement(Statement stmt) { if (!rebuildScheduled) { - noteChange(parseTriple(serializedTriple)); + noteChange(stmt); } } @@ -122,7 +124,7 @@ public class IndexingChangeListener implements ChangeListener, * We only care about events that signal the end of an edit operation. */ @Override - public void notifyEvent(String graphURI, Object event) { + public void notifyEvent(Model model, Object event) { if ((event instanceof EditEvent)) { EditEvent editEvent = (EditEvent) event; if (!editEvent.getBegin()) { // editEvent is the end of an edit @@ -135,34 +137,33 @@ public class IndexingChangeListener implements ChangeListener, } } - // TODO avoid duplication with JenaChangeListener - private Statement parseTriple(String serializedTriple) { - try { - // Use RiotReader to parse a Triple - // NB A Triple can be serialized correctly with: FmtUtils.stringForTriple(triple, PrefixMapping.Factory.create()) + " .";' - Tokenizer tokenizer = TokenizerFactory.makeTokenizerString(serializedTriple); - Iterator it = RiotReader.createParserNTriples(tokenizer, null); - - if (it.hasNext()) { - Triple triple = it.next(); - - if (it.hasNext()) { - log.warn("More than one triple parsed from change event: '" + serializedTriple + "'"); - } - - // Use the retained defaultModel instance to convert the Triple to a Statement - // This does not add the Statement to the Model, so the Statement can be disposed when unused - // And whilst the Model is attached to the Statement, using a single instance means only one Model - // is created and attached to all of the Statements created by this instance - return defaultModel.asStatement(triple); - } else { - throw new RuntimeException("no triple parsed from change event: '" + serializedTriple + "'"); - } - } catch (RuntimeException riot) { - log.error("Failed to parse triple " + serializedTriple, riot); - throw riot; - } - } +// private Statement parseTriple(String serializedTriple) { +// try { +// // Use RiotReader to parse a Triple +// // NB A Triple can be serialized correctly with: FmtUtils.stringForTriple(triple, PrefixMapping.Factory.create()) + " .";' +// Tokenizer tokenizer = TokenizerFactory.makeTokenizerString(serializedTriple); +// Iterator it = RiotReader.createParserNTriples(tokenizer, null); +// +// if (it.hasNext()) { +// Triple triple = it.next(); +// +// if (it.hasNext()) { +// log.warn("More than one triple parsed from change event: '" + serializedTriple + "'"); +// } +// +// // Use the retained defaultModel instance to convert the Triple to a Statement +// // This does not add the Statement to the Model, so the Statement can be disposed when unused +// // And whilst the Model is attached to the Statement, using a single instance means only one Model +// // is created and attached to all of the Statements created by this instance +// return defaultModel.asStatement(triple); +// } else { +// throw new RuntimeException("no triple parsed from change event: '" + serializedTriple + "'"); +// } +// } catch (RuntimeException riot) { +// log.error("Failed to parse triple " + serializedTriple, riot); +// throw riot; +// } +// } // ---------------------------------------------------------------------- // helper classes diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java index 028b1d026..1d3a867c3 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java @@ -55,7 +55,7 @@ public class SearchIndexerSetup implements ServletContextListener { listener = new IndexingChangeListener(searchIndexer); listenerWrapper = new DeveloperDisabledChangeListener(listener, Key.SEARCH_INDEX_SUPPRESS_MODEL_CHANGE_LISTENER); - RDFServiceUtils.getRDFServiceFactory(ctx).registerListener( + RDFServiceUtils.getRDFServiceFactory(ctx).registerJenaModelChangedListener( listenerWrapper); this.history = new IndexHistory(); @@ -78,7 +78,7 @@ public class SearchIndexerSetup implements ServletContextListener { searchIndexer.removeListener(this.history); try { - RDFServiceUtils.getRDFServiceFactory(ctx).unregisterListener( + RDFServiceUtils.getRDFServiceFactory(ctx).unregisterJenaModelChangedListener( listenerWrapper); } catch (RDFServiceException e) { log.warn("Failed to unregister the indexing listener."); diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/utils/developer/listeners/DeveloperDisabledChangeListener.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/utils/developer/listeners/DeveloperDisabledChangeListener.java index 2c5d035ef..1bb0cbe7e 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/utils/developer/listeners/DeveloperDisabledChangeListener.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/utils/developer/listeners/DeveloperDisabledChangeListener.java @@ -2,7 +2,11 @@ package edu.cornell.mannlib.vitro.webapp.utils.developer.listeners; -import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; +import com.hp.hpl.jena.rdf.listeners.StatementListener; +import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelChangedListener; +import com.hp.hpl.jena.rdf.model.Statement; + import edu.cornell.mannlib.vitro.webapp.utils.developer.DeveloperSettings; import edu.cornell.mannlib.vitro.webapp.utils.developer.Key; @@ -11,11 +15,12 @@ import edu.cornell.mannlib.vitro.webapp.utils.developer.Key; * * Set the flag and this becomes opaque, passing no events through. */ -public class DeveloperDisabledChangeListener implements ChangeListener { - private final ChangeListener inner; +public class DeveloperDisabledChangeListener extends StatementListener + implements ModelChangedListener { + private final ModelChangedListener inner; private final Key disablingKey; - public DeveloperDisabledChangeListener(ChangeListener inner, + public DeveloperDisabledChangeListener(ModelChangedListener inner, Key disablingKey) { this.inner = inner; this.disablingKey = disablingKey; @@ -30,23 +35,23 @@ public class DeveloperDisabledChangeListener implements ChangeListener { // ---------------------------------------------------------------------- @Override - public void addedStatement(String serializedTriple, String graphURI) { + public void addedStatement(Statement stmt) { if (isEnabled()) { - inner.addedStatement(serializedTriple, graphURI); + inner.addedStatement(stmt); } } @Override - public void removedStatement(String serializedTriple, String graphURI) { + public void removedStatement(Statement stmt) { if (isEnabled()) { - inner.removedStatement(serializedTriple, graphURI); + inner.removedStatement(stmt); } } @Override - public void notifyEvent(String graphURI, Object event) { + public void notifyEvent(Model model, Object event) { if (isEnabled()) { - inner.notifyEvent(graphURI, event); + inner.notifyEvent(model, event); } } diff --git a/api/src/test/java/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiTest.java b/api/src/test/java/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiTest.java new file mode 100644 index 000000000..130c28373 --- /dev/null +++ b/api/src/test/java/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiTest.java @@ -0,0 +1,90 @@ +/* $This file is distributed under the terms of the license in /doc/license.txt$ */ + +package edu.cornell.mannlib.vitro.webapp.controller.api; + +import static org.junit.Assert.*; + +import java.io.StringReader; + +import junit.framework.Assert; + +import org.junit.Before; +import org.junit.Test; + +import com.hp.hpl.jena.query.Dataset; +import com.hp.hpl.jena.query.DatasetFactory; +import com.hp.hpl.jena.query.ReadWrite; +import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelFactory; +import com.hp.hpl.jena.update.GraphStore; +import com.hp.hpl.jena.update.GraphStoreFactory; +import com.hp.hpl.jena.update.UpdateAction; +import com.hp.hpl.jena.update.UpdateFactory; + +import edu.cornell.mannlib.vitro.testing.AbstractTestClass; +import edu.cornell.mannlib.vitro.webapp.dao.jena.RDFServiceDataset; +import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; +import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.model.RDFServiceModel; + +/** + * Test that the SparqlQueryApiExecutor can handle all query types and all + * formats. + */ +public class SparqlUpdateApiTest extends AbstractTestClass { + + private final String GRAPH_URI = "http://example.org/graph"; + + private final String updateStr1 = + "INSERT DATA { GRAPH <" + GRAPH_URI + "> { \n" + + " a . \n" + + "} } ; \n" + + "INSERT { GRAPH <" + GRAPH_URI + "> { \n " + + " ?x a . \n " + + "} } WHERE { \n" + + " GRAPH <" + GRAPH_URI + "> { ?x a } \n " + + "}"; + + private final String result1 = + " a . \n" + + " a ." ; + + // look at how the SimpleReasoner is set up. + + private Model model; + private RDFService rdfService; + + @Before + public void setup() { + model = ModelFactory.createDefaultModel(); + Dataset ds = DatasetFactory.createMem(); + ds.addNamedModel(GRAPH_URI, model); + rdfService = new RDFServiceModel(ds); + } + + // ---------------------------------------------------------------------- + // Tests + // ---------------------------------------------------------------------- + + @Test + public void nullRdfService() throws Exception { + model.removeAll(); + Model desiredResults = ModelFactory.createDefaultModel(); + desiredResults.read(new StringReader(result1), null, "N3"); + Dataset ds = new RDFServiceDataset(rdfService); + GraphStore graphStore = GraphStoreFactory.create(ds); + try { + if(ds.supportsTransactions()) { + ds.begin(ReadWrite.WRITE); + System.out.println("yep"); + } + UpdateAction.execute(UpdateFactory.create(updateStr1), graphStore); + } finally { + if(ds.supportsTransactions()) { + ds.commit(); + ds.end(); + } + } + assertEquals("updateStr1 yields result1", desiredResults.toString(), model.toString()); + } + +}