From 066d01336008e5f8747c2a15b08c350b2a63bce7 Mon Sep 17 00:00:00 2001 From: Jim Blake Date: Tue, 17 Feb 2015 12:08:09 -0500 Subject: [PATCH] Modifications to https://github.com/vivo-project/Vitro/pull/14 Restore pause logic to IndexingChangeListener. Adjust comments, remove compiler warnings and vestigial code items, rename method. --- .../modules/searchIndexer/SearchIndexer.java | 19 +-- .../vitro/webapp/reasoner/ABoxRecomputer.java | 2 +- .../searchindex/IndexingChangeListener.java | 34 +++-- .../webapp/searchindex/SearchIndexerImpl.java | 120 +++++++++--------- .../tasks/UpdateStatementsTask.java | 1 - .../searchindex/tasks/UpdateUrisTask.java | 5 +- .../searchIndexer/SearchIndexerStub.java | 2 +- 7 files changed, 102 insertions(+), 81 deletions(-) diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java index fc3b3667c..4e6c5db56 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java @@ -40,18 +40,19 @@ public interface SearchIndexer extends Application.Module { * unpause(). Fires a PAUSED event to listeners. * * This call has no effect if already paused, or if called after shutdown. - */ + */ void pause(); - /** - * Stop processing new tasks. Requests will be ignored and the index rebuilt when unpaused. - * Fires a PAUSED event to listeners. - * - * This call has no effect if already paused, or if called after shutdown. - */ - void pauseWithoutDeferring(); + /** + * Stop processing new tasks. If any request is received while the indexer + * is paused, the request will be ignored, but the index will be rebuilt + * when unpaused. Fires a PAUSED event to listeners. + * + * This call has no effect if already paused, or if called after shutdown. + */ + void pauseInAnticipationOfRebuild(); - /** + /** * Resume processing new tasks. Any requests that were received since the * call to pause() will now be scheduled for processing. Fires an UNPAUSED * event to listeners. diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java index 2dcd771d4..bf11a95cd 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java @@ -104,7 +104,7 @@ public class ABoxRecomputer { } try { if (searchIndexer != null) { - searchIndexer.pauseWithoutDeferring(); + searchIndexer.pauseInAnticipationOfRebuild(); } recomputeABox(); } finally { diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java index 55c7bd342..0ba849f06 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java @@ -15,10 +15,12 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import com.hp.hpl.jena.graph.Triple; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.jena.riot.RDFLanguages; +import org.apache.jena.riot.RiotReader; +import com.hp.hpl.jena.graph.Triple; import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.ModelFactory; import com.hp.hpl.jena.rdf.model.Statement; @@ -28,8 +30,6 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; -import org.apache.jena.riot.RDFLanguages; -import org.apache.jena.riot.RiotReader; /** * When a change is heard, wait for an interval to see if more changes come in. @@ -62,6 +62,7 @@ public class IndexingChangeListener implements ChangeListener, private final SearchIndexer searchIndexer; private final Ticker ticker; + private volatile boolean paused = true; private final Model defaultModel; /** All access to the list must be synchronized. */ @@ -77,21 +78,35 @@ public class IndexingChangeListener implements ChangeListener, } private synchronized void noteChange(Statement stmt) { - changes.add(stmt); - ticker.start(); + changes.add(stmt); + if (!paused) { + ticker.start(); + } } @Override public void receiveSearchIndexerEvent(Event event) { + if (event.getType() == PAUSE) { + paused = true; + } else if (event.getType() == UNPAUSE) { + paused = false; + ticker.start(); + } else if (event.getType() == START_REBUILD) { + discardChanges(); + } } private synchronized void respondToTicker() { - if (!changes.isEmpty()) { + if (!paused && !changes.isEmpty()) { searchIndexer.scheduleUpdatesForStatements(changes); changes.clear(); } } + private synchronized void discardChanges() { + changes.clear(); + } + public void shutdown() { ticker.shutdown(); } @@ -128,13 +143,14 @@ public class IndexingChangeListener implements ChangeListener, try { // Use RiotReader to parse a Triple // NB A Triple can be serialized correctly with: FmtUtils.stringForTriple(triple, PrefixMapping.Factory.create()) + " ."; - Iterator it = RiotReader.createIteratorTriples(new ByteArrayInputStream(serializedTriple.getBytes("UTF-8")), RDFLanguages.NTRIPLES, null); + ByteArrayInputStream input = new ByteArrayInputStream(serializedTriple.getBytes("UTF-8")); + Iterator it = RiotReader.createIteratorTriples(input, RDFLanguages.NTRIPLES, null); if (it.hasNext()) { Triple triple = it.next(); if (it.hasNext()) { - log.warn("More than one triple parsed from change event"); + log.warn("More than one triple parsed from change event: '" + serializedTriple + "'"); } // Use the retained defaultModel instance to convert the Triple to a Statement @@ -143,7 +159,7 @@ public class IndexingChangeListener implements ChangeListener, // is created and attached to all of the Statements created by this instance return defaultModel.asStatement(triple); } else { - throw new RuntimeException("no triple parsed from change event"); + throw new RuntimeException("no triple parsed from change event: '" + serializedTriple + "'"); } } catch (RuntimeException riot) { log.error("Failed to parse triple " + serializedTriple, riot); diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java index c9a94329c..15883603e 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java @@ -28,12 +28,12 @@ import java.util.concurrent.atomic.AtomicReference; import javax.servlet.ServletContext; -import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.hp.hpl.jena.rdf.model.Statement; +import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao; import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory; import edu.cornell.mannlib.vitro.webapp.dao.filtering.WebappDaoFactoryFiltering; import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilterUtils; @@ -87,7 +87,7 @@ public class SearchIndexerImpl implements SearchIndexer { private final ListenerList listeners = new ListenerList(); private final TaskQueue taskQueue = new TaskQueue(); - private final Scheduler scheduler = new Scheduler(this, taskQueue); + private final Scheduler scheduler = new Scheduler(taskQueue); private Integer threadPoolSize; private WorkerThreadPool pool; @@ -209,7 +209,7 @@ public class SearchIndexerImpl implements SearchIndexer { } @Override - public void pauseWithoutDeferring() { + public void pauseInAnticipationOfRebuild() { if (!isPaused() && !isShutdown()) { ignoreTasksWhilePaused = true; rebuildOnUnpause = false; @@ -398,12 +398,10 @@ public class SearchIndexerImpl implements SearchIndexer { private static class Scheduler { private final TaskQueue taskQueue; private final List deferredQueue; - private final SearchIndexerImpl indexer; private volatile boolean started; private volatile boolean paused; - public Scheduler(SearchIndexerImpl indexer, TaskQueue taskQueue) { - this.indexer = indexer; + public Scheduler(TaskQueue taskQueue) { this.taskQueue = taskQueue; this.deferredQueue = new ArrayList(); } @@ -449,9 +447,7 @@ public class SearchIndexerImpl implements SearchIndexer { taskQueue.scheduleTask(task); log.debug("moved task from deferred queue to task queue: " + task); } - - // Empty out the deferred queue as we've now processed it - deferredQueue.clear(); + deferredQueue.clear(); } } @@ -554,35 +550,65 @@ public class SearchIndexerImpl implements SearchIndexer { } } - /** - * Interface for tasks to access the Indexer config - */ - public static interface IndexerConfig { - public IndexingUriFinderList uriFinderList(); - public SearchIndexExcluderList excluderList(); - public DocumentModifierList documentModifierList(); - public IndividualDao individualDao(); - public ListenerList listenerList(); - public WorkerThreadPool workerThreadPool(); - } + /** + * Interface for tasks to access the Indexer config + */ + public static interface IndexerConfig { + public IndexingUriFinderList uriFinderList(); - /** - * Implementation of IndexerConfig - * Defers access to the configuration until the task is running, so a Task - * created and deferred before the indexer is started will not cause a NullPointerException - */ - private static class IndexerConfigImpl implements IndexerConfig { - private final SearchIndexerImpl sii; + public SearchIndexExcluderList excluderList(); - public IndexerConfigImpl(SearchIndexerImpl sii) { this.sii = sii; } + public DocumentModifierList documentModifierList(); - public IndexingUriFinderList uriFinderList() { return sii.createFindersList(); } - public SearchIndexExcluderList excluderList() { return sii.createExcludersList(); } - public DocumentModifierList documentModifierList() { return sii.createModifiersList(); } - public IndividualDao individualDao() { return sii.wadf.getIndividualDao(); } - public ListenerList listenerList() { return sii.listeners; } - public WorkerThreadPool workerThreadPool() { return sii.pool; } - } + public IndividualDao individualDao(); + + public ListenerList listenerList(); + + public WorkerThreadPool workerThreadPool(); + } + + /** + * Implementation of IndexerConfig Defers access to the configuration until + * the task is running, so a Task created and deferred before the indexer is + * started will not cause a NullPointerException + */ + private static class IndexerConfigImpl implements IndexerConfig { + private final SearchIndexerImpl sii; + + public IndexerConfigImpl(SearchIndexerImpl sii) { + this.sii = sii; + } + + @Override + public IndexingUriFinderList uriFinderList() { + return sii.createFindersList(); + } + + @Override + public SearchIndexExcluderList excluderList() { + return sii.createExcludersList(); + } + + @Override + public DocumentModifierList documentModifierList() { + return sii.createModifiersList(); + } + + @Override + public IndividualDao individualDao() { + return sii.wadf.getIndividualDao(); + } + + @Override + public ListenerList listenerList() { + return sii.listeners; + } + + @Override + public WorkerThreadPool workerThreadPool() { + return sii.pool; + } + } public static interface Task extends Runnable { public SearchIndexerStatus getStatus(); @@ -688,28 +714,4 @@ public class SearchIndexerImpl implements SearchIndexer { } } - - private static class StatementList { - List changes; - - public StatementList() { - changes = new ArrayList(); - } - - public synchronized void addStatement(Statement stmt) { - changes.add(stmt); - } - - public synchronized List getStatements() { - try { - return new ArrayList<>(changes); - } finally { - changes.clear(); - } - } - - public synchronized int size() { - return changes.size(); - } - }; } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateStatementsTask.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateStatementsTask.java index 9a12a04bc..50c96e7b7 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateStatementsTask.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateStatementsTask.java @@ -14,7 +14,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateUrisTask.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateUrisTask.java index 2bfc7db38..123610910 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateUrisTask.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateUrisTask.java @@ -7,7 +7,10 @@ import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndex import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STOP_URIS; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State.PROCESSING_URIS; -import java.util.*; +import java.util.Collection; +import java.util.Date; +import java.util.HashSet; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; diff --git a/webapp/test/stubs/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexerStub.java b/webapp/test/stubs/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexerStub.java index bd902cb58..fac91559a 100644 --- a/webapp/test/stubs/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexerStub.java +++ b/webapp/test/stubs/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexerStub.java @@ -32,7 +32,7 @@ public class SearchIndexerStub implements SearchIndexer { } @Override - public void pauseWithoutDeferring() { + public void pauseInAnticipationOfRebuild() { paused = true; }