From d382e0efd66c8a692e01441c16dde7b5717e3e37 Mon Sep 17 00:00:00 2001 From: Jim Blake Date: Wed, 21 Jan 2015 12:30:44 -0500 Subject: [PATCH] VIVO-869 Don't submit updates while indexer is paused. When the pause flag is set, the IndexingChangeListener will continue to accumulate changes even if there are gaps in the stream. The listener will not submit a task until the indexer is unpaused. --- webapp/config/example.developer.properties | 2 +- .../modules/searchIndexer/SearchIndexer.java | 13 +++++--- .../searchindex/IndexingChangeListener.java | 33 ++++++++++++++++--- .../webapp/searchindex/SearchIndexerImpl.java | 5 +++ .../searchindex/SearchIndexerSetup.java | 4 ++- 5 files changed, 46 insertions(+), 11 deletions(-) diff --git a/webapp/config/example.developer.properties b/webapp/config/example.developer.properties index e0046ca8f..dd108909f 100644 --- a/webapp/config/example.developer.properties +++ b/webapp/config/example.developer.properties @@ -68,7 +68,7 @@ # developer.searchIndex.showDocuments = false # developer.searchIndex.uriOrNameRestriction = .* # developer.searchIndex.documentRestriction = .* -# developer.searchIndex.logIndexingBreakdownTimings = .* +# developer.searchIndex.logIndexingBreakdownTimings = false # developer.searchIndex.suppressModelChangeListener = false # developer.searchDeletions.enable = false diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java index 79c7e04ce..8780e892a 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java @@ -29,7 +29,7 @@ public interface SearchIndexer extends Application.Module { * We accumulate a batch of affected URIs, removing duplicates if they * occur, and then submit them for updates. * - * If called before startup or while paused, this task will be queued. + * If called before startup or while paused, this task will be queued. * * @param urls * if null or empty, this call has no effect. @@ -48,7 +48,7 @@ public interface SearchIndexer extends Application.Module { * A URI belongs in the index if it refers to an existing individual in the * model, and is not excluded. * - * If called before startup or while paused, this task will be queued. + * If called before startup or while paused, this task will be queued. * * @param uris * if null or empty, this call has no effect. @@ -64,7 +64,7 @@ public interface SearchIndexer extends Application.Module { * If a rebuild is already pending or in progress, this method has no * effect. * - * If called before startup or while paused, this task will be queued. + * If called before startup or while paused, this task will be queued. * * @throws IllegalStateException * if called after shutdown() @@ -73,7 +73,7 @@ public interface SearchIndexer extends Application.Module { /** * Stop processing new tasks. Requests will be queued until a call to - * unpause(). + * unpause(). Fires a PAUSED event to listeners. * * The SearchIndexer is paused when created. When fully initialized, it * should be unpaused. @@ -87,7 +87,8 @@ public interface SearchIndexer extends Application.Module { /** * Resume processing new tasks. Any requests that were received since the - * call to pause() will now be scheduled for processing. + * call to pause() will now be scheduled for processing. Fires an UNPAUSED + * event to listeners. * * The SearchIndexer is paused when created. When fully initialized, it * should be unpaused. @@ -149,6 +150,8 @@ public interface SearchIndexer extends Application.Module { public enum Type { STARTUP, PROGRESS, + PAUSE, UNPAUSE, + START_PROCESSING_URIS, STOP_PROCESSING_URIS, START_PROCESSING_STATEMENTS, STOP_PROCESSING_STATEMENTS, diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java index 958a6419a..8f66d6ff7 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java @@ -2,6 +2,9 @@ package edu.cornell.mannlib.vitro.webapp.searchindex; +import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.PAUSE; +import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.UNPAUSE; + import java.io.ByteArrayInputStream; import java.io.UnsupportedEncodingException; import java.util.ArrayList; @@ -20,6 +23,7 @@ import com.hp.hpl.jena.rdf.model.StmtIterator; import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; +import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; @@ -27,13 +31,18 @@ import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; * When a change is heard, wait for an interval to see if more changes come in. * When changes stop coming in for a specified interval, send what has * accumulated. + * + * When the SearchIndexer pauses, stop sending changes until the SearchIndexer + * unpauses. */ -public class IndexingChangeListener implements ChangeListener { +public class IndexingChangeListener implements ChangeListener, + SearchIndexer.Listener { private static final Log log = LogFactory .getLog(IndexingChangeListener.class); private final SearchIndexer searchIndexer; private final Ticker ticker; + private volatile boolean paused = true; /** All access to the list must be synchronized. */ private final List changes; @@ -42,16 +51,32 @@ public class IndexingChangeListener implements ChangeListener { this.searchIndexer = searchIndexer; this.ticker = new Ticker(); this.changes = new ArrayList<>(); + + searchIndexer.addListener(this); } private synchronized void noteChange(Statement stmt) { changes.add(stmt); - ticker.start(); + if (!paused) { + ticker.start(); + } + } + + @Override + public void receiveSearchIndexerEvent(Event event) { + if (event.getType() == PAUSE) { + paused = true; + } else if (event.getType() == UNPAUSE) { + paused = false; + respondToTicker(); + } } private synchronized void respondToTicker() { - searchIndexer.scheduleUpdatesForStatements(changes); - changes.clear(); + if (!paused && !changes.isEmpty()) { + searchIndexer.scheduleUpdatesForStatements(changes); + changes.clear(); + } } public void shutdown() { diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java index 8091b71d9..3062ca6f5 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java @@ -3,9 +3,11 @@ package edu.cornell.mannlib.vitro.webapp.searchindex; import static edu.cornell.mannlib.vitro.webapp.modelaccess.ModelNames.DISPLAY; +import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.PAUSE; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.SHUTDOWN_COMPLETE; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.SHUTDOWN_REQUESTED; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STARTUP; +import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.UNPAUSE; import static edu.cornell.mannlib.vitro.webapp.utils.developer.Key.SEARCH_INDEX_LOG_INDEXING_BREAKDOWN_TIMINGS; import static edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread.WorkLevel.IDLE; import static edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread.WorkLevel.WORKING; @@ -39,6 +41,7 @@ import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelAccess; import edu.cornell.mannlib.vitro.webapp.modules.Application; import edu.cornell.mannlib.vitro.webapp.modules.ComponentStartupStatus; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; +import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State; import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier; @@ -235,11 +238,13 @@ public class SearchIndexerImpl implements SearchIndexer { @Override public void pause() { scheduler.pause(); + listeners.fireEvent(new Event(PAUSE, getStatus())); } @Override public void unpause() { scheduler.unpause(); + listeners.fireEvent(new Event(UNPAUSE, getStatus())); } @Override diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java index ad7d9f177..394ba688a 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java @@ -61,9 +61,11 @@ public class SearchIndexerSetup implements ServletContextListener { searchIndexer = app.getSearchIndexer(); listener = new IndexingChangeListener(searchIndexer); - + + // Wrap it so it can be disabled by a developer flag. listenerWrapper = new DeveloperDisabledChangeListener(listener, Key.SEARCH_INDEX_SUPPRESS_MODEL_CHANGE_LISTENER); + RDFServiceUtils.getRDFServiceFactory(ctx).registerListener( listenerWrapper);