diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java index 4e6c5db56..5f7657379 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java @@ -43,15 +43,6 @@ public interface SearchIndexer extends Application.Module { */ void pause(); - /** - * Stop processing new tasks. If any request is received while the indexer - * is paused, the request will be ignored, but the index will be rebuilt - * when unpaused. Fires a PAUSED event to listeners. - * - * This call has no effect if already paused, or if called after shutdown. - */ - void pauseInAnticipationOfRebuild(); - /** * Resume processing new tasks. Any requests that were received since the * call to pause() will now be scheduled for processing. Fires an UNPAUSED @@ -176,7 +167,7 @@ public interface SearchIndexer extends Application.Module { START_STATEMENTS, STOP_STATEMENTS, - START_REBUILD, STOP_REBUILD, + START_REBUILD, STOP_REBUILD, REBUILD_REQUESTED, SHUTDOWN_REQUESTED, SHUTDOWN_COMPLETE } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java index bf11a95cd..595516859 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java @@ -104,12 +104,14 @@ public class ABoxRecomputer { } try { if (searchIndexer != null) { - searchIndexer.pauseInAnticipationOfRebuild(); + searchIndexer.pause(); + // Register now that we want to rebuild the index when we unpause + // This allows the indexer to optimize behaviour whilst paused + searchIndexer.rebuildIndex(); } recomputeABox(); } finally { if (searchIndexer != null) { - searchIndexer.rebuildIndex(); searchIndexer.unpause(); } synchronized (lock1) { diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java index 9c5996c43..42899c419 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java @@ -2,10 +2,6 @@ package edu.cornell.mannlib.vitro.webapp.searchindex; -import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.PAUSE; -import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.START_REBUILD; -import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.UNPAUSE; - import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -30,6 +26,8 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Even import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; +import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.*; + /** * When a change is heard, wait for an interval to see if more changes come in. * When changes stop coming in for a specified interval, send what has @@ -61,7 +59,7 @@ public class IndexingChangeListener implements ChangeListener, private final SearchIndexer searchIndexer; private final Ticker ticker; - private volatile boolean paused; + private volatile boolean rebuildScheduled; private final Model defaultModel; /** All access to the list must be synchronized. */ @@ -78,25 +76,21 @@ public class IndexingChangeListener implements ChangeListener, private synchronized void noteChange(Statement stmt) { changes.add(stmt); - if (!paused) { - ticker.start(); - } + ticker.start(); } @Override public void receiveSearchIndexerEvent(Event event) { - if (event.getType() == PAUSE) { - paused = true; - } else if (event.getType() == UNPAUSE) { - paused = false; - ticker.start(); + if (event.getType() == REBUILD_REQUESTED) { + rebuildScheduled = true; } else if (event.getType() == START_REBUILD) { + rebuildScheduled = false; discardChanges(); } } private synchronized void respondToTicker() { - if (!paused && !changes.isEmpty()) { + if (!changes.isEmpty()) { searchIndexer.scheduleUpdatesForStatements(changes); changes.clear(); } @@ -112,12 +106,16 @@ public class IndexingChangeListener implements ChangeListener, @Override public void addedStatement(String serializedTriple, String graphURI) { - noteChange(parseTriple(serializedTriple)); + if (!rebuildScheduled) { + noteChange(parseTriple(serializedTriple)); + } } @Override public void removedStatement(String serializedTriple, String graphURI) { - noteChange(parseTriple(serializedTriple)); + if (!rebuildScheduled) { + noteChange(parseTriple(serializedTriple)); + } } /** @@ -133,7 +131,7 @@ public class IndexingChangeListener implements ChangeListener, } } else { log.debug("ignoring event " + event.getClass().getName() + " " - + event); + + event); } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java index b01595606..e9a6a9a81 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java @@ -4,6 +4,7 @@ package edu.cornell.mannlib.vitro.webapp.searchindex; import static edu.cornell.mannlib.vitro.webapp.modelaccess.ModelNames.DISPLAY; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.PAUSE; +import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.REBUILD_REQUESTED; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.SHUTDOWN_COMPLETE; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.SHUTDOWN_REQUESTED; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STARTUP; @@ -98,9 +99,13 @@ public class SearchIndexerImpl implements SearchIndexer { private Set uriFinders; private WebappDaoFactory wadf; - private boolean ignoreTasksWhilePaused = false; private boolean rebuildOnUnpause = false; + private volatile int paused = 0; + + private List pendingStatements = new ArrayList(); + private Collection pendingUris = new ArrayList(); + // ---------------------------------------------------------------------- // ConfigurationBeanLoader methods. // ---------------------------------------------------------------------- @@ -199,34 +204,47 @@ public class SearchIndexerImpl implements SearchIndexer { } @Override - public void pause() { - if (!isPaused() && !isShutdown()) { - ignoreTasksWhilePaused = false; - rebuildOnUnpause = false; - scheduler.pause(); - fireEvent(PAUSE); + public synchronized void pause() { + if (!isShutdown()) { + paused++; + if (paused == 1) { + fireEvent(PAUSE); + } } } - @Override - public void pauseInAnticipationOfRebuild() { - if (!isPaused() && !isShutdown()) { - ignoreTasksWhilePaused = true; - rebuildOnUnpause = false; - scheduler.pause(); - fireEvent(PAUSE); - } - } - @Override - public void unpause() { - if (isPaused() && !isShutdown()) { - scheduler.unpause(); - fireEvent(UNPAUSE); - if (rebuildOnUnpause) { - rebuildOnUnpause = false; - rebuildIndex(); - } + public synchronized void unpause() { + if (paused > 0 && !isShutdown()) { + paused--; + + // Only process if we transition to unpaused state + if (paused == 0) { + fireEvent(UNPAUSE); + if (rebuildOnUnpause) { + rebuildOnUnpause = false; + pendingStatements.clear(); + pendingUris.clear(); + rebuildIndex(); + } else { + schedulePendingStatements(); + schedulePendingUris(); + } + } + } + } + + private synchronized void schedulePendingStatements() { + if (paused == 0 && pendingStatements.size() > 0) { + scheduleUpdatesForStatements(pendingStatements); + pendingStatements = new ArrayList<>(); + } + } + + private synchronized void schedulePendingUris() { + if (paused == 0 && pendingUris.size() > 0) { + scheduleUpdatesForUris(pendingUris); + pendingUris = new ArrayList<>(); } } @@ -234,10 +252,6 @@ public class SearchIndexerImpl implements SearchIndexer { return scheduler.isStarted(); } - private boolean isPaused() { - return scheduler.isPaused(); - } - private boolean isShutdown() { return taskQueue.isShutdown(); } @@ -264,15 +278,25 @@ public class SearchIndexerImpl implements SearchIndexer { if (changes == null || changes.isEmpty()) { return; } - if (ignoreTasksWhilePaused && isPaused()) { - rebuildOnUnpause = true; - return; + if (paused > 0) { + if (addToPendingStatements(changes)) { + return; + } } scheduler.scheduleTask(new UpdateStatementsTask(new IndexerConfigImpl(this), changes)); log.debug("Scheduled updates for " + changes.size() + " statements."); } + private synchronized boolean addToPendingStatements(List changes) { + if (paused > 0) { + pendingStatements.addAll(changes); + return true; + } + + return false; + } + @Override public void scheduleUpdatesForUris(Collection uris) { if (isShutdown()) { @@ -282,25 +306,38 @@ public class SearchIndexerImpl implements SearchIndexer { if (uris == null || uris.isEmpty()) { return; } - if (ignoreTasksWhilePaused && isPaused()) { - rebuildOnUnpause = true; - return; + if (paused > 0) { + if (pendingUris.addAll(uris)) { + return; + } } scheduler.scheduleTask(new UpdateUrisTask(new IndexerConfigImpl(this), uris)); log.debug("Scheduled updates for " + uris.size() + " uris."); } + private synchronized boolean addToPendingUris(Collection uris) { + if (paused > 0) { + pendingUris.addAll(uris); + return true; + } + + return false; + } + @Override public void rebuildIndex() { if (isShutdown()) { log.warn("Call to rebuildIndex after shutdown."); + return; } - if (ignoreTasksWhilePaused && isPaused()) { + fireEvent(REBUILD_REQUESTED); + if (paused > 0) { + // Make sure that we are rebuilding when we unpause + // and don't bother noting any other changes until unpaused rebuildOnUnpause = true; return; } - scheduler.scheduleTask(new RebuildIndexTask(new IndexerConfigImpl(this))); log.debug("Scheduled a full rebuild."); } @@ -345,7 +382,7 @@ public class SearchIndexerImpl implements SearchIndexer { } synchronized (listeners) { listeners.add(listener); - if (isPaused()) { + if (paused > 0) { listener.receiveSearchIndexerEvent(new Event(PAUSE, getStatus())); } } @@ -399,7 +436,6 @@ public class SearchIndexerImpl implements SearchIndexer { private final TaskQueue taskQueue; private final List deferredQueue; private volatile boolean started; - private volatile boolean paused; public Scheduler(TaskQueue taskQueue) { this.taskQueue = taskQueue; @@ -410,12 +446,8 @@ public class SearchIndexerImpl implements SearchIndexer { return started; } - public boolean isPaused() { - return paused; - } - public synchronized void scheduleTask(Task task) { - if (paused || !started) { + if (!started) { deferredQueue.add(task); log.debug("added task to deferred queue: " + task); } else { @@ -426,20 +458,7 @@ public class SearchIndexerImpl implements SearchIndexer { public synchronized void start() { started = true; - if (!paused) { - processDeferredTasks(); - } - } - - public synchronized void pause() { - paused = true; - } - - public synchronized void unpause() { - paused = false; - if (started) { - processDeferredTasks(); - } + processDeferredTasks(); } private void processDeferredTasks() { @@ -711,6 +730,5 @@ public class SearchIndexerImpl implements SearchIndexer { } } - } } diff --git a/webapp/test/stubs/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexerStub.java b/webapp/test/stubs/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexerStub.java index fac91559a..a138f8438 100644 --- a/webapp/test/stubs/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexerStub.java +++ b/webapp/test/stubs/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexerStub.java @@ -31,11 +31,6 @@ public class SearchIndexerStub implements SearchIndexer { paused = true; } - @Override - public void pauseInAnticipationOfRebuild() { - paused = true; - } - @Override public void unpause() { paused = false;