Pause counting to avoid problems of overlapping pause / unpauses, add event to notify of the intention to rebuild the index, so that overhead of tracking unnecessary indexing can be reduced

This commit is contained in:
grahamtriggs 2015-10-03 08:44:26 +01:00
parent 615a531de4
commit 7fbaf1cadc
5 changed files with 97 additions and 93 deletions

View file

@ -43,15 +43,6 @@ public interface SearchIndexer extends Application.Module {
*/ */
void pause(); void pause();
/**
* Stop processing new tasks. If any request is received while the indexer
* is paused, the request will be ignored, but the index will be rebuilt
* when unpaused. Fires a PAUSED event to listeners.
*
* This call has no effect if already paused, or if called after shutdown.
*/
void pauseInAnticipationOfRebuild();
/** /**
* Resume processing new tasks. Any requests that were received since the * Resume processing new tasks. Any requests that were received since the
* call to pause() will now be scheduled for processing. Fires an UNPAUSED * call to pause() will now be scheduled for processing. Fires an UNPAUSED
@ -176,7 +167,7 @@ public interface SearchIndexer extends Application.Module {
START_STATEMENTS, STOP_STATEMENTS, START_STATEMENTS, STOP_STATEMENTS,
START_REBUILD, STOP_REBUILD, START_REBUILD, STOP_REBUILD, REBUILD_REQUESTED,
SHUTDOWN_REQUESTED, SHUTDOWN_COMPLETE SHUTDOWN_REQUESTED, SHUTDOWN_COMPLETE
} }

View file

@ -104,12 +104,14 @@ public class ABoxRecomputer {
} }
try { try {
if (searchIndexer != null) { if (searchIndexer != null) {
searchIndexer.pauseInAnticipationOfRebuild(); searchIndexer.pause();
// Register now that we want to rebuild the index when we unpause
// This allows the indexer to optimize behaviour whilst paused
searchIndexer.rebuildIndex();
} }
recomputeABox(); recomputeABox();
} finally { } finally {
if (searchIndexer != null) { if (searchIndexer != null) {
searchIndexer.rebuildIndex();
searchIndexer.unpause(); searchIndexer.unpause();
} }
synchronized (lock1) { synchronized (lock1) {

View file

@ -2,10 +2,6 @@
package edu.cornell.mannlib.vitro.webapp.searchindex; package edu.cornell.mannlib.vitro.webapp.searchindex;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.PAUSE;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.START_REBUILD;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.UNPAUSE;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -30,6 +26,8 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Even
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.*;
/** /**
* When a change is heard, wait for an interval to see if more changes come in. * When a change is heard, wait for an interval to see if more changes come in.
* When changes stop coming in for a specified interval, send what has * When changes stop coming in for a specified interval, send what has
@ -61,7 +59,7 @@ public class IndexingChangeListener implements ChangeListener,
private final SearchIndexer searchIndexer; private final SearchIndexer searchIndexer;
private final Ticker ticker; private final Ticker ticker;
private volatile boolean paused; private volatile boolean rebuildScheduled;
private final Model defaultModel; private final Model defaultModel;
/** All access to the list must be synchronized. */ /** All access to the list must be synchronized. */
@ -78,25 +76,21 @@ public class IndexingChangeListener implements ChangeListener,
private synchronized void noteChange(Statement stmt) { private synchronized void noteChange(Statement stmt) {
changes.add(stmt); changes.add(stmt);
if (!paused) {
ticker.start(); ticker.start();
} }
}
@Override @Override
public void receiveSearchIndexerEvent(Event event) { public void receiveSearchIndexerEvent(Event event) {
if (event.getType() == PAUSE) { if (event.getType() == REBUILD_REQUESTED) {
paused = true; rebuildScheduled = true;
} else if (event.getType() == UNPAUSE) {
paused = false;
ticker.start();
} else if (event.getType() == START_REBUILD) { } else if (event.getType() == START_REBUILD) {
rebuildScheduled = false;
discardChanges(); discardChanges();
} }
} }
private synchronized void respondToTicker() { private synchronized void respondToTicker() {
if (!paused && !changes.isEmpty()) { if (!changes.isEmpty()) {
searchIndexer.scheduleUpdatesForStatements(changes); searchIndexer.scheduleUpdatesForStatements(changes);
changes.clear(); changes.clear();
} }
@ -112,13 +106,17 @@ public class IndexingChangeListener implements ChangeListener,
@Override @Override
public void addedStatement(String serializedTriple, String graphURI) { public void addedStatement(String serializedTriple, String graphURI) {
if (!rebuildScheduled) {
noteChange(parseTriple(serializedTriple)); noteChange(parseTriple(serializedTriple));
} }
}
@Override @Override
public void removedStatement(String serializedTriple, String graphURI) { public void removedStatement(String serializedTriple, String graphURI) {
if (!rebuildScheduled) {
noteChange(parseTriple(serializedTriple)); noteChange(parseTriple(serializedTriple));
} }
}
/** /**
* We only care about events that signal the end of an edit operation. * We only care about events that signal the end of an edit operation.

View file

@ -4,6 +4,7 @@ package edu.cornell.mannlib.vitro.webapp.searchindex;
import static edu.cornell.mannlib.vitro.webapp.modelaccess.ModelNames.DISPLAY; import static edu.cornell.mannlib.vitro.webapp.modelaccess.ModelNames.DISPLAY;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.PAUSE; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.PAUSE;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.REBUILD_REQUESTED;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.SHUTDOWN_COMPLETE; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.SHUTDOWN_COMPLETE;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.SHUTDOWN_REQUESTED; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.SHUTDOWN_REQUESTED;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STARTUP; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STARTUP;
@ -98,9 +99,13 @@ public class SearchIndexerImpl implements SearchIndexer {
private Set<IndexingUriFinder> uriFinders; private Set<IndexingUriFinder> uriFinders;
private WebappDaoFactory wadf; private WebappDaoFactory wadf;
private boolean ignoreTasksWhilePaused = false;
private boolean rebuildOnUnpause = false; private boolean rebuildOnUnpause = false;
private volatile int paused = 0;
private List<Statement> pendingStatements = new ArrayList<Statement>();
private Collection<String> pendingUris = new ArrayList<String>();
// ---------------------------------------------------------------------- // ----------------------------------------------------------------------
// ConfigurationBeanLoader methods. // ConfigurationBeanLoader methods.
// ---------------------------------------------------------------------- // ----------------------------------------------------------------------
@ -199,45 +204,54 @@ public class SearchIndexerImpl implements SearchIndexer {
} }
@Override @Override
public void pause() { public synchronized void pause() {
if (!isPaused() && !isShutdown()) { if (!isShutdown()) {
ignoreTasksWhilePaused = false; paused++;
rebuildOnUnpause = false; if (paused == 1) {
scheduler.pause();
fireEvent(PAUSE); fireEvent(PAUSE);
} }
} }
}
@Override @Override
public void pauseInAnticipationOfRebuild() { public synchronized void unpause() {
if (!isPaused() && !isShutdown()) { if (paused > 0 && !isShutdown()) {
ignoreTasksWhilePaused = true; paused--;
rebuildOnUnpause = false;
scheduler.pause();
fireEvent(PAUSE);
}
}
@Override // Only process if we transition to unpaused state
public void unpause() { if (paused == 0) {
if (isPaused() && !isShutdown()) {
scheduler.unpause();
fireEvent(UNPAUSE); fireEvent(UNPAUSE);
if (rebuildOnUnpause) { if (rebuildOnUnpause) {
rebuildOnUnpause = false; rebuildOnUnpause = false;
pendingStatements.clear();
pendingUris.clear();
rebuildIndex(); rebuildIndex();
} else {
schedulePendingStatements();
schedulePendingUris();
} }
} }
} }
}
private synchronized void schedulePendingStatements() {
if (paused == 0 && pendingStatements.size() > 0) {
scheduleUpdatesForStatements(pendingStatements);
pendingStatements = new ArrayList<>();
}
}
private synchronized void schedulePendingUris() {
if (paused == 0 && pendingUris.size() > 0) {
scheduleUpdatesForUris(pendingUris);
pendingUris = new ArrayList<>();
}
}
private boolean isStarted() { private boolean isStarted() {
return scheduler.isStarted(); return scheduler.isStarted();
} }
private boolean isPaused() {
return scheduler.isPaused();
}
private boolean isShutdown() { private boolean isShutdown() {
return taskQueue.isShutdown(); return taskQueue.isShutdown();
} }
@ -264,15 +278,25 @@ public class SearchIndexerImpl implements SearchIndexer {
if (changes == null || changes.isEmpty()) { if (changes == null || changes.isEmpty()) {
return; return;
} }
if (ignoreTasksWhilePaused && isPaused()) { if (paused > 0) {
rebuildOnUnpause = true; if (addToPendingStatements(changes)) {
return; return;
} }
}
scheduler.scheduleTask(new UpdateStatementsTask(new IndexerConfigImpl(this), changes)); scheduler.scheduleTask(new UpdateStatementsTask(new IndexerConfigImpl(this), changes));
log.debug("Scheduled updates for " + changes.size() + " statements."); log.debug("Scheduled updates for " + changes.size() + " statements.");
} }
private synchronized boolean addToPendingStatements(List<Statement> changes) {
if (paused > 0) {
pendingStatements.addAll(changes);
return true;
}
return false;
}
@Override @Override
public void scheduleUpdatesForUris(Collection<String> uris) { public void scheduleUpdatesForUris(Collection<String> uris) {
if (isShutdown()) { if (isShutdown()) {
@ -282,25 +306,38 @@ public class SearchIndexerImpl implements SearchIndexer {
if (uris == null || uris.isEmpty()) { if (uris == null || uris.isEmpty()) {
return; return;
} }
if (ignoreTasksWhilePaused && isPaused()) { if (paused > 0) {
rebuildOnUnpause = true; if (pendingUris.addAll(uris)) {
return; return;
} }
}
scheduler.scheduleTask(new UpdateUrisTask(new IndexerConfigImpl(this), uris)); scheduler.scheduleTask(new UpdateUrisTask(new IndexerConfigImpl(this), uris));
log.debug("Scheduled updates for " + uris.size() + " uris."); log.debug("Scheduled updates for " + uris.size() + " uris.");
} }
private synchronized boolean addToPendingUris(Collection<String> uris) {
if (paused > 0) {
pendingUris.addAll(uris);
return true;
}
return false;
}
@Override @Override
public void rebuildIndex() { public void rebuildIndex() {
if (isShutdown()) { if (isShutdown()) {
log.warn("Call to rebuildIndex after shutdown."); log.warn("Call to rebuildIndex after shutdown.");
return;
} }
if (ignoreTasksWhilePaused && isPaused()) { fireEvent(REBUILD_REQUESTED);
if (paused > 0) {
// Make sure that we are rebuilding when we unpause
// and don't bother noting any other changes until unpaused
rebuildOnUnpause = true; rebuildOnUnpause = true;
return; return;
} }
scheduler.scheduleTask(new RebuildIndexTask(new IndexerConfigImpl(this))); scheduler.scheduleTask(new RebuildIndexTask(new IndexerConfigImpl(this)));
log.debug("Scheduled a full rebuild."); log.debug("Scheduled a full rebuild.");
} }
@ -345,7 +382,7 @@ public class SearchIndexerImpl implements SearchIndexer {
} }
synchronized (listeners) { synchronized (listeners) {
listeners.add(listener); listeners.add(listener);
if (isPaused()) { if (paused > 0) {
listener.receiveSearchIndexerEvent(new Event(PAUSE, getStatus())); listener.receiveSearchIndexerEvent(new Event(PAUSE, getStatus()));
} }
} }
@ -399,7 +436,6 @@ public class SearchIndexerImpl implements SearchIndexer {
private final TaskQueue taskQueue; private final TaskQueue taskQueue;
private final List<Task> deferredQueue; private final List<Task> deferredQueue;
private volatile boolean started; private volatile boolean started;
private volatile boolean paused;
public Scheduler(TaskQueue taskQueue) { public Scheduler(TaskQueue taskQueue) {
this.taskQueue = taskQueue; this.taskQueue = taskQueue;
@ -410,12 +446,8 @@ public class SearchIndexerImpl implements SearchIndexer {
return started; return started;
} }
public boolean isPaused() {
return paused;
}
public synchronized void scheduleTask(Task task) { public synchronized void scheduleTask(Task task) {
if (paused || !started) { if (!started) {
deferredQueue.add(task); deferredQueue.add(task);
log.debug("added task to deferred queue: " + task); log.debug("added task to deferred queue: " + task);
} else { } else {
@ -426,21 +458,8 @@ public class SearchIndexerImpl implements SearchIndexer {
public synchronized void start() { public synchronized void start() {
started = true; started = true;
if (!paused) {
processDeferredTasks(); processDeferredTasks();
} }
}
public synchronized void pause() {
paused = true;
}
public synchronized void unpause() {
paused = false;
if (started) {
processDeferredTasks();
}
}
private void processDeferredTasks() { private void processDeferredTasks() {
for (Task task : deferredQueue) { for (Task task : deferredQueue) {
@ -711,6 +730,5 @@ public class SearchIndexerImpl implements SearchIndexer {
} }
} }
} }
} }

View file

@ -31,11 +31,6 @@ public class SearchIndexerStub implements SearchIndexer {
paused = true; paused = true;
} }
@Override
public void pauseInAnticipationOfRebuild() {
paused = true;
}
@Override @Override
public void unpause() { public void unpause() {
paused = false; paused = false;