Simplify Listener, allowing tasks to be submitted to the Indexer even when paused

This commit is contained in:
Graham Triggs 2015-02-15 13:32:16 +00:00
parent aedfb7280f
commit 2bde98e3df

View file

@ -9,30 +9,27 @@ import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndex
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils; import com.hp.hpl.jena.graph.Triple;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.hp.hpl.jena.rdf.model.Literal;
import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory; import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.rdf.model.Property;
import com.hp.hpl.jena.rdf.model.RDFNode;
import com.hp.hpl.jena.rdf.model.Resource;
import com.hp.hpl.jena.rdf.model.ResourceFactory;
import com.hp.hpl.jena.rdf.model.Statement; import com.hp.hpl.jena.rdf.model.Statement;
import com.hp.hpl.jena.rdf.model.StmtIterator;
import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent; import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;
import org.apache.jena.riot.RDFLanguages;
import org.apache.jena.riot.RiotReader;
/** /**
* When a change is heard, wait for an interval to see if more changes come in. * When a change is heard, wait for an interval to see if more changes come in.
@ -65,7 +62,7 @@ public class IndexingChangeListener implements ChangeListener,
private final SearchIndexer searchIndexer; private final SearchIndexer searchIndexer;
private final Ticker ticker; private final Ticker ticker;
private volatile boolean paused = true; private final Model defaultModel;
/** All access to the list must be synchronized. */ /** All access to the list must be synchronized. */
private final List<Statement> changes; private final List<Statement> changes;
@ -73,82 +70,28 @@ public class IndexingChangeListener implements ChangeListener,
public IndexingChangeListener(SearchIndexer searchIndexer) { public IndexingChangeListener(SearchIndexer searchIndexer) {
this.searchIndexer = searchIndexer; this.searchIndexer = searchIndexer;
this.ticker = new Ticker(); this.ticker = new Ticker();
this.defaultModel = ModelFactory.createDefaultModel();
this.changes = new ArrayList<>(); this.changes = new ArrayList<>();
searchIndexer.addListener(this); searchIndexer.addListener(this);
} }
private synchronized void noteChange(Statement stmt) { private synchronized void noteChange(Statement stmt) {
try { changes.add(stmt);
changes.add(sanitize(stmt));
if (!paused) {
ticker.start(); ticker.start();
} }
} catch (Exception e) {
log.warn("Failed to sanitize this statement: " + stmt);
}
}
private Statement sanitize(Statement rawStmt) {
return ResourceFactory.createStatement(
sanitizeSubject(rawStmt.getSubject()),
sanitizePredicate(rawStmt.getPredicate()),
sanitizeObject(rawStmt.getObject()));
}
private Resource sanitizeSubject(Resource rawSubject) {
if (rawSubject.isURIResource()) {
return ResourceFactory.createResource(rawSubject.getURI());
}
return ResourceFactory.createResource();
}
private Property sanitizePredicate(Property rawPredicate) {
return ResourceFactory.createProperty(rawPredicate.getURI());
}
private RDFNode sanitizeObject(RDFNode rawObject) {
if (rawObject.isURIResource()) {
return ResourceFactory.createResource(rawObject.asResource()
.getURI());
}
if (rawObject.isResource()) {
return ResourceFactory.createResource();
}
Literal l = rawObject.asLiteral();
if (StringUtils.isNotEmpty(l.getLanguage())) {
return ResourceFactory.createLangLiteral(l.getString(),
l.getLanguage());
}
if (null != l.getDatatype()) {
return ResourceFactory.createTypedLiteral(l.getValue());
}
return ResourceFactory.createPlainLiteral(l.getString());
}
@Override @Override
public void receiveSearchIndexerEvent(Event event) { public void receiveSearchIndexerEvent(Event event) {
if (event.getType() == PAUSE) {
paused = true;
} else if (event.getType() == UNPAUSE) {
paused = false;
ticker.start();
} else if (event.getType() == START_REBUILD) {
discardChanges();
}
} }
private synchronized void respondToTicker() { private synchronized void respondToTicker() {
if (!paused && !changes.isEmpty()) { if (!changes.isEmpty()) {
searchIndexer.scheduleUpdatesForStatements(changes); searchIndexer.scheduleUpdatesForStatements(changes);
changes.clear(); changes.clear();
} }
} }
private synchronized void discardChanges() {
changes.clear();
}
public void shutdown() { public void shutdown() {
ticker.shutdown(); ticker.shutdown();
} }
@ -180,22 +123,27 @@ public class IndexingChangeListener implements ChangeListener,
} }
} }
// TODO avoid overhead of Model.
// TODO avoid duplication with JenaChangeListener // TODO avoid duplication with JenaChangeListener
private Statement parseTriple(String serializedTriple) { private Statement parseTriple(String serializedTriple) {
try { try {
Model m = ModelFactory.createDefaultModel(); // Use RiotReader to parse a Triple
m.read(new ByteArrayInputStream(serializedTriple.getBytes("UTF-8")), // NB A Triple can be serialized correctly with: FmtUtils.stringForTriple(triple, PrefixMapping.Factory.create()) + " .";
null, "N3"); Iterator<Triple> it = RiotReader.createIteratorTriples(new ByteArrayInputStream(serializedTriple.getBytes("UTF-8")), RDFLanguages.NTRIPLES, null);
StmtIterator sit = m.listStatements();
if (!sit.hasNext()) { if (it.hasNext()) {
throw new RuntimeException("no triple parsed from change event"); Triple triple = it.next();
} else {
Statement s = sit.nextStatement(); if (it.hasNext()) {
if (sit.hasNext()) {
log.warn("More than one triple parsed from change event"); log.warn("More than one triple parsed from change event");
} }
return s;
// Use the retained defaultModel instance to convert the Triple to a Statement.
// This does not add the Statement to the Model, so the Statement can be discarded once it is no longer in use.
// Although the Model stays attached to each Statement, reusing a single instance means that only one Model
// is ever created, and it is shared by all of the Statements created by this instance.
return defaultModel.asStatement(triple);
} else {
throw new RuntimeException("no triple parsed from change event");
} }
} catch (RuntimeException riot) { } catch (RuntimeException riot) {
log.error("Failed to parse triple " + serializedTriple, riot); log.error("Failed to parse triple " + serializedTriple, riot);