diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java index a1ffb6330..21753ada5 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java @@ -8,17 +8,62 @@ import java.util.List; import com.hp.hpl.jena.rdf.model.Statement; import edu.cornell.mannlib.vitro.webapp.modules.Application; +import edu.cornell.mannlib.vitro.webapp.modules.ComponentStartupStatus; /** * Interface for the code that controls the contents of the search index. * - * If calls are made to schedule tasks prior to startup(), they will be queued, - * since the indexer is created in paused mode. + * The search indexer is started rather late in the startup sequence, since it + * requires the search engine, the triple stores and the filters. + * + * If calls are made to schedule tasks prior to startup(), they will be queued. + * Calls to pause or unpause set the state as expected, but tasks will not be + * run until the indexer is both started and unpaused. * * The only calls that are valid after shutdown are shutdown(), getStatus() and - * removeListener(). + * removeListener(). Calls to other methods produce a warning, but have no other + * effect. */ public interface SearchIndexer extends Application.Module { + + /** + * Start processing. If unpaused, schedule any queued tasks. + * + * @throws IllegalStateException + * if called after shutdown, or if called more than once. + */ + @Override + void startup(Application app, ComponentStartupStatus ss); + + /** + * Stop processing new tasks. Requests will be queued until a call to + * unpause(). Fires a PAUSED event to listeners. + * + * This call has no effect if already paused, or if called after shutdown. + */ + void pause(); + + /** + * Resume processing new tasks. Any requests that were received since the + * call to pause() will now be scheduled for processing. Fires an UNPAUSED + * event to listeners. + * + * If startup has not been called, the unpaused state will be recorded, but + * tasks will still await the call to startup. + * + * This call has no effect if not paused, or if called after shutdown. + */ + void unpause(); + + /** + * Stop processing and release resources. This call should block until the + * dependent threads are stopped. + * + * Repeated calls have no effect. + */ + @Override + void shutdown(Application app); + /** * Update any search documents that are affected by these statements. * @@ -29,12 +74,11 @@ public interface SearchIndexer extends Application.Module { * We accumulate a batch of affected URIs, removing duplicates if they * occur, and then submit them for updates. * - * If called before startup or while paused, this task will be queued. + * If called before startup or while paused, the task will be queued. If + * called after shutdown, this has no effect. * * @param urls * if null or empty, this call has no effect. - * @throws IllegalStateException - * if called after shutdown() */ void scheduleUpdatesForStatements(List changes); @@ -48,12 +92,11 @@ public interface SearchIndexer extends Application.Module { * A URI belongs in the index if it refers to an existing individual in the * model, and is not excluded. * - * If called before startup or while paused, this task will be queued. + * If called before startup or while paused, the task will be queued. If + * called after shutdown, this has no effect. * * @param uris * if null or empty, this call has no effect. - * @throws IllegalStateException - * if called after shutdown() */ void scheduleUpdatesForUris(Collection uris); @@ -64,39 +107,11 @@ public interface SearchIndexer extends Application.Module { * If a rebuild is already pending or in progress, this method has no * effect. * - * If called before startup or while paused, this task will be queued. - * - * @throws IllegalStateException - * if called after shutdown() + * If called before startup or while paused, the task will be queued. If + * called after shutdown, this has no effect. */ void rebuildIndex(); - /** - * Stop processing new tasks. Requests will be queued until a call to - * unpause(). Fires a PAUSED event to listeners. - * - * The SearchIndexer is paused when created. When fully initialized, it - * should be unpaused. - * - * If already paused, this call has no effect. - * - * @throws IllegalStateException - * if called after shutdown() - */ - void pause(); - - /** - * Resume processing new tasks. Any requests that were received since the - * call to pause() will now be scheduled for processing. Fires an UNPAUSED - * event to listeners. - * - * The SearchIndexer is paused when created. When fully initialized, it - * should be unpaused. - * - * Has no effect if called after shutdown() or if not paused. - */ - void unpause(); - /** * What is the current status of the indexer? * @@ -105,36 +120,32 @@ public interface SearchIndexer extends Application.Module { SearchIndexerStatus getStatus(); /** - * Add this listener, allowing it to receive events from the indexer. If - * this listener has already been added, this method has no effect. + * Add this listener, allowing it to receive events from the indexer. + * + * The listener can safely assume that the SearchIndexer is not paused. If + * the SearchIndexer is indeed paused when the listener is added, then a + * PAUSE event will immediately be passed to the listener. + * + * If this listener has already been added, or if called after shutdown, + * this method has no effect. * * @param listener * if null, this method has no effect. - * @throws IllegalStateException - * if called after shutdown() */ void addListener(Listener listener); /** * Remove this listener, meaning that it will no longer receive events from - * the indexer. If this listener is not active, this method has no effect. + * the indexer. * - * Has no effect if called after shutdown(). + * If this listener is not active, or if called after shutdown, this method + * has no effect. * * @param listener * if null, this method has no effect. */ void removeListener(Listener listener); - /** - * Stop processing and release resources. This call should block until the - * dependent threads are stopped. - * - * Repeated calls have no effect. - */ - @Override - void shutdown(Application app); - /** * A listener that will be notified of events from the SearchIndexer. */ diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java index 645115a56..7231f5909 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java @@ -25,7 +25,6 @@ import com.hp.hpl.jena.rdf.model.StmtIterator; import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event; -import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java index 3062ca6f5..bc22d9d2c 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java @@ -43,7 +43,6 @@ import edu.cornell.mannlib.vitro.webapp.modules.ComponentStartupStatus; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; -import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State; import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier; import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifierList; import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifierListBasic; @@ -98,6 +97,10 @@ public class SearchIndexerImpl implements SearchIndexer { private Set uriFinders; private WebappDaoFactory wadf; + // ---------------------------------------------------------------------- + // ConfigurationBeanLoader methods. + // ---------------------------------------------------------------------- + @Property(uri = "http://vitro.mannlib.cornell.edu/ns/vitro/ApplicationSetup#threadPoolSize") public void setThreadPoolSize(String size) { if (threadPoolSize == null) { @@ -119,16 +122,27 @@ public class SearchIndexerImpl implements SearchIndexer { } } + // ---------------------------------------------------------------------- + // State management. + // ---------------------------------------------------------------------- + @Override public void startup(Application application, ComponentStartupStatus ss) { + if (isStarted()) { + throw new IllegalStateException("startup() called more than once."); + } + if (isShutdown()) { + throw new IllegalStateException( + "startup() called after shutdown()."); + } try { this.ctx = application.getServletContext(); - + this.wadf = getFilteredWebappDaoFactory(); loadConfiguration(); - this.wadf = getFilteredWebappDaoFactory(); + fireEvent(STARTUP); + scheduler.start(); - listeners.fireEvent(new Event(STARTUP, getStatus())); ss.info("Configured SearchIndexer: excluders=" + excluders + ", modifiers=" + modifiers + ", uriFinders=" + uriFinders); } catch (Exception e) { @@ -136,6 +150,13 @@ public class SearchIndexerImpl implements SearchIndexer { } } + /** With a filtered factory, only public data goes into the search index. */ + private WebappDaoFactory getFilteredWebappDaoFactory() { + WebappDaoFactory rawWadf = ModelAccess.on(ctx).getWebappDaoFactory(); + VitroFilters vf = VitroFilterUtils.getPublicFilter(ctx); + return new WebappDaoFactoryFiltering(rawWadf, vf); + } + private void loadConfiguration() throws ConfigurationBeanLoaderException { ConfigurationBeanLoader beanLoader = new ConfigurationBeanLoader( ModelAccess.on(ctx).getOntModel(DISPLAY), ctx); @@ -151,24 +172,77 @@ public class SearchIndexerImpl implements SearchIndexer { modifiers.addAll(beanLoader.loadAll(DocumentModifier.class)); } - /** - * Use a filtered DAO factory, so only public data goes into the search - * index. - */ - private WebappDaoFactory getFilteredWebappDaoFactory() { - WebappDaoFactory rawWadf = ModelAccess.on(ctx).getWebappDaoFactory(); - VitroFilters vf = VitroFilterUtils.getPublicFilter(ctx); - return new WebappDaoFactoryFiltering(rawWadf, vf); - } - @Override - public void scheduleUpdatesForStatements(List changes) { - if (changes == null || changes.isEmpty()) { + public synchronized void shutdown(Application application) { + if (isShutdown()) { return; } - if (taskQueue.isShutdown()) { - throw new IllegalStateException("SearchIndexer is shut down."); + fireEvent(SHUTDOWN_REQUESTED); + + taskQueue.shutdown(); + pool.shutdown(); + + for (DocumentModifier dm : modifiers) { + try { + dm.shutdown(); + } catch (Exception e) { + log.warn("Failed to shut down document modifier " + dm, e); + } + } + + fireEvent(SHUTDOWN_COMPLETE); + } + + @Override + public void pause() { + if (!isPaused() && !isShutdown()) { + scheduler.pause(); + fireEvent(PAUSE); + } + } + + @Override + public void unpause() { + if (isPaused() && !isShutdown()) { + scheduler.unpause(); + fireEvent(UNPAUSE); + } + } + + private boolean isStarted() { + return scheduler.isStarted(); + } + + private boolean isPaused() { + return scheduler.isPaused(); + } + + private boolean isShutdown() { + return taskQueue.isShutdown(); + } + + @Override + public SearchIndexerStatus getStatus() { + return taskQueue.getStatus(); + } + + private void fireEvent(Type type) { + listeners.fireEvent(new Event(type, getStatus())); + } + + // ---------------------------------------------------------------------- + // Tasks + // ---------------------------------------------------------------------- + + @Override + public void scheduleUpdatesForStatements(List changes) { + if (isShutdown()) { + log.warn("Call to scheduleUpdatesForStatements after shutdown."); + return; + } + if (changes == null || changes.isEmpty()) { + return; } Task task = new UpdateStatementsTask(changes, createFindersList(), @@ -180,12 +254,12 @@ public class SearchIndexerImpl implements SearchIndexer { @Override public void scheduleUpdatesForUris(Collection uris) { - if (uris == null || uris.isEmpty()) { + if (isShutdown()) { + log.warn("Call to scheduleUpdatesForUris after shutdown."); return; } - - if (taskQueue.isShutdown()) { - throw new IllegalStateException("SearchIndexer is shut down."); + if (uris == null || uris.isEmpty()) { + return; } Task task = new UpdateUrisTask(uris, createExcludersList(), @@ -196,8 +270,8 @@ public class SearchIndexerImpl implements SearchIndexer { @Override public void rebuildIndex() { - if (taskQueue.isShutdown()) { - throw new IllegalStateException("SearchIndexer is shut down."); + if (isShutdown()) { + log.warn("Call to rebuildIndex after shutdown."); } Task task = new RebuildIndexTask(createExcludersList(), @@ -235,21 +309,21 @@ public class SearchIndexerImpl implements SearchIndexer { SEARCH_INDEX_LOG_INDEXING_BREAKDOWN_TIMINGS); } - @Override - public void pause() { - scheduler.pause(); - listeners.fireEvent(new Event(PAUSE, getStatus())); - } - - @Override - public void unpause() { - scheduler.unpause(); - listeners.fireEvent(new Event(UNPAUSE, getStatus())); - } + // ---------------------------------------------------------------------- + // Listeners + // ---------------------------------------------------------------------- @Override public void addListener(Listener listener) { - listeners.add(listener); + if (isShutdown()) { + return; + } + synchronized (listeners) { + listeners.add(listener); + if (isPaused()) { + listener.receiveSearchIndexerEvent(new Event(PAUSE, getStatus())); + } + } } @Override @@ -257,33 +331,6 @@ public class SearchIndexerImpl implements SearchIndexer { listeners.remove(listener); } - @Override - public SearchIndexerStatus getStatus() { - return taskQueue.getStatus(); - } - - @Override - public synchronized void shutdown(Application application) { - SearchIndexerStatus status = taskQueue.getStatus(); - if (status.getState() != State.SHUTDOWN) { - listeners.fireEvent(new Event(SHUTDOWN_REQUESTED, status)); - - taskQueue.shutdown(); - pool.shutdown(); - - for (DocumentModifier dm : modifiers) { - try { - dm.shutdown(); - } catch (Exception e) { - log.warn("Failed to shut down document modifier " + dm, e); - } - } - - listeners.fireEvent(new Event(SHUTDOWN_COMPLETE, taskQueue - .getStatus())); - } - } - // ---------------------------------------------------------------------- // Helper classes // ---------------------------------------------------------------------- @@ -326,6 +373,7 @@ public class SearchIndexerImpl implements SearchIndexer { private static class Scheduler { private final TaskQueue taskQueue; private final List deferredQueue; + private volatile boolean started; private volatile boolean paused = true; public Scheduler(TaskQueue taskQueue) { @@ -333,8 +381,16 @@ public class SearchIndexerImpl implements SearchIndexer { this.deferredQueue = new ArrayList(); } + public boolean isStarted() { + return started; + } + + public boolean isPaused() { + return paused; + } + public synchronized void scheduleTask(Task task) { - if (paused) { + if (paused || !started) { deferredQueue.add(task); log.debug("added task to deferred queue: " + task); } else { @@ -343,18 +399,32 @@ public class SearchIndexerImpl implements SearchIndexer { } } + public synchronized void start() { + started = true; + if (!paused) { + processDeferredTasks(); + } + } + public synchronized void pause() { paused = true; } public synchronized void unpause() { paused = false; + if (started) { + processDeferredTasks(); + } + } + + private void processDeferredTasks() { for (Task task : deferredQueue) { taskQueue.scheduleTask(task); log.debug("moved task from deferred queue to task queue: " + task); } } + } /** diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java index 01d407b7f..028b1d026 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java @@ -51,12 +51,10 @@ public class SearchIndexerSetup implements ServletContextListener { try { searchIndexer = app.getSearchIndexer(); + // A change listener, wrapped so it can respond to a developer flag. listener = new IndexingChangeListener(searchIndexer); - - // Wrap it so it can be disabled by a developer flag. listenerWrapper = new DeveloperDisabledChangeListener(listener, Key.SEARCH_INDEX_SUPPRESS_MODEL_CHANGE_LISTENER); - RDFServiceUtils.getRDFServiceFactory(ctx).registerListener( listenerWrapper); @@ -66,7 +64,6 @@ public class SearchIndexerSetup implements ServletContextListener { searchIndexer .startup(app, new ComponentStartupStatusImpl(this, ss)); - searchIndexer.unpause(); ss.info(this, "Setup of search indexer completed."); } catch (RDFServiceException e) {