VIVO-869 Don't submit updates while indexer is paused.
When the pause flag is set, the IndexingChangeListener will continue to accumulate changes even if there are gaps in the stream. The listener will not submit a task until the indexer is unpaused.
This commit is contained in:
parent
7d16e10357
commit
d382e0efd6
5 changed files with 46 additions and 11 deletions
|
@ -68,7 +68,7 @@
|
||||||
# developer.searchIndex.showDocuments = false
|
# developer.searchIndex.showDocuments = false
|
||||||
# developer.searchIndex.uriOrNameRestriction = .*
|
# developer.searchIndex.uriOrNameRestriction = .*
|
||||||
# developer.searchIndex.documentRestriction = .*
|
# developer.searchIndex.documentRestriction = .*
|
||||||
# developer.searchIndex.logIndexingBreakdownTimings = .*
|
# developer.searchIndex.logIndexingBreakdownTimings = false
|
||||||
# developer.searchIndex.suppressModelChangeListener = false
|
# developer.searchIndex.suppressModelChangeListener = false
|
||||||
# developer.searchDeletions.enable = false
|
# developer.searchDeletions.enable = false
|
||||||
|
|
||||||
|
|
|
@ -73,7 +73,7 @@ public interface SearchIndexer extends Application.Module {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop processing new tasks. Requests will be queued until a call to
|
* Stop processing new tasks. Requests will be queued until a call to
|
||||||
* unpause().
|
* unpause(). Fires a PAUSED event to listeners.
|
||||||
*
|
*
|
||||||
* The SearchIndexer is paused when created. When fully initialized, it
|
* The SearchIndexer is paused when created. When fully initialized, it
|
||||||
* should be unpaused.
|
* should be unpaused.
|
||||||
|
@ -87,7 +87,8 @@ public interface SearchIndexer extends Application.Module {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Resume processing new tasks. Any requests that were received since the
|
* Resume processing new tasks. Any requests that were received since the
|
||||||
* call to pause() will now be scheduled for processing.
|
* call to pause() will now be scheduled for processing. Fires an UNPAUSED
|
||||||
|
* event to listeners.
|
||||||
*
|
*
|
||||||
* The SearchIndexer is paused when created. When fully initialized, it
|
* The SearchIndexer is paused when created. When fully initialized, it
|
||||||
* should be unpaused.
|
* should be unpaused.
|
||||||
|
@ -149,6 +150,8 @@ public interface SearchIndexer extends Application.Module {
|
||||||
public enum Type {
|
public enum Type {
|
||||||
STARTUP, PROGRESS,
|
STARTUP, PROGRESS,
|
||||||
|
|
||||||
|
PAUSE, UNPAUSE,
|
||||||
|
|
||||||
START_PROCESSING_URIS, STOP_PROCESSING_URIS,
|
START_PROCESSING_URIS, STOP_PROCESSING_URIS,
|
||||||
|
|
||||||
START_PROCESSING_STATEMENTS, STOP_PROCESSING_STATEMENTS,
|
START_PROCESSING_STATEMENTS, STOP_PROCESSING_STATEMENTS,
|
||||||
|
|
|
@ -2,6 +2,9 @@
|
||||||
|
|
||||||
package edu.cornell.mannlib.vitro.webapp.searchindex;
|
package edu.cornell.mannlib.vitro.webapp.searchindex;
|
||||||
|
|
||||||
|
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.PAUSE;
|
||||||
|
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.UNPAUSE;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.UnsupportedEncodingException;
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -20,6 +23,7 @@ import com.hp.hpl.jena.rdf.model.StmtIterator;
|
||||||
|
|
||||||
import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent;
|
import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent;
|
||||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
|
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
|
||||||
|
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event;
|
||||||
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
|
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
|
||||||
import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;
|
import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;
|
||||||
|
|
||||||
|
@ -27,13 +31,18 @@ import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;
|
||||||
* When a change is heard, wait for an interval to see if more changes come in.
|
* When a change is heard, wait for an interval to see if more changes come in.
|
||||||
* When changes stop coming in for a specified interval, send what has
|
* When changes stop coming in for a specified interval, send what has
|
||||||
* accumulated.
|
* accumulated.
|
||||||
|
*
|
||||||
|
* When the SearchIndexer pauses, stop sending changes until the SearchIndexer
|
||||||
|
* unpauses.
|
||||||
*/
|
*/
|
||||||
public class IndexingChangeListener implements ChangeListener {
|
public class IndexingChangeListener implements ChangeListener,
|
||||||
|
SearchIndexer.Listener {
|
||||||
private static final Log log = LogFactory
|
private static final Log log = LogFactory
|
||||||
.getLog(IndexingChangeListener.class);
|
.getLog(IndexingChangeListener.class);
|
||||||
|
|
||||||
private final SearchIndexer searchIndexer;
|
private final SearchIndexer searchIndexer;
|
||||||
private final Ticker ticker;
|
private final Ticker ticker;
|
||||||
|
private volatile boolean paused = true;
|
||||||
|
|
||||||
/** All access to the list must be synchronized. */
|
/** All access to the list must be synchronized. */
|
||||||
private final List<Statement> changes;
|
private final List<Statement> changes;
|
||||||
|
@ -42,17 +51,33 @@ public class IndexingChangeListener implements ChangeListener {
|
||||||
this.searchIndexer = searchIndexer;
|
this.searchIndexer = searchIndexer;
|
||||||
this.ticker = new Ticker();
|
this.ticker = new Ticker();
|
||||||
this.changes = new ArrayList<>();
|
this.changes = new ArrayList<>();
|
||||||
|
|
||||||
|
searchIndexer.addListener(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void noteChange(Statement stmt) {
|
private synchronized void noteChange(Statement stmt) {
|
||||||
changes.add(stmt);
|
changes.add(stmt);
|
||||||
|
if (!paused) {
|
||||||
ticker.start();
|
ticker.start();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void receiveSearchIndexerEvent(Event event) {
|
||||||
|
if (event.getType() == PAUSE) {
|
||||||
|
paused = true;
|
||||||
|
} else if (event.getType() == UNPAUSE) {
|
||||||
|
paused = false;
|
||||||
|
respondToTicker();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private synchronized void respondToTicker() {
|
private synchronized void respondToTicker() {
|
||||||
|
if (!paused && !changes.isEmpty()) {
|
||||||
searchIndexer.scheduleUpdatesForStatements(changes);
|
searchIndexer.scheduleUpdatesForStatements(changes);
|
||||||
changes.clear();
|
changes.clear();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
ticker.shutdown();
|
ticker.shutdown();
|
||||||
|
|
|
@ -3,9 +3,11 @@
|
||||||
package edu.cornell.mannlib.vitro.webapp.searchindex;
|
package edu.cornell.mannlib.vitro.webapp.searchindex;
|
||||||
|
|
||||||
import static edu.cornell.mannlib.vitro.webapp.modelaccess.ModelNames.DISPLAY;
|
import static edu.cornell.mannlib.vitro.webapp.modelaccess.ModelNames.DISPLAY;
|
||||||
|
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.PAUSE;
|
||||||
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.SHUTDOWN_COMPLETE;
|
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.SHUTDOWN_COMPLETE;
|
||||||
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.SHUTDOWN_REQUESTED;
|
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.SHUTDOWN_REQUESTED;
|
||||||
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STARTUP;
|
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STARTUP;
|
||||||
|
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.UNPAUSE;
|
||||||
import static edu.cornell.mannlib.vitro.webapp.utils.developer.Key.SEARCH_INDEX_LOG_INDEXING_BREAKDOWN_TIMINGS;
|
import static edu.cornell.mannlib.vitro.webapp.utils.developer.Key.SEARCH_INDEX_LOG_INDEXING_BREAKDOWN_TIMINGS;
|
||||||
import static edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread.WorkLevel.IDLE;
|
import static edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread.WorkLevel.IDLE;
|
||||||
import static edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread.WorkLevel.WORKING;
|
import static edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread.WorkLevel.WORKING;
|
||||||
|
@ -39,6 +41,7 @@ import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelAccess;
|
||||||
import edu.cornell.mannlib.vitro.webapp.modules.Application;
|
import edu.cornell.mannlib.vitro.webapp.modules.Application;
|
||||||
import edu.cornell.mannlib.vitro.webapp.modules.ComponentStartupStatus;
|
import edu.cornell.mannlib.vitro.webapp.modules.ComponentStartupStatus;
|
||||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
|
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
|
||||||
|
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type;
|
||||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
|
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
|
||||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State;
|
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State;
|
||||||
import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier;
|
import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier;
|
||||||
|
@ -235,11 +238,13 @@ public class SearchIndexerImpl implements SearchIndexer {
|
||||||
@Override
|
@Override
|
||||||
public void pause() {
|
public void pause() {
|
||||||
scheduler.pause();
|
scheduler.pause();
|
||||||
|
listeners.fireEvent(new Event(PAUSE, getStatus()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void unpause() {
|
public void unpause() {
|
||||||
scheduler.unpause();
|
scheduler.unpause();
|
||||||
|
listeners.fireEvent(new Event(UNPAUSE, getStatus()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -62,8 +62,10 @@ public class SearchIndexerSetup implements ServletContextListener {
|
||||||
|
|
||||||
listener = new IndexingChangeListener(searchIndexer);
|
listener = new IndexingChangeListener(searchIndexer);
|
||||||
|
|
||||||
|
// Wrap it so it can be disabled by a developer flag.
|
||||||
listenerWrapper = new DeveloperDisabledChangeListener(listener,
|
listenerWrapper = new DeveloperDisabledChangeListener(listener,
|
||||||
Key.SEARCH_INDEX_SUPPRESS_MODEL_CHANGE_LISTENER);
|
Key.SEARCH_INDEX_SUPPRESS_MODEL_CHANGE_LISTENER);
|
||||||
|
|
||||||
RDFServiceUtils.getRDFServiceFactory(ctx).registerListener(
|
RDFServiceUtils.getRDFServiceFactory(ctx).registerListener(
|
||||||
listenerWrapper);
|
listenerWrapper);
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue