Defer tasks by passing in a reference for obtaining the config at runtime, rather than at creation, allowing tasks to be deferred during startup.

This commit is contained in:
Graham Triggs 2015-02-15 15:09:03 +00:00
parent 2bde98e3df
commit 64680d22b0
4 changed files with 551 additions and 443 deletions

View file

@ -269,7 +269,7 @@ public class SearchIndexerImpl implements SearchIndexer {
return; return;
} }
scheduler.scheduleTask(new UpdateStatementsTask.Deferrable(changes)); scheduler.scheduleTask(new UpdateStatementsTask(new IndexerConfigImpl(this), changes));
log.debug("Scheduled updates for " + changes.size() + " statements."); log.debug("Scheduled updates for " + changes.size() + " statements.");
} }
@ -287,7 +287,7 @@ public class SearchIndexerImpl implements SearchIndexer {
return; return;
} }
scheduler.scheduleTask(new UpdateUrisTask.Deferrable(uris)); scheduler.scheduleTask(new UpdateUrisTask(new IndexerConfigImpl(this), uris));
log.debug("Scheduled updates for " + uris.size() + " uris."); log.debug("Scheduled updates for " + uris.size() + " uris.");
} }
@ -301,7 +301,7 @@ public class SearchIndexerImpl implements SearchIndexer {
return; return;
} }
scheduler.scheduleTask(new RebuildIndexTask.Deferrable()); scheduler.scheduleTask(new RebuildIndexTask(new IndexerConfigImpl(this)));
log.debug("Scheduled a full rebuild."); log.debug("Scheduled a full rebuild.");
} }
@ -397,7 +397,7 @@ public class SearchIndexerImpl implements SearchIndexer {
*/ */
private static class Scheduler { private static class Scheduler {
private final TaskQueue taskQueue; private final TaskQueue taskQueue;
private final List<DeferrableTask> deferredQueue; private final List<Task> deferredQueue;
private final SearchIndexerImpl indexer; private final SearchIndexerImpl indexer;
private volatile boolean started; private volatile boolean started;
private volatile boolean paused; private volatile boolean paused;
@ -405,7 +405,7 @@ public class SearchIndexerImpl implements SearchIndexer {
public Scheduler(SearchIndexerImpl indexer, TaskQueue taskQueue) { public Scheduler(SearchIndexerImpl indexer, TaskQueue taskQueue) {
this.indexer = indexer; this.indexer = indexer;
this.taskQueue = taskQueue; this.taskQueue = taskQueue;
this.deferredQueue = new ArrayList<DeferrableTask>(); this.deferredQueue = new ArrayList<Task>();
} }
public boolean isStarted() { public boolean isStarted() {
@ -416,20 +416,13 @@ public class SearchIndexerImpl implements SearchIndexer {
return paused; return paused;
} }
public synchronized void scheduleTask(DeferrableTask task) { public synchronized void scheduleTask(Task task) {
if (paused || !started) { if (paused || !started) {
deferredQueue.add(task); deferredQueue.add(task);
log.debug("added task to deferred queue: " + task); log.debug("added task to deferred queue: " + task);
} else { } else {
taskQueue.scheduleTask(task.makeRunnable(indexer.createFindersList(), indexer.createExcludersList(), indexer.createModifiersList(), indexer.wadf.getIndividualDao(), indexer.listeners, indexer.pool));
}
}
public synchronized void scheduleTask(Task task) {
if (started && !paused) {
taskQueue.scheduleTask(task); taskQueue.scheduleTask(task);
log.debug("added task to task queue: " + task); log.debug("added task to task queue: " + task);
} else {
log.debug("indexer not running, task ignored: " + task);
} }
} }
@ -452,8 +445,8 @@ public class SearchIndexerImpl implements SearchIndexer {
} }
private void processDeferredTasks() { private void processDeferredTasks() {
for (DeferrableTask task : deferredQueue) { for (Task task : deferredQueue) {
taskQueue.scheduleTask(task.makeRunnable(indexer.createFindersList(), indexer.createExcludersList(), indexer.createModifiersList(), indexer.wadf.getIndividualDao(), indexer.listeners, indexer.pool)); taskQueue.scheduleTask(task);
log.debug("moved task from deferred queue to task queue: " + task); log.debug("moved task from deferred queue to task queue: " + task);
} }
@ -561,10 +554,34 @@ public class SearchIndexerImpl implements SearchIndexer {
} }
} }
public static interface DeferrableTask { /**
public Task makeRunnable(IndexingUriFinderList uriFinders, SearchIndexExcluderList excluders, * Interface for tasks to access the Indexer config
DocumentModifierList modifiers, IndividualDao indDao, */
ListenerList listeners, WorkerThreadPool pool); public static interface IndexerConfig {
public IndexingUriFinderList uriFinderList();
public SearchIndexExcluderList excluderList();
public DocumentModifierList documentModifierList();
public IndividualDao individualDao();
public ListenerList listenerList();
public WorkerThreadPool workerThreadPool();
}
/**
* Implementation of IndexerConfig
* Defers access to the configuration until the task is running, so a Task
* created and deferred before the indexer is started will not cause a NullPointerException
*/
private static class IndexerConfigImpl implements IndexerConfig {
private final SearchIndexerImpl sii;
public IndexerConfigImpl(SearchIndexerImpl sii) { this.sii = sii; }
public IndexingUriFinderList uriFinderList() { return sii.createFindersList(); }
public SearchIndexExcluderList excluderList() { return sii.createExcludersList(); }
public DocumentModifierList documentModifierList() { return sii.createModifiersList(); }
public IndividualDao individualDao() { return sii.wadf.getIndividualDao(); }
public ListenerList listenerList() { return sii.listeners; }
public WorkerThreadPool workerThreadPool() { return sii.pool; }
} }
public static interface Task extends Runnable { public static interface Task extends Runnable {
@ -671,4 +688,28 @@ public class SearchIndexerImpl implements SearchIndexer {
} }
} }
/**
 * An accumulator of changed statements that can be drained in one step.
 * All access to the backing list goes through synchronized methods of
 * this object.
 */
private static class StatementList {
    // private + final: the list is only ever touched under this object's
    // monitor, and a final field cannot be swapped out from under a
    // thread that is holding that monitor.
    private final List<Statement> changes = new ArrayList<Statement>();

    public StatementList() {
        // The backing list is created by the field initializer.
    }

    /**
     * Appends one statement to the pending changes.
     *
     * @param stmt the statement to record
     */
    public synchronized void addStatement(Statement stmt) {
        changes.add(stmt);
    }

    /**
     * Drains the pending changes: returns a copy of everything recorded so
     * far and empties the internal list.
     *
     * The copy is taken before the list is cleared, so if the copy fails
     * the pending statements are still held here rather than discarded.
     *
     * @return a new list holding the statements that were pending
     */
    public synchronized List<Statement> getStatements() {
        List<Statement> snapshot = new ArrayList<>(changes);
        changes.clear();
        return snapshot;
    }

    /**
     * @return the number of statements currently pending
     */
    public synchronized int size() {
        return changes.size();
    }
}
} }

