diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java index 21753ada5..fc3b3667c 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java @@ -40,10 +40,18 @@ public interface SearchIndexer extends Application.Module { * unpause(). Fires a PAUSED event to listeners. * * This call has no effect if already paused, or if called after shutdown. - */ + */ void pause(); - /** + /** + * Stop processing new tasks. Requests will be ignored and the index rebuilt when unpaused. + * Fires a PAUSED event to listeners. + * + * This call has no effect if already paused, or if called after shutdown. + */ + void pauseWithoutDeferring(); + + /** * Resume processing new tasks. Any requests that were received since the * call to pause() will now be scheduled for processing. Fires an UNPAUSED * event to listeners. diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java index 38af537d8..2dcd771d4 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java @@ -104,7 +104,7 @@ public class ABoxRecomputer { } try { if (searchIndexer != null) { - searchIndexer.pause(); + searchIndexer.pauseWithoutDeferring(); } recomputeABox(); } finally { diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java index d1f198176..55c7bd342 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java @@ -9,30 +9,27 @@ import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndex import java.io.ByteArrayInputStream; import java.io.UnsupportedEncodingException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang.StringUtils; +import com.hp.hpl.jena.graph.Triple; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.hp.hpl.jena.rdf.model.Literal; import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.ModelFactory; -import com.hp.hpl.jena.rdf.model.Property; -import com.hp.hpl.jena.rdf.model.RDFNode; -import com.hp.hpl.jena.rdf.model.Resource; -import com.hp.hpl.jena.rdf.model.ResourceFactory; import com.hp.hpl.jena.rdf.model.Statement; -import com.hp.hpl.jena.rdf.model.StmtIterator; import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; +import org.apache.jena.riot.RDFLanguages; +import org.apache.jena.riot.RiotReader; /** * When a change is heard, wait for an interval to see if more changes come in. @@ -65,7 +62,7 @@ public class IndexingChangeListener implements ChangeListener, private final SearchIndexer searchIndexer; private final Ticker ticker; - private volatile boolean paused = true; + private final Model defaultModel; /** All access to the list must be synchronized. */ private final List changes; @@ -73,82 +70,28 @@ public class IndexingChangeListener implements ChangeListener, public IndexingChangeListener(SearchIndexer searchIndexer) { this.searchIndexer = searchIndexer; this.ticker = new Ticker(); + this.defaultModel = ModelFactory.createDefaultModel(); this.changes = new ArrayList<>(); searchIndexer.addListener(this); } private synchronized void noteChange(Statement stmt) { - try { - changes.add(sanitize(stmt)); - if (!paused) { - ticker.start(); - } - } catch (Exception e) { - log.warn("Failed to sanitize this statement: " + stmt); - } - } - - private Statement sanitize(Statement rawStmt) { - return ResourceFactory.createStatement( - sanitizeSubject(rawStmt.getSubject()), - sanitizePredicate(rawStmt.getPredicate()), - sanitizeObject(rawStmt.getObject())); - } - - private Resource sanitizeSubject(Resource rawSubject) { - if (rawSubject.isURIResource()) { - return ResourceFactory.createResource(rawSubject.getURI()); - } - return ResourceFactory.createResource(); - } - - private Property sanitizePredicate(Property rawPredicate) { - return ResourceFactory.createProperty(rawPredicate.getURI()); - } - - private RDFNode sanitizeObject(RDFNode rawObject) { - if (rawObject.isURIResource()) { - return ResourceFactory.createResource(rawObject.asResource() - .getURI()); - } - if (rawObject.isResource()) { - return ResourceFactory.createResource(); - } - Literal l = rawObject.asLiteral(); - if (StringUtils.isNotEmpty(l.getLanguage())) { - return ResourceFactory.createLangLiteral(l.getString(), - l.getLanguage()); - } - if (null != l.getDatatype()) { - return ResourceFactory.createTypedLiteral(l.getValue()); - } - return ResourceFactory.createPlainLiteral(l.getString()); + changes.add(stmt); + ticker.start(); } @Override public void receiveSearchIndexerEvent(Event event) { - if (event.getType() == PAUSE) { - paused = true; - } else if (event.getType() == UNPAUSE) { - paused = false; - ticker.start(); - } else if (event.getType() == START_REBUILD) { - discardChanges(); - } } private synchronized void respondToTicker() { - if (!paused && !changes.isEmpty()) { + if (!changes.isEmpty()) { searchIndexer.scheduleUpdatesForStatements(changes); changes.clear(); } } - private synchronized void discardChanges() { - changes.clear(); - } - public void shutdown() { ticker.shutdown(); } @@ -176,33 +119,38 @@ public class IndexingChangeListener implements ChangeListener, } } else { log.debug("ignoring event " + event.getClass().getName() + " " - + event); + + event); } } - // TODO avoid overhead of Model. // TODO avoid duplication with JenaChangeListener private Statement parseTriple(String serializedTriple) { - try { - Model m = ModelFactory.createDefaultModel(); - m.read(new ByteArrayInputStream(serializedTriple.getBytes("UTF-8")), - null, "N3"); - StmtIterator sit = m.listStatements(); - if (!sit.hasNext()) { - throw new RuntimeException("no triple parsed from change event"); - } else { - Statement s = sit.nextStatement(); - if (sit.hasNext()) { - log.warn("More than one triple parsed from change event"); - } - return s; - } - } catch (RuntimeException riot) { - log.error("Failed to parse triple " + serializedTriple, riot); - throw riot; - } catch (UnsupportedEncodingException uee) { - throw new RuntimeException(uee); - } + try { + // Use RiotReader to parse a Triple + // NB A Triple can be serialized correctly with: FmtUtils.stringForTriple(triple, PrefixMapping.Factory.create()) + " ."; + Iterator it = RiotReader.createIteratorTriples(new ByteArrayInputStream(serializedTriple.getBytes("UTF-8")), RDFLanguages.NTRIPLES, null); + + if (it.hasNext()) { + Triple triple = it.next(); + + if (it.hasNext()) { + log.warn("More than one triple parsed from change event"); + } + + // Use the retained defaultModel instance to convert the Triple to a Statement + // This does not add the Statement to the Model, so the Statement can be disposed when unused + // And whilst the Model is attached to the Statement, using a single instance means only one Model + // is created and attached to all of the Statements created by this instance + return defaultModel.asStatement(triple); + } else { + throw new RuntimeException("no triple parsed from change event"); + } + } catch (RuntimeException riot) { + log.error("Failed to parse triple " + serializedTriple, riot); + throw riot; + } catch (UnsupportedEncodingException uee) { + throw new RuntimeException(uee); + } } // ---------------------------------------------------------------------- diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java index 04db79ec6..c9a94329c 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.servlet.ServletContext; +import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -86,7 +87,7 @@ public class SearchIndexerImpl implements SearchIndexer { private final ListenerList listeners = new ListenerList(); private final TaskQueue taskQueue = new TaskQueue(); - private final Scheduler scheduler = new Scheduler(taskQueue); + private final Scheduler scheduler = new Scheduler(this, taskQueue); private Integer threadPoolSize; private WorkerThreadPool pool; @@ -97,6 +98,9 @@ public class SearchIndexerImpl implements SearchIndexer { private Set uriFinders; private WebappDaoFactory wadf; + private boolean ignoreTasksWhilePaused = false; + private boolean rebuildOnUnpause = false; + // ---------------------------------------------------------------------- // ConfigurationBeanLoader methods. // ---------------------------------------------------------------------- @@ -197,16 +201,32 @@ public class SearchIndexerImpl implements SearchIndexer { @Override public void pause() { if (!isPaused() && !isShutdown()) { + ignoreTasksWhilePaused = false; + rebuildOnUnpause = false; scheduler.pause(); fireEvent(PAUSE); } } + @Override + public void pauseWithoutDeferring() { + if (!isPaused() && !isShutdown()) { + ignoreTasksWhilePaused = true; + rebuildOnUnpause = false; + scheduler.pause(); + fireEvent(PAUSE); + } + } + @Override public void unpause() { if (isPaused() && !isShutdown()) { scheduler.unpause(); fireEvent(UNPAUSE); + if (rebuildOnUnpause) { + rebuildOnUnpause = false; + rebuildIndex(); + } } } @@ -244,11 +264,12 @@ public class SearchIndexerImpl implements SearchIndexer { if (changes == null || changes.isEmpty()) { return; } + if (ignoreTasksWhilePaused && isPaused()) { + rebuildOnUnpause = true; + return; + } - Task task = new UpdateStatementsTask(changes, createFindersList(), - createExcludersList(), createModifiersList(), - wadf.getIndividualDao(), listeners, pool); - scheduler.scheduleTask(task); + scheduler.scheduleTask(new UpdateStatementsTask(new IndexerConfigImpl(this), changes)); log.debug("Scheduled updates for " + changes.size() + " statements."); } @@ -261,10 +282,12 @@ public class SearchIndexerImpl implements SearchIndexer { if (uris == null || uris.isEmpty()) { return; } + if (ignoreTasksWhilePaused && isPaused()) { + rebuildOnUnpause = true; + return; + } - Task task = new UpdateUrisTask(uris, createExcludersList(), - createModifiersList(), wadf.getIndividualDao(), listeners, pool); - scheduler.scheduleTask(task); + scheduler.scheduleTask(new UpdateUrisTask(new IndexerConfigImpl(this), uris)); log.debug("Scheduled updates for " + uris.size() + " uris."); } @@ -273,10 +296,12 @@ public class SearchIndexerImpl implements SearchIndexer { if (isShutdown()) { log.warn("Call to rebuildIndex after shutdown."); } + if (ignoreTasksWhilePaused && isPaused()) { + rebuildOnUnpause = true; + return; + } - Task task = new RebuildIndexTask(createExcludersList(), - createModifiersList(), wadf.getIndividualDao(), listeners, pool); - scheduler.scheduleTask(task); + scheduler.scheduleTask(new RebuildIndexTask(new IndexerConfigImpl(this))); log.debug("Scheduled a full rebuild."); } @@ -373,10 +398,12 @@ public class SearchIndexerImpl implements SearchIndexer { private static class Scheduler { private final TaskQueue taskQueue; private final List deferredQueue; + private final SearchIndexerImpl indexer; private volatile boolean started; private volatile boolean paused; - public Scheduler(TaskQueue taskQueue) { + public Scheduler(SearchIndexerImpl indexer, TaskQueue taskQueue) { + this.indexer = indexer; this.taskQueue = taskQueue; this.deferredQueue = new ArrayList(); } @@ -390,13 +417,13 @@ public class SearchIndexerImpl implements SearchIndexer { } public synchronized void scheduleTask(Task task) { - if (paused || !started) { - deferredQueue.add(task); - log.debug("added task to deferred queue: " + task); - } else { + if (paused || !started) { + deferredQueue.add(task); + log.debug("added task to deferred queue: " + task); + } else { taskQueue.scheduleTask(task); log.debug("added task to task queue: " + task); - } + } } public synchronized void start() { @@ -419,10 +446,12 @@ public class SearchIndexerImpl implements SearchIndexer { private void processDeferredTasks() { for (Task task : deferredQueue) { - taskQueue.scheduleTask(task); - log.debug("moved task from deferred queue to task queue: " - + task); + taskQueue.scheduleTask(task); + log.debug("moved task from deferred queue to task queue: " + task); } + + // Empty out the deferred queue as we've now processed it + deferredQueue.clear(); } } @@ -525,6 +554,36 @@ public class SearchIndexerImpl implements SearchIndexer { } } + /** + * Interface for tasks to access the Indexer config + */ + public static interface IndexerConfig { + public IndexingUriFinderList uriFinderList(); + public SearchIndexExcluderList excluderList(); + public DocumentModifierList documentModifierList(); + public IndividualDao individualDao(); + public ListenerList listenerList(); + public WorkerThreadPool workerThreadPool(); + } + + /** + * Implementation of IndexerConfig + * Defers access to the configuration until the task is running, so a Task + * created and deferred before the indexer is started will not cause a NullPointerException + */ + private static class IndexerConfigImpl implements IndexerConfig { + private final SearchIndexerImpl sii; + + public IndexerConfigImpl(SearchIndexerImpl sii) { this.sii = sii; } + + public IndexingUriFinderList uriFinderList() { return sii.createFindersList(); } + public SearchIndexExcluderList excluderList() { return sii.createExcludersList(); } + public DocumentModifierList documentModifierList() { return sii.createModifiersList(); } + public IndividualDao individualDao() { return sii.wadf.getIndividualDao(); } + public ListenerList listenerList() { return sii.listeners; } + public WorkerThreadPool workerThreadPool() { return sii.pool; } + } + public static interface Task extends Runnable { public SearchIndexerStatus getStatus(); @@ -629,4 +688,28 @@ public class SearchIndexerImpl implements SearchIndexer { } } + + private static class StatementList { + List changes; + + public StatementList() { + changes = new ArrayList(); + } + + public synchronized void addStatement(Statement stmt) { + changes.add(stmt); + } + + public synchronized List getStatements() { + try { + return new ArrayList<>(changes); + } finally { + changes.clear(); + } + } + + public synchronized int size() { + return changes.size(); + } + }; } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/RebuildIndexTask.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/RebuildIndexTask.java index 60be6dc7e..77af681cb 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/RebuildIndexTask.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/RebuildIndexTask.java @@ -22,6 +22,7 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Even import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.RebuildCounts; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State; +import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.IndexerConfig; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool; @@ -37,117 +38,150 @@ import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExclud */ public class RebuildIndexTask implements Task { private static final Log log = LogFactory.getLog(RebuildIndexTask.class); - - private final IndividualDao indDao; - private final SearchIndexExcluderList excluders; - private final DocumentModifierList modifiers; - private final ListenerList listeners; - private final WorkerThreadPool pool; - private final SearchEngine searchEngine; - private final Date requestedAt; - private final int documentsBefore; - private volatile SearchIndexerStatus status; + private final IndexerConfig config; + private RebuildIndexTaskImpl impl; - public RebuildIndexTask(SearchIndexExcluderList excluders, - DocumentModifierList modifiers, IndividualDao indDao, - ListenerList listeners, WorkerThreadPool pool) { - this.excluders = excluders; - this.modifiers = modifiers; - this.indDao = indDao; - this.listeners = listeners; - this.pool = pool; - - this.searchEngine = ApplicationUtils.instance().getSearchEngine(); - - this.requestedAt = new Date(); - this.documentsBefore = getDocumentCount(); - this.status = buildStatus(REBUILDING, 0); - } + public RebuildIndexTask(IndexerConfig config) { + this.config = config; + this.requestedAt = new Date(); + } @Override public void run() { - listeners.fireEvent(new Event(START_REBUILD, status)); - - Collection uris = getAllUrisInTheModel(); - - if (!isInterrupted()) { - updateTheUris(uris); - if (!isInterrupted()) { - deleteOutdatedDocuments(); - } - } - - status = buildStatus(REBUILDING, getDocumentCount()); - listeners.fireEvent(new Event(STOP_REBUILD, status)); - } - - private boolean isInterrupted() { - if (Thread.interrupted()) { - Thread.currentThread().interrupt(); - return true; - } else { - return false; - } - } - - private Collection getAllUrisInTheModel() { - return indDao.getAllIndividualUris(); - } - - private void updateTheUris(Collection uris) { - new UpdateUrisTask(uris, excluders, modifiers, indDao, listeners, pool) - .run(); - } - - private void deleteOutdatedDocuments() { - String query = "indexedTime:[ * TO " + requestedAt.getTime() + " ]"; - try { - searchEngine.deleteByQuery(query); - searchEngine.commit(); - } catch (SearchEngineNotRespondingException e) { - log.warn("Failed to delete outdated documents from the search index: " - + "the search engine is not responding."); - } catch (SearchEngineException e) { - log.warn("Failed to delete outdated documents " - + "from the search index", e); - } - } - - private int getDocumentCount() { - try { - return searchEngine.documentCount(); - } catch (SearchEngineNotRespondingException e) { - log.warn("Failed to get document count from the search index: " - + "the search engine is not responding."); - return 0; - } catch (SearchEngineException e) { - log.warn("Failed to get document count from the search index.", e); - return 0; - } - } - - private SearchIndexerStatus buildStatus(State state, int documentsAfter) { - return new SearchIndexerStatus(state, new Date(), new RebuildCounts( - documentsBefore, documentsAfter)); + impl = new RebuildIndexTaskImpl(config, requestedAt); + impl.run(); } @Override public SearchIndexerStatus getStatus() { - return status; + return impl == null ? null : impl.getStatus(); } @Override public void notifyWorkUnitCompletion(Runnable workUnit) { - // We don't submit any work units, so we won't see any calls to this. - log.error("Why was this called?"); + if (impl != null) { + impl.notifyWorkUnitCompletion(workUnit); + } } @Override public String toString() { - return "RebuildIndexTask[requestedAt=" - + new SimpleDateFormat().format(requestedAt) + "]"; + return "RebuildIndexTask[requestedAt=" + new SimpleDateFormat().format(requestedAt) + "]"; } + private static class RebuildIndexTaskImpl implements Task { + private final IndexerConfig config; + + private final IndividualDao indDao; + private final SearchIndexExcluderList excluders; + private final DocumentModifierList modifiers; + private final ListenerList listeners; + private final WorkerThreadPool pool; + private final SearchEngine searchEngine; + + private final Date requestedAt; + private final int documentsBefore; + + private volatile SearchIndexerStatus status; + + public RebuildIndexTaskImpl(IndexerConfig config, Date requestedAt) { + this.config = config; + this.excluders = config.excluderList(); + this.modifiers = config.documentModifierList(); + this.indDao = config.individualDao(); + this.listeners = config.listenerList(); + this.pool = config.workerThreadPool(); + + this.searchEngine = ApplicationUtils.instance().getSearchEngine(); + + this.requestedAt = requestedAt; + this.documentsBefore = getDocumentCount(); + this.status = buildStatus(REBUILDING, 0); + } + + @Override + public void run() { + listeners.fireEvent(new Event(START_REBUILD, status)); + + Collection uris = getAllUrisInTheModel(); + + if (!isInterrupted()) { + updateTheUris(uris); + if (!isInterrupted()) { + deleteOutdatedDocuments(); + } + } + + status = buildStatus(REBUILDING, getDocumentCount()); + listeners.fireEvent(new Event(STOP_REBUILD, status)); + } + + private boolean isInterrupted() { + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + return true; + } else { + return false; + } + } + + private Collection getAllUrisInTheModel() { + return indDao.getAllIndividualUris(); + } + + private void updateTheUris(Collection uris) { + UpdateUrisTask.runNow(uris, excluders, modifiers, indDao, listeners, pool); + } + + private void deleteOutdatedDocuments() { + String query = "indexedTime:[ * TO " + requestedAt.getTime() + " ]"; + try { + searchEngine.deleteByQuery(query); + searchEngine.commit(); + } catch (SearchEngineNotRespondingException e) { + log.warn("Failed to delete outdated documents from the search index: " + + "the search engine is not responding."); + } catch (SearchEngineException e) { + log.warn("Failed to delete outdated documents " + + "from the search index", e); + } + } + + private int getDocumentCount() { + try { + return searchEngine.documentCount(); + } catch (SearchEngineNotRespondingException e) { + log.warn("Failed to get document count from the search index: " + + "the search engine is not responding."); + return 0; + } catch (SearchEngineException e) { + log.warn("Failed to get document count from the search index.", e); + return 0; + } + } + + private SearchIndexerStatus buildStatus(State state, int documentsAfter) { + return new SearchIndexerStatus(state, new Date(), new RebuildCounts( + documentsBefore, documentsAfter)); + } + + @Override + public SearchIndexerStatus getStatus() { + return status; + } + + @Override + public void notifyWorkUnitCompletion(Runnable workUnit) { + // We don't submit any work units, so we won't see any calls to this. + log.error("Why was this called?"); + } + + @Override + public String toString() { + return "RebuildIndexTask[requestedAt=" + + new SimpleDateFormat().format(requestedAt) + "]"; + } + } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateStatementsTask.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateStatementsTask.java index b6aa98894..9a12a04bc 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateStatementsTask.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateStatementsTask.java @@ -14,6 +14,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -23,6 +24,7 @@ import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.StatementCounts; +import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.IndexerConfig; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool; @@ -49,146 +51,169 @@ import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinderLi * Set to remove duplicates, and then process the URIs in the set. */ public class UpdateStatementsTask implements Task { - private static final Log log = LogFactory - .getLog(UpdateStatementsTask.class); + private static final Log log = LogFactory.getLog(UpdateStatementsTask.class); - private final List changes; - private final IndexingUriFinderList uriFinders; - private final SearchIndexExcluderList excluders; - private final DocumentModifierList modifiers; - private final IndividualDao indDao; - private final ListenerList listeners; - private final WorkerThreadPool pool; + private final IndexerConfig config; + private UpdateStatementsTaskImpl impl; - private final Set uris; - private final Status status; + private List changes; - public UpdateStatementsTask(List changes, - IndexingUriFinderList uriFinders, - SearchIndexExcluderList excluders, DocumentModifierList modifiers, - IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { - this.changes = new ArrayList<>(changes); - this.uriFinders = uriFinders; - this.excluders = excluders; - this.modifiers = modifiers; - this.indDao = indDao; - this.listeners = listeners; - this.pool = pool; + public UpdateStatementsTask(IndexerConfig config, List changes) { + this.config = config; + this.changes = new ArrayList<>(changes); + } - this.uris = Collections.synchronizedSet(new HashSet()); + @Override + public void run() { + impl = new UpdateStatementsTaskImpl(config, changes); + impl.run(); + } + @Override + public SearchIndexerStatus getStatus() { + return impl == null ? null : impl.getStatus(); + } - this.status = new Status(changes.size(), 500, listeners); - } + @Override + public void notifyWorkUnitCompletion(Runnable workUnit) { + if (impl != null) { + impl.notifyWorkUnitCompletion(workUnit); + } + } - @Override - public void run() { - listeners.fireEvent(new Event(START_STATEMENTS, getStatus())); + private static class UpdateStatementsTaskImpl implements Task { + private final List changes; + private final IndexingUriFinderList uriFinders; + private final SearchIndexExcluderList excluders; + private final DocumentModifierList modifiers; + private final IndividualDao indDao; + private final ListenerList listeners; + private final WorkerThreadPool pool; - findAffectedUris(); + private final Set uris; + private final Status status; - updateTheUris(); - listeners.fireEvent(new Event(STOP_STATEMENTS, getStatus())); - } + public UpdateStatementsTaskImpl(IndexerConfig config, List changes) { + this.changes = changes; + this.uriFinders = config.uriFinderList(); + this.excluders = config.excluderList(); + this.modifiers = config.documentModifierList(); + this.indDao = config.individualDao(); + this.listeners = config.listenerList(); + this.pool = config.workerThreadPool(); - private void findAffectedUris() { - log.debug("Tell finders we are starting."); - uriFinders.startIndexing(); + this.uris = Collections.synchronizedSet(new HashSet()); - for (Statement stmt : changes) { - if (isInterrupted()) { - log.info("Interrupted: " + status.getSearchIndexerStatus()); - return; - } else { - findUrisForStatement(stmt); - } - } - waitForWorkUnitsToComplete(); + this.status = new Status(changes.size(), 500, listeners); + } - log.debug("Tell finders we are stopping."); - uriFinders.stopIndexing(); - } + @Override + public void run() { + listeners.fireEvent(new Event(START_STATEMENTS, getStatus())); - private boolean isInterrupted() { - if (Thread.interrupted()) { - Thread.currentThread().interrupt(); - return true; - } else { - return false; - } - } + findAffectedUris(); - private void findUrisForStatement(Statement stmt) { - Runnable workUnit = new FindUrisForStatementWorkUnit(stmt, uriFinders); - pool.submit(workUnit, this); - log.debug("scheduled uri finders for " + stmt); - } + updateTheUris(); + listeners.fireEvent(new Event(STOP_STATEMENTS, getStatus())); + } - private void waitForWorkUnitsToComplete() { - pool.waitUntilIdle(); - } + private void findAffectedUris() { + log.debug("Tell finders we are starting."); + uriFinders.startIndexing(); - private void updateTheUris() { - new UpdateUrisTask(uris, excluders, modifiers, indDao, listeners, pool) - .run(); - } + for (Statement stmt : changes) { + if (isInterrupted()) { + log.info("Interrupted: " + status.getSearchIndexerStatus()); + return; + } else { + findUrisForStatement(stmt); + } + } + waitForWorkUnitsToComplete(); - @Override - public SearchIndexerStatus getStatus() { - return status.getSearchIndexerStatus(); - } + log.debug("Tell finders we are stopping."); + uriFinders.stopIndexing(); + } - @Override - public void notifyWorkUnitCompletion(Runnable workUnit) { - FindUrisForStatementWorkUnit worker = (FindUrisForStatementWorkUnit) workUnit; + private boolean isInterrupted() { + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + return true; + } else { + return false; + } + } - Set foundUris = worker.getUris(); - Statement stmt = worker.getStatement(); - log.debug("Found " + foundUris.size() + " uris for statement: " + stmt); + private void findUrisForStatement(Statement stmt) { + Runnable workUnit = new FindUrisForStatementWorkUnit(stmt, uriFinders); + pool.submit(workUnit, this); + log.debug("scheduled uri finders for " + stmt); + } - uris.addAll(foundUris); - status.incrementProcessed(); - } + private void waitForWorkUnitsToComplete() { + pool.waitUntilIdle(); + } - // ---------------------------------------------------------------------- - // Helper classes - // ---------------------------------------------------------------------- + private void updateTheUris() { + UpdateUrisTask.runNow(uris, excluders, modifiers, indDao, listeners, pool); + } - /** - * A thread-safe collection of status information. All methods are - * synchronized. - */ - private static class Status { - private final int total; - private final int progressInterval; - private final ListenerList listeners; - private int processed = 0; - private Date since = new Date(); + @Override + public SearchIndexerStatus getStatus() { + return status.getSearchIndexerStatus(); + } - public Status(int total, int progressInterval, ListenerList listeners) { - this.total = total; - this.progressInterval = progressInterval; - this.listeners = listeners; - } + @Override + public void notifyWorkUnitCompletion(Runnable workUnit) { + FindUrisForStatementWorkUnit worker = (FindUrisForStatementWorkUnit) workUnit; - public synchronized void incrementProcessed() { - processed++; - since = new Date(); - maybeFireProgressEvent(); - } + Set foundUris = worker.getUris(); + Statement stmt = worker.getStatement(); + log.debug("Found " + foundUris.size() + " uris for statement: " + stmt); - private void maybeFireProgressEvent() { - if (processed > 0 && processed % progressInterval == 0) { - listeners.fireEvent(new Event(PROGRESS, - getSearchIndexerStatus())); - } - } + uris.addAll(foundUris); + status.incrementProcessed(); + } - public synchronized SearchIndexerStatus getSearchIndexerStatus() { - int remaining = total - processed; - return new SearchIndexerStatus(PROCESSING_STMTS, since, - new StatementCounts(processed, remaining, total)); - } + // ---------------------------------------------------------------------- + // Helper classes + // ---------------------------------------------------------------------- - } + /** + * A thread-safe collection of status information. All methods are + * synchronized. + */ + private static class Status { + private final int total; + private final int progressInterval; + private final ListenerList listeners; + private int processed = 0; + private Date since = new Date(); + public Status(int total, int progressInterval, ListenerList listeners) { + this.total = total; + this.progressInterval = progressInterval; + this.listeners = listeners; + } + + public synchronized void incrementProcessed() { + processed++; + since = new Date(); + maybeFireProgressEvent(); + } + + private void maybeFireProgressEvent() { + if (processed > 0 && processed % progressInterval == 0) { + listeners.fireEvent(new Event(PROGRESS, + getSearchIndexerStatus())); + } + } + + public synchronized SearchIndexerStatus getSearchIndexerStatus() { + int remaining = total - processed; + return new SearchIndexerStatus(PROCESSING_STMTS, since, + new StatementCounts(processed, remaining, total)); + } + + } + } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateUrisTask.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateUrisTask.java index 3b61e29b0..2bfc7db38 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateUrisTask.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateUrisTask.java @@ -7,11 +7,7 @@ import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndex import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STOP_URIS; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State.PROCESSING_URIS; -import java.util.Collection; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -27,6 +23,7 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Even import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.UriCounts; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerUtils; +import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.IndexerConfig; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool; @@ -49,220 +46,268 @@ import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExclud * again at the end of the task. */ public class UpdateUrisTask implements Task { - private static final Log log = LogFactory.getLog(UpdateUrisTask.class); + private static final Log log = LogFactory.getLog(UpdateUrisTask.class); - private final Set uris; - private final IndividualDao indDao; - private final SearchIndexExcluderList excluders; - private final DocumentModifierList modifiers; - private final ListenerList listeners; - private final WorkerThreadPool pool; + private final IndexerConfig config; + private UpdateUrisTaskImpl impl; - private final Status status; - private final SearchEngine searchEngine; + private final Collection uris; + private Date since = new Date(); - public UpdateUrisTask(Collection uris, - SearchIndexExcluderList excluders, DocumentModifierList modifiers, - IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { - this.uris = new HashSet<>(uris); - this.excluders = excluders; - this.modifiers = modifiers; - this.indDao = indDao; - this.listeners = listeners; - this.pool = pool; + public UpdateUrisTask(IndexerConfig config, Collection uris) { + this.config = config; + this.uris = new HashSet<>(uris); + } - this.status = new Status(this, uris.size(), 500); + static void runNow(Collection uris, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { + UpdateUrisTaskImpl impl = new UpdateUrisTaskImpl(uris, excluders, modifiers, indDao, listeners, pool); + impl.run(); + } - this.searchEngine = ApplicationUtils.instance().getSearchEngine(); + @Override + public void run() { + impl = new UpdateUrisTaskImpl(config, uris); + impl.run(); + } - } + @Override + public SearchIndexerStatus getStatus() { + if (impl != null) { + return impl.getStatus(); + } - @Override - public void run() { - listeners.fireEvent(new Event(START_URIS, status - .getSearchIndexerStatus())); - excluders.startIndexing(); - modifiers.startIndexing(); + return new SearchIndexerStatus(PROCESSING_URIS, since, new UriCounts(0, 0, 0, uris.size(), uris.size())); + } - for (String uri : uris) { - if (isInterrupted()) { - log.info("Interrupted: " + status.getSearchIndexerStatus()); - break; - } else if (uri == null) { - // Nothing to do - } else { - Individual ind = getIndividual(uri); - if (ind == null) { - deleteDocument(uri); - } else if (isExcluded(ind)) { - excludeDocument(uri); - } else { - updateDocument(ind); - } - } - } - pool.waitUntilIdle(); + @Override + public void notifyWorkUnitCompletion(Runnable workUnit) { + if (impl != null) { + impl.notifyWorkUnitCompletion(workUnit); + } + } - commitChanges(); + private static class UpdateUrisTaskImpl implements Task { + private final Collection uris; + private final IndividualDao indDao; + private final SearchIndexExcluderList excluders; + private final DocumentModifierList modifiers; + private final ListenerList listeners; + private final WorkerThreadPool pool; - excluders.stopIndexing(); - modifiers.stopIndexing(); - listeners.fireEvent(new Event(STOP_URIS, status - .getSearchIndexerStatus())); - } + private final Status status; + private final SearchEngine searchEngine; - private boolean isInterrupted() { - if (Thread.interrupted()) { - Thread.currentThread().interrupt(); - return true; - } else { - return false; - } - } + public UpdateUrisTaskImpl(IndexerConfig config, Collection uris) { + this.excluders = config.excluderList(); + this.modifiers = config.documentModifierList(); + this.indDao = config.individualDao(); + this.listeners = config.listenerList(); + this.pool = config.workerThreadPool(); - private Individual getIndividual(String uri) { - Individual ind = indDao.getIndividualByURI(uri); - if (ind == null) { - log.debug("Found no individual for '" + uri + "'"); - } - return ind; - } + this.uris = uris; + this.status = new Status(this, uris.size(), 500); - private boolean isExcluded(Individual ind) { - return excluders.isExcluded(ind); - } + this.searchEngine = ApplicationUtils.instance().getSearchEngine(); + } - /** A delete is fast enough to be done synchronously. */ - private void deleteDocument(String uri) { - try { - searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri)); - status.incrementDeletes(); - log.debug("deleted '" + uri + "' from search index."); - } catch (SearchEngineNotRespondingException e) { - log.warn("Failed to delete '" + uri + "' from search index: " - + "the search engine is not responding."); - } catch (Exception e) { - log.warn("Failed to delete '" + uri + "' from search index", e); - } - } + public UpdateUrisTaskImpl(Collection uris, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) { + this.uris = uris; + this.excluders = excluders; + this.modifiers = modifiers; + this.indDao = indDao; + this.listeners = listeners; + this.pool = pool; + this.status = new Status(this, uris.size(), 500); - /** An exclusion is just a delete for different reasons. */ - private void excludeDocument(String uri) { - try { - searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri)); - status.incrementExclusions(); - log.debug("excluded '" + uri + "' from search index."); - } catch (SearchEngineNotRespondingException e) { - log.warn("Failed to exclude '" + uri + "' from search index: " - + "the search engine is not responding.", e); - } catch (Exception e) { - log.warn("Failed to exclude '" + uri + "' from search index", e); - } - } + this.searchEngine = ApplicationUtils.instance().getSearchEngine(); + } - private void updateDocument(Individual ind) { - Runnable workUnit = new UpdateDocumentWorkUnit(ind, modifiers); - pool.submit(workUnit, this); - log.debug("scheduled update to " + ind); - } + @Override + public void run() { + listeners.fireEvent(new Event(START_URIS, status.getSearchIndexerStatus())); + excluders.startIndexing(); + modifiers.startIndexing(); - private void fireEvent(Event event) { - listeners.fireEvent(event); - if (event.getType() == PROGRESS || event.getType() == STOP_URIS) { - commitChanges(); - } - } + for (String uri : uris) { + if (isInterrupted()) { + log.info("Interrupted: " + status.getSearchIndexerStatus()); + break; + } else if (uri == null) { + // Nothing to do + } else { + Individual ind = getIndividual(uri); + if (ind == null) { + deleteDocument(uri); + } else if (isExcluded(ind)) { + excludeDocument(uri); + } else { + updateDocument(ind); + } + } + } + pool.waitUntilIdle(); - private void commitChanges() { - try { - searchEngine.commit(); - } catch (SearchEngineException e) { - log.warn("Failed to commit changes.", e); - } - } + commitChanges(); - @Override - public void notifyWorkUnitCompletion(Runnable workUnit) { - log.debug("completed update to " - + ((UpdateDocumentWorkUnit) workUnit).getInd()); - status.incrementUpdates(); - } + excluders.stopIndexing(); + modifiers.stopIndexing(); + listeners.fireEvent(new Event(STOP_URIS, status.getSearchIndexerStatus())); + } - @Override - public SearchIndexerStatus getStatus() { - return status.getSearchIndexerStatus(); - } + private boolean isInterrupted() { + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + return true; + } else { + return false; + } + } - // ---------------------------------------------------------------------- - // helper classes - // ---------------------------------------------------------------------- + private Individual getIndividual(String uri) { + Individual ind = indDao.getIndividualByURI(uri); + if (ind == null) { + log.debug("Found no individual for '" + uri + "'"); + } + return ind; + } - /** - * A thread-safe collection of status information. All methods are - * synchronized. - */ - private static class Status { - private final UpdateUrisTask parent; - private final int total; - private final int progressInterval; - private int updated = 0; - private int deleted = 0; - private int excluded = 0; - private Date since = new Date(); + private boolean isExcluded(Individual ind) { + return excluders.isExcluded(ind); + } - public Status(UpdateUrisTask parent, int total, int progressInterval) { - this.parent = parent; - this.total = total; - this.progressInterval = progressInterval; - } + /** + * A delete is fast enough to be done synchronously. + */ + private void deleteDocument(String uri) { + try { + searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri)); + status.incrementDeletes(); + log.debug("deleted '" + uri + "' from search index."); + } catch (SearchEngineNotRespondingException e) { + log.warn("Failed to delete '" + uri + "' from search index: " + + "the search engine is not responding."); + } catch (Exception e) { + log.warn("Failed to delete '" + uri + "' from search index", e); + } + } - public synchronized void incrementUpdates() { - updated++; - since = new Date(); - maybeFireProgressEvent(); - } + /** + * An exclusion is just a delete for different reasons. + */ + private void excludeDocument(String uri) { + try { + searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri)); + status.incrementExclusions(); + log.debug("excluded '" + uri + "' from search index."); + } catch (SearchEngineNotRespondingException e) { + log.warn("Failed to exclude '" + uri + "' from search index: " + + "the search engine is not responding.", e); + } catch (Exception e) { + log.warn("Failed to exclude '" + uri + "' from search index", e); + } + } - public synchronized void incrementDeletes() { - deleted++; - since = new Date(); - } + private void updateDocument(Individual ind) { + Runnable workUnit = new UpdateDocumentWorkUnit(ind, modifiers); + pool.submit(workUnit, this); + log.debug("scheduled update to " + ind); + } - public synchronized void incrementExclusions() { - excluded++; - since = new Date(); - } + private void fireEvent(Event event) { + listeners.fireEvent(event); + if (event.getType() == PROGRESS || event.getType() == STOP_URIS) { + commitChanges(); + } + } - private void maybeFireProgressEvent() { - if (updated > 0 && updated % progressInterval == 0) { - parent.fireEvent(new Event(PROGRESS, getSearchIndexerStatus())); - } - } + private void commitChanges() { + try { + searchEngine.commit(); + } catch (SearchEngineException e) { + log.warn("Failed to commit changes.", e); + } + } - public synchronized SearchIndexerStatus getSearchIndexerStatus() { - int remaining = total - updated - deleted - excluded; - return new SearchIndexerStatus(PROCESSING_URIS, since, - new UriCounts(excluded, deleted, updated, remaining, total)); - } + @Override + public void notifyWorkUnitCompletion(Runnable workUnit) { + log.debug("completed update to " + + ((UpdateDocumentWorkUnit) workUnit).getInd()); + status.incrementUpdates(); + } - } + @Override + public SearchIndexerStatus getStatus() { + return status.getSearchIndexerStatus(); + } - /** - * This will be first in the list of SearchIndexExcluders. - */ - public static class ExcludeIfNoVClasses implements SearchIndexExcluder { - @Override - public String checkForExclusion(Individual ind) { - List vclasses = ind.getVClasses(false); - if (vclasses == null || vclasses.isEmpty()) { - return "Individual " + ind + " has no classes."; - } - return null; - } + /** + * A thread-safe collection of status information. All methods are + * synchronized. + */ + private static class Status { + private final UpdateUrisTaskImpl parent; + private final int total; + private final int progressInterval; + private int updated = 0; + private int deleted = 0; + private int excluded = 0; + private Date since = new Date(); - @Override - public String toString() { - return "Internal: ExcludeIfNoVClasses"; - } + public Status(UpdateUrisTaskImpl parent, int total, int progressInterval) { + this.parent = parent; + this.total = total; + this.progressInterval = progressInterval; + } - } -} + public synchronized void incrementUpdates() { + updated++; + since = new Date(); + maybeFireProgressEvent(); + } + + public synchronized void incrementDeletes() { + deleted++; + since = new Date(); + } + + public synchronized void incrementExclusions() { + excluded++; + since = new Date(); + } + + private void maybeFireProgressEvent() { + if (updated > 0 && updated % progressInterval == 0) { + parent.fireEvent(new Event(PROGRESS, getSearchIndexerStatus())); + } + } + + public synchronized SearchIndexerStatus getSearchIndexerStatus() { + int remaining = total - updated - deleted - excluded; + return new SearchIndexerStatus(PROCESSING_URIS, since, + new UriCounts(excluded, deleted, updated, remaining, total)); + } + } + } + + // ---------------------------------------------------------------------- + // helper classes + // ---------------------------------------------------------------------- + /** + * This will be first in the list of SearchIndexExcluders. + */ + public static class ExcludeIfNoVClasses implements SearchIndexExcluder { + @Override + public String checkForExclusion(Individual ind) { + List vclasses = ind.getVClasses(false); + if (vclasses == null || vclasses.isEmpty()) { + return "Individual " + ind + " has no classes."; + } + return null; + } + + @Override + public String toString() { + return "Internal: ExcludeIfNoVClasses"; + } + } +} \ No newline at end of file diff --git a/webapp/test/stubs/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexerStub.java b/webapp/test/stubs/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexerStub.java index e2b8670b6..bd902cb58 100644 --- a/webapp/test/stubs/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexerStub.java +++ b/webapp/test/stubs/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexerStub.java @@ -31,7 +31,12 @@ public class SearchIndexerStub implements SearchIndexer { paused = true; } - @Override + @Override + public void pauseWithoutDeferring() { + paused = true; + } + + @Override public void unpause() { paused = false; }