Merge branch 'writePerformance-1.8' into writePerformance

This commit is contained in:
brianjlowe 2016-02-11 16:30:19 +02:00
commit 724d8df369
2 changed files with 211 additions and 36 deletions

View file

@ -5,9 +5,8 @@ package edu.cornell.mannlib.vitro.webapp.dao.jena;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -26,6 +25,8 @@ import com.hp.hpl.jena.graph.impl.SimpleEventManager;
import com.hp.hpl.jena.query.QuerySolution; import com.hp.hpl.jena.query.QuerySolution;
import com.hp.hpl.jena.rdf.listeners.StatementListener; import com.hp.hpl.jena.rdf.listeners.StatementListener;
import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.rdf.model.StmtIterator;
import com.hp.hpl.jena.shared.AddDeniedException; import com.hp.hpl.jena.shared.AddDeniedException;
import com.hp.hpl.jena.shared.Command; import com.hp.hpl.jena.shared.Command;
import com.hp.hpl.jena.shared.DeleteDeniedException; import com.hp.hpl.jena.shared.DeleteDeniedException;
@ -53,10 +54,9 @@ public class RDFServiceGraph implements GraphWithPerform {
private PrefixMapping prefixMapping = new PrefixMappingImpl(); private PrefixMapping prefixMapping = new PrefixMappingImpl();
private GraphEventManager eventManager; private GraphEventManager eventManager;
private boolean queueWrites = false; private boolean inTransaction = false;
private ConcurrentLinkedQueue<Triple> addTripleQueue = new ConcurrentLinkedQueue<Triple>(); private Graph additionsGraph = ModelFactory.createDefaultModel().getGraph();
private ConcurrentLinkedQueue<Triple> removeTripleQueue = new ConcurrentLinkedQueue<Triple>(); private Graph removalsGraph = ModelFactory.createDefaultModel().getGraph();
/** /**
* Returns a SparqlGraph for the union of named graphs in a remote repository * Returns a SparqlGraph for the union of named graphs in a remote repository
@ -97,19 +97,20 @@ public class RDFServiceGraph implements GraphWithPerform {
return sb.toString(); return sb.toString();
} }
public void flush() { private synchronized void flush() {
log.debug("Flushing a batch");
ChangeSet changeSet = rdfService.manufactureChangeSet(); ChangeSet changeSet = rdfService.manufactureChangeSet();
try { try {
if(!removeTripleQueue.isEmpty()) { if(!removalsGraph.isEmpty()) {
String removals = serializeQueue(removeTripleQueue); String removals = serializeGraph(removalsGraph);
changeSet.addRemoval(RDFServiceUtils.toInputStream(removals), changeSet.addRemoval(RDFServiceUtils.toInputStream(removals),
RDFService.ModelSerializationFormat.N3, graphURI); RDFService.ModelSerializationFormat.N3, graphURI);
removalsGraph.clear();
} }
if(!addTripleQueue.isEmpty()) { if(!additionsGraph.isEmpty()) {
String additions = serializeQueue(addTripleQueue); String additions = serializeGraph(additionsGraph);
changeSet.addAddition(RDFServiceUtils.toInputStream(additions), changeSet.addAddition(RDFServiceUtils.toInputStream(additions),
RDFService.ModelSerializationFormat.N3, graphURI); RDFService.ModelSerializationFormat.N3, graphURI);
additionsGraph.clear();
} }
rdfService.changeSetUpdate(changeSet); rdfService.changeSetUpdate(changeSet);
} catch (RDFServiceException rdfse) { } catch (RDFServiceException rdfse) {
@ -117,27 +118,60 @@ public class RDFServiceGraph implements GraphWithPerform {
} }
} }
private String serializeQueue(Queue<Triple> tripleQueue) { private synchronized String serializeGraph(Graph graph) {
String triples = ""; String triples = "";
while(!tripleQueue.isEmpty()) { Iterator<Triple> tripIt = graph.find(null, null, null);
triples += " \n" + serialize(tripleQueue.poll()); while(tripIt.hasNext()) {
triples += " \n" + serialize(tripIt.next());
} }
return triples; return triples;
} }
@Override @Override
public void performAdd(Triple t) { public void performAdd(Triple t) {
addTripleQueue.add(t); if(inTransaction) {
if(!queueWrites) { stageAddition(t);
flush(); } else {
ChangeSet changeSet = rdfService.manufactureChangeSet();
try {
changeSet.addAddition(RDFServiceUtils.toInputStream(serialize(t)),
RDFService.ModelSerializationFormat.N3, graphURI);
rdfService.changeSetUpdate(changeSet);
} catch (RDFServiceException rdfse) {
throw new RuntimeException(rdfse);
}
}
}
private void stageAddition(Triple t) {
if(removalsGraph.contains(t)) {
removalsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject());
} else {
additionsGraph.add(t);
} }
} }
@Override @Override
public void performDelete(Triple t) { public void performDelete(Triple t) {
removeTripleQueue.add(t); if(inTransaction) {
if(!queueWrites) { stageDeletion(t);
flush(); } else {
ChangeSet changeSet = rdfService.manufactureChangeSet();
try {
changeSet.addRemoval(RDFServiceUtils.toInputStream(serialize(t)),
RDFService.ModelSerializationFormat.N3, graphURI);
rdfService.changeSetUpdate(changeSet);
} catch (RDFServiceException rdfse) {
throw new RuntimeException(rdfse);
}
}
}
private synchronized void stageDeletion(Triple t) {
if(additionsGraph.contains(t)) {
additionsGraph.remove(t.getSubject(), t.getPredicate(), t.getObject());
} else {
removalsGraph.add(t);
} }
} }
@ -148,16 +182,30 @@ public class RDFServiceGraph implements GraphWithPerform {
} }
String constructStr = "CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <" + graphURI + "> { ?s ?p ?o } }"; String constructStr = "CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <" + graphURI + "> { ?s ?p ?o } }";
try { try {
InputStream model = rdfService.sparqlConstructQuery( if(inTransaction) {
constructStr, RDFService.ModelSerializationFormat.N3); Model model = ModelFactory.createDefaultModel();
ChangeSet changeSet = rdfService.manufactureChangeSet(); rdfService.sparqlConstructQuery(constructStr, model);
changeSet.addRemoval(model, RDFService.ModelSerializationFormat.N3, graphURI); stageRemoveAll(model);
rdfService.changeSetUpdate(changeSet); } else {
InputStream model = rdfService.sparqlConstructQuery(
constructStr, RDFService.ModelSerializationFormat.N3);
ChangeSet changeSet = rdfService.manufactureChangeSet();
changeSet.addRemoval(model, RDFService.ModelSerializationFormat.N3, graphURI);
rdfService.changeSetUpdate(changeSet);
}
} catch (RDFServiceException rdfse) { } catch (RDFServiceException rdfse) {
throw new RuntimeException(rdfse); throw new RuntimeException(rdfse);
} }
} }
private void stageRemoveAll(Model removals) {
StmtIterator sit = removals.listStatements();
while (sit.hasNext()) {
Triple t = sit.nextStatement().asTriple();
stageDeletion(t);
}
}
@Override @Override
public void close() { public void close() {
// can't close a remote endpoint // can't close a remote endpoint
@ -191,7 +239,22 @@ public class RDFServiceGraph implements GraphWithPerform {
ResultSetConsumer.HasResult consumer = new ResultSetConsumer.HasResult(); ResultSetConsumer.HasResult consumer = new ResultSetConsumer.HasResult();
execSelect(containsQuery.toString(), consumer); execSelect(containsQuery.toString(), consumer);
return consumer.hasResult(); boolean initialResult = consumer.hasResult();
if(!inTransaction) {
return initialResult;
} else {
Triple t = Triple.create(subject, predicate, object);
return (initialResult || additionsGraphContains(t))
&& !removalsGraphContains(t);
}
}
private synchronized boolean additionsGraphContains(Triple t) {
return additionsGraph.contains(t);
}
private synchronized boolean removalsGraphContains(Triple t) {
return removalsGraph.contains(t);
} }
@Override @Override
@ -284,6 +347,10 @@ public class RDFServiceGraph implements GraphWithPerform {
String queryString = findQuery.toString(); String queryString = findQuery.toString();
final List<Triple> triplist = new ArrayList<Triple>(); final List<Triple> triplist = new ArrayList<Triple>();
if(inTransaction) {
addAdditions(triplist, additionsGraph.find(subject, predicate, object));
subtractRemovals(triplist, removalsGraph.find(subject, predicate, object));
}
execSelect(queryString, new ResultSetConsumer() { execSelect(queryString, new ResultSetConsumer() {
@Override @Override
@ -311,6 +378,24 @@ public class RDFServiceGraph implements GraphWithPerform {
return WrappedIterator.create(triplist.iterator()); return WrappedIterator.create(triplist.iterator());
} }
private void addAdditions(List<Triple> tripList, ExtendedIterator<Triple> tripIt) {
while(tripIt.hasNext()) {
Triple t = tripIt.next();
if(!tripList.contains(t)) {
tripList.add(t);
}
}
}
private void subtractRemovals(List<Triple> tripList, ExtendedIterator<Triple> tripIt) {
while(tripIt.hasNext()) {
Triple t = tripIt.next();
if(tripList.contains(t)) {
tripList.remove(t);
}
}
}
private boolean isVar(Node node) { private boolean isVar(Node node) {
return (node == null || node.isVariable() || node == Node.ANY); return (node == null || node.isVariable() || node == Node.ANY);
} }
@ -430,21 +515,21 @@ public class RDFServiceGraph implements GraphWithPerform {
private final TransactionHandler transactionHandler = new TransactionHandler() { private final TransactionHandler transactionHandler = new TransactionHandler() {
@Override @Override
public void abort() { public synchronized void abort() {
queueWrites = false; inTransaction = false;
removeTripleQueue.clear(); removalsGraph.clear();
addTripleQueue.clear(); additionsGraph.clear();
} }
@Override @Override
public void begin() { public synchronized void begin() {
queueWrites = true; inTransaction = true;
} }
@Override @Override
public void commit() { public synchronized void commit() {
flush(); flush();
queueWrites = false; inTransaction = false;
} }
@Override @Override

View file

@ -0,0 +1,90 @@
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
package edu.cornell.mannlib.vitro.webapp.controller.api;
import static org.junit.Assert.*;
import java.io.StringReader;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import com.hp.hpl.jena.query.Dataset;
import com.hp.hpl.jena.query.DatasetFactory;
import com.hp.hpl.jena.query.ReadWrite;
import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.update.GraphStore;
import com.hp.hpl.jena.update.GraphStoreFactory;
import com.hp.hpl.jena.update.UpdateAction;
import com.hp.hpl.jena.update.UpdateFactory;
import edu.cornell.mannlib.vitro.testing.AbstractTestClass;
import edu.cornell.mannlib.vitro.webapp.dao.jena.RDFServiceDataset;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.model.RDFServiceModel;
/**
* Test that the SparqlQueryApiExecutor can handle all query types and all
* formats.
*/
public class SparqlUpdateApiTest extends AbstractTestClass {
private final String GRAPH_URI = "http://example.org/graph";
private final String updateStr1 =
"INSERT DATA { GRAPH <" + GRAPH_URI + "> { \n" +
" <http://here.edu/n1> a <http://here.edu/Class1> . \n" +
"} } ; \n" +
"INSERT { GRAPH <" + GRAPH_URI + "> { \n " +
" ?x a <http://here.edu/Class2> . \n " +
"} } WHERE { \n" +
" GRAPH <" + GRAPH_URI + "> { ?x a <http://here.edu/Class1> } \n " +
"}";
private final String result1 =
"<http://here.edu/n1> a <http://here.edu/Class1> . \n" +
"<http://here.edu/n1> a <http://here.edu/Class2> ." ;
// look at how the SimpleReasoner is set up.
private Model model;
private RDFService rdfService;
@Before
public void setup() {
model = ModelFactory.createDefaultModel();
Dataset ds = DatasetFactory.createMem();
ds.addNamedModel(GRAPH_URI, model);
rdfService = new RDFServiceModel(ds);
}
// ----------------------------------------------------------------------
// Tests
// ----------------------------------------------------------------------
@Test
public void nullRdfService() throws Exception {
model.removeAll();
Model desiredResults = ModelFactory.createDefaultModel();
desiredResults.read(new StringReader(result1), null, "N3");
Dataset ds = new RDFServiceDataset(rdfService);
GraphStore graphStore = GraphStoreFactory.create(ds);
try {
if(ds.supportsTransactions()) {
ds.begin(ReadWrite.WRITE);
System.out.println("yep");
}
UpdateAction.execute(UpdateFactory.create(updateStr1), graphStore);
} finally {
if(ds.supportsTransactions()) {
ds.commit();
ds.end();
}
}
assertEquals("updateStr1 yields result1", desiredResults.toString(), model.toString());
}
}