NIHVIVO-3209 Clarify logic in IndexBuilder; use a ConcurrentLinkedQueue to handle synchronization; use volatile flags so all threads will see changes; don't test if log is null since it cannot change.

This commit is contained in:
j2blake 2011-10-10 19:42:11 +00:00
parent be06b68871
commit 23b55d2ae6

View file

@ -7,8 +7,8 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.ServletContext; import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -43,34 +43,25 @@ public class IndexBuilder extends Thread {
/** Statements that have changed in the model. The SearchReindexingListener /** Statements that have changed in the model. The SearchReindexingListener
* and other similar objects will use methods on IndexBuilder to add statements * and other similar objects will use methods on IndexBuilder to add statements
* to this queue. This should only be accessed from blocks synchronized on * to this queue.
* the changedStmtQueue object.
*/ */
protected List<Statement> changedStmtQueue; private final ConcurrentLinkedQueue<Statement> changedStmtQueue = new ConcurrentLinkedQueue<Statement>();
/** This is the list of URIs that need to be updated in the search
* index. The IndexBuilder thread will process Statements in changedStmtQueue
* to create this set of URIs.
* This should only be accessed by the IndexBuilder thread. */
private HashSet<String> urisRequiringUpdate;
/** This is a list of objects that will compute what URIs need to be /** This is a list of objects that will compute what URIs need to be
* updated in the search index when a statement changes. */ * updated in the search index when a statement changes. */
protected List<StatementToURIsToUpdate> stmtToURIsToIndexFunctions; private final List<StatementToURIsToUpdate> stmtToURIsToIndexFunctions;
/** /** Indicates that a full index re-build has been requested. */
* Indicates that a full index re-build has been requested. private volatile boolean reindexRequested = false;
*/
private boolean reindexRequested = false;
/** Indicates that a stop of the indexing objects has been requested. */ /** Indicates that a stop of the indexing objects has been requested. */
protected boolean stopRequested = false; private volatile boolean stopRequested = false;
/** Length of pause between a model change and the start of indexing. */ /** Length of time to wait before looking for work (if not wakened sooner). */
protected long reindexInterval = 1000 * 60 /* msec */ ; public static final long MAX_IDLE_INTERVAL = 1000 * 60 /* msec */ ;
/** Length of pause between when work comes into queue to when indexing starts */ /** Length of pause between when work comes into queue to when indexing starts */
protected long waitAfterNewWorkInterval = 500; //msec public static final long WAIT_AFTER_NEW_WORK_INTERVAL = 500; //msec
/** Number of threads to use during indexing. */ /** Number of threads to use during indexing. */
protected int numberOfThreads = 10; protected int numberOfThreads = 10;
@ -79,9 +70,6 @@ public class IndexBuilder extends Thread {
public static final int MAX_UPDATE_THREADS= 10; public static final int MAX_UPDATE_THREADS= 10;
public static final int MAX_THREADS = Math.max( MAX_UPDATE_THREADS, MAX_REINDEX_THREADS); public static final int MAX_THREADS = Math.max( MAX_UPDATE_THREADS, MAX_REINDEX_THREADS);
//public static final boolean UPDATE_DOCS = false;
//public static final boolean NEW_DOCS = true;
private static final Log log = LogFactory.getLog(IndexBuilder.class); private static final Log log = LogFactory.getLog(IndexBuilder.class);
public IndexBuilder(IndexerIface indexer, public IndexBuilder(IndexerIface indexer,
@ -92,13 +80,12 @@ public class IndexBuilder extends Thread {
this.indexer = indexer; this.indexer = indexer;
this.wdf = wdf; this.wdf = wdf;
if( stmtToURIsToIndexFunctions != null ) if( stmtToURIsToIndexFunctions != null )
this.stmtToURIsToIndexFunctions = stmtToURIsToIndexFunctions; this.stmtToURIsToIndexFunctions = stmtToURIsToIndexFunctions;
else else
this.stmtToURIsToIndexFunctions = Collections.emptyList(); this.stmtToURIsToIndexFunctions = Collections.emptyList();
this.changedStmtQueue = new LinkedList<Statement>();
this.urisRequiringUpdate = new HashSet<String>();
this.start(); this.start();
} }
@ -117,10 +104,8 @@ public class IndexBuilder extends Thread {
* your changes with a call to doUpdateIndex(). * your changes with a call to doUpdateIndex().
*/ */
public void addToChanged(Statement stmt) { public void addToChanged(Statement stmt) {
synchronized(changedStmtQueue){
changedStmtQueue.add(stmt); changedStmtQueue.add(stmt);
} }
}
/** /**
* This method will cause the IndexBuilder to completely rebuild * This method will cause the IndexBuilder to completely rebuild
@ -159,21 +144,20 @@ public class IndexBuilder extends Thread {
public void run() { public void run() {
while(! stopRequested ){ while(! stopRequested ){
try{ try{
if( !stopRequested && reindexRequested ){ if( reindexRequested ){
log.debug("full re-index requested"); log.debug("full re-index requested");
indexRebuild(); indexRebuild();
}else if( !stopRequested && isThereWorkToDo() ){ }else if( !changedStmtQueue.isEmpty() ){
Thread.sleep(waitAfterNewWorkInterval); //wait a bit to let a bit more work to come into the queue Thread.sleep(WAIT_AFTER_NEW_WORK_INTERVAL); //wait a bit to let a bit more work to come into the queue
log.debug("work found for IndexBuilder, starting update"); log.debug("work found for IndexBuilder, starting update");
updatedIndex(); updatedIndex();
} else { } else {
log.debug("there is no indexing working to do, waiting for work"); log.debug("there is no indexing working to do, waiting for work");
synchronized (this) { this.wait(reindexInterval); } synchronized (this) { this.wait(MAX_IDLE_INTERVAL); }
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.debug("woken up",e); log.debug("woken up",e);
}catch(Throwable e){ }catch(Throwable e){
if( ! stopRequested && log != null )//may be null on shutdown
log.error(e,e); log.error(e,e);
} }
} }
@ -181,7 +165,6 @@ public class IndexBuilder extends Thread {
if( indexer != null) if( indexer != null)
indexer.abortIndexingAndCleanUp(); indexer.abortIndexingAndCleanUp();
if(! stopRequested && log != null )//may be null on shutdown
log.info("Stopping IndexBuilder thread"); log.info("Stopping IndexBuilder thread");
} }
@ -201,37 +184,29 @@ public class IndexBuilder extends Thread {
/* ******************** non-public methods ************************* */ /* ******************** non-public methods ************************* */
private List<Statement> getAndEmptyChangedStatements(){
List<Statement> localChangedStmt = null;
synchronized( changedStmtQueue ){
localChangedStmt = new ArrayList<Statement>(changedStmtQueue.size());
localChangedStmt.addAll( changedStmtQueue );
changedStmtQueue.clear();
}
return localChangedStmt;
}
/** /**
* For a collection of statements, find the URIs that need to be updated in * Take the changed statements from the queue and determine which URIs need to be updated in
* the index. * the index.
*/ */
private Collection<String> statementsToUris( Collection<Statement> localChangedStmt ){ private Collection<String> changedStatementsToUris(){
//inform StatementToURIsToUpdate that index is starting //inform StatementToURIsToUpdate that index is starting
for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ) for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ) {
stu.startIndexing(); stu.startIndexing();
}
Collection<String> urisToUpdate = new HashSet<String>(); Collection<String> urisToUpdate = new HashSet<String>();
for( Statement stmt : localChangedStmt){
if( stmt == null ) Statement stmt ;
continue; while (null != (stmt = changedStmtQueue.poll())) {
for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ){ for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ){
urisToUpdate.addAll( stu.findAdditionalURIsToIndex(stmt) ); urisToUpdate.addAll( stu.findAdditionalURIsToIndex(stmt) );
} }
} }
//inform StatementToURIsToUpdate that they are done //inform StatementToURIsToUpdate that they are done
for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ) for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ) {
stu.endIndxing(); stu.endIndxing();
}
return urisToUpdate; return urisToUpdate;
} }
@ -269,7 +244,7 @@ public class IndexBuilder extends Thread {
log.info("Rebuild of search index is starting."); log.info("Rebuild of search index is starting.");
// clear out changed URIs since we are doing a full index rebuild // clear out changed URIs since we are doing a full index rebuild
getAndEmptyChangedStatements(); changedStmtQueue.clear();
log.debug("Getting all URIs in the model"); log.debug("Getting all URIs in the model");
Iterator<String> uris = wdf.getIndividualDao().getAllOfThisTypeIterator(); Iterator<String> uris = wdf.getIndividualDao().getAllOfThisTypeIterator();
@ -284,7 +259,7 @@ public class IndexBuilder extends Thread {
protected void updatedIndex() { protected void updatedIndex() {
log.debug("Starting updateIndex()"); log.debug("Starting updateIndex()");
UriLists uriLists = makeAddAndDeleteLists( statementsToUris(getAndEmptyChangedStatements()) ); UriLists uriLists = makeAddAndDeleteLists( changedStatementsToUris() );
this.numberOfThreads = Math.max( MAX_UPDATE_THREADS, uriLists.updatedUris.size() / 20); this.numberOfThreads = Math.max( MAX_UPDATE_THREADS, uriLists.updatedUris.size() / 20);
doBuild( uriLists.updatedUris.iterator(), uriLists.deletedUris ); doBuild( uriLists.updatedUris.iterator(), uriLists.deletedUris );
@ -432,12 +407,6 @@ public class IndexBuilder extends Thread {
return work; return work;
} }
protected boolean isThereWorkToDo(){
synchronized( changedStmtQueue ){
return reindexRequested || ! changedStmtQueue.isEmpty() ;
}
}
private static class UriLists { private static class UriLists {
private final List<String> updatedUris = new ArrayList<String>(); private final List<String> updatedUris = new ArrayList<String>();
private final List<String> deletedUris = new ArrayList<String>(); private final List<String> deletedUris = new ArrayList<String>();