fix to allow for batch handling of more complex SPARQL updates

This commit is contained in:
brianjlowe 2015-12-19 18:43:16 +02:00
parent ce1ec1158f
commit 6e3a256596
2 changed files with 150 additions and 23 deletions

View file

@ -5,9 +5,8 @@ package edu.cornell.mannlib.vitro.webapp.dao.jena;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -26,6 +25,7 @@ import com.hp.hpl.jena.graph.impl.SimpleEventManager;
import com.hp.hpl.jena.query.QuerySolution;
import com.hp.hpl.jena.rdf.listeners.StatementListener;
import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.shared.AddDeniedException;
import com.hp.hpl.jena.shared.Command;
import com.hp.hpl.jena.shared.DeleteDeniedException;
@ -54,9 +54,8 @@ public class RDFServiceGraph implements GraphWithPerform {
private GraphEventManager eventManager;
private boolean queueWrites = false;
private ConcurrentLinkedQueue<Triple> addTripleQueue = new ConcurrentLinkedQueue<Triple>();
private ConcurrentLinkedQueue<Triple> removeTripleQueue = new ConcurrentLinkedQueue<Triple>();
private Graph additionsGraph = ModelFactory.createDefaultModel().getGraph();
private Graph removalsGraph = ModelFactory.createDefaultModel().getGraph();
/**
* Returns a SparqlGraph for the union of named graphs in a remote repository
@ -64,6 +63,7 @@ public class RDFServiceGraph implements GraphWithPerform {
*/
public RDFServiceGraph(RDFService rdfService) {
this(rdfService, null);
log.info("using graph implementation: " + additionsGraph.getClass().getName());
}
/**
@ -97,17 +97,17 @@ public class RDFServiceGraph implements GraphWithPerform {
return sb.toString();
}
public void flush() {
public synchronized void flush() {
log.debug("Flushing a batch");
ChangeSet changeSet = rdfService.manufactureChangeSet();
try {
if(!removeTripleQueue.isEmpty()) {
String removals = serializeQueue(removeTripleQueue);
if(!removalsGraph.isEmpty()) {
String removals = serializeGraph(removalsGraph);
changeSet.addRemoval(RDFServiceUtils.toInputStream(removals),
RDFService.ModelSerializationFormat.N3, graphURI);
}
if(!addTripleQueue.isEmpty()) {
String additions = serializeQueue(addTripleQueue);
if(!additionsGraph.isEmpty()) {
String additions = serializeGraph(additionsGraph);
changeSet.addAddition(RDFServiceUtils.toInputStream(additions),
RDFService.ModelSerializationFormat.N3, graphURI);
}
@ -117,25 +117,34 @@ public class RDFServiceGraph implements GraphWithPerform {
}
}
private String serializeQueue(Queue<Triple> tripleQueue) {
private synchronized String serializeGraph(Graph graph) {
String triples = "";
while(!tripleQueue.isEmpty()) {
triples += " \n" + serialize(tripleQueue.poll());
Iterator<Triple> tripIt = graph.find(null, null, null);
while(tripIt.hasNext()) {
triples += " \n" + serialize(tripIt.next());
}
return triples;
}
@Override
public void performAdd(Triple t) {
addTripleQueue.add(t);
public synchronized void performAdd(Triple t) {
if(removalsGraph.contains(t)) {
removalsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject());
} else {
additionsGraph.add(t);
}
if(!queueWrites) {
flush();
}
}
@Override
public void performDelete(Triple t) {
removeTripleQueue.add(t);
public synchronized void performDelete(Triple t) {
if(additionsGraph.contains(t)) {
additionsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject());
} else {
removalsGraph.add(t);
}
if(!queueWrites) {
flush();
}
@ -191,7 +200,13 @@ public class RDFServiceGraph implements GraphWithPerform {
ResultSetConsumer.HasResult consumer = new ResultSetConsumer.HasResult();
execSelect(containsQuery.toString(), consumer);
return consumer.hasResult();
boolean initialResult = consumer.hasResult();
if(!queueWrites) {
return initialResult;
} else {
Triple t = Triple.create(subject, predicate, object);
return (initialResult || additionsGraph.contains(t)) && !removalsGraph.contains(t);
}
}
@Override
@ -284,7 +299,11 @@ public class RDFServiceGraph implements GraphWithPerform {
String queryString = findQuery.toString();
final List<Triple> triplist = new ArrayList<Triple>();
if(queueWrites) {
addAdditions(triplist, additionsGraph.find(subject, predicate, object));
subtractRemovals(triplist, removalsGraph.find(subject, predicate, object));
}
execSelect(queryString, new ResultSetConsumer() {
@Override
protected void processQuerySolution(QuerySolution qs) {
@ -311,6 +330,24 @@ public class RDFServiceGraph implements GraphWithPerform {
return WrappedIterator.create(triplist.iterator());
}
private void addAdditions(List<Triple> tripList, ExtendedIterator<Triple> tripIt) {
while(tripIt.hasNext()) {
Triple t = tripIt.next();
if(!tripList.contains(t)) {
tripList.add(t);
}
}
}
private void subtractRemovals(List<Triple> tripList, ExtendedIterator<Triple> tripIt) {
while(tripIt.hasNext()) {
Triple t = tripIt.next();
if(tripList.contains(t)) {
tripList.remove(t);
}
}
}
private boolean isVar(Node node) {
return (node == null || node.isVariable() || node == Node.ANY);
}
@ -430,10 +467,10 @@ public class RDFServiceGraph implements GraphWithPerform {
private final TransactionHandler transactionHandler = new TransactionHandler() {
@Override
public void abort() {
public synchronized void abort() {
queueWrites = false;
removeTripleQueue.clear();
addTripleQueue.clear();
removalsGraph.clear();
additionsGraph.clear();
}
@Override
@ -442,7 +479,7 @@ public class RDFServiceGraph implements GraphWithPerform {
}
@Override
public void commit() {
public synchronized void commit() {
flush();
queueWrites = false;
}