Refactored SearchReindexingListener so additionalUriFinders is run async in model listener methods. NIHVIVO-2951

This commit is contained in:
briancaruso 2011-07-18 16:27:11 +00:00
parent 8095541706
commit 1f39e6a96d
10 changed files with 215 additions and 145 deletions

View file

@ -1,15 +0,0 @@
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
package edu.cornell.mannlib.vitro.webapp.search.beans;
import java.util.List;
import com.hp.hpl.jena.rdf.model.Statement;
/**
* Interface to use with IndexBuilder to find more URIs to index given a changed statement.
* The stmt may have been added or removed from the model.
*/
public interface AdditionalURIsToIndex {
/**
* Finds additional URIs to index for the given changed statement.
*
* @param stmt the changed statement; it may have been added to or
* removed from the model.
* @return the additional URIs to index.
*/
List<String> findAdditionalURIsToIndex(Statement stmt);
}

View file

@ -0,0 +1,25 @@
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
package edu.cornell.mannlib.vitro.webapp.search.beans;
import java.util.List;
import com.hp.hpl.jena.rdf.model.Statement;
/**
* Interface to use with IndexBuilder to find more URIs to index given a changed statement.
* The statement may have been added or removed from the model.
*/
public interface StatementToURIsToUpdate {
/**
* For the domain that is the responsibility of the given implementation,
* calculate the URIs that need to be updated in the search index.
* The URIs in the list will be updated by the IndexBuilder, which will
* handle URIs of new individuals, URIs of individuals that have changes,
* and URIs of individuals that have been removed from the model.
*
* @param stmt the changed statement; it may have been added to or
* removed from the model.
* @return List of URIs.
*/
List<String> findAdditionalURIsToIndex(Statement stmt);
}

View file

