VIVO-869 Improve throttling for indexing tasks.

Start the implementation in "paused" state, so tasks submitted before startup() are queued.
Increase the time interval on IndexingChangeListener, so we get larger batches.
Change the RejectedExecutionHandler on the pool to CallerRunsPolicy,
   so if there is no available thread for a work unit then the thread of the task itself will run it.
This commit is contained in:
Jim Blake 2015-01-20 17:21:09 -05:00
parent fcfd2e7be6
commit d37c41cf65
4 changed files with 35 additions and 7 deletions

View file

@ -12,6 +12,9 @@ import edu.cornell.mannlib.vitro.webapp.modules.Application;
/** /**
* Interface for the code that controls the contents of the search index. * Interface for the code that controls the contents of the search index.
* *
* If calls are made to schedule tasks prior to startup(), they will be queued,
* since the indexer is created in paused mode.
*
* The only calls that are valid after shutdown are shutdown(), getStatus() and * The only calls that are valid after shutdown are shutdown(), getStatus() and
* removeListener(). * removeListener().
*/ */
@ -26,6 +29,8 @@ public interface SearchIndexer extends Application.Module {
* We accumulate a batch of affected URIs, removing duplicates if they * We accumulate a batch of affected URIs, removing duplicates if they
* occur, and then submit them for updates. * occur, and then submit them for updates.
* *
* If called before startup or while paused, this task will be queued.
*
* @param urls * @param urls
* if null or empty, this call has no effect. * if null or empty, this call has no effect.
* @throws IllegalStateException * @throws IllegalStateException
@ -43,6 +48,8 @@ public interface SearchIndexer extends Application.Module {
* A URI belongs in the index if it refers to an existing individual in the * A URI belongs in the index if it refers to an existing individual in the
* model, and is not excluded. * model, and is not excluded.
* *
* If called before startup or while paused, this task will be queued.
*
* @param uris * @param uris
* if null or empty, this call has no effect. * if null or empty, this call has no effect.
* @throws IllegalStateException * @throws IllegalStateException
@ -57,6 +64,8 @@ public interface SearchIndexer extends Application.Module {
* If a rebuild is already pending or in progress, this method has no * If a rebuild is already pending or in progress, this method has no
* effect. * effect.
* *
* If called before startup or while paused, this task will be queued.
*
* @throws IllegalStateException * @throws IllegalStateException
* if called after shutdown() * if called after shutdown()
*/ */
@ -66,6 +75,11 @@ public interface SearchIndexer extends Application.Module {
* Stop processing new tasks. Requests will be queued until a call to * Stop processing new tasks. Requests will be queued until a call to
* unpause(). * unpause().
* *
* The SearchIndexer is paused when created. When fully initialized, it
* should be unpaused.
*
* If already paused, this call has no effect.
*
* @throws IllegalStateException * @throws IllegalStateException
* if called after shutdown() * if called after shutdown()
*/ */
@ -75,7 +89,10 @@ public interface SearchIndexer extends Application.Module {
* Resume processing new tasks. Any requests that were received since the * Resume processing new tasks. Any requests that were received since the
* call to pause() will now be scheduled for processing. * call to pause() will now be scheduled for processing.
* *
* Has no effect if called after shutdown(). * The SearchIndexer is paused when created. When fully initialized, it
* should be unpaused.
*
* Has no effect if called after shutdown() or if not paused.
*/ */
void unpause(); void unpause();

View file

@ -160,7 +160,7 @@ public class IndexingChangeListener implements ChangeListener {
if (queue.isShutdown()) { if (queue.isShutdown()) {
log.warn("Attempt to start ticker after shutdown request."); log.warn("Attempt to start ticker after shutdown request.");
} else { } else {
queue.schedule(new TickerResponse(), 500, TimeUnit.MILLISECONDS); queue.schedule(new TickerResponse(), 1, TimeUnit.SECONDS);
running = true; running = true;
} }
} }

View file

@ -263,8 +263,8 @@ public class SearchIndexerImpl implements SearchIndexer {
if (status.getState() != State.SHUTDOWN) { if (status.getState() != State.SHUTDOWN) {
listeners.fireEvent(new Event(SHUTDOWN_REQUESTED, status)); listeners.fireEvent(new Event(SHUTDOWN_REQUESTED, status));
pool.shutdown();
taskQueue.shutdown(); taskQueue.shutdown();
pool.shutdown();
for (DocumentModifier dm : modifiers) { for (DocumentModifier dm : modifiers) {
try { try {
@ -321,7 +321,7 @@ public class SearchIndexerImpl implements SearchIndexer {
private static class Scheduler { private static class Scheduler {
private final TaskQueue taskQueue; private final TaskQueue taskQueue;
private final List<Task> deferredQueue; private final List<Task> deferredQueue;
private volatile boolean paused; private volatile boolean paused = true;
public Scheduler(TaskQueue taskQueue) { public Scheduler(TaskQueue taskQueue) {
this.taskQueue = taskQueue; this.taskQueue = taskQueue;
@ -461,6 +461,9 @@ public class SearchIndexerImpl implements SearchIndexer {
* *
* The task is notified as each unit completes. * The task is notified as each unit completes.
* *
* If no thread is available for a work unit, the thread of the task itself
* will run it. This provides automatic throttling.
*
* Only one task is active at a time, so the task can simply wait until this * Only one task is active at a time, so the task can simply wait until this
* pool is idle to know that all of its units have completed. * pool is idle to know that all of its units have completed.
* *
@ -474,14 +477,21 @@ public class SearchIndexerImpl implements SearchIndexer {
this.pool = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, this.pool = new ThreadPoolExecutor(threadPoolSize, threadPoolSize,
10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50), 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50),
new VitroBackgroundThread.Factory( new VitroBackgroundThread.Factory(
"SearchIndexer_ThreadPool")); "SearchIndexer_ThreadPool"),
new ThreadPoolExecutor.CallerRunsPolicy());
} }
public void submit(Runnable workUnit, Task task) { public void submit(Runnable workUnit, Task task) {
try { try {
pool.execute(new WorkUnitWrapper(workUnit, task)); pool.execute(new WorkUnitWrapper(workUnit, task));
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
log.warn("Work unit was rejected: " + workUnit + " for " + task); if (pool.isShutdown()) {
log.warn("Work unit was rejected: " + workUnit + " for "
+ task);
} else {
log.error("Work unit was rejected: " + workUnit + " for "
+ task, e);
}
} }
} }

View file

@ -32,7 +32,7 @@ import edu.cornell.mannlib.vitro.webapp.utils.developer.listeners.DeveloperDisab
* Start the SearchIndexer. Create a listener on the RDFService and link it to * Start the SearchIndexer. Create a listener on the RDFService and link it to
* the indexer. * the indexer.
* *
* Create a history object as a listener and make it avaiable to the * Create a history object as a listener and make it available to the
* IndexController. * IndexController.
* *
* Create a listener that will call commit() on the SearchEngine every time it * Create a listener that will call commit() on the SearchEngine every time it
@ -75,6 +75,7 @@ public class SearchIndexerSetup implements ServletContextListener {
searchIndexer searchIndexer
.startup(app, new ComponentStartupStatusImpl(this, ss)); .startup(app, new ComponentStartupStatusImpl(this, ss));
searchIndexer.unpause();
ss.info(this, "Setup of search indexer completed."); ss.info(this, "Setup of search indexer completed.");
} catch (RDFServiceException e) { } catch (RDFServiceException e) {