Changed IndexBuilder to use a Set instead of a Queue for changed statements, NIHVIVO-3969.

Changed SearchReindexingListener to copy statements so that they are not associated with any Model, NIHVIVO-3969.
This commit is contained in:
briancaruso 2012-09-21 18:25:28 +00:00
parent b005179a0b
commit fddf1d2af9
2 changed files with 66 additions and 28 deletions

View file

@ -10,6 +10,7 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import javax.servlet.ServletContext; import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -47,7 +48,8 @@ public class IndexBuilder extends VitroBackgroundThread {
* and other similar objects will use methods on IndexBuilder to add statements * and other similar objects will use methods on IndexBuilder to add statements
* to this queue. * to this queue.
*/ */
private final ConcurrentLinkedQueue<Statement> changedStmtQueue = new ConcurrentLinkedQueue<Statement>(); //private final ConcurrentLinkedQueue<Statement> changedStmtQueue = new ConcurrentLinkedQueue<Statement>();
private final HashSet<Statement> changedStmts = new HashSet<Statement>();
/** This is a list of objects that will compute what URIs need to be /** This is a list of objects that will compute what URIs need to be
* updated in the search index when a statement changes. */ * updated in the search index when a statement changes. */
@ -128,7 +130,10 @@ public class IndexBuilder extends VitroBackgroundThread {
* your changes with a call to doUpdateIndex(). * your changes with a call to doUpdateIndex().
*/ */
public void addToChanged(Statement stmt) { public void addToChanged(Statement stmt) {
changedStmtQueue.add(stmt); log.debug("call to addToChanged()");
synchronized(changedStmts){
changedStmts.add(stmt);
}
} }
/** /**
@ -136,6 +141,7 @@ public class IndexBuilder extends VitroBackgroundThread {
* the index. * the index.
*/ */
public synchronized void doIndexRebuild() { public synchronized void doIndexRebuild() {
log.debug("call to doIndexRebuild()");
//set flag for full index rebuild //set flag for full index rebuild
this.reindexRequested = true; this.reindexRequested = true;
//wake up //wake up
@ -145,7 +151,8 @@ public class IndexBuilder extends VitroBackgroundThread {
/** /**
* This will re-index Individuals that were added with addToChanged(). * This will re-index Individuals that were added with addToChanged().
*/ */
public synchronized void doUpdateIndex() { public synchronized void doUpdateIndex() {
log.debug("callto doUpdateIndex()");
//wake up thread and it will attempt to index anything in changedUris //wake up thread and it will attempt to index anything in changedUris
this.notifyAll(); this.notifyAll();
} }
@ -180,20 +187,26 @@ public class IndexBuilder extends VitroBackgroundThread {
notifyListeners( IndexingEventListener.EventTypes.FINISH_FULL_REBUILD ); notifyListeners( IndexingEventListener.EventTypes.FINISH_FULL_REBUILD );
setWorkLevel(WorkLevel.IDLE); setWorkLevel(WorkLevel.IDLE);
}else if( !changedStmtQueue.isEmpty() ){ }else{
setWorkLevel(WorkLevel.WORKING, FLAG_UPDATING); boolean workToDo = false;
synchronized (changedStmts ){
//wait a bit to let a bit more work to come into the queue workToDo = !changedStmts.isEmpty();
Thread.sleep(WAIT_AFTER_NEW_WORK_INTERVAL); }
log.debug("work found for IndexBuilder, starting update"); if( workToDo ){
setWorkLevel(WorkLevel.WORKING, FLAG_UPDATING);
notifyListeners( IndexingEventListener.EventTypes.START_UPDATE );
updatedIndex(); //wait a bit to let a bit more work to come into the queue
notifyListeners( IndexingEventListener.EventTypes.FINISHED_UPDATE ); Thread.sleep(WAIT_AFTER_NEW_WORK_INTERVAL);
setWorkLevel(WorkLevel.IDLE); log.debug("work found for IndexBuilder, starting update");
} else {
log.debug("there is no indexing working to do, waiting for work"); notifyListeners( IndexingEventListener.EventTypes.START_UPDATE );
synchronized (this) { this.wait(MAX_IDLE_INTERVAL); } updatedIndex();
notifyListeners( IndexingEventListener.EventTypes.FINISHED_UPDATE );
setWorkLevel(WorkLevel.IDLE);
} else {
log.debug("there is no indexing working to do, waiting for work");
synchronized (this) { this.wait(MAX_IDLE_INTERVAL); }
}
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.debug("woken up",e); log.debug("woken up",e);
@ -231,11 +244,9 @@ public class IndexBuilder extends VitroBackgroundThread {
for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ) { for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ) {
stu.startIndexing(); stu.startIndexing();
} }
Collection<String> urisToUpdate = new HashSet<String>(); Collection<String> urisToUpdate = new HashSet<String>();
for( Statement stmt : getAndClearChangedStmts() ){
Statement stmt ;
while (null != (stmt = changedStmtQueue.poll())) {
for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ){ for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ){
urisToUpdate.addAll( stu.findAdditionalURIsToIndex(stmt) ); urisToUpdate.addAll( stu.findAdditionalURIsToIndex(stmt) );
} }
@ -249,6 +260,17 @@ public class IndexBuilder extends VitroBackgroundThread {
return urisToUpdate; return urisToUpdate;
} }
/**
 * Copies the statements currently held in changedStmts into an array and
 * then empties the set. Both steps run while holding the changedStmts
 * monitor, the same monitor addToChanged() takes before adding.
 *
 * @return the statements that were in changedStmts; empty if there were none
 */
private Statement[] getAndClearChangedStmts(){
    synchronized( changedStmts ){
        // size the array from the set so toArray() fills it directly
        Statement[] snapshot = changedStmts.toArray( new Statement[changedStmts.size()] );
        changedStmts.clear();
        return snapshot;
    }
}
/** /**
* Take the URIs that we got from the changedStmtQueue, and create the lists * Take the URIs that we got from the changedStmtQueue, and create the lists
* of updated URIs and deleted URIs. * of updated URIs and deleted URIs.
@ -283,7 +305,7 @@ public class IndexBuilder extends VitroBackgroundThread {
log.info("Rebuild of search index is starting."); log.info("Rebuild of search index is starting.");
// clear out changed URIs since we are doing a full index rebuild // clear out changed URIs since we are doing a full index rebuild
changedStmtQueue.clear(); changedStmts.clear();
log.debug("Getting all URIs in the model"); log.debug("Getting all URIs in the model");
Iterator<String> uris = wdf.getIndividualDao().getAllOfThisTypeIterator(); Iterator<String> uris = wdf.getIndividualDao().getAllOfThisTypeIterator();
@ -297,11 +319,11 @@ public class IndexBuilder extends VitroBackgroundThread {
protected void updatedIndex() { protected void updatedIndex() {
log.debug("Starting updateIndex()"); log.debug("Starting updateIndex()");
UriLists uriLists = makeAddAndDeleteLists( changedStatementsToUris() ); UriLists uriLists = makeAddAndDeleteLists( changedStatementsToUris() );
int numberOfThreads = int numberOfThreads =
Math.min( MAX_UPDATE_THREADS, Math.min( MAX_UPDATE_THREADS,
Math.max( uriLists.updatedUris.size() / URIS_PER_UPDATE_THREAD, 1)); Math.max( uriLists.updatedUris.size() / URIS_PER_UPDATE_THREAD, 1));
doBuild( uriLists.updatedUris.iterator(), uriLists.deletedUris , numberOfThreads); doBuild( uriLists.updatedUris.iterator(), uriLists.deletedUris , numberOfThreads);
log.debug("Ending updateIndex()"); log.debug("Ending updateIndex()");

View file

@ -9,6 +9,7 @@ import org.apache.commons.logging.LogFactory;
import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelChangedListener; import com.hp.hpl.jena.rdf.model.ModelChangedListener;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.rdf.model.Resource; import com.hp.hpl.jena.rdf.model.Resource;
import com.hp.hpl.jena.rdf.model.Statement; import com.hp.hpl.jena.rdf.model.Statement;
import com.hp.hpl.jena.rdf.model.StmtIterator; import com.hp.hpl.jena.rdf.model.StmtIterator;
@ -23,10 +24,15 @@ import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent;
public class SearchReindexingListener implements ModelChangedListener { public class SearchReindexingListener implements ModelChangedListener {
private IndexBuilder indexBuilder; private IndexBuilder indexBuilder;
/** Model just for creating statements */
private Model statementCreator;
public SearchReindexingListener(IndexBuilder indexBuilder ) { public SearchReindexingListener(IndexBuilder indexBuilder ) {
if(indexBuilder == null ) if(indexBuilder == null )
throw new IllegalArgumentException("Constructor parameter indexBuilder must not be null"); throw new IllegalArgumentException("Constructor parameter indexBuilder must not be null");
this.indexBuilder = indexBuilder; this.indexBuilder = indexBuilder;
this.statementCreator = ModelFactory.createDefaultModel();
log.debug("new SearchReindexingListener");
} }
private synchronized void addChange(Statement stmt){ private synchronized void addChange(Statement stmt){
@ -50,13 +56,23 @@ public class SearchReindexingListener implements ModelChangedListener {
log.debug("changed statement: sub='" + sub + "' pred='" + pred +"' obj='" + obj + "'"); log.debug("changed statement: sub='" + sub + "' pred='" + pred +"' obj='" + obj + "'");
} }
indexBuilder.addToChanged(stmt); indexBuilder.addToChanged( copyStmt(stmt) );
} }
private void requestAsyncIndexUpdate(){ private void requestAsyncIndexUpdate(){
log.debug("requestAsyncIndexUpdate()");
indexBuilder.doUpdateIndex(); indexBuilder.doUpdateIndex();
} }
/**
 * Builds a new Statement from the subject, predicate and object of the
 * given statement, using this listener's own statementCreator Model.
 * The intent is that statements handed to the IndexBuilder do not keep
 * the Model that reported the change from being garbage collected.
 * NOTE(review): the copy is created through statementCreator, so confirm
 * whether it still references that Model or the nodes' original Model.
 *
 * @param in the statement to copy
 * @return a new statement with the same subject, predicate and object
 */
private Statement copyStmt( Statement in){
    final Statement copy = statementCreator.createStatement(
            in.getSubject(),
            in.getPredicate(),
            in.getObject());
    return copy;
}
@Override @Override
public void notifyEvent(Model arg0, Object arg1) { public void notifyEvent(Model arg0, Object arg1) {
if ( (arg1 instanceof EditEvent) ){ if ( (arg1 instanceof EditEvent) ){