From dee53e3aac3fb400ea4884c3aa2ec9b8c91746bc Mon Sep 17 00:00:00 2001 From: Jim Blake Date: Mon, 12 Jan 2015 15:15:16 -0500 Subject: [PATCH] VIVO-870 Implement UpdateStatementsTask. Also add the listener to support it and get rid of the last of the vestigial classes. --- .../modules/searchIndexer/SearchIndexer.java | 27 +- .../searchIndexer/SearchIndexerStatus.java | 44 +- .../webapp/search/IndexingException.java | 12 - .../vitro/webapp/search/SearchException.java | 10 - .../vitro/webapp/search/SearchIndexer.java | 212 ------- .../webapp/search/beans/IndexerIface.java | 58 -- .../search/controller/IndexController.java | 24 +- .../search/controller/IndexHistory.java | 11 +- .../SkipIndividualException.java | 11 - .../webapp/search/indexing/IndexBuilder.java | 546 ------------------ .../search/indexing/IndexWorkerThread.java | 108 ---- .../indexing/IndexingEventListener.java | 19 - .../indexing/SearchReindexingListener.java | 171 ------ .../searchindex/IndexingChangeListener.java | 184 ++++++ .../webapp/searchindex/SearchIndexerImpl.java | 51 +- .../searchindex/SearchIndexerSetup.java | 116 ++-- .../indexing/IndexingUriFinder.java | 48 +- .../tasks/FindUrisForStatementWorkUnit.java | 44 ++ .../searchindex/tasks/RebuildIndexTask.java | 128 +++- .../tasks/UpdateStatementsTask.java | 209 +++++++ .../searchindex/tasks/UpdateUrisTask.java | 26 +- .../DeveloperDisabledChangeListener.java | 53 ++ .../indexing/IndexBuilderThreadTest.java | 38 -- .../body/admin/searchIndexStatus.ftl | 12 +- 24 files changed, 845 insertions(+), 1317 deletions(-) delete mode 100644 webapp/src/edu/cornell/mannlib/vitro/webapp/search/IndexingException.java delete mode 100644 webapp/src/edu/cornell/mannlib/vitro/webapp/search/SearchException.java delete mode 100644 webapp/src/edu/cornell/mannlib/vitro/webapp/search/SearchIndexer.java delete mode 100644 webapp/src/edu/cornell/mannlib/vitro/webapp/search/beans/IndexerIface.java delete mode 100644 
webapp/src/edu/cornell/mannlib/vitro/webapp/search/documentBuilding/SkipIndividualException.java delete mode 100644 webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilder.java delete mode 100644 webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexWorkerThread.java delete mode 100644 webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexingEventListener.java delete mode 100644 webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/SearchReindexingListener.java create mode 100644 webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java create mode 100644 webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/FindUrisForStatementWorkUnit.java create mode 100644 webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateStatementsTask.java create mode 100644 webapp/src/edu/cornell/mannlib/vitro/webapp/utils/developer/listeners/DeveloperDisabledChangeListener.java delete mode 100644 webapp/test/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilderThreadTest.java diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java index e616b347f..724d2de41 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexer.java @@ -3,6 +3,9 @@ package edu.cornell.mannlib.vitro.webapp.modules.searchIndexer; import java.util.Collection; +import java.util.List; + +import com.hp.hpl.jena.rdf.model.Statement; import edu.cornell.mannlib.vitro.webapp.modules.Application; @@ -13,6 +16,23 @@ import edu.cornell.mannlib.vitro.webapp.modules.Application; * removeListener(). */ public interface SearchIndexer extends Application.Module { + /** + * Update any search documents that are affected by these statements. 
+ * + * These statements are a mixture of additions and deletions. In either + * case, we feed them to the URI finders to see what individuals might have + * been affected by the change. + * + * We accumulate a batch of affected URIs, removing duplicates if they + * occur, and then submit them for updates. + * + * @param changes + * if null or empty, this call has no effect. + * @throws IllegalStateException + * if called after shutdown() + */ + void scheduleUpdatesForStatements(List changes); + /** * Update the search documents for these URIs. * @@ -116,7 +136,7 @@ public interface SearchIndexer extends Application.Module { START_PROCESSING_STATEMENTS, STOP_PROCESSING_STATEMENTS, - REBUILD_REQUESTED, REBUILD_COMPLETE, + START_REBUILD, STOP_REBUILD, SHUTDOWN_REQUESTED, SHUTDOWN_COMPLETE } @@ -136,6 +156,11 @@ public interface SearchIndexer extends Application.Module { public SearchIndexerStatus getStatus() { return status; } + + @Override + public String toString() { + return type + ", " + status; + } } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexerStatus.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexerStatus.java index 59f7e8f1b..c4d9848c6 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexerStatus.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/modules/searchIndexer/SearchIndexerStatus.java @@ -2,6 +2,7 @@ package edu.cornell.mannlib.vitro.webapp.modules.searchIndexer; +import java.text.SimpleDateFormat; import java.util.Date; /** @@ -55,12 +56,17 @@ public class SearchIndexerStatus { return counts; } + @Override + public String toString() { + return new SimpleDateFormat().format(since) + ", " + counts; + } + // ---------------------------------------------------------------------- // helper classes // ---------------------------------------------------------------------- public enum State { - IDLE, PROCESSING_URIS, PROCESSING_STMTS, 
PREPARING_REBUILD, SHUTDOWN + IDLE, PROCESSING_URIS, PROCESSING_STMTS, REBUILDING, SHUTDOWN } public abstract static class Counts { @@ -125,6 +131,11 @@ public class SearchIndexerStatus { return total; } + @Override + public String toString() { + return "[deleted=" + deleted + ", updated=" + updated + + ", remaining=" + remaining + ", total=" + total + "]"; + } } public static class StatementCounts extends Counts { @@ -151,25 +162,46 @@ public class SearchIndexerStatus { return total; } + @Override + public String toString() { + return "[processed=" + processed + ", remaining=" + remaining + + ", total=" + total + "]"; + } } public static class RebuildCounts extends Counts { - private final int numberOfIndividuals; + private final int documentsBefore; + private final int documentsAfter; - public RebuildCounts(int numberOfIndividuals) { + public RebuildCounts(int documentsBefore, int documentsAfter) { super(Type.REBUILD_COUNTS); - this.numberOfIndividuals = numberOfIndividuals; + this.documentsBefore = documentsBefore; + this.documentsAfter = documentsAfter; } - public int getNumberOfIndividuals() { - return numberOfIndividuals; + public int getDocumentsBefore() { + return documentsBefore; } + public int getDocumentsAfter() { + return documentsAfter; + } + + @Override + public String toString() { + return "[documentsBefore=" + documentsBefore + ", documentsAfter=" + + documentsAfter + "]"; + } } public static class NoCounts extends Counts { public NoCounts() { super(Type.NO_COUNTS); } + + @Override + public String toString() { + return "[]"; + } } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/IndexingException.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/IndexingException.java deleted file mode 100644 index 6d77947d0..000000000 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/IndexingException.java +++ /dev/null @@ -1,12 +0,0 @@ -/* $This file is distributed under the terms of the license in /doc/license.txt$ */ - -package 
edu.cornell.mannlib.vitro.webapp.search; - -public class IndexingException extends java.lang.Exception { - public IndexingException(String i){ - super(i); - } - public IndexingException(){ - super(); - } -} diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/SearchException.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/SearchException.java deleted file mode 100644 index 2e79f7fec..000000000 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/SearchException.java +++ /dev/null @@ -1,10 +0,0 @@ -/* $This file is distributed under the terms of the license in /doc/license.txt$ */ - -package edu.cornell.mannlib.vitro.webapp.search; - -public class SearchException extends Exception{ - public SearchException(String in){ - super(in); - } - -} diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/SearchIndexer.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/SearchIndexer.java deleted file mode 100644 index 7f9e14ba8..000000000 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/SearchIndexer.java +++ /dev/null @@ -1,212 +0,0 @@ -/* $This file is distributed under the terms of the license in /doc/license.txt$ */ - -package edu.cornell.mannlib.vitro.webapp.search; - -import java.util.HashSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import edu.cornell.mannlib.vitro.webapp.application.ApplicationUtils; -import edu.cornell.mannlib.vitro.webapp.beans.Individual; -import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine; -import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngineException; -import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchInputDocument; -import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchQuery; -import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchQuery.Order; -import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchResponse; -import 
edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchResultDocumentList; -import edu.cornell.mannlib.vitro.webapp.search.IndexingException; -import edu.cornell.mannlib.vitro.webapp.search.beans.IndexerIface; - - -public class SearchIndexer implements IndexerIface { - private final static Log log = LogFactory.getLog(SearchIndexer.class); - - protected SearchEngine server; - protected boolean indexing; - protected HashSet urisIndexed; - - /** - * System is shutting down if true. - */ - protected boolean shutdownRequested = false; - - /** - * This records when a full re-index starts so that once it is done - * all the documents in the search index that are earlier than the - * reindexStart can be removed. - */ - protected long reindexStart = 0L; - - /** - * If true, then a full index rebuild was requested and reindexStart - * will be used to determine what documents to remove from the index - * once the re-index is complete. - */ - protected boolean doingFullIndexRebuild = false; - - public SearchIndexer( SearchEngine server){ - this.server = server; - } - - @Override - public void index(Individual ind) throws IndexingException { - if( ! 
indexing ) - throw new IndexingException("SearchIndexer: must call " + - "startIndexing() before index()."); - - if( ind == null ) { - log.debug("Individual to index was null, ignoring."); - return; - } - - try{ - if( urisIndexed.contains(ind.getURI()) ){ - log.debug("already indexed " + ind.getURI() ); - return; - }else{ - SearchInputDocument doc = null; - synchronized(this){ - urisIndexed.add(ind.getURI()); - } - log.debug("indexing " + ind.getURI()); -// doc = individualToSearchDoc.translate(ind); - - if( doc != null){ - if( log.isDebugEnabled()){ - log.info("boost for " + ind.getName() + " is " + doc.getDocumentBoost()); - log.debug( doc.toString() ); - } - - server.add( doc ); - log.debug("Added docs to server."); - }else{ - log.debug("removing from index " + ind.getURI()); - removeFromIndex(ind.getURI()); - } - } - } catch (SearchEngineException ex) { - throw new IndexingException(ex.getMessage()); - } - } - - @Override - public boolean isIndexing() { - return indexing; - } - - @Override - public void prepareForRebuild() throws IndexingException { - reindexStart = System.currentTimeMillis(); - doingFullIndexRebuild = true; - } - - @Override - public void removeFromIndex(String uri) throws IndexingException { - if( uri != null ){ - try { -// server.deleteById(individualToSearchDoc.getIdForUri(uri)); - log.debug("deleted " + " " + uri); - } catch (Exception e) { - log.error( "could not delete individual " + uri, e); - } - } - } - - @Override - public synchronized void startIndexing() throws IndexingException { - if( indexing) - log.debug("SearchIndexer.startIndexing() Indexing in progress, waiting for completion..."); - while( indexing && ! shutdownRequested ){ //wait for indexing to end. 
- try{ wait( 250 ); } - catch(InterruptedException ex){} - } - - log.debug("Starting to index"); - indexing = true; - urisIndexed = new HashSet(); - notifyAll(); - } - - @Override - public void abortIndexingAndCleanUp() { - shutdownRequested = true; - try{ -// individualToSearchDoc.shutdown(); - }catch(Exception e){ - if( log != null) - log.debug(e,e); - } - endIndexing(); - } - - @Override - public synchronized void endIndexing() { - try { - if( doingFullIndexRebuild ){ - removeDocumentsFromBeforeRebuild( ); - } - } catch (Throwable e) { - if( ! shutdownRequested ) - log.debug("could not remove documents from before build, " ,e); - } - try { - server.commit(); - } catch (Throwable e) { - if( ! shutdownRequested ){ - log.debug("could not commit to the search engine, " + - "this should not be a problem since the search engine will do autocommit"); - } - } - indexing = false; - notifyAll(); - } - - protected void removeDocumentsFromBeforeRebuild(){ - try { - server.deleteByQuery("indexedTime:[ * TO " + reindexStart + " ]"); - server.commit(); - } catch (SearchEngineException e) { - if( ! shutdownRequested ) - log.error("could not delete documents from before rebuild.",e); - } - } - - - @Override - public long getModified() { - long modified = 0; - - SearchQuery query = ApplicationUtils.instance().getSearchEngine().createQuery(); - query.setQuery("*:*"); - query.addSortField("indexedTime", Order.DESC); - - try { - SearchResponse rsp = server.query(query); - SearchResultDocumentList docs = rsp.getResults(); - if(docs!=null){ - modified = (Long)docs.get(0).getFirstValue("indexedTime"); - } - } catch (SearchEngineException e) { - log.error(e,e); - } - - return modified; - } - - /** - * Returns true if there are documents in the index, false if there are none, - * and returns false on failure to connect to server. 
- */ - @Override - public boolean isIndexEmpty() { - try { - return server.documentCount() == 0; - } catch (SearchEngineException e) { - log.error("Could not connect to the search engine." ,e.getCause()); - } - return false; - } - -} diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/beans/IndexerIface.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/beans/IndexerIface.java deleted file mode 100644 index 00bd2a8f4..000000000 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/beans/IndexerIface.java +++ /dev/null @@ -1,58 +0,0 @@ -/* $This file is distributed under the terms of the license in /doc/license.txt$ */ - -package edu.cornell.mannlib.vitro.webapp.search.beans; - -import edu.cornell.mannlib.vitro.webapp.beans.Individual; -import edu.cornell.mannlib.vitro.webapp.search.IndexingException; - -/** - * IntexerIface is for objects that will be used by the IndexBuilder. The - * IndexBuilder will manage getting lists of object to index and then use - * an object that implements IndexerIface to stuff the backend index. - * - * An example is SearchIndexer which is set up and associated with a - * IndexBuilder in SearchIndexerSetup. - * - * @author bdc34 - * - */ -public interface IndexerIface { - - /** - * Check if indexing is currently running. - */ - public boolean isIndexing(); - - /** - * Index a document. This should do an update of the - * document in the index of the semantics of the index require it. - * - * @param doc - * @param newDoc - if true, just insert doc, if false attempt to update. - * @throws IndexingException - */ - public void index(Individual ind)throws IndexingException; - - - /** - * Remove a document from the index. 
- * @param obj - * @throws IndexingException - */ - public void removeFromIndex(String uri) throws IndexingException; - - public void prepareForRebuild() throws IndexingException; - - public void startIndexing() throws IndexingException; - public void endIndexing(); - - public long getModified(); - - /** - * Ends the indexing and removes any temporary files. - * This may be called instead of endIndexing() - */ - public void abortIndexingAndCleanUp(); - - public boolean isIndexEmpty(); -} diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/controller/IndexController.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/controller/IndexController.java index 62e18f45d..43f3e3676 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/controller/IndexController.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/controller/IndexController.java @@ -24,6 +24,7 @@ import edu.cornell.mannlib.vitro.webapp.controller.freemarker.responsevalues.Res import edu.cornell.mannlib.vitro.webapp.controller.freemarker.responsevalues.TemplateResponseValues; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; +import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.RebuildCounts; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.StatementCounts; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.UriCounts; @@ -79,20 +80,20 @@ public class IndexController extends FreemarkerHttpServlet { public static final RequestedAction REQUIRED_ACTIONS = SimplePermission.MANAGE_SEARCH_INDEX.ACTION; private SearchIndexer indexer; - private IndexHistory history; + private static IndexHistory history; @Override public void init() throws ServletException { - super.init(); this.indexer = 
ApplicationUtils.instance().getSearchIndexer(); - this.history = new IndexHistory(); - this.indexer.addListener(this.history); + super.init(); } - @Override - public void destroy() { - this.indexer.removeListener(this.history); - super.destroy(); + /** + * Called by SearchIndexerSetup to provide a history that dates from + * startup, not just from servlet load time. + */ + public static void setHistory(IndexHistory history) { + IndexController.history = history; } @Override @@ -186,6 +187,10 @@ public class IndexController extends FreemarkerHttpServlet { map.put("expectedCompletion", figureExpectedCompletion(status.getSince(), counts.getTotal(), counts.getProcessed())); + } else if (state == State.REBUILDING) { + RebuildCounts counts = status.getCounts().asRebuildCounts(); + map.put("documentsBefore", counts.getDocumentsBefore()); + map.put("documentsAfter", counts.getDocumentsAfter()); } else { // nothing for IDLE or SHUTDOWN, except what's already there. } @@ -217,7 +222,6 @@ public class IndexController extends FreemarkerHttpServlet { long seconds = (elapsedMillis / 1000L) % 60L; long minutes = (elapsedMillis / 60000L) % 60L; long hours = elapsedMillis / 3600000L; - return new int[] {(int) hours, (int) minutes, (int) seconds}; + return new int[] { (int) hours, (int) minutes, (int) seconds }; } - } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/controller/IndexHistory.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/controller/IndexHistory.java index 663e620a8..0bbac9364 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/controller/IndexHistory.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/controller/IndexHistory.java @@ -9,6 +9,9 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; import 
edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; @@ -23,12 +26,17 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatu * format them for display in a Freemarker template. */ public class IndexHistory implements SearchIndexer.Listener { + private static final Log log = LogFactory.getLog(IndexHistory.class); + private final static int MAX_EVENTS = 10; private final Deque events = new LinkedList<>(); @Override public void receiveSearchIndexerEvent(Event event) { + if (log.isInfoEnabled()) { + log.info(event); + } synchronized (events) { events.addFirst(event); while (events.size() > MAX_EVENTS) { @@ -89,7 +97,8 @@ public class IndexHistory implements SearchIndexer.Listener { } private void addCounts(RebuildCounts counts, Map map) { - map.put("numberOfIndividuals", counts.getNumberOfIndividuals()); + map.put("documentsBefore", counts.getDocumentsBefore()); + map.put("documentsAfter", counts.getDocumentsAfter()); } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/documentBuilding/SkipIndividualException.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/documentBuilding/SkipIndividualException.java deleted file mode 100644 index 19f643c1a..000000000 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/documentBuilding/SkipIndividualException.java +++ /dev/null @@ -1,11 +0,0 @@ -/* $This file is distributed under the terms of the license in /doc/license.txt$ */ - -package edu.cornell.mannlib.vitro.webapp.search.documentBuilding; - -public class SkipIndividualException extends Exception{ - - public SkipIndividualException(String string) { - super(string); - } - -} \ No newline at end of file diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilder.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilder.java deleted file mode 100644 index 
4aa5a3ca7..000000000 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilder.java +++ /dev/null @@ -1,546 +0,0 @@ -/* $This file is distributed under the terms of the license in /doc/license.txt$ */ - -package edu.cornell.mannlib.vitro.webapp.search.indexing; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - -import javax.servlet.ServletContext; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpSession; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import com.hp.hpl.jena.query.QueryParseException; -import com.hp.hpl.jena.rdf.model.ResourceFactory; -import com.hp.hpl.jena.rdf.model.Statement; - -import edu.cornell.mannlib.vitro.webapp.beans.Individual; -import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao; -import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory; -import edu.cornell.mannlib.vitro.webapp.search.beans.IndexerIface; -import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinder; -import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; - - -/** - * The IndexBuilder is used to rebuild or update a search index. - * There should only be one IndexBuilder in a vitro web application. - * It uses an implementation of a back-end through an object that - * implements IndexerIface. An example of a back-end is SearchIndexer. - * - * See the class SearchReindexingListener for an example of how a model change - * listener can use an IndexBuilder to keep the full text index in sncy with - * updates to a model. It calls IndexBuilder.addToChangedUris(). - */ -public class IndexBuilder extends VitroBackgroundThread { - private WebappDaoFactory wdf; - private final IndexerIface indexer; - - /** Statements that have changed in the model. 
The SearchReindexingListener - * and other similar objects will use methods on IndexBuilder to add statements - * to this queue. - */ - //private final ConcurrentLinkedQueue changedStmtQueue = new ConcurrentLinkedQueue(); - private final HashSet changedStmts = new HashSet(); - - /** This is a list of objects that will compute what URIs need to be - * updated in the search index when a statement changes. */ - private final List stmtToURIsToIndexFunctions; - - /** Indicates that a full index re-build has been requested. */ - private volatile boolean reindexRequested = false; - - /** Indicates that a stop of the indexing objects has been requested. */ - private volatile boolean stopRequested = false; - - /** Indicates that new updates should not be started. */ - private boolean deferNewUpdates = false; - - /** Length of time to wait before looking for work (if not wakened sooner). */ - public static final long MAX_IDLE_INTERVAL = 1000 * 60 /* msec */ ; - - /** Length of pause between when work comes into queue to when indexing starts */ - public static final long WAIT_AFTER_NEW_WORK_INTERVAL = 500; //msec - - /** Flag so we can tell that the index is being updated. */ - public static final String FLAG_UPDATING = "updating"; - - /** Flag so we can tell that the index is being rebuilt. */ - public static final String FLAG_REBUILDING = "rebuilding"; - - /** List of IndexingEventListeners */ - protected LinkedList indexingEventListeners = - new LinkedList(); - - /** number of threads to use during a full index rebuild. */ - public static final int REINDEX_THREADS= 10; - - /** Max threads to use during an update. Smaller updates will use fewer threads. */ - public static final int MAX_UPDATE_THREADS= 10; - - /** Number of individuals to index per update thread. 
*/ - public static final int URIS_PER_UPDATE_THREAD = 50; - - private static final Log log = LogFactory.getLog(IndexBuilder.class); - - public static IndexBuilder getBuilder(ServletContext ctx) { - Object o = ctx.getAttribute(IndexBuilder.class.getName()); - if (o instanceof IndexBuilder) { - return (IndexBuilder) o; - } else { - log.error("IndexBuilder has not been initialized."); - return null; - } - } - - public IndexBuilder(IndexerIface indexer, - WebappDaoFactory wdf, - List stmtToURIsToIndexFunctions ){ - super("IndexBuilder"); - - this.indexer = indexer; - - this.wdf = wdf; - - if( stmtToURIsToIndexFunctions != null ) - this.stmtToURIsToIndexFunctions = stmtToURIsToIndexFunctions; - else - this.stmtToURIsToIndexFunctions = Collections.emptyList(); - - this.start(); - } - - protected IndexBuilder(){ - //for testing only - this( null, null, null); - } - - /** - * Use this method to add URIs that need to be indexed. Should be - * able to add to changedStmtQueue while indexing is in process. - * - * If you have a statement that has been added or removed from the - * RDF model and you would like it to take effect in the search - * index this is the method you should use. Follow the adding of - * your changes with a call to doUpdateIndex(). - */ - public void addToChanged(Statement stmt) { - log.debug("call to addToChanged(Statement)"); - synchronized(changedStmts){ - changedStmts.add(stmt); - } - } - - /** - * Convenience method to add a URI to the change queue. - */ - public void addToChanged(String uri){ - addToChanged(ResourceFactory.createStatement( - ResourceFactory.createResource(uri), - ResourceFactory.createProperty("http://ex.com/f"), - ResourceFactory.createPlainLiteral("added by IndexBuilder.addToChanged(uri)"))); - } - - /** - * This method will cause the IndexBuilder to completely rebuild - * the index. 
- */ - public synchronized void doIndexRebuild() { - log.debug("call to doIndexRebuild()"); - //set flag for full index rebuild - this.reindexRequested = true; - //wake up - this.notifyAll(); - } - - /** - * This will re-index Individuals were added with addToChanged(). - */ - public synchronized void doUpdateIndex() { - log.debug("call to doUpdateIndex()"); - //wake up thread and it will attempt to index anything in changedUris - this.notifyAll(); - } - - /** - * Add a listener for indexing events. Methods on listener will be called when - * events happen in the IndexBuilder. This is not a Jena ModelListener. - */ - public synchronized void addIndexBuilderListener(IndexingEventListener listener){ - indexingEventListeners.add(listener); - } - - /** - * This is called when the system shuts down. - */ - public synchronized void stopIndexingThread() { - stopRequested = true; - this.notifyAll(); - this.interrupt(); - } - - /** - * Calling this will cause the IndexBuider to no start a new index update - * until unpuase is called. This is intended to allow a large change - * without slowing it down with incremental search index updates. - */ - public synchronized void pause(){ - this.deferNewUpdates = true; - } - - public synchronized void unpause(){ - if( deferNewUpdates == true ){ - this.deferNewUpdates = false; - this.notifyAll(); - this.interrupt(); - } - } - - @Override - public void run() { - while(! 
stopRequested ){ - try{ - if ( deferNewUpdates ){ - log.debug("there is no indexing working to do, waiting for work"); - synchronized (this) { this.wait(MAX_IDLE_INTERVAL); } - } - else if ( reindexRequested ){ - setWorkLevel(WorkLevel.WORKING, FLAG_REBUILDING); - log.debug("full re-index requested"); - - notifyListeners( IndexingEventListener.EventTypes.START_FULL_REBUILD ); - indexRebuild(); - notifyListeners( IndexingEventListener.EventTypes.FINISH_FULL_REBUILD ); - - setWorkLevel(WorkLevel.IDLE); - } - else{ - boolean workToDo = false; - synchronized (changedStmts ){ - workToDo = !changedStmts.isEmpty(); - } - if( workToDo ){ - setWorkLevel(WorkLevel.WORKING, FLAG_UPDATING); - - //wait a bit to let a bit more work to come into the queue - Thread.sleep(WAIT_AFTER_NEW_WORK_INTERVAL); - log.debug("work found for IndexBuilder, starting update"); - - notifyListeners( IndexingEventListener.EventTypes.START_UPDATE ); - updatedIndex(); - notifyListeners( IndexingEventListener.EventTypes.FINISHED_UPDATE ); - setWorkLevel(WorkLevel.IDLE); - } else { - log.debug("there is no indexing working to do, waiting for work"); - synchronized (this) { this.wait(MAX_IDLE_INTERVAL); } - } - } - } catch (InterruptedException e) { - log.debug("woken up",e); - }catch(Throwable e){ - log.error(e,e); - } - } - - if( indexer != null) - indexer.abortIndexingAndCleanUp(); - } - - - public static void checkIndexOnRootLogin(HttpServletRequest req){ - HttpSession session = req.getSession(); - ServletContext context = session.getServletContext(); - IndexBuilder indexBuilder = (IndexBuilder)context.getAttribute(IndexBuilder.class.getName()); - - log.debug("Checking if the index is empty"); - if(indexBuilder.indexer.isIndexEmpty()){ - log.info("Search index is empty. 
Running a full index rebuild."); - indexBuilder.doIndexRebuild(); - } - } - - - /* ******************** non-public methods ************************* */ - - /** - * Take the changed statements from the queue and determine which URIs that need to be updated in - * the index. - */ - private Collection changedStatementsToUris(){ - //inform StatementToURIsToUpdate that index is starting - for( IndexingUriFinder stu : stmtToURIsToIndexFunctions ) { - stu.startIndexing(); - } - - //keep uris unique by using a HashSet - Collection urisToUpdate = new HashSet(); - Statement[] changedStatements = getAndClearChangedStmts(); - int howManyChanges = changedStatements.length; - - if (howManyChanges > 100) { - log.info("Finding URIs that are affected by " + howManyChanges - + " changed statements."); - } - - for (int i = 0; i < howManyChanges; i++) { - Statement stmt = changedStatements[i]; - for (IndexingUriFinder stu : stmtToURIsToIndexFunctions) { - urisToUpdate.addAll(stu.findAdditionalURIsToIndex(stmt)); - } - if ((i > 0) && (i % 1000 == 0)) { - log.info("Processed " + i + " changed statements; found " - + urisToUpdate.size() + " affected URIs."); - } - } - - //inform StatementToURIsToUpdate that they are done - for( IndexingUriFinder stu : stmtToURIsToIndexFunctions ) { - stu.endIndexing(); - } - - return urisToUpdate; - } - - private Statement[] getAndClearChangedStmts(){ - //get the statements that changed - Statement[] stmts = null; - synchronized( changedStmts ){ - stmts = new Statement[changedStmts.size()]; - stmts = changedStmts.toArray( stmts ); - changedStmts.clear(); - } - return stmts; - } - - /** - * Take the URIs that we got from the changedStmtQueue, and create the lists - * of updated URIs and deleted URIs. 
- */ - private UriLists makeAddAndDeleteLists(Collection uris) { - IndividualDao indDao = wdf.getIndividualDao(); - - UriLists uriLists = new UriLists(); - for (String uri : uris) { - if (uri != null) { - try { - Individual ind = indDao.getIndividualByURI(uri); - if (ind != null) { - log.debug("uri to update or add to search index: " + uri); - uriLists.updatedUris.add(uri); - } else { - log.debug("found delete in changed uris: " + uri); - uriLists.deletedUris.add(uri); - } - } catch (QueryParseException ex) { - log.error("could not get Individual " + uri, ex); - } - } - } - return uriLists; - } - - /** - * This rebuilds the whole index. - */ - protected void indexRebuild() { - log.info("Rebuild of search index is starting."); - - // clear out changed URIs since we are doing a full index rebuild - changedStmts.clear(); - - log.debug("Getting all URIs in the model"); - Iterator uris = wdf.getIndividualDao().getAllOfThisTypeIterator(); - - doBuild(uris, Collections.emptyList(), REINDEX_THREADS ); - - if( log != null ) //log might be null if system is shutting down. - log.info("Rebuild of search index is complete."); - } - - protected void updatedIndex() { - log.debug("Starting updateIndex()"); - - UriLists uriLists = makeAddAndDeleteLists( changedStatementsToUris() ); - int numberOfThreads = - Math.min( MAX_UPDATE_THREADS, - Math.max( uriLists.updatedUris.size() / URIS_PER_UPDATE_THREAD, 1)); - - doBuild( uriLists.updatedUris.iterator(), uriLists.deletedUris , numberOfThreads); - - log.debug("Ending updateIndex()"); - } - - /** - * For each sourceIterator, get all of the objects and attempt to - * index them. - * - * This takes a list of source Iterators and, for each of these, - * calls indexForSource. - * - * @param sourceIterators - * @param newDocs true if we know that the document is new. Set - * to false if we want to attempt to remove the object from the index before - * attempting to index it. 
If an object is not on the list but you set this - * to false, and a check is made before adding, it will work fine; but - * checking if an object is on the index is slow. - */ - private void doBuild(Iterator updates, Collection deletes, int numberOfThreads ){ - boolean updateRequested = ! reindexRequested; - - try { - if( reindexRequested ){ - indexer.prepareForRebuild(); - } - - indexer.startIndexing(); - reindexRequested = false; - - if( updateRequested ){ - //if this is not a full reindex, deleted indivdiuals need to be removed from the index - for(String deleteMe : deletes ){ - try{ - indexer.removeFromIndex(deleteMe); - }catch(Exception ex){ - log.debug("could not remove individual " + deleteMe - + " from index, usually this is harmless",ex); - } - } - } - - indexUriList(updates, numberOfThreads); - - } catch (Exception e) { - if( log != null) log.debug("Exception during indexing",e); - } - - indexer.endIndexing(); - } - - /** - * Use the back end indexer to index each object that the Iterator returns. 
- * @throws AbortIndexing - */ - private void indexUriList(Iterator updateUris , int numberOfThreads) { - //make lists of work URIs for workers - List> workLists = makeWorkerUriLists(updateUris, numberOfThreads); - - //setup workers with work - List workers = new ArrayList(); - for(int i = 0; i< numberOfThreads ;i++){ - Iterator workToDo = new UriToIndividualIterator(workLists.get(i), wdf); - workers.add( new IndexWorkerThread(indexer, i, workToDo) ); - } - - // reset the counters so we can monitor the progress - IndexWorkerThread.resetCounters(System.currentTimeMillis(), figureWorkLoad(workLists)); - - log.debug("Starting the building and indexing of documents in worker threads"); - // starting worker threads - for(int i =0; i < numberOfThreads; i++){ - workers.get(i).start(); - } - - //waiting for all the work to finish - for(int i =0; i < numberOfThreads; i++){ - try{ - workers.get(i).join(); - }catch(InterruptedException e){ - //this thread will get interrupted if the system is trying to shut down. 
- if( log != null ) - log.debug(e,e); - for( IndexWorkerThread thread: workers){ - thread.requestStop(); - } - return; - } - } - } - - protected class UriToIndividualIterator implements Iterator{ - private final Iterator uris; - private final WebappDaoFactory wdf; - - public UriToIndividualIterator( Iterator uris, WebappDaoFactory wdf){ - this.uris= uris; - this.wdf = wdf; - } - - public UriToIndividualIterator( List uris, WebappDaoFactory wdf){ - this.uris= uris.iterator(); - this.wdf = wdf; - } - - @Override - public boolean hasNext() { - return uris.hasNext(); - } - - /** may return null */ - @Override - public Individual next() { - String uri = uris.next(); - return wdf.getIndividualDao().getIndividualByURI(uri); - } - - @Override - public void remove() { - throw new IllegalAccessError(""); - } - } - - private static List> makeWorkerUriLists(Iterator uris,int workers){ - List> work = new ArrayList>(workers); - for(int i =0; i< workers; i++){ - work.add( new ArrayList() ); - } - - int counter = 0; - while(uris.hasNext()){ - work.get( counter % workers ).add( uris.next() ); - counter ++; - } - log.info("Number of individuals to be indexed : " + counter + " by " - + workers + " worker threads."); - return work; - } - - private long figureWorkLoad(List> workLists) { - long load = 0; - for (List list: workLists) { - load += list.size(); - } - return load; - } - - public long getCompletedCount() { - return IndexWorkerThread.getCount(); - } - - public long getTotalToDo() { - return IndexWorkerThread.getCountToIndex(); - } - - private static class UriLists { - private final List updatedUris = new ArrayList(); - private final List deletedUris = new ArrayList(); - } - - protected void notifyListeners(IndexingEventListener.EventTypes event){ - for ( IndexingEventListener listener : indexingEventListeners ){ - try{ - if(listener != null ) - listener.notifyOfIndexingEvent( event ); - }catch(Throwable th){ - log.error("problem during NotifyListeners(): " , th); - } - } - } 
-} - diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexWorkerThread.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexWorkerThread.java deleted file mode 100644 index 59b5aef39..000000000 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexWorkerThread.java +++ /dev/null @@ -1,108 +0,0 @@ -/* $This file is distributed under the terms of the license in /doc/license.txt$ */ - -package edu.cornell.mannlib.vitro.webapp.search.indexing; - -import java.util.Iterator; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import edu.cornell.mannlib.vitro.webapp.beans.Individual; -import edu.cornell.mannlib.vitro.webapp.search.IndexingException; -import edu.cornell.mannlib.vitro.webapp.search.beans.IndexerIface; -import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; - -class IndexWorkerThread extends VitroBackgroundThread{ - private static final Log log = LogFactory.getLog(IndexWorkerThread.class); - - protected final int threadNum; - protected final IndexerIface indexer; - protected final Iterator individualsToIndex; - protected volatile boolean stopRequested = false; - - private static AtomicLong countCompleted= new AtomicLong(); - private static AtomicLong countToIndex= new AtomicLong(); - private static long starttime = 0; - - public IndexWorkerThread(IndexerIface indexer, int threadNum , Iterator individualsToIndex){ - super("IndexWorkerThread"+threadNum); - this.indexer = indexer; - this.threadNum = threadNum; - this.individualsToIndex = individualsToIndex; - } - - public void requestStop(){ - stopRequested = true; - } - - @Override - public void run(){ - setWorkLevel(WorkLevel.WORKING, "indexing " + individualsToIndex + " individuals"); - - while( ! 
stopRequested ){ - - //do the actual indexing work - log.debug("work found for Worker number " + threadNum); - addDocsToIndex(); - - // done so shut this thread down. - stopRequested = true; - } - setWorkLevel(WorkLevel.IDLE); - - log.debug("Worker number " + threadNum + " exiting."); - } - - protected void addDocsToIndex() { - - while( individualsToIndex.hasNext() ){ - //need to stop right away if requested to - if( stopRequested ) return; - try{ - //build the document and add it to the index - Individual ind = null; - try { - ind = individualsToIndex.next(); - indexer.index( ind ); - } catch (IndexingException e) { - if( stopRequested ) - return; - - if( ind != null ) - log.error("Could not index individual " + ind.getURI() , e ); - else - log.warn("Could not index, individual was null"); - } - - - long countNow = countCompleted.incrementAndGet(); - if( log.isInfoEnabled() ){ - if( (countNow % 100 ) == 0 ){ - long dt = (System.currentTimeMillis() - starttime); - log.info("individuals indexed: " + countNow + " in " + dt + " msec " + - " time per individual = " + (dt / countNow) + " msec" ); - } - } - }catch(Throwable th){ - //on tomcat shutdown odd exceptions get thrown - if( ! 
stopRequested ) - log.error("Exception during index building",th); - } - } - } - - public static void resetCounters(long time, long workload) { - IndexWorkerThread.starttime = time; - IndexWorkerThread.countToIndex.set(workload); - IndexWorkerThread.countCompleted.set(0); - } - - public static long getCount() { - return countCompleted.get(); - } - - public static long getCountToIndex() { - return countToIndex.get(); - } -} diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexingEventListener.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexingEventListener.java deleted file mode 100644 index 0f3e301f7..000000000 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexingEventListener.java +++ /dev/null @@ -1,19 +0,0 @@ -/* $This file is distributed under the terms of the license in /doc/license.txt$ */ -package edu.cornell.mannlib.vitro.webapp.search.indexing; - -/** - * Classes that implement this interface can get informed of - * events that happen to the IndexBuilder. 
- */ -public interface IndexingEventListener { - - public enum EventTypes { - START_UPDATE, - FINISHED_UPDATE, - START_FULL_REBUILD, - FINISH_FULL_REBUILD - } - - - public void notifyOfIndexingEvent(EventTypes ie); -} diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/SearchReindexingListener.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/SearchReindexingListener.java deleted file mode 100644 index 6327608c8..000000000 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/SearchReindexingListener.java +++ /dev/null @@ -1,171 +0,0 @@ -/* $This file is distributed under the terms of the license in /doc/license.txt$ */ - -package edu.cornell.mannlib.vitro.webapp.search.indexing; - -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import com.hp.hpl.jena.rdf.model.Model; -import com.hp.hpl.jena.rdf.model.ModelChangedListener; -import com.hp.hpl.jena.rdf.model.ModelFactory; -import com.hp.hpl.jena.rdf.model.Resource; -import com.hp.hpl.jena.rdf.model.Statement; -import com.hp.hpl.jena.rdf.model.StmtIterator; -import com.hp.hpl.jena.shared.Lock; - -import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent; - -/** - * This class is thread safe. Notice that doAsyncIndexBuild() is frequently - * called because the inference system does not seem to send notifyEvents. 
- */ -public class SearchReindexingListener implements ModelChangedListener { - private IndexBuilder indexBuilder; - - /** Model just for creating statements */ - private Model statementCreator; - - public SearchReindexingListener(IndexBuilder indexBuilder ) { - if(indexBuilder == null ) - throw new IllegalArgumentException("Constructor parameter indexBuilder must not be null"); - this.indexBuilder = indexBuilder; - this.statementCreator = ModelFactory.createDefaultModel(); - log.debug("new SearchReindexingListener"); - } - - private synchronized void addChange(Statement stmt){ - if( stmt == null ) return; - if( log.isDebugEnabled() ){ - String sub="unknown"; - String pred = "unknown"; - String obj ="unknown"; - - if( stmt.getSubject().isURIResource() ){ - sub = stmt.getSubject().getURI(); - } - if( stmt.getPredicate() != null ){ - pred = stmt.getPredicate().getURI(); - } - if( stmt.getObject().isURIResource() ){ - obj = ((Resource) (stmt.getObject().as(Resource.class))).getURI(); - }else{ - obj = stmt.getObject().toString(); - } - log.debug("changed statement: sub='" + sub + "' pred='" + pred +"' obj='" + obj + "'"); - } - - indexBuilder.addToChanged( copyStmt(stmt) ); - } - - private void requestAsyncIndexUpdate(){ - log.debug("requestAsyncIndexUpdate()"); - indexBuilder.doUpdateIndex(); - } - - /** - * Create a copy of the statement to make sure that - * it is not associated with any Models. This is - * to avoid the Models being held back from GC. 
- */ - private Statement copyStmt( Statement in){ - return statementCreator.createStatement(in.getSubject(),in.getPredicate(),in.getObject()); - } - - @Override - public void notifyEvent(Model arg0, Object arg1) { - if ( (arg1 instanceof EditEvent) ){ - EditEvent editEvent = (EditEvent)arg1; - if( !editEvent.getBegin() ){// editEvent is the end of an edit - log.debug("Doing search index build at end of EditEvent"); - requestAsyncIndexUpdate(); - } - } else{ - log.debug("ignoring event " + arg1.getClass().getName() + " "+ arg1 ); - } - } - - @Override - public void addedStatement(Statement stmt) { - addChange(stmt); - requestAsyncIndexUpdate(); - } - - @Override - public void removedStatement(Statement stmt){ - addChange(stmt); - requestAsyncIndexUpdate(); - } - - private static final Log log = LogFactory.getLog(SearchReindexingListener.class.getName()); - - @Override - public void addedStatements(Statement[] arg0) { - for( Statement s: arg0){ - addChange(s); - } - requestAsyncIndexUpdate(); - } - - @Override - public void addedStatements(List arg0) { - for( Statement s: arg0){ - addChange(s); - } - requestAsyncIndexUpdate(); - } - - @Override - public void addedStatements(StmtIterator arg0) { - try{ - while(arg0.hasNext()){ - Statement s = arg0.nextStatement(); - addChange(s); - } - }finally{ - arg0.close(); - } - requestAsyncIndexUpdate(); - } - - @Override - public void addedStatements(Model m) { - m.enterCriticalSection(Lock.READ); - StmtIterator it = null; - try{ - it = m.listStatements(); - while(it.hasNext()){ - addChange(it.nextStatement()); - } - }finally{ - if( it != null ) it.close(); - m.leaveCriticalSection(); - } - requestAsyncIndexUpdate(); - } - - @Override - public void removedStatements(Statement[] arg0) { - //same as add stmts - this.addedStatements(arg0); - } - - @Override - public void removedStatements(List arg0) { - //same as add - this.addedStatements(arg0); - } - - @Override - public void removedStatements(StmtIterator arg0) { - //same as 
add - this.addedStatements(arg0); - } - - @Override - public void removedStatements(Model arg0) { - //same as add - this.addedStatements(arg0); - } -} diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java new file mode 100644 index 000000000..b9012189c --- /dev/null +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/IndexingChangeListener.java @@ -0,0 +1,184 @@ +/* $This file is distributed under the terms of the license in /doc/license.txt$ */ + +package edu.cornell.mannlib.vitro.webapp.searchindex; + +import java.io.ByteArrayInputStream; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelFactory; +import com.hp.hpl.jena.rdf.model.Statement; +import com.hp.hpl.jena.rdf.model.StmtIterator; + +import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent; +import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; +import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; +import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; + +/** + * When a change is heard, wait for an interval to see if more changes come in. + * When changes stop coming in for a specified interval, send what has + * accumulated. + */ +public class IndexingChangeListener implements ChangeListener { + private static final Log log = LogFactory + .getLog(IndexingChangeListener.class); + + private final SearchIndexer searchIndexer; + private final Ticker ticker; + + /** All access to the list must be synchronized. 
*/ + private final List changes; + + public IndexingChangeListener(SearchIndexer searchIndexer) { + this.searchIndexer = searchIndexer; + this.ticker = new Ticker(); + this.changes = new ArrayList<>(); + } + + private synchronized void noteChange(Statement stmt) { + changes.add(stmt); + ticker.start(); + } + + private synchronized void respondToTicker() { + searchIndexer.scheduleUpdatesForStatements(changes); + changes.clear(); + } + + public void shutdown() { + ticker.shutdown(); + } + + @Override + public void addedStatement(String serializedTriple, String graphURI) { + noteChange(parseTriple(serializedTriple)); + } + + @Override + public void removedStatement(String serializedTriple, String graphURI) { + noteChange(parseTriple(serializedTriple)); + } + + /** + * We only care about events that signal the end of an edit operation. + */ + @Override + public void notifyEvent(String graphURI, Object event) { + if ((event instanceof EditEvent)) { + EditEvent editEvent = (EditEvent) event; + if (!editEvent.getBegin()) { // editEvent is the end of an edit + log.debug("Doing search index build at end of EditEvent"); + ticker.start(); + } + } else { + log.debug("ignoring event " + event.getClass().getName() + " " + + event); + } + } + + // TODO avoid overhead of Model. 
+ // TODO avoid duplication with JenaChangeListener + private Statement parseTriple(String serializedTriple) { + try { + Model m = ModelFactory.createDefaultModel(); + m.read(new ByteArrayInputStream(serializedTriple.getBytes("UTF-8")), + null, "N3"); + StmtIterator sit = m.listStatements(); + if (!sit.hasNext()) { + throw new RuntimeException("no triple parsed from change event"); + } else { + Statement s = sit.nextStatement(); + if (sit.hasNext()) { + log.warn("More than one triple parsed from change event"); + } + return s; + } + } catch (RuntimeException riot) { + log.error("Failed to parse triple " + serializedTriple, riot); + throw riot; + } catch (UnsupportedEncodingException uee) { + throw new RuntimeException(uee); + } + } + + // ---------------------------------------------------------------------- + // helper classes + // ---------------------------------------------------------------------- + + /** + * The ticker will ask for a response after two ticks, unless it is started + * again before the second one. + * + *
+	 * On a call to start():
+	 *    Start the timer unless it is already running.
+	 *    Reset the hasOneTick flag.
+	 *    
+	 * When the timer expires:
+	 *    If the timer hasOneTick, we're done: call for a response.
+	 *    Otherwise, record that it hasOneTick, and keep the timer running.
+	 * 
+ * + * All methods are synchronized on the enclosing IndexingChangeListener. + */ + private class Ticker { + private final ScheduledExecutorService queue; + private volatile boolean running; + private volatile boolean hasOneTick; + + public Ticker() { + this.queue = Executors.newScheduledThreadPool(1, + new VitroBackgroundThread.Factory( + "IndexingChangeListener_Ticker")); + } + + public void shutdown() { + synchronized (IndexingChangeListener.this) { + this.queue.shutdown(); + } + } + + public void start() { + synchronized (IndexingChangeListener.this) { + if (!running) { + startTicker(); + } + hasOneTick = false; + } + } + + private void startTicker() { + if (queue.isShutdown()) { + log.warn("Attempt to start ticker after shutdown request."); + } else { + queue.schedule(new TickerResponse(), 500, TimeUnit.MILLISECONDS); + running = true; + } + } + + private class TickerResponse implements Runnable { + @Override + public void run() { + synchronized (IndexingChangeListener.this) { + running = false; + if (hasOneTick) { + respondToTicker(); + hasOneTick = false; + } else { + startTicker(); + hasOneTick = true; + } + } + } + } + } +} diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java index df5877ff8..24fa1b2ea 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerImpl.java @@ -26,6 +26,8 @@ import javax.servlet.ServletContext; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import com.hp.hpl.jena.rdf.model.Statement; + import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory; import edu.cornell.mannlib.vitro.webapp.dao.filtering.WebappDaoFactoryFiltering; import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilterUtils; @@ -40,6 +42,7 @@ import 
edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentMod import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExcluder; import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinder; import edu.cornell.mannlib.vitro.webapp.searchindex.tasks.RebuildIndexTask; +import edu.cornell.mannlib.vitro.webapp.searchindex.tasks.UpdateStatementsTask; import edu.cornell.mannlib.vitro.webapp.searchindex.tasks.UpdateUrisTask; import edu.cornell.mannlib.vitro.webapp.utils.configuration.ConfigurationBeanLoader; import edu.cornell.mannlib.vitro.webapp.utils.configuration.ConfigurationBeanLoaderException; @@ -105,16 +108,48 @@ public class SearchIndexerImpl implements SearchIndexer { return new WebappDaoFactoryFiltering(rawWadf, vf); } + @Override + public void scheduleUpdatesForStatements(List changes) { + if (changes == null || changes.isEmpty()) { + return; + } + + if (taskQueue.isShutdown()) { + throw new IllegalStateException("SearchIndexer is shut down."); + } + + Task task = new UpdateStatementsTask(changes, uriFinders, excluders, + modifiers, wadf.getIndividualDao(), listeners, pool); + scheduler.scheduleTask(task); + log.debug("Scheduled updates for " + changes.size() + " statements."); + } + @Override public void scheduleUpdatesForUris(Collection uris) { - log.debug("Schedule updates for " + uris.size() + " uris."); - scheduler.scheduleTask(new UpdateUrisTask(uris, excluders, modifiers, - wadf.getIndividualDao(), listeners, pool)); + if (uris == null || uris.isEmpty()) { + return; + } + + if (taskQueue.isShutdown()) { + throw new IllegalStateException("SearchIndexer is shut down."); + } + + Task task = new UpdateUrisTask(uris, excluders, modifiers, + wadf.getIndividualDao(), listeners, pool); + scheduler.scheduleTask(task); + log.debug("Scheduled updates for " + uris.size() + " uris."); } @Override public void rebuildIndex() { - scheduler.scheduleTask(new RebuildIndexTask()); + if (taskQueue.isShutdown()) { + throw new 
IllegalStateException("SearchIndexer is shut down."); + } + + Task task = new RebuildIndexTask(excluders, modifiers, + wadf.getIndividualDao(), listeners, pool); + scheduler.scheduleTask(task); + log.debug("Scheduled a full rebuild."); } @Override @@ -148,6 +183,7 @@ public class SearchIndexerImpl implements SearchIndexer { if (status.getState() != State.SHUTDOWN) { listeners.fireEvent(new Event(SHUTDOWN_REQUESTED, status)); + pool.shutdown(); taskQueue.shutdown(); for (DocumentModifier dm : modifiers) { @@ -230,7 +266,8 @@ public class SearchIndexerImpl implements SearchIndexer { paused = false; for (Task task : deferredQueue) { taskQueue.scheduleTask(task); - log.debug("moved task from deferred queue to task queue: " + task); + log.debug("moved task from deferred queue to task queue: " + + task); } } } @@ -275,6 +312,10 @@ public class SearchIndexerImpl implements SearchIndexer { } } + public boolean isShutdown() { + return queue.isShutdown(); + } + /** When this wrapper is run, we will know the current task and status. 
*/ private class TaskWrapper implements Runnable { private final Task task; diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java index 30da194f1..4b4784398 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/SearchIndexerSetup.java @@ -2,11 +2,6 @@ package edu.cornell.mannlib.vitro.webapp.searchindex; -import static edu.cornell.mannlib.vitro.webapp.modelaccess.ModelNames.DISPLAY; - -import java.util.ArrayList; -import java.util.List; - import javax.servlet.ServletContext; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; @@ -14,92 +9,75 @@ import javax.servlet.ServletContextListener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.hp.hpl.jena.ontology.OntModel; - import edu.cornell.mannlib.vitro.webapp.application.ApplicationUtils; -import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory; -import edu.cornell.mannlib.vitro.webapp.dao.filtering.WebappDaoFactoryFiltering; -import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilterUtils; -import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilters; -import edu.cornell.mannlib.vitro.webapp.dao.jena.ModelContext; -import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelAccess; -import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine; -import edu.cornell.mannlib.vitro.webapp.search.SearchIndexer; -import edu.cornell.mannlib.vitro.webapp.search.indexing.IndexBuilder; -import edu.cornell.mannlib.vitro.webapp.search.indexing.SearchReindexingListener; -import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinder; +import edu.cornell.mannlib.vitro.webapp.modules.Application; +import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer; 
+import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; +import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceUtils; +import edu.cornell.mannlib.vitro.webapp.search.controller.IndexController; +import edu.cornell.mannlib.vitro.webapp.search.controller.IndexHistory; import edu.cornell.mannlib.vitro.webapp.startup.ComponentStartupStatusImpl; import edu.cornell.mannlib.vitro.webapp.startup.StartupStatus; -import edu.cornell.mannlib.vitro.webapp.utils.configuration.ConfigurationBeanLoader; -import edu.cornell.mannlib.vitro.webapp.utils.configuration.ConfigurationBeanLoaderException; import edu.cornell.mannlib.vitro.webapp.utils.developer.Key; -import edu.cornell.mannlib.vitro.webapp.utils.developer.listeners.DeveloperDisabledModelChangeListener; +import edu.cornell.mannlib.vitro.webapp.utils.developer.listeners.DeveloperDisabledChangeListener; /** - * TODO A silly implementation that just wraps the old IndexBuilder with a new - * SearchIndexerImpl. + * Start the SearchIndexer. Create a listener on the RDFService and link it to + * the indexer. Create a history object as a listener and make it available to + * the IndexController. 
*/ public class SearchIndexerSetup implements ServletContextListener { private static final Log log = LogFactory.getLog(SearchIndexerSetup.class); private ServletContext ctx; - private OntModel displayModel; - private ConfigurationBeanLoader beanLoader; + private Application app; + private SearchIndexer searchIndexer; + private IndexingChangeListener listener; + private DeveloperDisabledChangeListener listenerWrapper; + private IndexHistory history; @Override public void contextInitialized(ServletContextEvent sce) { - this.ctx = sce.getServletContext(); - this.displayModel = ModelAccess.on(ctx).getOntModel(DISPLAY); - this.beanLoader = new ConfigurationBeanLoader(displayModel, ctx); + ctx = sce.getServletContext(); + app = ApplicationUtils.instance(); - ServletContext context = sce.getServletContext(); - StartupStatus ss = StartupStatus.getBean(context); - SearchEngine searchEngine = ApplicationUtils.instance() - .getSearchEngine(); + StartupStatus ss = StartupStatus.getBean(ctx); - { // >>>>> TODO - try { -// /* setup search indexer */ -// SearchIndexer searchIndexer = new SearchIndexer(searchEngine, -// indToSearchDoc); -// -// // Make the IndexBuilder -// IndexBuilder builder = new IndexBuilder(searchIndexer, wadf, -// uriFinders); -// -// // Create listener to notify index builder of changes to model -// // (can be disabled by developer setting.) 
-// ModelContext -// .registerListenerForChanges( -// context, -// new DeveloperDisabledModelChangeListener( -// new SearchReindexingListener(builder), -// Key.SEARCH_INDEX_SUPPRESS_MODEL_CHANGE_LISTENER)); -// -// ss.info(this, "Setup of search indexer completed."); -// - } catch (Throwable e) { - ss.fatal(this, "could not setup search engine", e); - } + try { + searchIndexer = app.getSearchIndexer(); + + listener = new IndexingChangeListener(searchIndexer); + + listenerWrapper = new DeveloperDisabledChangeListener(listener, + Key.SEARCH_INDEX_SUPPRESS_MODEL_CHANGE_LISTENER); + RDFServiceUtils.getRDFServiceFactory(ctx).registerListener( + listenerWrapper); + + this.history = new IndexHistory(); + searchIndexer.addListener(this.history); + IndexController.setHistory(this.history); + + searchIndexer + .startup(app, new ComponentStartupStatusImpl(this, ss)); + + ss.info(this, "Setup of search indexer completed."); + } catch (RDFServiceException e) { + ss.fatal(this, "Failed to register the model changed listener.", e); } - ApplicationUtils - .instance() - .getSearchIndexer() - .startup(ApplicationUtils.instance(), - new ComponentStartupStatusImpl(this, ss)); } @Override public void contextDestroyed(ServletContextEvent sce) { - ApplicationUtils.instance().getSearchIndexer() - .shutdown(ApplicationUtils.instance()); + searchIndexer.shutdown(app); - { // >>>>> TODO - IndexBuilder builder = (IndexBuilder) sce.getServletContext() - .getAttribute(IndexBuilder.class.getName()); - if (builder != null) - builder.stopIndexingThread(); + searchIndexer.removeListener(this.history); + + try { + RDFServiceUtils.getRDFServiceFactory(ctx).unregisterListener( + listenerWrapper); + } catch (RDFServiceException e) { + log.warn("Failed to unregister the indexing listener."); } + listener.shutdown(); } - } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/indexing/IndexingUriFinder.java 
b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/indexing/IndexingUriFinder.java index e86c07d0f..c622de01c 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/indexing/IndexingUriFinder.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/indexing/IndexingUriFinder.java @@ -7,23 +7,37 @@ import java.util.List; import com.hp.hpl.jena.rdf.model.Statement; /** - * Interface to use with IndexBuilder to find more URIs to index given a changed statement. - * The statement may have been added or removed from the model. + * Interface to use with IndexBuilder to find more URIs to index given a changed + * statement. The statement may have been added or removed from the model. + * + * Implementing classes must be threadsafe, as multiple threads are used to + * complete the task. + * + * The life-cycle is as follows: startIndexing(), 0 or more calls to + * findAdditionalURIsToIndex(), endIndexing(). + * + * Repeat as desired. */ public interface IndexingUriFinder { - - /** - * For the domain that is the responsibility of the given implementation, - * calculate the URIs that need to be updated in the search index. - * The URIs in the list will be updated by the IndexBuilder, which will - * handle URIs of new individuals, URIs of individuals that have changes, - * and URIs of individuals that have been removed from the model. - * - * @return List of URIs. - */ - List findAdditionalURIsToIndex(Statement stmt); - - void startIndexing(); - - void endIndexing(); + + /** + * For the domain that is the responsibility of the given implementation, + * calculate the URIs that need to be updated in the search index. The URIs + * in the list will be updated by the IndexBuilder, which will handle URIs + * of new individuals, URIs of individuals that have changes, and URIs of + * individuals that have been removed from the model. + * + * @return List of URIs. Never return null. 
+ */ + List findAdditionalURIsToIndex(Statement stmt); + + /** + * Indicates that a new collection of statements is about to be processed. + */ + void startIndexing(); + + /** + * Indicates that the collection of statements being processed is complete. + */ + void endIndexing(); } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/FindUrisForStatementWorkUnit.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/FindUrisForStatementWorkUnit.java new file mode 100644 index 000000000..06e6a0c7e --- /dev/null +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/FindUrisForStatementWorkUnit.java @@ -0,0 +1,44 @@ +/* $This file is distributed under the terms of the license in /doc/license.txt$ */ + +package edu.cornell.mannlib.vitro.webapp.searchindex.tasks; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import com.hp.hpl.jena.rdf.model.Statement; + +import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinder; + +/** + * Ask all of the URI Finders to find URIs that might be affected by this + * statement. 
+ */ +public class FindUrisForStatementWorkUnit implements Runnable { + private final Statement stmt; + private final Collection uriFinders; + private final Set uris; + + public FindUrisForStatementWorkUnit(Statement stmt, + Collection uriFinders) { + this.stmt = stmt; + this.uriFinders = uriFinders; + this.uris = new HashSet<>(); + } + + @Override + public void run() { + for (IndexingUriFinder uriFinder : uriFinders) { + uris.addAll(uriFinder.findAdditionalURIsToIndex(stmt)); + } + } + + public Statement getStatement() { + return stmt; + } + + public Set getUris() { + return uris; + } + +} diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/RebuildIndexTask.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/RebuildIndexTask.java index 752abdda7..bc6cf9426 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/RebuildIndexTask.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/RebuildIndexTask.java @@ -2,40 +2,132 @@ package edu.cornell.mannlib.vitro.webapp.searchindex.tasks; +import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.START_REBUILD; +import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STOP_REBUILD; +import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State.REBUILDING; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import edu.cornell.mannlib.vitro.webapp.application.ApplicationUtils; +import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao; +import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine; +import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngineException; +import 
edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; +import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.RebuildCounts; +import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State; +import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList; import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task; +import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool; +import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier; +import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExcluder; /** - * TODO + * Get the URIs of all individuals in the model. Update each of their search + * documents. + * + * Delete all search documents that have not been updated since this rebuild + * began. That removes all obsolete documents from the index. 
*/ public class RebuildIndexTask implements Task { + private static final Log log = LogFactory.getLog(RebuildIndexTask.class); - /* (non-Javadoc) - * @see java.lang.Runnable#run() - */ - @Override - public void run() { - // TODO Auto-generated method stub - throw new RuntimeException("RebuildIndexTask.run() not implemented."); + private final IndividualDao indDao; + private final List excluders; + private final List modifiers; + private final ListenerList listeners; + private final WorkerThreadPool pool; + private final SearchEngine searchEngine; + private final Date requestedAt; + private final int documentsBefore; + + private volatile SearchIndexerStatus status; + + public RebuildIndexTask(Collection excluders, + Collection modifiers, IndividualDao indDao, + ListenerList listeners, WorkerThreadPool pool) { + this.excluders = new ArrayList<>(excluders); + this.modifiers = new ArrayList<>(modifiers); + this.indDao = indDao; + this.listeners = listeners; + this.pool = pool; + + this.searchEngine = ApplicationUtils.instance().getSearchEngine(); + + this.requestedAt = new Date(); + this.documentsBefore = getDocumentCount(); + this.status = buildStatus(REBUILDING, 0); + } + + @Override + public void run() { + listeners.fireEvent(new Event(START_REBUILD, status)); + + Collection uris = getAllUrisInTheModel(); + updateTheUris(uris); + deleteOutdatedDocuments(); + + status = buildStatus(REBUILDING, getDocumentCount()); + listeners.fireEvent(new Event(STOP_REBUILD, status)); + } + + private Collection getAllUrisInTheModel() { + return indDao.getAllIndividualUris(); + } + + private void updateTheUris(Collection uris) { + new UpdateUrisTask(uris, excluders, modifiers, indDao, listeners, pool) + .run(); + } + + private void deleteOutdatedDocuments() { + String query = "indexedTime:[ * TO " + requestedAt.getTime() + " ]"; + try { + searchEngine.deleteByQuery(query); + searchEngine.commit(); + } catch (SearchEngineException e) { + log.warn("Failed to delete outdated documents " 
+ + "from the search index", e); + } + } + + private int getDocumentCount() { + try { + return searchEngine.documentCount(); + } catch (SearchEngineException e) { + log.warn("Failed to get docoument count from the search index.", e); + return 0; + } + } + + private SearchIndexerStatus buildStatus(State state, int documentsAfter) { + return new SearchIndexerStatus(state, new Date(), new RebuildCounts( + documentsBefore, documentsAfter)); } - /* (non-Javadoc) - * @see edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task#getStatus() - */ @Override public SearchIndexerStatus getStatus() { - // TODO Auto-generated method stub - throw new RuntimeException( - "RebuildIndexTask.getStatus() not implemented."); - + return status; } @Override public void notifyWorkUnitCompletion(Runnable workUnit) { - // TODO Auto-generated method stub - throw new RuntimeException("RebuildIndexTask.notifyWorkUnitCompletion() not implemented."); - + // We don't submit any work units, so we won't see any calls to this. 
+ log.error("Why was this called?"); + } + + @Override + public String toString() { + return "RebuildIndexTask[requestedAt=" + + new SimpleDateFormat().format(requestedAt) + "]"; } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateStatementsTask.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateStatementsTask.java new file mode 100644 index 000000000..c72198eba --- /dev/null +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateStatementsTask.java @@ -0,0 +1,209 @@ +/* $This file is distributed under the terms of the license in /doc/license.txt$ */ + +package edu.cornell.mannlib.vitro.webapp.searchindex.tasks; + +import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.PROGRESS; +import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.START_PROCESSING_STATEMENTS; +import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STOP_PROCESSING_STATEMENTS; +import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State.PROCESSING_STMTS; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.hp.hpl.jena.rdf.model.Statement; + +import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao; +import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event; +import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus; +import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.StatementCounts; +import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList; +import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task; +import 
edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool; +import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier; +import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExcluder; +import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinder; + +/** + * Receive a collection of statements that have been added to the model, or + * deleted from it. + * + * Find the URIs of search documents that may have been affected by these + * changes, and update those documents. + * + * ------------------- + * + * It would be nice to stream this whole thing, finding the URIs affected by + * each statement and updating those documents before proceeding. However, it is + * very common for several statements within a group to affect the same + * document, so that method would result in rebuilding the document several + * times. + * + * Instead, we find all of the URIs affected by all statements, store them in a + * Set to remove duplicates, and then process the URIs in the set.
+ */ +public class UpdateStatementsTask implements Task { + private static final Log log = LogFactory + .getLog(UpdateStatementsTask.class); + + private final List changes; + private final Set uriFinders; + private final Set excluders; + private final Set modifiers; + private final IndividualDao indDao; + private final ListenerList listeners; + private final WorkerThreadPool pool; + + private final Set uris; + private final Status status; + + public UpdateStatementsTask(List changes, + Set uriFinders, + Set excluders, + Set modifiers, IndividualDao indDao, + ListenerList listeners, WorkerThreadPool pool) { + this.changes = new ArrayList<>(changes); + this.uriFinders = uriFinders; + this.excluders = excluders; + this.modifiers = modifiers; + this.indDao = indDao; + this.listeners = listeners; + this.pool = pool; + + this.uris = Collections.synchronizedSet(new HashSet()); + + this.status = new Status(changes.size(), 200, listeners); + } + + @Override + public void run() { + listeners + .fireEvent(new Event(START_PROCESSING_STATEMENTS, getStatus())); + + findAffectedUris(); + listeners.fireEvent(new Event(PROGRESS, getStatus())); + + updateTheUris(); + listeners.fireEvent(new Event(STOP_PROCESSING_STATEMENTS, getStatus())); + } + + private void findAffectedUris() { + tellFindersWeAreStarting(); + + for (Statement stmt : changes) { + if (isInterrupted()) { + log.info("Interrupted: " + status.getSearchIndexerStatus()); + return; + } else { + findUrisForStatement(stmt); + } + } + waitForWorkUnitsToComplete(); + + tellFindersWeAreStopping(); + } + + private void tellFindersWeAreStarting() { + log.debug("Tell finders we are starting."); + for (IndexingUriFinder uriFinder : uriFinders) { + uriFinder.startIndexing(); + } + } + + private void tellFindersWeAreStopping() { + log.debug("Tell finders we are stopping."); + for (IndexingUriFinder uriFinder : uriFinders) { + uriFinder.endIndexing(); + } + } + + private boolean isInterrupted() { + if (Thread.interrupted()) { + 
Thread.currentThread().interrupt(); + return true; + } else { + return false; + } + } + + private void findUrisForStatement(Statement stmt) { + Runnable workUnit = new FindUrisForStatementWorkUnit(stmt, uriFinders); + pool.submit(workUnit, this); + log.debug("scheduled uri finders for " + stmt); + } + + private void waitForWorkUnitsToComplete() { + pool.waitUntilIdle(); + } + + private void updateTheUris() { + new UpdateUrisTask(uris, excluders, modifiers, indDao, listeners, pool) + .run(); + } + + @Override + public SearchIndexerStatus getStatus() { + return status.getSearchIndexerStatus(); + } + + @Override + public void notifyWorkUnitCompletion(Runnable workUnit) { + FindUrisForStatementWorkUnit worker = (FindUrisForStatementWorkUnit) workUnit; + + Set foundUris = worker.getUris(); + Statement stmt = worker.getStatement(); + log.debug("Found " + foundUris.size() + " uris for statement: " + stmt); + + uris.addAll(foundUris); + status.incrementProcessed(); + } + + // ---------------------------------------------------------------------- + // Helper classes + // ---------------------------------------------------------------------- + + /** + * A thread-safe collection of status information. All methods are + * synchronized. 
+ */ + private static class Status { + private final int total; + private final int progressInterval; + private final ListenerList listeners; + private int processed = 0; + private Date since = new Date(); + + public Status(int total, int progressInterval, ListenerList listeners) { + this.total = total; + this.progressInterval = progressInterval; + this.listeners = listeners; + } + + public synchronized void incrementProcessed() { + processed++; + since = new Date(); + maybeFireProgressEvent(); + } + + private void maybeFireProgressEvent() { + if (processed > 0 && processed % progressInterval == 0) { + listeners.fireEvent(new Event(PROGRESS, + getSearchIndexerStatus())); + } + } + + public synchronized SearchIndexerStatus getSearchIndexerStatus() { + int remaining = total - processed; + return new SearchIndexerStatus(PROCESSING_STMTS, since, + new StatementCounts(processed, remaining, total)); + } + + } + +} diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateUrisTask.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateUrisTask.java index 766cfc695..c6e66f9e1 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateUrisTask.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/searchindex/tasks/UpdateUrisTask.java @@ -2,6 +2,7 @@ package edu.cornell.mannlib.vitro.webapp.searchindex.tasks; +import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.PROGRESS; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.START_PROCESSING_URIS; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STOP_PROCESSING_URIS; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State.PROCESSING_URIS; @@ -67,7 +68,7 @@ public class UpdateUrisTask implements Task { this.listeners = listeners; this.pool = pool; - this.status = new Status(uris.size()); + 
this.status = new Status(uris.size(), 200, listeners); this.searchEngine = ApplicationUtils.instance().getSearchEngine(); } @@ -90,10 +91,19 @@ public class UpdateUrisTask implements Task { } } pool.waitUntilIdle(); + commitChanges(); listeners.fireEvent(new Event(STOP_PROCESSING_URIS, status .getSearchIndexerStatus())); } + private void commitChanges() { + try { + searchEngine.commit(); + } catch (SearchEngineException e) { + log.error("Failed to commit the changes."); + } + } + private boolean isInterrupted() { if (Thread.interrupted()) { Thread.currentThread().interrupt(); @@ -170,17 +180,22 @@ public class UpdateUrisTask implements Task { */ private static class Status { private final int total; + private final int progressInterval; + private final ListenerList listeners; private int updated = 0; private int deleted = 0; private Date since = new Date(); - public Status(int total) { + public Status(int total, int progressInterval, ListenerList listeners) { this.total = total; + this.progressInterval = progressInterval; + this.listeners = listeners; } public synchronized void incrementUpdates() { updated++; since = new Date(); + maybeFireProgressEvent(); } public synchronized void incrementDeletes() { @@ -188,6 +203,13 @@ public class UpdateUrisTask implements Task { since = new Date(); } + private void maybeFireProgressEvent() { + if (updated > 0 && updated % progressInterval == 0) { + listeners.fireEvent(new Event(PROGRESS, + getSearchIndexerStatus())); + } + } + public synchronized SearchIndexerStatus getSearchIndexerStatus() { int remaining = total - updated - deleted; return new SearchIndexerStatus(PROCESSING_URIS, since, diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/utils/developer/listeners/DeveloperDisabledChangeListener.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/utils/developer/listeners/DeveloperDisabledChangeListener.java new file mode 100644 index 000000000..2c5d035ef --- /dev/null +++ 
b/webapp/src/edu/cornell/mannlib/vitro/webapp/utils/developer/listeners/DeveloperDisabledChangeListener.java @@ -0,0 +1,53 @@ +/* $This file is distributed under the terms of the license in /doc/license.txt$ */ + +package edu.cornell.mannlib.vitro.webapp.utils.developer.listeners; + +import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; +import edu.cornell.mannlib.vitro.webapp.utils.developer.DeveloperSettings; +import edu.cornell.mannlib.vitro.webapp.utils.developer.Key; + +/** + * If a particular developer flag is NOT set to true, this is transparent. + * + * Set the flag and this becomes opaque, passing no events through. + */ +public class DeveloperDisabledChangeListener implements ChangeListener { + private final ChangeListener inner; + private final Key disablingKey; + + public DeveloperDisabledChangeListener(ChangeListener inner, + Key disablingKey) { + this.inner = inner; + this.disablingKey = disablingKey; + } + + private boolean isEnabled() { + return !DeveloperSettings.getInstance().getBoolean(disablingKey); + } + + // ---------------------------------------------------------------------- + // Delegated methods. 
+ // ---------------------------------------------------------------------- + + @Override + public void addedStatement(String serializedTriple, String graphURI) { + if (isEnabled()) { + inner.addedStatement(serializedTriple, graphURI); + } + } + + @Override + public void removedStatement(String serializedTriple, String graphURI) { + if (isEnabled()) { + inner.removedStatement(serializedTriple, graphURI); + } + } + + @Override + public void notifyEvent(String graphURI, Object event) { + if (isEnabled()) { + inner.notifyEvent(graphURI, event); + } + } + +} diff --git a/webapp/test/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilderThreadTest.java b/webapp/test/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilderThreadTest.java deleted file mode 100644 index ac7584bb8..000000000 --- a/webapp/test/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilderThreadTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* $This file is distributed under the terms of the license in /doc/license.txt$ */ - -package edu.cornell.mannlib.vitro.webapp.search.indexing; - -import org.junit.Assert; - -import org.apache.log4j.Level; -import org.junit.Ignore; -import org.junit.Test; - -import edu.cornell.mannlib.vitro.testing.AbstractTestClass; - - -public class IndexBuilderThreadTest extends AbstractTestClass { - @Ignore - @Test - public void testStoppingTheThread(){ - setLoggerLevel(IndexBuilder.class, Level.OFF); - - IndexBuilder ib = new IndexBuilder(); - Assert.assertNotSame(Thread.State.NEW, ib.getState() ); - Assert.assertNotSame(Thread.State.TERMINATED, ib.getState() ); - - try { - Thread.sleep(100); - } catch (InterruptedException e) { - Assert.fail(e.getMessage()); - } - ib.stopIndexingThread(); - try { - Thread.sleep(100); - } catch (InterruptedException e) { - Assert.fail(e.getMessage()); - } - Assert.assertFalse(ib.isAlive()); - Assert.assertSame(Thread.State.TERMINATED, ib.getState() ); - } -} diff --git 
a/webapp/web/templates/freemarker/body/admin/searchIndexStatus.ftl b/webapp/web/templates/freemarker/body/admin/searchIndexStatus.ftl index c9bc8abf0..99219f30f 100644 --- a/webapp/web/templates/freemarker/body/admin/searchIndexStatus.ftl +++ b/webapp/web/templates/freemarker/body/admin/searchIndexStatus.ftl @@ -18,8 +18,9 @@

<@showIndexerCounts "STATEMENT_COUNTS", status />

<@showElapsedTime status.elapsed /> Expected completion ${status.expectedCompletion?datetime}.

- <#elseif status.statusType = "PREPARING_REBUILD"> -

The search indexer has been preparing to rebuild the index since ${status.since?datetime}

+ <#elseif status.statusType = "REBUILDING"> +

The search indexer has been rebuilding the index since ${status.since?datetime}

+

<@showIndexerCounts "REBUILD_COUNTS", status />

<#else>

The search indexer status is: ${status.statusType} @@ -67,6 +68,11 @@ <#elseif countsType == "STATEMENT_COUNTS"> Processed: ${counts.processed}, remaining: ${counts.remaining}, total: ${counts.total} <#elseif countsType == "REBUILD_COUNTS"> - Number of individuals before rebuild: ${counts.numberOfIndividuals} + Number of documents before rebuild: ${counts.documentsBefore}, after rebuild: + <#if counts.documentsAfter == 0> + UNKNOWN + <#else> + ${counts.documentsAfter} +