VIVO-870 Implement UpdateStatementsTask.
Also add the listener to support it and get rid of the last of the vestigial classes.
This commit is contained in:
parent
0602406c53
commit
dee53e3aac
24 changed files with 845 additions and 1317 deletions
|
@ -3,6 +3,9 @@
|
|||
package edu.cornell.mannlib.vitro.webapp.modules.searchIndexer;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import com.hp.hpl.jena.rdf.model.Statement;
|
||||
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.Application;
|
||||
|
||||
|
@ -13,6 +16,23 @@ import edu.cornell.mannlib.vitro.webapp.modules.Application;
|
|||
* removeListener().
|
||||
*/
|
||||
public interface SearchIndexer extends Application.Module {
|
||||
/**
|
||||
* Update any search documents that are affected by these statements.
|
||||
*
|
||||
* These statements are a mixture of additions and deletions. In either
|
||||
* case, we feed them to the URI finders to see what individuals might have
|
||||
* been affected by the change.
|
||||
*
|
||||
* We accumulate a batch of affected URIs, removing duplicates if they
|
||||
* occur, and then submit them for updates.
|
||||
*
|
||||
* @param changes
|
||||
* if null or empty, this call has no effect.
|
||||
* @throws IllegalStateException
|
||||
* if called after shutdown()
|
||||
*/
|
||||
void scheduleUpdatesForStatements(List<Statement> changes);
|
||||
|
||||
/**
|
||||
* Update the search documents for these URIs.
|
||||
*
|
||||
|
@ -116,7 +136,7 @@ public interface SearchIndexer extends Application.Module {
|
|||
|
||||
START_PROCESSING_STATEMENTS, STOP_PROCESSING_STATEMENTS,
|
||||
|
||||
REBUILD_REQUESTED, REBUILD_COMPLETE,
|
||||
START_REBUILD, STOP_REBUILD,
|
||||
|
||||
SHUTDOWN_REQUESTED, SHUTDOWN_COMPLETE
|
||||
}
|
||||
|
@ -136,6 +156,11 @@ public interface SearchIndexer extends Application.Module {
|
|||
public SearchIndexerStatus getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return type + ", " + status;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
package edu.cornell.mannlib.vitro.webapp.modules.searchIndexer;
|
||||
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
|
@ -55,12 +56,17 @@ public class SearchIndexerStatus {
|
|||
return counts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new SimpleDateFormat().format(since) + ", " + counts;
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------
|
||||
// helper classes
|
||||
// ----------------------------------------------------------------------
|
||||
|
||||
public enum State {
|
||||
IDLE, PROCESSING_URIS, PROCESSING_STMTS, PREPARING_REBUILD, SHUTDOWN
|
||||
IDLE, PROCESSING_URIS, PROCESSING_STMTS, REBUILDING, SHUTDOWN
|
||||
}
|
||||
|
||||
public abstract static class Counts {
|
||||
|
@ -125,6 +131,11 @@ public class SearchIndexerStatus {
|
|||
return total;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[deleted=" + deleted + ", updated=" + updated
|
||||
+ ", remaining=" + remaining + ", total=" + total + "]";
|
||||
}
|
||||
}
|
||||
|
||||
public static class StatementCounts extends Counts {
|
||||
|
@ -151,25 +162,46 @@ public class SearchIndexerStatus {
|
|||
return total;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[processed=" + processed + ", remaining=" + remaining
|
||||
+ ", total=" + total + "]";
|
||||
}
|
||||
}
|
||||
|
||||
public static class RebuildCounts extends Counts {
|
||||
private final int numberOfIndividuals;
|
||||
private final int documentsBefore;
|
||||
private final int documentsAfter;
|
||||
|
||||
public RebuildCounts(int numberOfIndividuals) {
|
||||
public RebuildCounts(int documentsBefore, int documentsAfter) {
|
||||
super(Type.REBUILD_COUNTS);
|
||||
this.numberOfIndividuals = numberOfIndividuals;
|
||||
this.documentsBefore = documentsBefore;
|
||||
this.documentsAfter = documentsAfter;
|
||||
}
|
||||
|
||||
public int getNumberOfIndividuals() {
|
||||
return numberOfIndividuals;
|
||||
public int getDocumentsBefore() {
|
||||
return documentsBefore;
|
||||
}
|
||||
|
||||
public int getDocumentsAfter() {
|
||||
return documentsAfter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[documentsBefore=" + documentsBefore + ", documentsAfter="
|
||||
+ documentsAfter + "]";
|
||||
}
|
||||
}
|
||||
|
||||
public static class NoCounts extends Counts {
|
||||
public NoCounts() {
|
||||
super(Type.NO_COUNTS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[]";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,12 +0,0 @@
|
|||
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
|
||||
|
||||
package edu.cornell.mannlib.vitro.webapp.search;
|
||||
|
||||
/**
 * Thrown when a search-indexing operation fails.
 */
public class IndexingException extends Exception {
    // Explicit ID so serialized forms stay stable across versions.
    private static final long serialVersionUID = 1L;

    /**
     * @param message description of the indexing failure
     */
    public IndexingException(String message) {
        super(message);
    }

    public IndexingException() {
        super();
    }

    /**
     * Wrap an underlying failure without discarding its stack trace.
     *
     * @param message description of the indexing failure
     * @param cause the exception that triggered this one
     */
    public IndexingException(String message, Throwable cause) {
        super(message, cause);
    }
}
|
|
@ -1,10 +0,0 @@
|
|||
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
|
||||
|
||||
package edu.cornell.mannlib.vitro.webapp.search;
|
||||
|
||||
/**
 * Thrown when a search query or search-engine operation fails.
 */
public class SearchException extends Exception {
    // Explicit ID so serialized forms stay stable across versions.
    private static final long serialVersionUID = 1L;

    /**
     * @param message description of the search failure
     */
    public SearchException(String message) {
        super(message);
    }

