From 64680d22b00a2b3835d87e6adb6effffdf477ce9 Mon Sep 17 00:00:00 2001 From: Graham Triggs Date: Sun, 15 Feb 2015 15:09:03 +0000 Subject: [PATCH] Defer tasks by passing in a reference for obtaining the config at runtime, rather than at creation, allowing tasks to be deferred during startup. --- .../webapp/searchindex/SearchIndexerImpl.java | 79 +++- .../searchindex/tasks/RebuildIndexTask.java | 230 +++++----- .../tasks/UpdateStatementsTask.java | 256 ++++++----- .../searchindex/tasks/UpdateUrisTask.java | 429 ++++++++++-------- 4 files changed, 551 insertions(+), 443 deletions(-) diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java index b41a43054..c9a94329c 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java @@ -269,7 +269,7 @@ public class SearchIndexerImpl implements SearchIndexer { return; } - scheduler.scheduleTask(new UpdateStatementsTask.Deferrable(changes)); + scheduler.scheduleTask(new UpdateStatementsTask(new IndexerConfigImpl(this), changes)); log.debug("Scheduled updates for " + changes.size() + " statements."); } @@ -287,7 +287,7 @@ public class SearchIndexerImpl implements SearchIndexer { return; } - scheduler.scheduleTask(new UpdateUrisTask.Deferrable(uris)); + scheduler.scheduleTask(new UpdateUrisTask(new IndexerConfigImpl(this), uris)); log.debug("Scheduled updates for " + uris.size() + " uris."); } @@ -301,7 +301,7 @@ public class SearchIndexerImpl implements SearchIndexer { return; } - scheduler.scheduleTask(new RebuildIndexTask.Deferrable()); + scheduler.scheduleTask(new RebuildIndexTask(new IndexerConfigImpl(this))); log.debug("Scheduled a full rebuild."); } @@ -397,7 +397,7 @@ public class SearchIndexerImpl implements SearchIndexer { */ private static class Scheduler { private final TaskQueue 
taskQueue; - private final List deferredQueue; + private final List deferredQueue; private final SearchIndexerImpl indexer; private volatile boolean started; private volatile boolean paused; @@ -405,7 +405,7 @@ public class SearchIndexerImpl implements SearchIndexer { public Scheduler(SearchIndexerImpl indexer, TaskQueue taskQueue) { this.indexer = indexer; this.taskQueue = taskQueue; - this.deferredQueue = new ArrayList(); + this.deferredQueue = new ArrayList(); } public boolean isStarted() { @@ -416,20 +416,13 @@ public class SearchIndexerImpl implements SearchIndexer { return paused; } - public synchronized void scheduleTask(DeferrableTask task) { + public synchronized void scheduleTask(Task task) { if (paused || !started) { deferredQueue.add(task); log.debug("added task to deferred queue: " + task); } else { - taskQueue.scheduleTask(task.makeRunnable(indexer.createFindersList(), indexer.createExcludersList(), indexer.createModifiersList(), indexer.wadf.getIndividualDao(), indexer.listeners, indexer.pool)); - } - } - public synchronized void scheduleTask(Task task) { - if (started && !paused) { taskQueue.scheduleTask(task); log.debug("added task to task queue: " + task); - } else { - log.debug("indexer not running, task ignored: " + task); } } @@ -452,8 +445,8 @@ public class SearchIndexerImpl implements SearchIndexer { } private void processDeferredTasks() { - for (DeferrableTask task : deferredQueue) { - taskQueue.scheduleTask(task.makeRunnable(indexer.createFindersList(), indexer.createExcludersList(), indexer.createModifiersList(), indexer.wadf.getIndividualDao(), indexer.listeners, indexer.pool)); + for (Task task : deferredQueue) { + taskQueue.scheduleTask(task); log.debug("moved task from deferred queue to task queue: " + task); } @@ -561,10 +554,34 @@ public class SearchIndexerImpl implements SearchIndexer { } } - public static interface DeferrableTask { - public Task makeRunnable(IndexingUriFinderList uriFinders, SearchIndexExcluderList excluders, - 
DocumentModifierList modifiers, IndividualDao indDao, - ListenerList listeners, WorkerThreadPool pool); + /** + * Interface for tasks to access the Indexer config + */ + public static interface IndexerConfig { + public IndexingUriFinderList uriFinderList(); + public SearchIndexExcluderList excluderList(); + public DocumentModifierList documentModifierList(); + public IndividualDao individualDao(); + public ListenerList listenerList(); + public WorkerThreadPool workerThreadPool(); + } + + /** + * Implementation of IndexerConfig + * Defers access to the configuration until the task is running, so a Task + * created and deferred before the indexer is started will not cause a NullPointerException + */ + private static class IndexerConfigImpl implements IndexerConfig { + private final SearchIndexerImpl sii; + + public IndexerConfigImpl(SearchIndexerImpl sii) { this.sii = sii; } + + public IndexingUriFinderList uriFinderList() { return sii.createFindersList(); } + public SearchIndexExcluderList excluderList() { return sii.createExcludersList(); } + public DocumentModifierList documentModifierList() { return sii.createModifiersList(); } + public IndividualDao individualDao() { return sii.wadf.getIndividualDao(); } + public ListenerList listenerList() { return sii.listeners; } + public WorkerThreadPool workerThreadPool() { return sii.pool; } } public static interface Task extends Runnable { @@ -671,4 +688,28 @@ public class SearchIndexerImpl implements SearchIndexer { } } + + private static class StatementList { + List changes; + + public StatementList() { + changes = new ArrayList(); + } + + public synchronized void addStatement(Statement stmt) { + changes.add(stmt); + } + + public synchronized List getStatements() { + try { + return new ArrayList<>(changes); + } finally { + changes.clear(); + } + } + + public synchronized int size() { + return changes.size(); + } + }; } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/RebuildIndexTask.java 
b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/RebuildIndexTask.java index 9cc89d6ed..77af681cb 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/RebuildIndexTask.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/RebuildIndexTask.java @@ -10,8 +10,6 @@ import java.text.SimpleDateFormat; import java.util.Collection; import java.util.Date; -import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl; -import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinderList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -24,7 +22,7 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Even import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.RebuildCounts; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State; -import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.DeferrableTask; +import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.IndexerConfig; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool; @@ -40,126 +38,150 @@ import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExclud */ public class RebuildIndexTask implements Task { private static final Log log = LogFactory.getLog(RebuildIndexTask.class); - - private final IndividualDao indDao; - private final SearchIndexExcluderList excluders; - private final DocumentModifierList modifiers; - private final ListenerList listeners; - private final WorkerThreadPool pool; - private final SearchEngine searchEngine; - private final Date requestedAt; - private final int documentsBefore; - 
private volatile SearchIndexerStatus status; + private final IndexerConfig config; + private RebuildIndexTaskImpl impl; - public static class Deferrable implements DeferrableTask { - public Deferrable() {} - - @Override - public Task makeRunnable(IndexingUriFinderList uriFinders, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { - return new RebuildIndexTask(excluders, modifiers, indDao, listeners, pool); - } + public RebuildIndexTask(IndexerConfig config) { + this.config = config; + this.requestedAt = new Date(); } - public RebuildIndexTask(SearchIndexExcluderList excluders, - DocumentModifierList modifiers, IndividualDao indDao, - ListenerList listeners, WorkerThreadPool pool) { - this.excluders = excluders; - this.modifiers = modifiers; - this.indDao = indDao; - this.listeners = listeners; - this.pool = pool; - - this.searchEngine = ApplicationUtils.instance().getSearchEngine(); - - this.requestedAt = new Date(); - this.documentsBefore = getDocumentCount(); - this.status = buildStatus(REBUILDING, 0); - } - @Override public void run() { - listeners.fireEvent(new Event(START_REBUILD, status)); - - Collection uris = getAllUrisInTheModel(); - - if (!isInterrupted()) { - updateTheUris(uris); - if (!isInterrupted()) { - deleteOutdatedDocuments(); - } - } - - status = buildStatus(REBUILDING, getDocumentCount()); - listeners.fireEvent(new Event(STOP_REBUILD, status)); - } - - private boolean isInterrupted() { - if (Thread.interrupted()) { - Thread.currentThread().interrupt(); - return true; - } else { - return false; - } - } - - private Collection getAllUrisInTheModel() { - return indDao.getAllIndividualUris(); - } - - private void updateTheUris(Collection uris) { - new UpdateUrisTask(uris, excluders, modifiers, indDao, listeners, pool) - .run(); - } - - private void deleteOutdatedDocuments() { - String query = "indexedTime:[ * TO " + requestedAt.getTime() + " ]"; - try { - 
searchEngine.deleteByQuery(query); - searchEngine.commit(); - } catch (SearchEngineNotRespondingException e) { - log.warn("Failed to delete outdated documents from the search index: " - + "the search engine is not responding."); - } catch (SearchEngineException e) { - log.warn("Failed to delete outdated documents " - + "from the search index", e); - } - } - - private int getDocumentCount() { - try { - return searchEngine.documentCount(); - } catch (SearchEngineNotRespondingException e) { - log.warn("Failed to get document count from the search index: " - + "the search engine is not responding."); - return 0; - } catch (SearchEngineException e) { - log.warn("Failed to get document count from the search index.", e); - return 0; - } - } - - private SearchIndexerStatus buildStatus(State state, int documentsAfter) { - return new SearchIndexerStatus(state, new Date(), new RebuildCounts( - documentsBefore, documentsAfter)); + impl = new RebuildIndexTaskImpl(config, requestedAt); + impl.run(); } @Override public SearchIndexerStatus getStatus() { - return status; + return impl == null ? null : impl.getStatus(); } @Override public void notifyWorkUnitCompletion(Runnable workUnit) { - // We don't submit any work units, so we won't see any calls to this. 
- log.error("Why was this called?"); + if (impl != null) { + impl.notifyWorkUnitCompletion(workUnit); + } } @Override public String toString() { - return "RebuildIndexTask[requestedAt=" - + new SimpleDateFormat().format(requestedAt) + "]"; + return "RebuildIndexTask[requestedAt=" + new SimpleDateFormat().format(requestedAt) + "]"; } + private static class RebuildIndexTaskImpl implements Task { + private final IndexerConfig config; + + private final IndividualDao indDao; + private final SearchIndexExcluderList excluders; + private final DocumentModifierList modifiers; + private final ListenerList listeners; + private final WorkerThreadPool pool; + private final SearchEngine searchEngine; + + private final Date requestedAt; + private final int documentsBefore; + + private volatile SearchIndexerStatus status; + + public RebuildIndexTaskImpl(IndexerConfig config, Date requestedAt) { + this.config = config; + this.excluders = config.excluderList(); + this.modifiers = config.documentModifierList(); + this.indDao = config.individualDao(); + this.listeners = config.listenerList(); + this.pool = config.workerThreadPool(); + + this.searchEngine = ApplicationUtils.instance().getSearchEngine(); + + this.requestedAt = requestedAt; + this.documentsBefore = getDocumentCount(); + this.status = buildStatus(REBUILDING, 0); + } + + @Override + public void run() { + listeners.fireEvent(new Event(START_REBUILD, status)); + + Collection uris = getAllUrisInTheModel(); + + if (!isInterrupted()) { + updateTheUris(uris); + if (!isInterrupted()) { + deleteOutdatedDocuments(); + } + } + + status = buildStatus(REBUILDING, getDocumentCount()); + listeners.fireEvent(new Event(STOP_REBUILD, status)); + } + + private boolean isInterrupted() { + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + return true; + } else { + return false; + } + } + + private Collection getAllUrisInTheModel() { + return indDao.getAllIndividualUris(); + } + + private void updateTheUris(Collection uris) { 
+ UpdateUrisTask.runNow(uris, excluders, modifiers, indDao, listeners, pool); + } + + private void deleteOutdatedDocuments() { + String query = "indexedTime:[ * TO " + requestedAt.getTime() + " ]"; + try { + searchEngine.deleteByQuery(query); + searchEngine.commit(); + } catch (SearchEngineNotRespondingException e) { + log.warn("Failed to delete outdated documents from the search index: " + + "the search engine is not responding."); + } catch (SearchEngineException e) { + log.warn("Failed to delete outdated documents " + + "from the search index", e); + } + } + + private int getDocumentCount() { + try { + return searchEngine.documentCount(); + } catch (SearchEngineNotRespondingException e) { + log.warn("Failed to get document count from the search index: " + + "the search engine is not responding."); + return 0; + } catch (SearchEngineException e) { + log.warn("Failed to get document count from the search index.", e); + return 0; + } + } + + private SearchIndexerStatus buildStatus(State state, int documentsAfter) { + return new SearchIndexerStatus(state, new Date(), new RebuildCounts( + documentsBefore, documentsAfter)); + } + + @Override + public SearchIndexerStatus getStatus() { + return status; + } + + @Override + public void notifyWorkUnitCompletion(Runnable workUnit) { + // We don't submit any work units, so we won't see any calls to this. 
+ log.error("Why was this called?"); + } + + @Override + public String toString() { + return "RebuildIndexTask[requestedAt=" + + new SimpleDateFormat().format(requestedAt) + "]"; + } + } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateStatementsTask.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateStatementsTask.java index 020fda35a..9a12a04bc 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateStatementsTask.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateStatementsTask.java @@ -24,7 +24,7 @@ import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.StatementCounts; -import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.DeferrableTask; +import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.IndexerConfig; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool; @@ -51,157 +51,169 @@ import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinderLi * Set to remove duplicates, and then process the URIs in the set. 
*/ public class UpdateStatementsTask implements Task { - private static final Log log = LogFactory - .getLog(UpdateStatementsTask.class); + private static final Log log = LogFactory.getLog(UpdateStatementsTask.class); - private final List changes; - private final IndexingUriFinderList uriFinders; - private final SearchIndexExcluderList excluders; - private final DocumentModifierList modifiers; - private final IndividualDao indDao; - private final ListenerList listeners; - private final WorkerThreadPool pool; + private final IndexerConfig config; + private UpdateStatementsTaskImpl impl; - private final Set uris; - private final Status status; + private List changes; - public static class Deferrable implements DeferrableTask { - List changes; + public UpdateStatementsTask(IndexerConfig config, List changes) { + this.config = config; + this.changes = new ArrayList<>(changes); + } - public Deferrable(List changes) { this.changes = changes; } + @Override + public void run() { + impl = new UpdateStatementsTaskImpl(config, changes); + impl.run(); + } + @Override + public SearchIndexerStatus getStatus() { + return impl == null ? 
null : impl.getStatus(); + } - @Override - public Task makeRunnable(IndexingUriFinderList uriFinders, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { - return new UpdateStatementsTask(changes, uriFinders, excluders, modifiers, indDao, listeners, pool); + @Override + public void notifyWorkUnitCompletion(Runnable workUnit) { + if (impl != null) { + impl.notifyWorkUnitCompletion(workUnit); } } - public UpdateStatementsTask(List changes, - IndexingUriFinderList uriFinders, - SearchIndexExcluderList excluders, DocumentModifierList modifiers, - IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { - this.changes = new ArrayList<>(changes); - this.uriFinders = uriFinders; - this.excluders = excluders; - this.modifiers = modifiers; - this.indDao = indDao; - this.listeners = listeners; - this.pool = pool; + private static class UpdateStatementsTaskImpl implements Task { + private final List changes; + private final IndexingUriFinderList uriFinders; + private final SearchIndexExcluderList excluders; + private final DocumentModifierList modifiers; + private final IndividualDao indDao; + private final ListenerList listeners; + private final WorkerThreadPool pool; - this.uris = Collections.synchronizedSet(new HashSet()); + private final Set uris; + private final Status status; - this.status = new Status(changes.size(), 500, listeners); - } + public UpdateStatementsTaskImpl(IndexerConfig config, List changes) { + this.changes = changes; + this.uriFinders = config.uriFinderList(); + this.excluders = config.excluderList(); + this.modifiers = config.documentModifierList(); + this.indDao = config.individualDao(); + this.listeners = config.listenerList(); + this.pool = config.workerThreadPool(); - @Override - public void run() { - listeners.fireEvent(new Event(START_STATEMENTS, getStatus())); + this.uris = Collections.synchronizedSet(new HashSet()); - findAffectedUris(); + 
this.status = new Status(changes.size(), 500, listeners); + } - updateTheUris(); - listeners.fireEvent(new Event(STOP_STATEMENTS, getStatus())); - } + @Override + public void run() { + listeners.fireEvent(new Event(START_STATEMENTS, getStatus())); - private void findAffectedUris() { - log.debug("Tell finders we are starting."); - uriFinders.startIndexing(); + findAffectedUris(); - for (Statement stmt : changes) { - if (isInterrupted()) { - log.info("Interrupted: " + status.getSearchIndexerStatus()); - return; - } else { - findUrisForStatement(stmt); - } - } - waitForWorkUnitsToComplete(); + updateTheUris(); + listeners.fireEvent(new Event(STOP_STATEMENTS, getStatus())); + } - log.debug("Tell finders we are stopping."); - uriFinders.stopIndexing(); - } + private void findAffectedUris() { + log.debug("Tell finders we are starting."); + uriFinders.startIndexing(); - private boolean isInterrupted() { - if (Thread.interrupted()) { - Thread.currentThread().interrupt(); - return true; - } else { - return false; - } - } + for (Statement stmt : changes) { + if (isInterrupted()) { + log.info("Interrupted: " + status.getSearchIndexerStatus()); + return; + } else { + findUrisForStatement(stmt); + } + } + waitForWorkUnitsToComplete(); - private void findUrisForStatement(Statement stmt) { - Runnable workUnit = new FindUrisForStatementWorkUnit(stmt, uriFinders); - pool.submit(workUnit, this); - log.debug("scheduled uri finders for " + stmt); - } + log.debug("Tell finders we are stopping."); + uriFinders.stopIndexing(); + } - private void waitForWorkUnitsToComplete() { - pool.waitUntilIdle(); - } + private boolean isInterrupted() { + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + return true; + } else { + return false; + } + } - private void updateTheUris() { - new UpdateUrisTask(uris, excluders, modifiers, indDao, listeners, pool) - .run(); - } + private void findUrisForStatement(Statement stmt) { + Runnable workUnit = new FindUrisForStatementWorkUnit(stmt, 
uriFinders); + pool.submit(workUnit, this); + log.debug("scheduled uri finders for " + stmt); + } - @Override - public SearchIndexerStatus getStatus() { - return status.getSearchIndexerStatus(); - } + private void waitForWorkUnitsToComplete() { + pool.waitUntilIdle(); + } - @Override - public void notifyWorkUnitCompletion(Runnable workUnit) { - FindUrisForStatementWorkUnit worker = (FindUrisForStatementWorkUnit) workUnit; + private void updateTheUris() { + UpdateUrisTask.runNow(uris, excluders, modifiers, indDao, listeners, pool); + } - Set foundUris = worker.getUris(); - Statement stmt = worker.getStatement(); - log.debug("Found " + foundUris.size() + " uris for statement: " + stmt); + @Override + public SearchIndexerStatus getStatus() { + return status.getSearchIndexerStatus(); + } - uris.addAll(foundUris); - status.incrementProcessed(); - } + @Override + public void notifyWorkUnitCompletion(Runnable workUnit) { + FindUrisForStatementWorkUnit worker = (FindUrisForStatementWorkUnit) workUnit; - // ---------------------------------------------------------------------- - // Helper classes - // ---------------------------------------------------------------------- + Set foundUris = worker.getUris(); + Statement stmt = worker.getStatement(); + log.debug("Found " + foundUris.size() + " uris for statement: " + stmt); - /** - * A thread-safe collection of status information. All methods are - * synchronized. 
- */ - private static class Status { - private final int total; - private final int progressInterval; - private final ListenerList listeners; - private int processed = 0; - private Date since = new Date(); + uris.addAll(foundUris); + status.incrementProcessed(); + } - public Status(int total, int progressInterval, ListenerList listeners) { - this.total = total; - this.progressInterval = progressInterval; - this.listeners = listeners; - } + // ---------------------------------------------------------------------- + // Helper classes + // ---------------------------------------------------------------------- - public synchronized void incrementProcessed() { - processed++; - since = new Date(); - maybeFireProgressEvent(); - } + /** + * A thread-safe collection of status information. All methods are + * synchronized. + */ + private static class Status { + private final int total; + private final int progressInterval; + private final ListenerList listeners; + private int processed = 0; + private Date since = new Date(); - private void maybeFireProgressEvent() { - if (processed > 0 && processed % progressInterval == 0) { - listeners.fireEvent(new Event(PROGRESS, - getSearchIndexerStatus())); - } - } + public Status(int total, int progressInterval, ListenerList listeners) { + this.total = total; + this.progressInterval = progressInterval; + this.listeners = listeners; + } - public synchronized SearchIndexerStatus getSearchIndexerStatus() { - int remaining = total - processed; - return new SearchIndexerStatus(PROCESSING_STMTS, since, - new StatementCounts(processed, remaining, total)); - } + public synchronized void incrementProcessed() { + processed++; + since = new Date(); + maybeFireProgressEvent(); + } - } + private void maybeFireProgressEvent() { + if (processed > 0 && processed % progressInterval == 0) { + listeners.fireEvent(new Event(PROGRESS, + getSearchIndexerStatus())); + } + } + public synchronized SearchIndexerStatus getSearchIndexerStatus() { + int remaining 
= total - processed; + return new SearchIndexerStatus(PROCESSING_STMTS, since, + new StatementCounts(processed, remaining, total)); + } + + } + } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateUrisTask.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateUrisTask.java index 0e8aba65c..2bfc7db38 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateUrisTask.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateUrisTask.java @@ -7,14 +7,8 @@ import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndex import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STOP_URIS; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State.PROCESSING_URIS; -import java.util.Collection; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; -import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl; -import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinderList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,6 +23,7 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Even import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.UriCounts; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerUtils; +import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.IndexerConfig; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool; @@ -51,230 +46,268 @@ import 
edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExclud * again at the end of the task. */ public class UpdateUrisTask implements Task { - private static final Log log = LogFactory.getLog(UpdateUrisTask.class); + private static final Log log = LogFactory.getLog(UpdateUrisTask.class); - private final Set uris; - private final IndividualDao indDao; - private final SearchIndexExcluderList excluders; - private final DocumentModifierList modifiers; - private final ListenerList listeners; - private final WorkerThreadPool pool; + private final IndexerConfig config; + private UpdateUrisTaskImpl impl; - private final Status status; - private final SearchEngine searchEngine; + private final Collection uris; + private Date since = new Date(); - public static class Deferrable implements SearchIndexerImpl.DeferrableTask { - Collection uris; - public Deferrable(Collection uris) { this.uris = uris; } + public UpdateUrisTask(IndexerConfig config, Collection uris) { + this.config = config; + this.uris = new HashSet<>(uris); + } - @Override - public Task makeRunnable(IndexingUriFinderList uriFinders, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { - return new UpdateUrisTask(uris, excluders, modifiers, indDao, listeners, pool); + static void runNow(Collection uris, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { + UpdateUrisTaskImpl impl = new UpdateUrisTaskImpl(uris, excluders, modifiers, indDao, listeners, pool); + impl.run(); + } + + @Override + public void run() { + impl = new UpdateUrisTaskImpl(config, uris); + impl.run(); + } + + @Override + public SearchIndexerStatus getStatus() { + if (impl != null) { + return impl.getStatus(); + } + + return new SearchIndexerStatus(PROCESSING_URIS, since, new UriCounts(0, 0, 0, uris.size(), uris.size())); + } + + @Override + public void 
notifyWorkUnitCompletion(Runnable workUnit) { + if (impl != null) { + impl.notifyWorkUnitCompletion(workUnit); } } - public UpdateUrisTask(Collection uris, - SearchIndexExcluderList excluders, DocumentModifierList modifiers, - IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { - this.uris = new HashSet<>(uris); - this.excluders = excluders; - this.modifiers = modifiers; - this.indDao = indDao; - this.listeners = listeners; - this.pool = pool; + private static class UpdateUrisTaskImpl implements Task { + private final Collection uris; + private final IndividualDao indDao; + private final SearchIndexExcluderList excluders; + private final DocumentModifierList modifiers; + private final ListenerList listeners; + private final WorkerThreadPool pool; - this.status = new Status(this, uris.size(), 500); + private final Status status; + private final SearchEngine searchEngine; - this.searchEngine = ApplicationUtils.instance().getSearchEngine(); + public UpdateUrisTaskImpl(IndexerConfig config, Collection uris) { + this.excluders = config.excluderList(); + this.modifiers = config.documentModifierList(); + this.indDao = config.individualDao(); + this.listeners = config.listenerList(); + this.pool = config.workerThreadPool(); - } + this.uris = uris; + this.status = new Status(this, uris.size(), 500); - @Override - public void run() { - listeners.fireEvent(new Event(START_URIS, status - .getSearchIndexerStatus())); - excluders.startIndexing(); - modifiers.startIndexing(); + this.searchEngine = ApplicationUtils.instance().getSearchEngine(); + } - for (String uri : uris) { - if (isInterrupted()) { - log.info("Interrupted: " + status.getSearchIndexerStatus()); - break; - } else if (uri == null) { - // Nothing to do - } else { - Individual ind = getIndividual(uri); - if (ind == null) { - deleteDocument(uri); - } else if (isExcluded(ind)) { - excludeDocument(uri); - } else { - updateDocument(ind); - } - } - } - pool.waitUntilIdle(); + public 
UpdateUrisTaskImpl(Collection uris, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { + this.uris = uris; + this.excluders = excluders; + this.modifiers = modifiers; + this.indDao = indDao; + this.listeners = listeners; + this.pool = pool; + this.status = new Status(this, uris.size(), 500); - commitChanges(); + this.searchEngine = ApplicationUtils.instance().getSearchEngine(); + } - excluders.stopIndexing(); - modifiers.stopIndexing(); - listeners.fireEvent(new Event(STOP_URIS, status - .getSearchIndexerStatus())); - } + @Override + public void run() { + listeners.fireEvent(new Event(START_URIS, status.getSearchIndexerStatus())); + excluders.startIndexing(); + modifiers.startIndexing(); - private boolean isInterrupted() { - if (Thread.interrupted()) { - Thread.currentThread().interrupt(); - return true; - } else { - return false; - } - } + for (String uri : uris) { + if (isInterrupted()) { + log.info("Interrupted: " + status.getSearchIndexerStatus()); + break; + } else if (uri == null) { + // Nothing to do + } else { + Individual ind = getIndividual(uri); + if (ind == null) { + deleteDocument(uri); + } else if (isExcluded(ind)) { + excludeDocument(uri); + } else { + updateDocument(ind); + } + } + } + pool.waitUntilIdle(); - private Individual getIndividual(String uri) { - Individual ind = indDao.getIndividualByURI(uri); - if (ind == null) { - log.debug("Found no individual for '" + uri + "'"); - } - return ind; - } + commitChanges(); - private boolean isExcluded(Individual ind) { - return excluders.isExcluded(ind); - } + excluders.stopIndexing(); + modifiers.stopIndexing(); + listeners.fireEvent(new Event(STOP_URIS, status.getSearchIndexerStatus())); + } - /** A delete is fast enough to be done synchronously. 
*/ - private void deleteDocument(String uri) { - try { - searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri)); - status.incrementDeletes(); - log.debug("deleted '" + uri + "' from search index."); - } catch (SearchEngineNotRespondingException e) { - log.warn("Failed to delete '" + uri + "' from search index: " - + "the search engine is not responding."); - } catch (Exception e) { - log.warn("Failed to delete '" + uri + "' from search index", e); - } - } + private boolean isInterrupted() { + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + return true; + } else { + return false; + } + } - /** An exclusion is just a delete for different reasons. */ - private void excludeDocument(String uri) { - try { - searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri)); - status.incrementExclusions(); - log.debug("excluded '" + uri + "' from search index."); - } catch (SearchEngineNotRespondingException e) { - log.warn("Failed to exclude '" + uri + "' from search index: " - + "the search engine is not responding.", e); - } catch (Exception e) { - log.warn("Failed to exclude '" + uri + "' from search index", e); - } - } + private Individual getIndividual(String uri) { + Individual ind = indDao.getIndividualByURI(uri); + if (ind == null) { + log.debug("Found no individual for '" + uri + "'"); + } + return ind; + } - private void updateDocument(Individual ind) { - Runnable workUnit = new UpdateDocumentWorkUnit(ind, modifiers); - pool.submit(workUnit, this); - log.debug("scheduled update to " + ind); - } + private boolean isExcluded(Individual ind) { + return excluders.isExcluded(ind); + } - private void fireEvent(Event event) { - listeners.fireEvent(event); - if (event.getType() == PROGRESS || event.getType() == STOP_URIS) { - commitChanges(); - } - } + /** + * A delete is fast enough to be done synchronously. 
+ */ + private void deleteDocument(String uri) { + try { + searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri)); + status.incrementDeletes(); + log.debug("deleted '" + uri + "' from search index."); + } catch (SearchEngineNotRespondingException e) { + log.warn("Failed to delete '" + uri + "' from search index: " + + "the search engine is not responding."); + } catch (Exception e) { + log.warn("Failed to delete '" + uri + "' from search index", e); + } + } - private void commitChanges() { - try { - searchEngine.commit(); - } catch (SearchEngineException e) { - log.warn("Failed to commit changes.", e); - } - } + /** + * An exclusion is just a delete for different reasons. + */ + private void excludeDocument(String uri) { + try { + searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri)); + status.incrementExclusions(); + log.debug("excluded '" + uri + "' from search index."); + } catch (SearchEngineNotRespondingException e) { + log.warn("Failed to exclude '" + uri + "' from search index: " + + "the search engine is not responding.", e); + } catch (Exception e) { + log.warn("Failed to exclude '" + uri + "' from search index", e); + } + } - @Override - public void notifyWorkUnitCompletion(Runnable workUnit) { - log.debug("completed update to " - + ((UpdateDocumentWorkUnit) workUnit).getInd()); - status.incrementUpdates(); - } + private void updateDocument(Individual ind) { + Runnable workUnit = new UpdateDocumentWorkUnit(ind, modifiers); + pool.submit(workUnit, this); + log.debug("scheduled update to " + ind); + } - @Override - public SearchIndexerStatus getStatus() { - return status.getSearchIndexerStatus(); - } + private void fireEvent(Event event) { + listeners.fireEvent(event); + if (event.getType() == PROGRESS || event.getType() == STOP_URIS) { + commitChanges(); + } + } - // ---------------------------------------------------------------------- - // helper classes - // ---------------------------------------------------------------------- + private 
void commitChanges() { + try { + searchEngine.commit(); + } catch (SearchEngineException e) { + log.warn("Failed to commit changes.", e); + } + } - /** - * A thread-safe collection of status information. All methods are - * synchronized. - */ - private static class Status { - private final UpdateUrisTask parent; - private final int total; - private final int progressInterval; - private int updated = 0; - private int deleted = 0; - private int excluded = 0; - private Date since = new Date(); + @Override + public void notifyWorkUnitCompletion(Runnable workUnit) { + log.debug("completed update to " + + ((UpdateDocumentWorkUnit) workUnit).getInd()); + status.incrementUpdates(); + } - public Status(UpdateUrisTask parent, int total, int progressInterval) { - this.parent = parent; - this.total = total; - this.progressInterval = progressInterval; - } + @Override + public SearchIndexerStatus getStatus() { + return status.getSearchIndexerStatus(); + } - public synchronized void incrementUpdates() { - updated++; - since = new Date(); - maybeFireProgressEvent(); - } + /** + * A thread-safe collection of status information. All methods are + * synchronized. 
+ */ + private static class Status { + private final UpdateUrisTaskImpl parent; + private final int total; + private final int progressInterval; + private int updated = 0; + private int deleted = 0; + private int excluded = 0; + private Date since = new Date(); - public synchronized void incrementDeletes() { - deleted++; - since = new Date(); - } + public Status(UpdateUrisTaskImpl parent, int total, int progressInterval) { + this.parent = parent; + this.total = total; + this.progressInterval = progressInterval; + } - public synchronized void incrementExclusions() { - excluded++; - since = new Date(); - } + public synchronized void incrementUpdates() { + updated++; + since = new Date(); + maybeFireProgressEvent(); + } - private void maybeFireProgressEvent() { - if (updated > 0 && updated % progressInterval == 0) { - parent.fireEvent(new Event(PROGRESS, getSearchIndexerStatus())); - } - } + public synchronized void incrementDeletes() { + deleted++; + since = new Date(); + } - public synchronized SearchIndexerStatus getSearchIndexerStatus() { - int remaining = total - updated - deleted - excluded; - return new SearchIndexerStatus(PROCESSING_URIS, since, - new UriCounts(excluded, deleted, updated, remaining, total)); - } + public synchronized void incrementExclusions() { + excluded++; + since = new Date(); + } - } + private void maybeFireProgressEvent() { + if (updated > 0 && updated % progressInterval == 0) { + parent.fireEvent(new Event(PROGRESS, getSearchIndexerStatus())); + } + } - /** - * This will be first in the list of SearchIndexExcluders. 
- */ - public static class ExcludeIfNoVClasses implements SearchIndexExcluder { - @Override - public String checkForExclusion(Individual ind) { - List vclasses = ind.getVClasses(false); - if (vclasses == null || vclasses.isEmpty()) { - return "Individual " + ind + " has no classes."; - } - return null; - } + public synchronized SearchIndexerStatus getSearchIndexerStatus() { + int remaining = total - updated - deleted - excluded; + return new SearchIndexerStatus(PROCESSING_URIS, since, + new UriCounts(excluded, deleted, updated, remaining, total)); + } + } + } - @Override - public String toString() { - return "Internal: ExcludeIfNoVClasses"; - } + // ---------------------------------------------------------------------- + // helper classes + // ---------------------------------------------------------------------- + /** + * This will be first in the list of SearchIndexExcluders. + */ + public static class ExcludeIfNoVClasses implements SearchIndexExcluder { + @Override + public String checkForExclusion(Individual ind) { + List vclasses = ind.getVClasses(false); + if (vclasses == null || vclasses.isEmpty()) { + return "Individual " + ind + " has no classes."; + } + return null; + } - } -} + @Override + public String toString() { + return "Internal: ExcludeIfNoVClasses"; + } + } +} \ No newline at end of file