From d37c41cf650a93a823b6b86936ded7331ea9630b Mon Sep 17 00:00:00 2001 From: Jim Blake Date: Tue, 20 Jan 2015 17:21:09 -0500 Subject: [PATCH] VIVO-869 Improve throttling for indexing tasks. Start the implementation in "paused" state, so tasks submitted before startup() are queued. Increase the time interval on IndexingChangeListener, so we get larger batches. Change the RejectedExecutionHandler on the pool to CallerRunsPolicy, so if there is no available thread for a work unit then the thread of the task itself will run it. --- .../modules/searchIndexer/SearchIndexer.java | 19 ++++++++++++++++++- .../searchindex/IndexingChangeListener.java | 2 +- .../webapp/searchindex/SearchIndexerImpl.java | 18 ++++++++++++++---- .../searchindex/SearchIndexerSetup.java | 3 ++- 4 files changed, 35 insertions(+), 7 deletions(-) diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java index 724d2de41..79c7e04ce 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java @@ -12,6 +12,9 @@ import edu.cornell.mannlib.vitro.webapp.modules.Application; /** * Interface for the code that controls the contents of the search index. * + * If calls are made to schedule tasks prior to startup(), they will be queued, + * since the indexer is created in paused mode. + * * The only calls that are valid after shutdown are shutdown(), getStatus() and * removeListener(). */ @@ -26,6 +29,8 @@ public interface SearchIndexer extends Application.Module { * We accumulate a batch of affected URIs, removing duplicates if they * occur, and then submit them for updates. * + * If called before startup or while paused, this task will be queued. + * * @param urls * if null or empty, this call has no effect. * @throws IllegalStateException @@ -43,6 +48,8 @@ public interface SearchIndexer extends Application.Module { * A URI belongs in the index if it refers to an existing individual in the * model, and is not excluded. * + * If called before startup or while paused, this task will be queued. + * * @param uris * if null or empty, this call has no effect. * @throws IllegalStateException @@ -57,6 +64,8 @@ public interface SearchIndexer extends Application.Module { * If a rebuild is already pending or in progress, this method has no * effect. * + * If called before startup or while paused, this task will be queued. + * * @throws IllegalStateException * if called after shutdown() */ @@ -66,6 +75,11 @@ public interface SearchIndexer extends Application.Module { * Stop processing new tasks. Requests will be queued until a call to * unpause(). * + * The SearchIndexer is paused when created. When fully initialized, it + * should be unpaused. + * + * If already paused, this call has no effect. + * * @throws IllegalStateException * if called after shutdown() */ @@ -75,7 +89,10 @@ public interface SearchIndexer extends Application.Module { * Resume processing new tasks. Any requests that were received since the * call to pause() will now be scheduled for processing. * - * Has no effect if called after shutdown(). + * The SearchIndexer is paused when created. When fully initialized, it + * should be unpaused. + * + * Has no effect if called after shutdown() or if not paused. */ void unpause(); diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java index b9012189c..958a6419a 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java @@ -160,7 +160,7 @@ public class IndexingChangeListener implements ChangeListener { if (queue.isShutdown()) { log.warn("Attempt to start ticker after shutdown request."); } else { - queue.schedule(new TickerResponse(), 500, TimeUnit.MILLISECONDS); + queue.schedule(new TickerResponse(), 1, TimeUnit.SECONDS); running = true; } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java index 727652447..8091b71d9 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java @@ -263,8 +263,8 @@ public class SearchIndexerImpl implements SearchIndexer { if (status.getState() != State.SHUTDOWN) { listeners.fireEvent(new Event(SHUTDOWN_REQUESTED, status)); - pool.shutdown(); taskQueue.shutdown(); + pool.shutdown(); for (DocumentModifier dm : modifiers) { try { @@ -321,7 +321,7 @@ public class SearchIndexerImpl implements SearchIndexer { private static class Scheduler { private final TaskQueue taskQueue; private final List deferredQueue; - private volatile boolean paused; + private volatile boolean paused = true; public Scheduler(TaskQueue taskQueue) { this.taskQueue = taskQueue; @@ -461,6 +461,9 @@ public class SearchIndexerImpl implements SearchIndexer { * * The task is notified as each unit completes. * + * If no thread is available for a work unit, the thread of the task itself + * will run it. This provides automatic throttling. + * * Only one task is active at a time, so the task can simply wait until this * pool is idle to know that all of its units have completed. * @@ -474,14 +477,21 @@ public class SearchIndexerImpl implements SearchIndexer { this.pool = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(50), new VitroBackgroundThread.Factory( - "SearchIndexer_ThreadPool")); + "SearchIndexer_ThreadPool"), + new ThreadPoolExecutor.CallerRunsPolicy()); } public void submit(Runnable workUnit, Task task) { try { pool.execute(new WorkUnitWrapper(workUnit, task)); } catch (RejectedExecutionException e) { - log.warn("Work unit was rejected: " + workUnit + " for " + task); + if (pool.isShutdown()) { + log.warn("Work unit was rejected: " + workUnit + " for " + + task); + } else { + log.error("Work unit was rejected: " + workUnit + " for " + + task, e); + } } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java index 13fd9d33a..bb94ab607 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java @@ -32,7 +32,7 @@ import edu.cornell.mannlib.vitro.webapp.utils.developer.listeners.DeveloperDisab * Start the SearchIndexer. Create a listener on the RDFService and link it to * the indexer. * - * Create a history object as a listener and make it avaiable to the + * Create a history object as a listener and make it available to the * IndexController. * * Create a listener that will call commit() on the SearchEngine every time it @@ -75,6 +75,7 @@ public class SearchIndexerSetup implements ServletContextListener { searchIndexer .startup(app, new ComponentStartupStatusImpl(this, ss)); + searchIndexer.unpause(); ss.info(this, "Setup of search indexer completed."); } catch (RDFServiceException e) {