diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java index d8b1b9d97..a8ad33786 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java @@ -5,9 +5,8 @@ package edu.cornell.mannlib.vitro.webapp.dao.jena; import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -26,6 +25,8 @@ import com.hp.hpl.jena.graph.impl.SimpleEventManager; import com.hp.hpl.jena.query.QuerySolution; import com.hp.hpl.jena.rdf.listeners.StatementListener; import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelFactory; +import com.hp.hpl.jena.rdf.model.StmtIterator; import com.hp.hpl.jena.shared.AddDeniedException; import com.hp.hpl.jena.shared.Command; import com.hp.hpl.jena.shared.DeleteDeniedException; @@ -53,10 +54,9 @@ public class RDFServiceGraph implements GraphWithPerform { private PrefixMapping prefixMapping = new PrefixMappingImpl(); private GraphEventManager eventManager; - private boolean queueWrites = false; - private ConcurrentLinkedQueue addTripleQueue = new ConcurrentLinkedQueue(); - private ConcurrentLinkedQueue removeTripleQueue = new ConcurrentLinkedQueue(); - + private boolean inTransaction = false; + private Graph additionsGraph = ModelFactory.createDefaultModel().getGraph(); + private Graph removalsGraph = ModelFactory.createDefaultModel().getGraph(); /** * Returns a SparqlGraph for the union of named graphs in a remote repository @@ -97,19 +97,20 @@ public class RDFServiceGraph implements GraphWithPerform { return sb.toString(); } - public void flush() { - log.debug("Flushing a batch"); + private synchronized void flush() { ChangeSet changeSet = rdfService.manufactureChangeSet(); try { - if(!removeTripleQueue.isEmpty()) { - String removals = serializeQueue(removeTripleQueue); + if(!removalsGraph.isEmpty()) { + String removals = serializeGraph(removalsGraph); changeSet.addRemoval(RDFServiceUtils.toInputStream(removals), RDFService.ModelSerializationFormat.N3, graphURI); + removalsGraph.clear(); } - if(!addTripleQueue.isEmpty()) { - String additions = serializeQueue(addTripleQueue); + if(!additionsGraph.isEmpty()) { + String additions = serializeGraph(additionsGraph); changeSet.addAddition(RDFServiceUtils.toInputStream(additions), RDFService.ModelSerializationFormat.N3, graphURI); + additionsGraph.clear(); } rdfService.changeSetUpdate(changeSet); } catch (RDFServiceException rdfse) { @@ -117,27 +118,60 @@ public class RDFServiceGraph implements GraphWithPerform { } } - private String serializeQueue(Queue tripleQueue) { + private synchronized String serializeGraph(Graph graph) { String triples = ""; - while(!tripleQueue.isEmpty()) { - triples += " \n" + serialize(tripleQueue.poll()); + Iterator tripIt = graph.find(null, null, null); + while(tripIt.hasNext()) { + triples += " \n" + serialize(tripIt.next()); } return triples; } @Override public void performAdd(Triple t) { - addTripleQueue.add(t); - if(!queueWrites) { - flush(); + if(inTransaction) { + stageAddition(t); + } else { + ChangeSet changeSet = rdfService.manufactureChangeSet(); + try { + changeSet.addAddition(RDFServiceUtils.toInputStream(serialize(t)), + RDFService.ModelSerializationFormat.N3, graphURI); + rdfService.changeSetUpdate(changeSet); + } catch (RDFServiceException rdfse) { + throw new RuntimeException(rdfse); + } + } + } + + private void stageAddition(Triple t) { + if(removalsGraph.contains(t)) { + removalsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject()); + } else { + additionsGraph.add(t); } } @Override public void performDelete(Triple t) { - removeTripleQueue.add(t); - if(!queueWrites) { - flush(); + if(inTransaction) { + stageDeletion(t); + } else { + ChangeSet changeSet = rdfService.manufactureChangeSet(); + try { + changeSet.addRemoval(RDFServiceUtils.toInputStream(serialize(t)), + RDFService.ModelSerializationFormat.N3, graphURI); + rdfService.changeSetUpdate(changeSet); + } catch (RDFServiceException rdfse) { + throw new RuntimeException(rdfse); + } + } + } + + private synchronized void stageDeletion(Triple t) { + if(additionsGraph.contains(t)) { + additionsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject()); + } else { + removalsGraph.add(t); } } @@ -148,16 +182,30 @@ public class RDFServiceGraph implements GraphWithPerform { } String constructStr = "CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <" + graphURI + "> { ?s ?p ?o } }"; try { - InputStream model = rdfService.sparqlConstructQuery( - constructStr, RDFService.ModelSerializationFormat.N3); - ChangeSet changeSet = rdfService.manufactureChangeSet(); - changeSet.addRemoval(model, RDFService.ModelSerializationFormat.N3, graphURI); - rdfService.changeSetUpdate(changeSet); + if(inTransaction) { + Model model = ModelFactory.createDefaultModel(); + rdfService.sparqlConstructQuery(constructStr, model); + stageRemoveAll(model); + } else { + InputStream model = rdfService.sparqlConstructQuery( + constructStr, RDFService.ModelSerializationFormat.N3); + ChangeSet changeSet = rdfService.manufactureChangeSet(); + changeSet.addRemoval(model, RDFService.ModelSerializationFormat.N3, graphURI); + rdfService.changeSetUpdate(changeSet); + } } catch (RDFServiceException rdfse) { throw new RuntimeException(rdfse); } } + private void stageRemoveAll(Model removals) { + StmtIterator sit = removals.listStatements(); + while (sit.hasNext()) { + Triple t = sit.nextStatement().asTriple(); + stageDeletion(t); + } + } + @Override public void close() { // can't close a remote endpoint @@ -191,7 +239,22 @@ public class RDFServiceGraph implements GraphWithPerform { ResultSetConsumer.HasResult consumer = new ResultSetConsumer.HasResult(); execSelect(containsQuery.toString(), consumer); - return consumer.hasResult(); + boolean initialResult = consumer.hasResult(); + if(!inTransaction) { + return initialResult; + } else { + Triple t = Triple.create(subject, predicate, object); + return (initialResult || additionsGraphContains(t)) + && !removalsGraphContains(t); + } + } + + private synchronized boolean additionsGraphContains(Triple t) { + return additionsGraph.contains(t); + } + + private synchronized boolean removalsGraphContains(Triple t) { + return removalsGraph.contains(t); } @Override @@ -284,7 +347,11 @@ public class RDFServiceGraph implements GraphWithPerform { String queryString = findQuery.toString(); final List triplist = new ArrayList(); - + if(inTransaction) { + addAdditions(triplist, additionsGraph.find(subject, predicate, object)); + subtractRemovals(triplist, removalsGraph.find(subject, predicate, object)); + } + execSelect(queryString, new ResultSetConsumer() { @Override protected void processQuerySolution(QuerySolution qs) { @@ -311,6 +378,24 @@ public class RDFServiceGraph implements GraphWithPerform { return WrappedIterator.create(triplist.iterator()); } + private void addAdditions(List tripList, ExtendedIterator tripIt) { + while(tripIt.hasNext()) { + Triple t = tripIt.next(); + if(!tripList.contains(t)) { + tripList.add(t); + } + } + } + + private void subtractRemovals(List tripList, ExtendedIterator tripIt) { + while(tripIt.hasNext()) { + Triple t = tripIt.next(); + if(tripList.contains(t)) { + tripList.remove(t); + } + } + } + private boolean isVar(Node node) { return (node == null || node.isVariable() || node == Node.ANY); } @@ -430,21 +515,21 @@ public class RDFServiceGraph implements GraphWithPerform { private final TransactionHandler transactionHandler = new TransactionHandler() { @Override - public void abort() { - queueWrites = false; - removeTripleQueue.clear(); - addTripleQueue.clear(); + public synchronized void abort() { + inTransaction = false; + removalsGraph.clear(); + additionsGraph.clear(); } @Override - public void begin() { - queueWrites = true; + public synchronized void begin() { + inTransaction = true; } @Override - public void commit() { + public synchronized void commit() { flush(); - queueWrites = false; + inTransaction = false; } @Override diff --git a/webapp/test/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiTest.java b/webapp/test/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiTest.java new file mode 100644 index 000000000..130c28373 --- /dev/null +++ b/webapp/test/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiTest.java @@ -0,0 +1,90 @@ +/* $This file is distributed under the terms of the license in /doc/license.txt$ */ + +package edu.cornell.mannlib.vitro.webapp.controller.api; + +import static org.junit.Assert.*; + +import java.io.StringReader; + +import junit.framework.Assert; + +import org.junit.Before; +import org.junit.Test; + +import com.hp.hpl.jena.query.Dataset; +import com.hp.hpl.jena.query.DatasetFactory; +import com.hp.hpl.jena.query.ReadWrite; +import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelFactory; +import com.hp.hpl.jena.update.GraphStore; +import com.hp.hpl.jena.update.GraphStoreFactory; +import com.hp.hpl.jena.update.UpdateAction; +import com.hp.hpl.jena.update.UpdateFactory; + +import edu.cornell.mannlib.vitro.testing.AbstractTestClass; +import edu.cornell.mannlib.vitro.webapp.dao.jena.RDFServiceDataset; +import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; +import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.model.RDFServiceModel; + +/** + * Test that the SparqlQueryApiExecutor can handle all query types and all + * formats. + */ +public class SparqlUpdateApiTest extends AbstractTestClass { + + private final String GRAPH_URI = "http://example.org/graph"; + + private final String updateStr1 = + "INSERT DATA { GRAPH <" + GRAPH_URI + "> { \n" + + " a . \n" + + "} } ; \n" + + "INSERT { GRAPH <" + GRAPH_URI + "> { \n " + + " ?x a . \n " + + "} } WHERE { \n" + + " GRAPH <" + GRAPH_URI + "> { ?x a } \n " + + "}"; + + private final String result1 = + " a . \n" + + " a ." ; + + // look at how the SimpleReasoner is set up. + + private Model model; + private RDFService rdfService; + + @Before + public void setup() { + model = ModelFactory.createDefaultModel(); + Dataset ds = DatasetFactory.createMem(); + ds.addNamedModel(GRAPH_URI, model); + rdfService = new RDFServiceModel(ds); + } + + // ---------------------------------------------------------------------- + // Tests + // ---------------------------------------------------------------------- + + @Test + public void nullRdfService() throws Exception { + model.removeAll(); + Model desiredResults = ModelFactory.createDefaultModel(); + desiredResults.read(new StringReader(result1), null, "N3"); + Dataset ds = new RDFServiceDataset(rdfService); + GraphStore graphStore = GraphStoreFactory.create(ds); + try { + if(ds.supportsTransactions()) { + ds.begin(ReadWrite.WRITE); + System.out.println("yep"); + } + UpdateAction.execute(UpdateFactory.create(updateStr1), graphStore); + } finally { + if(ds.supportsTransactions()) { + ds.commit(); + ds.end(); + } + } + assertEquals("updateStr1 yields result1", desiredResults.toString(), model.toString()); + } + +}