diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java index d1f198176..55c7bd342 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java @@ -9,30 +9,27 @@ import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndex import java.io.ByteArrayInputStream; import java.io.UnsupportedEncodingException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang.StringUtils; +import com.hp.hpl.jena.graph.Triple; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.hp.hpl.jena.rdf.model.Literal; import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.ModelFactory; -import com.hp.hpl.jena.rdf.model.Property; -import com.hp.hpl.jena.rdf.model.RDFNode; -import com.hp.hpl.jena.rdf.model.Resource; -import com.hp.hpl.jena.rdf.model.ResourceFactory; import com.hp.hpl.jena.rdf.model.Statement; -import com.hp.hpl.jena.rdf.model.StmtIterator; import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; +import org.apache.jena.riot.RDFLanguages; +import org.apache.jena.riot.RiotReader; /** * When a change is heard, wait for an interval to see if more changes come in. 
@@ -65,7 +62,7 @@ public class IndexingChangeListener implements ChangeListener, private final SearchIndexer searchIndexer; private final Ticker ticker; - private volatile boolean paused = true; + private final Model defaultModel; /** All access to the list must be synchronized. */ private final List<Statement> changes; @@ -73,82 +70,28 @@ public class IndexingChangeListener implements ChangeListener, public IndexingChangeListener(SearchIndexer searchIndexer) { this.searchIndexer = searchIndexer; this.ticker = new Ticker(); + this.defaultModel = ModelFactory.createDefaultModel(); this.changes = new ArrayList<>(); searchIndexer.addListener(this); } private synchronized void noteChange(Statement stmt) { - try { - changes.add(sanitize(stmt)); - if (!paused) { - ticker.start(); - } - } catch (Exception e) { - log.warn("Failed to sanitize this statement: " + stmt); - } - } - - private Statement sanitize(Statement rawStmt) { - return ResourceFactory.createStatement( - sanitizeSubject(rawStmt.getSubject()), - sanitizePredicate(rawStmt.getPredicate()), - sanitizeObject(rawStmt.getObject())); - } - - private Resource sanitizeSubject(Resource rawSubject) { - if (rawSubject.isURIResource()) { - return ResourceFactory.createResource(rawSubject.getURI()); - } - return ResourceFactory.createResource(); - } - - private Property sanitizePredicate(Property rawPredicate) { - return ResourceFactory.createProperty(rawPredicate.getURI()); - } - - private RDFNode sanitizeObject(RDFNode rawObject) { - if (rawObject.isURIResource()) { - return ResourceFactory.createResource(rawObject.asResource() - .getURI()); - } - if (rawObject.isResource()) { - return ResourceFactory.createResource(); - } - Literal l = rawObject.asLiteral(); - if (StringUtils.isNotEmpty(l.getLanguage())) { - return ResourceFactory.createLangLiteral(l.getString(), - l.getLanguage()); - } - if (null != l.getDatatype()) { - return ResourceFactory.createTypedLiteral(l.getValue()); - } - return 
ResourceFactory.createPlainLiteral(l.getString()); + changes.add(stmt); + ticker.start(); } @Override public void receiveSearchIndexerEvent(Event event) { - if (event.getType() == PAUSE) { - paused = true; - } else if (event.getType() == UNPAUSE) { - paused = false; - ticker.start(); - } else if (event.getType() == START_REBUILD) { - discardChanges(); - } } private synchronized void respondToTicker() { - if (!paused && !changes.isEmpty()) { + if (!changes.isEmpty()) { searchIndexer.scheduleUpdatesForStatements(changes); changes.clear(); } } - private synchronized void discardChanges() { - changes.clear(); - } - public void shutdown() { ticker.shutdown(); } @@ -176,33 +119,38 @@ public class IndexingChangeListener implements ChangeListener, } } else { log.debug("ignoring event " + event.getClass().getName() + " " - + event); + + event); } } - // TODO avoid overhead of Model. // TODO avoid duplication with JenaChangeListener private Statement parseTriple(String serializedTriple) { - try { - Model m = ModelFactory.createDefaultModel(); - m.read(new ByteArrayInputStream(serializedTriple.getBytes("UTF-8")), - null, "N3"); - StmtIterator sit = m.listStatements(); - if (!sit.hasNext()) { - throw new RuntimeException("no triple parsed from change event"); - } else { - Statement s = sit.nextStatement(); - if (sit.hasNext()) { - log.warn("More than one triple parsed from change event"); - } - return s; - } - } catch (RuntimeException riot) { - log.error("Failed to parse triple " + serializedTriple, riot); - throw riot; - } catch (UnsupportedEncodingException uee) { - throw new RuntimeException(uee); - } + try { + // Use RiotReader to parse a Triple + // NB A Triple can be serialized correctly with: FmtUtils.stringForTriple(triple, PrefixMapping.Factory.create()) + " ."; + Iterator<Triple> it = RiotReader.createIteratorTriples(new ByteArrayInputStream(serializedTriple.getBytes("UTF-8")), RDFLanguages.NTRIPLES, null); + + if (it.hasNext()) { + Triple triple = it.next(); + + if 
(it.hasNext()) { + log.warn("More than one triple parsed from change event"); + } + + // Use the retained defaultModel instance to convert the Triple to a Statement + // This does not add the Statement to the Model, so the Statement can be disposed when unused + // And whilst the Model is attached to the Statement, using a single instance means only one Model + // is created and attached to all of the Statements created by this instance + return defaultModel.asStatement(triple); + } else { + throw new RuntimeException("no triple parsed from change event"); + } + } catch (RuntimeException riot) { + log.error("Failed to parse triple " + serializedTriple, riot); + throw riot; + } catch (UnsupportedEncodingException uee) { + throw new RuntimeException(uee); + } } // ----------------------------------------------------------------------