diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiController.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiController.java index 21ba21aea..0be8b50f5 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiController.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/controller/api/SparqlUpdateApiController.java @@ -19,6 +19,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.hp.hpl.jena.query.Dataset; +import com.hp.hpl.jena.query.ReadWrite; import com.hp.hpl.jena.update.GraphStore; import com.hp.hpl.jena.update.GraphStoreFactory; import com.hp.hpl.jena.update.UpdateAction; @@ -96,17 +97,26 @@ public class SparqlUpdateApiController extends VitroApiServlet { } private void executeUpdate(HttpServletRequest req, UpdateRequest parsed) { - ServletContext ctx = req.getSession().getServletContext(); VitroRequest vreq = new VitroRequest(req); - SearchIndexer indexer = ApplicationUtils.instance().getSearchIndexer(); - indexer.pause(); - try { - Dataset ds = new RDFServiceDataset(vreq.getUnfilteredRDFService()); - GraphStore graphStore = GraphStoreFactory.create(ds); + Dataset ds = new RDFServiceDataset(vreq.getUnfilteredRDFService()); + GraphStore graphStore = GraphStoreFactory.create(ds); + try { + if(indexer != null) { + indexer.pause(); + } + if(ds.supportsTransactions()) { + ds.begin(ReadWrite.WRITE); + } UpdateAction.execute(parsed, graphStore); } finally { - indexer.unpause(); + if(ds.supportsTransactions()) { + ds.commit(); + ds.end(); + } + if(indexer != null) { + indexer.unpause(); + } } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/JenaChangeListener.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/JenaChangeListener.java index 9304d99a7..595d3dfef 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/JenaChangeListener.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/JenaChangeListener.java @@ -12,11 +12,14 @@ import org.apache.commons.logging.LogFactory; import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.ModelChangedListener; import com.hp.hpl.jena.rdf.model.ModelFactory; +import com.hp.hpl.jena.rdf.model.ResourceFactory; import com.hp.hpl.jena.rdf.model.Statement; import com.hp.hpl.jena.rdf.model.StmtIterator; +import com.hp.hpl.jena.shared.Lock; +import com.hp.hpl.jena.vocabulary.OWL; +import com.hp.hpl.jena.vocabulary.RDF; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; -import edu.cornell.mannlib.vitro.webapp.servlet.setup.SimpleReasonerSetup; /** * A ChangeListener that forwards events to a Jena ModelChangedListener @@ -34,8 +37,10 @@ public class JenaChangeListener implements ChangeListener { public JenaChangeListener(ModelChangedListener listener) { this.listener = listener; - ignoredGraphs.add(SimpleReasonerSetup.JENA_INF_MODEL_REBUILD); - ignoredGraphs.add(SimpleReasonerSetup.JENA_INF_MODEL_SCRATCHPAD); + m.register(listener); + // these graphs no longer used +// ignoredGraphs.add(SimpleReasonerSetup.JENA_INF_MODEL_REBUILD); +// ignoredGraphs.add(SimpleReasonerSetup.JENA_INF_MODEL_SCRATCHPAD); } @Override @@ -65,9 +70,11 @@ public class JenaChangeListener implements ChangeListener { // TODO avoid overhead of Model private Statement parseTriple(String serializedTriple) { try { - Model m = ModelFactory.createDefaultModel(); + m.enterCriticalSection(Lock.WRITE); + m.removeAll(); + // Model m = ModelFactory.createDefaultModel(); m.read(new ByteArrayInputStream( - serializedTriple.getBytes("UTF-8")), null, "N3"); + serializedTriple.getBytes("UTF-8")), null, "N-TRIPLE"); StmtIterator sit = m.listStatements(); if (!sit.hasNext()) { throw new RuntimeException("no triple parsed from change event"); @@ -83,6 +90,8 @@ public class JenaChangeListener implements ChangeListener { throw riot; } catch (UnsupportedEncodingException uee) { throw new RuntimeException(uee); + } finally { + m.leaveCriticalSection(); } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceDataset.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceDataset.java index ae2bbfc34..b4f84cc18 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceDataset.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceDataset.java @@ -5,8 +5,13 @@ package edu.cornell.mannlib.vitro.webapp.dao.jena; import java.util.ArrayList; import java.util.Iterator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.hp.hpl.jena.graph.Graph; import com.hp.hpl.jena.graph.Node; import com.hp.hpl.jena.graph.NodeFactory; +import com.hp.hpl.jena.graph.TransactionHandler; import com.hp.hpl.jena.query.Dataset; import com.hp.hpl.jena.query.LabelExistsException; import com.hp.hpl.jena.query.ReadWrite; @@ -21,6 +26,7 @@ import edu.cornell.mannlib.vitro.webapp.utils.logging.ToString; public class RDFServiceDataset implements Dataset { private RDFServiceDatasetGraph g; + private ReadWrite transactionMode; public RDFServiceDataset(RDFServiceDatasetGraph g) { this.g = g; @@ -55,9 +61,13 @@ public class RDFServiceDataset implements Dataset { return g.getLock(); } + private final static Log log = LogFactory.getLog(RDFServiceDataset.class); + @Override public Model getNamedModel(String arg0) { - return RDFServiceGraph.createRDFServiceModel(g.getGraph(NodeFactory.createURI(arg0))); + Model model = RDFServiceGraph.createRDFServiceModel( + g.getGraph(NodeFactory.createURI(arg0))); + return model; } @Override @@ -108,36 +118,60 @@ public class RDFServiceDataset implements Dataset { @Override public boolean supportsTransactions() { - return false; + if (g.getDefaultGraph().getTransactionHandler() == null) { + return false; + } else { + return g.getDefaultGraph().getTransactionHandler().transactionsSupported(); + } } @Override public boolean isInTransaction() { - return false; + return (transactionMode != null); } + + private boolean supportsTransactions(Graph graph) { + return (graph.getTransactionHandler() != null + && graph.getTransactionHandler().transactionsSupported()); + } @Override public void begin(ReadWrite arg0) { - throw new UnsupportedOperationException(this.getClass().getSimpleName() - + " does not support transactions."); + this.transactionMode = arg0; + g.begin(arg0); + for(String graphURI : g.getGraphCache().keySet()) { + Graph graph = g.getGraphCache().get(graphURI); + if (supportsTransactions(graph)) { + graph.getTransactionHandler().begin(); + } + } } @Override public void commit() { - throw new UnsupportedOperationException(this.getClass().getSimpleName() - + " does not support transactions."); + for(String graphURI : g.getGraphCache().keySet()) { + Graph graph = g.getGraphCache().get(graphURI); + if(supportsTransactions(graph)) { + graph.getTransactionHandler().commit(); + } + } } @Override public void abort() { - throw new UnsupportedOperationException(this.getClass().getSimpleName() - + " does not support transactions."); + for(String graphURI : g.getGraphCache().keySet()) { + Graph graph = g.getGraphCache().get(graphURI); + if(supportsTransactions(graph)) { + graph.getTransactionHandler().abort(); + } + } } @Override public void end() { - throw new UnsupportedOperationException(this.getClass().getSimpleName() - + " does not support transactions."); + // the Graph tranaction handlers don't seem to support .end() + this.transactionMode = null; + g.end(); } @Override diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceDatasetGraph.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceDatasetGraph.java index b6a60f6e1..5553a55aa 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceDatasetGraph.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceDatasetGraph.java @@ -6,18 +6,19 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import com.hp.hpl.jena.graph.Graph; import com.hp.hpl.jena.graph.Node; import com.hp.hpl.jena.graph.NodeFactory; import com.hp.hpl.jena.graph.Triple; import com.hp.hpl.jena.query.QuerySolution; -import com.hp.hpl.jena.query.ResultSet; +import com.hp.hpl.jena.query.ReadWrite; import com.hp.hpl.jena.shared.Lock; import com.hp.hpl.jena.shared.LockMRSW; import com.hp.hpl.jena.sparql.core.DatasetGraph; import com.hp.hpl.jena.sparql.core.Quad; -import com.hp.hpl.jena.sparql.resultset.JSONInput; import com.hp.hpl.jena.sparql.util.Context; import com.hp.hpl.jena.util.iterator.SingletonIterator; import com.hp.hpl.jena.util.iterator.WrappedIterator; @@ -32,11 +33,25 @@ public class RDFServiceDatasetGraph implements DatasetGraph { private RDFService rdfService; private Lock lock = new LockMRSW(); private Context context = new Context() ; - + private Map graphCache = new ConcurrentHashMap(); + private ReadWrite transactionMode; + public RDFServiceDatasetGraph(RDFService rdfService) { this.rdfService = rdfService; } + public Map getGraphCache() { + return graphCache; + } + + public void begin(ReadWrite mode) { + this.transactionMode = mode; + } + + public void end() { + this.transactionMode = null; + } + private Graph getGraphFor(Quad q) { return getGraphFor(q.getGraph()); } @@ -44,7 +59,7 @@ public class RDFServiceDatasetGraph implements DatasetGraph { private Graph getGraphFor(Node g) { return (g == Node.ANY) ? new RDFServiceGraph(rdfService) - : new RDFServiceGraph(rdfService, g.getURI()); + : getGraph(g); } @Override @@ -172,10 +187,25 @@ public class RDFServiceDatasetGraph implements DatasetGraph { public RDFServiceGraph getDefaultGraph() { return new RDFServiceGraph(rdfService); } - + @Override public RDFServiceGraph getGraph(Node arg0) { - return new RDFServiceGraph(rdfService, arg0.getURI()); + String graphURI = arg0.getURI(); + if(graphCache.containsKey(graphURI)) { + return graphCache.get(graphURI); + } else { + RDFServiceGraph graph = new RDFServiceGraph(rdfService, arg0.getURI()); + graphCache.put(graphURI, graph); + if(transactionMode != null && supportsTransactions(graph)) { + graph.getTransactionHandler().begin(); + } + return graph; + } + } + + private boolean supportsTransactions(Graph graph) { + return (graph.getTransactionHandler() != null + && graph.getTransactionHandler().transactionsSupported()); } @Override @@ -205,13 +235,11 @@ public class RDFServiceDatasetGraph implements DatasetGraph { @Override public void removeGraph(Node arg0) { // TODO Auto-generated method stub - } @Override public void setDefaultGraph(Graph arg0) { // TODO Auto-generated method stub - } @Override @@ -229,4 +257,5 @@ public class RDFServiceDatasetGraph implements DatasetGraph { return "RDFServiceDatasetGraph[" + ToString.hashHex(this) + ", " + rdfService + "]"; } + } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java index ff262da04..d8b1b9d97 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/RDFServiceGraph.java @@ -6,8 +6,9 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; -import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -23,14 +24,13 @@ import com.hp.hpl.jena.graph.TripleMatch; import com.hp.hpl.jena.graph.impl.GraphWithPerform; import com.hp.hpl.jena.graph.impl.SimpleEventManager; import com.hp.hpl.jena.query.QuerySolution; -import com.hp.hpl.jena.query.ResultSet; import com.hp.hpl.jena.rdf.listeners.StatementListener; import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.shared.AddDeniedException; +import com.hp.hpl.jena.shared.Command; import com.hp.hpl.jena.shared.DeleteDeniedException; import com.hp.hpl.jena.shared.PrefixMapping; import com.hp.hpl.jena.shared.impl.PrefixMappingImpl; -import com.hp.hpl.jena.sparql.resultset.JSONInput; import com.hp.hpl.jena.util.iterator.ExtendedIterator; import com.hp.hpl.jena.util.iterator.SingletonIterator; import com.hp.hpl.jena.util.iterator.WrappedIterator; @@ -38,6 +38,7 @@ import com.hp.hpl.jena.util.iterator.WrappedIterator; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; +import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; import edu.cornell.mannlib.vitro.webapp.rdfservice.adapters.VitroModelFactory; import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceUtils; import edu.cornell.mannlib.vitro.webapp.utils.logging.ToString; @@ -51,6 +52,11 @@ public class RDFServiceGraph implements GraphWithPerform { private BulkUpdateHandler bulkUpdateHandler; private PrefixMapping prefixMapping = new PrefixMappingImpl(); private GraphEventManager eventManager; + + private boolean queueWrites = false; + private ConcurrentLinkedQueue addTripleQueue = new ConcurrentLinkedQueue(); + private ConcurrentLinkedQueue removeTripleQueue = new ConcurrentLinkedQueue(); + /** * Returns a SparqlGraph for the union of named graphs in a remote repository @@ -90,30 +96,48 @@ public class RDFServiceGraph implements GraphWithPerform { .append(sparqlNodeUpdate(t.getObject(), "")).append(" ."); return sb.toString(); } - - @Override - public void performAdd(Triple t) { + public void flush() { + log.debug("Flushing a batch"); ChangeSet changeSet = rdfService.manufactureChangeSet(); try { - changeSet.addAddition(RDFServiceUtils.toInputStream(serialize(t)), - RDFService.ModelSerializationFormat.N3, graphURI); + if(!removeTripleQueue.isEmpty()) { + String removals = serializeQueue(removeTripleQueue); + changeSet.addRemoval(RDFServiceUtils.toInputStream(removals), + RDFService.ModelSerializationFormat.N3, graphURI); + } + if(!addTripleQueue.isEmpty()) { + String additions = serializeQueue(addTripleQueue); + changeSet.addAddition(RDFServiceUtils.toInputStream(additions), + RDFService.ModelSerializationFormat.N3, graphURI); + } rdfService.changeSetUpdate(changeSet); } catch (RDFServiceException rdfse) { throw new RuntimeException(rdfse); } - + } + + private String serializeQueue(Queue tripleQueue) { + String triples = ""; + while(!tripleQueue.isEmpty()) { + triples += " \n" + serialize(tripleQueue.poll()); + } + return triples; + } + + @Override + public void performAdd(Triple t) { + addTripleQueue.add(t); + if(!queueWrites) { + flush(); + } } @Override public void performDelete(Triple t) { - ChangeSet changeSet = rdfService.manufactureChangeSet(); - try { - changeSet.addRemoval(RDFServiceUtils.toInputStream(serialize(t)), - RDFService.ModelSerializationFormat.N3, graphURI); - rdfService.changeSetUpdate(changeSet); - } catch (RDFServiceException rdfse) { - throw new RuntimeException(rdfse); + removeTripleQueue.add(t); + if(!queueWrites) { + flush(); } } @@ -321,12 +345,11 @@ public class RDFServiceGraph implements GraphWithPerform { @Override public GraphStatisticsHandler getStatisticsHandler() { return null; - } - + } + @Override public TransactionHandler getTransactionHandler() { - // TODO Auto-generated method stub - return null; + return transactionHandler; } @Override @@ -398,11 +421,42 @@ public class RDFServiceGraph implements GraphWithPerform { public boolean iteratorRemoveAllowed() { return false; } - + @Override public boolean sizeAccurate() { return true; } + }; + + private final TransactionHandler transactionHandler = new TransactionHandler() { + @Override + public void abort() { + queueWrites = false; + removeTripleQueue.clear(); + addTripleQueue.clear(); + } + + @Override + public void begin() { + queueWrites = true; + } + + @Override + public void commit() { + flush(); + queueWrites = false; + } + + @Override + public Object executeInTransaction(Command arg0) { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean transactionsSupported() { + return true; + } }; private void execSelect(String queryStr, ResultSetConsumer consumer) { diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/VClassGroupCache.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/VClassGroupCache.java index 9d2c36bad..9a16f8ddb 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/VClassGroupCache.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/dao/jena/VClassGroupCache.java @@ -16,6 +16,7 @@ import org.apache.commons.logging.LogFactory; import com.hp.hpl.jena.ontology.OntModel; import com.hp.hpl.jena.rdf.listeners.StatementListener; +import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.ResourceFactory; import com.hp.hpl.jena.rdf.model.Statement; import com.hp.hpl.jena.shared.Lock; @@ -33,6 +34,7 @@ import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory; import edu.cornell.mannlib.vitro.webapp.dao.filtering.WebappDaoFactoryFiltering; import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilterUtils; import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilters; +import edu.cornell.mannlib.vitro.webapp.dao.jena.event.BulkUpdateEvent; import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelAccess; import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine; import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngineException; @@ -161,9 +163,28 @@ public class VClassGroupCache implements SearchIndexer.Listener { VclassMap = classMap; } + private boolean paused = false; + private boolean updateRequested = false; + + public void pause() { + this.paused = true; + } + + public void unpause() { + this.paused = false; + if(updateRequested) { + updateRequested = false; + requestCacheUpdate(); + } + } + public void requestCacheUpdate() { - log.debug("requesting update"); - _cacheRebuildThread.informOfQueueChange(); + log.debug("requesting update"); + if(paused) { + updateRequested = true; + } else { + _cacheRebuildThread.informOfQueueChange(); + } } protected void requestStop() { @@ -337,24 +358,25 @@ public class VClassGroupCache implements SearchIndexer.Listener { } } - protected static boolean isClassNameChange(Statement stmt, OntModel jenaOntModel) { + protected static boolean isClassNameChange(Statement stmt, ServletContext context) { // Check if the stmt is a rdfs:label change and that the // subject is an owl:Class. - if( RDFS.label.equals( stmt.getPredicate() )) { - jenaOntModel.enterCriticalSection(Lock.READ); - try{ - return jenaOntModel.contains( - ResourceFactory.createStatement( - ResourceFactory.createResource(stmt.getSubject().getURI()), - RDF.type, - OWL.Class)); - }finally{ - jenaOntModel.leaveCriticalSection(); - } - }else{ + if( !RDFS.label.equals( stmt.getPredicate() )) { return false; + } + OntModel jenaOntModel = ModelAccess.on(context).getOntModelSelector().getTBoxModel(); + jenaOntModel.enterCriticalSection(Lock.READ); + try{ + return jenaOntModel.contains( + ResourceFactory.createStatement( + ResourceFactory.createResource(stmt.getSubject().getURI()), + RDF.type, + OWL.Class)); + }finally{ + jenaOntModel.leaveCriticalSection(); } } + /* ******************** RebuildGroupCacheThread **************** */ protected class RebuildGroupCacheThread extends VitroBackgroundThread { @@ -460,20 +482,26 @@ public class VClassGroupCache implements SearchIndexer.Listener { log.debug("subject: " + stmt.getSubject().getURI()); log.debug("predicate: " + stmt.getPredicate().getURI()); } - if (RDF.type.getURI().equals(stmt.getPredicate().getURI())) { + if (RDF.type.equals(stmt.getPredicate())) { requestCacheUpdate(); } else if (VitroVocabulary.IN_CLASSGROUP.equals(stmt.getPredicate().getURI())) { requestCacheUpdate(); } else if(VitroVocabulary.DISPLAY_RANK.equals(stmt.getPredicate().getURI())){ requestCacheUpdate(); - } else { - OntModel jenaOntModel = ModelAccess.on(context).getOntModel(); - if( isClassNameChange(stmt, jenaOntModel) ) { - requestCacheUpdate(); - } + } else if( isClassNameChange(stmt, context) ) { + requestCacheUpdate(); } } + public void notifyEvent(Model model, Object event) { + if (event instanceof BulkUpdateEvent) { + if(((BulkUpdateEvent) event).getBegin()) { + pause(); + } else { + unpause(); + } + } + } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/sdb/RDFServiceSDB.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/sdb/RDFServiceSDB.java index 717142695..acf560acc 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/sdb/RDFServiceSDB.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/jena/sdb/RDFServiceSDB.java @@ -2,10 +2,9 @@ package edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.sdb; +import java.io.IOException; import java.sql.Connection; import java.sql.SQLException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import javax.sql.DataSource; @@ -23,6 +22,8 @@ import com.hp.hpl.jena.sdb.sql.SDBConnection; import edu.cornell.mannlib.vitro.webapp.dao.jena.DatasetWrapper; import edu.cornell.mannlib.vitro.webapp.dao.jena.StaticDatasetFactory; +import edu.cornell.mannlib.vitro.webapp.dao.jena.event.BulkUpdateEvent; +import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; @@ -74,32 +75,45 @@ public class RDFServiceSDB extends RDFServiceJena implements RDFService { return false; } - SDBConnection sdbConn = getSDBConnection(); + SDBConnection sdbConn = getSDBConnection(); Dataset dataset = getDataset(sdbConn); - - try { - insureThatInputStreamsAreResettable(changeSet); - - beginTransaction(sdbConn); + try { + insureThatInputStreamsAreResettable(changeSet); + } catch (IOException e) { + throw new RuntimeException(e); + } + try { + beginTransaction(sdbConn); + startBulkUpdate(); notifyListenersOfPreChangeEvents(changeSet); - applyChangeSetToModel(changeSet, dataset); - + applyChangeSetToModel(changeSet, dataset); commitTransaction(sdbConn); - notifyListenersOfChanges(changeSet); - notifyListenersOfPostChangeEvents(changeSet); - + notifyListenersOfPostChangeEvents(changeSet); return true; } catch (Exception e) { log.error(e, e); abortTransaction(sdbConn); throw new RDFServiceException(e); } finally { + endBulkUpdate(); close(sdbConn); } } + private void startBulkUpdate() { + for (ChangeListener cl : this.getRegisteredListeners()) { + cl.notifyEvent(null, new BulkUpdateEvent(null, true)); + } + } + + private void endBulkUpdate() { + for (ChangeListener cl : this.getRegisteredListeners()) { + cl.notifyEvent(null, new BulkUpdateEvent(null, false)); + } + } + private SDBConnection getSDBConnection() throws RDFServiceException { try { Connection c = (conn != null) ? conn : ds.getConnection(); @@ -139,11 +153,7 @@ public class RDFServiceSDB extends RDFServiceJena implements RDFService { sdbConn.getTransactionHandler().abort(); } } - - private static final Pattern OPTIONAL_PATTERN = Pattern.compile("optional", Pattern.CASE_INSENSITIVE); - - private static final Pattern GRAPH_PATTERN = Pattern.compile("graph", Pattern.CASE_INSENSITIVE); - + @Override protected QueryExecution createQueryExecution(String queryString, Query q, Dataset d) { return QueryExecutionFactory.create(q, d); diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/sparql/RDFServiceSparql.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/sparql/RDFServiceSparql.java index a4edc0995..421f44ef4 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/sparql/RDFServiceSparql.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/sparql/RDFServiceSparql.java @@ -16,8 +16,10 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; +import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; import edu.cornell.mannlib.vitro.webapp.utils.http.HttpClientFactory; + import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -58,6 +60,7 @@ import com.hp.hpl.jena.rdf.model.Statement; import com.hp.hpl.jena.rdf.model.StmtIterator; import com.hp.hpl.jena.sparql.core.Quad; +import edu.cornell.mannlib.vitro.webapp.application.ApplicationUtils; import edu.cornell.mannlib.vitro.webapp.dao.jena.RDFServiceDataset; import edu.cornell.mannlib.vitro.webapp.dao.jena.SparqlGraph; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; @@ -83,7 +86,7 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService { protected String readEndpointURI; protected String updateEndpointURI; // the number of triples to be - private static final int CHUNK_SIZE = 1000; // added/removed in a single + private static final int CHUNK_SIZE = 5000; // added/removed in a single // SPARQL UPDATE protected HttpClient httpClient; @@ -170,12 +173,13 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService { && !isPreconditionSatisfied( changeSet.getPreconditionQuery(), changeSet.getPreconditionQueryType())) { - return false; + return false; } try { - for (Object o : changeSet.getPreChangeEvents()) { - this.notifyListenersOfEvent(o); + + for (Object o : changeSet.getPreChangeEvents()) { + this.notifyListenersOfEvent(o); } Iterator csIt = changeSet.getModelChanges().iterator(); @@ -188,18 +192,26 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService { modelChange.getSerializedModel().mark(Integer.MAX_VALUE); performChange(modelChange); } - + // notify listeners of triple changes csIt = changeSet.getModelChanges().iterator(); - while (csIt.hasNext()) { + while (csIt.hasNext()) { ModelChange modelChange = csIt.next(); modelChange.getSerializedModel().reset(); Model model = ModelFactory.createModelForGraph( new ListeningGraph(modelChange.getGraphURI(), this)); + long start = System.currentTimeMillis(); if (modelChange.getOperation() == ModelChange.Operation.ADD) { - model.read(modelChange.getSerializedModel(), null, + Model temp = ModelFactory.createDefaultModel(); + temp.read(modelChange.getSerializedModel(), null, getSerializationFormatString( modelChange.getSerializationFormat())); + StmtIterator sit = temp.listStatements(); + while(sit.hasNext()) { + Triple triple = sit.nextStatement().asTriple(); + this.notifyListeners(triple, ModelChange.Operation.ADD, modelChange.getGraphURI()); + } + //model.add(temp); } else if (modelChange.getOperation() == ModelChange.Operation.REMOVE){ Model temp = ModelFactory.createDefaultModel(); temp.read(modelChange.getSerializedModel(), null, @@ -210,6 +222,7 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService { log.error("Unsupported model change type " + modelChange.getOperation().getClass().getName()); } + log.info((System.currentTimeMillis() - start) + " ms to notify " + this.getRegisteredListeners().size() + " listeners"); } for (Object o : changeSet.getPostChangeEvents()) { @@ -468,7 +481,7 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService { */ @Override public void getGraphMetadata() throws RDFServiceException { - + throw new UnsupportedOperationException(); } /** @@ -549,7 +562,9 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService { public void addModel(Model model, String graphURI) throws RDFServiceException { try { + long start = System.currentTimeMillis(); verbModel(model, graphURI, "INSERT"); + log.info((System.currentTimeMillis() - start) + " ms to insert " + model.size() + " triples"); } finally { rebuildGraphURICache = true; } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java index 8725bc7f9..2560e3884 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java @@ -64,7 +64,7 @@ public class ABoxRecomputer { private volatile boolean recomputing = false; private boolean stopRequested = false; - private final int BATCH_SIZE = 100; + private final int BATCH_SIZE = 500; private final int REPORTING_INTERVAL = 1000; /** @@ -109,18 +109,24 @@ public class ABoxRecomputer { recomputing = true; } } + boolean fullRecompute = (individualURIs == null); + boolean sizableRecompute = (!fullRecompute && individualURIs.size() > 2); try { - if (searchIndexer != null) { - searchIndexer.pause(); - // Register now that we want to rebuild the index when we unpause - // This allows the indexer to optimize behaviour whilst paused - searchIndexer.rebuildIndex(); + if(fullRecompute || sizableRecompute) { // if doing a full rebuild + if (searchIndexer != null) { + searchIndexer.pause(); + // Register now that we want to rebuild the index when we unpause + // This allows the indexer to optimize behaviour whilst paused + if(fullRecompute) { + searchIndexer.rebuildIndex(); + } + } } // Create a type cache for this execution and pass it to the recompute function // Ensures that caches are only valid for the length of one recompute recomputeABox(individualURIs, new TypeCaches()); } finally { - if (searchIndexer != null) { + if ((fullRecompute || sizableRecompute) && searchIndexer != null) { searchIndexer.unpause(); } synchronized (lock1) { @@ -199,7 +205,7 @@ public class ABoxRecomputer { Model rebuildModel, TypeCaches caches) throws RDFServiceException { long start = System.currentTimeMillis(); Model assertions = getAssertions(individualURI); - log.trace((System.currentTimeMillis() - start) + " ms to get assertions."); + log.debug((System.currentTimeMillis() - start) + " ms to get assertions."); Model additionalInferences = recomputeIndividual( individualURI, null, assertions, rebuildModel, caches, RUN_PLUGINS); @@ -588,7 +594,7 @@ public class ABoxRecomputer { getSameAsIndividuals(indUri, sameAsInds); } } catch (RDFServiceException e) { - + log.error(e,e); } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasoner.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasoner.java index 7a75ba3d0..8a278b0d9 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasoner.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasoner.java @@ -2,14 +2,14 @@ package edu.cornell.mannlib.vitro.webapp.reasoner; -import static edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread.WorkLevel.WORKING; - import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; @@ -50,7 +50,6 @@ import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; import edu.cornell.mannlib.vitro.webapp.rdfservice.adapters.VitroModelFactory; import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.model.RDFServiceModel; -import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; /** * Allows for real-time incremental materialization or retraction of RDFS- @@ -77,7 +76,7 @@ public class SimpleReasoner extends StatementListener { VitroModelFactory.createOntologyModel()) .createAnnotationProperty(mostSpecificTypePropertyURI); - private Queue individualURIqueue = new ConcurrentLinkedQueue(); + private Queue individualURIqueue = new IndividualURIQueue(); // DeltaComputer private CumulativeDeltaModeler aBoxDeltaModeler1 = null; @@ -186,19 +185,15 @@ public class SimpleReasoner extends StatementListener { public boolean getSameAsEnabled() { return this.doSameAs; } - + private void listenToStatement(Statement stmt) { if(stmt.getSubject().isURIResource()) { - if (!individualURIqueue.contains(stmt.getSubject().getURI())) { - individualURIqueue.add(stmt.getSubject().getURI()); - } + individualURIqueue.add(stmt.getSubject().getURI()); } if(stmt.getObject().isURIResource() && !(RDF.type.equals(stmt.getPredicate()))) { - if (!individualURIqueue.contains(stmt.getObject().asResource().getURI())) { - individualURIqueue.add(stmt.getObject().asResource().getURI()); - } + individualURIqueue.add(stmt.getObject().asResource().getURI()); } - if(!accumulateChanges || individualURIqueue.size() > SAFETY_VALVE) { + if(!accumulateChanges) { recomputeIndividuals(); } } @@ -206,8 +201,17 @@ public class SimpleReasoner extends StatementListener { private static final int SAFETY_VALVE = 1000000; // one million individuals private void recomputeIndividuals() { + if(recomputer.isRecomputing()) { + return; + } + long start = System.currentTimeMillis(); + int size = individualURIqueue.size(); recomputer.recompute(individualURIqueue); - individualURIqueue.clear(); + //individualURIqueue.clear(); + if(size > 2) { + log.info((System.currentTimeMillis() - start) + " ms to recompute " + + size + " individuals"); + } } private boolean accumulateChanges = false; @@ -220,8 +224,7 @@ public class SimpleReasoner extends StatementListener { @Override public void addedStatement(Statement stmt) { doPlugins(ModelUpdate.Operation.ADD,stmt); - listenToStatement(stmt); - + listenToStatement(stmt);; // try { // if (stmt.getPredicate().equals(RDF.type)) { // addedABoxTypeAssertion(stmt, inferenceModel, new HashSet()); @@ -1633,7 +1636,7 @@ public class SimpleReasoner extends StatementListener { if (((BulkUpdateEvent) event).getBegin()) { this.accumulateChanges = true; - log.info("received a bulk update begin event"); + //log.info("received a bulk update begin event"); // if (deltaComputerProcessing) { // eventCount++; // log.info("received a bulk update begin event while processing in asynchronous mode. Event count = " + eventCount); @@ -1651,7 +1654,7 @@ public class SimpleReasoner extends StatementListener { // log.info("initializing batch mode 1"); // } } else { - log.info("received a bulk update end event"); + //log.info("received a bulk update end event"); this.accumulateChanges = false; recomputeIndividuals(); // if (!deltaComputerProcessing) { @@ -1816,5 +1819,140 @@ public class SimpleReasoner extends StatementListener { ? ((Literal)statement.getObject()).getLexicalForm() + " (Literal)" : ((Resource)statement.getObject()).getURI() + " (Resource)") + "]"; } + + private class IndividualURIQueue implements Queue { + + private ConcurrentLinkedQueue q = new ConcurrentLinkedQueue(); + private ConcurrentHashMap m = new ConcurrentHashMap(); + + @Override + public boolean addAll(Collection c) { + boolean changed = false; + for (E e : c) { + if(!m.containsKey(e)) { + m.put(e, Boolean.TRUE); + q.add(e); + changed = true; + } + } + return changed; + } + + @Override + public void clear() { + m.clear(); + q.clear(); + } + + @Override + public boolean contains(Object o) { + return m.contains(o); + } + + @Override + public boolean containsAll(Collection c) { + boolean contains = true; + for(Object e : c) { + contains |= m.contains(e); + } + return contains; + } + + @Override + public boolean isEmpty() { + return q.isEmpty(); + } + + @Override + public Iterator iterator() { + return q.iterator(); + } + + @Override + public boolean remove(Object o) { + m.remove(o); + return q.remove(o); + } + + @Override + public boolean removeAll(Collection c) { + for (Object e : c) { + m.remove(e); + } + return q.removeAll(c); + } + + @Override + public boolean retainAll(Collection c) { + boolean changed = false; + Iterator it = m.keySet().iterator(); + while(it.hasNext()) { + E e = it.next(); + if(!c.contains(e)) { + m.remove(e); + q.remove(e); + changed = true; + } + } + return changed; + } + + @Override + public int size() { + return m.size(); + } + + @Override + public Object[] toArray() { + return q.toArray(); + } + + @Override + public T[] toArray(T[] a) { + return q.toArray(a); + } + + @Override + public boolean add(E e) { + if(m.containsKey(e)) { + return false; + } else { + m.put(e, Boolean.TRUE); + q.add(e); + return true; + } + } + + @Override + public E element() { + return q.element(); + } + + @Override + public boolean offer(E e) { + return q.offer(e); + } + + @Override + public E peek() { + return q.peek(); + } + + @Override + public E poll() { + E e = q.poll(); + m.remove(e); + return e; + } + + @Override + public E remove() { + E e = q.remove(); + m.remove(e); + return e; + } + + } + }