5x speedup for adding RDF via the GUI; 25x speedup for SPARQL API writes. sameAs still needs fixing; will flunk tests.

brianjlowe 2015-12-09 21:54:30 +02:00
parent dcfd95ca9d
commit 1a1606d6ff
10 changed files with 458 additions and 125 deletions


@@ -19,6 +19,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import com.hp.hpl.jena.query.Dataset;
+import com.hp.hpl.jena.query.ReadWrite;
 import com.hp.hpl.jena.update.GraphStore;
 import com.hp.hpl.jena.update.GraphStoreFactory;
 import com.hp.hpl.jena.update.UpdateAction;
@@ -96,17 +97,26 @@ public class SparqlUpdateApiController extends VitroApiServlet {
     }

     private void executeUpdate(HttpServletRequest req, UpdateRequest parsed) {
-        ServletContext ctx = req.getSession().getServletContext();
         VitroRequest vreq = new VitroRequest(req);
         SearchIndexer indexer = ApplicationUtils.instance().getSearchIndexer();
-        indexer.pause();
-        try {
-            Dataset ds = new RDFServiceDataset(vreq.getUnfilteredRDFService());
-            GraphStore graphStore = GraphStoreFactory.create(ds);
+        Dataset ds = new RDFServiceDataset(vreq.getUnfilteredRDFService());
+        GraphStore graphStore = GraphStoreFactory.create(ds);
+        try {
+            if(indexer != null) {
+                indexer.pause();
+            }
+            if(ds.supportsTransactions()) {
+                ds.begin(ReadWrite.WRITE);
+            }
             UpdateAction.execute(parsed, graphStore);
         } finally {
+            if(ds.supportsTransactions()) {
+                ds.commit();
+                ds.end();
+            }
+            if(indexer != null) {
                 indexer.unpause();
+            }
         }
     }
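The controller above now wraps the whole SPARQL UPDATE in a dataset-level write transaction so the backing graphs can batch their writes. As a point of reference, here is a minimal caller-side sketch of the same pattern; the class name, method, and graph URI are illustrative only, and the RDFService instance is assumed to be supplied by the surrounding application.

import com.hp.hpl.jena.query.Dataset;
import com.hp.hpl.jena.query.ReadWrite;
import com.hp.hpl.jena.rdf.model.Model;

import edu.cornell.mannlib.vitro.webapp.dao.jena.RDFServiceDataset;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;

public class BatchedWriteSketch {

    // Adds a Model of triples to one named graph, letting the transaction
    // support introduced in this commit batch the writes into a single flush.
    public static void addBatched(RDFService rdfService, Model triples, String graphURI) {
        Dataset ds = new RDFServiceDataset(rdfService);
        boolean transactional = ds.supportsTransactions();
        if (transactional) {
            ds.begin(ReadWrite.WRITE);   // graphs start queueing writes
        }
        try {
            ds.getNamedModel(graphURI).add(triples);   // queued, not sent triple-by-triple
        } finally {
            if (transactional) {
                ds.commit();   // queued triples are flushed as one ChangeSet per graph
                ds.end();
            }
        }
    }
}

Without the begin/commit bracket each add() still flushes immediately, so existing callers keep their old behaviour.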


@@ -12,11 +12,14 @@ import org.apache.commons.logging.LogFactory;
 import com.hp.hpl.jena.rdf.model.Model;
 import com.hp.hpl.jena.rdf.model.ModelChangedListener;
 import com.hp.hpl.jena.rdf.model.ModelFactory;
+import com.hp.hpl.jena.rdf.model.ResourceFactory;
 import com.hp.hpl.jena.rdf.model.Statement;
 import com.hp.hpl.jena.rdf.model.StmtIterator;
+import com.hp.hpl.jena.shared.Lock;
+import com.hp.hpl.jena.vocabulary.OWL;
+import com.hp.hpl.jena.vocabulary.RDF;

 import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
-import edu.cornell.mannlib.vitro.webapp.servlet.setup.SimpleReasonerSetup;

 /**
  * A ChangeListener that forwards events to a Jena ModelChangedListener
@@ -34,8 +37,10 @@ public class JenaChangeListener implements ChangeListener {

     public JenaChangeListener(ModelChangedListener listener) {
         this.listener = listener;
-        ignoredGraphs.add(SimpleReasonerSetup.JENA_INF_MODEL_REBUILD);
-        ignoredGraphs.add(SimpleReasonerSetup.JENA_INF_MODEL_SCRATCHPAD);
+        m.register(listener);
+        // these graphs no longer used
+        // ignoredGraphs.add(SimpleReasonerSetup.JENA_INF_MODEL_REBUILD);
+        // ignoredGraphs.add(SimpleReasonerSetup.JENA_INF_MODEL_SCRATCHPAD);
     }

     @Override
@@ -65,9 +70,11 @@ public class JenaChangeListener implements ChangeListener {
     // TODO avoid overhead of Model
     private Statement parseTriple(String serializedTriple) {
         try {
-            Model m = ModelFactory.createDefaultModel();
+            m.enterCriticalSection(Lock.WRITE);
+            m.removeAll();
+            // Model m = ModelFactory.createDefaultModel();
             m.read(new ByteArrayInputStream(
-                    serializedTriple.getBytes("UTF-8")), null, "N3");
+                    serializedTriple.getBytes("UTF-8")), null, "N-TRIPLE");
             StmtIterator sit = m.listStatements();
             if (!sit.hasNext()) {
                 throw new RuntimeException("no triple parsed from change event");
@@ -83,6 +90,8 @@ public class JenaChangeListener implements ChangeListener {
             throw riot;
         } catch (UnsupportedEncodingException uee) {
             throw new RuntimeException(uee);
+        } finally {
+            m.leaveCriticalSection();
         }
     }


@@ -5,8 +5,13 @@ package edu.cornell.mannlib.vitro.webapp.dao.jena;
 import java.util.ArrayList;
 import java.util.Iterator;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.hp.hpl.jena.graph.Graph;
 import com.hp.hpl.jena.graph.Node;
 import com.hp.hpl.jena.graph.NodeFactory;
+import com.hp.hpl.jena.graph.TransactionHandler;
 import com.hp.hpl.jena.query.Dataset;
 import com.hp.hpl.jena.query.LabelExistsException;
 import com.hp.hpl.jena.query.ReadWrite;
@@ -21,6 +26,7 @@ import edu.cornell.mannlib.vitro.webapp.utils.logging.ToString;
 public class RDFServiceDataset implements Dataset {

     private RDFServiceDatasetGraph g;
+    private ReadWrite transactionMode;

     public RDFServiceDataset(RDFServiceDatasetGraph g) {
         this.g = g;
@@ -55,9 +61,13 @@
         return g.getLock();
     }

+    private final static Log log = LogFactory.getLog(RDFServiceDataset.class);
+
     @Override
     public Model getNamedModel(String arg0) {
-        return RDFServiceGraph.createRDFServiceModel(g.getGraph(NodeFactory.createURI(arg0)));
+        Model model = RDFServiceGraph.createRDFServiceModel(
+                g.getGraph(NodeFactory.createURI(arg0)));
+        return model;
     }

     @Override
@@ -108,36 +118,60 @@
     @Override
     public boolean supportsTransactions() {
-        return false;
+        if (g.getDefaultGraph().getTransactionHandler() == null) {
+            return false;
+        } else {
+            return g.getDefaultGraph().getTransactionHandler().transactionsSupported();
+        }
     }

     @Override
     public boolean isInTransaction() {
-        return false;
+        return (transactionMode != null);
     }

+    private boolean supportsTransactions(Graph graph) {
+        return (graph.getTransactionHandler() != null
+                && graph.getTransactionHandler().transactionsSupported());
+    }
+
     @Override
     public void begin(ReadWrite arg0) {
-        throw new UnsupportedOperationException(this.getClass().getSimpleName()
-                + " does not support transactions.");
+        this.transactionMode = arg0;
+        g.begin(arg0);
+        for(String graphURI : g.getGraphCache().keySet()) {
+            Graph graph = g.getGraphCache().get(graphURI);
+            if (supportsTransactions(graph)) {
+                graph.getTransactionHandler().begin();
+            }
+        }
     }

     @Override
     public void commit() {
-        throw new UnsupportedOperationException(this.getClass().getSimpleName()
-                + " does not support transactions.");
+        for(String graphURI : g.getGraphCache().keySet()) {
+            Graph graph = g.getGraphCache().get(graphURI);
+            if(supportsTransactions(graph)) {
+                graph.getTransactionHandler().commit();
+            }
+        }
     }

     @Override
     public void abort() {
-        throw new UnsupportedOperationException(this.getClass().getSimpleName()
-                + " does not support transactions.");
+        for(String graphURI : g.getGraphCache().keySet()) {
+            Graph graph = g.getGraphCache().get(graphURI);
+            if(supportsTransactions(graph)) {
+                graph.getTransactionHandler().abort();
+            }
+        }
     }

     @Override
     public void end() {
-        throw new UnsupportedOperationException(this.getClass().getSimpleName()
-                + " does not support transactions.");
+        // the Graph tranaction handlers don't seem to support .end()
+        this.transactionMode = null;
+        g.end();
     }

     @Override


@@ -6,18 +6,19 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;

 import com.hp.hpl.jena.graph.Graph;
 import com.hp.hpl.jena.graph.Node;
 import com.hp.hpl.jena.graph.NodeFactory;
 import com.hp.hpl.jena.graph.Triple;
 import com.hp.hpl.jena.query.QuerySolution;
-import com.hp.hpl.jena.query.ResultSet;
+import com.hp.hpl.jena.query.ReadWrite;
 import com.hp.hpl.jena.shared.Lock;
 import com.hp.hpl.jena.shared.LockMRSW;
 import com.hp.hpl.jena.sparql.core.DatasetGraph;
 import com.hp.hpl.jena.sparql.core.Quad;
-import com.hp.hpl.jena.sparql.resultset.JSONInput;
 import com.hp.hpl.jena.sparql.util.Context;
 import com.hp.hpl.jena.util.iterator.SingletonIterator;
 import com.hp.hpl.jena.util.iterator.WrappedIterator;
@@ -32,11 +33,25 @@ public class RDFServiceDatasetGraph implements DatasetGraph {

     private RDFService rdfService;
     private Lock lock = new LockMRSW();
     private Context context = new Context() ;
+    private Map<String, RDFServiceGraph> graphCache = new ConcurrentHashMap<String, RDFServiceGraph>();
+    private ReadWrite transactionMode;

     public RDFServiceDatasetGraph(RDFService rdfService) {
         this.rdfService = rdfService;
     }

+    public Map<String, RDFServiceGraph> getGraphCache() {
+        return graphCache;
+    }
+
+    public void begin(ReadWrite mode) {
+        this.transactionMode = mode;
+    }
+
+    public void end() {
+        this.transactionMode = null;
+    }
+
     private Graph getGraphFor(Quad q) {
         return getGraphFor(q.getGraph());
     }
@@ -44,7 +59,7 @@ public class RDFServiceDatasetGraph implements DatasetGraph {
     private Graph getGraphFor(Node g) {
         return (g == Node.ANY)
                 ? new RDFServiceGraph(rdfService)
-                : new RDFServiceGraph(rdfService, g.getURI());
+                : getGraph(g);
     }

     @Override
@@ -175,7 +190,22 @@
     @Override
     public RDFServiceGraph getGraph(Node arg0) {
-        return new RDFServiceGraph(rdfService, arg0.getURI());
+        String graphURI = arg0.getURI();
+        if(graphCache.containsKey(graphURI)) {
+            return graphCache.get(graphURI);
+        } else {
+            RDFServiceGraph graph = new RDFServiceGraph(rdfService, arg0.getURI());
+            graphCache.put(graphURI, graph);
+            if(transactionMode != null && supportsTransactions(graph)) {
+                graph.getTransactionHandler().begin();
+            }
+            return graph;
+        }
+    }
+
+    private boolean supportsTransactions(Graph graph) {
+        return (graph.getTransactionHandler() != null
+                && graph.getTransactionHandler().transactionsSupported());
     }

     @Override
@@ -205,13 +235,11 @@
     @Override
     public void removeGraph(Node arg0) {
         // TODO Auto-generated method stub
-
     }

     @Override
     public void setDefaultGraph(Graph arg0) {
         // TODO Auto-generated method stub
-
     }

     @Override
@@ -229,4 +257,5 @@
         return "RDFServiceDatasetGraph[" + ToString.hashHex(this)
                 + ", " + rdfService + "]";
     }
+
 }
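A subtle point in getGraph() above: a graph fetched for the first time while a transaction is already open must begin() immediately, otherwise its writes would bypass the batch that commit() later flushes. Below is a generic sketch of that cache-plus-late-enlistment idea; TxResource and TxResourceFactory are hypothetical stand-ins for RDFServiceGraph and its TransactionHandler.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LazyEnlistmentSketch {

    // Hypothetical stand-in for a graph with a transaction handler.
    public interface TxResource {
        void begin();
        void commit();
    }

    public interface TxResourceFactory {
        TxResource create(String key);
    }

    private final Map<String, TxResource> cache = new ConcurrentHashMap<String, TxResource>();
    private final TxResourceFactory factory;
    private volatile boolean inTransaction = false;

    public LazyEnlistmentSketch(TxResourceFactory factory) {
        this.factory = factory;
    }

    public void begin() {
        inTransaction = true;
        for (TxResource r : cache.values()) {
            r.begin();                 // resources already cached join the transaction
        }
    }

    public void commit() {
        for (TxResource r : cache.values()) {
            r.commit();
        }
        inTransaction = false;
    }

    public TxResource get(String key) {
        TxResource existing = cache.get(key);
        if (existing != null) {
            return existing;           // same instance, same write queue
        }
        TxResource created = factory.create(key);
        cache.put(key, created);
        if (inTransaction) {
            created.begin();           // a late arrival still gets batched
        }
        return created;
    }
}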


@@ -6,8 +6,9 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;

-import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -23,14 +24,13 @@ import com.hp.hpl.jena.graph.TripleMatch;
 import com.hp.hpl.jena.graph.impl.GraphWithPerform;
 import com.hp.hpl.jena.graph.impl.SimpleEventManager;
 import com.hp.hpl.jena.query.QuerySolution;
-import com.hp.hpl.jena.query.ResultSet;
 import com.hp.hpl.jena.rdf.listeners.StatementListener;
 import com.hp.hpl.jena.rdf.model.Model;
 import com.hp.hpl.jena.shared.AddDeniedException;
+import com.hp.hpl.jena.shared.Command;
 import com.hp.hpl.jena.shared.DeleteDeniedException;
 import com.hp.hpl.jena.shared.PrefixMapping;
 import com.hp.hpl.jena.shared.impl.PrefixMappingImpl;
-import com.hp.hpl.jena.sparql.resultset.JSONInput;
 import com.hp.hpl.jena.util.iterator.ExtendedIterator;
 import com.hp.hpl.jena.util.iterator.SingletonIterator;
 import com.hp.hpl.jena.util.iterator.WrappedIterator;
@@ -38,6 +38,7 @@ import com.hp.hpl.jena.util.iterator.WrappedIterator;
 import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet;
 import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
 import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;
+import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
 import edu.cornell.mannlib.vitro.webapp.rdfservice.adapters.VitroModelFactory;
 import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceUtils;
 import edu.cornell.mannlib.vitro.webapp.utils.logging.ToString;
@@ -52,6 +53,11 @@ public class RDFServiceGraph implements GraphWithPerform {
     private PrefixMapping prefixMapping = new PrefixMappingImpl();
     private GraphEventManager eventManager;

+    private boolean queueWrites = false;
+
+    private ConcurrentLinkedQueue<Triple> addTripleQueue = new ConcurrentLinkedQueue<Triple>();
+    private ConcurrentLinkedQueue<Triple> removeTripleQueue = new ConcurrentLinkedQueue<Triple>();
+
     /**
      * Returns a SparqlGraph for the union of named graphs in a remote repository
      * @param endpointURI
@@ -91,29 +97,47 @@
         return sb.toString();
     }

-    @Override
-    public void performAdd(Triple t) {
+    public void flush() {
+        log.debug("Flushing a batch");
         ChangeSet changeSet = rdfService.manufactureChangeSet();
         try {
-            changeSet.addAddition(RDFServiceUtils.toInputStream(serialize(t)),
-                    RDFService.ModelSerializationFormat.N3, graphURI);
+            if(!removeTripleQueue.isEmpty()) {
+                String removals = serializeQueue(removeTripleQueue);
+                changeSet.addRemoval(RDFServiceUtils.toInputStream(removals),
+                        RDFService.ModelSerializationFormat.N3, graphURI);
+            }
+            if(!addTripleQueue.isEmpty()) {
+                String additions = serializeQueue(addTripleQueue);
+                changeSet.addAddition(RDFServiceUtils.toInputStream(additions),
+                        RDFService.ModelSerializationFormat.N3, graphURI);
+            }
             rdfService.changeSetUpdate(changeSet);
         } catch (RDFServiceException rdfse) {
             throw new RuntimeException(rdfse);
         }
+    }
+
+    private String serializeQueue(Queue<Triple> tripleQueue) {
+        String triples = "";
+        while(!tripleQueue.isEmpty()) {
+            triples += " \n" + serialize(tripleQueue.poll());
+        }
+        return triples;
+    }
+
+    @Override
+    public void performAdd(Triple t) {
+        addTripleQueue.add(t);
+        if(!queueWrites) {
+            flush();
+        }
     }

     @Override
     public void performDelete(Triple t) {
-        ChangeSet changeSet = rdfService.manufactureChangeSet();
-        try {
-            changeSet.addRemoval(RDFServiceUtils.toInputStream(serialize(t)),
-                    RDFService.ModelSerializationFormat.N3, graphURI);
-            rdfService.changeSetUpdate(changeSet);
-        } catch (RDFServiceException rdfse) {
-            throw new RuntimeException(rdfse);
-        }
+        removeTripleQueue.add(t);
+        if(!queueWrites) {
+            flush();
+        }
     }
@@ -325,8 +349,7 @@

     @Override
     public TransactionHandler getTransactionHandler() {
-        // TODO Auto-generated method stub
-        return null;
+        return transactionHandler;
     }

     @Override
@@ -403,6 +426,37 @@
         public boolean sizeAccurate() {
             return true;
         }
+    };
+
+    private final TransactionHandler transactionHandler = new TransactionHandler() {
+
+        @Override
+        public void abort() {
+            queueWrites = false;
+            removeTripleQueue.clear();
+            addTripleQueue.clear();
+        }
+
+        @Override
+        public void begin() {
+            queueWrites = true;
+        }
+
+        @Override
+        public void commit() {
+            flush();
+            queueWrites = false;
+        }
+
+        @Override
+        public Object executeInTransaction(Command arg0) {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public boolean transactionsSupported() {
+            return true;
+        }
     };

     private void execSelect(String queryStr, ResultSetConsumer consumer) {
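The new TransactionHandler above is where the speedup comes from: outside a transaction every performAdd/performDelete still flushes immediately, but between begin() and commit() triples accumulate and leave as a single ChangeSet. Below is a self-contained sketch of that queue-until-commit pattern, with a hypothetical BatchSink standing in for the RDFService/ChangeSet machinery.

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class WriteQueueSketch {

    // Hypothetical stand-in for "send one batched update to the store".
    public interface BatchSink {
        void apply(List<String> additions, List<String> removals);
    }

    private final BatchSink sink;
    private volatile boolean queueWrites = false;
    private final Queue<String> addQueue = new ConcurrentLinkedQueue<String>();
    private final Queue<String> removeQueue = new ConcurrentLinkedQueue<String>();

    public WriteQueueSketch(BatchSink sink) {
        this.sink = sink;
    }

    public void begin()  { queueWrites = true; }              // start batching
    public void commit() { flush(); queueWrites = false; }    // one update for the whole batch
    public void abort()  { queueWrites = false; addQueue.clear(); removeQueue.clear(); }

    public void add(String triple) {
        addQueue.add(triple);
        if (!queueWrites) {
            flush();        // outside a transaction: behave as before, flush per write
        }
    }

    public void remove(String triple) {
        removeQueue.add(triple);
        if (!queueWrites) {
            flush();
        }
    }

    private void flush() {
        List<String> additions = drain(addQueue);
        List<String> removals = drain(removeQueue);
        if (!additions.isEmpty() || !removals.isEmpty()) {
            sink.apply(additions, removals);
        }
    }

    private static List<String> drain(Queue<String> q) {
        List<String> out = new ArrayList<String>();
        String next;
        while ((next = q.poll()) != null) {
            out.add(next);
        }
        return out;
    }
}

abort() simply drops the queues, which is what lets RDFServiceDataset.abort() discard a failed batch without touching the store.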


@@ -16,6 +16,7 @@ import org.apache.commons.logging.LogFactory;
 import com.hp.hpl.jena.ontology.OntModel;
 import com.hp.hpl.jena.rdf.listeners.StatementListener;
+import com.hp.hpl.jena.rdf.model.Model;
 import com.hp.hpl.jena.rdf.model.ResourceFactory;
 import com.hp.hpl.jena.rdf.model.Statement;
 import com.hp.hpl.jena.shared.Lock;
@@ -33,6 +34,7 @@ import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory;
 import edu.cornell.mannlib.vitro.webapp.dao.filtering.WebappDaoFactoryFiltering;
 import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilterUtils;
 import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilters;
+import edu.cornell.mannlib.vitro.webapp.dao.jena.event.BulkUpdateEvent;
 import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelAccess;
 import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine;
 import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngineException;
@@ -161,9 +163,28 @@ public class VClassGroupCache implements SearchIndexer.Listener {
         VclassMap = classMap;
     }

+    private boolean paused = false;
+    private boolean updateRequested = false;
+
+    public void pause() {
+        this.paused = true;
+    }
+
+    public void unpause() {
+        this.paused = false;
+        if(updateRequested) {
+            updateRequested = false;
+            requestCacheUpdate();
+        }
+    }
+
     public void requestCacheUpdate() {
         log.debug("requesting update");
-        _cacheRebuildThread.informOfQueueChange();
+        if(paused) {
+            updateRequested = true;
+        } else {
+            _cacheRebuildThread.informOfQueueChange();
+        }
     }

     protected void requestStop() {
@@ -337,24 +358,25 @@
         }
     }

-    protected static boolean isClassNameChange(Statement stmt, OntModel jenaOntModel) {
+    protected static boolean isClassNameChange(Statement stmt, ServletContext context) {
         // Check if the stmt is a rdfs:label change and that the
         // subject is an owl:Class.
-        if( RDFS.label.equals( stmt.getPredicate() )) {
-            jenaOntModel.enterCriticalSection(Lock.READ);
-            try{
-                return jenaOntModel.contains(
-                        ResourceFactory.createStatement(
-                                ResourceFactory.createResource(stmt.getSubject().getURI()),
-                                RDF.type,
-                                OWL.Class));
-            }finally{
-                jenaOntModel.leaveCriticalSection();
-            }
-        }else{
+        if( !RDFS.label.equals( stmt.getPredicate() )) {
             return false;
         }
+        OntModel jenaOntModel = ModelAccess.on(context).getOntModelSelector().getTBoxModel();
+        jenaOntModel.enterCriticalSection(Lock.READ);
+        try{
+            return jenaOntModel.contains(
+                    ResourceFactory.createStatement(
+                            ResourceFactory.createResource(stmt.getSubject().getURI()),
+                            RDF.type,
+                            OWL.Class));
+        }finally{
+            jenaOntModel.leaveCriticalSection();
+        }
     }

     /* ******************** RebuildGroupCacheThread **************** */
     protected class RebuildGroupCacheThread extends VitroBackgroundThread {
@@ -460,20 +482,26 @@
                 log.debug("subject: " + stmt.getSubject().getURI());
                 log.debug("predicate: " + stmt.getPredicate().getURI());
             }
-            if (RDF.type.getURI().equals(stmt.getPredicate().getURI())) {
+            if (RDF.type.equals(stmt.getPredicate())) {
                 requestCacheUpdate();
             } else if (VitroVocabulary.IN_CLASSGROUP.equals(stmt.getPredicate().getURI())) {
                 requestCacheUpdate();
             } else if(VitroVocabulary.DISPLAY_RANK.equals(stmt.getPredicate().getURI())){
                 requestCacheUpdate();
-            } else {
-                OntModel jenaOntModel = ModelAccess.on(context).getOntModel();
-                if( isClassNameChange(stmt, jenaOntModel) ) {
-                    requestCacheUpdate();
-                }
+            } else if( isClassNameChange(stmt, context) ) {
+                requestCacheUpdate();
             }
         }
+
+        public void notifyEvent(Model model, Object event) {
+            if (event instanceof BulkUpdateEvent) {
+                if(((BulkUpdateEvent) event).getBegin()) {
+                    pause();
+                } else {
+                    unpause();
+                }
+            }
+        }
     }
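VClassGroupCache now treats a BulkUpdateEvent as a pause/unpause bracket and coalesces any number of update requests received while paused into one rebuild on unpause. A compact sketch of that coalescing behaviour follows; the Runnable rebuild hook is hypothetical (in the real class it pokes the RebuildGroupCacheThread), and the synchronization here is an illustration rather than a copy of the original.

public class CoalescingRebuildSketch {

    private boolean paused = false;
    private boolean updateRequested = false;
    private final Runnable rebuild;   // hook that actually triggers the rebuild

    public CoalescingRebuildSketch(Runnable rebuild) {
        this.rebuild = rebuild;
    }

    public synchronized void pause() {
        paused = true;
    }

    public synchronized void unpause() {
        paused = false;
        if (updateRequested) {
            updateRequested = false;
            rebuild.run();            // one rebuild for the whole batch
        }
    }

    public synchronized void requestUpdate() {
        if (paused) {
            updateRequested = true;   // remember the request, but do not rebuild yet
        } else {
            rebuild.run();
        }
    }
}

The mapping is as in notifyEvent() above: a BulkUpdateEvent with getBegin() == true corresponds to pause(), and the matching end event to unpause().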


@@ -2,10 +2,9 @@

 package edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.sdb;

+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;

 import javax.sql.DataSource;
@@ -23,6 +22,8 @@ import com.hp.hpl.jena.sdb.sql.SDBConnection;
 import edu.cornell.mannlib.vitro.webapp.dao.jena.DatasetWrapper;
 import edu.cornell.mannlib.vitro.webapp.dao.jena.StaticDatasetFactory;
+import edu.cornell.mannlib.vitro.webapp.dao.jena.event.BulkUpdateEvent;
+import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
 import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet;
 import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
 import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;
@@ -76,30 +77,43 @@ public class RDFServiceSDB extends RDFServiceJena implements RDFService {
         SDBConnection sdbConn = getSDBConnection();
         Dataset dataset = getDataset(sdbConn);

+        try {
+            insureThatInputStreamsAreResettable(changeSet);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
         try {
-            insureThatInputStreamsAreResettable(changeSet);
-
-            beginTransaction(sdbConn);
+            beginTransaction(sdbConn);
+            startBulkUpdate();
             notifyListenersOfPreChangeEvents(changeSet);
             applyChangeSetToModel(changeSet, dataset);
             commitTransaction(sdbConn);
             notifyListenersOfChanges(changeSet);
             notifyListenersOfPostChangeEvents(changeSet);
             return true;
         } catch (Exception e) {
             log.error(e, e);
             abortTransaction(sdbConn);
             throw new RDFServiceException(e);
         } finally {
+            endBulkUpdate();
             close(sdbConn);
         }
     }

+    private void startBulkUpdate() {
+        for (ChangeListener cl : this.getRegisteredListeners()) {
+            cl.notifyEvent(null, new BulkUpdateEvent(null, true));
+        }
+    }
+
+    private void endBulkUpdate() {
+        for (ChangeListener cl : this.getRegisteredListeners()) {
+            cl.notifyEvent(null, new BulkUpdateEvent(null, false));
+        }
+    }
+
     private SDBConnection getSDBConnection() throws RDFServiceException {
         try {
             Connection c = (conn != null) ? conn : ds.getConnection();
@@ -140,10 +154,6 @@
         }
     }

-    private static final Pattern OPTIONAL_PATTERN = Pattern.compile("optional", Pattern.CASE_INSENSITIVE);
-
-    private static final Pattern GRAPH_PATTERN = Pattern.compile("graph", Pattern.CASE_INSENSITIVE);
-
     @Override
     protected QueryExecution createQueryExecution(String queryString, Query q, Dataset d) {
         return QueryExecutionFactory.create(q, d);
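startBulkUpdate()/endBulkUpdate() above bracket every change set so listeners such as SimpleReasoner and VClassGroupCache can switch into batch mode, and the end notification is sent from finally so a failed update cannot leave listeners accumulating forever. A minimal sketch of that bracketing contract follows; BulkListener is a hypothetical stand-in for ChangeListener plus BulkUpdateEvent.

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class BulkUpdateBracketSketch {

    public interface BulkListener {
        void bulkUpdateBegin();
        void bulkUpdateEnd();
    }

    private final List<BulkListener> listeners = new CopyOnWriteArrayList<BulkListener>();

    public void register(BulkListener l) {
        listeners.add(l);
    }

    public void applyChanges(Runnable applyChangeSet) {
        try {
            for (BulkListener l : listeners) {
                l.bulkUpdateBegin();      // listeners switch to "accumulate" mode
            }
            applyChangeSet.run();         // the actual writes
        } finally {
            for (BulkListener l : listeners) {
                l.bulkUpdateEnd();        // listeners process everything once
            }
        }
    }
}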


@@ -16,8 +16,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;

+import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
 import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
 import edu.cornell.mannlib.vitro.webapp.utils.http.HttpClientFactory;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -58,6 +60,7 @@ import com.hp.hpl.jena.rdf.model.Statement;
 import com.hp.hpl.jena.rdf.model.StmtIterator;
 import com.hp.hpl.jena.sparql.core.Quad;

+import edu.cornell.mannlib.vitro.webapp.application.ApplicationUtils;
 import edu.cornell.mannlib.vitro.webapp.dao.jena.RDFServiceDataset;
 import edu.cornell.mannlib.vitro.webapp.dao.jena.SparqlGraph;
 import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
@@ -83,7 +86,7 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
     protected String readEndpointURI;
     protected String updateEndpointURI;
                                                  // the number of triples to be
-    private static final int CHUNK_SIZE = 1000;  // added/removed in a single
+    private static final int CHUNK_SIZE = 5000;  // added/removed in a single
                                                  // SPARQL UPDATE

     protected HttpClient httpClient;
@@ -170,12 +173,13 @@
                 && !isPreconditionSatisfied(
                         changeSet.getPreconditionQuery(),
                         changeSet.getPreconditionQueryType())) {
            return false;
        }

        try {
+
            for (Object o : changeSet.getPreChangeEvents()) {
                this.notifyListenersOfEvent(o);
            }

            Iterator<ModelChange> csIt = changeSet.getModelChanges().iterator();
@@ -196,10 +200,18 @@
                modelChange.getSerializedModel().reset();
                Model model = ModelFactory.createModelForGraph(
                        new ListeningGraph(modelChange.getGraphURI(), this));
+               long start = System.currentTimeMillis();
                if (modelChange.getOperation() == ModelChange.Operation.ADD) {
-                   model.read(modelChange.getSerializedModel(), null,
+                   Model temp = ModelFactory.createDefaultModel();
+                   temp.read(modelChange.getSerializedModel(), null,
                            getSerializationFormatString(
                                    modelChange.getSerializationFormat()));
+                   StmtIterator sit = temp.listStatements();
+                   while(sit.hasNext()) {
+                       Triple triple = sit.nextStatement().asTriple();
+                       this.notifyListeners(triple, ModelChange.Operation.ADD, modelChange.getGraphURI());
+                   }
+                   //model.add(temp);
                } else if (modelChange.getOperation() == ModelChange.Operation.REMOVE){
                    Model temp = ModelFactory.createDefaultModel();
                    temp.read(modelChange.getSerializedModel(), null,
@@ -210,6 +222,7 @@
                    log.error("Unsupported model change type " +
                            modelChange.getOperation().getClass().getName());
                }
+               log.info((System.currentTimeMillis() - start) + " ms to notify " + this.getRegisteredListeners().size() + " listeners");
            }

            for (Object o : changeSet.getPostChangeEvents()) {
@@ -468,7 +481,7 @@
      */
     @Override
     public void getGraphMetadata() throws RDFServiceException {
-        throw new UnsupportedOperationException();
     }

     /**
@@ -549,7 +562,9 @@
     public void addModel(Model model, String graphURI) throws RDFServiceException {
         try {
+            long start = System.currentTimeMillis();
             verbModel(model, graphURI, "INSERT");
+            log.info((System.currentTimeMillis() - start) + " ms to insert " + model.size() + " triples");
         } finally {
             rebuildGraphURICache = true;
         }


@@ -64,7 +64,7 @@
     private volatile boolean recomputing = false;
     private boolean stopRequested = false;

-    private final int BATCH_SIZE = 100;
+    private final int BATCH_SIZE = 500;
     private final int REPORTING_INTERVAL = 1000;

     /**
@@ -109,18 +109,24 @@
                 recomputing = true;
             }
         }
+        boolean fullRecompute = (individualURIs == null);
+        boolean sizableRecompute = (!fullRecompute && individualURIs.size() > 2);
         try {
-            if (searchIndexer != null) {
-                searchIndexer.pause();
-                // Register now that we want to rebuild the index when we unpause
-                // This allows the indexer to optimize behaviour whilst paused
-                searchIndexer.rebuildIndex();
+            if(fullRecompute || sizableRecompute) { // if doing a full rebuild
+                if (searchIndexer != null) {
+                    searchIndexer.pause();
+                    // Register now that we want to rebuild the index when we unpause
+                    // This allows the indexer to optimize behaviour whilst paused
+                    if(fullRecompute) {
+                        searchIndexer.rebuildIndex();
+                    }
+                }
             }
             // Create a type cache for this execution and pass it to the recompute function
             // Ensures that caches are only valid for the length of one recompute
             recomputeABox(individualURIs, new TypeCaches());
         } finally {
-            if (searchIndexer != null) {
+            if ((fullRecompute || sizableRecompute) && searchIndexer != null) {
                 searchIndexer.unpause();
             }
             synchronized (lock1) {
@@ -199,7 +205,7 @@
             Model rebuildModel, TypeCaches caches) throws RDFServiceException {
         long start = System.currentTimeMillis();
         Model assertions = getAssertions(individualURI);
-        log.trace((System.currentTimeMillis() - start) + " ms to get assertions.");
+        log.debug((System.currentTimeMillis() - start) + " ms to get assertions.");
         Model additionalInferences = recomputeIndividual(
                 individualURI, null, assertions, rebuildModel, caches, RUN_PLUGINS);
@@ -588,7 +594,7 @@
                 getSameAsIndividuals(indUri, sameAsInds);
             }
         } catch (RDFServiceException e) {
+            log.error(e,e);
         }
     }


@@ -2,14 +2,14 @@

 package edu.cornell.mannlib.vitro.webapp.reasoner;

-import static edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread.WorkLevel.WORKING;
-
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -50,7 +50,6 @@ import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
 import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;
 import edu.cornell.mannlib.vitro.webapp.rdfservice.adapters.VitroModelFactory;
 import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.model.RDFServiceModel;
-import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;

 /**
  * Allows for real-time incremental materialization or retraction of RDFS-
@@ -77,7 +76,7 @@ public class SimpleReasoner extends StatementListener {
             VitroModelFactory.createOntologyModel())
                     .createAnnotationProperty(mostSpecificTypePropertyURI);

-    private Queue<String> individualURIqueue = new ConcurrentLinkedQueue<String>();
+    private Queue<String> individualURIqueue = new IndividualURIQueue<String>();

     // DeltaComputer
     private CumulativeDeltaModeler aBoxDeltaModeler1 = null;
@@ -189,16 +188,12 @@ public class SimpleReasoner extends StatementListener {

     private void listenToStatement(Statement stmt) {
         if(stmt.getSubject().isURIResource()) {
-            if (!individualURIqueue.contains(stmt.getSubject().getURI())) {
-                individualURIqueue.add(stmt.getSubject().getURI());
-            }
+            individualURIqueue.add(stmt.getSubject().getURI());
         }
         if(stmt.getObject().isURIResource() && !(RDF.type.equals(stmt.getPredicate()))) {
-            if (!individualURIqueue.contains(stmt.getObject().asResource().getURI())) {
-                individualURIqueue.add(stmt.getObject().asResource().getURI());
-            }
+            individualURIqueue.add(stmt.getObject().asResource().getURI());
         }
-        if(!accumulateChanges || individualURIqueue.size() > SAFETY_VALVE) {
+        if(!accumulateChanges) {
             recomputeIndividuals();
         }
     }
@@ -206,8 +201,17 @@
     private static final int SAFETY_VALVE = 1000000; // one million individuals

     private void recomputeIndividuals() {
+        if(recomputer.isRecomputing()) {
+            return;
+        }
+        long start = System.currentTimeMillis();
+        int size = individualURIqueue.size();
         recomputer.recompute(individualURIqueue);
-        individualURIqueue.clear();
+        //individualURIqueue.clear();
+        if(size > 2) {
+            log.info((System.currentTimeMillis() - start) + " ms to recompute "
+                    + size + " individuals");
+        }
     }

     private boolean accumulateChanges = false;
@@ -220,8 +224,7 @@
     @Override
     public void addedStatement(Statement stmt) {
         doPlugins(ModelUpdate.Operation.ADD,stmt);
-        listenToStatement(stmt);
+        listenToStatement(stmt);;
 //        try {
 //            if (stmt.getPredicate().equals(RDF.type)) {
 //                addedABoxTypeAssertion(stmt, inferenceModel, new HashSet<String>());
@@ -1633,7 +1636,7 @@
             if (((BulkUpdateEvent) event).getBegin()) {
                 this.accumulateChanges = true;
-                log.info("received a bulk update begin event");
+                //log.info("received a bulk update begin event");
 //                if (deltaComputerProcessing) {
 //                    eventCount++;
 //                    log.info("received a bulk update begin event while processing in asynchronous mode. Event count = " + eventCount);
@@ -1651,7 +1654,7 @@
 //                log.info("initializing batch mode 1");
 //            }
             } else {
-                log.info("received a bulk update end event");
+                //log.info("received a bulk update end event");
                 this.accumulateChanges = false;
                 recomputeIndividuals();
 //                if (!deltaComputerProcessing) {
@@ -1817,4 +1820,139 @@
                 : ((Resource)statement.getObject()).getURI() + " (Resource)") + "]";
     }

+    private class IndividualURIQueue<E> implements Queue<E> {
+
+        private ConcurrentLinkedQueue<E> q = new ConcurrentLinkedQueue<E>();
+        private ConcurrentHashMap<E, Boolean> m = new ConcurrentHashMap<E, Boolean>();
+
+        @Override
+        public boolean addAll(Collection<? extends E> c) {
+            boolean changed = false;
+            for (E e : c) {
+                if(!m.containsKey(e)) {
+                    m.put(e, Boolean.TRUE);
+                    q.add(e);
+                    changed = true;
+                }
+            }
+            return changed;
+        }
+
+        @Override
+        public void clear() {
+            m.clear();
+            q.clear();
+        }
+
+        @Override
+        public boolean contains(Object o) {
+            return m.contains(o);
+        }
+
+        @Override
+        public boolean containsAll(Collection<?> c) {
+            boolean contains = true;
+            for(Object e : c) {
+                contains |= m.contains(e);
+            }
+            return contains;
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return q.isEmpty();
+        }
+
+        @Override
+        public Iterator<E> iterator() {
+            return q.iterator();
+        }
+
+        @Override
+        public boolean remove(Object o) {
+            m.remove(o);
+            return q.remove(o);
+        }
+
+        @Override
+        public boolean removeAll(Collection<?> c) {
+            for (Object e : c) {
+                m.remove(e);
+            }
+            return q.removeAll(c);
+        }
+
+        @Override
+        public boolean retainAll(Collection<?> c) {
+            boolean changed = false;
+            Iterator<E> it = m.keySet().iterator();
+            while(it.hasNext()) {
+                E e = it.next();
+                if(!c.contains(e)) {
+                    m.remove(e);
+                    q.remove(e);
+                    changed = true;
+                }
+            }
+            return changed;
+        }
+
+        @Override
+        public int size() {
+            return m.size();
+        }
+
+        @Override
+        public Object[] toArray() {
+            return q.toArray();
+        }
+
+        @Override
+        public <T> T[] toArray(T[] a) {
+            return q.toArray(a);
+        }
+
+        @Override
+        public boolean add(E e) {
+            if(m.containsKey(e)) {
+                return false;
+            } else {
+                m.put(e, Boolean.TRUE);
+                q.add(e);
+                return true;
+            }
+        }
+
+        @Override
+        public E element() {
+            return q.element();
+        }
+
+        @Override
+        public boolean offer(E e) {
+            return q.offer(e);
+        }
+
+        @Override
+        public E peek() {
+            return q.peek();
+        }
+
+        @Override
+        public E poll() {
+            E e = q.poll();
+            m.remove(e);
+            return e;
+        }
+
+        @Override
+        public E remove() {
+            E e = q.remove();
+            m.remove(e);
+            return e;
+        }
+    }
 }
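IndividualURIQueue exists because the old code called contains() on a ConcurrentLinkedQueue before every add, an O(n) scan per statement; backing the queue with a ConcurrentHashMap makes the duplicate check O(1) while preserving FIFO order. A tiny, non-concurrent demonstration of the intended add semantics (illustrative only, not the implementation above; the URIs are made up):

import java.util.LinkedHashSet;
import java.util.Set;

public class DedupQueueDemo {
    public static void main(String[] args) {
        // LinkedHashSet gives the same "deduplicated, insertion-ordered" behaviour
        // that IndividualURIQueue provides via its backing map.
        Set<String> queue = new LinkedHashSet<String>();
        queue.add("http://example.org/individual/n1");
        queue.add("http://example.org/individual/n2");
        queue.add("http://example.org/individual/n1");   // duplicate, ignored

        // Prints 2: each individual is queued for recomputation at most once,
        // which is what keeps listenToStatement() cheap under heavy write load.
        System.out.println(queue.size());
    }
}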