View file

@ -10,8 +10,6 @@ import java.text.SimpleDateFormat;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl;
import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinderList;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -24,7 +22,7 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Even
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.RebuildCounts; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.RebuildCounts;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.DeferrableTask; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.IndexerConfig;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool;
@ -40,126 +38,150 @@ import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExclud
*/ */
public class RebuildIndexTask implements Task { public class RebuildIndexTask implements Task {
private static final Log log = LogFactory.getLog(RebuildIndexTask.class); private static final Log log = LogFactory.getLog(RebuildIndexTask.class);
private final IndividualDao indDao;
private final SearchIndexExcluderList excluders;
private final DocumentModifierList modifiers;
private final ListenerList listeners;
private final WorkerThreadPool pool;
private final SearchEngine searchEngine;
private final Date requestedAt; private final Date requestedAt;
private final int documentsBefore;
private volatile SearchIndexerStatus status; private final IndexerConfig config;
private RebuildIndexTaskImpl impl;
public static class Deferrable implements DeferrableTask { public RebuildIndexTask(IndexerConfig config) {
public Deferrable() {} this.config = config;
this.requestedAt = new Date();
@Override
public Task makeRunnable(IndexingUriFinderList uriFinders, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) {
return new RebuildIndexTask(excluders, modifiers, indDao, listeners, pool);
}
} }
public RebuildIndexTask(SearchIndexExcluderList excluders,
DocumentModifierList modifiers, IndividualDao indDao,
ListenerList listeners, WorkerThreadPool pool) {
this.excluders = excluders;
this.modifiers = modifiers;
this.indDao = indDao;
this.listeners = listeners;
this.pool = pool;
this.searchEngine = ApplicationUtils.instance().getSearchEngine();
this.requestedAt = new Date();
this.documentsBefore = getDocumentCount();
this.status = buildStatus(REBUILDING, 0);
}
@Override @Override
public void run() { public void run() {
listeners.fireEvent(new Event(START_REBUILD, status)); impl = new RebuildIndexTaskImpl(config, requestedAt);
impl.run();
Collection<String> uris = getAllUrisInTheModel();
if (!isInterrupted()) {
updateTheUris(uris);
if (!isInterrupted()) {
deleteOutdatedDocuments();
}
}
status = buildStatus(REBUILDING, getDocumentCount());
listeners.fireEvent(new Event(STOP_REBUILD, status));
}
private boolean isInterrupted() {
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
return true;
} else {
return false;
}
}
private Collection<String> getAllUrisInTheModel() {
return indDao.getAllIndividualUris();
}
private void updateTheUris(Collection<String> uris) {
new UpdateUrisTask(uris, excluders, modifiers, indDao, listeners, pool)
.run();
}
private void deleteOutdatedDocuments() {
String query = "indexedTime:[ * TO " + requestedAt.getTime() + " ]";
try {
searchEngine.deleteByQuery(query);
searchEngine.commit();
} catch (SearchEngineNotRespondingException e) {
log.warn("Failed to delete outdated documents from the search index: "
+ "the search engine is not responding.");
} catch (SearchEngineException e) {
log.warn("Failed to delete outdated documents "
+ "from the search index", e);
}
}
private int getDocumentCount() {
try {
return searchEngine.documentCount();
} catch (SearchEngineNotRespondingException e) {
log.warn("Failed to get document count from the search index: "
+ "the search engine is not responding.");
return 0;
} catch (SearchEngineException e) {
log.warn("Failed to get document count from the search index.", e);
return 0;
}
}
private SearchIndexerStatus buildStatus(State state, int documentsAfter) {
return new SearchIndexerStatus(state, new Date(), new RebuildCounts(
documentsBefore, documentsAfter));
} }
@Override @Override
public SearchIndexerStatus getStatus() { public SearchIndexerStatus getStatus() {
return status; return impl == null ? null : impl.getStatus();
} }
@Override @Override
public void notifyWorkUnitCompletion(Runnable workUnit) { public void notifyWorkUnitCompletion(Runnable workUnit) {
// We don't submit any work units, so we won't see any calls to this. if (impl != null) {
log.error("Why was this called?"); impl.notifyWorkUnitCompletion(workUnit);
}
} }
@Override @Override
public String toString() { public String toString() {
return "RebuildIndexTask[requestedAt=" return "RebuildIndexTask[requestedAt=" + new SimpleDateFormat().format(requestedAt) + "]";
+ new SimpleDateFormat().format(requestedAt) + "]";
} }
/**
 * Performs a full rebuild of the search index.
 *
 * The constructor reads its collaborators from the supplied IndexerConfig
 * and records the document count of the search index at that moment. The
 * run() method re-indexes every individual URI and then deletes documents
 * whose indexedTime is not later than the request time.
 */
