From 64cfc4a370d8c751b239d37aa4575277d6550ab1 Mon Sep 17 00:00:00 2001 From: brianjlowe Date: Sun, 20 Dec 2015 18:44:11 +0200 Subject: [PATCH] improvements to RDFServiceGraph triple batching --- .../webapp/dao/jena/RDFServiceGraph.java | 96 ++++++++++++++----- 1 file changed, 72 insertions(+), 24 deletions(-) diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java index 7ef887a1a..a8ad33786 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java @@ -26,6 +26,7 @@ import com.hp.hpl.jena.query.QuerySolution; import com.hp.hpl.jena.rdf.listeners.StatementListener; import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.ModelFactory; +import com.hp.hpl.jena.rdf.model.StmtIterator; import com.hp.hpl.jena.shared.AddDeniedException; import com.hp.hpl.jena.shared.Command; import com.hp.hpl.jena.shared.DeleteDeniedException; @@ -53,7 +54,7 @@ public class RDFServiceGraph implements GraphWithPerform { private PrefixMapping prefixMapping = new PrefixMappingImpl(); private GraphEventManager eventManager; - private boolean queueWrites = false; + private boolean inTransaction = false; private Graph additionsGraph = ModelFactory.createDefaultModel().getGraph(); private Graph removalsGraph = ModelFactory.createDefaultModel().getGraph(); @@ -63,7 +64,6 @@ public class RDFServiceGraph implements GraphWithPerform { */ public RDFServiceGraph(RDFService rdfService) { this(rdfService, null); - log.info("using graph implementation: " + additionsGraph.getClass().getName()); } /** @@ -97,19 +97,20 @@ public class RDFServiceGraph implements GraphWithPerform { return sb.toString(); } - public synchronized void flush() { - log.debug("Flushing a batch"); + private synchronized void flush() { ChangeSet changeSet = rdfService.manufactureChangeSet(); try { if(!removalsGraph.isEmpty()) { String removals = serializeGraph(removalsGraph); changeSet.addRemoval(RDFServiceUtils.toInputStream(removals), RDFService.ModelSerializationFormat.N3, graphURI); + removalsGraph.clear(); } if(!additionsGraph.isEmpty()) { String additions = serializeGraph(additionsGraph); changeSet.addAddition(RDFServiceUtils.toInputStream(additions), RDFService.ModelSerializationFormat.N3, graphURI); + additionsGraph.clear(); } rdfService.changeSetUpdate(changeSet); } catch (RDFServiceException rdfse) { @@ -127,27 +128,51 @@ public class RDFServiceGraph implements GraphWithPerform { } @Override - public synchronized void performAdd(Triple t) { + public void performAdd(Triple t) { + if(inTransaction) { + stageAddition(t); + } else { + ChangeSet changeSet = rdfService.manufactureChangeSet(); + try { + changeSet.addAddition(RDFServiceUtils.toInputStream(serialize(t)), + RDFService.ModelSerializationFormat.N3, graphURI); + rdfService.changeSetUpdate(changeSet); + } catch (RDFServiceException rdfse) { + throw new RuntimeException(rdfse); + } + } + } + + private void stageAddition(Triple t) { if(removalsGraph.contains(t)) { removalsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject()); } else { additionsGraph.add(t); } - if(!queueWrites) { - flush(); - } } @Override - public synchronized void performDelete(Triple t) { + public void performDelete(Triple t) { + if(inTransaction) { + stageDeletion(t); + } else { + ChangeSet changeSet = rdfService.manufactureChangeSet(); + try { + changeSet.addRemoval(RDFServiceUtils.toInputStream(serialize(t)), + RDFService.ModelSerializationFormat.N3, graphURI); + rdfService.changeSetUpdate(changeSet); + } catch (RDFServiceException rdfse) { + throw new RuntimeException(rdfse); + } + } + } + + private synchronized void stageDeletion(Triple t) { if(additionsGraph.contains(t)) { additionsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject()); } else { removalsGraph.add(t); } - if(!queueWrites) { - flush(); - } } public void removeAll() { @@ -157,16 +182,30 @@ public class RDFServiceGraph implements GraphWithPerform { } String constructStr = "CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <" + graphURI + "> { ?s ?p ?o } }"; try { - InputStream model = rdfService.sparqlConstructQuery( - constructStr, RDFService.ModelSerializationFormat.N3); - ChangeSet changeSet = rdfService.manufactureChangeSet(); - changeSet.addRemoval(model, RDFService.ModelSerializationFormat.N3, graphURI); - rdfService.changeSetUpdate(changeSet); + if(inTransaction) { + Model model = ModelFactory.createDefaultModel(); + rdfService.sparqlConstructQuery(constructStr, model); + stageRemoveAll(model); + } else { + InputStream model = rdfService.sparqlConstructQuery( + constructStr, RDFService.ModelSerializationFormat.N3); + ChangeSet changeSet = rdfService.manufactureChangeSet(); + changeSet.addRemoval(model, RDFService.ModelSerializationFormat.N3, graphURI); + rdfService.changeSetUpdate(changeSet); + } } catch (RDFServiceException rdfse) { throw new RuntimeException(rdfse); } } + private void stageRemoveAll(Model removals) { + StmtIterator sit = removals.listStatements(); + while (sit.hasNext()) { + Triple t = sit.nextStatement().asTriple(); + stageDeletion(t); + } + } + @Override public void close() { // can't close a remote endpoint @@ -201,13 +240,22 @@ public class RDFServiceGraph implements GraphWithPerform { ResultSetConsumer.HasResult consumer = new ResultSetConsumer.HasResult(); execSelect(containsQuery.toString(), consumer); boolean initialResult = consumer.hasResult(); - if(!queueWrites) { + if(!inTransaction) { return initialResult; } else { Triple t = Triple.create(subject, predicate, object); - return (initialResult || additionsGraph.contains(t)) && !removalsGraph.contains(t); + return (initialResult || additionsGraphContains(t)) + && !removalsGraphContains(t); } } + + private synchronized boolean additionsGraphContains(Triple t) { + return additionsGraph.contains(t); + } + + private synchronized boolean removalsGraphContains(Triple t) { + return removalsGraph.contains(t); + } @Override public void delete(Triple arg0) throws DeleteDeniedException { @@ -299,7 +347,7 @@ public class RDFServiceGraph implements GraphWithPerform { String queryString = findQuery.toString(); final List triplist = new ArrayList(); - if(queueWrites) { + if(inTransaction) { addAdditions(triplist, additionsGraph.find(subject, predicate, object)); subtractRemovals(triplist, removalsGraph.find(subject, predicate, object)); } @@ -468,20 +516,20 @@ public class RDFServiceGraph implements GraphWithPerform { private final TransactionHandler transactionHandler = new TransactionHandler() { @Override public synchronized void abort() { - queueWrites = false; + inTransaction = false; removalsGraph.clear(); additionsGraph.clear(); } @Override - public void begin() { - queueWrites = true; + public synchronized void begin() { + inTransaction = true; } @Override public synchronized void commit() { flush(); - queueWrites = false; + inTransaction = false; } @Override