@ -15,7 +15,7 @@ import com.hp.hpl.jena.shared.Lock;
import com.hp.hpl.jena.vocabulary.RDF;
import edu.cornell.mannlib.vitro.webapp.dao.VitroVocabulary;
import edu.cornell.mannlib.vitro.webapp.search.beans.AdditionalURIsToIndex;
import edu.cornell.mannlib.vitro.webapp.search.beans.StatementToURIsToUpdate;
/**
* If a class changes classgroups, then all members of that class
@ -29,7 +29,7 @@ import edu.cornell.mannlib.vitro.webapp.search.beans.AdditionalURIsToIndex;
* changes, all members of the class core:Summer need to be update so they get the new classgroup values.
*/
public class AdditionalURIsForClassGroupChanges implements
AdditionalURIsToIndex {
StatementToURIsToUpdate {
private OntModel model;

View file

@ -25,9 +25,9 @@ import com.hp.hpl.jena.rdf.model.ResourceFactory;
import com.hp.hpl.jena.rdf.model.Statement;
import com.hp.hpl.jena.shared.Lock;
import edu.cornell.mannlib.vitro.webapp.search.beans.AdditionalURIsToIndex;
import edu.cornell.mannlib.vitro.webapp.search.beans.StatementToURIsToUpdate;
public class AdditionalURIsForContextNodes implements AdditionalURIsToIndex {
public class AdditionalURIsForContextNodes implements StatementToURIsToUpdate {
private OntModel model;
private static final List<String> multiValuedQueriesForAgent = new ArrayList<String>();

View file

@ -0,0 +1,22 @@
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
package edu.cornell.mannlib.vitro.webapp.search.indexing;
import java.util.Collections;
import java.util.List;
import com.hp.hpl.jena.rdf.model.Statement;
import edu.cornell.mannlib.vitro.webapp.search.beans.StatementToURIsToUpdate;
/**
 * Finder for statements whose object is a literal: the only URI that
 * needs to be updated in the search index is the URI of the statement's
 * subject.
 */
public class AdditionalURIsForDataProperties implements StatementToURIsToUpdate{

    /**
     * Returns the subject URI of a statement with a literal object.
     *
     * @param stmt the changed statement, may be null.
     * @return a single-element list holding the subject URI, or an empty
     * list when stmt is null, its object is not a literal, or its subject
     * has no URI.
     */
    @Override
    public List<String> findAdditionalURIsToIndex(Statement stmt) {
        // Only statements with a literal object are handled here.
        if( stmt == null || ! stmt.getObject().isLiteral() ){
            return Collections.emptyList();
        }

        String subjectUri = stmt.getSubject().getURI();
        if( subjectUri == null ){
            return Collections.emptyList();
        }
        return Collections.singletonList( subjectUri );
    }
}

View file

@ -26,7 +26,7 @@ import com.hp.hpl.jena.rdf.model.Statement;
import com.hp.hpl.jena.shared.Lock;
import com.hp.hpl.jena.vocabulary.RDFS;
import edu.cornell.mannlib.vitro.webapp.search.beans.AdditionalURIsToIndex;
import edu.cornell.mannlib.vitro.webapp.search.beans.StatementToURIsToUpdate;
/**
* For a given statement, return the URIs that may need to be updated in
@ -35,7 +35,7 @@ import edu.cornell.mannlib.vitro.webapp.search.beans.AdditionalURIsToIndex;
*
* Context nodes are not handled here. They are taken care of in AdditionalURIsForContextNodes.
*/
public class AdditionalURIsForObjectProperties implements AdditionalURIsToIndex {
public class AdditionalURIsForObjectProperties implements StatementToURIsToUpdate {
protected static final Log log = LogFactory.getLog(AdditionalURIsForObjectProperties.class);
protected Model model;

View file

@ -7,8 +7,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import javax.servlet.ServletContext;
@ -16,11 +16,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.hp.hpl.jena.query.QueryParseException;
import com.hp.hpl.jena.rdf.model.Statement;
import edu.cornell.mannlib.vitro.webapp.beans.Individual;
import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory;
import edu.cornell.mannlib.vitro.webapp.search.beans.AdditionalURIsToIndex;
import edu.cornell.mannlib.vitro.webapp.search.beans.IndexerIface;
import edu.cornell.mannlib.vitro.webapp.search.beans.StatementToURIsToUpdate;
/**
@ -31,42 +33,78 @@ import edu.cornell.mannlib.vitro.webapp.search.beans.IndexerIface;
*
* See the class SearchReindexingListener for an example of how a model change
* listener can use an IndexBuilder to keep the full text index in sync with
* updates to a model. It calls IndexBuilder.addToChangedUris().
*
* @author bdc34
*
* updates to a model. It calls IndexBuilder.addToChangedUris().
*/
public class IndexBuilder extends Thread {
private WebappDaoFactory wdf;
private final IndexerIface indexer;
private final ServletContext context;
private final IndexerIface indexer;
/** Statements that have changed in the model. The SearchReindexingListener
* and other similar objects will use methods on IndexBuilder to add statements
* to this queue. This should only be accessed from blocks synchronized on
* the changedStmtQueue object.
*/
protected List<Statement> changedStmtQueue;
/* changedUris should only be accessed from synchronized blocks */
private HashSet<String> changedUris = null;
/** This is the list of URIs that need to be updated in the search
* index. The IndexBuilder thread will process Statements in changedStmtQueue
* to create this set of URIs.
* This should only be accessed by the IndexBuilder thread. */
private HashSet<String> urisRequiringUpdate;
private List<String> updatedInds = null;
private List<String> deletedInds = null;
/** This is a list of objects that will compute what URIs need to be
* updated in the search index when a statement changes. */
protected List<StatementToURIsToUpdate> stmtToURIsToIndexFunctions;
private boolean reindexRequested = false;
/**
* updatedUris will only be accessed from the IndexBuilder thread
* so it doesn't need to be synchronized.
*/
private List<String> updatedUris = null;
/**
* deletedUris will only be accessed from the IndexBuilder thread
* so it doesn't need to be synchronized.
*/
private List<String> deletedUris = null;
/**
* Indicates that a full index re-build has been requested.
*/
private boolean reindexRequested = false;
/** Indicates that a stop of the indexing objects has been requested. */
protected boolean stopRequested = false;
/** Length of pause between a model change and the start of indexing. */
protected long reindexInterval = 1000 * 60 /* msec */ ;
/** Number of threads to use during indexing. */
protected int numberOfThreads = 10;
public static final boolean UPDATE_DOCS = false;
public static final boolean NEW_DOCS = true;
public static final int MAX_REINDEX_THREADS= 10;
public static final int MAX_UPDATE_THREADS= 10;
public static final int MAX_THREADS = Math.max( MAX_UPDATE_THREADS, MAX_REINDEX_THREADS);
//public static final boolean UPDATE_DOCS = false;
//public static final boolean NEW_DOCS = true;
private static final Log log = LogFactory.getLog(IndexBuilder.class);
public IndexBuilder(
ServletContext context,
IndexerIface indexer,
WebappDaoFactory wdf ){
public IndexBuilder(IndexerIface indexer,
WebappDaoFactory wdf,
List<StatementToURIsToUpdate> stmtToURIsToIndexFunctions ){
super("IndexBuilder");
this.indexer = indexer;
this.wdf = wdf;
this.context = context;
this.changedUris = new HashSet<String>();
this.wdf = wdf;
if( stmtToURIsToIndexFunctions != null )
this.stmtToURIsToIndexFunctions = stmtToURIsToIndexFunctions;
else
this.stmtToURIsToIndexFunctions = Collections.emptyList();
this.changedStmtQueue = new LinkedList<Statement>();
this.urisRequiringUpdate = new HashSet<String>();
this.start();
}
@ -75,21 +113,32 @@ public class IndexBuilder extends Thread {
this( null, null, null);
}
public void setWdf(WebappDaoFactory wdf){
this.wdf = wdf;
}
public boolean isIndexing(){
return indexer.isIndexing();
}
/**
* Use this method to queue a changed statement so that the search index
* can be updated for it. Should be able to add to changedStmtQueue
* while indexing is in process.
*
* If you have a statement that has been added or removed from the
* RDF model and you would like it to take effect in the search
* index this is the method you should use. Follow the adding of
* your changes with a call to doUpdateIndex().
*
* @param stmt the statement that was added to or removed from the model.
*/
public void addToChanged(Statement stmt){
// changedStmtQueue is only touched inside blocks synchronized on the queue itself
synchronized(changedStmtQueue){
changedStmtQueue.add(stmt);
}
}
/**
* This method will cause the IndexBuilder to completely rebuild
* the index.
*/
public synchronized void doIndexRebuild() {
//set flag for full index rebuild
this.reindexRequested = true;
//wake up
this.notifyAll();
//set flag for full index rebuild
this.reindexRequested = true;
//wake up
this.notifyAll();
}
/**
* This will re-index Individuals that changed because of modtime or because they
* were added with addChangedUris().
@ -98,30 +147,11 @@ public class IndexBuilder extends Thread {
//wake up thread and it will attempt to index anything in changedUris
this.notifyAll();
}
/**
* Use this method to add URIs that need to be indexed.
*/
public synchronized void addToChangedUris(String uri){
changedUris.add(uri);
}
/**
* Use this method to add URIs that need to be indexed.
*/
public synchronized void addToChangedUris(Collection<String> uris){
changedUris.addAll(uris);
}
public synchronized boolean isReindexRequested() {
return reindexRequested;
}
public synchronized boolean isThereWorkToDo(){
return isReindexRequested() || ! changedUris.isEmpty() ;
}
public boolean isIndexing(){
return indexer.isIndexing();
}
/**
* This is called when the system shuts down.
*/
@ -135,11 +165,11 @@ public class IndexBuilder extends Thread {
public void run() {
while(! stopRequested ){
try{
if( !stopRequested && isReindexRequested() ){
if( !stopRequested && reindexRequested ){
log.debug("full re-index requested");
indexRebuild();
}else if( !stopRequested && isThereWorkToDo() ){
Thread.sleep(250); //wait a bit to let a bit more work to come into the queue
Thread.sleep(500); //wait a bit to let a bit more work to come into the queue
log.debug("work found for IndexBuilder, starting update");
updatedIndex();
}else if( !stopRequested && ! isThereWorkToDo() ){
@ -163,30 +193,47 @@ public class IndexBuilder extends Thread {
/* ******************** non-public methods ************************* */
private synchronized Collection<String> getAndEmptyChangedUris(){
Collection<String> out = changedUris;
changedUris = new HashSet<String>();
return out;
/**
 * Drains changedStmtQueue and converts the drained statements into the
 * set of URIs that need to be updated in the search index.
 *
 * This method is deliberately not synchronized on the IndexBuilder:
 * the queue is guarded by its own monitor, and the finders in
 * stmtToURIsToIndexFunctions may take a long time. Holding the
 * IndexBuilder monitor while they run would block callers of the
 * synchronized request methods (for example doIndexRebuild()) until
 * every finder had finished.
 *
 * @return the URIs produced by all finders for all queued statements;
 * never null, empty when nothing was queued.
 */
private Collection<String> getAndEmptyChangedStatements(){
    // Copy and clear under the queue lock only, so addToChanged() is
    // blocked for as short a time as possible.
    List<Statement> drained;
    synchronized( changedStmtQueue ){
        drained = new ArrayList<Statement>( changedStmtQueue );
        changedStmtQueue.clear();
    }

    // Run the finders with no lock held.
    Collection<String> urisToUpdate = new HashSet<String>();
    for( Statement stmt : drained ){
        if( stmt == null )
            continue;
        for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ){
            List<String> found = stu.findAdditionalURIsToIndex(stmt);
            // tolerate a finder that returns null instead of an empty list
            if( found != null )
                urisToUpdate.addAll( found );
        }
    }
    return urisToUpdate;
}
/**
* Sets updatedUris and deletedUris lists.
* Sets updatedUris and deletedUris lists from the changedStmtQueue.
* updatedUris and deletedUris will only be accessed from the IndexBuilder thread
* so they don't need to be synchronized.
*/
private void makeAddAndDeleteLists( Collection<String> uris){
IndividualDao indDao = wdf.getIndividualDao();
/* clear updateInds and deletedUris. This is the only method that should set these. */
this.updatedInds = new ArrayList<String>();
this.deletedInds = new ArrayList<String>();
this.updatedUris = new LinkedList<String>();
this.deletedUris = new LinkedList<String>();
for( String uri: uris){
if( uri != null ){
try{
Individual ind = wdf.getIndividualDao().getIndividualByURI(uri);
Individual ind = indDao.getIndividualByURI(uri);
if( ind != null)
this.updatedInds.add(uri);
this.updatedUris.add(uri);
else{
log.debug("found delete in changed uris");
this.deletedInds.add(uri);
this.deletedUris.add(uri);
}
} catch(QueryParseException ex){
log.error("could not get Individual "+ uri,ex);
@ -201,12 +248,13 @@ public class IndexBuilder extends Thread {
protected void indexRebuild() {
log.info("Rebuild of search index is starting.");
// clear out changed uris since we are doing a full index rebuild
getAndEmptyChangedUris();
// clear out changed URIs since we are doing a full index rebuild
getAndEmptyChangedStatements();
log.debug("Getting all URIs in the model");
Iterator<String> uris = wdf.getIndividualDao().getAllOfThisTypeIterator();
this.numberOfThreads = MAX_REINDEX_THREADS;
doBuild(uris, Collections.<String>emptyList() );
if( log != null ) //log might be null if system is shutting down.
@ -214,13 +262,16 @@ public class IndexBuilder extends Thread {
}
protected void updatedIndex() {
log.debug("Starting updateIndex()");
//long since = indexer.getModified() - 60000;
//List updatedUris = wdf.getIndividualDao().getUpdatedSinceIterator(since);
log.debug("Starting updateIndex()");
makeAddAndDeleteLists( getAndEmptyChangedUris() );
makeAddAndDeleteLists( getAndEmptyChangedStatements() );
this.numberOfThreads = Math.max( MAX_UPDATE_THREADS, updatedUris.size() / 20);
doBuild( updatedUris.iterator(), deletedUris );
this.updatedUris = null;
this.deletedUris = null;
doBuild( updatedInds.iterator(), deletedInds );
log.debug("Ending updateIndex()");
}
@ -239,9 +290,8 @@ public class IndexBuilder extends Thread {
* checking if an object is on the index is slow.
*/
private void doBuild(Iterator<String> updates, Collection<String> deletes ){
boolean aborted = false;
boolean newDocs = reindexRequested;
boolean forceNewIndex = reindexRequested;
boolean aborted = false;
boolean updateRequested = ! reindexRequested;
try {
if( reindexRequested )
@ -250,9 +300,8 @@ public class IndexBuilder extends Thread {
indexer.startIndexing();
reindexRequested = false;
if( ! forceNewIndex ){
//if this is not a full reindex, deleted individuals
//need to be removed from the index
if( updateRequested ){
//if this is not a full reindex, deleted individuals need to be removed from the index
for(String deleteMe : deletes ){
try{
indexer.removeFromIndex(deleteMe);
@ -263,7 +312,7 @@ public class IndexBuilder extends Thread {
}
}
indexUriList(updates, newDocs);
indexUriList(updates);
} catch (AbortIndexing abort){
if( log != null)
log.debug("aborting the indexing because thread stop was requested");
@ -284,9 +333,12 @@ public class IndexBuilder extends Thread {
* Use the back end indexer to index each object that the Iterator returns.
* @throws AbortIndexing
*/
private void indexUriList(Iterator<String> updateUris , boolean newDocs) throws AbortIndexing{
private void indexUriList(Iterator<String> updateUris ) throws AbortIndexing{
//make a copy of numberOfThreads so the local copy is safe during this method.
int numberOfThreads = this.numberOfThreads;
if( numberOfThreads > MAX_THREADS )
numberOfThreads = MAX_THREADS;
IndexWorkerThread.setStartTime(System.currentTimeMillis());
//make lists of work URIs for workers
@ -367,10 +419,15 @@ public class IndexBuilder extends Thread {
work.get( counter % workers ).add( uris.next() );
counter ++;
}
log.info("Number of individuals to be indexed : " + counter);
log.debug("Number of individuals to be indexed : " + counter);
return work;
}
/**
 * Reports whether the IndexBuilder has anything to do.
 *
 * @return true if a full re-index has been requested or if
 * changedStmtQueue holds at least one statement.
 */
protected boolean isThereWorkToDo(){
    synchronized( changedStmtQueue ){
        if( reindexRequested ){
            return true;
        }
        return ! changedStmtQueue.isEmpty();
    }
}
private class AbortIndexing extends Exception {
// Just a vanilla exception

View file

@ -2,7 +2,6 @@
package edu.cornell.mannlib.vitro.webapp.search.indexing;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
@ -16,24 +15,18 @@ import com.hp.hpl.jena.rdf.model.StmtIterator;
import com.hp.hpl.jena.shared.Lock;
import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent;
import edu.cornell.mannlib.vitro.webapp.search.beans.AdditionalURIsToIndex;
/**
* This class is thread safe. Notice that doAsyncIndexBuild() is frequently
* called because the inference system does not seem to send notifyEvents.
*/
public class SearchReindexingListener implements ModelChangedListener {
private IndexBuilder indexBuilder;
private List<AdditionalURIsToIndex> additionalUriFinders;
private IndexBuilder indexBuilder;
public SearchReindexingListener(IndexBuilder indexBuilder, List<AdditionalURIsToIndex> addUrisList ) {
public SearchReindexingListener(IndexBuilder indexBuilder ) {
if(indexBuilder == null )
throw new IllegalArgumentException("Constructor parameter indexBuilder must not be null");
this.indexBuilder = indexBuilder;
if( addUrisList != null )
this.additionalUriFinders = addUrisList;
else
this.additionalUriFinders = Collections.emptyList();
this.indexBuilder = indexBuilder;
}
private synchronized void addChange(Statement stmt){
@ -56,19 +49,8 @@ public class SearchReindexingListener implements ModelChangedListener {
}
log.debug("changed statement: sub='" + sub + "' pred='" + pred +"' obj='" + obj + "'");
}
if( stmt.getSubject().isURIResource() ){
indexBuilder.addToChangedUris(stmt.getSubject().getURI());
log.debug("subject: " + stmt.getSubject().getURI());
}
if( stmt.getObject().isURIResource() ){
indexBuilder.addToChangedUris(((Resource) stmt.getObject()).getURI());
}
for( AdditionalURIsToIndex au : additionalUriFinders ){
indexBuilder.addToChangedUris( au.findAdditionalURIsToIndex(stmt) );
}
indexBuilder.addToChanged(stmt);
}
private void requestAsyncIndexUpdate(){

View file

@ -19,7 +19,6 @@ import org.apache.solr.client.solrj.impl.XMLResponseParser;
import com.hp.hpl.jena.ontology.OntModel;
import com.hp.hpl.jena.query.Dataset;
import com.hp.hpl.jena.query.DatasetFactory;
import com.hp.hpl.jena.tdb.base.file.FileBase;
import edu.cornell.mannlib.vitro.webapp.config.ConfigurationProperties;
import edu.cornell.mannlib.vitro.webapp.dao.DisplayVocabulary;
@ -29,13 +28,12 @@ import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilterUtils;
import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilters;
import edu.cornell.mannlib.vitro.webapp.dao.jena.JenaBaseDao;
import edu.cornell.mannlib.vitro.webapp.dao.jena.ModelContext;
import edu.cornell.mannlib.vitro.webapp.dao.jena.WebappDaoFactoryJena;
import edu.cornell.mannlib.vitro.webapp.search.IndexConstants;
import edu.cornell.mannlib.vitro.webapp.search.beans.AdditionalURIsToIndex;
import edu.cornell.mannlib.vitro.webapp.search.beans.FileBasedProhibitedFromSearch;
import edu.cornell.mannlib.vitro.webapp.search.beans.IndividualProhibitedFromSearchImpl;
import edu.cornell.mannlib.vitro.webapp.search.beans.ProhibitedFromSearch;
import edu.cornell.mannlib.vitro.webapp.search.beans.StatementToURIsToUpdate;
import edu.cornell.mannlib.vitro.webapp.search.indexing.AdditionalURIsForContextNodes;
import edu.cornell.mannlib.vitro.webapp.search.indexing.AdditionalURIsForDataProperties;
import edu.cornell.mannlib.vitro.webapp.search.indexing.AdditionalURIsForObjectProperties;
import edu.cornell.mannlib.vitro.webapp.search.indexing.IndexBuilder;
import edu.cornell.mannlib.vitro.webapp.search.indexing.SearchReindexingListener;
@ -107,19 +105,20 @@ public class SolrSetup implements javax.servlet.ServletContextListener{
WebappDaoFactory wadf = (WebappDaoFactory) context.getAttribute("webappDaoFactory");
VitroFilters vf = VitroFilterUtils.getPublicFilter(context);
wadf = new WebappDaoFactoryFiltering(wadf, vf);
IndexBuilder builder = new IndexBuilder(context, solrIndexer, wadf );
// to the servlet context so we can access it later in the webapp.
context.setAttribute(IndexBuilder.class.getName(), builder);
//make objects that will find additional URIs for context nodes etc
List<AdditionalURIsToIndex> uriFinders = new ArrayList<AdditionalURIsToIndex>();
//make objects that will find additional URIs for context nodes etc
List<StatementToURIsToUpdate> uriFinders = new ArrayList<StatementToURIsToUpdate>();
uriFinders.add( new AdditionalURIsForDataProperties() );
uriFinders.add( new AdditionalURIsForObjectProperties(jenaOntModel) );
uriFinders.add( new AdditionalURIsForContextNodes(jenaOntModel) );
IndexBuilder builder = new IndexBuilder( solrIndexer, wadf, uriFinders );
// to the servlet context so we can access it later in the webapp.
context.setAttribute(IndexBuilder.class.getName(), builder);
// set up listeners so search index builder is notified of changes to model
ServletContext ctx = sce.getServletContext();
SearchReindexingListener srl = new SearchReindexingListener(builder, uriFinders);
SearchReindexingListener srl = new SearchReindexingListener( builder );
ModelContext.registerListenerForChanges(ctx, srl);
log.info("Setup of Solr index completed.");

View file

@ -20,7 +20,7 @@ import com.hp.hpl.jena.vocabulary.OWL;
import com.hp.hpl.jena.vocabulary.RDFS;
import edu.cornell.mannlib.vitro.webapp.dao.VitroVocabulary;
import edu.cornell.mannlib.vitro.webapp.search.beans.AdditionalURIsToIndex;
import edu.cornell.mannlib.vitro.webapp.search.beans.StatementToURIsToUpdate;
/**
* @author bdc34
@ -43,7 +43,7 @@ public class AdditionalURIsForClassGroupChangesTest {
OntModel model = ModelFactory.createOntologyModel();
model.read( new StringReader(n3ForPresentationClass), null, "N3");
AdditionalURIsToIndex uriFinder = new AdditionalURIsForClassGroupChanges( model );
StatementToURIsToUpdate uriFinder = new AdditionalURIsForClassGroupChanges( model );
List<String> uris = uriFinder.findAdditionalURIsToIndex(
ResourceFactory.createStatement(
ResourceFactory.createResource("http://vivoweb.org/ontology/core#Presentation"),