Merge branch 'grahamtriggs-gt-defer-and-reader' into develop

Jim Blake 2015-02-17 12:05:39 -05:00
commit 680953115f
8 changed files with 658 additions and 510 deletions

View file

@@ -40,10 +40,18 @@ public interface SearchIndexer extends Application.Module {
 	 * unpause(). Fires a PAUSED event to listeners.
 	 *
 	 * This call has no effect if already paused, or if called after shutdown.
 	 */
 	void pause();
 
 	/**
+	 * Stop processing new tasks. Requests will be ignored and the index rebuilt when unpaused.
+	 * Fires a PAUSED event to listeners.
+	 *
+	 * This call has no effect if already paused, or if called after shutdown.
+	 */
+	void pauseWithoutDeferring();
+
+	/**
 	 * Resume processing new tasks. Any requests that were received since the
 	 * call to pause() will now be scheduled for processing. Fires an UNPAUSED
 	 * event to listeners.
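A minimal caller-side sketch of the difference between the two pause modes, assuming a SearchIndexer obtained from the application; the surrounding class and its bulkLoad() helper are hypothetical:

    import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;

    // Hypothetical caller, for illustration only.
    public class BulkLoadExample {
        private final SearchIndexer searchIndexer;

        public BulkLoadExample(SearchIndexer searchIndexer) {
            this.searchIndexer = searchIndexer;
        }

        /** Defer indexing requests during the load and replay them afterwards. */
        public void loadAndReplay() {
            searchIndexer.pause();
            try {
                bulkLoad();
            } finally {
                searchIndexer.unpause(); // deferred requests are scheduled now
            }
        }

        /** Ignore indexing requests during the load; if any arrived, unpause() triggers a full rebuild instead. */
        public void loadAndRebuild() {
            searchIndexer.pauseWithoutDeferring();
            try {
                bulkLoad();
            } finally {
                searchIndexer.unpause();
            }
        }

        private void bulkLoad() {
            // hypothetical bulk data load
        }
    }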

View file

@@ -104,7 +104,7 @@ public class ABoxRecomputer {
 		}
 		try {
 			if (searchIndexer != null) {
-				searchIndexer.pause();
+				searchIndexer.pauseWithoutDeferring();
 			}
 			recomputeABox();
 		} finally {

View file

@@ -9,30 +9,27 @@ import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndex
 import java.io.ByteArrayInputStream;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang.StringUtils;
+import com.hp.hpl.jena.graph.Triple;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import com.hp.hpl.jena.rdf.model.Literal;
 import com.hp.hpl.jena.rdf.model.Model;
 import com.hp.hpl.jena.rdf.model.ModelFactory;
-import com.hp.hpl.jena.rdf.model.Property;
-import com.hp.hpl.jena.rdf.model.RDFNode;
-import com.hp.hpl.jena.rdf.model.Resource;
-import com.hp.hpl.jena.rdf.model.ResourceFactory;
 import com.hp.hpl.jena.rdf.model.Statement;
-import com.hp.hpl.jena.rdf.model.StmtIterator;
 
 import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent;
 import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
 import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event;
 import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
 import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;
+import org.apache.jena.riot.RDFLanguages;
+import org.apache.jena.riot.RiotReader;
 
 /**
  * When a change is heard, wait for an interval to see if more changes come in.

@@ -65,7 +62,7 @@ public class IndexingChangeListener implements ChangeListener,
 	private final SearchIndexer searchIndexer;
 	private final Ticker ticker;
-	private volatile boolean paused = true;
+	private final Model defaultModel;
 
 	/** All access to the list must be synchronized. */
 	private final List<Statement> changes;

@@ -73,82 +70,28 @@
 	public IndexingChangeListener(SearchIndexer searchIndexer) {
 		this.searchIndexer = searchIndexer;
 		this.ticker = new Ticker();
+		this.defaultModel = ModelFactory.createDefaultModel();
 		this.changes = new ArrayList<>();
 		searchIndexer.addListener(this);
 	}
 
 	private synchronized void noteChange(Statement stmt) {
-		try {
-			changes.add(sanitize(stmt));
-			if (!paused) {
-				ticker.start();
-			}
-		} catch (Exception e) {
-			log.warn("Failed to sanitize this statement: " + stmt);
-		}
-	}
-
-	private Statement sanitize(Statement rawStmt) {
-		return ResourceFactory.createStatement(
-				sanitizeSubject(rawStmt.getSubject()),
-				sanitizePredicate(rawStmt.getPredicate()),
-				sanitizeObject(rawStmt.getObject()));
-	}
-
-	private Resource sanitizeSubject(Resource rawSubject) {
-		if (rawSubject.isURIResource()) {
-			return ResourceFactory.createResource(rawSubject.getURI());
-		}
-		return ResourceFactory.createResource();
-	}
-
-	private Property sanitizePredicate(Property rawPredicate) {
-		return ResourceFactory.createProperty(rawPredicate.getURI());
-	}
-
-	private RDFNode sanitizeObject(RDFNode rawObject) {
-		if (rawObject.isURIResource()) {
-			return ResourceFactory.createResource(rawObject.asResource()
-					.getURI());
-		}
-		if (rawObject.isResource()) {
-			return ResourceFactory.createResource();
-		}
-		Literal l = rawObject.asLiteral();
-		if (StringUtils.isNotEmpty(l.getLanguage())) {
-			return ResourceFactory.createLangLiteral(l.getString(),
-					l.getLanguage());
-		}
-		if (null != l.getDatatype()) {
-			return ResourceFactory.createTypedLiteral(l.getValue());
-		}
-		return ResourceFactory.createPlainLiteral(l.getString());
+		changes.add(stmt);
+		ticker.start();
 	}
 
 	@Override
 	public void receiveSearchIndexerEvent(Event event) {
-		if (event.getType() == PAUSE) {
-			paused = true;
-		} else if (event.getType() == UNPAUSE) {
-			paused = false;
-			ticker.start();
-		} else if (event.getType() == START_REBUILD) {
-			discardChanges();
-		}
 	}
 
 	private synchronized void respondToTicker() {
-		if (!paused && !changes.isEmpty()) {
+		if (!changes.isEmpty()) {
 			searchIndexer.scheduleUpdatesForStatements(changes);
 			changes.clear();
 		}
 	}
 
-	private synchronized void discardChanges() {
-		changes.clear();
-	}
-
 	public void shutdown() {
 		ticker.shutdown();
 	}

@@ -176,33 +119,38 @@
 			}
 		} else {
 			log.debug("ignoring event " + event.getClass().getName() + " "
 					+ event);
 		}
 	}
 
-	// TODO avoid overhead of Model.
 	// TODO avoid duplication with JenaChangeListener
 	private Statement parseTriple(String serializedTriple) {
 		try {
-			Model m = ModelFactory.createDefaultModel();
-			m.read(new ByteArrayInputStream(serializedTriple.getBytes("UTF-8")),
-					null, "N3");
-			StmtIterator sit = m.listStatements();
-			if (!sit.hasNext()) {
-				throw new RuntimeException("no triple parsed from change event");
-			} else {
-				Statement s = sit.nextStatement();
-				if (sit.hasNext()) {
-					log.warn("More than one triple parsed from change event");
-				}
-				return s;
-			}
+			// Use RiotReader to parse a Triple
+			// NB A Triple can be serialized correctly with: FmtUtils.stringForTriple(triple, PrefixMapping.Factory.create()) + " .";
+			Iterator<Triple> it = RiotReader.createIteratorTriples(new ByteArrayInputStream(serializedTriple.getBytes("UTF-8")), RDFLanguages.NTRIPLES, null);
+
+			if (it.hasNext()) {
+				Triple triple = it.next();
+
+				if (it.hasNext()) {
+					log.warn("More than one triple parsed from change event");
+				}
+
+				// Use the retained defaultModel instance to convert the Triple to a Statement
+				// This does not add the Statement to the Model, so the Statement can be disposed when unused
+				// And whilst the Model is attached to the Statement, using a single instance means only one Model
+				// is created and attached to all of the Statements created by this instance
+				return defaultModel.asStatement(triple);
+			} else {
+				throw new RuntimeException("no triple parsed from change event");
+			}
 		} catch (RuntimeException riot) {
 			log.error("Failed to parse triple " + serializedTriple, riot);
 			throw riot;
 		} catch (UnsupportedEncodingException uee) {
 			throw new RuntimeException(uee);
 		}
 	}
 
 	// ----------------------------------------------------------------------
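A small sketch of the round trip that the comment in parseTriple() relies on: serialize a Triple with FmtUtils, read it back with RiotReader, and wrap it as a Statement on a single retained Model. It uses the same Jena libraries this class imports; the example URIs and class name are made up:

    import java.io.ByteArrayInputStream;
    import java.util.Iterator;

    import org.apache.jena.riot.RDFLanguages;
    import org.apache.jena.riot.RiotReader;

    import com.hp.hpl.jena.graph.NodeFactory;
    import com.hp.hpl.jena.graph.Triple;
    import com.hp.hpl.jena.rdf.model.Model;
    import com.hp.hpl.jena.rdf.model.ModelFactory;
    import com.hp.hpl.jena.rdf.model.Statement;
    import com.hp.hpl.jena.shared.PrefixMapping;
    import com.hp.hpl.jena.sparql.util.FmtUtils;

    public class TripleRoundTripSketch {
        public static void main(String[] args) throws Exception {
            // Build a Triple (these URIs are illustrative only).
            Triple triple = Triple.create(
                    NodeFactory.createURI("http://example.com/subject"),
                    NodeFactory.createURI("http://example.com/predicate"),
                    NodeFactory.createURI("http://example.com/object"));

            // Serialize it the way the parseTriple() comment describes.
            String serialized = FmtUtils.stringForTriple(triple, PrefixMapping.Factory.create()) + " .";

            // Parse it back with RiotReader, as IndexingChangeListener.parseTriple() now does.
            Iterator<Triple> it = RiotReader.createIteratorTriples(
                    new ByteArrayInputStream(serialized.getBytes("UTF-8")),
                    RDFLanguages.NTRIPLES, null);

            // Wrap the Triple as a Statement using one retained Model,
            // without adding the Statement to that Model.
            Model model = ModelFactory.createDefaultModel();
            Statement stmt = model.asStatement(it.next());
            System.out.println(stmt);
        }
    }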

View file

@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.servlet.ServletContext;
 
+import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

@@ -86,7 +87,7 @@ public class SearchIndexerImpl implements SearchIndexer {
 	private final ListenerList listeners = new ListenerList();
 	private final TaskQueue taskQueue = new TaskQueue();
-	private final Scheduler scheduler = new Scheduler(taskQueue);
+	private final Scheduler scheduler = new Scheduler(this, taskQueue);
 
 	private Integer threadPoolSize;
 	private WorkerThreadPool pool;

@@ -97,6 +98,9 @@
 	private Set<IndexingUriFinder> uriFinders;
 	private WebappDaoFactory wadf;
 
+	private boolean ignoreTasksWhilePaused = false;
+	private boolean rebuildOnUnpause = false;
+
 	// ----------------------------------------------------------------------
 	// ConfigurationBeanLoader methods.
 	// ----------------------------------------------------------------------

@@ -197,16 +201,32 @@
 	@Override
 	public void pause() {
 		if (!isPaused() && !isShutdown()) {
+			ignoreTasksWhilePaused = false;
+			rebuildOnUnpause = false;
 			scheduler.pause();
 			fireEvent(PAUSE);
 		}
 	}
 
+	@Override
+	public void pauseWithoutDeferring() {
+		if (!isPaused() && !isShutdown()) {
+			ignoreTasksWhilePaused = true;
+			rebuildOnUnpause = false;
+			scheduler.pause();
+			fireEvent(PAUSE);
+		}
+	}
+
 	@Override
 	public void unpause() {
 		if (isPaused() && !isShutdown()) {
 			scheduler.unpause();
 			fireEvent(UNPAUSE);
+			if (rebuildOnUnpause) {
+				rebuildOnUnpause = false;
+				rebuildIndex();
+			}
 		}
 	}

@@ -244,11 +264,12 @@
 		if (changes == null || changes.isEmpty()) {
 			return;
 		}
+		if (ignoreTasksWhilePaused && isPaused()) {
+			rebuildOnUnpause = true;
+			return;
+		}
 
-		Task task = new UpdateStatementsTask(changes, createFindersList(),
-				createExcludersList(), createModifiersList(),
-				wadf.getIndividualDao(), listeners, pool);
-		scheduler.scheduleTask(task);
+		scheduler.scheduleTask(new UpdateStatementsTask(new IndexerConfigImpl(this), changes));
 		log.debug("Scheduled updates for " + changes.size() + " statements.");
 	}

@@ -261,10 +282,12 @@
 		if (uris == null || uris.isEmpty()) {
 			return;
 		}
+		if (ignoreTasksWhilePaused && isPaused()) {
+			rebuildOnUnpause = true;
+			return;
+		}
 
-		Task task = new UpdateUrisTask(uris, createExcludersList(),
-				createModifiersList(), wadf.getIndividualDao(), listeners, pool);
-		scheduler.scheduleTask(task);
+		scheduler.scheduleTask(new UpdateUrisTask(new IndexerConfigImpl(this), uris));
 		log.debug("Scheduled updates for " + uris.size() + " uris.");
 	}

@@ -273,10 +296,12 @@
 		if (isShutdown()) {
 			log.warn("Call to rebuildIndex after shutdown.");
 		}
+		if (ignoreTasksWhilePaused && isPaused()) {
+			rebuildOnUnpause = true;
+			return;
+		}
 
-		Task task = new RebuildIndexTask(createExcludersList(),
-				createModifiersList(), wadf.getIndividualDao(), listeners, pool);
-		scheduler.scheduleTask(task);
+		scheduler.scheduleTask(new RebuildIndexTask(new IndexerConfigImpl(this)));
 		log.debug("Scheduled a full rebuild.");
 	}

@@ -373,10 +398,12 @@
 	private static class Scheduler {
 		private final TaskQueue taskQueue;
 		private final List<Task> deferredQueue;
+		private final SearchIndexerImpl indexer;
 		private volatile boolean started;
 		private volatile boolean paused;
 
-		public Scheduler(TaskQueue taskQueue) {
+		public Scheduler(SearchIndexerImpl indexer, TaskQueue taskQueue) {
+			this.indexer = indexer;
 			this.taskQueue = taskQueue;
 			this.deferredQueue = new ArrayList<Task>();
 		}

@@ -390,13 +417,13 @@
 		}
 
 		public synchronized void scheduleTask(Task task) {
 			if (paused || !started) {
 				deferredQueue.add(task);
 				log.debug("added task to deferred queue: " + task);
 			} else {
 				taskQueue.scheduleTask(task);
 				log.debug("added task to task queue: " + task);
 			}
 		}
 
 		public synchronized void start() {

@@ -419,10 +446,12 @@
 		private void processDeferredTasks() {
 			for (Task task : deferredQueue) {
 				taskQueue.scheduleTask(task);
-				log.debug("moved task from deferred queue to task queue: "
-						+ task);
+				log.debug("moved task from deferred queue to task queue: " + task);
 			}
+			// Empty out the deferred queue as we've now processed it
+			deferredQueue.clear();
 		}
 	}

@@ -525,6 +554,36 @@
 		}
 	}
 
+	/**
+	 * Interface for tasks to access the Indexer config
+	 */
+	public static interface IndexerConfig {
+		public IndexingUriFinderList uriFinderList();
+		public SearchIndexExcluderList excluderList();
+		public DocumentModifierList documentModifierList();
+		public IndividualDao individualDao();
+		public ListenerList listenerList();
+		public WorkerThreadPool workerThreadPool();
+	}
+
+	/**
+	 * Implementation of IndexerConfig
+	 * Defers access to the configuration until the task is running, so a Task
+	 * created and deferred before the indexer is started will not cause a NullPointerException
+	 */
+	private static class IndexerConfigImpl implements IndexerConfig {
+		private final SearchIndexerImpl sii;
+
+		public IndexerConfigImpl(SearchIndexerImpl sii) { this.sii = sii; }
+
+		public IndexingUriFinderList uriFinderList() { return sii.createFindersList(); }
+		public SearchIndexExcluderList excluderList() { return sii.createExcludersList(); }
+		public DocumentModifierList documentModifierList() { return sii.createModifiersList(); }
+		public IndividualDao individualDao() { return sii.wadf.getIndividualDao(); }
+		public ListenerList listenerList() { return sii.listeners; }
+		public WorkerThreadPool workerThreadPool() { return sii.pool; }
+	}
+
 	public static interface Task extends Runnable {
 		public SearchIndexerStatus getStatus();

@@ -629,4 +688,28 @@
 		}
 	}
 
+	private static class StatementList {
+		List<Statement> changes;
+
+		public StatementList() {
+			changes = new ArrayList<Statement>();
+		}
+
+		public synchronized void addStatement(Statement stmt) {
+			changes.add(stmt);
+		}
+
+		public synchronized List<Statement> getStatements() {
+			try {
+				return new ArrayList<>(changes);
+			} finally {
+				changes.clear();
+			}
+		}
+
+		public synchronized int size() {
+			return changes.size();
+		}
+	};
 }
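A sketch of how a task is expected to use IndexerConfig: hold only the config, and resolve collaborators inside run(), so a task that is created and deferred before startup never dereferences a half-initialized indexer. The CountIndividualsTask name and body are hypothetical:

    import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
    import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
    import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.IndexerConfig;
    import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task;

    // Hypothetical task, for illustration only.
    public class CountIndividualsTask implements Task {
        private final IndexerConfig config;

        public CountIndividualsTask(IndexerConfig config) {
            this.config = config;
        }

        @Override
        public void run() {
            // The configuration is read only now, when the task is actually running.
            IndividualDao indDao = config.individualDao();
            System.out.println("Indexable individuals: " + indDao.getAllIndividualUris().size());
        }

        @Override
        public SearchIndexerStatus getStatus() {
            return null; // this sketch reports no status
        }

        @Override
        public void notifyWorkUnitCompletion(Runnable workUnit) {
            // This sketch submits no work units.
        }
    }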

View file

@@ -22,6 +22,7 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Even
 import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
 import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.RebuildCounts;
 import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State;
+import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.IndexerConfig;
 import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList;
 import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task;
 import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool;

@@ -37,117 +38,150 @@ import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExclud
  */
 public class RebuildIndexTask implements Task {
 	private static final Log log = LogFactory.getLog(RebuildIndexTask.class);
 
-	private final IndividualDao indDao;
-	private final SearchIndexExcluderList excluders;
-	private final DocumentModifierList modifiers;
-	private final ListenerList listeners;
-	private final WorkerThreadPool pool;
-	private final SearchEngine searchEngine;
 	private final Date requestedAt;
-	private final int documentsBefore;
-
-	private volatile SearchIndexerStatus status;
-
-	public RebuildIndexTask(SearchIndexExcluderList excluders,
-			DocumentModifierList modifiers, IndividualDao indDao,
-			ListenerList listeners, WorkerThreadPool pool) {
-		this.excluders = excluders;
-		this.modifiers = modifiers;
-		this.indDao = indDao;
-		this.listeners = listeners;
-		this.pool = pool;
-		this.searchEngine = ApplicationUtils.instance().getSearchEngine();
-
-		this.requestedAt = new Date();
-		this.documentsBefore = getDocumentCount();
-		this.status = buildStatus(REBUILDING, 0);
-	}
+	private final IndexerConfig config;
+	private RebuildIndexTaskImpl impl;
+
+	public RebuildIndexTask(IndexerConfig config) {
+		this.config = config;
+		this.requestedAt = new Date();
+	}
 
 	@Override
 	public void run() {
-		listeners.fireEvent(new Event(START_REBUILD, status));
-
-		Collection<String> uris = getAllUrisInTheModel();
-
-		if (!isInterrupted()) {
-			updateTheUris(uris);
-			if (!isInterrupted()) {
-				deleteOutdatedDocuments();
-			}
-		}
-
-		status = buildStatus(REBUILDING, getDocumentCount());
-		listeners.fireEvent(new Event(STOP_REBUILD, status));
-	}
-
-	private boolean isInterrupted() {
-		if (Thread.interrupted()) {
-			Thread.currentThread().interrupt();
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	private Collection<String> getAllUrisInTheModel() {
-		return indDao.getAllIndividualUris();
-	}
-
-	private void updateTheUris(Collection<String> uris) {
-		new UpdateUrisTask(uris, excluders, modifiers, indDao, listeners, pool)
-				.run();
-	}
-
-	private void deleteOutdatedDocuments() {
-		String query = "indexedTime:[ * TO " + requestedAt.getTime() + " ]";
-		try {
-			searchEngine.deleteByQuery(query);
-			searchEngine.commit();
-		} catch (SearchEngineNotRespondingException e) {
-			log.warn("Failed to delete outdated documents from the search index: "
-					+ "the search engine is not responding.");
-		} catch (SearchEngineException e) {
-			log.warn("Failed to delete outdated documents "
-					+ "from the search index", e);
-		}
-	}
-
-	private int getDocumentCount() {
-		try {
-			return searchEngine.documentCount();
-		} catch (SearchEngineNotRespondingException e) {
-			log.warn("Failed to get document count from the search index: "
-					+ "the search engine is not responding.");
-			return 0;
-		} catch (SearchEngineException e) {
-			log.warn("Failed to get document count from the search index.", e);
-			return 0;
-		}
-	}
-
-	private SearchIndexerStatus buildStatus(State state, int documentsAfter) {
-		return new SearchIndexerStatus(state, new Date(), new RebuildCounts(
-				documentsBefore, documentsAfter));
+		impl = new RebuildIndexTaskImpl(config, requestedAt);
+		impl.run();
 	}
 
 	@Override
 	public SearchIndexerStatus getStatus() {
-		return status;
+		return impl == null ? null : impl.getStatus();
 	}
 
 	@Override
 	public void notifyWorkUnitCompletion(Runnable workUnit) {
-		// We don't submit any work units, so we won't see any calls to this.
-		log.error("Why was this called?");
+		if (impl != null) {
+			impl.notifyWorkUnitCompletion(workUnit);
+		}
 	}
 
 	@Override
 	public String toString() {
-		return "RebuildIndexTask[requestedAt="
-				+ new SimpleDateFormat().format(requestedAt) + "]";
+		return "RebuildIndexTask[requestedAt=" + new SimpleDateFormat().format(requestedAt) + "]";
 	}
+
+	private static class RebuildIndexTaskImpl implements Task {
+		private final IndexerConfig config;
+		private final IndividualDao indDao;
+		private final SearchIndexExcluderList excluders;
+		private final DocumentModifierList modifiers;
+		private final ListenerList listeners;
+		private final WorkerThreadPool pool;
+		private final SearchEngine searchEngine;
+		private final Date requestedAt;
+		private final int documentsBefore;
+
+		private volatile SearchIndexerStatus status;
+
+		public RebuildIndexTaskImpl(IndexerConfig config, Date requestedAt) {
+			this.config = config;
+			this.excluders = config.excluderList();
+			this.modifiers = config.documentModifierList();
+			this.indDao = config.individualDao();
+			this.listeners = config.listenerList();
+			this.pool = config.workerThreadPool();
+			this.searchEngine = ApplicationUtils.instance().getSearchEngine();
+
+			this.requestedAt = requestedAt;
+			this.documentsBefore = getDocumentCount();
+			this.status = buildStatus(REBUILDING, 0);
+		}
+
+		@Override
+		public void run() {
+			listeners.fireEvent(new Event(START_REBUILD, status));
+
+			Collection<String> uris = getAllUrisInTheModel();
+
+			if (!isInterrupted()) {
+				updateTheUris(uris);
+				if (!isInterrupted()) {
+					deleteOutdatedDocuments();
+				}
+			}
+
+			status = buildStatus(REBUILDING, getDocumentCount());
+			listeners.fireEvent(new Event(STOP_REBUILD, status));
+		}
+
+		private boolean isInterrupted() {
+			if (Thread.interrupted()) {
+				Thread.currentThread().interrupt();
+				return true;
+			} else {
+				return false;
+			}
+		}
+
+		private Collection<String> getAllUrisInTheModel() {
+			return indDao.getAllIndividualUris();
+		}
+
+		private void updateTheUris(Collection<String> uris) {
+			UpdateUrisTask.runNow(uris, excluders, modifiers, indDao, listeners, pool);
+		}
+
+		private void deleteOutdatedDocuments() {
+			String query = "indexedTime:[ * TO " + requestedAt.getTime() + " ]";
+			try {
+				searchEngine.deleteByQuery(query);
+				searchEngine.commit();
+			} catch (SearchEngineNotRespondingException e) {
+				log.warn("Failed to delete outdated documents from the search index: "
+						+ "the search engine is not responding.");
+			} catch (SearchEngineException e) {
+				log.warn("Failed to delete outdated documents "
+						+ "from the search index", e);
+			}
+		}
+
+		private int getDocumentCount() {
+			try {
+				return searchEngine.documentCount();
+			} catch (SearchEngineNotRespondingException e) {
+				log.warn("Failed to get document count from the search index: "
+						+ "the search engine is not responding.");
+				return 0;
+			} catch (SearchEngineException e) {
+				log.warn("Failed to get document count from the search index.", e);
+				return 0;
+			}
+		}
+
+		private SearchIndexerStatus buildStatus(State state, int documentsAfter) {
+			return new SearchIndexerStatus(state, new Date(), new RebuildCounts(
+					documentsBefore, documentsAfter));
		}
+
+		@Override
+		public SearchIndexerStatus getStatus() {
+			return status;
+		}
+
+		@Override
+		public void notifyWorkUnitCompletion(Runnable workUnit) {
+			// We don't submit any work units, so we won't see any calls to this.
+			log.error("Why was this called?");
+		}
+
+		@Override
+		public String toString() {
+			return "RebuildIndexTask[requestedAt="
+					+ new SimpleDateFormat().format(requestedAt) + "]";
+		}
+	}
 }
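One consequence of the deferred-impl pattern above, sketched for code that polls tasks: getStatus() can legitimately return null until run() has begun. The helper and its log field are hypothetical:

    // Hypothetical helper, for illustration only.
    private void logTaskStatus(Task task) {
        SearchIndexerStatus status = task.getStatus();
        if (status == null) {
            log.debug("Task has not started yet: " + task);
        } else {
            log.debug("Task status: " + status);
        }
    }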

View file

@@ -14,6 +14,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

@@ -23,6 +24,7 @@ import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
 import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event;
 import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
 import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.StatementCounts;
+import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.IndexerConfig;
 import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList;
 import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task;
 import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool;

@@ -49,146 +51,169 @@ import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinderLi
  * Set to remove duplicates, and then process the URIs in the set.
  */
 public class UpdateStatementsTask implements Task {
-	private static final Log log = LogFactory
-			.getLog(UpdateStatementsTask.class);
-
-	private final List<Statement> changes;
-	private final IndexingUriFinderList uriFinders;
-	private final SearchIndexExcluderList excluders;
-	private final DocumentModifierList modifiers;
-	private final IndividualDao indDao;
-	private final ListenerList listeners;
-	private final WorkerThreadPool pool;
-
-	private final Set<String> uris;
-	private final Status status;
-
-	public UpdateStatementsTask(List<Statement> changes,
-			IndexingUriFinderList uriFinders,
-			SearchIndexExcluderList excluders, DocumentModifierList modifiers,
-			IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) {
-		this.changes = new ArrayList<>(changes);
-		this.uriFinders = uriFinders;
-		this.excluders = excluders;
-		this.modifiers = modifiers;
-		this.indDao = indDao;
-		this.listeners = listeners;
-		this.pool = pool;
-
-		this.uris = Collections.synchronizedSet(new HashSet<String>());
-
-		this.status = new Status(changes.size(), 500, listeners);
-	}
-
-	@Override
-	public void run() {
-		listeners.fireEvent(new Event(START_STATEMENTS, getStatus()));
-
-		findAffectedUris();
-
-		updateTheUris();
-		listeners.fireEvent(new Event(STOP_STATEMENTS, getStatus()));
-	}
-
-	private void findAffectedUris() {
-		log.debug("Tell finders we are starting.");
-		uriFinders.startIndexing();
-
-		for (Statement stmt : changes) {
-			if (isInterrupted()) {
-				log.info("Interrupted: " + status.getSearchIndexerStatus());
-				return;
-			} else {
-				findUrisForStatement(stmt);
-			}
-		}
-		waitForWorkUnitsToComplete();
-
-		log.debug("Tell finders we are stopping.");
-		uriFinders.stopIndexing();
-	}
-
-	private boolean isInterrupted() {
-		if (Thread.interrupted()) {
-			Thread.currentThread().interrupt();
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	private void findUrisForStatement(Statement stmt) {
-		Runnable workUnit = new FindUrisForStatementWorkUnit(stmt, uriFinders);
-		pool.submit(workUnit, this);
-		log.debug("scheduled uri finders for " + stmt);
-	}
-
-	private void waitForWorkUnitsToComplete() {
-		pool.waitUntilIdle();
-	}
-
-	private void updateTheUris() {
-		new UpdateUrisTask(uris, excluders, modifiers, indDao, listeners, pool)
-				.run();
-	}
-
-	@Override
-	public SearchIndexerStatus getStatus() {
-		return status.getSearchIndexerStatus();
-	}
-
-	@Override
-	public void notifyWorkUnitCompletion(Runnable workUnit) {
-		FindUrisForStatementWorkUnit worker = (FindUrisForStatementWorkUnit) workUnit;
-
-		Set<String> foundUris = worker.getUris();
-		Statement stmt = worker.getStatement();
-		log.debug("Found " + foundUris.size() + " uris for statement: " + stmt);
-
-		uris.addAll(foundUris);
-		status.incrementProcessed();
-	}
-
-	// ----------------------------------------------------------------------
-	// Helper classes
-	// ----------------------------------------------------------------------
-
-	/**
-	 * A thread-safe collection of status information. All methods are
-	 * synchronized.
-	 */
-	private static class Status {
-		private final int total;
-		private final int progressInterval;
-		private final ListenerList listeners;
-		private int processed = 0;
-		private Date since = new Date();
-
-		public Status(int total, int progressInterval, ListenerList listeners) {
-			this.total = total;
-			this.progressInterval = progressInterval;
-			this.listeners = listeners;
-		}
-
-		public synchronized void incrementProcessed() {
-			processed++;
-			since = new Date();
-			maybeFireProgressEvent();
-		}
-
-		private void maybeFireProgressEvent() {
-			if (processed > 0 && processed % progressInterval == 0) {
-				listeners.fireEvent(new Event(PROGRESS,
-						getSearchIndexerStatus()));
-			}
-		}
-
-		public synchronized SearchIndexerStatus getSearchIndexerStatus() {
-			int remaining = total - processed;
-			return new SearchIndexerStatus(PROCESSING_STMTS, since,
-					new StatementCounts(processed, remaining, total));
-		}
-	}
+	private static final Log log = LogFactory.getLog(UpdateStatementsTask.class);
+
+	private final IndexerConfig config;
+	private UpdateStatementsTaskImpl impl;
+
+	private List<Statement> changes;
+
+	public UpdateStatementsTask(IndexerConfig config, List<Statement> changes) {
+		this.config = config;
+		this.changes = new ArrayList<>(changes);
+	}
+
+	@Override
+	public void run() {
+		impl = new UpdateStatementsTaskImpl(config, changes);
+		impl.run();
+	}
+
+	@Override
+	public SearchIndexerStatus getStatus() {
+		return impl == null ? null : impl.getStatus();
+	}
+
+	@Override
+	public void notifyWorkUnitCompletion(Runnable workUnit) {
+		if (impl != null) {
+			impl.notifyWorkUnitCompletion(workUnit);
+		}
+	}
+
+	private static class UpdateStatementsTaskImpl implements Task {
+		private final List<Statement> changes;
+		private final IndexingUriFinderList uriFinders;
+		private final SearchIndexExcluderList excluders;
+		private final DocumentModifierList modifiers;
+		private final IndividualDao indDao;
+		private final ListenerList listeners;
+		private final WorkerThreadPool pool;
+
+		private final Set<String> uris;
+		private final Status status;
+
+		public UpdateStatementsTaskImpl(IndexerConfig config, List<Statement> changes) {
+			this.changes = changes;
+			this.uriFinders = config.uriFinderList();
+			this.excluders = config.excluderList();
+			this.modifiers = config.documentModifierList();
+			this.indDao = config.individualDao();
+			this.listeners = config.listenerList();
+			this.pool = config.workerThreadPool();
+
+			this.uris = Collections.synchronizedSet(new HashSet<String>());
+
+			this.status = new Status(changes.size(), 500, listeners);
+		}
+
+		@Override
+		public void run() {
+			listeners.fireEvent(new Event(START_STATEMENTS, getStatus()));
+
+			findAffectedUris();
+
+			updateTheUris();
+			listeners.fireEvent(new Event(STOP_STATEMENTS, getStatus()));
+		}
+
+		private void findAffectedUris() {
+			log.debug("Tell finders we are starting.");
+			uriFinders.startIndexing();
+
+			for (Statement stmt : changes) {
+				if (isInterrupted()) {
+					log.info("Interrupted: " + status.getSearchIndexerStatus());
+					return;
+				} else {
+					findUrisForStatement(stmt);
+				}
+			}
+			waitForWorkUnitsToComplete();
+
+			log.debug("Tell finders we are stopping.");
+			uriFinders.stopIndexing();
+		}
+
+		private boolean isInterrupted() {
+			if (Thread.interrupted()) {
+				Thread.currentThread().interrupt();
+				return true;
+			} else {
+				return false;
+			}
+		}
+
+		private void findUrisForStatement(Statement stmt) {
+			Runnable workUnit = new FindUrisForStatementWorkUnit(stmt, uriFinders);
+			pool.submit(workUnit, this);
+			log.debug("scheduled uri finders for " + stmt);
+		}
+
+		private void waitForWorkUnitsToComplete() {
+			pool.waitUntilIdle();
+		}
+
+		private void updateTheUris() {
+			UpdateUrisTask.runNow(uris, excluders, modifiers, indDao, listeners, pool);
+		}
+
+		@Override
+		public SearchIndexerStatus getStatus() {
+			return status.getSearchIndexerStatus();
+		}
+
+		@Override
+		public void notifyWorkUnitCompletion(Runnable workUnit) {
+			FindUrisForStatementWorkUnit worker = (FindUrisForStatementWorkUnit) workUnit;
+
+			Set<String> foundUris = worker.getUris();
+			Statement stmt = worker.getStatement();
+			log.debug("Found " + foundUris.size() + " uris for statement: " + stmt);
+
+			uris.addAll(foundUris);
+			status.incrementProcessed();
+		}
+
+		// ----------------------------------------------------------------------
+		// Helper classes
+		// ----------------------------------------------------------------------
+
+		/**
+		 * A thread-safe collection of status information. All methods are
+		 * synchronized.
+		 */
+		private static class Status {
+			private final int total;
+			private final int progressInterval;
+			private final ListenerList listeners;
+			private int processed = 0;
+			private Date since = new Date();
+
+			public Status(int total, int progressInterval, ListenerList listeners) {
+				this.total = total;
+				this.progressInterval = progressInterval;
+				this.listeners = listeners;
+			}
+
+			public synchronized void incrementProcessed() {
+				processed++;
+				since = new Date();
+				maybeFireProgressEvent();
+			}
+
+			private void maybeFireProgressEvent() {
+				if (processed > 0 && processed % progressInterval == 0) {
+					listeners.fireEvent(new Event(PROGRESS,
+							getSearchIndexerStatus()));
+				}
+			}
+
+			public synchronized SearchIndexerStatus getSearchIndexerStatus() {
+				int remaining = total - processed;
+				return new SearchIndexerStatus(PROCESSING_STMTS, since,
+						new StatementCounts(processed, remaining, total));
+			}
+		}
+	}
 }
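The Status helper above throttles PROGRESS events to one per 500 processed statements. The same throttling pattern in isolation, with hypothetical names:

    // Hypothetical stand-alone version of the pattern used by Status.maybeFireProgressEvent().
    public class ProgressThrottle {
        private final int interval;
        private int processed = 0;

        public ProgressThrottle(int interval) {
            this.interval = interval;
        }

        /** Count one completed item; run the callback on every interval-th item. */
        public synchronized void increment(Runnable onProgress) {
            processed++;
            if (processed % interval == 0) {
                onProgress.run();
            }
        }
    }

    // Usage sketch:
    //   ProgressThrottle throttle = new ProgressThrottle(500);
    //   // for each processed item:
    //   throttle.increment(() -> System.out.println("progress"));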

View file

@@ -7,11 +7,7 @@ import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndex
 import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STOP_URIS;
 import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State.PROCESSING_URIS;
 
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

@@ -27,6 +23,7 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Even
 import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
 import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.UriCounts;
 import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerUtils;
+import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.IndexerConfig;
 import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList;
 import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task;
 import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool;

@@ -49,220 +46,268 @@ import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExclud
  * again at the end of the task.
  */
 public class UpdateUrisTask implements Task {
 	private static final Log log = LogFactory.getLog(UpdateUrisTask.class);
 
-	private final Set<String> uris;
-	private final IndividualDao indDao;
-	private final SearchIndexExcluderList excluders;
-	private final DocumentModifierList modifiers;
-	private final ListenerList listeners;
-	private final WorkerThreadPool pool;
-
-	private final Status status;
-	private final SearchEngine searchEngine;
-
-	public UpdateUrisTask(Collection<String> uris,
-			SearchIndexExcluderList excluders, DocumentModifierList modifiers,
-			IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) {
-		this.uris = new HashSet<>(uris);
-		this.excluders = excluders;
-		this.modifiers = modifiers;
-		this.indDao = indDao;
-		this.listeners = listeners;
-		this.pool = pool;
-
-		this.status = new Status(this, uris.size(), 500);
-
-		this.searchEngine = ApplicationUtils.instance().getSearchEngine();
-	}
-
-	@Override
-	public void run() {
-		listeners.fireEvent(new Event(START_URIS, status
-				.getSearchIndexerStatus()));
-		excluders.startIndexing();
-		modifiers.startIndexing();
-
-		for (String uri : uris) {
-			if (isInterrupted()) {
-				log.info("Interrupted: " + status.getSearchIndexerStatus());
-				break;
-			} else if (uri == null) {
-				// Nothing to do
-			} else {
-				Individual ind = getIndividual(uri);
-				if (ind == null) {
-					deleteDocument(uri);
-				} else if (isExcluded(ind)) {
-					excludeDocument(uri);
-				} else {
-					updateDocument(ind);
-				}
-			}
-		}
-		pool.waitUntilIdle();
-
-		commitChanges();
-
-		excluders.stopIndexing();
-		modifiers.stopIndexing();
-		listeners.fireEvent(new Event(STOP_URIS, status
-				.getSearchIndexerStatus()));
-	}
-
-	private boolean isInterrupted() {
-		if (Thread.interrupted()) {
-			Thread.currentThread().interrupt();
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	private Individual getIndividual(String uri) {
-		Individual ind = indDao.getIndividualByURI(uri);
-		if (ind == null) {
-			log.debug("Found no individual for '" + uri + "'");
-		}
-		return ind;
-	}
-
-	private boolean isExcluded(Individual ind) {
-		return excluders.isExcluded(ind);
-	}
-
-	/** A delete is fast enough to be done synchronously. */
-	private void deleteDocument(String uri) {
-		try {
-			searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri));
-			status.incrementDeletes();
-			log.debug("deleted '" + uri + "' from search index.");
-		} catch (SearchEngineNotRespondingException e) {
-			log.warn("Failed to delete '" + uri + "' from search index: "
-					+ "the search engine is not responding.");
-		} catch (Exception e) {
-			log.warn("Failed to delete '" + uri + "' from search index", e);
-		}
-	}
-
-	/** An exclusion is just a delete for different reasons. */
-	private void excludeDocument(String uri) {
-		try {
-			searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri));
-			status.incrementExclusions();
-			log.debug("excluded '" + uri + "' from search index.");
-		} catch (SearchEngineNotRespondingException e) {
-			log.warn("Failed to exclude '" + uri + "' from search index: "
-					+ "the search engine is not responding.", e);
-		} catch (Exception e) {
-			log.warn("Failed to exclude '" + uri + "' from search index", e);
-		}
-	}
-
-	private void updateDocument(Individual ind) {
-		Runnable workUnit = new UpdateDocumentWorkUnit(ind, modifiers);
-		pool.submit(workUnit, this);
-		log.debug("scheduled update to " + ind);
-	}
-
-	private void fireEvent(Event event) {
-		listeners.fireEvent(event);
-		if (event.getType() == PROGRESS || event.getType() == STOP_URIS) {
-			commitChanges();
-		}
-	}
-
-	private void commitChanges() {
-		try {
-			searchEngine.commit();
-		} catch (SearchEngineException e) {
-			log.warn("Failed to commit changes.", e);
-		}
-	}
-
-	@Override
-	public void notifyWorkUnitCompletion(Runnable workUnit) {
-		log.debug("completed update to "
-				+ ((UpdateDocumentWorkUnit) workUnit).getInd());
-		status.incrementUpdates();
-	}
-
-	@Override
-	public SearchIndexerStatus getStatus() {
-		return status.getSearchIndexerStatus();
-	}
-
-	// ----------------------------------------------------------------------
-	// helper classes
-	// ----------------------------------------------------------------------
-
-	/**
-	 * A thread-safe collection of status information. All methods are
-	 * synchronized.
-	 */
-	private static class Status {
-		private final UpdateUrisTask parent;
-		private final int total;
-		private final int progressInterval;
-		private int updated = 0;
-		private int deleted = 0;
-		private int excluded = 0;
-		private Date since = new Date();
-
-		public Status(UpdateUrisTask parent, int total, int progressInterval) {
-			this.parent = parent;
-			this.total = total;
-			this.progressInterval = progressInterval;
-		}
-
-		public synchronized void incrementUpdates() {
-			updated++;
-			since = new Date();
-			maybeFireProgressEvent();
-		}
-
-		public synchronized void incrementDeletes() {
-			deleted++;
-			since = new Date();
-		}
-
-		public synchronized void incrementExclusions() {
-			excluded++;
-			since = new Date();
-		}
-
-		private void maybeFireProgressEvent() {
-			if (updated > 0 && updated % progressInterval == 0) {
-				parent.fireEvent(new Event(PROGRESS, getSearchIndexerStatus()));
-			}
-		}
-
-		public synchronized SearchIndexerStatus getSearchIndexerStatus() {
-			int remaining = total - updated - deleted - excluded;
-			return new SearchIndexerStatus(PROCESSING_URIS, since,
-					new UriCounts(excluded, deleted, updated, remaining, total));
-		}
-	}
-
-	/**
-	 * This will be first in the list of SearchIndexExcluders.
-	 */
-	public static class ExcludeIfNoVClasses implements SearchIndexExcluder {
-		@Override
-		public String checkForExclusion(Individual ind) {
-			List<VClass> vclasses = ind.getVClasses(false);
-			if (vclasses == null || vclasses.isEmpty()) {
-				return "Individual " + ind + " has no classes.";
-			}
-			return null;
-		}
-
-		@Override
-		public String toString() {
-			return "Internal: ExcludeIfNoVClasses";
-		}
-	}
+	private final IndexerConfig config;
+	private UpdateUrisTaskImpl impl;
+
+	private final Collection<String> uris;
+	private Date since = new Date();
+
+	public UpdateUrisTask(IndexerConfig config, Collection<String> uris) {
+		this.config = config;
+		this.uris = new HashSet<>(uris);
+	}
+
+	static void runNow(Collection<String> uris, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) {
+		UpdateUrisTaskImpl impl = new UpdateUrisTaskImpl(uris, excluders, modifiers, indDao, listeners, pool);
+		impl.run();
+	}
+
+	@Override
+	public void run() {
+		impl = new UpdateUrisTaskImpl(config, uris);
+		impl.run();
+	}
+
+	@Override
+	public SearchIndexerStatus getStatus() {
+		if (impl != null) {
+			return impl.getStatus();
+		}
+
+		return new SearchIndexerStatus(PROCESSING_URIS, since, new UriCounts(0, 0, 0, uris.size(), uris.size()));
+	}
+
+	@Override
+	public void notifyWorkUnitCompletion(Runnable workUnit) {
+		if (impl != null) {
+			impl.notifyWorkUnitCompletion(workUnit);
+		}
+	}
+
+	private static class UpdateUrisTaskImpl implements Task {
+		private final Collection<String> uris;
+		private final IndividualDao indDao;
+		private final SearchIndexExcluderList excluders;
+		private final DocumentModifierList modifiers;
+		private final ListenerList listeners;
+		private final WorkerThreadPool pool;
+
+		private final Status status;
+		private final SearchEngine searchEngine;
+
+		public UpdateUrisTaskImpl(IndexerConfig config, Collection<String> uris) {
+			this.excluders = config.excluderList();
+			this.modifiers = config.documentModifierList();
+			this.indDao = config.individualDao();
+			this.listeners = config.listenerList();
+			this.pool = config.workerThreadPool();
+
+			this.uris = uris;
+			this.status = new Status(this, uris.size(), 500);
+
+			this.searchEngine = ApplicationUtils.instance().getSearchEngine();
+		}
+
+		public UpdateUrisTaskImpl(Collection<String> uris, SearchIndexExcluderList excluders, DocumentModifierList modifiers, IndividualDao indDao, ListenerList listeners, WorkerThreadPool pool) {
+			this.uris = uris;
+			this.excluders = excluders;
+			this.modifiers = modifiers;
+			this.indDao = indDao;
+			this.listeners = listeners;
+			this.pool = pool;
+			this.status = new Status(this, uris.size(), 500);
+
+			this.searchEngine = ApplicationUtils.instance().getSearchEngine();
+		}
+
+		@Override
+		public void run() {
+			listeners.fireEvent(new Event(START_URIS, status.getSearchIndexerStatus()));
+			excluders.startIndexing();
+			modifiers.startIndexing();
+
+			for (String uri : uris) {
+				if (isInterrupted()) {
+					log.info("Interrupted: " + status.getSearchIndexerStatus());
+					break;
+				} else if (uri == null) {
+					// Nothing to do
+				} else {
+					Individual ind = getIndividual(uri);
+					if (ind == null) {
+						deleteDocument(uri);
+					} else if (isExcluded(ind)) {
+						excludeDocument(uri);
+					} else {
+						updateDocument(ind);
+					}
+				}
+			}
+			pool.waitUntilIdle();
+
+			commitChanges();
+
+			excluders.stopIndexing();
+			modifiers.stopIndexing();
+			listeners.fireEvent(new Event(STOP_URIS, status.getSearchIndexerStatus()));
+		}
+
+		private boolean isInterrupted() {
+			if (Thread.interrupted()) {
+				Thread.currentThread().interrupt();
+				return true;
+			} else {
+				return false;
+			}
+		}
+
+		private Individual getIndividual(String uri) {
+			Individual ind = indDao.getIndividualByURI(uri);
+			if (ind == null) {
+				log.debug("Found no individual for '" + uri + "'");
+			}
+			return ind;
+		}
+
+		private boolean isExcluded(Individual ind) {
+			return excluders.isExcluded(ind);
+		}
+
+		/**
+		 * A delete is fast enough to be done synchronously.
+		 */
+		private void deleteDocument(String uri) {
+			try {
+				searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri));
+				status.incrementDeletes();
+				log.debug("deleted '" + uri + "' from search index.");
+			} catch (SearchEngineNotRespondingException e) {
+				log.warn("Failed to delete '" + uri + "' from search index: "
+						+ "the search engine is not responding.");
+			} catch (Exception e) {
+				log.warn("Failed to delete '" + uri + "' from search index", e);
+			}
+		}
+
+		/**
+		 * An exclusion is just a delete for different reasons.
+		 */
+		private void excludeDocument(String uri) {
+			try {
+				searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri));
+				status.incrementExclusions();
+				log.debug("excluded '" + uri + "' from search index.");
+			} catch (SearchEngineNotRespondingException e) {
+				log.warn("Failed to exclude '" + uri + "' from search index: "
+						+ "the search engine is not responding.", e);
+			} catch (Exception e) {
+				log.warn("Failed to exclude '" + uri + "' from search index", e);
+			}
+		}
+
+		private void updateDocument(Individual ind) {
+			Runnable workUnit = new UpdateDocumentWorkUnit(ind, modifiers);
+			pool.submit(workUnit, this);
+			log.debug("scheduled update to " + ind);
+		}
+
+		private void fireEvent(Event event) {
+			listeners.fireEvent(event);
+			if (event.getType() == PROGRESS || event.getType() == STOP_URIS) {
+				commitChanges();
+			}
+		}
+
+		private void commitChanges() {
+			try {
+				searchEngine.commit();
+			} catch (SearchEngineException e) {
+				log.warn("Failed to commit changes.", e);
+			}
+		}
+
+		@Override
+		public void notifyWorkUnitCompletion(Runnable workUnit) {
+			log.debug("completed update to "
+					+ ((UpdateDocumentWorkUnit) workUnit).getInd());
+			status.incrementUpdates();
+		}
+
+		@Override
+		public SearchIndexerStatus getStatus() {
+			return status.getSearchIndexerStatus();
+		}
+
+		/**
+		 * A thread-safe collection of status information. All methods are
+		 * synchronized.
+		 */
+		private static class Status {
+			private final UpdateUrisTaskImpl parent;
+			private final int total;
+			private final int progressInterval;
+			private int updated = 0;
+			private int deleted = 0;
+			private int excluded = 0;
+			private Date since = new Date();
+
+			public Status(UpdateUrisTaskImpl parent, int total, int progressInterval) {
+				this.parent = parent;
+				this.total = total;
+				this.progressInterval = progressInterval;
+			}
+
+			public synchronized void incrementUpdates() {
+				updated++;
+				since = new Date();
+				maybeFireProgressEvent();
+			}
+
+			public synchronized void incrementDeletes() {
+				deleted++;
+				since = new Date();
+			}
+
+			public synchronized void incrementExclusions() {
+				excluded++;
+				since = new Date();
+			}
+
+			private void maybeFireProgressEvent() {
+				if (updated > 0 && updated % progressInterval == 0) {
+					parent.fireEvent(new Event(PROGRESS, getSearchIndexerStatus()));
+				}
+			}
+
+			public synchronized SearchIndexerStatus getSearchIndexerStatus() {
+				int remaining = total - updated - deleted - excluded;
+				return new SearchIndexerStatus(PROCESSING_URIS, since,
+						new UriCounts(excluded, deleted, updated, remaining, total));
+			}
+		}
+	}
+
+	// ----------------------------------------------------------------------
+	// helper classes
+	// ----------------------------------------------------------------------
+
+	/**
+	 * This will be first in the list of SearchIndexExcluders.
+	 */
+	public static class ExcludeIfNoVClasses implements SearchIndexExcluder {
+		@Override
+		public String checkForExclusion(Individual ind) {
+			List<VClass> vclasses = ind.getVClasses(false);
+			if (vclasses == null || vclasses.isEmpty()) {
+				return "Individual " + ind + " has no classes.";
+			}
+			return null;
+		}
+
+		@Override
+		public String toString() {
+			return "Internal: ExcludeIfNoVClasses";
+		}
+	}
 }
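The package-private runNow() above is what lets RebuildIndexTaskImpl and UpdateStatementsTaskImpl reuse the URI-update logic synchronously instead of scheduling another task. A sketch of that composition, assuming the collaborators have already been resolved from an IndexerConfig, as those tasks do:

    // Sketch: synchronous reuse from inside another running task (e.g. RebuildIndexTaskImpl).
    // excluders, modifiers, indDao, listeners and pool are assumed to have been
    // resolved from the enclosing task's IndexerConfig already.
    Collection<String> affectedUris = indDao.getAllIndividualUris();
    UpdateUrisTask.runNow(affectedUris, excluders, modifiers, indDao, listeners, pool);
    // runNow() returns only after every URI has been processed and the changes committed.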

View file

@@ -31,7 +31,12 @@ public class SearchIndexerStub implements SearchIndexer {
 		paused = true;
 	}
 
 	@Override
+	public void pauseWithoutDeferring() {
+		paused = true;
+	}
+
+	@Override
 	public void unpause() {
 		paused = false;
 	}