From 6e3a25659660aea9cf44d0cbb6b196fa2b2075e9 Mon Sep 17 00:00:00 2001 From: brianjlowe Date: Sat, 19 Dec 2015 18:43:16 +0200 Subject: [PATCH 1/2] fix to allow for batch handling of more complex SPARQL updates --- .../webapp/dao/jena/RDFServiceGraph.java | 83 ++++++++++++----- .../controller/api/SparqlUpdateApiTest.java | 90 +++++++++++++++++++ 2 files changed, 150 insertions(+), 23 deletions(-) create mode 100644 webapp/test/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiTest.java diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java index d8b1b9d97..7ef887a1a 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java @@ -5,9 +5,8 @@ package edu.cornell.mannlib.vitro.webapp.dao.jena; import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -26,6 +25,7 @@ import com.hp.hpl.jena.graph.impl.SimpleEventManager; import com.hp.hpl.jena.query.QuerySolution; import com.hp.hpl.jena.rdf.listeners.StatementListener; import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelFactory; import com.hp.hpl.jena.shared.AddDeniedException; import com.hp.hpl.jena.shared.Command; import com.hp.hpl.jena.shared.DeleteDeniedException; @@ -54,9 +54,8 @@ public class RDFServiceGraph implements GraphWithPerform { private GraphEventManager eventManager; private boolean queueWrites = false; - private ConcurrentLinkedQueue addTripleQueue = new ConcurrentLinkedQueue(); - private ConcurrentLinkedQueue removeTripleQueue = new ConcurrentLinkedQueue(); - + private Graph additionsGraph = ModelFactory.createDefaultModel().getGraph(); + private Graph removalsGraph = ModelFactory.createDefaultModel().getGraph(); /** * Returns a SparqlGraph for the union of named graphs in a remote repository @@ -64,6 +63,7 @@ public class RDFServiceGraph implements GraphWithPerform { */ public RDFServiceGraph(RDFService rdfService) { this(rdfService, null); + log.info("using graph implementation: " + additionsGraph.getClass().getName()); } /** @@ -97,17 +97,17 @@ public class RDFServiceGraph implements GraphWithPerform { return sb.toString(); } - public void flush() { + public synchronized void flush() { log.debug("Flushing a batch"); ChangeSet changeSet = rdfService.manufactureChangeSet(); try { - if(!removeTripleQueue.isEmpty()) { - String removals = serializeQueue(removeTripleQueue); + if(!removalsGraph.isEmpty()) { + String removals = serializeGraph(removalsGraph); changeSet.addRemoval(RDFServiceUtils.toInputStream(removals), RDFService.ModelSerializationFormat.N3, graphURI); } - if(!addTripleQueue.isEmpty()) { - String additions = serializeQueue(addTripleQueue); + if(!additionsGraph.isEmpty()) { + String additions = serializeGraph(additionsGraph); changeSet.addAddition(RDFServiceUtils.toInputStream(additions), RDFService.ModelSerializationFormat.N3, graphURI); } @@ -117,25 +117,34 @@ public class RDFServiceGraph implements GraphWithPerform { } } - private String serializeQueue(Queue tripleQueue) { + private synchronized String serializeGraph(Graph graph) { String triples = ""; - while(!tripleQueue.isEmpty()) { - triples += " \n" + serialize(tripleQueue.poll()); + Iterator tripIt = graph.find(null, null, null); + while(tripIt.hasNext()) { + triples += " \n" + serialize(tripIt.next()); } return triples; } @Override - public void performAdd(Triple t) { - addTripleQueue.add(t); + public synchronized void performAdd(Triple t) { + if(removalsGraph.contains(t)) { + removalsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject()); + } else { + additionsGraph.add(t); + } if(!queueWrites) { flush(); } } @Override - public void performDelete(Triple t) { - removeTripleQueue.add(t); + public synchronized void performDelete(Triple t) { + if(additionsGraph.contains(t)) { + additionsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject()); + } else { + removalsGraph.add(t); + } if(!queueWrites) { flush(); } @@ -191,7 +200,13 @@ public class RDFServiceGraph implements GraphWithPerform { ResultSetConsumer.HasResult consumer = new ResultSetConsumer.HasResult(); execSelect(containsQuery.toString(), consumer); - return consumer.hasResult(); + boolean initialResult = consumer.hasResult(); + if(!queueWrites) { + return initialResult; + } else { + Triple t = Triple.create(subject, predicate, object); + return (initialResult || additionsGraph.contains(t)) && !removalsGraph.contains(t); + } } @Override @@ -284,7 +299,11 @@ public class RDFServiceGraph implements GraphWithPerform { String queryString = findQuery.toString(); final List triplist = new ArrayList(); - + if(queueWrites) { + addAdditions(triplist, additionsGraph.find(subject, predicate, object)); + subtractRemovals(triplist, removalsGraph.find(subject, predicate, object)); + } + execSelect(queryString, new ResultSetConsumer() { @Override protected void processQuerySolution(QuerySolution qs) { @@ -311,6 +330,24 @@ public class RDFServiceGraph implements GraphWithPerform { return WrappedIterator.create(triplist.iterator()); } + private void addAdditions(List tripList, ExtendedIterator tripIt) { + while(tripIt.hasNext()) { + Triple t = tripIt.next(); + if(!tripList.contains(t)) { + tripList.add(t); + } + } + } + + private void subtractRemovals(List tripList, ExtendedIterator tripIt) { + while(tripIt.hasNext()) { + Triple t = tripIt.next(); + if(tripList.contains(t)) { + tripList.remove(t); + } + } + } + private boolean isVar(Node node) { return (node == null || node.isVariable() || node == Node.ANY); } @@ -430,10 +467,10 @@ public class RDFServiceGraph implements GraphWithPerform { private final TransactionHandler transactionHandler = new TransactionHandler() { @Override - public void abort() { + public synchronized void abort() { queueWrites = false; - removeTripleQueue.clear(); - addTripleQueue.clear(); + removalsGraph.clear(); + additionsGraph.clear(); } @Override @@ -442,7 +479,7 @@ public class RDFServiceGraph implements GraphWithPerform { } @Override - public void commit() { + public synchronized void commit() { flush(); queueWrites = false; } diff --git a/webapp/test/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiTest.java b/webapp/test/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiTest.java new file mode 100644 index 000000000..130c28373 --- /dev/null +++ b/webapp/test/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiTest.java @@ -0,0 +1,90 @@ +/* $This file is distributed under the terms of the license in /doc/license.txt$ */ + +package edu.cornell.mannlib.vitro.webapp.controller.api; + +import static org.junit.Assert.*; + +import java.io.StringReader; + +import junit.framework.Assert; + +import org.junit.Before; +import org.junit.Test; + +import com.hp.hpl.jena.query.Dataset; +import com.hp.hpl.jena.query.DatasetFactory; +import com.hp.hpl.jena.query.ReadWrite; +import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelFactory; +import com.hp.hpl.jena.update.GraphStore; +import com.hp.hpl.jena.update.GraphStoreFactory; +import com.hp.hpl.jena.update.UpdateAction; +import com.hp.hpl.jena.update.UpdateFactory; + +import edu.cornell.mannlib.vitro.testing.AbstractTestClass; +import edu.cornell.mannlib.vitro.webapp.dao.jena.RDFServiceDataset; +import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; +import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.model.RDFServiceModel; + +/** + * Test that the SparqlQueryApiExecutor can handle all query types and all + * formats. + */ +public class SparqlUpdateApiTest extends AbstractTestClass { + + private final String GRAPH_URI = "http://example.org/graph"; + + private final String updateStr1 = + "INSERT DATA { GRAPH <" + GRAPH_URI + "> { \n" + + " a . \n" + + "} } ; \n" + + "INSERT { GRAPH <" + GRAPH_URI + "> { \n " + + " ?x a . \n " + + "} } WHERE { \n" + + " GRAPH <" + GRAPH_URI + "> { ?x a } \n " + + "}"; + + private final String result1 = + " a . \n" + + " a ." ; + + // look at how the SimpleReasoner is set up. + + private Model model; + private RDFService rdfService; + + @Before + public void setup() { + model = ModelFactory.createDefaultModel(); + Dataset ds = DatasetFactory.createMem(); + ds.addNamedModel(GRAPH_URI, model); + rdfService = new RDFServiceModel(ds); + } + + // ---------------------------------------------------------------------- + // Tests + // ---------------------------------------------------------------------- + + @Test + public void nullRdfService() throws Exception { + model.removeAll(); + Model desiredResults = ModelFactory.createDefaultModel(); + desiredResults.read(new StringReader(result1), null, "N3"); + Dataset ds = new RDFServiceDataset(rdfService); + GraphStore graphStore = GraphStoreFactory.create(ds); + try { + if(ds.supportsTransactions()) { + ds.begin(ReadWrite.WRITE); + System.out.println("yep"); + } + UpdateAction.execute(UpdateFactory.create(updateStr1), graphStore); + } finally { + if(ds.supportsTransactions()) { + ds.commit(); + ds.end(); + } + } + assertEquals("updateStr1 yields result1", desiredResults.toString(), model.toString()); + } + +} From 64cfc4a370d8c751b239d37aa4575277d6550ab1 Mon Sep 17 00:00:00 2001 From: brianjlowe Date: Sun, 20 Dec 2015 18:44:11 +0200 Subject: [PATCH 2/2] improvements to RDFServiceGraph triple batching --- .../webapp/dao/jena/RDFServiceGraph.java | 96 ++++++++++++++----- 1 file changed, 72 insertions(+), 24 deletions(-) diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java index 7ef887a1a..a8ad33786 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java @@ -26,6 +26,7 @@ import com.hp.hpl.jena.query.QuerySolution; import com.hp.hpl.jena.rdf.listeners.StatementListener; import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.ModelFactory; +import com.hp.hpl.jena.rdf.model.StmtIterator; import com.hp.hpl.jena.shared.AddDeniedException; import com.hp.hpl.jena.shared.Command; import com.hp.hpl.jena.shared.DeleteDeniedException; @@ -53,7 +54,7 @@ public class RDFServiceGraph implements GraphWithPerform { private PrefixMapping prefixMapping = new PrefixMappingImpl(); private GraphEventManager eventManager; - private boolean queueWrites = false; + private boolean inTransaction = false; private Graph additionsGraph = ModelFactory.createDefaultModel().getGraph(); private Graph removalsGraph = ModelFactory.createDefaultModel().getGraph(); @@ -63,7 +64,6 @@ public class RDFServiceGraph implements GraphWithPerform { */ public RDFServiceGraph(RDFService rdfService) { this(rdfService, null); - log.info("using graph implementation: " + additionsGraph.getClass().getName()); } /** @@ -97,19 +97,20 @@ public class RDFServiceGraph implements GraphWithPerform { return sb.toString(); } - public synchronized void flush() { - log.debug("Flushing a batch"); + private synchronized void flush() { ChangeSet changeSet = rdfService.manufactureChangeSet(); try { if(!removalsGraph.isEmpty()) { String removals = serializeGraph(removalsGraph); changeSet.addRemoval(RDFServiceUtils.toInputStream(removals), RDFService.ModelSerializationFormat.N3, graphURI); + removalsGraph.clear(); } if(!additionsGraph.isEmpty()) { String additions = serializeGraph(additionsGraph); changeSet.addAddition(RDFServiceUtils.toInputStream(additions), RDFService.ModelSerializationFormat.N3, graphURI); + additionsGraph.clear(); } rdfService.changeSetUpdate(changeSet); } catch (RDFServiceException rdfse) { @@ -127,27 +128,51 @@ public class RDFServiceGraph implements GraphWithPerform { } @Override - public synchronized void performAdd(Triple t) { + public void performAdd(Triple t) { + if(inTransaction) { + stageAddition(t); + } else { + ChangeSet changeSet = rdfService.manufactureChangeSet(); + try { + changeSet.addAddition(RDFServiceUtils.toInputStream(serialize(t)), + RDFService.ModelSerializationFormat.N3, graphURI); + rdfService.changeSetUpdate(changeSet); + } catch (RDFServiceException rdfse) { + throw new RuntimeException(rdfse); + } + } + } + + private void stageAddition(Triple t) { if(removalsGraph.contains(t)) { removalsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject()); } else { additionsGraph.add(t); } - if(!queueWrites) { - flush(); - } } @Override - public synchronized void performDelete(Triple t) { + public void performDelete(Triple t) { + if(inTransaction) { + stageDeletion(t); + } else { + ChangeSet changeSet = rdfService.manufactureChangeSet(); + try { + changeSet.addRemoval(RDFServiceUtils.toInputStream(serialize(t)), + RDFService.ModelSerializationFormat.N3, graphURI); + rdfService.changeSetUpdate(changeSet); + } catch (RDFServiceException rdfse) { + throw new RuntimeException(rdfse); + } + } + } + + private synchronized void stageDeletion(Triple t) { if(additionsGraph.contains(t)) { additionsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject()); } else { removalsGraph.add(t); } - if(!queueWrites) { - flush(); - } } public void removeAll() { @@ -157,16 +182,30 @@ public class RDFServiceGraph implements GraphWithPerform { } String constructStr = "CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <" + graphURI + "> { ?s ?p ?o } }"; try { - InputStream model = rdfService.sparqlConstructQuery( - constructStr, RDFService.ModelSerializationFormat.N3); - ChangeSet changeSet = rdfService.manufactureChangeSet(); - changeSet.addRemoval(model, RDFService.ModelSerializationFormat.N3, graphURI); - rdfService.changeSetUpdate(changeSet); + if(inTransaction) { + Model model = ModelFactory.createDefaultModel(); + rdfService.sparqlConstructQuery(constructStr, model); + stageRemoveAll(model); + } else { + InputStream model = rdfService.sparqlConstructQuery( + constructStr, RDFService.ModelSerializationFormat.N3); + ChangeSet changeSet = rdfService.manufactureChangeSet(); + changeSet.addRemoval(model, RDFService.ModelSerializationFormat.N3, graphURI); + rdfService.changeSetUpdate(changeSet); + } } catch (RDFServiceException rdfse) { throw new RuntimeException(rdfse); } } + private void stageRemoveAll(Model removals) { + StmtIterator sit = removals.listStatements(); + while (sit.hasNext()) { + Triple t = sit.nextStatement().asTriple(); + stageDeletion(t); + } + } + @Override public void close() { // can't close a remote endpoint @@ -201,13 +240,22 @@ public class RDFServiceGraph implements GraphWithPerform { ResultSetConsumer.HasResult consumer = new ResultSetConsumer.HasResult(); execSelect(containsQuery.toString(), consumer); boolean initialResult = consumer.hasResult(); - if(!queueWrites) { + if(!inTransaction) { return initialResult; } else { Triple t = Triple.create(subject, predicate, object); - return (initialResult || additionsGraph.contains(t)) && !removalsGraph.contains(t); + return (initialResult || additionsGraphContains(t)) + && !removalsGraphContains(t); } } + + private synchronized boolean additionsGraphContains(Triple t) { + return additionsGraph.contains(t); + } + + private synchronized boolean removalsGraphContains(Triple t) { + return removalsGraph.contains(t); + } @Override public void delete(Triple arg0) throws DeleteDeniedException { @@ -299,7 +347,7 @@ public class RDFServiceGraph implements GraphWithPerform { String queryString = findQuery.toString(); final List triplist = new ArrayList(); - if(queueWrites) { + if(inTransaction) { addAdditions(triplist, additionsGraph.find(subject, predicate, object)); subtractRemovals(triplist, removalsGraph.find(subject, predicate, object)); } @@ -468,20 +516,20 @@ public class RDFServiceGraph implements GraphWithPerform { private final TransactionHandler transactionHandler = new TransactionHandler() { @Override public synchronized void abort() { - queueWrites = false; + inTransaction = false; removalsGraph.clear(); additionsGraph.clear(); } @Override - public void begin() { - queueWrites = true; + public synchronized void begin() { + inTransaction = true; } @Override public synchronized void commit() { flush(); - queueWrites = false; + inTransaction = false; } @Override