Restore pause logic to IndexingChangeListener.
Adjust comments, remove compiler warnings and vestigial code items, rename method.
This commit is contained in:
Jim Blake 2015-02-17 12:08:09 -05:00
parent 680953115f
commit 066d013360
7 changed files with 102 additions and 81 deletions

View file

@ -44,12 +44,13 @@ public interface SearchIndexer extends Application.Module {
void pause(); void pause();
/** /**
* Stop processing new tasks. Requests will be ignored and the index rebuilt when unpaused. * Stop processing new tasks. If any request is received while the indexer
* Fires a PAUSED event to listeners. * is paused, the request will be ignored, but the index will be rebuilt
* when unpaused. Fires a PAUSED event to listeners.
* *
* This call has no effect if already paused, or if called after shutdown. * This call has no effect if already paused, or if called after shutdown.
*/ */
void pauseWithoutDeferring(); void pauseInAnticipationOfRebuild();
/** /**
* Resume processing new tasks. Any requests that were received since the * Resume processing new tasks. Any requests that were received since the

View file

@ -104,7 +104,7 @@ public class ABoxRecomputer {
} }
try { try {
if (searchIndexer != null) { if (searchIndexer != null) {
searchIndexer.pauseWithoutDeferring(); searchIndexer.pauseInAnticipationOfRebuild();
} }
recomputeABox(); recomputeABox();
} finally { } finally {

View file

@ -15,10 +15,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.hp.hpl.jena.graph.Triple;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.jena.riot.RDFLanguages;
import org.apache.jena.riot.RiotReader;
import com.hp.hpl.jena.graph.Triple;
import com.hp.hpl.jena.rdf.model.Model; import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory; import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.rdf.model.Statement; import com.hp.hpl.jena.rdf.model.Statement;
@ -28,8 +30,6 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event; import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener; import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;
import org.apache.jena.riot.RDFLanguages;
import org.apache.jena.riot.RiotReader;
/** /**
* When a change is heard, wait for an interval to see if more changes come in. * When a change is heard, wait for an interval to see if more changes come in.
@ -62,6 +62,7 @@ public class IndexingChangeListener implements ChangeListener,
private final SearchIndexer searchIndexer; private final SearchIndexer searchIndexer;
private final Ticker ticker; private final Ticker ticker;
private volatile boolean paused = true;
private final Model defaultModel; private final Model defaultModel;
/** All access to the list must be synchronized. */ /** All access to the list must be synchronized. */
@ -78,20 +79,34 @@ public class IndexingChangeListener implements ChangeListener,
private synchronized void noteChange(Statement stmt) { private synchronized void noteChange(Statement stmt) {
changes.add(stmt); changes.add(stmt);
if (!paused) {
ticker.start(); ticker.start();
} }
}
@Override @Override
public void receiveSearchIndexerEvent(Event event) { public void receiveSearchIndexerEvent(Event event) {
if (event.getType() == PAUSE) {
paused = true;
} else if (event.getType() == UNPAUSE) {
paused = false;
ticker.start();
} else if (event.getType() == START_REBUILD) {
discardChanges();
}
} }
private synchronized void respondToTicker() { private synchronized void respondToTicker() {
if (!changes.isEmpty()) { if (!paused && !changes.isEmpty()) {
searchIndexer.scheduleUpdatesForStatements(changes); searchIndexer.scheduleUpdatesForStatements(changes);
changes.clear(); changes.clear();
} }
} }
private synchronized void discardChanges() {
changes.clear();
}
public void shutdown() { public void shutdown() {
ticker.shutdown(); ticker.shutdown();
} }
@ -128,13 +143,14 @@ public class IndexingChangeListener implements ChangeListener,
try { try {
// Use RiotReader to parse a Triple // Use RiotReader to parse a Triple
// NB A Triple can be serialized correctly with: FmtUtils.stringForTriple(triple, PrefixMapping.Factory.create()) + " ."; // NB A Triple can be serialized correctly with: FmtUtils.stringForTriple(triple, PrefixMapping.Factory.create()) + " .";
Iterator<Triple> it = RiotReader.createIteratorTriples(new ByteArrayInputStream(serializedTriple.getBytes("UTF-8")), RDFLanguages.NTRIPLES, null); ByteArrayInputStream input = new ByteArrayInputStream(serializedTriple.getBytes("UTF-8"));
Iterator<Triple> it = RiotReader.createIteratorTriples(input, RDFLanguages.NTRIPLES, null);
if (it.hasNext()) { if (it.hasNext()) {
Triple triple = it.next(); Triple triple = it.next();
if (it.hasNext()) { if (it.hasNext()) {
log.warn("More than one triple parsed from change event"); log.warn("More than one triple parsed from change event: '" + serializedTriple + "'");
} }
// Use the retained defaultModel instance to convert the Triple to a Statement // Use the retained defaultModel instance to convert the Triple to a Statement
@ -143,7 +159,7 @@ public class IndexingChangeListener implements ChangeListener,
// is created and attached to all of the Statements created by this instance // is created and attached to all of the Statements created by this instance
return defaultModel.asStatement(triple); return defaultModel.asStatement(triple);
} else { } else {
throw new RuntimeException("no triple parsed from change event"); throw new RuntimeException("no triple parsed from change event: '" + serializedTriple + "'");
} }
} catch (RuntimeException riot) { } catch (RuntimeException riot) {
log.error("Failed to parse triple " + serializedTriple, riot); log.error("Failed to parse triple " + serializedTriple, riot);

View file

@ -28,12 +28,12 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletContext; import javax.servlet.ServletContext;
import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import com.hp.hpl.jena.rdf.model.Statement; import com.hp.hpl.jena.rdf.model.Statement;
import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory; import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory;
import edu.cornell.mannlib.vitro.webapp.dao.filtering.WebappDaoFactoryFiltering; import edu.cornell.mannlib.vitro.webapp.dao.filtering.WebappDaoFactoryFiltering;
import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilterUtils; import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilterUtils;
@ -87,7 +87,7 @@ public class SearchIndexerImpl implements SearchIndexer {
private final ListenerList listeners = new ListenerList(); private final ListenerList listeners = new ListenerList();
private final TaskQueue taskQueue = new TaskQueue(); private final TaskQueue taskQueue = new TaskQueue();
private final Scheduler scheduler = new Scheduler(this, taskQueue); private final Scheduler scheduler = new Scheduler(taskQueue);
private Integer threadPoolSize; private Integer threadPoolSize;
private WorkerThreadPool pool; private WorkerThreadPool pool;
@ -209,7 +209,7 @@ public class SearchIndexerImpl implements SearchIndexer {
} }
@Override @Override
public void pauseWithoutDeferring() { public void pauseInAnticipationOfRebuild() {
if (!isPaused() && !isShutdown()) { if (!isPaused() && !isShutdown()) {
ignoreTasksWhilePaused = true; ignoreTasksWhilePaused = true;
rebuildOnUnpause = false; rebuildOnUnpause = false;
@ -398,12 +398,10 @@ public class SearchIndexerImpl implements SearchIndexer {
private static class Scheduler { private static class Scheduler {
private final TaskQueue taskQueue; private final TaskQueue taskQueue;
private final List<Task> deferredQueue; private final List<Task> deferredQueue;
private final SearchIndexerImpl indexer;
private volatile boolean started; private volatile boolean started;
private volatile boolean paused; private volatile boolean paused;
public Scheduler(SearchIndexerImpl indexer, TaskQueue taskQueue) { public Scheduler(TaskQueue taskQueue) {
this.indexer = indexer;
this.taskQueue = taskQueue; this.taskQueue = taskQueue;
this.deferredQueue = new ArrayList<Task>(); this.deferredQueue = new ArrayList<Task>();
} }
@ -449,8 +447,6 @@ public class SearchIndexerImpl implements SearchIndexer {
taskQueue.scheduleTask(task); taskQueue.scheduleTask(task);
log.debug("moved task from deferred queue to task queue: " + task); log.debug("moved task from deferred queue to task queue: " + task);
} }
// Empty out the deferred queue as we've now processed it
deferredQueue.clear(); deferredQueue.clear();
} }
@ -559,29 +555,59 @@ public class SearchIndexerImpl implements SearchIndexer {
*/ */
public static interface IndexerConfig { public static interface IndexerConfig {
public IndexingUriFinderList uriFinderList(); public IndexingUriFinderList uriFinderList();
public SearchIndexExcluderList excluderList(); public SearchIndexExcluderList excluderList();
public DocumentModifierList documentModifierList(); public DocumentModifierList documentModifierList();
public IndividualDao individualDao(); public IndividualDao individualDao();
public ListenerList listenerList(); public ListenerList listenerList();
public WorkerThreadPool workerThreadPool(); public WorkerThreadPool workerThreadPool();
} }
/** /**
* Implementation of IndexerConfig * Implementation of IndexerConfig Defers access to the configuration until
* Defers access to the configuration until the task is running, so a Task * the task is running, so a Task created and deferred before the indexer is
* created and deferred before the indexer is started will not cause a NullPointerException * started will not cause a NullPointerException
*/ */
private static class IndexerConfigImpl implements IndexerConfig { private static class IndexerConfigImpl implements IndexerConfig {
private final SearchIndexerImpl sii; private final SearchIndexerImpl sii;
public IndexerConfigImpl(SearchIndexerImpl sii) { this.sii = sii; } public IndexerConfigImpl(SearchIndexerImpl sii) {
this.sii = sii;
}
public IndexingUriFinderList uriFinderList() { return sii.createFindersList(); } @Override
public SearchIndexExcluderList excluderList() { return sii.createExcludersList(); } public IndexingUriFinderList uriFinderList() {
public DocumentModifierList documentModifierList() { return sii.createModifiersList(); } return sii.createFindersList();
public IndividualDao individualDao() { return sii.wadf.getIndividualDao(); } }
public ListenerList listenerList() { return sii.listeners; }
public WorkerThreadPool workerThreadPool() { return sii.pool; } @Override
public SearchIndexExcluderList excluderList() {
return sii.createExcludersList();
}
@Override
public DocumentModifierList documentModifierList() {
return sii.createModifiersList();
}
@Override
public IndividualDao individualDao() {
return sii.wadf.getIndividualDao();
}
@Override
public ListenerList listenerList() {
return sii.listeners;
}
@Override
public WorkerThreadPool workerThreadPool() {
return sii.pool;
}
} }
public static interface Task extends Runnable { public static interface Task extends Runnable {
@ -688,28 +714,4 @@ public class SearchIndexerImpl implements SearchIndexer {
} }
} }
private static class StatementList {
List<Statement> changes;
public StatementList() {
changes = new ArrayList<Statement>();
}
public synchronized void addStatement(Statement stmt) {
changes.add(stmt);
}
public synchronized List<Statement> getStatements() {
try {
return new ArrayList<>(changes);
} finally {
changes.clear();
}
}
public synchronized int size() {
return changes.size();
}
};
} }

View file

@ -14,7 +14,6 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;

View file

@ -7,7 +7,10 @@ import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndex
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STOP_URIS; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STOP_URIS;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State.PROCESSING_URIS; import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State.PROCESSING_URIS;
import java.util.*; import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;

View file

@ -32,7 +32,7 @@ public class SearchIndexerStub implements SearchIndexer {
} }
@Override @Override
public void pauseWithoutDeferring() { public void pauseInAnticipationOfRebuild() {
paused = true; paused = true;
} }