    /**
     * Wrap an underlying failure without discarding its stack trace.
     *
     * @param message description of the search failure
     * @param cause the exception that triggered this one
     */
    public SearchException(String message, Throwable cause) {
        super(message, cause);
    }
}
|
|
@ -1,212 +0,0 @@
|
|||
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
|
||||
|
||||
package edu.cornell.mannlib.vitro.webapp.search;
|
||||
|
||||
import java.util.HashSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import edu.cornell.mannlib.vitro.webapp.application.ApplicationUtils;
|
||||
import edu.cornell.mannlib.vitro.webapp.beans.Individual;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngineException;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchInputDocument;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchQuery;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchQuery.Order;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchResponse;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchResultDocumentList;
|
||||
import edu.cornell.mannlib.vitro.webapp.search.IndexingException;
|
||||
import edu.cornell.mannlib.vitro.webapp.search.beans.IndexerIface;
|
||||
|
||||
|
||||
public class SearchIndexer implements IndexerIface {
|
||||
private final static Log log = LogFactory.getLog(SearchIndexer.class);
|
||||
|
||||
protected SearchEngine server;
|
||||
protected boolean indexing;
|
||||
protected HashSet<String> urisIndexed;
|
||||
|
||||
/**
|
||||
* System is shutting down if true.
|
||||
*/
|
||||
protected boolean shutdownRequested = false;
|
||||
|
||||
/**
|
||||
* This records when a full re-index starts so that once it is done
|
||||
* all the documents in the search index that are earlier than the
|
||||
* reindexStart can be removed.
|
||||
*/
|
||||
protected long reindexStart = 0L;
|
||||
|
||||
/**
|
||||
* If true, then a full index rebuild was requested and reindexStart
|
||||
* will be used to determine what documents to remove from the index
|
||||
* once the re-index is complete.
|
||||
*/
|
||||
protected boolean doingFullIndexRebuild = false;
|
||||
|
||||
public SearchIndexer( SearchEngine server){
|
||||
this.server = server;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void index(Individual ind) throws IndexingException {
|
||||
if( ! indexing )
|
||||
throw new IndexingException("SearchIndexer: must call " +
|
||||
"startIndexing() before index().");
|
||||
|
||||
if( ind == null ) {
|
||||
log.debug("Individual to index was null, ignoring.");
|
||||
return;
|
||||
}
|
||||
|
||||
try{
|
||||
if( urisIndexed.contains(ind.getURI()) ){
|
||||
log.debug("already indexed " + ind.getURI() );
|
||||
return;
|
||||
}else{
|
||||
SearchInputDocument doc = null;
|
||||
synchronized(this){
|
||||
urisIndexed.add(ind.getURI());
|
||||
}
|
||||
log.debug("indexing " + ind.getURI());
|
||||
// doc = individualToSearchDoc.translate(ind);
|
||||
|
||||
if( doc != null){
|
||||
if( log.isDebugEnabled()){
|
||||
log.info("boost for " + ind.getName() + " is " + doc.getDocumentBoost());
|
||||
log.debug( doc.toString() );
|
||||
}
|
||||
|
||||
server.add( doc );
|
||||
log.debug("Added docs to server.");
|
||||
}else{
|
||||
log.debug("removing from index " + ind.getURI());
|
||||
removeFromIndex(ind.getURI());
|
||||
}
|
||||
}
|
||||
} catch (SearchEngineException ex) {
|
||||
throw new IndexingException(ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isIndexing() {
|
||||
return indexing;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepareForRebuild() throws IndexingException {
|
||||
reindexStart = System.currentTimeMillis();
|
||||
doingFullIndexRebuild = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeFromIndex(String uri) throws IndexingException {
|
||||
if( uri != null ){
|
||||
try {
|
||||
// server.deleteById(individualToSearchDoc.getIdForUri(uri));
|
||||
log.debug("deleted " + " " + uri);
|
||||
} catch (Exception e) {
|
||||
log.error( "could not delete individual " + uri, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void startIndexing() throws IndexingException {
|
||||
if( indexing)
|
||||
log.debug("SearchIndexer.startIndexing() Indexing in progress, waiting for completion...");
|
||||
while( indexing && ! shutdownRequested ){ //wait for indexing to end.
|
||||
try{ wait( 250 ); }
|
||||
catch(InterruptedException ex){}
|
||||
}
|
||||
|
||||
log.debug("Starting to index");
|
||||
indexing = true;
|
||||
urisIndexed = new HashSet<String>();
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abortIndexingAndCleanUp() {
|
||||
shutdownRequested = true;
|
||||
try{
|
||||
// individualToSearchDoc.shutdown();
|
||||
}catch(Exception e){
|
||||
if( log != null)
|
||||
log.debug(e,e);
|
||||
}
|
||||
endIndexing();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void endIndexing() {
|
||||
try {
|
||||
if( doingFullIndexRebuild ){
|
||||
removeDocumentsFromBeforeRebuild( );
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
if( ! shutdownRequested )
|
||||
log.debug("could not remove documents from before build, " ,e);
|
||||
}
|
||||
try {
|
||||
server.commit();
|
||||
} catch (Throwable e) {
|
||||
if( ! shutdownRequested ){
|
||||
log.debug("could not commit to the search engine, " +
|
||||
"this should not be a problem since the search engine will do autocommit");
|
||||
}
|
||||
}
|
||||
indexing = false;
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
protected void removeDocumentsFromBeforeRebuild(){
|
||||
try {
|
||||
server.deleteByQuery("indexedTime:[ * TO " + reindexStart + " ]");
|
||||
server.commit();
|
||||
} catch (SearchEngineException e) {
|
||||
if( ! shutdownRequested )
|
||||
log.error("could not delete documents from before rebuild.",e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long getModified() {
|
||||
long modified = 0;
|
||||
|
||||
SearchQuery query = ApplicationUtils.instance().getSearchEngine().createQuery();
|
||||
query.setQuery("*:*");
|
||||
query.addSortField("indexedTime", Order.DESC);
|
||||
|
||||
try {
|
||||
SearchResponse rsp = server.query(query);
|
||||
SearchResultDocumentList docs = rsp.getResults();
|
||||
if(docs!=null){
|
||||
modified = (Long)docs.get(0).getFirstValue("indexedTime");
|
||||
}
|
||||
} catch (SearchEngineException e) {
|
||||
log.error(e,e);
|
||||
}
|
||||
|
||||
return modified;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if there are documents in the index, false if there are none,
|
||||
* and returns false on failure to connect to server.
|
||||
*/
|
||||
@Override
|
||||
public boolean isIndexEmpty() {
|
||||
try {
|
||||
return server.documentCount() == 0;
|
||||
} catch (SearchEngineException e) {
|
||||
log.error("Could not connect to the search engine." ,e.getCause());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,58 +0,0 @@
|
|||
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
|
||||
|
||||
package edu.cornell.mannlib.vitro.webapp.search.beans;
|
||||
|
||||
import edu.cornell.mannlib.vitro.webapp.beans.Individual;
|
||||
import edu.cornell.mannlib.vitro.webapp.search.IndexingException;
|
||||
|
||||
/**
 * IndexerIface is for objects that will be used by the IndexBuilder. The
 * IndexBuilder will manage getting lists of objects to index and then use
 * an object that implements IndexerIface to stuff the backend index.
 *
 * An example is SearchIndexer which is set up and associated with an
 * IndexBuilder in SearchIndexerSetup.
 *
 * @author bdc34
 *
 */
public interface IndexerIface {

    /**
     * Check if indexing is currently running.
     */
    public boolean isIndexing();

    /**
     * Index an individual. This should update the existing search document
     * for the individual if the semantics of the index require it.
     *
     * @param ind the individual to add to or update in the index.
     * @throws IndexingException if the individual cannot be indexed.
     */
    public void index(Individual ind)throws IndexingException;


    /**
     * Remove a document from the index.
     *
     * @param uri URI of the individual whose document should be removed.
     * @throws IndexingException if the removal fails.
     */
    public void removeFromIndex(String uri) throws IndexingException;

    /** Prepare for a full rebuild of the index. */
    public void prepareForRebuild() throws IndexingException;

    /** Begin an indexing pass; must be called before index(). */
    public void startIndexing() throws IndexingException;

    /** Finish the current indexing pass. */
    public void endIndexing();

    /** @return timestamp of the most recent modification to the index. */
    public long getModified();

    /**
     * Ends the indexing and removes any temporary files.
     * This may be called instead of endIndexing()
     */
    public void abortIndexingAndCleanUp();

    /**
     * @return true if the index contains no documents; implementations may
     *         return false on failure to reach the search engine.
     */
    public boolean isIndexEmpty();
}
|
|
@ -24,6 +24,7 @@ import edu.cornell.mannlib.vitro.webapp.controller.freemarker.responsevalues.Res
|
|||
import edu.cornell.mannlib.vitro.webapp.controller.freemarker.responsevalues.TemplateResponseValues;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.RebuildCounts;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.StatementCounts;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.UriCounts;
|
||||
|
@ -79,20 +80,20 @@ public class IndexController extends FreemarkerHttpServlet {
|
|||
public static final RequestedAction REQUIRED_ACTIONS = SimplePermission.MANAGE_SEARCH_INDEX.ACTION;
|
||||
|
||||
private SearchIndexer indexer;
|
||||
private IndexHistory history;
|
||||
private static IndexHistory history;
|
||||
|
||||
@Override
|
||||
public void init() throws ServletException {
|
||||
super.init();
|
||||
this.indexer = ApplicationUtils.instance().getSearchIndexer();
|
||||
this.history = new IndexHistory();
|
||||
this.indexer.addListener(this.history);
|
||||
super.init();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
this.indexer.removeListener(this.history);
|
||||
super.destroy();
|
||||
/**
|
||||
* Called by SearchIndexerSetup to provide a history that dates from
|
||||
* startup, not just from servlet load time.
|
||||
*/
|
||||
public static void setHistory(IndexHistory history) {
|
||||
IndexController.history = history;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -186,6 +187,10 @@ public class IndexController extends FreemarkerHttpServlet {
|
|||
map.put("expectedCompletion",
|
||||
figureExpectedCompletion(status.getSince(),
|
||||
counts.getTotal(), counts.getProcessed()));
|
||||
} else if (state == State.REBUILDING) {
|
||||
RebuildCounts counts = status.getCounts().asRebuildCounts();
|
||||
map.put("documentsBefore", counts.getDocumentsBefore());
|
||||
map.put("documentsAfter", counts.getDocumentsAfter());
|
||||
} else {
|
||||
// nothing for IDLE or SHUTDOWN, except what's already there.
|
||||
}
|
||||
|
@ -217,7 +222,6 @@ public class IndexController extends FreemarkerHttpServlet {
|
|||
long seconds = (elapsedMillis / 1000L) % 60L;
|
||||
long minutes = (elapsedMillis / 60000L) % 60L;
|
||||
long hours = elapsedMillis / 3600000L;
|
||||
return new int[] {(int) hours, (int) minutes, (int) seconds};
|
||||
return new int[] { (int) hours, (int) minutes, (int) seconds };
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -9,6 +9,9 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
|
||||
|
@ -23,12 +26,17 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatu
|
|||
* format them for display in a Freemarker template.
|
||||
*/
|
||||
public class IndexHistory implements SearchIndexer.Listener {
|
||||
private static final Log log = LogFactory.getLog(IndexHistory.class);
|
||||
|
||||
private final static int MAX_EVENTS = 10;
|
||||
|
||||
private final Deque<Event> events = new LinkedList<>();
|
||||
|
||||
@Override
|
||||
public void receiveSearchIndexerEvent(Event event) {
|
||||
if (log.isInfoEnabled()) {
|
||||
log.info(event);
|
||||
}
|
||||
synchronized (events) {
|
||||
events.addFirst(event);
|
||||
while (events.size() > MAX_EVENTS) {
|
||||
|
@ -89,7 +97,8 @@ public class IndexHistory implements SearchIndexer.Listener {
|
|||
}
|
||||
|
||||
private void addCounts(RebuildCounts counts, Map<String, Object> map) {
|
||||
map.put("numberOfIndividuals", counts.getNumberOfIndividuals());
|
||||
map.put("documentsBefore", counts.getDocumentsBefore());
|
||||
map.put("documentsAfter", counts.getDocumentsAfter());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,11 +0,0 @@
|
|||
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
|
||||
|
||||
package edu.cornell.mannlib.vitro.webapp.search.documentBuilding;
|
||||
|
||||
/**
 * Thrown by document modifiers to indicate that an individual should be
 * excluded from the search index.
 */
public class SkipIndividualException extends Exception {
    // Explicit ID so serialized forms stay stable across versions.
    private static final long serialVersionUID = 1L;

    /**
     * @param string reason the individual is being skipped
     */
    public SkipIndividualException(String string) {
        super(string);
    }

    /**
     * Wrap an underlying failure without discarding its stack trace.
     *
     * @param string reason the individual is being skipped
     * @param cause the exception that triggered the skip
     */
    public SkipIndividualException(String string, Throwable cause) {
        super(string, cause);
    }

}
|
|
@ -1,546 +0,0 @@
|
|||
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
|
||||
|
||||
package edu.cornell.mannlib.vitro.webapp.search.indexing;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import javax.servlet.ServletContext;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpSession;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.hp.hpl.jena.query.QueryParseException;
|
||||
import com.hp.hpl.jena.rdf.model.ResourceFactory;
|
||||
import com.hp.hpl.jena.rdf.model.Statement;
|
||||
|
||||
import edu.cornell.mannlib.vitro.webapp.beans.Individual;
|
||||
import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
|
||||
import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory;
|
||||
import edu.cornell.mannlib.vitro.webapp.search.beans.IndexerIface;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinder;
|
||||
import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;
|
||||
|
||||
|
||||
/**
|
||||
* The IndexBuilder is used to rebuild or update a search index.
|
||||
* There should only be one IndexBuilder in a vitro web application.
|
||||
* It uses an implementation of a back-end through an object that
|
||||
* implements IndexerIface. An example of a back-end is SearchIndexer.
|
||||
*
|
||||
* See the class SearchReindexingListener for an example of how a model change
|
||||
* listener can use an IndexBuilder to keep the full text index in sync with
|
||||
* updates to a model. It calls IndexBuilder.addToChangedUris().
|
||||
*/
|
||||
public class IndexBuilder extends VitroBackgroundThread {
|
||||
private WebappDaoFactory wdf;
|
||||
private final IndexerIface indexer;
|
||||
|
||||
/** Statements that have changed in the model. The SearchReindexingListener
|
||||
* and other similar objects will use methods on IndexBuilder to add statements
|
||||
* to this queue.
|
||||
*/
|
||||
//private final ConcurrentLinkedQueue<Statement> changedStmtQueue = new ConcurrentLinkedQueue<Statement>();
|
||||
private final HashSet<Statement> changedStmts = new HashSet<Statement>();
|
||||
|
||||
/** This is a list of objects that will compute what URIs need to be
|
||||
* updated in the search index when a statement changes. */
|
||||
private final List<IndexingUriFinder> stmtToURIsToIndexFunctions;
|
||||
|
||||
/** Indicates that a full index re-build has been requested. */
|
||||
private volatile boolean reindexRequested = false;
|
||||
|
||||
/** Indicates that a stop of the indexing objects has been requested. */
|
||||
private volatile boolean stopRequested = false;
|
||||
|
||||
/** Indicates that new updates should not be started. */
|
||||
private boolean deferNewUpdates = false;
|
||||
|
||||
/** Length of time to wait before looking for work (if not wakened sooner). */
|
||||
public static final long MAX_IDLE_INTERVAL = 1000 * 60 /* msec */ ;
|
||||
|
||||
/** Length of pause between when work comes into queue to when indexing starts */
|
||||
public static final long WAIT_AFTER_NEW_WORK_INTERVAL = 500; //msec
|
||||
|
||||
/** Flag so we can tell that the index is being updated. */
|
||||
public static final String FLAG_UPDATING = "updating";
|
||||
|
||||
/** Flag so we can tell that the index is being rebuilt. */
|
||||
public static final String FLAG_REBUILDING = "rebuilding";
|
||||
|
||||
/** List of IndexingEventListeners */
|
||||
protected LinkedList<IndexingEventListener> indexingEventListeners =
|
||||
new LinkedList<IndexingEventListener>();
|
||||
|
||||
/** number of threads to use during a full index rebuild. */
|
||||
public static final int REINDEX_THREADS= 10;
|
||||
|
||||
/** Max threads to use during an update. Smaller updates will use fewer threads. */
|
||||
public static final int MAX_UPDATE_THREADS= 10;
|
||||
|
||||
/** Number of individuals to index per update thread. */
|
||||
public static final int URIS_PER_UPDATE_THREAD = 50;
|
||||
|
||||
private static final Log log = LogFactory.getLog(IndexBuilder.class);
|
||||
|
||||
public static IndexBuilder getBuilder(ServletContext ctx) {
|
||||
Object o = ctx.getAttribute(IndexBuilder.class.getName());
|
||||
if (o instanceof IndexBuilder) {
|
||||
return (IndexBuilder) o;
|
||||
} else {
|
||||
log.error("IndexBuilder has not been initialized.");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public IndexBuilder(IndexerIface indexer,
|
||||
WebappDaoFactory wdf,
|
||||
List<IndexingUriFinder> stmtToURIsToIndexFunctions ){
|
||||
super("IndexBuilder");
|
||||
|
||||
this.indexer = indexer;
|
||||
|
||||
this.wdf = wdf;
|
||||
|
||||
if( stmtToURIsToIndexFunctions != null )
|
||||
this.stmtToURIsToIndexFunctions = stmtToURIsToIndexFunctions;
|
||||
else
|
||||
this.stmtToURIsToIndexFunctions = Collections.emptyList();
|
||||
|
||||
this.start();
|
||||
}
|
||||
|
||||
protected IndexBuilder(){
|
||||
//for testing only
|
||||
this( null, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this method to add URIs that need to be indexed. Should be
|
||||
* able to add to changedStmtQueue while indexing is in process.
|
||||
*
|
||||
* If you have a statement that has been added or removed from the
|
||||
* RDF model and you would like it to take effect in the search
|
||||
* index this is the method you should use. Follow the adding of
|
||||
* your changes with a call to doUpdateIndex().
|
||||
*/
|
||||
public void addToChanged(Statement stmt) {
|
||||
log.debug("call to addToChanged(Statement)");
|
||||
synchronized(changedStmts){
|
||||
changedStmts.add(stmt);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience method to add a URI to the change queue.
|
||||
*/
|
||||
public void addToChanged(String uri){
|
||||
addToChanged(ResourceFactory.createStatement(
|
||||
ResourceFactory.createResource(uri),
|
||||
ResourceFactory.createProperty("http://ex.com/f"),
|
||||
ResourceFactory.createPlainLiteral("added by IndexBuilder.addToChanged(uri)")));
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will cause the IndexBuilder to completely rebuild
|
||||
* the index.
|
||||
*/
|
||||
public synchronized void doIndexRebuild() {
|
||||
log.debug("call to doIndexRebuild()");
|
||||
//set flag for full index rebuild
|
||||
this.reindexRequested = true;
|
||||
//wake up
|
||||
this.notifyAll();
|
||||
}
|
||||
|
||||
/**
|
||||
* This will re-index Individuals were added with addToChanged().
|
||||
*/
|
||||
public synchronized void doUpdateIndex() {
|
||||
log.debug("call to doUpdateIndex()");
|
||||
//wake up thread and it will attempt to index anything in changedUris
|
||||
this.notifyAll();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a listener for indexing events. Methods on listener will be called when
|
||||
* events happen in the IndexBuilder. This is not a Jena ModelListener.
|
||||
*/
|
||||
public synchronized void addIndexBuilderListener(IndexingEventListener listener){
|
||||
indexingEventListeners.add(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is called when the system shuts down.
|
||||
*/
|
||||
public synchronized void stopIndexingThread() {
|
||||
stopRequested = true;
|
||||
this.notifyAll();
|
||||
this.interrupt();
|
||||
}
|
||||
|
||||
/**
|
||||
* Calling this will cause the IndexBuilder to not start a new index update
|
||||
* until unpause is called. This is intended to allow a large change
|
||||
* without slowing it down with incremental search index updates.
|
||||
*/
|
||||
public synchronized void pause(){
|
||||
this.deferNewUpdates = true;
|
||||
}
|
||||
|
||||
public synchronized void unpause(){
|
||||
if( deferNewUpdates == true ){
|
||||
this.deferNewUpdates = false;
|
||||
this.notifyAll();
|
||||
this.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while(! stopRequested ){
|
||||
try{
|
||||
if ( deferNewUpdates ){
|
||||
log.debug("there is no indexing working to do, waiting for work");
|
||||
synchronized (this) { this.wait(MAX_IDLE_INTERVAL); }
|
||||
}
|
||||
else if ( reindexRequested ){
|
||||
setWorkLevel(WorkLevel.WORKING, FLAG_REBUILDING);
|
||||
log.debug("full re-index requested");
|
||||
|
||||
notifyListeners( IndexingEventListener.EventTypes.START_FULL_REBUILD );
|
||||
indexRebuild();
|
||||
notifyListeners( IndexingEventListener.EventTypes.FINISH_FULL_REBUILD );
|
||||
|
||||
setWorkLevel(WorkLevel.IDLE);
|
||||
}
|
||||
else{
|
||||
boolean workToDo = false;
|
||||
synchronized (changedStmts ){
|
||||
workToDo = !changedStmts.isEmpty();
|
||||
}
|
||||
if( workToDo ){
|
||||
setWorkLevel(WorkLevel.WORKING, FLAG_UPDATING);
|
||||
|
||||
//wait a bit to let a bit more work to come into the queue
|
||||
Thread.sleep(WAIT_AFTER_NEW_WORK_INTERVAL);
|
||||
log.debug("work found for IndexBuilder, starting update");
|
||||
|
||||
notifyListeners( IndexingEventListener.EventTypes.START_UPDATE );
|
||||
updatedIndex();
|
||||
notifyListeners( IndexingEventListener.EventTypes.FINISHED_UPDATE );
|
||||
setWorkLevel(WorkLevel.IDLE);
|
||||
} else {
|
||||
log.debug("there is no indexing working to do, waiting for work");
|
||||
synchronized (this) { this.wait(MAX_IDLE_INTERVAL); }
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
log.debug("woken up",e);
|
||||
}catch(Throwable e){
|
||||
log.error(e,e);
|
||||
}
|
||||
}
|
||||
|
||||
if( indexer != null)
|
||||
indexer.abortIndexingAndCleanUp();
|
||||
}
|
||||
|
||||
|
||||
public static void checkIndexOnRootLogin(HttpServletRequest req){
|
||||
HttpSession session = req.getSession();
|
||||
ServletContext context = session.getServletContext();
|
||||
IndexBuilder indexBuilder = (IndexBuilder)context.getAttribute(IndexBuilder.class.getName());
|
||||
|
||||
log.debug("Checking if the index is empty");
|
||||
if(indexBuilder.indexer.isIndexEmpty()){
|
||||
log.info("Search index is empty. Running a full index rebuild.");
|
||||
indexBuilder.doIndexRebuild();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* ******************** non-public methods ************************* */
|
||||
|
||||
/**
|
||||
* Take the changed statements from the queue and determine which URIs that need to be updated in
|
||||
* the index.
|
||||
*/
|
||||
private Collection<String> changedStatementsToUris(){
|
||||
//inform StatementToURIsToUpdate that index is starting
|
||||
for( IndexingUriFinder stu : stmtToURIsToIndexFunctions ) {
|
||||
stu.startIndexing();
|
||||
}
|
||||
|
||||
//keep uris unique by using a HashSet
|
||||
Collection<String> urisToUpdate = new HashSet<String>();
|
||||
Statement[] changedStatements = getAndClearChangedStmts();
|
||||
int howManyChanges = changedStatements.length;
|
||||
|
||||
if (howManyChanges > 100) {
|
||||
log.info("Finding URIs that are affected by " + howManyChanges
|
||||
+ " changed statements.");
|
||||
}
|
||||
|
||||
for (int i = 0; i < howManyChanges; i++) {
|
||||
Statement stmt = changedStatements[i];
|
||||
for (IndexingUriFinder stu : stmtToURIsToIndexFunctions) {
|
||||
urisToUpdate.addAll(stu.findAdditionalURIsToIndex(stmt));
|
||||
}
|
||||
if ((i > 0) && (i % 1000 == 0)) {
|
||||
log.info("Processed " + i + " changed statements; found "
|
||||
+ urisToUpdate.size() + " affected URIs.");
|
||||
}
|
||||
}
|
||||
|
||||
//inform StatementToURIsToUpdate that they are done
|
||||
for( IndexingUriFinder stu : stmtToURIsToIndexFunctions ) {
|
||||
stu.endIndexing();
|
||||
}
|
||||
|
||||
return urisToUpdate;
|
||||
}
|
||||
|
||||
private Statement[] getAndClearChangedStmts(){
|
||||
//get the statements that changed
|
||||
Statement[] stmts = null;
|
||||
synchronized( changedStmts ){
|
||||
stmts = new Statement[changedStmts.size()];
|
||||
stmts = changedStmts.toArray( stmts );
|
||||
changedStmts.clear();
|
||||
}
|
||||
return stmts;
|
||||
}
|
||||
|
||||
/**
|
||||
* Take the URIs that we got from the changedStmtQueue, and create the lists
|
||||
* of updated URIs and deleted URIs.
|
||||
*/
|
||||
private UriLists makeAddAndDeleteLists(Collection<String> uris) {
|
||||
IndividualDao indDao = wdf.getIndividualDao();
|
||||
|
||||
UriLists uriLists = new UriLists();
|
||||
for (String uri : uris) {
|
||||
if (uri != null) {
|
||||
try {
|
||||
Individual ind = indDao.getIndividualByURI(uri);
|
||||
if (ind != null) {
|
||||
log.debug("uri to update or add to search index: " + uri);
|
||||
uriLists.updatedUris.add(uri);
|
||||
} else {
|
||||
log.debug("found delete in changed uris: " + uri);
|
||||
uriLists.deletedUris.add(uri);
|
||||
}
|
||||
} catch (QueryParseException ex) {
|
||||
log.error("could not get Individual " + uri, ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
return uriLists;
|
||||
}
|
||||
|
||||
/**
|
||||
* This rebuilds the whole index.
|
||||
*/
|
||||
protected void indexRebuild() {
|
||||
log.info("Rebuild of search index is starting.");
|
||||
|
||||
// clear out changed URIs since we are doing a full index rebuild
|
||||
changedStmts.clear();
|
||||
|
||||
log.debug("Getting all URIs in the model");
|
||||
Iterator<String> uris = wdf.getIndividualDao().getAllOfThisTypeIterator();
|
||||
|
||||
doBuild(uris, Collections.<String>emptyList(), REINDEX_THREADS );
|
||||
|
||||
if( log != null ) //log might be null if system is shutting down.
|
||||
log.info("Rebuild of search index is complete.");
|
||||
}
|
||||
|
||||
protected void updatedIndex() {
|
||||
log.debug("Starting updateIndex()");
|
||||
|
||||
UriLists uriLists = makeAddAndDeleteLists( changedStatementsToUris() );
|
||||
int numberOfThreads =
|
||||
Math.min( MAX_UPDATE_THREADS,
|
||||
Math.max( uriLists.updatedUris.size() / URIS_PER_UPDATE_THREAD, 1));
|
||||
|
||||
doBuild( uriLists.updatedUris.iterator(), uriLists.deletedUris , numberOfThreads);
|
||||
|
||||
log.debug("Ending updateIndex()");
|
||||
}
|
||||
|
||||
/**
|
||||
* For each sourceIterator, get all of the objects and attempt to
|
||||
* index them.
|
||||
*
|
||||
* This takes a list of source Iterators and, for each of these,
|
||||
* calls indexForSource.
|
||||
*
|
||||
* @param sourceIterators
|
||||
* @param newDocs true if we know that the document is new. Set
|
||||
* to false if we want to attempt to remove the object from the index before
|
||||
* attempting to index it. If an object is not on the list but you set this
|
||||
* to false, and a check is made before adding, it will work fine; but
|
||||
* checking if an object is on the index is slow.
|
||||
*/
|
||||
private void doBuild(Iterator<String> updates, Collection<String> deletes, int numberOfThreads ){
|
||||
boolean updateRequested = ! reindexRequested;
|
||||
|
||||
try {
|
||||
if( reindexRequested ){
|
||||
indexer.prepareForRebuild();
|
||||
}
|
||||
|
||||
indexer.startIndexing();
|
||||
reindexRequested = false;
|
||||
|
||||
if( updateRequested ){
|
||||
//if this is not a full reindex, deleted indivdiuals need to be removed from the index
|
||||
for(String deleteMe : deletes ){
|
||||
try{
|
||||
indexer.removeFromIndex(deleteMe);
|
||||
}catch(Exception ex){
|
||||
log.debug("could not remove individual " + deleteMe
|
||||
+ " from index, usually this is harmless",ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
indexUriList(updates, numberOfThreads);
|
||||
|
||||
} catch (Exception e) {
|
||||
if( log != null) log.debug("Exception during indexing",e);
|
||||
}
|
||||
|
||||
indexer.endIndexing();
|
||||
}
|
||||
|
||||
/**
|
||||
* Use the back end indexer to index each object that the Iterator returns.
|
||||
* @throws AbortIndexing
|
||||
*/
|
||||
private void indexUriList(Iterator<String> updateUris , int numberOfThreads) {
|
||||
//make lists of work URIs for workers
|
||||
List<List<String>> workLists = makeWorkerUriLists(updateUris, numberOfThreads);
|
||||
|
||||
//setup workers with work
|
||||
List<IndexWorkerThread> workers = new ArrayList<IndexWorkerThread>();
|
||||
for(int i = 0; i< numberOfThreads ;i++){
|
||||
Iterator<Individual> workToDo = new UriToIndividualIterator(workLists.get(i), wdf);
|
||||
workers.add( new IndexWorkerThread(indexer, i, workToDo) );
|
||||
}
|
||||
|
||||
// reset the counters so we can monitor the progress
|
||||
IndexWorkerThread.resetCounters(System.currentTimeMillis(), figureWorkLoad(workLists));
|
||||
|
||||
log.debug("Starting the building and indexing of documents in worker threads");
|
||||
// starting worker threads
|
||||
for(int i =0; i < numberOfThreads; i++){
|
||||
workers.get(i).start();
|
||||
}
|
||||
|
||||
//waiting for all the work to finish
|
||||
for(int i =0; i < numberOfThreads; i++){
|
||||
try{
|
||||
workers.get(i).join();
|
||||
}catch(InterruptedException e){
|
||||
//this thread will get interrupted if the system is trying to shut down.
|
||||
if( log != null )
|
||||
log.debug(e,e);
|
||||
for( IndexWorkerThread thread: workers){
|
||||
thread.requestStop();
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected class UriToIndividualIterator implements Iterator<Individual>{
|
||||
private final Iterator<String> uris;
|
||||
private final WebappDaoFactory wdf;
|
||||
|
||||
public UriToIndividualIterator( Iterator<String> uris, WebappDaoFactory wdf){
|
||||
this.uris= uris;
|
||||
this.wdf = wdf;
|
||||
}
|
||||
|
||||
public UriToIndividualIterator( List<String> uris, WebappDaoFactory wdf){
|
||||
this.uris= uris.iterator();
|
||||
this.wdf = wdf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return uris.hasNext();
|
||||
}
|
||||
|
||||
/** may return null */
|
||||
@Override
|
||||
public Individual next() {
|
||||
String uri = uris.next();
|
||||
return wdf.getIndividualDao().getIndividualByURI(uri);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new IllegalAccessError("");
|
||||
}
|
||||
}
|
||||
|
||||
private static List<List<String>> makeWorkerUriLists(Iterator<String> uris,int workers){
|
||||
List<List<String>> work = new ArrayList<List<String>>(workers);
|
||||
for(int i =0; i< workers; i++){
|
||||
work.add( new ArrayList<String>() );
|
||||
}
|
||||
|
||||
int counter = 0;
|
||||
while(uris.hasNext()){
|
||||
work.get( counter % workers ).add( uris.next() );
|
||||
counter ++;
|
||||
}
|
||||
log.info("Number of individuals to be indexed : " + counter + " by "
|
||||
+ workers + " worker threads.");
|
||||
return work;
|
||||
}
|
||||
|
||||
private long figureWorkLoad(List<List<String>> workLists) {
|
||||
long load = 0;
|
||||
for (List<String> list: workLists) {
|
||||
load += list.size();
|
||||
}
|
||||
return load;
|
||||
}
|
||||
|
||||
public long getCompletedCount() {
|
||||
return IndexWorkerThread.getCount();
|
||||
}
|
||||
|
||||
public long getTotalToDo() {
|
||||
return IndexWorkerThread.getCountToIndex();
|
||||
}
|
||||
|
||||
private static class UriLists {
|
||||
private final List<String> updatedUris = new ArrayList<String>();
|
||||
private final List<String> deletedUris = new ArrayList<String>();
|
||||
}
|
||||
|
||||
protected void notifyListeners(IndexingEventListener.EventTypes event){
|
||||
for ( IndexingEventListener listener : indexingEventListeners ){
|
||||
try{
|
||||
if(listener != null )
|
||||
listener.notifyOfIndexingEvent( event );
|
||||
}catch(Throwable th){
|
||||
log.error("problem during NotifyListeners(): " , th);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,108 +0,0 @@
|
|||
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
|
||||
|
||||
package edu.cornell.mannlib.vitro.webapp.search.indexing;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import edu.cornell.mannlib.vitro.webapp.beans.Individual;
|
||||
import edu.cornell.mannlib.vitro.webapp.search.IndexingException;
|
||||
import edu.cornell.mannlib.vitro.webapp.search.beans.IndexerIface;
|
||||
import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;
|
||||
|
||||
class IndexWorkerThread extends VitroBackgroundThread{
|
||||
private static final Log log = LogFactory.getLog(IndexWorkerThread.class);
|
||||
|
||||
protected final int threadNum;
|
||||
protected final IndexerIface indexer;
|
||||
protected final Iterator<Individual> individualsToIndex;
|
||||
protected volatile boolean stopRequested = false;
|
||||
|
||||
private static AtomicLong countCompleted= new AtomicLong();
|
||||
private static AtomicLong countToIndex= new AtomicLong();
|
||||
private static long starttime = 0;
|
||||
|
||||
public IndexWorkerThread(IndexerIface indexer, int threadNum , Iterator<Individual> individualsToIndex){
|
||||
super("IndexWorkerThread"+threadNum);
|
||||
this.indexer = indexer;
|
||||
this.threadNum = threadNum;
|
||||
this.individualsToIndex = individualsToIndex;
|
||||
}
|
||||
|
||||
public void requestStop(){
|
||||
stopRequested = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(){
|
||||
setWorkLevel(WorkLevel.WORKING, "indexing " + individualsToIndex + " individuals");
|
||||
|
||||
while( ! stopRequested ){
|
||||
|
||||
//do the actual indexing work
|
||||
log.debug("work found for Worker number " + threadNum);
|
||||
addDocsToIndex();
|
||||
|
||||
// done so shut this thread down.
|
||||
stopRequested = true;
|
||||
}
|
||||
setWorkLevel(WorkLevel.IDLE);
|
||||
|
||||
log.debug("Worker number " + threadNum + " exiting.");
|
||||
}
|
||||
|
||||
protected void addDocsToIndex() {
|
||||
|
||||
while( individualsToIndex.hasNext() ){
|
||||
//need to stop right away if requested to
|
||||
if( stopRequested ) return;
|
||||
try{
|
||||
//build the document and add it to the index
|
||||
Individual ind = null;
|
||||
try {
|
||||
ind = individualsToIndex.next();
|
||||
indexer.index( ind );
|
||||
} catch (IndexingException e) {
|
||||
if( stopRequested )
|
||||
return;
|
||||
|
||||
if( ind != null )
|
||||
log.error("Could not index individual " + ind.getURI() , e );
|
||||
else
|
||||
log.warn("Could not index, individual was null");
|
||||
}
|
||||
|
||||
|
||||
long countNow = countCompleted.incrementAndGet();
|
||||
if( log.isInfoEnabled() ){
|
||||
if( (countNow % 100 ) == 0 ){
|
||||
long dt = (System.currentTimeMillis() - starttime);
|
||||
log.info("individuals indexed: " + countNow + " in " + dt + " msec " +
|
||||
" time per individual = " + (dt / countNow) + " msec" );
|
||||
}
|
||||
}
|
||||
}catch(Throwable th){
|
||||
//on tomcat shutdown odd exceptions get thrown
|
||||
if( ! stopRequested )
|
||||
log.error("Exception during index building",th);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void resetCounters(long time, long workload) {
|
||||
IndexWorkerThread.starttime = time;
|
||||
IndexWorkerThread.countToIndex.set(workload);
|
||||
IndexWorkerThread.countCompleted.set(0);
|
||||
}
|
||||
|
||||
public static long getCount() {
|
||||
return countCompleted.get();
|
||||
}
|
||||
|
||||
public static long getCountToIndex() {
|
||||
return countToIndex.get();
|
||||
}
|
||||
}
|
|
@ -1,19 +0,0 @@
|
|||
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
|
||||
package edu.cornell.mannlib.vitro.webapp.search.indexing;
|
||||
|
||||
/**
|
||||
* Classes that implement this interface can get informed of
|
||||
* events that happen to the IndexBuilder.
|
||||
*/
|
||||
public interface IndexingEventListener {
|
||||
|
||||
public enum EventTypes {
|
||||
START_UPDATE,
|
||||
FINISHED_UPDATE,
|
||||
START_FULL_REBUILD,
|
||||
FINISH_FULL_REBUILD
|
||||
}
|
||||
|
||||
|
||||
public void notifyOfIndexingEvent(EventTypes ie);
|
||||
}
|
|
@ -1,171 +0,0 @@
|
|||
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
|
||||
|
||||
package edu.cornell.mannlib.vitro.webapp.search.indexing;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.hp.hpl.jena.rdf.model.Model;
|
||||
import com.hp.hpl.jena.rdf.model.ModelChangedListener;
|
||||
import com.hp.hpl.jena.rdf.model.ModelFactory;
|
||||
import com.hp.hpl.jena.rdf.model.Resource;
|
||||
import com.hp.hpl.jena.rdf.model.Statement;
|
||||
import com.hp.hpl.jena.rdf.model.StmtIterator;
|
||||
import com.hp.hpl.jena.shared.Lock;
|
||||
|
||||
import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent;
|
||||
|
||||
/**
|
||||
* This class is thread safe. Notice that doAsyncIndexBuild() is frequently
|
||||
* called because the inference system does not seem to send notifyEvents.
|
||||
*/
|
||||
public class SearchReindexingListener implements ModelChangedListener {
|
||||
private IndexBuilder indexBuilder;
|
||||
|
||||
/** Model just for creating statements */
|
||||
private Model statementCreator;
|
||||
|
||||
public SearchReindexingListener(IndexBuilder indexBuilder ) {
|
||||
if(indexBuilder == null )
|
||||
throw new IllegalArgumentException("Constructor parameter indexBuilder must not be null");
|
||||
this.indexBuilder = indexBuilder;
|
||||
this.statementCreator = ModelFactory.createDefaultModel();
|
||||
log.debug("new SearchReindexingListener");
|
||||
}
|
||||
|
||||
private synchronized void addChange(Statement stmt){
|
||||
if( stmt == null ) return;
|
||||
if( log.isDebugEnabled() ){
|
||||
String sub="unknown";
|
||||
String pred = "unknown";
|
||||
String obj ="unknown";
|
||||
|
||||
if( stmt.getSubject().isURIResource() ){
|
||||
sub = stmt.getSubject().getURI();
|
||||
}
|
||||
if( stmt.getPredicate() != null ){
|
||||
pred = stmt.getPredicate().getURI();
|
||||
}
|
||||
if( stmt.getObject().isURIResource() ){
|
||||
obj = ((Resource) (stmt.getObject().as(Resource.class))).getURI();
|
||||
}else{
|
||||
obj = stmt.getObject().toString();
|
||||
}
|
||||
log.debug("changed statement: sub='" + sub + "' pred='" + pred +"' obj='" + obj + "'");
|
||||
}
|
||||
|
||||
indexBuilder.addToChanged( copyStmt(stmt) );
|
||||
}
|
||||
|
||||
private void requestAsyncIndexUpdate(){
|
||||
log.debug("requestAsyncIndexUpdate()");
|
||||
indexBuilder.doUpdateIndex();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a copy of the statement to make sure that
|
||||
* it is not associated with any Models. This is
|
||||
* to avoid the Models being held back from GC.
|
||||
*/
|
||||
private Statement copyStmt( Statement in){
|
||||
return statementCreator.createStatement(in.getSubject(),in.getPredicate(),in.getObject());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyEvent(Model arg0, Object arg1) {
|
||||
if ( (arg1 instanceof EditEvent) ){
|
||||
EditEvent editEvent = (EditEvent)arg1;
|
||||
if( !editEvent.getBegin() ){// editEvent is the end of an edit
|
||||
log.debug("Doing search index build at end of EditEvent");
|
||||
requestAsyncIndexUpdate();
|
||||
}
|
||||
} else{
|
||||
log.debug("ignoring event " + arg1.getClass().getName() + " "+ arg1 );
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addedStatement(Statement stmt) {
|
||||
addChange(stmt);
|
||||
requestAsyncIndexUpdate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removedStatement(Statement stmt){
|
||||
addChange(stmt);
|
||||
requestAsyncIndexUpdate();
|
||||
}
|
||||
|
||||
private static final Log log = LogFactory.getLog(SearchReindexingListener.class.getName());
|
||||
|
||||
@Override
|
||||
public void addedStatements(Statement[] arg0) {
|
||||
for( Statement s: arg0){
|
||||
addChange(s);
|
||||
}
|
||||
requestAsyncIndexUpdate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addedStatements(List<Statement> arg0) {
|
||||
for( Statement s: arg0){
|
||||
addChange(s);
|
||||
}
|
||||
requestAsyncIndexUpdate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addedStatements(StmtIterator arg0) {
|
||||
try{
|
||||
while(arg0.hasNext()){
|
||||
Statement s = arg0.nextStatement();
|
||||
addChange(s);
|
||||
}
|
||||
}finally{
|
||||
arg0.close();
|
||||
}
|
||||
requestAsyncIndexUpdate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addedStatements(Model m) {
|
||||
m.enterCriticalSection(Lock.READ);
|
||||
StmtIterator it = null;
|
||||
try{
|
||||
it = m.listStatements();
|
||||
while(it.hasNext()){
|
||||
addChange(it.nextStatement());
|
||||
}
|
||||
}finally{
|
||||
if( it != null ) it.close();
|
||||
m.leaveCriticalSection();
|
||||
}
|
||||
requestAsyncIndexUpdate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removedStatements(Statement[] arg0) {
|
||||
//same as add stmts
|
||||
this.addedStatements(arg0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removedStatements(List<Statement> arg0) {
|
||||
//same as add
|
||||
this.addedStatements(arg0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removedStatements(StmtIterator arg0) {
|
||||
//same as add
|
||||
this.addedStatements(arg0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removedStatements(Model arg0) {
|
||||
//same as add
|
||||
this.addedStatements(arg0);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,184 @@
|
|||
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
|
||||
|
||||
package edu.cornell.mannlib.vitro.webapp.searchindex;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.hp.hpl.jena.rdf.model.Model;
|
||||
import com.hp.hpl.jena.rdf.model.ModelFactory;
|
||||
import com.hp.hpl.jena.rdf.model.Statement;
|
||||
import com.hp.hpl.jena.rdf.model.StmtIterator;
|
||||
|
||||
import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
|
||||
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
|
||||
import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;
|
||||
|
||||
/**
|
||||
* When a change is heard, wait for an interval to see if more changes come in.
|
||||
* When changes stop coming in for a specified interval, send what has
|
||||
* accumulated.
|
||||
*/
|
||||
public class IndexingChangeListener implements ChangeListener {
|
||||
private static final Log log = LogFactory
|
||||
.getLog(IndexingChangeListener.class);
|
||||
|
||||
private final SearchIndexer searchIndexer;
|
||||
private final Ticker ticker;
|
||||
|
||||
/** All access to the list must be synchronized. */
|
||||
private final List<Statement> changes;
|
||||
|
||||
public IndexingChangeListener(SearchIndexer searchIndexer) {
|
||||
this.searchIndexer = searchIndexer;
|
||||
this.ticker = new Ticker();
|
||||
this.changes = new ArrayList<>();
|
||||
}
|
||||
|
||||
private synchronized void noteChange(Statement stmt) {
|
||||
changes.add(stmt);
|
||||
ticker.start();
|
||||
}
|
||||
|
||||
private synchronized void respondToTicker() {
|
||||
searchIndexer.scheduleUpdatesForStatements(changes);
|
||||
changes.clear();
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
ticker.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addedStatement(String serializedTriple, String graphURI) {
|
||||
noteChange(parseTriple(serializedTriple));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removedStatement(String serializedTriple, String graphURI) {
|
||||
noteChange(parseTriple(serializedTriple));
|
||||
}
|
||||
|
||||
/**
|
||||
* We only care about events that signal the end of an edit operation.
|
||||
*/
|
||||
@Override
|
||||
public void notifyEvent(String graphURI, Object event) {
|
||||
if ((event instanceof EditEvent)) {
|
||||
EditEvent editEvent = (EditEvent) event;
|
||||
if (!editEvent.getBegin()) { // editEvent is the end of an edit
|
||||
log.debug("Doing search index build at end of EditEvent");
|
||||
ticker.start();
|
||||
}
|
||||
} else {
|
||||
log.debug("ignoring event " + event.getClass().getName() + " "
|
||||
+ event);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO avoid overhead of Model.
|
||||
// TODO avoid duplication with JenaChangeListener
|
||||
private Statement parseTriple(String serializedTriple) {
|
||||
try {
|
||||
Model m = ModelFactory.createDefaultModel();
|
||||
m.read(new ByteArrayInputStream(serializedTriple.getBytes("UTF-8")),
|
||||
null, "N3");
|
||||
StmtIterator sit = m.listStatements();
|
||||
if (!sit.hasNext()) {
|
||||
throw new RuntimeException("no triple parsed from change event");
|
||||
} else {
|
||||
Statement s = sit.nextStatement();
|
||||
if (sit.hasNext()) {
|
||||
log.warn("More than one triple parsed from change event");
|
||||
}
|
||||
return s;
|
||||
}
|
||||
} catch (RuntimeException riot) {
|
||||
log.error("Failed to parse triple " + serializedTriple, riot);
|
||||
throw riot;
|
||||
} catch (UnsupportedEncodingException uee) {
|
||||
throw new RuntimeException(uee);
|
||||
}
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------
|
||||
// helper classes
|
||||
// ----------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* The ticker will ask for a response after two ticks, unless it is started
|
||||
* again before the second one.
|
||||
*
|
||||
* <pre>
|
||||
* On a call to start():
|
||||
* Start the timer unless it is already running.
|
||||
* Reset the hasOneTick flag.
|
||||
*
|
||||
* When the timer expires:
|
||||
* If the timer hasOneTick, we're done: call for a response.
|
||||
* Otherwise, record that it hasOneTick, and keep the timer running.
|
||||
* </pre>
|
||||
*
|
||||
* All methods are synchronized on the enclosing IndexingChangeListener.
|
||||
*/
|
||||
private class Ticker {
|
||||
private final ScheduledExecutorService queue;
|
||||
private volatile boolean running;
|
||||
private volatile boolean hasOneTick;
|
||||
|
||||
public Ticker() {
|
||||
this.queue = Executors.newScheduledThreadPool(1,
|
||||
new VitroBackgroundThread.Factory(
|
||||
"IndexingChangeListener_Ticker"));
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
synchronized (IndexingChangeListener.this) {
|
||||
this.queue.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public void start() {
|
||||
synchronized (IndexingChangeListener.this) {
|
||||
if (!running) {
|
||||
startTicker();
|
||||
}
|
||||
hasOneTick = false;
|
||||
}
|
||||
}
|
||||
|
||||
private void startTicker() {
|
||||
if (queue.isShutdown()) {
|
||||
log.warn("Attempt to start ticker after shutdown request.");
|
||||
} else {
|
||||
queue.schedule(new TickerResponse(), 500, TimeUnit.MILLISECONDS);
|
||||
running = true;
|
||||
}
|
||||
}
|
||||
|
||||
private class TickerResponse implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (IndexingChangeListener.this) {
|
||||
running = false;
|
||||
if (hasOneTick) {
|
||||
respondToTicker();
|
||||
hasOneTick = false;
|
||||
} else {
|
||||
startTicker();
|
||||
hasOneTick = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -26,6 +26,8 @@ import javax.servlet.ServletContext;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.hp.hpl.jena.rdf.model.Statement;
|
||||
|
||||
import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory;
|
||||
import edu.cornell.mannlib.vitro.webapp.dao.filtering.WebappDaoFactoryFiltering;
|
||||
import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilterUtils;
|
||||
|
@ -40,6 +42,7 @@ import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentMod
|
|||
import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExcluder;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinder;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.tasks.RebuildIndexTask;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.tasks.UpdateStatementsTask;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.tasks.UpdateUrisTask;
|
||||
import edu.cornell.mannlib.vitro.webapp.utils.configuration.ConfigurationBeanLoader;
|
||||
import edu.cornell.mannlib.vitro.webapp.utils.configuration.ConfigurationBeanLoaderException;
|
||||
|
@ -105,16 +108,48 @@ public class SearchIndexerImpl implements SearchIndexer {
|
|||
return new WebappDaoFactoryFiltering(rawWadf, vf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void scheduleUpdatesForStatements(List<Statement> changes) {
|
||||
if (changes == null || changes.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (taskQueue.isShutdown()) {
|
||||
throw new IllegalStateException("SearchIndexer is shut down.");
|
||||
}
|
||||
|
||||
Task task = new UpdateStatementsTask(changes, uriFinders, excluders,
|
||||
modifiers, wadf.getIndividualDao(), listeners, pool);
|
||||
scheduler.scheduleTask(task);
|
||||
log.debug("Scheduled updates for " + changes.size() + " statements.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void scheduleUpdatesForUris(Collection<String> uris) {
|
||||
log.debug("Schedule updates for " + uris.size() + " uris.");
|
||||
scheduler.scheduleTask(new UpdateUrisTask(uris, excluders, modifiers,
|
||||
wadf.getIndividualDao(), listeners, pool));
|
||||
if (uris == null || uris.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (taskQueue.isShutdown()) {
|
||||
throw new IllegalStateException("SearchIndexer is shut down.");
|
||||
}
|
||||
|
||||
Task task = new UpdateUrisTask(uris, excluders, modifiers,
|
||||
wadf.getIndividualDao(), listeners, pool);
|
||||
scheduler.scheduleTask(task);
|
||||
log.debug("Scheduled updates for " + uris.size() + " uris.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rebuildIndex() {
|
||||
scheduler.scheduleTask(new RebuildIndexTask());
|
||||
if (taskQueue.isShutdown()) {
|
||||
throw new IllegalStateException("SearchIndexer is shut down.");
|
||||
}
|
||||
|
||||
Task task = new RebuildIndexTask(excluders, modifiers,
|
||||
wadf.getIndividualDao(), listeners, pool);
|
||||
scheduler.scheduleTask(task);
|
||||
log.debug("Scheduled a full rebuild.");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -148,6 +183,7 @@ public class SearchIndexerImpl implements SearchIndexer {
|
|||
if (status.getState() != State.SHUTDOWN) {
|
||||
listeners.fireEvent(new Event(SHUTDOWN_REQUESTED, status));
|
||||
|
||||
pool.shutdown();
|
||||
taskQueue.shutdown();
|
||||
|
||||
for (DocumentModifier dm : modifiers) {
|
||||
|
@ -230,7 +266,8 @@ public class SearchIndexerImpl implements SearchIndexer {
|
|||
paused = false;
|
||||
for (Task task : deferredQueue) {
|
||||
taskQueue.scheduleTask(task);
|
||||
log.debug("moved task from deferred queue to task queue: " + task);
|
||||
log.debug("moved task from deferred queue to task queue: "
|
||||
+ task);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -275,6 +312,10 @@ public class SearchIndexerImpl implements SearchIndexer {
|
|||
}
|
||||
}
|
||||
|
||||
public boolean isShutdown() {
|
||||
return queue.isShutdown();
|
||||
}
|
||||
|
||||
/** When this wrapper is run, we will know the current task and status. */
|
||||
private class TaskWrapper implements Runnable {
|
||||
private final Task task;
|
||||
|
|
|
@ -2,11 +2,6 @@
|
|||
|
||||
package edu.cornell.mannlib.vitro.webapp.searchindex;
|
||||
|
||||
import static edu.cornell.mannlib.vitro.webapp.modelaccess.ModelNames.DISPLAY;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import javax.servlet.ServletContext;
|
||||
import javax.servlet.ServletContextEvent;
|
||||
import javax.servlet.ServletContextListener;
|
||||
|
@ -14,92 +9,75 @@ import javax.servlet.ServletContextListener;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.hp.hpl.jena.ontology.OntModel;
|
||||
|
||||
import edu.cornell.mannlib.vitro.webapp.application.ApplicationUtils;
|
||||
import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory;
|
||||
import edu.cornell.mannlib.vitro.webapp.dao.filtering.WebappDaoFactoryFiltering;
|
||||
import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilterUtils;
|
||||
import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilters;
|
||||
import edu.cornell.mannlib.vitro.webapp.dao.jena.ModelContext;
|
||||
import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelAccess;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine;
|
||||
import edu.cornell.mannlib.vitro.webapp.search.SearchIndexer;
|
||||
import edu.cornell.mannlib.vitro.webapp.search.indexing.IndexBuilder;
|
||||
import edu.cornell.mannlib.vitro.webapp.search.indexing.SearchReindexingListener;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinder;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.Application;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
|
||||
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;
|
||||
import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceUtils;
|
||||
import edu.cornell.mannlib.vitro.webapp.search.controller.IndexController;
|
||||
import edu.cornell.mannlib.vitro.webapp.search.controller.IndexHistory;
|
||||
import edu.cornell.mannlib.vitro.webapp.startup.ComponentStartupStatusImpl;
|
||||
import edu.cornell.mannlib.vitro.webapp.startup.StartupStatus;
|
||||
import edu.cornell.mannlib.vitro.webapp.utils.configuration.ConfigurationBeanLoader;
|
||||
import edu.cornell.mannlib.vitro.webapp.utils.configuration.ConfigurationBeanLoaderException;
|
||||
import edu.cornell.mannlib.vitro.webapp.utils.developer.Key;
|
||||
import edu.cornell.mannlib.vitro.webapp.utils.developer.listeners.DeveloperDisabledModelChangeListener;
|
||||
import edu.cornell.mannlib.vitro.webapp.utils.developer.listeners.DeveloperDisabledChangeListener;
|
||||
|
||||
/**
|
||||
* TODO A silly implementation that just wraps the old IndexBuilder with a new
|
||||
* SearchIndexerImpl.
|
||||
* Start the SearchIndexer. Create a listener on the RDFService and link it to
|
||||
* the indexer. Create a history object as a listener and make it avaiable to
|
||||
* the IndexController.
|
||||
*/
|
||||
public class SearchIndexerSetup implements ServletContextListener {
|
||||
private static final Log log = LogFactory.getLog(SearchIndexerSetup.class);
|
||||
|
||||
private ServletContext ctx;
|
||||
private OntModel displayModel;
|
||||
private ConfigurationBeanLoader beanLoader;
|
||||
private Application app;
|
||||
private SearchIndexer searchIndexer;
|
||||
private IndexingChangeListener listener;
|
||||
private DeveloperDisabledChangeListener listenerWrapper;
|
||||
private IndexHistory history;
|
||||
|
||||
@Override
|
||||
public void contextInitialized(ServletContextEvent sce) {
|
||||
this.ctx = sce.getServletContext();
|
||||
this.displayModel = ModelAccess.on(ctx).getOntModel(DISPLAY);
|
||||
this.beanLoader = new ConfigurationBeanLoader(displayModel, ctx);
|
||||
ctx = sce.getServletContext();
|
||||
app = ApplicationUtils.instance();
|
||||
|
||||
ServletContext context = sce.getServletContext();
|
||||
StartupStatus ss = StartupStatus.getBean(context);
|
||||
SearchEngine searchEngine = ApplicationUtils.instance()
|
||||
.getSearchEngine();
|
||||
StartupStatus ss = StartupStatus.getBean(ctx);
|
||||
|
||||
{ // >>>>> TODO
|
||||
try {
|
||||
// /* setup search indexer */
|
||||
// SearchIndexer searchIndexer = new SearchIndexer(searchEngine,
|
||||
// indToSearchDoc);
|
||||
//
|
||||
// // Make the IndexBuilder
|
||||
// IndexBuilder builder = new IndexBuilder(searchIndexer, wadf,
|
||||
// uriFinders);
|
||||
//
|
||||
// // Create listener to notify index builder of changes to model
|
||||
// // (can be disabled by developer setting.)
|
||||
// ModelContext
|
||||
// .registerListenerForChanges(
|
||||
// context,
|
||||
// new DeveloperDisabledModelChangeListener(
|
||||
// new SearchReindexingListener(builder),
|
||||
// Key.SEARCH_INDEX_SUPPRESS_MODEL_CHANGE_LISTENER));
|
||||
//
|
||||
// ss.info(this, "Setup of search indexer completed.");
|
||||
//
|
||||
} catch (Throwable e) {
|
||||
ss.fatal(this, "could not setup search engine", e);
|
||||
}
|
||||
try {
|
||||
searchIndexer = app.getSearchIndexer();
|
||||
|
||||
listener = new IndexingChangeListener(searchIndexer);
|
||||
|
||||
listenerWrapper = new DeveloperDisabledChangeListener(listener,
|
||||
Key.SEARCH_INDEX_SUPPRESS_MODEL_CHANGE_LISTENER);
|
||||
RDFServiceUtils.getRDFServiceFactory(ctx).registerListener(
|
||||
listenerWrapper);
|
||||
|
||||
this.history = new IndexHistory();
|
||||
searchIndexer.addListener(this.history);
|
||||
IndexController.setHistory(this.history);
|
||||
|
||||
searchIndexer
|
||||
.startup(app, new ComponentStartupStatusImpl(this, ss));
|
||||
|
||||
ss.info(this, "Setup of search indexer completed.");
|
||||
} catch (RDFServiceException e) {
|
||||
ss.fatal(this, "Failed to register the model changed listener.", e);
|
||||
}
|
||||
ApplicationUtils
|
||||
.instance()
|
||||
.getSearchIndexer()
|
||||
.startup(ApplicationUtils.instance(),
|
||||
new ComponentStartupStatusImpl(this, ss));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void contextDestroyed(ServletContextEvent sce) {
|
||||
ApplicationUtils.instance().getSearchIndexer()
|
||||
.shutdown(ApplicationUtils.instance());
|
||||
searchIndexer.shutdown(app);
|
||||
|
||||
{ // >>>>> TODO
|
||||
IndexBuilder builder = (IndexBuilder) sce.getServletContext()
|
||||
.getAttribute(IndexBuilder.class.getName());
|
||||
if (builder != null)
|
||||
builder.stopIndexingThread();
|
||||
searchIndexer.removeListener(this.history);
|
||||
|
||||
try {
|
||||
RDFServiceUtils.getRDFServiceFactory(ctx).unregisterListener(
|
||||
listenerWrapper);
|
||||
} catch (RDFServiceException e) {
|
||||
log.warn("Failed to unregister the indexing listener.");
|
||||
}
|
||||
listener.shutdown();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -7,23 +7,37 @@ import java.util.List;
|
|||
import com.hp.hpl.jena.rdf.model.Statement;
|
||||
|
||||
/**
|
||||
* Interface to use with IndexBuilder to find more URIs to index given a changed statement.
|
||||
* The statement may have been added or removed from the model.
|
||||
* Interface to use with IndexBuilder to find more URIs to index given a changed
|
||||
* statement. The statement may have been added or removed from the model.
|
||||
*
|
||||
* Implementing classes must be threadsafe, as multiple threads are used to
|
||||
* complete the task.
|
||||
*
|
||||
* The life-cycle is as follows: startIndexing(), 0 or more calls to
|
||||
* findAdditionalURIsToIndex(), endIndexing().
|
||||
*
|
||||
* Repeat as desired.
|
||||
*/
|
||||
public interface IndexingUriFinder {
|
||||
|
||||
/**
|
||||
* For the domain that is the responsibility of the given implementation,
|
||||
* calculate the URIs that need to be updated in the search index.
|
||||
* The URIs in the list will be updated by the IndexBuilder, which will
|
||||
* handle URIs of new individuals, URIs of individuals that have changes,
|
||||
* and URIs of individuals that have been removed from the model.
|
||||
*
|
||||
* @return List of URIs.
|
||||
*/
|
||||
List<String> findAdditionalURIsToIndex(Statement stmt);
|
||||
|
||||
void startIndexing();
|
||||
|
||||
void endIndexing();
|
||||
|
||||
/**
|
||||
* For the domain that is the responsibility of the given implementation,
|
||||
* calculate the URIs that need to be updated in the search index. The URIs
|
||||
* in the list will be updated by the IndexBuilder, which will handle URIs
|
||||
* of new individuals, URIs of individuals that have changes, and URIs of
|
||||
* individuals that have been removed from the model.
|
||||
*
|
||||
* @return List of URIs. Never return null.
|
||||
*/
|
||||
List<String> findAdditionalURIsToIndex(Statement stmt);
|
||||
|
||||
/**
|
||||
* Indicates that a new collection of statements is about to be processed.
|
||||
*/
|
||||
void startIndexing();
|
||||
|
||||
/**
|
||||
* Indicates that the collection of statements being processed is complete.
|
||||
*/
|
||||
void endIndexing();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
|
||||
|
||||
package edu.cornell.mannlib.vitro.webapp.searchindex.tasks;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import com.hp.hpl.jena.rdf.model.Statement;
|
||||
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinder;
|
||||
|
||||
/**
|
||||
* Ask all of the URI Finders to find URIs that might be affected by this
|
||||
* statement.
|
||||
*/
|
||||
public class FindUrisForStatementWorkUnit implements Runnable {
|
||||
private final Statement stmt;
|
||||
private final Collection<IndexingUriFinder> uriFinders;
|
||||
private final Set<String> uris;
|
||||
|
||||
public FindUrisForStatementWorkUnit(Statement stmt,
|
||||
Collection<IndexingUriFinder> uriFinders) {
|
||||
this.stmt = stmt;
|
||||
this.uriFinders = uriFinders;
|
||||
this.uris = new HashSet<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
for (IndexingUriFinder uriFinder : uriFinders) {
|
||||
uris.addAll(uriFinder.findAdditionalURIsToIndex(stmt));
|
||||
}
|
||||
}
|
||||
|
||||
public Statement getStatement() {
|
||||
return stmt;
|
||||
}
|
||||
|
||||
public Set<String> getUris() {
|
||||
return uris;
|
||||
}
|
||||
|
||||
}
|
|
@ -2,40 +2,132 @@
|
|||
|
||||
package edu.cornell.mannlib.vitro.webapp.searchindex.tasks;
|
||||
|
||||
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.START_REBUILD;
|
||||
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STOP_REBUILD;
|
||||
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State.REBUILDING;
|
||||
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import edu.cornell.mannlib.vitro.webapp.application.ApplicationUtils;
|
||||
import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngineException;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.RebuildCounts;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExcluder;
|
||||
|
||||
/**
|
||||
* TODO
|
||||
* Get the URIs of all individuals in the model. Update each of their search
|
||||
* documents.
|
||||
*
|
||||
* Delete all search documents that have not been updated since this rebuild
|
||||
* began. That removes all obsolete documents from the index.
|
||||
*/
|
||||
public class RebuildIndexTask implements Task {
|
||||
private static final Log log = LogFactory.getLog(RebuildIndexTask.class);
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see java.lang.Runnable#run()
|
||||
*/
|
||||
@Override
|
||||
public void run() {
|
||||
// TODO Auto-generated method stub
|
||||
throw new RuntimeException("RebuildIndexTask.run() not implemented.");
|
||||
private final IndividualDao indDao;
|
||||
private final List<SearchIndexExcluder> excluders;
|
||||
private final List<DocumentModifier> modifiers;
|
||||
private final ListenerList listeners;
|
||||
private final WorkerThreadPool pool;
|
||||
private final SearchEngine searchEngine;
|
||||
|
||||
private final Date requestedAt;
|
||||
private final int documentsBefore;
|
||||
|
||||
private volatile SearchIndexerStatus status;
|
||||
|
||||
public RebuildIndexTask(Collection<SearchIndexExcluder> excluders,
|
||||
Collection<DocumentModifier> modifiers, IndividualDao indDao,
|
||||
ListenerList listeners, WorkerThreadPool pool) {
|
||||
this.excluders = new ArrayList<>(excluders);
|
||||
this.modifiers = new ArrayList<>(modifiers);
|
||||
this.indDao = indDao;
|
||||
this.listeners = listeners;
|
||||
this.pool = pool;
|
||||
|
||||
this.searchEngine = ApplicationUtils.instance().getSearchEngine();
|
||||
|
||||
this.requestedAt = new Date();
|
||||
this.documentsBefore = getDocumentCount();
|
||||
this.status = buildStatus(REBUILDING, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
listeners.fireEvent(new Event(START_REBUILD, status));
|
||||
|
||||
Collection<String> uris = getAllUrisInTheModel();
|
||||
updateTheUris(uris);
|
||||
deleteOutdatedDocuments();
|
||||
|
||||
status = buildStatus(REBUILDING, getDocumentCount());
|
||||
listeners.fireEvent(new Event(STOP_REBUILD, status));
|
||||
}
|
||||
|
||||
private Collection<String> getAllUrisInTheModel() {
|
||||
return indDao.getAllIndividualUris();
|
||||
}
|
||||
|
||||
private void updateTheUris(Collection<String> uris) {
|
||||
new UpdateUrisTask(uris, excluders, modifiers, indDao, listeners, pool)
|
||||
.run();
|
||||
}
|
||||
|
||||
private void deleteOutdatedDocuments() {
|
||||
String query = "indexedTime:[ * TO " + requestedAt.getTime() + " ]";
|
||||
try {
|
||||
searchEngine.deleteByQuery(query);
|
||||
searchEngine.commit();
|
||||
} catch (SearchEngineException e) {
|
||||
log.warn("Failed to delete outdated documents "
|
||||
+ "from the search index", e);
|
||||
}
|
||||
}
|
||||
|
||||
private int getDocumentCount() {
|
||||
try {
|
||||
return searchEngine.documentCount();
|
||||
} catch (SearchEngineException e) {
|
||||
log.warn("Failed to get docoument count from the search index.", e);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
private SearchIndexerStatus buildStatus(State state, int documentsAfter) {
|
||||
return new SearchIndexerStatus(state, new Date(), new RebuildCounts(
|
||||
documentsBefore, documentsAfter));
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task#getStatus()
|
||||
*/
|
||||
@Override
|
||||
public SearchIndexerStatus getStatus() {
|
||||
// TODO Auto-generated method stub
|
||||
throw new RuntimeException(
|
||||
"RebuildIndexTask.getStatus() not implemented.");
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyWorkUnitCompletion(Runnable workUnit) {
|
||||
// TODO Auto-generated method stub
|
||||
throw new RuntimeException("RebuildIndexTask.notifyWorkUnitCompletion() not implemented.");
|
||||
|
||||
// We don't submit any work units, so we won't see any calls to this.
|
||||
log.error("Why was this called?");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "RebuildIndexTask[requestedAt="
|
||||
+ new SimpleDateFormat().format(requestedAt) + "]";
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,209 @@
|
|||
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
|
||||
|
||||
package edu.cornell.mannlib.vitro.webapp.searchindex.tasks;
|
||||
|
||||
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.PROGRESS;
|
||||
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.START_PROCESSING_STATEMENTS;
|
||||
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STOP_PROCESSING_STATEMENTS;
|
||||
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State.PROCESSING_STMTS;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import com.hp.hpl.jena.rdf.model.Statement;
|
||||
|
||||
import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
|
||||
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.StatementCounts;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExcluder;
|
||||
import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinder;
|
||||
|
||||
/**
|
||||
* Receive a collection of statements that have been added to the model, or
|
||||
* delete from it.
|
||||
*
|
||||
* Find the URIs of search documents that may have been affected by these
|
||||
* changes, and update those documents.
|
||||
*
|
||||
* -------------------
|
||||
*
|
||||
* It would be nice to stream this whole thing, finding the URIs affected by
|
||||
* each statement and updating those documents before proceding. However, it is
|
||||
* very common for several statements within a group to affect the same
|
||||
* document, so that method would result in rebuilding the document several
|
||||
* times.
|
||||
*
|
||||
* Instead, we final all of the URIs affected by all statements, store them in a
|
||||
* Set to remove duplicates, and then process the URIs in the set.
|
||||
*/
|
||||
public class UpdateStatementsTask implements Task {
|
||||
private static final Log log = LogFactory
|
||||
.getLog(UpdateStatementsTask.class);
|
||||
|
||||
private final List<Statement> changes;
|
||||
private final Set<IndexingUriFinder> uriFinders;
|
||||
private final Set<SearchIndexExcluder> excluders;
|
||||
private final Set<DocumentModifier> modifiers;
|
||||
private final IndividualDao indDao;
|
||||
private final ListenerList listeners;
|
||||
private final WorkerThreadPool pool;
|
||||
|
||||
private final Set<String> uris;
|
||||
private final Status status;
|
||||
|
||||
public UpdateStatementsTask(List<Statement> changes,
|
||||
Set<IndexingUriFinder> uriFinders,
|
||||
Set<SearchIndexExcluder> excluders,
|
||||
Set<DocumentModifier> modifiers, IndividualDao indDao,
|
||||
ListenerList listeners, WorkerThreadPool pool) {
|
||||
this.changes = new ArrayList<>(changes);
|
||||
this.uriFinders = uriFinders;
|
||||
this.excluders = excluders;
|
||||
this.modifiers = modifiers;
|
||||
this.indDao = indDao;
|
||||
this.listeners = listeners;
|
||||
this.pool = pool;
|
||||
|
||||
this.uris = Collections.synchronizedSet(new HashSet<String>());
|
||||
|
||||
this.status = new Status(changes.size(), 200, listeners);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
listeners
|
||||
.fireEvent(new Event(START_PROCESSING_STATEMENTS, getStatus()));
|
||||
|
||||
findAffectedUris();
|
||||
listeners.fireEvent(new Event(PROGRESS, getStatus()));
|
||||
|
||||
updateTheUris();
|
||||
listeners.fireEvent(new Event(STOP_PROCESSING_STATEMENTS, getStatus()));
|
||||
}
|
||||
|
||||
private void findAffectedUris() {
|
||||
tellFindersWeAreStarting();
|
||||
|
||||
for (Statement stmt : changes) {
|
||||
if (isInterrupted()) {
|
||||
log.info("Interrupted: " + status.getSearchIndexerStatus());
|
||||
return;
|
||||
} else {
|
||||
findUrisForStatement(stmt);
|
||||
}
|
||||
}
|
||||
waitForWorkUnitsToComplete();
|
||||
|
||||
tellFindersWeAreStopping();
|
||||
}
|
||||
|
||||
private void tellFindersWeAreStarting() {
|
||||
log.debug("Tell finders we are starting.");
|
||||
for (IndexingUriFinder uriFinder : uriFinders) {
|
||||
uriFinder.startIndexing();
|
||||
}
|
||||
}
|
||||
|
||||
private void tellFindersWeAreStopping() {
|
||||
log.debug("Tell finders we are stopping.");
|
||||
for (IndexingUriFinder uriFinder : uriFinders) {
|
||||
uriFinder.endIndexing();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isInterrupted() {
|
||||
if (Thread.interrupted()) {
|
||||
Thread.currentThread().interrupt();
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private void findUrisForStatement(Statement stmt) {
|
||||
Runnable workUnit = new FindUrisForStatementWorkUnit(stmt, uriFinders);
|
||||
pool.submit(workUnit, this);
|
||||
log.debug("scheduled uri finders for " + stmt);
|
||||
}
|
||||
|
||||
private void waitForWorkUnitsToComplete() {
|
||||
pool.waitUntilIdle();
|
||||
}
|
||||
|
||||
private void updateTheUris() {
|
||||
new UpdateUrisTask(uris, excluders, modifiers, indDao, listeners, pool)
|
||||
.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SearchIndexerStatus getStatus() {
|
||||
return status.getSearchIndexerStatus();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyWorkUnitCompletion(Runnable workUnit) {
|
||||
FindUrisForStatementWorkUnit worker = (FindUrisForStatementWorkUnit) workUnit;
|
||||
|
||||
Set<String> foundUris = worker.getUris();
|
||||
Statement stmt = worker.getStatement();
|
||||
log.debug("Found " + foundUris.size() + " uris for statement: " + stmt);
|
||||
|
||||
uris.addAll(foundUris);
|
||||
status.incrementProcessed();
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------
|
||||
// Helper classes
|
||||
// ----------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* A thread-safe collection of status information. All methods are
|
||||
* synchronized.
|
||||
*/
|
||||
private static class Status {
|
||||
private final int total;
|
||||
private final int progressInterval;
|
||||
private final ListenerList listeners;
|
||||
private int processed = 0;
|
||||
private Date since = new Date();
|
||||
|
||||
public Status(int total, int progressInterval, ListenerList listeners) {
|
||||
this.total = total;
|
||||
this.progressInterval = progressInterval;
|
||||
this.listeners = listeners;
|
||||
}
|
||||
|
||||
public synchronized void incrementProcessed() {
|
||||
processed++;
|
||||
since = new Date();
|
||||
maybeFireProgressEvent();
|
||||
}
|
||||
|
||||
private void maybeFireProgressEvent() {
|
||||
if (processed > 0 && processed % progressInterval == 0) {
|
||||
listeners.fireEvent(new Event(PROGRESS,
|
||||
getSearchIndexerStatus()));
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized SearchIndexerStatus getSearchIndexerStatus() {
|
||||
int remaining = total - processed;
|
||||
return new SearchIndexerStatus(PROCESSING_STMTS, since,
|
||||
new StatementCounts(processed, remaining, total));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
package edu.cornell.mannlib.vitro.webapp.searchindex.tasks;
|
||||
|
||||
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.PROGRESS;
|
||||
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.START_PROCESSING_URIS;
|
||||
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STOP_PROCESSING_URIS;
|
||||
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State.PROCESSING_URIS;
|
||||
|
@ -67,7 +68,7 @@ public class UpdateUrisTask implements Task {
|
|||
this.listeners = listeners;
|
||||
this.pool = pool;
|
||||
|
||||
this.status = new Status(uris.size());
|
||||
this.status = new Status(uris.size(), 200, listeners);
|
||||
|
||||
this.searchEngine = ApplicationUtils.instance().getSearchEngine();
|
||||
}
|
||||
|
@ -90,10 +91,19 @@ public class UpdateUrisTask implements Task {
|
|||
}
|
||||
}
|
||||
pool.waitUntilIdle();
|
||||
commitChanges();
|
||||
listeners.fireEvent(new Event(STOP_PROCESSING_URIS, status
|
||||
.getSearchIndexerStatus()));
|
||||
}
|
||||
|
||||
private void commitChanges() {
|
||||
try {
|
||||
searchEngine.commit();
|
||||
} catch (SearchEngineException e) {
|
||||
log.error("Failed to commit the changes.");
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isInterrupted() {
|
||||
if (Thread.interrupted()) {
|
||||
Thread.currentThread().interrupt();
|
||||
|
@ -170,17 +180,22 @@ public class UpdateUrisTask implements Task {
|
|||
*/
|
||||
private static class Status {
|
||||
private final int total;
|
||||
private final int progressInterval;
|
||||
private final ListenerList listeners;
|
||||
private int updated = 0;
|
||||
private int deleted = 0;
|
||||
private Date since = new Date();
|
||||
|
||||
public Status(int total) {
|
||||
public Status(int total, int progressInterval, ListenerList listeners) {
|
||||
this.total = total;
|
||||
this.progressInterval = progressInterval;
|
||||
this.listeners = listeners;
|
||||
}
|
||||
|
||||
public synchronized void incrementUpdates() {
|
||||
updated++;
|
||||
since = new Date();
|
||||
maybeFireProgressEvent();
|
||||
}
|
||||
|
||||
public synchronized void incrementDeletes() {
|
||||
|
@ -188,6 +203,13 @@ public class UpdateUrisTask implements Task {
|
|||
since = new Date();
|
||||
}
|
||||
|
||||
private void maybeFireProgressEvent() {
|
||||
if (updated > 0 && updated % progressInterval == 0) {
|
||||
listeners.fireEvent(new Event(PROGRESS,
|
||||
getSearchIndexerStatus()));
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized SearchIndexerStatus getSearchIndexerStatus() {
|
||||
int remaining = total - updated - deleted;
|
||||
return new SearchIndexerStatus(PROCESSING_URIS, since,
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
|
||||
|
||||
package edu.cornell.mannlib.vitro.webapp.utils.developer.listeners;
|
||||
|
||||
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
|
||||
import edu.cornell.mannlib.vitro.webapp.utils.developer.DeveloperSettings;
|
||||
import edu.cornell.mannlib.vitro.webapp.utils.developer.Key;
|
||||
|
||||
/**
|
||||
* If a particular developer flag is NOT set to true, this is transparent.
|
||||
*
|
||||
* Set the flag and this becomes opaque, passing no events through.
|
||||
*/
|
||||
public class DeveloperDisabledChangeListener implements ChangeListener {
|
||||
private final ChangeListener inner;
|
||||
private final Key disablingKey;
|
||||
|
||||
public DeveloperDisabledChangeListener(ChangeListener inner,
|
||||
Key disablingKey) {
|
||||
this.inner = inner;
|
||||
this.disablingKey = disablingKey;
|
||||
}
|
||||
|
||||
private boolean isEnabled() {
|
||||
return !DeveloperSettings.getInstance().getBoolean(disablingKey);
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------
|
||||
// Delegated methods.
|
||||
// ----------------------------------------------------------------------
|
||||
|
||||
@Override
|
||||
public void addedStatement(String serializedTriple, String graphURI) {
|
||||
if (isEnabled()) {
|
||||
inner.addedStatement(serializedTriple, graphURI);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removedStatement(String serializedTriple, String graphURI) {
|
||||
if (isEnabled()) {
|
||||
inner.removedStatement(serializedTriple, graphURI);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyEvent(String graphURI, Object event) {
|
||||
if (isEnabled()) {
|
||||
inner.notifyEvent(graphURI, event);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,38 +0,0 @@
|
|||
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
|
||||
|
||||
package edu.cornell.mannlib.vitro.webapp.search.indexing;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import edu.cornell.mannlib.vitro.testing.AbstractTestClass;
|
||||
|
||||
|
||||
public class IndexBuilderThreadTest extends AbstractTestClass {
|
||||
@Ignore
|
||||
@Test
|
||||
public void testStoppingTheThread(){
|
||||
setLoggerLevel(IndexBuilder.class, Level.OFF);
|
||||
|
||||
IndexBuilder ib = new IndexBuilder();
|
||||
Assert.assertNotSame(Thread.State.NEW, ib.getState() );
|
||||
Assert.assertNotSame(Thread.State.TERMINATED, ib.getState() );
|
||||
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
Assert.fail(e.getMessage());
|
||||
}
|
||||
ib.stopIndexingThread();
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
Assert.fail(e.getMessage());
|
||||
}
|
||||
Assert.assertFalse(ib.isAlive());
|
||||
Assert.assertSame(Thread.State.TERMINATED, ib.getState() );
|
||||
}
|
||||
}
|
|
@ -18,8 +18,9 @@
|
|||
<p><@showIndexerCounts "STATEMENT_COUNTS", status /></p>
|
||||
<p><@showElapsedTime status.elapsed /> Expected completion ${status.expectedCompletion?datetime}.</p>
|
||||
|
||||
<#elseif status.statusType = "PREPARING_REBUILD">
|
||||
<h3>The search indexer has been preparing to rebuild the index since ${status.since?datetime}</h3>
|
||||
<#elseif status.statusType = "REBUILDING">
|
||||
<h3>The search indexer has been rebuilding the index since ${status.since?datetime}</h3>
|
||||
<p><@showIndexerCounts "REBUILD_COUNTS", status /></p>
|
||||
|
||||
<#else>
|
||||
<h3>The search indexer status is: ${status.statusType}
|
||||
|
@ -67,6 +68,11 @@
|
|||
<#elseif countsType == "STATEMENT_COUNTS">
|
||||
Processed: ${counts.processed}, remaining: ${counts.remaining}, total: ${counts.total}
|
||||
<#elseif countsType == "REBUILD_COUNTS">
|
||||
Number of individuals before rebuild: ${counts.numberOfIndividuals}
|
||||
Number of document before rebuild: ${counts.documentsBefore}, after rebuild:
|
||||
<#if counts.documentsAfter == 0>
|
||||
UNKNOWN
|
||||
<#else>
|
||||
${counts.documentsAfter}
|
||||
</#if>
|
||||
</#if>
|
||||
</#macro>
|
||||
|
|
Loading…
Add table
Reference in a new issue