private static class RebuildIndexTaskImpl implements Task {
    private final IndexerConfig config;
    private final IndividualDao indDao;
    private final SearchIndexExcluderList excluders;
    private final DocumentModifierList modifiers;
    private final ListenerList listeners;
    private final WorkerThreadPool pool;
    private final SearchEngine searchEngine;
    private final Date requestedAt;
    private final int documentsBefore;
    private volatile SearchIndexerStatus status;

    /**
     * @param config      source of the excluders, modifiers, DAO, listeners
     *                    and worker pool used by this rebuild
     * @param requestedAt the time at which the rebuild was requested
     */
    public RebuildIndexTaskImpl(IndexerConfig config, Date requestedAt) {
        this.config = config;
        this.requestedAt = requestedAt;

        this.excluders = config.excluderList();
        this.modifiers = config.documentModifierList();
        this.indDao = config.individualDao();
        this.listeners = config.listenerList();
        this.pool = config.workerThreadPool();
        this.searchEngine = ApplicationUtils.instance().getSearchEngine();

        // Needs searchEngine, so it must come after the assignment above.
        this.documentsBefore = getDocumentCount();
        // Needs documentsBefore, so it must come last.
        this.status = buildStatus(REBUILDING, 0);
    }

    /**
     * Fires START_REBUILD, does the rebuild work, refreshes the status with
     * the current document count, and fires STOP_REBUILD.
     */
    @Override
    public void run() {
        listeners.fireEvent(new Event(START_REBUILD, status));

        rebuildUnlessInterrupted();

        status = buildStatus(REBUILDING, getDocumentCount());
        listeners.fireEvent(new Event(STOP_REBUILD, status));
    }

    /** Runs the rebuild steps in order, stopping early on interruption. */
    private void rebuildUnlessInterrupted() {
        Collection<String> allUris = getAllUrisInTheModel();
        if (isInterrupted()) {
            return;
        }

        updateTheUris(allUris);
        if (isInterrupted()) {
            return;
        }

        deleteOutdatedDocuments();
    }

    /**
     * Reports whether the current thread has been interrupted, leaving the
     * interrupt flag set if it was.
     */
    private boolean isInterrupted() {
        boolean wasInterrupted = Thread.interrupted();
        if (wasInterrupted) {
            // Thread.interrupted() cleared the flag; put it back.
            Thread.currentThread().interrupt();
        }
        return wasInterrupted;
    }

