From fddf1d2af958eb354defb6080a7f53dd464ba7fe Mon Sep 17 00:00:00 2001 From: briancaruso Date: Fri, 21 Sep 2012 18:25:28 +0000 Subject: [PATCH] Changed IndexBuilder to use a Set instead of a Queue for changed statements, NIHVIVO-3969. Changed SearchReindexingListener to copy statements so they are not associated to any Model, NIHVIVO-3969. --- .../webapp/search/indexing/IndexBuilder.java | 74 ++++++++++++------- .../indexing/SearchReindexingListener.java | 20 ++++- 2 files changed, 66 insertions(+), 28 deletions(-) diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilder.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilder.java index f397d197b..add4210a2 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilder.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilder.java @@ -10,6 +10,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentSkipListSet; import javax.servlet.ServletContext; import javax.servlet.http.HttpServletRequest; @@ -47,7 +48,8 @@ public class IndexBuilder extends VitroBackgroundThread { * and other similar objects will use methods on IndexBuilder to add statements * to this queue. */ - private final ConcurrentLinkedQueue changedStmtQueue = new ConcurrentLinkedQueue(); + //private final ConcurrentLinkedQueue changedStmtQueue = new ConcurrentLinkedQueue(); + private final HashSet changedStmts = new HashSet(); /** This is a list of objects that will compute what URIs need to be * updated in the search index when a statement changes. */ @@ -128,7 +130,10 @@ public class IndexBuilder extends VitroBackgroundThread { * your changes with a call to doUpdateIndex(). */ public void addToChanged(Statement stmt) { - changedStmtQueue.add(stmt); + log.debug("call to addToChanged()"); + synchronized(changedStmts){ + changedStmts.add(stmt); + } } /** @@ -136,6 +141,7 @@ public class IndexBuilder extends VitroBackgroundThread { * the index. */ public synchronized void doIndexRebuild() { + log.debug("call to doIndexRebuild()"); //set flag for full index rebuild this.reindexRequested = true; //wake up @@ -145,7 +151,8 @@ public class IndexBuilder extends VitroBackgroundThread { /** * This will re-index Individuals were added with addToChanged(). */ - public synchronized void doUpdateIndex() { + public synchronized void doUpdateIndex() { + log.debug("callto doUpdateIndex()"); //wake up thread and it will attempt to index anything in changedUris this.notifyAll(); } @@ -180,20 +187,26 @@ public class IndexBuilder extends VitroBackgroundThread { notifyListeners( IndexingEventListener.EventTypes.FINISH_FULL_REBUILD ); setWorkLevel(WorkLevel.IDLE); - }else if( !changedStmtQueue.isEmpty() ){ - setWorkLevel(WorkLevel.WORKING, FLAG_UPDATING); - - //wait a bit to let a bit more work to come into the queue - Thread.sleep(WAIT_AFTER_NEW_WORK_INTERVAL); - log.debug("work found for IndexBuilder, starting update"); - - notifyListeners( IndexingEventListener.EventTypes.START_UPDATE ); - updatedIndex(); - notifyListeners( IndexingEventListener.EventTypes.FINISHED_UPDATE ); - setWorkLevel(WorkLevel.IDLE); - } else { - log.debug("there is no indexing working to do, waiting for work"); - synchronized (this) { this.wait(MAX_IDLE_INTERVAL); } + }else{ + boolean workToDo = false; + synchronized (changedStmts ){ + workToDo = !changedStmts.isEmpty(); + } + if( workToDo ){ + setWorkLevel(WorkLevel.WORKING, FLAG_UPDATING); + + //wait a bit to let a bit more work to come into the queue + Thread.sleep(WAIT_AFTER_NEW_WORK_INTERVAL); + log.debug("work found for IndexBuilder, starting update"); + + notifyListeners( IndexingEventListener.EventTypes.START_UPDATE ); + updatedIndex(); + notifyListeners( IndexingEventListener.EventTypes.FINISHED_UPDATE ); + setWorkLevel(WorkLevel.IDLE); + } else { + log.debug("there is no indexing working to do, waiting for work"); + synchronized (this) { this.wait(MAX_IDLE_INTERVAL); } + } } } catch (InterruptedException e) { log.debug("woken up",e); @@ -231,11 +244,9 @@ public class IndexBuilder extends VitroBackgroundThread { for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ) { stu.startIndexing(); } - - Collection urisToUpdate = new HashSet(); - - Statement stmt ; - while (null != (stmt = changedStmtQueue.poll())) { + + Collection urisToUpdate = new HashSet(); + for( Statement stmt : getAndClearChangedStmts() ){ for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ){ urisToUpdate.addAll( stu.findAdditionalURIsToIndex(stmt) ); } @@ -249,6 +260,17 @@ public class IndexBuilder extends VitroBackgroundThread { return urisToUpdate; } + private Statement[] getAndClearChangedStmts(){ + //get the statements that changed + Statement[] stmts = null; + synchronized( changedStmts ){ + stmts = new Statement[changedStmts.size()]; + stmts = changedStmts.toArray( stmts ); + changedStmts.clear(); + } + return stmts; + } + /** * Take the URIs that we got from the changedStmtQueue, and create the lists * of updated URIs and deleted URIs. @@ -283,7 +305,7 @@ public class IndexBuilder extends VitroBackgroundThread { log.info("Rebuild of search index is starting."); // clear out changed URIs since we are doing a full index rebuild - changedStmtQueue.clear(); + changedStmts.clear(); log.debug("Getting all URIs in the model"); Iterator uris = wdf.getIndividualDao().getAllOfThisTypeIterator(); @@ -297,11 +319,11 @@ public class IndexBuilder extends VitroBackgroundThread { protected void updatedIndex() { log.debug("Starting updateIndex()"); - UriLists uriLists = makeAddAndDeleteLists( changedStatementsToUris() ); - + UriLists uriLists = makeAddAndDeleteLists( changedStatementsToUris() ); int numberOfThreads = Math.min( MAX_UPDATE_THREADS, - Math.max( uriLists.updatedUris.size() / URIS_PER_UPDATE_THREAD, 1)); + Math.max( uriLists.updatedUris.size() / URIS_PER_UPDATE_THREAD, 1)); + doBuild( uriLists.updatedUris.iterator(), uriLists.deletedUris , numberOfThreads); log.debug("Ending updateIndex()"); diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/SearchReindexingListener.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/SearchReindexingListener.java index af181bf60..6327608c8 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/SearchReindexingListener.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/SearchReindexingListener.java @@ -9,6 +9,7 @@ import org.apache.commons.logging.LogFactory; import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.ModelChangedListener; +import com.hp.hpl.jena.rdf.model.ModelFactory; import com.hp.hpl.jena.rdf.model.Resource; import com.hp.hpl.jena.rdf.model.Statement; import com.hp.hpl.jena.rdf.model.StmtIterator; @@ -23,10 +24,15 @@ import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent; public class SearchReindexingListener implements ModelChangedListener { private IndexBuilder indexBuilder; + /** Model just for creating statements */ + private Model statementCreator; + public SearchReindexingListener(IndexBuilder indexBuilder ) { if(indexBuilder == null ) throw new IllegalArgumentException("Constructor parameter indexBuilder must not be null"); - this.indexBuilder = indexBuilder; + this.indexBuilder = indexBuilder; + this.statementCreator = ModelFactory.createDefaultModel(); + log.debug("new SearchReindexingListener"); } private synchronized void addChange(Statement stmt){ @@ -50,13 +56,23 @@ public class SearchReindexingListener implements ModelChangedListener { log.debug("changed statement: sub='" + sub + "' pred='" + pred +"' obj='" + obj + "'"); } - indexBuilder.addToChanged(stmt); + indexBuilder.addToChanged( copyStmt(stmt) ); } private void requestAsyncIndexUpdate(){ + log.debug("requestAsyncIndexUpdate()"); indexBuilder.doUpdateIndex(); } + /** + * Create a copy of the statement to make sure that + * it is not associated with any Models. This is + * to avoid the Models being held back from GC. + */ + private Statement copyStmt( Statement in){ + return statementCreator.createStatement(in.getSubject(),in.getPredicate(),in.getObject()); + } + @Override public void notifyEvent(Model arg0, Object arg1) { if ( (arg1 instanceof EditEvent) ){