From 6e3a25659660aea9cf44d0cbb6b196fa2b2075e9 Mon Sep 17 00:00:00 2001 From: brianjlowe Date: Sat, 19 Dec 2015 18:43:16 +0200 Subject: [PATCH] fix to allow for batch handling of more complex SPARQL updates --- .../webapp/dao/jena/RDFServiceGraph.java | 83 ++++++++++++----- .../controller/api/SparqlUpdateApiTest.java | 90 +++++++++++++++++++ 2 files changed, 150 insertions(+), 23 deletions(-) create mode 100644 webapp/test/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiTest.java diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java index d8b1b9d97..7ef887a1a 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java @@ -5,9 +5,8 @@ package edu.cornell.mannlib.vitro.webapp.dao.jena; import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -26,6 +25,7 @@ import com.hp.hpl.jena.graph.impl.SimpleEventManager; import com.hp.hpl.jena.query.QuerySolution; import com.hp.hpl.jena.rdf.listeners.StatementListener; import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelFactory; import com.hp.hpl.jena.shared.AddDeniedException; import com.hp.hpl.jena.shared.Command; import com.hp.hpl.jena.shared.DeleteDeniedException; @@ -54,9 +54,8 @@ public class RDFServiceGraph implements GraphWithPerform { private GraphEventManager eventManager; private boolean queueWrites = false; - private ConcurrentLinkedQueue addTripleQueue = new ConcurrentLinkedQueue(); - private ConcurrentLinkedQueue removeTripleQueue = new ConcurrentLinkedQueue(); - + private Graph additionsGraph = ModelFactory.createDefaultModel().getGraph(); + private Graph removalsGraph = ModelFactory.createDefaultModel().getGraph(); /** * Returns a SparqlGraph for the union of named graphs in a remote repository @@ -64,6 +63,7 @@ public class RDFServiceGraph implements GraphWithPerform { */ public RDFServiceGraph(RDFService rdfService) { this(rdfService, null); + log.info("using graph implementation: " + additionsGraph.getClass().getName()); } /** @@ -97,17 +97,17 @@ public class RDFServiceGraph implements GraphWithPerform { return sb.toString(); } - public void flush() { + public synchronized void flush() { log.debug("Flushing a batch"); ChangeSet changeSet = rdfService.manufactureChangeSet(); try { - if(!removeTripleQueue.isEmpty()) { - String removals = serializeQueue(removeTripleQueue); + if(!removalsGraph.isEmpty()) { + String removals = serializeGraph(removalsGraph); changeSet.addRemoval(RDFServiceUtils.toInputStream(removals), RDFService.ModelSerializationFormat.N3, graphURI); } - if(!addTripleQueue.isEmpty()) { - String additions = serializeQueue(addTripleQueue); + if(!additionsGraph.isEmpty()) { + String additions = serializeGraph(additionsGraph); changeSet.addAddition(RDFServiceUtils.toInputStream(additions), RDFService.ModelSerializationFormat.N3, graphURI); } @@ -117,25 +117,34 @@ public class RDFServiceGraph implements GraphWithPerform { } } - private String serializeQueue(Queue tripleQueue) { + private synchronized String serializeGraph(Graph graph) { String triples = ""; - while(!tripleQueue.isEmpty()) { - triples += " \n" + serialize(tripleQueue.poll()); + Iterator tripIt = graph.find(null, null, null); + while(tripIt.hasNext()) { + triples += " \n" + serialize(tripIt.next()); } return triples; } @Override - public void performAdd(Triple t) { - addTripleQueue.add(t); + public synchronized void performAdd(Triple t) { + if(removalsGraph.contains(t)) { + removalsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject()); + } else { + additionsGraph.add(t); + } if(!queueWrites) { flush(); } } @Override - public void performDelete(Triple t) { - removeTripleQueue.add(t); + public synchronized void performDelete(Triple t) { + if(additionsGraph.contains(t)) { + additionsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject()); + } else { + removalsGraph.add(t); + } if(!queueWrites) { flush(); } @@ -191,7 +200,13 @@ public class RDFServiceGraph implements GraphWithPerform { ResultSetConsumer.HasResult consumer = new ResultSetConsumer.HasResult(); execSelect(containsQuery.toString(), consumer); - return consumer.hasResult(); + boolean initialResult = consumer.hasResult(); + if(!queueWrites) { + return initialResult; + } else { + Triple t = Triple.create(subject, predicate, object); + return (initialResult || additionsGraph.contains(t)) && !removalsGraph.contains(t); + } } @Override @@ -284,7 +299,11 @@ public class RDFServiceGraph implements GraphWithPerform { String queryString = findQuery.toString(); final List triplist = new ArrayList(); - + if(queueWrites) { + addAdditions(triplist, additionsGraph.find(subject, predicate, object)); + subtractRemovals(triplist, removalsGraph.find(subject, predicate, object)); + } + execSelect(queryString, new ResultSetConsumer() { @Override protected void processQuerySolution(QuerySolution qs) { @@ -311,6 +330,24 @@ public class RDFServiceGraph implements GraphWithPerform { return WrappedIterator.create(triplist.iterator()); } + private void addAdditions(List tripList, ExtendedIterator tripIt) { + while(tripIt.hasNext()) { + Triple t = tripIt.next(); + if(!tripList.contains(t)) { + tripList.add(t); + } + } + } + + private void subtractRemovals(List tripList, ExtendedIterator tripIt) { + while(tripIt.hasNext()) { + Triple t = tripIt.next(); + if(tripList.contains(t)) { + tripList.remove(t); + } + } + } + private boolean isVar(Node node) { return (node == null || node.isVariable() || node == Node.ANY); } @@ -430,10 +467,10 @@ public class RDFServiceGraph implements GraphWithPerform { private final TransactionHandler transactionHandler = new TransactionHandler() { @Override - public void abort() { + public synchronized void abort() { queueWrites = false; - removeTripleQueue.clear(); - addTripleQueue.clear(); + removalsGraph.clear(); + additionsGraph.clear(); } @Override @@ -442,7 +479,7 @@ public class RDFServiceGraph implements GraphWithPerform { } @Override - public void commit() { + public synchronized void commit() { flush(); queueWrites = false; } diff --git a/webapp/test/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiTest.java b/webapp/test/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiTest.java new file mode 100644 index 000000000..130c28373 --- /dev/null +++ b/webapp/test/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiTest.java @@ -0,0 +1,90 @@ +/* $This file is distributed under the terms of the license in /doc/license.txt$ */ + +package edu.cornell.mannlib.vitro.webapp.controller.api; + +import static org.junit.Assert.*; + +import java.io.StringReader; + +import junit.framework.Assert; + +import org.junit.Before; +import org.junit.Test; + +import com.hp.hpl.jena.query.Dataset; +import com.hp.hpl.jena.query.DatasetFactory; +import com.hp.hpl.jena.query.ReadWrite; +import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelFactory; +import com.hp.hpl.jena.update.GraphStore; +import com.hp.hpl.jena.update.GraphStoreFactory; +import com.hp.hpl.jena.update.UpdateAction; +import com.hp.hpl.jena.update.UpdateFactory; + +import edu.cornell.mannlib.vitro.testing.AbstractTestClass; +import edu.cornell.mannlib.vitro.webapp.dao.jena.RDFServiceDataset; +import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; +import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.model.RDFServiceModel; + +/** + * Test that the SparqlQueryApiExecutor can handle all query types and all + * formats. + */ +public class SparqlUpdateApiTest extends AbstractTestClass { + + private final String GRAPH_URI = "http://example.org/graph"; + + private final String updateStr1 = + "INSERT DATA { GRAPH <" + GRAPH_URI + "> { \n" + + " a . \n" + + "} } ; \n" + + "INSERT { GRAPH <" + GRAPH_URI + "> { \n " + + " ?x a . \n " + + "} } WHERE { \n" + + " GRAPH <" + GRAPH_URI + "> { ?x a } \n " + + "}"; + + private final String result1 = + " a . \n" + + " a ." ; + + // look at how the SimpleReasoner is set up. + + private Model model; + private RDFService rdfService; + + @Before + public void setup() { + model = ModelFactory.createDefaultModel(); + Dataset ds = DatasetFactory.createMem(); + ds.addNamedModel(GRAPH_URI, model); + rdfService = new RDFServiceModel(ds); + } + + // ---------------------------------------------------------------------- + // Tests + // ---------------------------------------------------------------------- + + @Test + public void nullRdfService() throws Exception { + model.removeAll(); + Model desiredResults = ModelFactory.createDefaultModel(); + desiredResults.read(new StringReader(result1), null, "N3"); + Dataset ds = new RDFServiceDataset(rdfService); + GraphStore graphStore = GraphStoreFactory.create(ds); + try { + if(ds.supportsTransactions()) { + ds.begin(ReadWrite.WRITE); + System.out.println("yep"); + } + UpdateAction.execute(UpdateFactory.create(updateStr1), graphStore); + } finally { + if(ds.supportsTransactions()) { + ds.commit(); + ds.end(); + } + } + assertEquals("updateStr1 yields result1", desiredResults.toString(), model.toString()); + } + +}