    /** Asks the DAO for the URIs of all individuals. */
    private Collection<String> getAllUrisInTheModel() {
        return indDao.getAllIndividualUris();
    }

    /** Re-indexes the given URIs synchronously via UpdateUrisTask. */
    private void updateTheUris(Collection<String> uris) {
        UpdateUrisTask.runNow(uris, excluders, modifiers, indDao, listeners, pool);
    }

    /**
     * Deletes every document whose indexedTime is at or before the time the
     * rebuild was requested, then commits. Failures are logged, not thrown.
     */
    private void deleteOutdatedDocuments() {
        long cutoff = requestedAt.getTime();
        String query = "indexedTime:[ * TO " + cutoff + " ]";
        try {
            searchEngine.deleteByQuery(query);
            searchEngine.commit();
        } catch (SearchEngineNotRespondingException e) {
            log.warn("Failed to delete outdated documents from the search index: the search engine is not responding.");
        } catch (SearchEngineException e) {
            log.warn("Failed to delete outdated documents from the search index", e);
        }
    }

    /**
     * Reads the document count from the search engine.
     *
     * @return the count, or 0 if the search engine fails or does not respond
     */
    private int getDocumentCount() {
        int count = 0;
        try {
            count = searchEngine.documentCount();
        } catch (SearchEngineNotRespondingException e) {
            log.warn("Failed to get document count from the search index: the search engine is not responding.");
        } catch (SearchEngineException e) {
            log.warn("Failed to get document count from the search index.", e);
        }
        return count;
    }

    /** Builds a status stamped with the current time and the before/after counts. */
    private SearchIndexerStatus buildStatus(State state, int documentsAfter) {
        RebuildCounts counts = new RebuildCounts(documentsBefore, documentsAfter);
        return new SearchIndexerStatus(state, new Date(), counts);
    }

    @Override
    public SearchIndexerStatus getStatus() {
        return status;
    }

    @Override
    public void notifyWorkUnitCompletion(Runnable workUnit) {
        // This task never submits work units itself, so no callback is expected.
        log.error("Why was this called?");
    }

    @Override
    public String toString() {
        String when = new SimpleDateFormat().format(requestedAt);
        return "RebuildIndexTask[requestedAt=" + when + "]";
    }
}
} }

View file

