Create deferrable task for search tasks, allowing them to be created even when the searchindexer has not been start / initialized.

Clear the deferred queue when scheduling the tasks.
This commit is contained in:
Graham Triggs 2015-02-13 06:56:55 +00:00
parent 823133688a
commit 45b753119d
4 changed files with 72 additions and 23 deletions

View file

@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletContext; import javax.servlet.ServletContext;
import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -86,7 +87,7 @@ public class SearchIndexerImpl implements SearchIndexer {
private final ListenerList listeners = new ListenerList(); private final ListenerList listeners = new ListenerList();
private final TaskQueue taskQueue = new TaskQueue(); private final TaskQueue taskQueue = new TaskQueue();
private final Scheduler scheduler = new Scheduler(taskQueue); private final Scheduler scheduler = new Scheduler(this, taskQueue);
private Integer threadPoolSize; private Integer threadPoolSize;
private WorkerThreadPool pool; private WorkerThreadPool pool;
@ -245,10 +246,7 @@ public class SearchIndexerImpl implements SearchIndexer {
return; return;
} }
Task task = new UpdateStatementsTask(changes, createFindersList(), scheduler.scheduleTask(new UpdateStatementsTask.Deferrable(changes));
createExcludersList(), createModifiersList(),
wadf.getIndividualDao(), listeners, pool);
scheduler.scheduleTask(task);
log.debug("Scheduled updates for " + changes.size() + " statements."); log.debug("Scheduled updates for " + changes.size() + " statements.");
} }
@ -262,9 +260,7 @@ public class SearchIndexerImpl implements SearchIndexer {
return; return;
} }
Task task = new UpdateUrisTask(uris, createExcludersList(), scheduler.scheduleTask(new UpdateUrisTask.Deferrable(uris));
createModifiersList(), wadf.getIndividualDao(), listeners, pool);
scheduler.scheduleTask(task);
log.debug("Scheduled updates for " + uris.size() + " uris."); log.debug("Scheduled updates for " + uris.size() + " uris.");
} }
@ -274,9 +270,7 @@ public class SearchIndexerImpl implements SearchIndexer {
log.warn("Call to rebuildIndex after shutdown."); log.warn("Call to rebuildIndex after shutdown.");
} }
Task task = new RebuildIndexTask(createExcludersList(), scheduler.scheduleTask(new RebuildIndexTask.Deferrable());
createModifiersList(), wadf.getIndividualDao(), listeners, pool);
scheduler.scheduleTask(task);
log.debug("Scheduled a full rebuild."); log.debug("Scheduled a full rebuild.");
} }
@ -372,13 +366,15 @@ public class SearchIndexerImpl implements SearchIndexer {
*/ */
private static class Scheduler { private static class Scheduler {
private final TaskQueue taskQueue; private final TaskQueue taskQueue;
private final List<Task> deferredQueue; private final List<DeferrableTask> deferredQueue;
private final SearchIndexerImpl indexer;
private volatile boolean started; private volatile boolean started;
private volatile boolean paused; private volatile boolean paused;
public Scheduler(TaskQueue taskQueue) { public Scheduler(SearchIndexerImpl indexer, TaskQueue taskQueue) {
this.indexer = indexer;
this.taskQueue = taskQueue; this.taskQueue = taskQueue;
this.deferredQueue = new ArrayList<Task>(); this.deferredQueue = new ArrayList<DeferrableTask>();
} }
public boolean isStarted() { public boolean isStarted() {
@ -389,14 +385,21 @@ public class SearchIndexerImpl implements SearchIndexer {
return paused; return paused;
} }
public synchronized void scheduleTask(DeferrableTask task) {
if (paused || !started) {
deferredQueue.add(task);
log.debug("added task to deferred queue: " + task);
} else {
taskQueue.scheduleTask(task.makeRunnable(indexer.createFindersList(), indexer.createExcludersList(), indexer.createModifiersList(), indexer.wadf.getIndividualDao(), indexer.listeners, indexer.pool));
}
}
public synchronized void scheduleTask(Task task) { public synchronized void scheduleTask(Task task) {
if (paused || !started) { if (started && !paused) {
deferredQueue.add(task);
log.debug("added task to deferred queue: " + task);
} else {
taskQueue.scheduleTask(task); taskQueue.scheduleTask(task);
log.debug("added task to task queue: " + task); log.debug("added task to task queue: " + task);
} } else {
log.debug("indexer not running, task ignored: " + task);
}
} }
public synchronized void start() { public synchronized void start() {
@ -418,11 +421,14 @@ public class SearchIndexerImpl implements SearchIndexer {
} }
private void processDeferredTasks() { private void processDeferredTasks() {
for (Task task : deferredQueue) { for (DeferrableTask task : deferredQueue) {
taskQueue.scheduleTask(task); taskQueue.scheduleTask(task.makeRunnable(indexer.createFindersList(), indexer.createExcludersList(), indexer.createModifiersList(), indexer.wadf.getIndividualDao(), indexer.listeners, indexer.pool));
log.debug("moved task from deferred queue to task queue: " log.debug("moved task from deferred queue to task queue: "
+ task); + task);
} }
// Empty out the deferred queue as we've now processed it
deferredQueue.clear();
} }
} }
@ -525,6 +531,12 @@ public class SearchIndexerImpl implements SearchIndexer {
} }
} }
public static interface DeferrableTask {
public Task makeRunnable(IndexingUriFinderList uriFinders, SearchIndexExcluderList excluders,
DocumentModifierList modifiers, IndividualDao indDao,
ListenerList listeners, WorkerThreadPool pool);
}
public static interface Task extends Runnable { public static interface Task extends Runnable {
public SearchIndexerStatus getStatus(); public SearchIndexerStatus getStatus();

View file

@ -10,6 +10,8 @@ import java.text.SimpleDateFormat;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl;
import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinderList;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -22,6 +24,7 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Even
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.RebuildCounts; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.RebuildCounts;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.DeferrableTask;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool;
@ -50,6 +53,15 @@ public class RebuildIndexTask implements Task {
private volatile SearchIndexerStatus status; private volatile SearchIndexerStatus status;
public static class Deferrable implements DeferrableTask {
public Deferrable() {}
@Override
public Task makeRunnable(IndexingUriFinderList uriFinders, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) {
return new RebuildIndexTask(excluders, modifiers, indDao, listeners, pool);
}
}
public RebuildIndexTask(SearchIndexExcluderList excluders, public RebuildIndexTask(SearchIndexExcluderList excluders,
DocumentModifierList modifiers, IndividualDao indDao, DocumentModifierList modifiers, IndividualDao indDao,
ListenerList listeners, WorkerThreadPool pool) { ListenerList listeners, WorkerThreadPool pool) {

View file

@ -14,6 +14,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -23,6 +24,7 @@ import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.StatementCounts; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.StatementCounts;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.DeferrableTask;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool;
@ -63,7 +65,18 @@ public class UpdateStatementsTask implements Task {
private final Set<String> uris; private final Set<String> uris;
private final Status status; private final Status status;
public UpdateStatementsTask(List<Statement> changes, public static class Deferrable implements DeferrableTask {
List<Statement> changes;
public Deferrable(List<Statement> changes) { this.changes = changes; }
@Override
public Task makeRunnable(IndexingUriFinderList uriFinders, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) {
return new UpdateStatementsTask(changes, uriFinders, excluders, modifiers, indDao, listeners, pool);
}
}
public UpdateStatementsTask(List<Statement> changes,
IndexingUriFinderList uriFinders, IndexingUriFinderList uriFinders,
SearchIndexExcluderList excluders, DocumentModifierList modifiers, SearchIndexExcluderList excluders, DocumentModifierList modifiers,
IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) {

View file

@ -13,6 +13,8 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl;
import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinderList;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -61,7 +63,17 @@ public class UpdateUrisTask implements Task {
private final Status status; private final Status status;
private final SearchEngine searchEngine; private final SearchEngine searchEngine;
public UpdateUrisTask(Collection<String> uris, public static class Deferrable implements SearchIndexerImpl.DeferrableTask {
Collection<String> uris;
public Deferrable(Collection<String> uris) { this.uris = uris; }
@Override
public Task makeRunnable(IndexingUriFinderList uriFinders, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) {
return new UpdateUrisTask(uris, excluders, modifiers, indDao, listeners, pool);
}
}
public UpdateUrisTask(Collection<String> uris,
SearchIndexExcluderList excluders, DocumentModifierList modifiers, SearchIndexExcluderList excluders, DocumentModifierList modifiers,
IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) {
this.uris = new HashSet<>(uris); this.uris = new HashSet<>(uris);