VIVO-869 More formal coordination between startup(), pause(), and unpause().

This commit is contained in:
Jim Blake 2015-02-07 13:13:48 -05:00
parent b213718a72
commit 9f9cea06e3
4 changed files with 200 additions and 123 deletions

View file

@ -8,17 +8,62 @@ import java.util.List;
import com.hp.hpl.jena.rdf.model.Statement; import com.hp.hpl.jena.rdf.model.Statement;
import edu.cornell.mannlib.vitro.webapp.modules.Application; import edu.cornell.mannlib.vitro.webapp.modules.Application;
import edu.cornell.mannlib.vitro.webapp.modules.ComponentStartupStatus;
/** /**
* Interface for the code that controls the contents of the search index. * Interface for the code that controls the contents of the search index.
* *
* If calls are made to schedule tasks prior to startup(), they will be queued, * The search indexer is started rather late in the startup sequence, since it
* since the indexer is created in paused mode. * requires the search engine, the triple stores and the filters.
*
* If calls are made to schedule tasks prior to startup(), they will be queued.
* Calls to pause or unpause set the state as expected, but tasks will not be
* run until the indexer is both started and unpaused.
* *
* The only calls that are valid after shutdown are shutdown(), getStatus() and * The only calls that are valid after shutdown are shutdown(), getStatus() and
* removeListener(). * removeListener(). Calls to other methods produce a warning, but have no other
* effect.
*/ */
public interface SearchIndexer extends Application.Module { public interface SearchIndexer extends Application.Module {
/**
* Start processing. If unpaused, schedule any queued tasks.
*
* @throws IllegalStateException
* if called after shutdown, or if called more than once.
*/
@Override
void startup(Application app, ComponentStartupStatus ss);
/**
* Stop processing new tasks. Requests will be queued until a call to
* unpause(). Fires a PAUSED event to listeners.
*
* This call has no effect if already paused, or if called after shutdown.
*/
void pause();
/**
* Resume processing new tasks. Any requests that were received since the
* call to pause() will now be scheduled for processing. Fires an UNPAUSED
* event to listeners.
*
* If startup has not been called, the unpaused state will be recorded, but
* tasks will still await the call to startup.
*
* This call has no effect if not paused, or if called after shutdown.
*/
void unpause();
/**
* Stop processing and release resources. This call should block until the
* dependent threads are stopped.
*
* Repeated calls have no effect.
*/
@Override
void shutdown(Application app);
/** /**
* Update any search documents that are affected by these statements. * Update any search documents that are affected by these statements.
* *
@ -29,12 +74,11 @@ public interface SearchIndexer extends Application.Module {
* We accumulate a batch of affected URIs, removing duplicates if they * We accumulate a batch of affected URIs, removing duplicates if they
* occur, and then submit them for updates. * occur, and then submit them for updates.
* *
* If called before startup or while paused, this task will be queued. * If called before startup or while paused, the task will be queued. If
* called after shutdown, this has no effect.
* *
* @param urls * @param urls
* if null or empty, this call has no effect. * if null or empty, this call has no effect.
* @throws IllegalStateException
* if called after shutdown()
*/ */
void scheduleUpdatesForStatements(List<Statement> changes); void scheduleUpdatesForStatements(List<Statement> changes);
@ -48,12 +92,11 @@ public interface SearchIndexer extends Application.Module {
* A URI belongs in the index if it refers to an existing individual in the * A URI belongs in the index if it refers to an existing individual in the
* model, and is not excluded. * model, and is not excluded.
* *
* If called before startup or while paused, this task will be queued. * If called before startup or while paused, the task will be queued. If
* called after shutdown, this has no effect.
* *
* @param uris * @param uris
* if null or empty, this call has no effect. * if null or empty, this call has no effect.
* @throws IllegalStateException
* if called after shutdown()
*/ */
void scheduleUpdatesForUris(Collection<String> uris); void scheduleUpdatesForUris(Collection<String> uris);
@ -64,39 +107,11 @@ public interface SearchIndexer extends Application.Module {
* If a rebuild is already pending or in progress, this method has no * If a rebuild is already pending or in progress, this method has no
* effect. * effect.
* *
* If called before startup or while paused, this task will be queued. * If called before startup or while paused, the task will be queued. If
* * called after shutdown, this has no effect.
* @throws IllegalStateException
* if called after shutdown()
*/ */
void rebuildIndex(); void rebuildIndex();
/**
* Stop processing new tasks. Requests will be queued until a call to
* unpause(). Fires a PAUSED event to listeners.
*
* The SearchIndexer is paused when created. When fully initialized, it
* should be unpaused.
*
* If already paused, this call has no effect.
*
* @throws IllegalStateException
* if called after shutdown()
*/
void pause();
/**
* Resume processing new tasks. Any requests that were received since the
* call to pause() will now be scheduled for processing. Fires an UNPAUSED
* event to listeners.
*
* The SearchIndexer is paused when created. When fully initialized, it
* should be unpaused.
*
* Has no effect if called after shutdown() or if not paused.
*/
void unpause();
/** /**
* What is the current status of the indexer? * What is the current status of the indexer?
* *
@ -105,36 +120,32 @@ public interface SearchIndexer extends Application.Module {
SearchIndexerStatus getStatus(); SearchIndexerStatus getStatus();
/** /**
* Add this listener, allowing it to receive events from the indexer. If * Add this listener, allowing it to receive events from the indexer.
* this listener has already been added, this method has no effect. *
* The listener can safely assume that the SearchIndexer is not paused. If
* the SearchIndexer is indeed paused when the listener is added, then a
* PAUSE event will immediately be passed to the listener.
*
* If this listener has already been added, or if called after shutdown,
* this method has no effect.
* *
* @param listener * @param listener
* if null, this method has no effect. * if null, this method has no effect.
* @throws IllegalStateException
* if called after shutdown()
*/ */
void addListener(Listener listener); void addListener(Listener listener);
/** /**
* Remove this listener, meaning that it will no longer receive events from * Remove this listener, meaning that it will no longer receive events from
* the indexer. If this listener is not active, this method has no effect. * the indexer.
* *
* Has no effect if called after shutdown(). * If this listener is not active, or if called after shutdown, this method
* has no effect.
* *
* @param listener * @param listener
* if null, this method has no effect. * if null, this method has no effect.
*/ */
void removeListener(Listener listener); void removeListener(Listener listener);
/**
* Stop processing and release resources. This call should block until the
* dependent threads are stopped.
*
* Repeated calls have no effect.
*/
@Override
void shutdown(Application app);
/** /**
* A listener that will be notified of events from the SearchIndexer. * A listener that will be notified of events from the SearchIndexer.
*/ */

View file

@ -25,7 +25,6 @@ import com.hp.hpl.jena.rdf.model.StmtIterator;
import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent; import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;

View file

@ -43,7 +43,6 @@ import edu.cornell.mannlib.vitro.webapp.modules.ComponentStartupStatus;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State;
import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier; import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier;
import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifierList; import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifierList;
import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifierListBasic; import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifierListBasic;
@ -98,6 +97,10 @@ public class SearchIndexerImpl implements SearchIndexer {
private Set<IndexingUriFinder> uriFinders; private Set<IndexingUriFinder> uriFinders;
private WebappDaoFactory wadf; private WebappDaoFactory wadf;
// ----------------------------------------------------------------------
// ConfigurationBeanLoader methods.
// ----------------------------------------------------------------------
@Property(uri = "http://vitro.mannlib.cornell.edu/ns/vitro/ApplicationSetup#threadPoolSize") @Property(uri = "http://vitro.mannlib.cornell.edu/ns/vitro/ApplicationSetup#threadPoolSize")
public void setThreadPoolSize(String size) { public void setThreadPoolSize(String size) {
if (threadPoolSize == null) { if (threadPoolSize == null) {
@ -119,16 +122,27 @@ public class SearchIndexerImpl implements SearchIndexer {
} }
} }
// ----------------------------------------------------------------------
// State management.
// ----------------------------------------------------------------------
@Override @Override
public void startup(Application application, ComponentStartupStatus ss) { public void startup(Application application, ComponentStartupStatus ss) {
if (isStarted()) {
throw new IllegalStateException("startup() called more than once.");
}
if (isShutdown()) {
throw new IllegalStateException(
"startup() called after shutdown().");
}
try { try {
this.ctx = application.getServletContext(); this.ctx = application.getServletContext();
this.wadf = getFilteredWebappDaoFactory();
loadConfiguration(); loadConfiguration();
this.wadf = getFilteredWebappDaoFactory(); fireEvent(STARTUP);
scheduler.start();
listeners.fireEvent(new Event(STARTUP, getStatus()));
ss.info("Configured SearchIndexer: excluders=" + excluders ss.info("Configured SearchIndexer: excluders=" + excluders
+ ", modifiers=" + modifiers + ", uriFinders=" + uriFinders); + ", modifiers=" + modifiers + ", uriFinders=" + uriFinders);
} catch (Exception e) { } catch (Exception e) {
@ -136,6 +150,13 @@ public class SearchIndexerImpl implements SearchIndexer {
} }
} }
/** With a filtered factory, only public data goes into the search index. */
private WebappDaoFactory getFilteredWebappDaoFactory() {
WebappDaoFactory rawWadf = ModelAccess.on(ctx).getWebappDaoFactory();
VitroFilters vf = VitroFilterUtils.getPublicFilter(ctx);
return new WebappDaoFactoryFiltering(rawWadf, vf);
}
private void loadConfiguration() throws ConfigurationBeanLoaderException { private void loadConfiguration() throws ConfigurationBeanLoaderException {
ConfigurationBeanLoader beanLoader = new ConfigurationBeanLoader( ConfigurationBeanLoader beanLoader = new ConfigurationBeanLoader(
ModelAccess.on(ctx).getOntModel(DISPLAY), ctx); ModelAccess.on(ctx).getOntModel(DISPLAY), ctx);
@ -151,24 +172,77 @@ public class SearchIndexerImpl implements SearchIndexer {
modifiers.addAll(beanLoader.loadAll(DocumentModifier.class)); modifiers.addAll(beanLoader.loadAll(DocumentModifier.class));
} }
/**
* Use a filtered DAO factory, so only public data goes into the search
* index.
*/
private WebappDaoFactory getFilteredWebappDaoFactory() {
WebappDaoFactory rawWadf = ModelAccess.on(ctx).getWebappDaoFactory();
VitroFilters vf = VitroFilterUtils.getPublicFilter(ctx);
return new WebappDaoFactoryFiltering(rawWadf, vf);
}
@Override @Override
public void scheduleUpdatesForStatements(List<Statement> changes) { public synchronized void shutdown(Application application) {
if (changes == null || changes.isEmpty()) { if (isShutdown()) {
return; return;
} }
if (taskQueue.isShutdown()) { fireEvent(SHUTDOWN_REQUESTED);
throw new IllegalStateException("SearchIndexer is shut down.");
taskQueue.shutdown();
pool.shutdown();
for (DocumentModifier dm : modifiers) {
try {
dm.shutdown();
} catch (Exception e) {
log.warn("Failed to shut down document modifier " + dm, e);
}
}
fireEvent(SHUTDOWN_COMPLETE);
}
@Override
public void pause() {
if (!isPaused() && !isShutdown()) {
scheduler.pause();
fireEvent(PAUSE);
}
}
@Override
public void unpause() {
if (isPaused() && !isShutdown()) {
scheduler.unpause();
fireEvent(UNPAUSE);
}
}
private boolean isStarted() {
return scheduler.isStarted();
}
private boolean isPaused() {
return scheduler.isPaused();
}
private boolean isShutdown() {
return taskQueue.isShutdown();
}
@Override
public SearchIndexerStatus getStatus() {
return taskQueue.getStatus();
}
private void fireEvent(Type type) {
listeners.fireEvent(new Event(type, getStatus()));
}
// ----------------------------------------------------------------------
// Tasks
// ----------------------------------------------------------------------
@Override
public void scheduleUpdatesForStatements(List<Statement> changes) {
if (isShutdown()) {
log.warn("Call to scheduleUpdatesForStatements after shutdown.");
return;
}
if (changes == null || changes.isEmpty()) {
return;
} }
Task task = new UpdateStatementsTask(changes, createFindersList(), Task task = new UpdateStatementsTask(changes, createFindersList(),
@ -180,12 +254,12 @@ public class SearchIndexerImpl implements SearchIndexer {
@Override @Override
public void scheduleUpdatesForUris(Collection<String> uris) { public void scheduleUpdatesForUris(Collection<String> uris) {
if (uris == null || uris.isEmpty()) { if (isShutdown()) {
log.warn("Call to scheduleUpdatesForUris after shutdown.");
return; return;
} }
if (uris == null || uris.isEmpty()) {
if (taskQueue.isShutdown()) { return;
throw new IllegalStateException("SearchIndexer is shut down.");
} }
Task task = new UpdateUrisTask(uris, createExcludersList(), Task task = new UpdateUrisTask(uris, createExcludersList(),
@ -196,8 +270,8 @@ public class SearchIndexerImpl implements SearchIndexer {
@Override @Override
public void rebuildIndex() { public void rebuildIndex() {
if (taskQueue.isShutdown()) { if (isShutdown()) {
throw new IllegalStateException("SearchIndexer is shut down."); log.warn("Call to rebuildIndex after shutdown.");
} }
Task task = new RebuildIndexTask(createExcludersList(), Task task = new RebuildIndexTask(createExcludersList(),
@ -235,21 +309,21 @@ public class SearchIndexerImpl implements SearchIndexer {
SEARCH_INDEX_LOG_INDEXING_BREAKDOWN_TIMINGS); SEARCH_INDEX_LOG_INDEXING_BREAKDOWN_TIMINGS);
} }
@Override // ----------------------------------------------------------------------
public void pause() { // Listeners
scheduler.pause(); // ----------------------------------------------------------------------
listeners.fireEvent(new Event(PAUSE, getStatus()));
}
@Override
public void unpause() {
scheduler.unpause();
listeners.fireEvent(new Event(UNPAUSE, getStatus()));
}
@Override @Override
public void addListener(Listener listener) { public void addListener(Listener listener) {
listeners.add(listener); if (isShutdown()) {
return;
}
synchronized (listeners) {
listeners.add(listener);
if (isPaused()) {
listener.receiveSearchIndexerEvent(new Event(PAUSE, getStatus()));
}
}
} }
@Override @Override
@ -257,33 +331,6 @@ public class SearchIndexerImpl implements SearchIndexer {
listeners.remove(listener); listeners.remove(listener);
} }
@Override
public SearchIndexerStatus getStatus() {
return taskQueue.getStatus();
}
@Override
public synchronized void shutdown(Application application) {
SearchIndexerStatus status = taskQueue.getStatus();
if (status.getState() != State.SHUTDOWN) {
listeners.fireEvent(new Event(SHUTDOWN_REQUESTED, status));
taskQueue.shutdown();
pool.shutdown();
for (DocumentModifier dm : modifiers) {
try {
dm.shutdown();
} catch (Exception e) {
log.warn("Failed to shut down document modifier " + dm, e);
}
}
listeners.fireEvent(new Event(SHUTDOWN_COMPLETE, taskQueue
.getStatus()));
}
}
// ---------------------------------------------------------------------- // ----------------------------------------------------------------------
// Helper classes // Helper classes
// ---------------------------------------------------------------------- // ----------------------------------------------------------------------
@ -326,6 +373,7 @@ public class SearchIndexerImpl implements SearchIndexer {
private static class Scheduler { private static class Scheduler {
private final TaskQueue taskQueue; private final TaskQueue taskQueue;
private final List<Task> deferredQueue; private final List<Task> deferredQueue;
private volatile boolean started;
private volatile boolean paused = true; private volatile boolean paused = true;
public Scheduler(TaskQueue taskQueue) { public Scheduler(TaskQueue taskQueue) {
@ -333,8 +381,16 @@ public class SearchIndexerImpl implements SearchIndexer {
this.deferredQueue = new ArrayList<Task>(); this.deferredQueue = new ArrayList<Task>();
} }
public boolean isStarted() {
return started;
}
public boolean isPaused() {
return paused;
}
public synchronized void scheduleTask(Task task) { public synchronized void scheduleTask(Task task) {
if (paused) { if (paused || !started) {
deferredQueue.add(task); deferredQueue.add(task);
log.debug("added task to deferred queue: " + task); log.debug("added task to deferred queue: " + task);
} else { } else {
@ -343,18 +399,32 @@ public class SearchIndexerImpl implements SearchIndexer {
} }
} }
public synchronized void start() {
started = true;
if (!paused) {
processDeferredTasks();
}
}
public synchronized void pause() { public synchronized void pause() {
paused = true; paused = true;
} }
public synchronized void unpause() { public synchronized void unpause() {
paused = false; paused = false;
if (started) {
processDeferredTasks();
}
}
private void processDeferredTasks() {
for (Task task : deferredQueue) { for (Task task : deferredQueue) {
taskQueue.scheduleTask(task); taskQueue.scheduleTask(task);
log.debug("moved task from deferred queue to task queue: " log.debug("moved task from deferred queue to task queue: "
+ task); + task);
} }
} }
} }
/** /**

View file

@ -51,12 +51,10 @@ public class SearchIndexerSetup implements ServletContextListener {
try { try {
searchIndexer = app.getSearchIndexer(); searchIndexer = app.getSearchIndexer();
// A change listener, wrapped so it can respond to a developer flag.
listener = new IndexingChangeListener(searchIndexer); listener = new IndexingChangeListener(searchIndexer);
// Wrap it so it can be disabled by a developer flag.
listenerWrapper = new DeveloperDisabledChangeListener(listener, listenerWrapper = new DeveloperDisabledChangeListener(listener,
Key.SEARCH_INDEX_SUPPRESS_MODEL_CHANGE_LISTENER); Key.SEARCH_INDEX_SUPPRESS_MODEL_CHANGE_LISTENER);
RDFServiceUtils.getRDFServiceFactory(ctx).registerListener( RDFServiceUtils.getRDFServiceFactory(ctx).registerListener(
listenerWrapper); listenerWrapper);
@ -66,7 +64,6 @@ public class SearchIndexerSetup implements ServletContextListener {
searchIndexer searchIndexer
.startup(app, new ComponentStartupStatusImpl(this, ss)); .startup(app, new ComponentStartupStatusImpl(this, ss));
searchIndexer.unpause();
ss.info(this, "Setup of search indexer completed."); ss.info(this, "Setup of search indexer completed.");
} catch (RDFServiceException e) { } catch (RDFServiceException e) {