@ -24,7 +24,7 @@ import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.StatementCounts; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.StatementCounts;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.DeferrableTask; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.IndexerConfig;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool;
@ -51,157 +51,169 @@ import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinderLi
* Set to remove duplicates, and then process the URIs in the set. * Set to remove duplicates, and then process the URIs in the set.
*/ */
public class UpdateStatementsTask implements Task { public class UpdateStatementsTask implements Task {
private static final Log log = LogFactory private static final Log log = LogFactory.getLog(UpdateStatementsTask.class);
.getLog(UpdateStatementsTask.class);
private final List<Statement> changes; private final IndexerConfig config;
private final IndexingUriFinderList uriFinders; private UpdateStatementsTaskImpl impl;
private final SearchIndexExcluderList excluders;
private final DocumentModifierList modifiers;
private final IndividualDao indDao;
private final ListenerList listeners;
private final WorkerThreadPool pool;
private final Set<String> uris; private List<Statement> changes;
private final Status status;
public static class Deferrable implements DeferrableTask { public UpdateStatementsTask(IndexerConfig config, List<Statement> changes) {
List<Statement> changes; this.config = config;
this.changes = new ArrayList<>(changes);
}
public Deferrable(List<Statement> changes) { this.changes = changes; } @Override
public void run() {
impl = new UpdateStatementsTaskImpl(config, changes);
impl.run();
}
@Override
public SearchIndexerStatus getStatus() {
return impl == null ? null : impl.getStatus();
}
@Override @Override
public Task makeRunnable(IndexingUriFinderList uriFinders, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { public void notifyWorkUnitCompletion(Runnable workUnit) {
return new UpdateStatementsTask(changes, uriFinders, excluders, modifiers, indDao, listeners, pool); if (impl != null) {
impl.notifyWorkUnitCompletion(workUnit);
} }
} }
public UpdateStatementsTask(List<Statement> changes, private static class UpdateStatementsTaskImpl implements Task {
IndexingUriFinderList uriFinders, private final List<Statement> changes;
SearchIndexExcluderList excluders, DocumentModifierList modifiers, private final IndexingUriFinderList uriFinders;
IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { private final SearchIndexExcluderList excluders;
this.changes = new ArrayList<>(changes); private final DocumentModifierList modifiers;
this.uriFinders = uriFinders; private final IndividualDao indDao;
this.excluders = excluders; private final ListenerList listeners;
this.modifiers = modifiers; private final WorkerThreadPool pool;
this.indDao = indDao;
this.listeners = listeners;
this.pool = pool;
this.uris = Collections.synchronizedSet(new HashSet<String>()); private final Set<String> uris;
private final Status status;
this.status = new Status(changes.size(), 500, listeners); public UpdateStatementsTaskImpl(IndexerConfig config, List<Statement> changes) {
} this.changes = changes;
this.uriFinders = config.uriFinderList();
this.excluders = config.excluderList();
this.modifiers = config.documentModifierList();
this.indDao = config.individualDao();
this.listeners = config.listenerList();
this.pool = config.workerThreadPool();
@Override this.uris = Collections.synchronizedSet(new HashSet<String>());
public void run() {
listeners.fireEvent(new Event(START_STATEMENTS, getStatus()));
findAffectedUris(); this.status = new Status(changes.size(), 500, listeners);
}
updateTheUris(); @Override
listeners.fireEvent(new Event(STOP_STATEMENTS, getStatus())); public void run() {
} listeners.fireEvent(new Event(START_STATEMENTS, getStatus()));
private void findAffectedUris() { findAffectedUris();
log.debug("Tell finders we are starting.");
uriFinders.startIndexing();
for (Statement stmt : changes) { updateTheUris();
if (isInterrupted()) { listeners.fireEvent(new Event(STOP_STATEMENTS, getStatus()));
log.info("Interrupted: " + status.getSearchIndexerStatus()); }
return;
} else {
findUrisForStatement(stmt);
}
}
waitForWorkUnitsToComplete();
log.debug("Tell finders we are stopping."); private void findAffectedUris() {
uriFinders.stopIndexing(); log.debug("Tell finders we are starting.");
} uriFinders.startIndexing();
private boolean isInterrupted() { for (Statement stmt : changes) {
if (Thread.interrupted()) { if (isInterrupted()) {
Thread.currentThread().interrupt(); log.info("Interrupted: " + status.getSearchIndexerStatus());
return true; return;
} else { } else {
return false; findUrisForStatement(stmt);
} }
} }
waitForWorkUnitsToComplete();
private void findUrisForStatement(Statement stmt) { log.debug("Tell finders we are stopping.");
Runnable workUnit = new FindUrisForStatementWorkUnit(stmt, uriFinders); uriFinders.stopIndexing();
pool.submit(workUnit, this); }
log.debug("scheduled uri finders for " + stmt);
}
private void waitForWorkUnitsToComplete() { private boolean isInterrupted() {
pool.waitUntilIdle(); if (Thread.interrupted()) {
} Thread.currentThread().interrupt();
return true;
} else {
return false;
}
}
private void updateTheUris() { private void findUrisForStatement(Statement stmt) {
new UpdateUrisTask(uris, excluders, modifiers, indDao, listeners, pool) Runnable workUnit = new FindUrisForStatementWorkUnit(stmt, uriFinders);
.run(); pool.submit(workUnit, this);
} log.debug("scheduled uri finders for " + stmt);
}
@Override private void waitForWorkUnitsToComplete() {
public SearchIndexerStatus getStatus() { pool.waitUntilIdle();
return status.getSearchIndexerStatus(); }
}
@Override private void updateTheUris() {
public void notifyWorkUnitCompletion(Runnable workUnit) { UpdateUrisTask.runNow(uris, excluders, modifiers, indDao, listeners, pool);
FindUrisForStatementWorkUnit worker = (FindUrisForStatementWorkUnit) workUnit; }
Set<String> foundUris = worker.getUris(); @Override
Statement stmt = worker.getStatement(); public SearchIndexerStatus getStatus() {
log.debug("Found " + foundUris.size() + " uris for statement: " + stmt); return status.getSearchIndexerStatus();
}
uris.addAll(foundUris); @Override
status.incrementProcessed(); public void notifyWorkUnitCompletion(Runnable workUnit) {
} FindUrisForStatementWorkUnit worker = (FindUrisForStatementWorkUnit) workUnit;
// ---------------------------------------------------------------------- Set<String> foundUris = worker.getUris();
// Helper classes Statement stmt = worker.getStatement();
// ---------------------------------------------------------------------- log.debug("Found " + foundUris.size() + " uris for statement: " + stmt);
/** uris.addAll(foundUris);
* A thread-safe collection of status information. All methods are status.incrementProcessed();
* synchronized. }
*/
private static class Status {
private final int total;
private final int progressInterval;
private final ListenerList listeners;
private int processed = 0;
private Date since = new Date();
public Status(int total, int progressInterval, ListenerList listeners) { // ----------------------------------------------------------------------
this.total = total; // Helper classes
this.progressInterval = progressInterval; // ----------------------------------------------------------------------
this.listeners = listeners;
}
public synchronized void incrementProcessed() { /**
processed++; * A thread-safe collection of status information. All methods are
since = new Date(); * synchronized.
maybeFireProgressEvent(); */
} private static class Status {
private final int total;
private final int progressInterval;
private final ListenerList listeners;
private int processed = 0;
private Date since = new Date();
private void maybeFireProgressEvent() { public Status(int total, int progressInterval, ListenerList listeners) {
if (processed > 0 && processed % progressInterval == 0) { this.total = total;
listeners.fireEvent(new Event(PROGRESS, this.progressInterval = progressInterval;
getSearchIndexerStatus())); this.listeners = listeners;
} }
}
public synchronized SearchIndexerStatus getSearchIndexerStatus() { public synchronized void incrementProcessed() {
int remaining = total - processed; processed++;
return new SearchIndexerStatus(PROCESSING_STMTS, since, since = new Date();
new StatementCounts(processed, remaining, total)); maybeFireProgressEvent();
} }
} private void maybeFireProgressEvent() {
if (processed > 0 && processed % progressInterval == 0) {
listeners.fireEvent(new Event(PROGRESS,
getSearchIndexerStatus()));
}
}
public synchronized SearchIndexerStatus getSearchIndexerStatus() {
int remaining = total - processed;
return new SearchIndexerStatus(PROCESSING_STMTS, since,
new StatementCounts(processed, remaining, total));
}
}
}
} }

View file

@ -7,14 +7,8 @@ import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndex
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STOP_URIS; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STOP_URIS;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State.PROCESSING_URIS; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State.PROCESSING_URIS;
import java.util.Collection; import java.util.*;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl;
import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinderList;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -29,6 +23,7 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Even
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.UriCounts; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.UriCounts;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerUtils; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerUtils;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.IndexerConfig;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool;
@ -51,230 +46,268 @@ import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExclud
* again at the end of the task. * again at the end of the task.
*/ */
public class UpdateUrisTask implements Task { public class UpdateUrisTask implements Task {
private static final Log log = LogFactory.getLog(UpdateUrisTask.class); private static final Log log = LogFactory.getLog(UpdateUrisTask.class);
private final Set<String> uris; private final IndexerConfig config;
private final IndividualDao indDao; private UpdateUrisTaskImpl impl;
private final SearchIndexExcluderList excluders;
private final DocumentModifierList modifiers;
private final ListenerList listeners;
private final WorkerThreadPool pool;
private final Status status; private final Collection<String> uris;
private final SearchEngine searchEngine; private Date since = new Date();
public static class Deferrable implements SearchIndexerImpl.DeferrableTask { public UpdateUrisTask(IndexerConfig config, Collection<String> uris) {
Collection<String> uris; this.config = config;
public Deferrable(Collection<String> uris) { this.uris = uris; } this.uris = new HashSet<>(uris);
}
@Override static void runNow(Collection<String> uris, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) {
public Task makeRunnable(IndexingUriFinderList uriFinders, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { UpdateUrisTaskImpl impl = new UpdateUrisTaskImpl(uris, excluders, modifiers, indDao, listeners, pool);
return new UpdateUrisTask(uris, excluders, modifiers, indDao, listeners, pool); impl.run();
}
@Override
public void run() {
impl = new UpdateUrisTaskImpl(config, uris);
impl.run();
}
@Override
public SearchIndexerStatus getStatus() {
if (impl != null) {
return impl.getStatus();
}
return new SearchIndexerStatus(PROCESSING_URIS, since, new UriCounts(0, 0, 0, uris.size(), uris.size()));
}
@Override
public void notifyWorkUnitCompletion(Runnable workUnit) {
if (impl != null) {
impl.notifyWorkUnitCompletion(workUnit);
} }
} }
public UpdateUrisTask(Collection<String> uris, private static class UpdateUrisTaskImpl implements Task {
SearchIndexExcluderList excluders, DocumentModifierList modifiers, private final Collection<String> uris;
IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { private final IndividualDao indDao;
this.uris = new HashSet<>(uris); private final SearchIndexExcluderList excluders;
this.excluders = excluders; private final DocumentModifierList modifiers;
this.modifiers = modifiers; private final ListenerList listeners;
this.indDao = indDao; private final WorkerThreadPool pool;
this.listeners = listeners;
this.pool = pool;
this.status = new Status(this, uris.size(), 500); private final Status status;
private final SearchEngine searchEngine;
this.searchEngine = ApplicationUtils.instance().getSearchEngine(); public UpdateUrisTaskImpl(IndexerConfig config, Collection<String> uris) {
this.excluders = config.excluderList();
this.modifiers = config.documentModifierList();
this.indDao = config.individualDao();
this.listeners = config.listenerList();
this.pool = config.workerThreadPool();
} this.uris = uris;
this.status = new Status(this, uris.size(), 500);
@Override this.searchEngine = ApplicationUtils.instance().getSearchEngine();
public void run() { }
listeners.fireEvent(new Event(START_URIS, status
.getSearchIndexerStatus()));
excluders.startIndexing();
modifiers.startIndexing();
for (String uri : uris) { public UpdateUrisTaskImpl(Collection<String> uris, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) {
if (isInterrupted()) { this.uris = uris;
log.info("Interrupted: " + status.getSearchIndexerStatus()); this.excluders = excluders;
break; this.modifiers = modifiers;
} else if (uri == null) { this.indDao = indDao;
// Nothing to do this.listeners = listeners;
} else { this.pool = pool;
Individual ind = getIndividual(uri); this.status = new Status(this, uris.size(), 500);
if (ind == null) {
deleteDocument(uri);
} else if (isExcluded(ind)) {
excludeDocument(uri);
} else {
updateDocument(ind);
}
}
}
pool.waitUntilIdle();
commitChanges(); this.searchEngine = ApplicationUtils.instance().getSearchEngine();
}
excluders.stopIndexing(); @Override
modifiers.stopIndexing(); public void run() {
listeners.fireEvent(new Event(STOP_URIS, status listeners.fireEvent(new Event(START_URIS, status.getSearchIndexerStatus()));
.getSearchIndexerStatus())); excluders.startIndexing();
} modifiers.startIndexing();
private boolean isInterrupted() { for (String uri : uris) {
if (Thread.interrupted()) { if (isInterrupted()) {
Thread.currentThread().interrupt(); log.info("Interrupted: " + status.getSearchIndexerStatus());
return true; break;
} else { } else if (uri == null) {
return false; // Nothing to do
} } else {
} Individual ind = getIndividual(uri);
if (ind == null) {
deleteDocument(uri);
} else if (isExcluded(ind)) {
excludeDocument(uri);
} else {
updateDocument(ind);
}
}
}
pool.waitUntilIdle();
private Individual getIndividual(String uri) { commitChanges();
Individual ind = indDao.getIndividualByURI(uri);
if (ind == null) {
log.debug("Found no individual for '" + uri + "'");
}
return ind;
}
private boolean isExcluded(Individual ind) { excluders.stopIndexing();
return excluders.isExcluded(ind); modifiers.stopIndexing();
} listeners.fireEvent(new Event(STOP_URIS, status.getSearchIndexerStatus()));
}
/** A delete is fast enough to be done synchronously. */ private boolean isInterrupted() {
private void deleteDocument(String uri) { if (Thread.interrupted()) {
try { Thread.currentThread().interrupt();
searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri)); return true;
status.incrementDeletes(); } else {
log.debug("deleted '" + uri + "' from search index."); return false;
} catch (SearchEngineNotRespondingException e) { }
log.warn("Failed to delete '" + uri + "' from search index: " }
+ "the search engine is not responding.");
} catch (Exception e) {
log.warn("Failed to delete '" + uri + "' from search index", e);
}
}
/** An exclusion is just a delete for different reasons. */ private Individual getIndividual(String uri) {
private void excludeDocument(String uri) { Individual ind = indDao.getIndividualByURI(uri);
try { if (ind == null) {
searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri)); log.debug("Found no individual for '" + uri + "'");
status.incrementExclusions(); }
log.debug("excluded '" + uri + "' from search index."); return ind;
} catch (SearchEngineNotRespondingException e) { }
log.warn("Failed to exclude '" + uri + "' from search index: "
+ "the search engine is not responding.", e);
} catch (Exception e) {
log.warn("Failed to exclude '" + uri + "' from search index", e);
}
}
private void updateDocument(Individual ind) { private boolean isExcluded(Individual ind) {
Runnable workUnit = new UpdateDocumentWorkUnit(ind, modifiers); return excluders.isExcluded(ind);
pool.submit(workUnit, this); }
log.debug("scheduled update to " + ind);
}
private void fireEvent(Event event) { /**
listeners.fireEvent(event); * A delete is fast enough to be done synchronously.
if (event.getType() == PROGRESS || event.getType() == STOP_URIS) { */
commitChanges(); private void deleteDocument(String uri) {
} try {
} searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri));
status.incrementDeletes();
log.debug("deleted '" + uri + "' from search index.");
} catch (SearchEngineNotRespondingException e) {
log.warn("Failed to delete '" + uri + "' from search index: "
+ "the search engine is not responding.");
} catch (Exception e) {
log.warn("Failed to delete '" + uri + "' from search index", e);
}
}
private void commitChanges() { /**
try { * An exclusion is just a delete for different reasons.
searchEngine.commit(); */
} catch (SearchEngineException e) { private void excludeDocument(String uri) {
log.warn("Failed to commit changes.", e); try {
} searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri));
} status.incrementExclusions();
log.debug("excluded '" + uri + "' from search index.");
} catch (SearchEngineNotRespondingException e) {
log.warn("Failed to exclude '" + uri + "' from search index: "
+ "the search engine is not responding.", e);
} catch (Exception e) {
log.warn("Failed to exclude '" + uri + "' from search index", e);
}
}
@Override private void updateDocument(Individual ind) {
public void notifyWorkUnitCompletion(Runnable workUnit) { Runnable workUnit = new UpdateDocumentWorkUnit(ind, modifiers);
log.debug("completed update to " pool.submit(workUnit, this);
+ ((UpdateDocumentWorkUnit) workUnit).getInd()); log.debug("scheduled update to " + ind);
status.incrementUpdates(); }
}
@Override private void fireEvent(Event event) {
public SearchIndexerStatus getStatus() { listeners.fireEvent(event);
return status.getSearchIndexerStatus(); if (event.getType() == PROGRESS || event.getType() == STOP_URIS) {
} commitChanges();
}
}
// ---------------------------------------------------------------------- private void commitChanges() {
// helper classes try {
// ---------------------------------------------------------------------- searchEngine.commit();
} catch (SearchEngineException e) {
log.warn("Failed to commit changes.", e);
}
}
/** @Override
* A thread-safe collection of status information. All methods are public void notifyWorkUnitCompletion(Runnable workUnit) {
* synchronized. log.debug("completed update to "
*/ + ((UpdateDocumentWorkUnit) workUnit).getInd());
private static class Status { status.incrementUpdates();
private final UpdateUrisTask parent; }
private final int total;
private final int progressInterval;
private int updated = 0;
private int deleted = 0;
private int excluded = 0;
private Date since = new Date();
public Status(UpdateUrisTask parent, int total, int progressInterval) { @Override
this.parent = parent; public SearchIndexerStatus getStatus() {
this.total = total; return status.getSearchIndexerStatus();
this.progressInterval = progressInterval; }
}
public synchronized void incrementUpdates() { /**
updated++; * A thread-safe collection of status information. All methods are
since = new Date(); * synchronized.
maybeFireProgressEvent(); */
} private static class Status {
private final UpdateUrisTaskImpl parent;
private final int total;
private final int progressInterval;
private int updated = 0;
private int deleted = 0;
private int excluded = 0;
private Date since = new Date();
public synchronized void incrementDeletes() { public Status(UpdateUrisTaskImpl parent, int total, int progressInterval) {
deleted++; this.parent = parent;
since = new Date(); this.total = total;
} this.progressInterval = progressInterval;
}
public synchronized void incrementExclusions() { public synchronized void incrementUpdates() {
excluded++; updated++;
since = new Date(); since = new Date();
} maybeFireProgressEvent();
}
private void maybeFireProgressEvent() { public synchronized void incrementDeletes() {
if (updated > 0 && updated % progressInterval == 0) { deleted++;
parent.fireEvent(new Event(PROGRESS, getSearchIndexerStatus())); since = new Date();
} }
}
public synchronized SearchIndexerStatus getSearchIndexerStatus() { public synchronized void incrementExclusions() {
int remaining = total - updated - deleted - excluded; excluded++;
return new SearchIndexerStatus(PROCESSING_URIS, since, since = new Date();
new UriCounts(excluded, deleted, updated, remaining, total)); }
}
} private void maybeFireProgressEvent() {
if (updated > 0 && updated % progressInterval == 0) {
parent.fireEvent(new Event(PROGRESS, getSearchIndexerStatus()));
}
}
/** public synchronized SearchIndexerStatus getSearchIndexerStatus() {
* This will be first in the list of SearchIndexExcluders. int remaining = total - updated - deleted - excluded;
*/ return new SearchIndexerStatus(PROCESSING_URIS, since,
public static class ExcludeIfNoVClasses implements SearchIndexExcluder { new UriCounts(excluded, deleted, updated, remaining, total));
@Override }
public String checkForExclusion(Individual ind) { }
List<VClass> vclasses = ind.getVClasses(false); }
if (vclasses == null || vclasses.isEmpty()) {
return "Individual " + ind + " has no classes.";
}
return null;
}
@Override // ----------------------------------------------------------------------
public String toString() { // helper classes
return "Internal: ExcludeIfNoVClasses"; // ----------------------------------------------------------------------
} /**
* This will be first in the list of SearchIndexExcluders.
*/
public static class ExcludeIfNoVClasses implements SearchIndexExcluder {
@Override
public String checkForExclusion(Individual ind) {
List<VClass> vclasses = ind.getVClasses(false);
if (vclasses == null || vclasses.isEmpty()) {
return "Individual " + ind + " has no classes.";
}
return null;
}
} @Override
} public String toString() {
return "Internal: ExcludeIfNoVClasses";
}
}
}