improvements to RDFServiceGraph triple batching

This commit is contained in:
brianjlowe 2015-12-20 18:44:11 +02:00
parent 6e3a256596
commit 64cfc4a370

View file

@ -26,6 +26,7 @@ import com.hp.hpl.jena.query.QuerySolution;
import com.hp.hpl.jena.rdf.listeners.StatementListener; import com.hp.hpl.jena.rdf.listeners.StatementListener;
import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory; import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.rdf.model.StmtIterator;
import com.hp.hpl.jena.shared.AddDeniedException; import com.hp.hpl.jena.shared.AddDeniedException;
import com.hp.hpl.jena.shared.Command; import com.hp.hpl.jena.shared.Command;
import com.hp.hpl.jena.shared.DeleteDeniedException; import com.hp.hpl.jena.shared.DeleteDeniedException;
@ -53,7 +54,7 @@ public class RDFServiceGraph implements GraphWithPerform {
private PrefixMapping prefixMapping = new PrefixMappingImpl(); private PrefixMapping prefixMapping = new PrefixMappingImpl();
private GraphEventManager eventManager; private GraphEventManager eventManager;
private boolean queueWrites = false; private boolean inTransaction = false;
private Graph additionsGraph = ModelFactory.createDefaultModel().getGraph(); private Graph additionsGraph = ModelFactory.createDefaultModel().getGraph();
private Graph removalsGraph = ModelFactory.createDefaultModel().getGraph(); private Graph removalsGraph = ModelFactory.createDefaultModel().getGraph();
@ -63,7 +64,6 @@ public class RDFServiceGraph implements GraphWithPerform {
*/ */
public RDFServiceGraph(RDFService rdfService) { public RDFServiceGraph(RDFService rdfService) {
this(rdfService, null); this(rdfService, null);
log.info("using graph implementation: " + additionsGraph.getClass().getName());
} }
/** /**
@ -97,19 +97,20 @@ public class RDFServiceGraph implements GraphWithPerform {
return sb.toString(); return sb.toString();
} }
public synchronized void flush() { private synchronized void flush() {
log.debug("Flushing a batch");
ChangeSet changeSet = rdfService.manufactureChangeSet(); ChangeSet changeSet = rdfService.manufactureChangeSet();
try { try {
if(!removalsGraph.isEmpty()) { if(!removalsGraph.isEmpty()) {
String removals = serializeGraph(removalsGraph); String removals = serializeGraph(removalsGraph);
changeSet.addRemoval(RDFServiceUtils.toInputStream(removals), changeSet.addRemoval(RDFServiceUtils.toInputStream(removals),
RDFService.ModelSerializationFormat.N3, graphURI); RDFService.ModelSerializationFormat.N3, graphURI);
removalsGraph.clear();
} }
if(!additionsGraph.isEmpty()) { if(!additionsGraph.isEmpty()) {
String additions = serializeGraph(additionsGraph); String additions = serializeGraph(additionsGraph);
changeSet.addAddition(RDFServiceUtils.toInputStream(additions), changeSet.addAddition(RDFServiceUtils.toInputStream(additions),
RDFService.ModelSerializationFormat.N3, graphURI); RDFService.ModelSerializationFormat.N3, graphURI);
additionsGraph.clear();
} }
rdfService.changeSetUpdate(changeSet); rdfService.changeSetUpdate(changeSet);
} catch (RDFServiceException rdfse) { } catch (RDFServiceException rdfse) {
@ -127,27 +128,51 @@ public class RDFServiceGraph implements GraphWithPerform {
} }
@Override @Override
public synchronized void performAdd(Triple t) { public void performAdd(Triple t) {
if(inTransaction) {
stageAddition(t);
} else {
ChangeSet changeSet = rdfService.manufactureChangeSet();
try {
changeSet.addAddition(RDFServiceUtils.toInputStream(serialize(t)),
RDFService.ModelSerializationFormat.N3, graphURI);
rdfService.changeSetUpdate(changeSet);
} catch (RDFServiceException rdfse) {
throw new RuntimeException(rdfse);
}
}
}
private void stageAddition(Triple t) {
if(removalsGraph.contains(t)) { if(removalsGraph.contains(t)) {
removalsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject()); removalsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject());
} else { } else {
additionsGraph.add(t); additionsGraph.add(t);
} }
if(!queueWrites) {
flush();
}
} }
@Override @Override
public synchronized void performDelete(Triple t) { public void performDelete(Triple t) {
if(inTransaction) {
stageDeletion(t);
} else {
ChangeSet changeSet = rdfService.manufactureChangeSet();
try {
changeSet.addRemoval(RDFServiceUtils.toInputStream(serialize(t)),
RDFService.ModelSerializationFormat.N3, graphURI);
rdfService.changeSetUpdate(changeSet);
} catch (RDFServiceException rdfse) {
throw new RuntimeException(rdfse);
}
}
}
private synchronized void stageDeletion(Triple t) {
if(additionsGraph.contains(t)) { if(additionsGraph.contains(t)) {
additionsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject()); additionsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject());
} else { } else {
removalsGraph.add(t); removalsGraph.add(t);
} }
if(!queueWrites) {
flush();
}
} }
public void removeAll() { public void removeAll() {
@ -157,16 +182,30 @@ public class RDFServiceGraph implements GraphWithPerform {
} }
String constructStr = "CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <" + graphURI + "> { ?s ?p ?o } }"; String constructStr = "CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <" + graphURI + "> { ?s ?p ?o } }";
try { try {
if(inTransaction) {
Model model = ModelFactory.createDefaultModel();
rdfService.sparqlConstructQuery(constructStr, model);
stageRemoveAll(model);
} else {
InputStream model = rdfService.sparqlConstructQuery( InputStream model = rdfService.sparqlConstructQuery(
constructStr, RDFService.ModelSerializationFormat.N3); constructStr, RDFService.ModelSerializationFormat.N3);
ChangeSet changeSet = rdfService.manufactureChangeSet(); ChangeSet changeSet = rdfService.manufactureChangeSet();
changeSet.addRemoval(model, RDFService.ModelSerializationFormat.N3, graphURI); changeSet.addRemoval(model, RDFService.ModelSerializationFormat.N3, graphURI);
rdfService.changeSetUpdate(changeSet); rdfService.changeSetUpdate(changeSet);
}
} catch (RDFServiceException rdfse) { } catch (RDFServiceException rdfse) {
throw new RuntimeException(rdfse); throw new RuntimeException(rdfse);
} }
} }
private void stageRemoveAll(Model removals) {
StmtIterator sit = removals.listStatements();
while (sit.hasNext()) {
Triple t = sit.nextStatement().asTriple();
stageDeletion(t);
}
}
@Override @Override
public void close() { public void close() {
// can't close a remote endpoint // can't close a remote endpoint
@ -201,14 +240,23 @@ public class RDFServiceGraph implements GraphWithPerform {
ResultSetConsumer.HasResult consumer = new ResultSetConsumer.HasResult(); ResultSetConsumer.HasResult consumer = new ResultSetConsumer.HasResult();
execSelect(containsQuery.toString(), consumer); execSelect(containsQuery.toString(), consumer);
boolean initialResult = consumer.hasResult(); boolean initialResult = consumer.hasResult();
if(!queueWrites) { if(!inTransaction) {
return initialResult; return initialResult;
} else { } else {
Triple t = Triple.create(subject, predicate, object); Triple t = Triple.create(subject, predicate, object);
return (initialResult || additionsGraph.contains(t)) && !removalsGraph.contains(t); return (initialResult || additionsGraphContains(t))
&& !removalsGraphContains(t);
} }
} }
private synchronized boolean additionsGraphContains(Triple t) {
return additionsGraph.contains(t);
}
private synchronized boolean removalsGraphContains(Triple t) {
return removalsGraph.contains(t);
}
@Override @Override
public void delete(Triple arg0) throws DeleteDeniedException { public void delete(Triple arg0) throws DeleteDeniedException {
performDelete(arg0); performDelete(arg0);
@ -299,7 +347,7 @@ public class RDFServiceGraph implements GraphWithPerform {
String queryString = findQuery.toString(); String queryString = findQuery.toString();
final List<Triple> triplist = new ArrayList<Triple>(); final List<Triple> triplist = new ArrayList<Triple>();
if(queueWrites) { if(inTransaction) {
addAdditions(triplist, additionsGraph.find(subject, predicate, object)); addAdditions(triplist, additionsGraph.find(subject, predicate, object));
subtractRemovals(triplist, removalsGraph.find(subject, predicate, object)); subtractRemovals(triplist, removalsGraph.find(subject, predicate, object));
} }
@ -468,20 +516,20 @@ public class RDFServiceGraph implements GraphWithPerform {
private final TransactionHandler transactionHandler = new TransactionHandler() { private final TransactionHandler transactionHandler = new TransactionHandler() {
@Override @Override
public synchronized void abort() { public synchronized void abort() {
queueWrites = false; inTransaction = false;
removalsGraph.clear(); removalsGraph.clear();
additionsGraph.clear(); additionsGraph.clear();
} }
@Override @Override
public void begin() { public synchronized void begin() {
queueWrites = true; inTransaction = true;
} }
@Override @Override
public synchronized void commit() { public synchronized void commit() {
flush(); flush();
queueWrites = false; inTransaction = false;
} }
@Override @Override