diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilder.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilder.java index dd1607f76..4f3c52cb6 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilder.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilder.java @@ -7,8 +7,8 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; @@ -43,34 +43,25 @@ public class IndexBuilder extends Thread { /** Statements that have changed in the model. The SearchReindexingListener * and other similar objects will use methods on IndexBuilder to add statements - * to this queue. This should only be accessed from blocks synchronized on - * the changedStmtQueue object. + * to this queue. */ - protected List changedStmtQueue; - - /** This is the list of URIs that need to be updated in the search - * index. The IndexBuilder thread will process Statements in changedStmtQueue - * to create this set of URIs. - * This should only be accessed by the IndexBuilder thread. */ - private HashSet urisRequiringUpdate; + private final ConcurrentLinkedQueue changedStmtQueue = new ConcurrentLinkedQueue(); /** This is a list of objects that will compute what URIs need to be * updated in the search index when a statement changes. */ - protected List stmtToURIsToIndexFunctions; + private final List stmtToURIsToIndexFunctions; - /** - * Indicates that a full index re-build has been requested. - */ - private boolean reindexRequested = false; + /** Indicates that a full index re-build has been requested. */ + private volatile boolean reindexRequested = false; /** Indicates that a stop of the indexing objects has been requested. */ - protected boolean stopRequested = false; + private volatile boolean stopRequested = false; - /** Length of pause between a model change an the start of indexing. */ - protected long reindexInterval = 1000 * 60 /* msec */ ; + /** Length of time to wait before looking for work (if not wakened sooner). */ + public static final long MAX_IDLE_INTERVAL = 1000 * 60 /* msec */ ; /** Length of pause between when work comes into queue to when indexing starts */ - protected long waitAfterNewWorkInterval = 500; //msec + public static final long WAIT_AFTER_NEW_WORK_INTERVAL = 500; //msec /** Number of threads to use during indexing. */ protected int numberOfThreads = 10; @@ -79,9 +70,6 @@ public class IndexBuilder extends Thread { public static final int MAX_UPDATE_THREADS= 10; public static final int MAX_THREADS = Math.max( MAX_UPDATE_THREADS, MAX_REINDEX_THREADS); - //public static final boolean UPDATE_DOCS = false; - //public static final boolean NEW_DOCS = true; - private static final Log log = LogFactory.getLog(IndexBuilder.class); public IndexBuilder(IndexerIface indexer, @@ -92,13 +80,12 @@ public class IndexBuilder extends Thread { this.indexer = indexer; this.wdf = wdf; + if( stmtToURIsToIndexFunctions != null ) this.stmtToURIsToIndexFunctions = stmtToURIsToIndexFunctions; else this.stmtToURIsToIndexFunctions = Collections.emptyList(); - this.changedStmtQueue = new LinkedList(); - this.urisRequiringUpdate = new HashSet(); this.start(); } @@ -116,11 +103,9 @@ public class IndexBuilder extends Thread { * index this is the method you should use. Follow the adding of * your changes with a call to doUpdateIndex(). */ - public void addToChanged(Statement stmt){ - synchronized(changedStmtQueue){ - changedStmtQueue.add(stmt); - } - } + public void addToChanged(Statement stmt) { + changedStmtQueue.add(stmt); + } /** * This method will cause the IndexBuilder to completely rebuild @@ -159,30 +144,28 @@ public class IndexBuilder extends Thread { public void run() { while(! stopRequested ){ try{ - if( !stopRequested && reindexRequested ){ + if( reindexRequested ){ log.debug("full re-index requested"); indexRebuild(); - }else if( !stopRequested && isThereWorkToDo() ){ - Thread.sleep(waitAfterNewWorkInterval); //wait a bit to let a bit more work to come into the queue + }else if( !changedStmtQueue.isEmpty() ){ + Thread.sleep(WAIT_AFTER_NEW_WORK_INTERVAL); //wait a bit to let a bit more work to come into the queue log.debug("work found for IndexBuilder, starting update"); updatedIndex(); } else { log.debug("there is no indexing working to do, waiting for work"); - synchronized (this) { this.wait(reindexInterval); } + synchronized (this) { this.wait(MAX_IDLE_INTERVAL); } } } catch (InterruptedException e) { log.debug("woken up",e); }catch(Throwable e){ - if( ! stopRequested && log != null )//may be null on shutdown - log.error(e,e); + log.error(e,e); } } if( indexer != null) indexer.abortIndexingAndCleanUp(); - if(! stopRequested && log != null )//may be null on shutdown - log.info("Stopping IndexBuilder thread"); + log.info("Stopping IndexBuilder thread"); } @@ -201,37 +184,29 @@ public class IndexBuilder extends Thread { /* ******************** non-public methods ************************* */ - private List getAndEmptyChangedStatements(){ - List localChangedStmt = null; - synchronized( changedStmtQueue ){ - localChangedStmt = new ArrayList(changedStmtQueue.size()); - localChangedStmt.addAll( changedStmtQueue ); - changedStmtQueue.clear(); - } - return localChangedStmt; - } - /** - * For a collection of statements, find the URIs that need to be updated in + * Take the changed statements from the queue and determine which URIs that need to be updated in * the index. */ - private Collection statementsToUris( Collection localChangedStmt ){ + private Collection changedStatementsToUris(){ //inform StatementToURIsToUpdate that index is starting - for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ) + for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ) { stu.startIndexing(); + } Collection urisToUpdate = new HashSet(); - for( Statement stmt : localChangedStmt){ - if( stmt == null ) - continue; - for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ){ - urisToUpdate.addAll( stu.findAdditionalURIsToIndex(stmt) ); - } + + Statement stmt ; + while (null != (stmt = changedStmtQueue.poll())) { + for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ){ + urisToUpdate.addAll( stu.findAdditionalURIsToIndex(stmt) ); + } } //inform StatementToURIsToUpdate that they are done - for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ) + for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ) { stu.endIndxing(); + } return urisToUpdate; } @@ -269,8 +244,8 @@ public class IndexBuilder extends Thread { log.info("Rebuild of search index is starting."); // clear out changed URIs since we are doing a full index rebuild - getAndEmptyChangedStatements(); - + changedStmtQueue.clear(); + log.debug("Getting all URIs in the model"); Iterator uris = wdf.getIndividualDao().getAllOfThisTypeIterator(); @@ -284,7 +259,7 @@ public class IndexBuilder extends Thread { protected void updatedIndex() { log.debug("Starting updateIndex()"); - UriLists uriLists = makeAddAndDeleteLists( statementsToUris(getAndEmptyChangedStatements()) ); + UriLists uriLists = makeAddAndDeleteLists( changedStatementsToUris() ); this.numberOfThreads = Math.max( MAX_UPDATE_THREADS, uriLists.updatedUris.size() / 20); doBuild( uriLists.updatedUris.iterator(), uriLists.deletedUris ); @@ -432,12 +407,6 @@ public class IndexBuilder extends Thread { return work; } - protected boolean isThereWorkToDo(){ - synchronized( changedStmtQueue ){ - return reindexRequested || ! changedStmtQueue.isEmpty() ; - } - } - private static class UriLists { private final List updatedUris = new ArrayList(); private final List deletedUris = new ArrayList();