misc. cleanup / minor fixes

This commit is contained in:
brianjlowe 2015-12-12 22:58:06 +02:00
parent e85e8a4e22
commit 17fab2894c
6 changed files with 205 additions and 238 deletions

View file

@ -12,14 +12,12 @@ import org.apache.commons.logging.LogFactory;
import com.hp.hpl.jena.query.Dataset; import com.hp.hpl.jena.query.Dataset;
import com.hp.hpl.jena.query.DatasetFactory; import com.hp.hpl.jena.query.DatasetFactory;
import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import edu.cornell.mannlib.vitro.webapp.dao.jena.DatasetWrapper; import edu.cornell.mannlib.vitro.webapp.dao.jena.DatasetWrapper;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ModelChange; import edu.cornell.mannlib.vitro.webapp.rdfservice.ModelChange;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;
import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.ListeningGraph;
import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.RDFServiceJena; import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.RDFServiceJena;
public class RDFServiceModel extends RDFServiceJena implements RDFService { public class RDFServiceModel extends RDFServiceJena implements RDFService {

View file

@ -85,7 +85,6 @@ public class RDFServiceSDB extends RDFServiceJena implements RDFService {
try { try {
beginTransaction(sdbConn); beginTransaction(sdbConn);
startBulkUpdate();
notifyListenersOfPreChangeEvents(changeSet); notifyListenersOfPreChangeEvents(changeSet);
applyChangeSetToModel(changeSet, dataset); applyChangeSetToModel(changeSet, dataset);
commitTransaction(sdbConn); commitTransaction(sdbConn);
@ -97,23 +96,10 @@ public class RDFServiceSDB extends RDFServiceJena implements RDFService {
abortTransaction(sdbConn); abortTransaction(sdbConn);
throw new RDFServiceException(e); throw new RDFServiceException(e);
} finally { } finally {
endBulkUpdate();
close(sdbConn); close(sdbConn);
} }
} }
private void startBulkUpdate() {
for (ChangeListener cl : this.getRegisteredListeners()) {
cl.notifyEvent(null, new BulkUpdateEvent(null, true));
}
}
private void endBulkUpdate() {
for (ChangeListener cl : this.getRegisteredListeners()) {
cl.notifyEvent(null, new BulkUpdateEvent(null, false));
}
}
private SDBConnection getSDBConnection() throws RDFServiceException { private SDBConnection getSDBConnection() throws RDFServiceException {
try { try {
Connection c = (conn != null) ? conn : ds.getConnection(); Connection c = (conn != null) ? conn : ds.getConnection();

View file

@ -16,10 +16,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
import edu.cornell.mannlib.vitro.webapp.utils.http.HttpClientFactory;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -60,7 +56,6 @@ import com.hp.hpl.jena.rdf.model.Statement;
import com.hp.hpl.jena.rdf.model.StmtIterator; import com.hp.hpl.jena.rdf.model.StmtIterator;
import com.hp.hpl.jena.sparql.core.Quad; import com.hp.hpl.jena.sparql.core.Quad;
import edu.cornell.mannlib.vitro.webapp.application.ApplicationUtils;
import edu.cornell.mannlib.vitro.webapp.dao.jena.RDFServiceDataset; import edu.cornell.mannlib.vitro.webapp.dao.jena.RDFServiceDataset;
import edu.cornell.mannlib.vitro.webapp.dao.jena.SparqlGraph; import edu.cornell.mannlib.vitro.webapp.dao.jena.SparqlGraph;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
@ -68,10 +63,11 @@ import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ModelChange; import edu.cornell.mannlib.vitro.webapp.rdfservice.ModelChange;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.ChangeSetImpl; import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.ChangeSetImpl;
import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceImpl; import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceImpl;
import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceUtils; import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceUtils;
import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.jena.ListeningGraph; import edu.cornell.mannlib.vitro.webapp.utils.http.HttpClientFactory;
import edu.cornell.mannlib.vitro.webapp.utils.sparql.ResultSetIterators.ResultSetQuadsIterator; import edu.cornell.mannlib.vitro.webapp.utils.sparql.ResultSetIterators.ResultSetQuadsIterator;
import edu.cornell.mannlib.vitro.webapp.utils.sparql.ResultSetIterators.ResultSetTriplesIterator; import edu.cornell.mannlib.vitro.webapp.utils.sparql.ResultSetIterators.ResultSetTriplesIterator;

View file

@ -5,13 +5,11 @@ package edu.cornell.mannlib.vitro.webapp.reasoner;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
@ -164,6 +162,7 @@ public class ABoxRecomputer {
return; return;
} }
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
int size = individuals.size();
int numInds = 0; int numInds = 0;
Model rebuildModel = ModelFactory.createDefaultModel(); Model rebuildModel = ModelFactory.createDefaultModel();
Model additionalInferences = ModelFactory.createDefaultModel(); Model additionalInferences = ModelFactory.createDefaultModel();
@ -185,7 +184,7 @@ public class ABoxRecomputer {
} }
if (reportingInterval) { if (reportingInterval) {
log.info("Still recomputing inferences (" log.info("Still recomputing inferences ("
+ numInds + "/" + individuals.size() + " individuals)"); + numInds + "/" + size + " individuals)");
log.info((System.currentTimeMillis() - start) / numInds + " ms per individual"); log.info((System.currentTimeMillis() - start) / numInds + " ms per individual");
} }
if (stopRequested) { if (stopRequested) {
@ -437,7 +436,7 @@ public class ABoxRecomputer {
* Get the URIs for all individuals in the system * Get the URIs for all individuals in the system
*/ */
protected Queue<String> getAllIndividualURIs() { protected Queue<String> getAllIndividualURIs() {
Queue<String> individualURIs = new ArrayDeque<String>(); Queue<String> individualURIs = new IndividualURIQueue<String>();
List<String> classList = new ArrayList<String>(); List<String> classList = new ArrayList<String>();
tboxModel.enterCriticalSection(Lock.READ); tboxModel.enterCriticalSection(Lock.READ);
try { try {

View file

@ -0,0 +1,141 @@
package edu.cornell.mannlib.vitro.webapp.reasoner;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class IndividualURIQueue<E> implements Queue<E> {
private ConcurrentLinkedQueue<E> q = new ConcurrentLinkedQueue<E>();
private ConcurrentHashMap<E, Boolean> m = new ConcurrentHashMap<E, Boolean>();
@Override
public synchronized boolean addAll(Collection<? extends E> c) {
boolean changed = false;
for (E e : c) {
if(!m.containsKey(e)) {
m.put(e, Boolean.TRUE);
q.add(e);
changed = true;
}
}
return changed;
}
@Override
public synchronized void clear() {
m.clear();
q.clear();
}
@Override
public boolean contains(Object o) {
return m.contains(o);
}
@Override
public boolean containsAll(Collection<?> c) {
boolean contains = true;
for(Object e : c) {
contains |= m.contains(e);
}
return contains;
}
@Override
public boolean isEmpty() {
return q.isEmpty();
}
@Override
public Iterator<E> iterator() {
return q.iterator();
}
@Override
public synchronized boolean remove(Object o) {
m.remove(o);
return q.remove(o);
}
@Override
public synchronized boolean removeAll(Collection<?> c) {
for (Object e : c) {
m.remove(e);
}
return q.removeAll(c);
}
@Override
public synchronized boolean retainAll(Collection<?> c) {
boolean changed = false;
Iterator<E> it = m.keySet().iterator();
while(it.hasNext()) {
E e = it.next();
if(!c.contains(e)) {
m.remove(e);
q.remove(e);
changed = true;
}
}
return changed;
}
@Override
public int size() {
return m.size();
}
@Override
public Object[] toArray() {
return q.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return q.toArray(a);
}
@Override
public synchronized boolean add(E e) {
if(m.containsKey(e)) {
return false;
} else {
m.put(e, Boolean.TRUE);
q.add(e);
return true;
}
}
@Override
public E element() {
return q.element();
}
@Override
public boolean offer(E e) {
return q.offer(e);
}
@Override
public E peek() {
return q.peek();
}
@Override
public synchronized E poll() {
E e = q.poll();
m.remove(e);
return e;
}
@Override
public synchronized E remove() {
E e = q.remove();
m.remove(e);
return e;
}
}

View file

@ -3,14 +3,11 @@
package edu.cornell.mannlib.vitro.webapp.reasoner; package edu.cornell.mannlib.vitro.webapp.reasoner;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -79,7 +76,6 @@ public class SimpleReasoner extends StatementListener
VitroModelFactory.createOntologyModel()) VitroModelFactory.createOntologyModel())
.createAnnotationProperty(mostSpecificTypePropertyURI); .createAnnotationProperty(mostSpecificTypePropertyURI);
// Recomputer
private ABoxRecomputer recomputer = null; private ABoxRecomputer recomputer = null;
private List<ReasonerPlugin> pluginList = new CopyOnWriteArrayList<ReasonerPlugin>(); private List<ReasonerPlugin> pluginList = new CopyOnWriteArrayList<ReasonerPlugin>();
private boolean doSameAs = true; private boolean doSameAs = true;
@ -149,7 +145,6 @@ public class SimpleReasoner extends StatementListener
ds.addNamedModel(ModelNames.ABOX_ASSERTIONS, aboxModel); ds.addNamedModel(ModelNames.ABOX_ASSERTIONS, aboxModel);
ds.addNamedModel(ModelNames.ABOX_INFERENCES, inferenceModel); ds.addNamedModel(ModelNames.ABOX_INFERENCES, inferenceModel);
ds.addNamedModel(ModelNames.TBOX_ASSERTIONS, tboxModel); ds.addNamedModel(ModelNames.TBOX_ASSERTIONS, tboxModel);
ds.setDefaultModel(ModelFactory.createUnion(fullModel, tboxModel)); ds.setDefaultModel(ModelFactory.createUnion(fullModel, tboxModel));
recomputer = new ABoxRecomputer(tboxModel, aboxModel, new RDFServiceModel(ds), this, searchIndexer); recomputer = new ABoxRecomputer(tboxModel, aboxModel, new RDFServiceModel(ds), this, searchIndexer);
} }
@ -170,41 +165,6 @@ public class SimpleReasoner extends StatementListener
return this.doSameAs; return this.doSameAs;
} }
private void listenToStatement(Statement stmt) {
Queue<String> individualURIs = new IndividualURIQueue<String>();
listenToStatement(stmt, individualURIs);
}
private void listenToStatement(Statement stmt, Queue<String> individualURIs) {
if(stmt.getSubject().isURIResource()) {
individualURIs.add(stmt.getSubject().getURI());
}
if(stmt.getObject().isURIResource() && !(RDF.type.equals(stmt.getPredicate()))) {
individualURIs.add(stmt.getObject().asResource().getURI());
}
recomputeIndividuals(individualURIs);
}
private void recomputeIndividuals(Queue<String> individualURIs) {
long start = System.currentTimeMillis();
int size = individualURIs.size();
recomputer.recompute(individualURIs);
if(size > 2) {
log.info((System.currentTimeMillis() - start) + " ms to recompute "
+ size + " individuals");
}
}
boolean isABoxInferenceGraph(String graphURI) {
return ModelNames.ABOX_INFERENCES.equals(graphURI);
}
boolean isTBoxGraph(String graphURI) {
return ( ModelNames.TBOX_ASSERTIONS.equals(graphURI)
|| ModelNames.TBOX_INFERENCES.equals(graphURI)
|| (graphURI != null && graphURI.contains("tbox")) );
}
public void notifyModelChange(ModelChange modelChange) { public void notifyModelChange(ModelChange modelChange) {
if(isABoxInferenceGraph(modelChange.getGraphURI()) if(isABoxInferenceGraph(modelChange.getGraphURI())
|| isTBoxGraph(modelChange.getGraphURI())) { || isTBoxGraph(modelChange.getGraphURI())) {
@ -230,24 +190,24 @@ public class SimpleReasoner extends StatementListener
recomputeIndividuals(individualURIs); recomputeIndividuals(individualURIs);
} }
/* /*
* Performs incremental ABox reasoning based * Performs incremental ABox reasoning based
* on the addition of a new statement * on the addition of a new statement
* (aka assertion) to the ABox. * (aka assertion) to the ABox.
*/ */
@Override @Override
public void addedStatement(Statement stmt) { public void addedStatement(Statement stmt) {
doPlugins(ModelUpdate.Operation.ADD,stmt); doPlugins(ModelUpdate.Operation.ADD,stmt);
listenToStatement(stmt); listenToStatement(stmt);
} }
/* /*
* Performs incremental ABox reasoning based * Performs incremental ABox reasoning based
* on the retraction of a statement (aka assertion) * on the retraction of a statement (aka assertion)
* from the ABox. * from the ABox.
*/ */
@Override @Override
public void removedStatement(Statement stmt) { public void removedStatement(Statement stmt) {
doPlugins(ModelUpdate.Operation.RETRACT,stmt); doPlugins(ModelUpdate.Operation.RETRACT,stmt);
Queue<String> individualURIs = new IndividualURIQueue<String>(); Queue<String> individualURIs = new IndividualURIQueue<String>();
if(doSameAs && OWL.sameAs.equals(stmt.getPredicate())) { if(doSameAs && OWL.sameAs.equals(stmt.getPredicate())) {
@ -260,8 +220,33 @@ public class SimpleReasoner extends StatementListener
stmt.getObject().asResource().getURI())); stmt.getObject().asResource().getURI()));
} }
} }
listenToStatement(stmt, individualURIs); listenToStatement(stmt, individualURIs);
} }
private void listenToStatement(Statement stmt) {
Queue<String> individualURIs = new IndividualURIQueue<String>();
listenToStatement(stmt, individualURIs);
}
private void listenToStatement(Statement stmt, Queue<String> individualURIs) {
if(stmt.getSubject().isURIResource()) {
individualURIs.add(stmt.getSubject().getURI());
}
if(stmt.getObject().isURIResource() && !(RDF.type.equals(stmt.getPredicate()))) {
individualURIs.add(stmt.getObject().asResource().getURI());
}
recomputeIndividuals(individualURIs);
}
private void recomputeIndividuals(Queue<String> individualURIs) {
long start = System.currentTimeMillis();
int size = individualURIs.size();
recomputer.recompute(individualURIs);
if(size > 2) {
log.info((System.currentTimeMillis() - start) + " ms to recompute "
+ size + " individuals");
}
}
/** /**
* Performs incremental ABox reasoning based * Performs incremental ABox reasoning based
@ -1158,12 +1143,23 @@ public class SimpleReasoner extends StatementListener
} }
/** /**
* Asynchronous reasoning mode (DeltaComputer) no longer used in the case of batch removals. * Asynchronous reasoning mode (DeltaComputer) no longer used
* in the case of batch removals.
*/ */
public boolean isABoxReasoningAsynchronous() { public boolean isABoxReasoningAsynchronous() {
return false; return false;
} }
boolean isABoxInferenceGraph(String graphURI) {
return ModelNames.ABOX_INFERENCES.equals(graphURI);
}
boolean isTBoxGraph(String graphURI) {
return ( ModelNames.TBOX_ASSERTIONS.equals(graphURI)
|| ModelNames.TBOX_INFERENCES.equals(graphURI)
|| (graphURI != null && graphURI.contains("tbox")) );
}
@Override @Override
public void notifyEvent(String string, Object event) { public void notifyEvent(String string, Object event) {
// don't care // don't care
@ -1171,24 +1167,9 @@ public class SimpleReasoner extends StatementListener
@Override @Override
public void notifyEvent(Model model, Object event) { public void notifyEvent(Model model, Object event) {
// if (event instanceof BulkUpdateEvent) { // don't care
// handleBulkUpdateEvent(event);
// }
} }
// public synchronized void handleBulkUpdateEvent(Object event) {
//
// if (event instanceof BulkUpdateEvent) {
// if (((BulkUpdateEvent) event).getBegin()) {
// this.accumulateChanges = true;
// } else {
// log.debug("received a bulk update end event");
// this.accumulateChanges = false;
// recomputeIndividuals();
// }
// }
// }
/** /**
* Utility method for logging * Utility method for logging
*/ */
@ -1200,138 +1181,4 @@ public class SimpleReasoner extends StatementListener
: ((Resource)statement.getObject()).getURI() + " (Resource)") + "]"; : ((Resource)statement.getObject()).getURI() + " (Resource)") + "]";
} }
private class IndividualURIQueue<E> implements Queue<E> {
private ConcurrentLinkedQueue<E> q = new ConcurrentLinkedQueue<E>();
private ConcurrentHashMap<E, Boolean> m = new ConcurrentHashMap<E, Boolean>();
@Override
public synchronized boolean addAll(Collection<? extends E> c) {
boolean changed = false;
for (E e : c) {
if(!m.containsKey(e)) {
m.put(e, Boolean.TRUE);
q.add(e);
changed = true;
}
}
return changed;
}
@Override
public synchronized void clear() {
m.clear();
q.clear();
}
@Override
public boolean contains(Object o) {
return m.contains(o);
}
@Override
public boolean containsAll(Collection<?> c) {
boolean contains = true;
for(Object e : c) {
contains |= m.contains(e);
}
return contains;
}
@Override
public boolean isEmpty() {
return q.isEmpty();
}
@Override
public Iterator<E> iterator() {
return q.iterator();
}
@Override
public synchronized boolean remove(Object o) {
m.remove(o);
return q.remove(o);
}
@Override
public synchronized boolean removeAll(Collection<?> c) {
for (Object e : c) {
m.remove(e);
}
return q.removeAll(c);
}
@Override
public synchronized boolean retainAll(Collection<?> c) {
boolean changed = false;
Iterator<E> it = m.keySet().iterator();
while(it.hasNext()) {
E e = it.next();
if(!c.contains(e)) {
m.remove(e);
q.remove(e);
changed = true;
}
}
return changed;
}
@Override
public int size() {
return m.size();
}
@Override
public Object[] toArray() {
return q.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return q.toArray(a);
}
@Override
public synchronized boolean add(E e) {
if(m.containsKey(e)) {
return false;
} else {
m.put(e, Boolean.TRUE);
q.add(e);
return true;
}
}
@Override
public E element() {
return q.element();
}
@Override
public boolean offer(E e) {
return q.offer(e);
}
@Override
public E peek() {
return q.peek();
}
@Override
public synchronized E poll() {
E e = q.poll();
m.remove(e);
return e;
}
@Override
public synchronized E remove() {
E e = q.remove();
m.remove(e);
return e;
}
}
} }