VIVO-870 Implement UpdateUrisTask.

At the same time, broke the old IndexBuilder, so no listener and no rebuild.
This commit is contained in:
Jim Blake 2015-01-09 16:38:08 -05:00
parent 64624f2b84
commit 39f1ed0b27
11 changed files with 828 additions and 581 deletions

View file

@ -110,23 +110,15 @@ public interface SearchIndexer extends Application.Module {
*/
public static class Event {
public enum Type {
STARTUP,
START_PROCESSING_URIS,
STARTUP, PROGRESS,
PROGRESS_PROCESSING_URIS,
START_PROCESSING_URIS, STOP_PROCESSING_URIS,
STOP_PROCESSING_URIS,
START_PROCESSING_STATEMENTS, STOP_PROCESSING_STATEMENTS,
START_PROCESSING_STATEMENTS,
REBUILD_REQUESTED, REBUILD_COMPLETE,
PROGRESS_PROCESSING_STATEMENTS,
STOP_PROCESSING_STATEMENTS,
REBUILD_REQUESTED,
SHUTDOWN_REQUESTED
SHUTDOWN_REQUESTED, SHUTDOWN_COMPLETE
}
private final Type type;

View file

@ -5,23 +5,34 @@ package edu.cornell.mannlib.vitro.webapp.modules.searchIndexer;
import java.util.Date;
/**
* An immutable summary of the status of the SearchIndexer, at some point in
* time.Contains the current state, and some counts.
* An immutable summary of the status of the SearchIndexer, at a fixed point in
* time. Contains the current state, the time, and some counts.
*
* If the indexer is processing URIs, processing statements, or preparing a
* rebuild, the counts are URI_COUNTS, STATEMENT_COUNTS, or REBUILD_COUNTS.
* rebuild, the counts are URI_COUNTS, STATEMENT_COUNTS, or REBUILD_COUNTS,
* respectively.
*
* When the indexer starts up, and when it is is shut down, the counts are
* When the indexer starts up, becomes idle, or shuts down, the counts are
* NO_COUNTS.
*
* If the indexer is idle, the counts are carried over from the previous
* operation.
*/
public class SearchIndexerStatus {
public enum State {
IDLE, PROCESSING_URIS, PROCESSING_STMTS, PREPARING_REBUILD, SHUTDOWN
// ----------------------------------------------------------------------
// factory methods
// ----------------------------------------------------------------------
/**
 * Creates a status that reports the IDLE state, stamped with the time of
 * this call and carrying an empty (NoCounts) set of counts.
 */
public static SearchIndexerStatus idle() {
    Date now = new Date();
    NoCounts nothingToCount = new NoCounts();
    return new SearchIndexerStatus(State.IDLE, now, nothingToCount);
}
/**
 * Creates a status that reports the SHUTDOWN state, stamped with the time
 * of this call and carrying an empty (NoCounts) set of counts.
 */
public static SearchIndexerStatus shutdown() {
    Date now = new Date();
    NoCounts nothingToCount = new NoCounts();
    return new SearchIndexerStatus(State.SHUTDOWN, now, nothingToCount);
}
// ----------------------------------------------------------------------
// the instance
// ----------------------------------------------------------------------
private final State state;
private final Date since;
private final Counts counts;
@ -39,11 +50,19 @@ public class SearchIndexerStatus {
public Date getSince() {
return since;
}
public Counts getCounts() {
return counts;
}
// ----------------------------------------------------------------------
// helper classes
// ----------------------------------------------------------------------
/** The states that a SearchIndexerStatus can report. */
public enum State {
    IDLE,
    PROCESSING_URIS,
    PROCESSING_STMTS,
    PREPARING_REBUILD,
    SHUTDOWN
}
public abstract static class Counts {
public enum Type {
URI_COUNTS, STATEMENT_COUNTS, REBUILD_COUNTS, NO_COUNTS

View file

@ -0,0 +1,20 @@
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
package edu.cornell.mannlib.vitro.webapp.modules.searchIndexer;
/**
* Some handy methods for dealing with the search index.
*/
public class SearchIndexerUtils {
    /**
     * Builds the search-index document ID for an individual by prefixing
     * its URI.
     *
     * @param uri
     *            the URI of the individual; may be null
     * @return "vitroIndividual:" followed by the URI, or null if the URI is
     *         null
     */
    public static String getIdForUri(String uri) {
        return (uri == null) ? null : "vitroIndividual:" + uri;
    }
}

View file

@ -18,7 +18,6 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchResponse;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchResultDocumentList;
import edu.cornell.mannlib.vitro.webapp.search.IndexingException;
import edu.cornell.mannlib.vitro.webapp.search.beans.IndexerIface;
import edu.cornell.mannlib.vitro.webapp.search.documentBuilding.IndividualToSearchDocument;
public class SearchIndexer implements IndexerIface {
@ -27,7 +26,6 @@ public class SearchIndexer implements IndexerIface {
protected SearchEngine server;
protected boolean indexing;
protected HashSet<String> urisIndexed;
protected IndividualToSearchDocument individualToSearchDoc;
/**
* System is shutting down if true.
@ -48,9 +46,8 @@ public class SearchIndexer implements IndexerIface {
*/
protected boolean doingFullIndexRebuild = false;
public SearchIndexer( SearchEngine server, IndividualToSearchDocument indToDoc){
public SearchIndexer( SearchEngine server){
this.server = server;
this.individualToSearchDoc = indToDoc;
}
@Override
@ -74,7 +71,7 @@ public class SearchIndexer implements IndexerIface {
urisIndexed.add(ind.getURI());
}
log.debug("indexing " + ind.getURI());
doc = individualToSearchDoc.translate(ind);
// doc = individualToSearchDoc.translate(ind);
if( doc != null){
if( log.isDebugEnabled()){
@ -109,9 +106,9 @@ public class SearchIndexer implements IndexerIface {
public void removeFromIndex(String uri) throws IndexingException {
if( uri != null ){
try {
server.deleteById(individualToSearchDoc.getIdForUri(uri));
// server.deleteById(individualToSearchDoc.getIdForUri(uri));
log.debug("deleted " + " " + uri);
} catch (SearchEngineException e) {
} catch (Exception e) {
log.error( "could not delete individual " + uri, e);
}
}
@ -136,7 +133,7 @@ public class SearchIndexer implements IndexerIface {
public void abortIndexingAndCleanUp() {
shutdownRequested = true;
try{
individualToSearchDoc.shutdown();
// individualToSearchDoc.shutdown();
}catch(Exception e){
if( log != null)
log.debug(e,e);

View file

@ -1,356 +0,0 @@
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
package edu.cornell.mannlib.vitro.webapp.search.documentBuilding;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.ALLTEXT;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.ALLTEXTUNSTEMMED;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.CLASSGROUP_URI;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.DOCID;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.INDEXEDTIME;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.MOST_SPECIFIC_TYPE_URIS;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.NAME_LOWERCASE_SINGLE_VALUED;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.NAME_RAW;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.RDFTYPE;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;
import org.jsoup.Jsoup;
import com.hp.hpl.jena.shared.JenaException;
import com.hp.hpl.jena.vocabulary.OWL;
import edu.cornell.mannlib.vitro.webapp.application.ApplicationUtils;
import edu.cornell.mannlib.vitro.webapp.beans.DataPropertyStatement;
import edu.cornell.mannlib.vitro.webapp.beans.Individual;
import edu.cornell.mannlib.vitro.webapp.beans.IndividualImpl;
import edu.cornell.mannlib.vitro.webapp.beans.ObjectPropertyStatement;
import edu.cornell.mannlib.vitro.webapp.beans.VClass;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchInputDocument;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchResultDocument;
import edu.cornell.mannlib.vitro.webapp.search.IndexingException;
import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier;
import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExcluder;
/**
 * Builds a SearchInputDocument from an Individual: checks the excluders,
 * fills in the standard fields (ID, URI, label, types, all-text), and then
 * lets each DocumentModifier adjust the result.
 */
public class IndividualToSearchDocument {

    public static final Log log = LogFactory.getLog(IndividualToSearchDocument.class.getName());

    /** Data property whose values are kept out of the ALLTEXT field. */
    protected final String label = "http://www.w3.org/2000/01/rdf-schema#label";

    protected List<DocumentModifier> documentModifiers = new ArrayList<DocumentModifier>();

    protected List<SearchIndexExcluder> excludes;

    public IndividualToSearchDocument(List<SearchIndexExcluder> excludes, List<DocumentModifier> docModifiers){
        this.excludes = excludes;
        this.documentModifiers = docModifiers;
    }

    /**
     * Creates the search document for an individual.
     *
     * @return the document, or null if the individual is excluded, is
     *         skipped, or could not be translated (the failure is logged).
     */
    public SearchInputDocument translate(Individual ind) throws IndexingException{
        try{
            String excludeMsg = checkExcludes( ind );
            if( excludeMsg != DONT_EXCLUDE){
                log.debug(ind.getURI() + " " + excludeMsg);
                return null;
            }

            SearchInputDocument doc = ApplicationUtils.instance().getSearchEngine().createInputDocument();

            //DocID
            doc.addField(DOCID, getIdForUri( ind.getURI() ) );

            //vitro id
            doc.addField(URI, ind.getURI());
            log.debug(ind.getURI() + " init boost: " + doc.getDocumentBoost());

            //get label from ind
            addLabel(ind, doc);

            //add classes and classgroups; throws if the individual has no classes
            StringBuffer classPublicNames = new StringBuffer("");
            addClasses(ind, doc, classPublicNames);
            addMostSpecificTypeUris( ind, doc );

            log.debug(ind.getURI() + " post class boost: " + doc.getDocumentBoost());

            // collecting URIs and rdfs:labels of objects of statements
            StringBuffer objectNames = new StringBuffer("");
            StringBuffer addUri = new StringBuffer("");
            addObjectPropertyText(ind, doc, objectNames, addUri);

            //time of index in msec past epoch
            doc.addField(INDEXEDTIME, (Object) new DateTime().getMillis() );

            addAllText( ind, doc, classPublicNames, objectNames );

            //boost for entity
            if(ind.getSearchBoost() != null && ind.getSearchBoost() != 0) {
                doc.setDocumentBoost(ind.getSearchBoost());
            }

            log.debug(ind.getURI() + " pre mod boost: " + doc.getDocumentBoost());

            runAdditionalDocModifers(ind,doc);

            log.debug(ind.getURI() + " post mod boost: " + doc.getDocumentBoost());

            return doc;
        }catch(SkipIndividualException ex){
            //indicates that this individual should not be indexed by returning null
            log.debug(ex);
            return null;
        }catch(Exception th){
            log.error(th,th);
            return null;
        }
    }

    /**
     * Asks each excluder in turn whether the individual should be kept out
     * of the index.
     *
     * @return the first exclusion message, or DONT_EXCLUDE (null) if no
     *         excluder objects. An excluder that throws counts as an
     *         exclusion.
     */
    protected String checkExcludes(Individual ind) {
        for( SearchIndexExcluder excluder : excludes){
            try{
                String msg = excluder.checkForExclusion(ind);
                log.debug("individual=" + ind.getURI() + " (" + ind.getLabel()
                        + "), excluder=" + excluder + ", types="
                        + ind.getMostSpecificTypeURIs() + ", msg=" + msg);
                if( msg != DONT_EXCLUDE)
                    return msg;
            }catch (Exception e) {
                // An exception may carry a null message, and null means
                // DONT_EXCLUDE. Always return a non-null reason so a failing
                // excluder cannot let the individual through by accident.
                String reason = e.getMessage();
                if( reason == null ){
                    reason = "Excluded because " + excluder + " threw " + e;
                }
                return reason;
            }
        }
        return DONT_EXCLUDE;
    }

    /** Accumulated msec spent in each DocumentModifier class, keyed by class name. */
    protected Map<String,Long> docModClassToTime = new HashMap<String,Long>();

    /** Number of documents that have been passed through the modifiers. */
    protected long docModCount =0;

    /**
     * Runs every DocumentModifier against the document. When debug logging
     * is enabled, also accumulates per-class timings and reports the
     * averages once for every 200 documents.
     */
    protected void runAdditionalDocModifers( Individual ind, SearchInputDocument doc )
    throws SkipIndividualException{
        if( documentModifiers == null || documentModifiers.isEmpty()){
            return;
        }

        docModCount++;
        for(DocumentModifier modifier: documentModifiers){

            long start = System.currentTimeMillis();

            modifier.modifyDocument(ind, doc);

            if( log.isDebugEnabled()){
                long delta = System.currentTimeMillis() - start;
                String className = modifier.getClass().getName();
                synchronized(docModClassToTime){
                    Long soFar = docModClassToTime.get(className);
                    docModClassToTime.put(className, (soFar == null) ? delta : soFar + delta);
                }
            }
        }

        // Report after all modifiers have run, so the figures include every
        // modifier for this document and are written once rather than once
        // per modifier. Hold the lock while iterating: other threads may be
        // adding to the map.
        if( log.isDebugEnabled() && docModCount % 200 == 0 ){
            synchronized(docModClassToTime){
                log.debug("DocumentModifier timings");
                for( Entry<String, Long> entry: docModClassToTime.entrySet()){
                    log.debug("average msec to run " + entry.getKey() + ": " + (entry.getValue()/docModCount));
                }
            }
        }
    }

    /**
     * Fills the ALLTEXT and ALLTEXTUNSTEMMED fields from the data property
     * values (except rdfs:label), the object names and the class names,
     * with any HTML markup stripped.
     */
    protected void addAllText(Individual ind, SearchInputDocument doc, StringBuffer classPublicNames, StringBuffer objectNames) {
        String t=null;
        //ALLTEXT, all of the 'full text'
        StringBuffer allTextValue = new StringBuffer();

        try{
            //collecting data property statements
            List<DataPropertyStatement> dataPropertyStatements = ind.getDataPropertyStatements();
            if (dataPropertyStatements != null) {
                Iterator<DataPropertyStatement> dataPropertyStmtIter = dataPropertyStatements.iterator();
                while (dataPropertyStmtIter.hasNext()) {
                    DataPropertyStatement dataPropertyStmt = dataPropertyStmtIter.next();
                    if(dataPropertyStmt.getDatapropURI().equals(label)){ // we don't want label to be added to alltext
                        continue;
                    }
                    allTextValue.append(" ");
                    allTextValue.append(((t=dataPropertyStmt.getData()) == null)?"":t);
                }
            }
        }catch(JenaException je){
            //VIVO-15 Trap for characters that cause search indexing to abort
            log.error(String.format("Continuing to index %s but could not get all dataproperties because %s",ind.getURI(),je.getMessage()));
        }

        allTextValue.append(objectNames.toString());

        allTextValue.append(' ');
        allTextValue.append(classPublicNames);

        try {
            String stripped = Jsoup.parse(allTextValue.toString()).text();
            allTextValue.setLength(0);
            allTextValue.append(stripped);
        } catch(Exception e) {
            // Best effort: fall back to the unstripped text.
            log.debug("Could not strip HTML during search indexing. " + e);
        }

        String alltext = allTextValue.toString();

        doc.addField(ALLTEXT, alltext);
        doc.addField(ALLTEXTUNSTEMMED, alltext);
    }

    /**
     * Get the rdfs:labels for objects of statements and put in objectNames.
     * Get the URIs for objects of statements and put in addUri.
     * Statements that use owl:differentFrom are ignored.
     */
    protected void addObjectPropertyText(Individual ind, SearchInputDocument doc,
            StringBuffer objectNames, StringBuffer addUri) {

        try{
            List<ObjectPropertyStatement> objectPropertyStatements = ind.getObjectPropertyStatements();
            if (objectPropertyStatements != null) {
                Iterator<ObjectPropertyStatement> objectPropertyStmtIter = objectPropertyStatements.iterator();
                while (objectPropertyStmtIter.hasNext()) {
                    ObjectPropertyStatement objectPropertyStmt = objectPropertyStmtIter.next();
                    if( "http://www.w3.org/2002/07/owl#differentFrom".equals(objectPropertyStmt.getPropertyURI()) ){
                        continue;
                    }
                    try {
                        objectNames.append(" ");
                        String t=null;
                        objectNames.append(((t=objectPropertyStmt.getObject().getRdfsLabel()) == null)?"":t);
                        addUri.append(" ");
                        addUri.append(((t=objectPropertyStmt.getObject().getURI()) == null)?"":t);
                    } catch (Exception e) {
                        log.debug("could not index name of related object: " + e.getMessage());
                    }
                }
            }
        }catch(JenaException je){
            //VIVO-15 Trap for characters that cause search indexing to abort
            log.error(String.format("Continuing to index %s but could not get all object properties because %s",ind.getURI(),je.getMessage()));
        }
    }

    /**
     * Adds the info about the classes that the individual is a member of,
     * and their classgroups. Classes with no URI, and owl:Thing, are
     * ignored.
     *
     * @param classPublicNames
     *            receives the names of the classes, each preceded by a space
     * @throws SkipIndividualException
     *             if the individual has no classes
     */
    protected void addClasses(Individual ind, SearchInputDocument doc, StringBuffer classPublicNames) throws SkipIndividualException{
        List<VClass> vclasses = ind.getVClasses(false);
        if( vclasses == null || vclasses.isEmpty() ){
            throw new SkipIndividualException("Not indexing because individual has no classes");
        }

        for(VClass clz : vclasses){
            if(clz.getURI() == null){
                continue;
            }else if(OWL.Thing.getURI().equals(clz.getURI())){
                //don't add owl:Thing as the type in the index
                continue;
            } else {
                if( clz.getSearchBoost() != null){
                    doc.setDocumentBoost(doc.getDocumentBoost() + clz.getSearchBoost());
                }

                doc.addField(RDFTYPE, clz.getURI());

                if(clz.getName() != null){
                    classPublicNames.append(" ");
                    classPublicNames.append(clz.getName());
                }

                //Add the Classgroup URI to a field
                if(clz.getGroupURI() != null){
                    doc.addField(CLASSGROUP_URI,clz.getGroupURI());
                }
            }
        }
    }

    /** Adds each non-blank most-specific-type URI of the individual. */
    protected void addMostSpecificTypeUris(Individual ind, SearchInputDocument doc){
        List<String> mstURIs = ind.getMostSpecificTypeURIs();
        if( mstURIs != null ){
            for( String typeURI : mstURIs ){
                if( typeURI != null && ! typeURI.trim().isEmpty() )
                    doc.addField(MOST_SPECIFIC_TYPE_URIS, typeURI);
            }
        }
    }

    /**
     * Adds the name fields, using the rdfs:label if there is one and the
     * local name otherwise.
     */
    protected void addLabel(Individual ind, SearchInputDocument doc) {
        String value = "";
        String label = ind.getRdfsLabel();
        if (label != null) {
            value = label;
        } else {
            value = ind.getLocalName();
        }

        doc.addField(NAME_RAW, value);
        doc.addField(NAME_LOWERCASE_SINGLE_VALUED,value);

        // NAME_RAW will be copied by the search engine into the following fields:
        // NAME_LOWERCASE, NAME_UNSTEMMED, NAME_STEMMED, NAME_PHONETIC, AC_NAME_UNTOKENIZED, AC_NAME_STEMMED
    }

    /** Not implemented; always throws an Error. */
    public Object getIndexId(Object obj) {
        throw new Error("IndiviudalToSearchDocument.getIndexId() is unimplemented");
    }

    /**
     * @return "vitroIndividual:" followed by the URI, or null if the URI is
     *         null.
     */
    public String getIdForUri(String uri){
        if( uri != null ){
            return "vitroIndividual:" + uri;
        }else{
            return null;
        }
    }

    /** @return a query string of the form DOCID:id for the given URI. */
    public String getQueryForId(String uri ){
        return DOCID + ':' + getIdForUri(uri);
    }

    /**
     * @return an Individual that carries only the URI field of a
     *         SearchResultDocument, or null if the argument is of any other
     *         type.
     */
    public Individual unTranslate(Object result) {
        Individual ent = null;

        if( result instanceof SearchResultDocument){
            SearchResultDocument hit = (SearchResultDocument) result;
            String uri= (String) hit.getFirstValue(URI);

            ent = new IndividualImpl();
            ent.setURI(uri);
        }
        return ent;
    }

    /**
     * Shuts down each DocumentModifier. A failure in one does not prevent
     * the others from being shut down.
     */
    public void shutdown(){
        // The constructor accepts a null list, and runAdditionalDocModifers
        // tolerates one, so tolerate it here too.
        if( documentModifiers == null ){
            return;
        }
        for(DocumentModifier dm: documentModifiers){
            try{
                dm.shutdown();
            }catch(Exception e){
                if( log != null)
                    log.debug(e,e);
            }
        }
    }

    /** The value an excluder returns to mean "do not exclude this individual". */
    public static final String DONT_EXCLUDE =null;
}

View file

@ -3,79 +3,90 @@
package edu.cornell.mannlib.vitro.webapp.searchindex;
import static edu.cornell.mannlib.vitro.webapp.modelaccess.ModelNames.DISPLAY;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.REBUILD_REQUESTED;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.*;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STOP_PROCESSING_STATEMENTS;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.SHUTDOWN_COMPLETE;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.SHUTDOWN_REQUESTED;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STARTUP;
import static java.util.concurrent.TimeUnit.MINUTES;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory;
import edu.cornell.mannlib.vitro.webapp.dao.filtering.WebappDaoFactoryFiltering;
import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilterUtils;
import edu.cornell.mannlib.vitro.webapp.dao.filtering.filters.VitroFilters;
import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelAccess;
import edu.cornell.mannlib.vitro.webapp.modules.Application;
import edu.cornell.mannlib.vitro.webapp.modules.ComponentStartupStatus;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State;
import edu.cornell.mannlib.vitro.webapp.search.indexing.IndexBuilder;
import edu.cornell.mannlib.vitro.webapp.search.indexing.IndexingEventListener;
import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier;
import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExcluder;
import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinder;
import edu.cornell.mannlib.vitro.webapp.searchindex.tasks.RebuildIndexTask;
import edu.cornell.mannlib.vitro.webapp.searchindex.tasks.UpdateUrisTask;
import edu.cornell.mannlib.vitro.webapp.utils.configuration.ConfigurationBeanLoader;
import edu.cornell.mannlib.vitro.webapp.utils.configuration.ConfigurationBeanLoaderException;
import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread.WorkLevel;
import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread.WorkLevelStamp;
import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;
/**
* TODO A silly implementation that just wraps the old IndexBuilder.
* An implementation of the SearchIndexer interface.
*
* This implementation uses a single-threaded task queue to permit indexing to
* run one at a time in a "background" thread. The task queue is controlled by a
* scheduler that allows us to suspend incoming tasks (pause).
*
* A thread pool is available so the tasks can create small units of work to be
* run in parallel. Each task should block until all of its work units are
* complete, to preserve the pattern of running one task at a time.
*/
public class SearchIndexerImpl implements SearchIndexer {
private static final Log log = LogFactory.getLog(SearchIndexerImpl.class);
private final ListenerList listeners = new ListenerList();
private final TaskQueue taskQueue = new TaskQueue();
private final Scheduler scheduler = new Scheduler(taskQueue);
private final WorkerThreadPool pool = new WorkerThreadPool();
private ServletContext ctx;
private Set<SearchIndexExcluder> excluders;
private Set<DocumentModifier> modifiers;
private Set<IndexingUriFinder> uriFinders;
// TODO
private IndexBuilder indexBuilder;
private WebappDaoFactory wadf;
@Override
public void startup(Application application, ComponentStartupStatus ss) {
try {
this.ctx = application.getServletContext();
loadConfiguration();
this.wadf = getFilteredWebappDaoFactory();
listeners.fireEvent(new Event(STARTUP, getStatus()));
ss.info("Configured SearchIndexer: excluders=" + excluders
+ ", modifiers=" + modifiers + ", uriFinders=" + uriFinders);
{ // >>>>>>> TODO
this.indexBuilder = (IndexBuilder) ctx
.getAttribute(IndexBuilder.class.getName());
this.indexBuilder.addIndexBuilderListener(new BridgeListener());
}
createAndFire(STARTUP);
} catch (Exception e) {
ss.fatal("Failed to configure the SearchIndexer", e);
}
}
private void createAndFire(Event.Type type) {
listeners.fireEvent(new Event(type, getStatus()));
}
private void loadConfiguration() throws ConfigurationBeanLoaderException {
ConfigurationBeanLoader beanLoader = new ConfigurationBeanLoader(
ModelAccess.on(ctx).getOntModel(DISPLAY), ctx);
@ -84,80 +95,36 @@ public class SearchIndexerImpl implements SearchIndexer {
uriFinders = beanLoader.loadAll(IndexingUriFinder.class);
}
/*
* (non-Javadoc)
*
* @see
* edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer#
* scheduleUpdatesForUris(java.util.Collection)
/**
* Use a filtered DAO factory, so only public data goes into the search
* index.
*/
private WebappDaoFactory getFilteredWebappDaoFactory() {
    // Wrap the context's unfiltered factory in the public filter. The
    // arguments are evaluated left to right: factory first, then filter.
    return new WebappDaoFactoryFiltering(
            ModelAccess.on(ctx).getWebappDaoFactory(),
            VitroFilterUtils.getPublicFilter(ctx));
}
@Override
public void scheduleUpdatesForUris(Collection<String> uris) {
// TODO
for (String uri : uris) {
indexBuilder.addToChanged(uri);
}
log.debug("Schedule updates for " + uris.size() + " uris.");
scheduler.scheduleTask(new UpdateUrisTask(uris, excluders, modifiers,
wadf.getIndividualDao(), listeners, pool));
}
/*
* (non-Javadoc)
*
* @see
* edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer#
* rebuildIndex()
*/
@Override
public void rebuildIndex() {
// TODO
indexBuilder.doIndexRebuild();
scheduler.scheduleTask(new RebuildIndexTask());
}
/*
* (non-Javadoc)
*
* @see
* edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer#
* pause()
*/
@Override
public void pause() {
// TODO
indexBuilder.pause();
scheduler.pause();
}
/*
* (non-Javadoc)
*
* @see
* edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer#
* unpause()
*/
@Override
public void unpause() {
// TODO
indexBuilder.unpause();
}
/*
* (non-Javadoc)
*
* @see
* edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer#
* getStatus()
*/
@Override
public SearchIndexerStatus getStatus() {
// TODO
WorkLevelStamp workLevel = indexBuilder.getWorkLevel();
WorkLevel level = workLevel.getLevel();
Date since = workLevel.getSince();
if (level == WorkLevel.IDLE) {
return new SearchIndexerStatus(State.IDLE, since,
new SearchIndexerStatus.NoCounts());
} else {
return new SearchIndexerStatus(State.PROCESSING_URIS, since,
new SearchIndexerStatus.UriCounts(1, 2, 3, 6));
}
scheduler.unpause();
}
@Override
@ -170,63 +137,264 @@ public class SearchIndexerImpl implements SearchIndexer {
listeners.remove(listener);
}
/*
* (non-Javadoc)
*
* @see
* edu.cornell.mannlib.vitro.webapp.modules.Application.Component#shutdown
* (edu.cornell.mannlib.vitro.webapp.modules.Application)
*/
@Override
public void shutdown(Application application) {
// TODO
public SearchIndexerStatus getStatus() {
return taskQueue.getStatus();
}
/**
 * Shuts down the indexer: stops the task queue and the worker thread pool,
 * then shuts down each document modifier. Listeners receive
 * SHUTDOWN_REQUESTED before and SHUTDOWN_COMPLETE after. Does nothing if
 * the task queue already reports SHUTDOWN.
 */
@Override
public synchronized void shutdown(Application application) {
    SearchIndexerStatus status = taskQueue.getStatus();
    if (status.getState() == State.SHUTDOWN) {
        return;
    }

    listeners.fireEvent(new Event(SHUTDOWN_REQUESTED, status));

    taskQueue.shutdown();
    // The pool's threads must be released too, or they outlive the indexer.
    pool.shutdown();

    // modifiers is still null if startup failed before the configuration
    // was loaded.
    if (modifiers != null) {
        for (DocumentModifier dm : modifiers) {
            try {
                dm.shutdown();
            } catch (Exception e) {
                log.warn("Failed to shut down document modifier " + dm, e);
            }
        }
    }

    listeners.fireEvent(new Event(SHUTDOWN_COMPLETE, taskQueue
            .getStatus()));
}
// ----------------------------------------------------------------------
// Helper classes
// ----------------------------------------------------------------------
/**
* A simple thread-safe list of event listeners.
* A simple thread-safe list of event listeners. All methods are
* synchronized.
*/
private static class ListenerList {
public static class ListenerList {
private final List<Listener> list;
public ListenerList() {
list = Collections.synchronizedList(new ArrayList<Listener>());
list = new ArrayList<Listener>();
}
public void add(Listener l) {
public synchronized void add(Listener l) {
list.add(l);
}
public void remove(Listener l) {
public synchronized void remove(Listener l) {
list.remove(l);
}
public void fireEvent(Event e) {
synchronized (list) {
for (Listener l : list) {
l.receiveSearchIndexerEvent(e);
public synchronized void fireEvent(Event event) {
for (Listener l : list) {
try {
l.receiveSearchIndexerEvent(event);
} catch (Exception e) {
log.warn("Failed to deliver event '" + event
+ "' to listener '" + l + "'", e);
}
}
}
}
private class BridgeListener implements IndexingEventListener {
@Override
public void notifyOfIndexingEvent(EventTypes ie) {
switch (ie) {
case START_UPDATE:
createAndFire(START_PROCESSING_STATEMENTS);
break;
case FINISHED_UPDATE:
createAndFire(STOP_PROCESSING_STATEMENTS);
break;
case START_FULL_REBUILD:
createAndFire(REBUILD_REQUESTED);
createAndFire(START_PROCESSING_STATEMENTS);
break;
default: // FINISH_FULL_REBUILD
createAndFire(STOP_PROCESSING_STATEMENTS);
break;
/**
* A scheduler either collects tasks (if paused), or passes them to the
* queue (if not paused). All methods are synchronized.
*/
private static class Scheduler {
    private final TaskQueue taskQueue;
    private final List<Task> deferredQueue;
    private volatile boolean paused;

    public Scheduler(TaskQueue taskQueue) {
        this.taskQueue = taskQueue;
        this.deferredQueue = new ArrayList<Task>();
    }

    /**
     * Passes the task to the task queue, or holds it in the deferred queue
     * if the scheduler is paused.
     */
    public synchronized void scheduleTask(Task task) {
        if (paused) {
            deferredQueue.add(task);
            log.debug("added task to deferred queue: " + task);
        } else {
            taskQueue.scheduleTask(task);
            log.debug("added task to task queue: " + task);
        }
    }

    /** Starts deferring incoming tasks. */
    public synchronized void pause() {
        paused = true;
    }

    /**
     * Stops deferring, and passes the deferred tasks to the task queue in
     * the order in which they arrived.
     */
    public synchronized void unpause() {
        paused = false;
        for (Task task : deferredQueue) {
            taskQueue.scheduleTask(task);
            log.debug("moved task from deferred queue to task queue: " + task);
        }
        // Forget the tasks that were just handed over, so the next
        // unpause() does not submit them a second time.
        deferredQueue.clear();
    }
}
/**
* A single-threaded task queue that can tell us the status of the current
* task.
*
* If no current task, it can return a status of IDLE or SHUTDOWN.
*/
private static class TaskQueue {
    private final ExecutorService queue = Executors
            .newSingleThreadExecutor(new VitroBackgroundThread.Factory(
                    "SearchIndexer_TaskQueue"));

    private final AtomicReference<QueueStatus> current = new AtomicReference<>(
            new QueueStatus(SearchIndexerStatus.idle()));

    /**
     * Adds the task to the queue. If the executor rejects it, the task is
     * dropped with a warning.
     */
    public void scheduleTask(Task task) {
        try {
            queue.execute(new TaskWrapper(task));
        } catch (RejectedExecutionException e) {
            log.warn("Search Indexer task was rejected: " + e);
        }
    }

    /**
     * @return the status of the running task if there is one; otherwise
     *         the most recently recorded status.
     */
    public SearchIndexerStatus getStatus() {
        return current.get().getStatus();
    }

    /**
     * Stops the executor, waits up to one minute for it to terminate, and
     * records a SHUTDOWN status.
     */
    public void shutdown() {
        try {
            queue.shutdownNow();
            boolean terminated = queue.awaitTermination(1, MINUTES);
            if (!terminated) {
                log.warn("SearchIndexer task queue did not shut down "
                        + "within 1 minute.");
            }
        } catch (InterruptedException e) {
            log.warn("call to 'awaitTermination' was interrupted.");
            // Let the caller see that this thread was interrupted.
            Thread.currentThread().interrupt();
        } finally {
            // shutdownNow() has been called, so report SHUTDOWN even if
            // the wait was cut short.
            current.set(new QueueStatus(SearchIndexerStatus.shutdown()));
        }
    }

    /** When this wrapper is run, we will know the current task and status. */
    private class TaskWrapper implements Runnable {
        private final Task task;

        public TaskWrapper(Task task) {
            this.task = task;
        }

        @Override
        public void run() {
            QueueStatus running = new QueueStatus(task);
            current.set(running);
            log.debug("starting task: " + task);
            try {
                task.run();
            } finally {
                // Go back to idle even if the task threw. Replace the
                // status only if it is still ours, so a SHUTDOWN status
                // recorded in the meantime is not overwritten.
                current.compareAndSet(running, new QueueStatus(
                        SearchIndexerStatus.idle()));
                log.debug("ended task: " + task);
            }
        }
    }

    /** Either a specific status or a task to interrogate. */
    private class QueueStatus {
        private final Task task;
        private final SearchIndexerStatus status;

        public QueueStatus(Task task) {
            this.task = Objects.requireNonNull(task);
            this.status = null;
        }

        public QueueStatus(SearchIndexerStatus status) {
            this.task = null;
            this.status = Objects.requireNonNull(status);
        }

        public SearchIndexerStatus getStatus() {
            if (task != null) {
                return task.getStatus();
            } else {
                return status;
            }
        }
    }
}
public static interface Task extends Runnable {
public SearchIndexerStatus getStatus();
public void notifyWorkUnitCompletion(Runnable workUnit);
}
/**
* A thread pool for handling many small units of work submitted by a task.
*
* The task is notified as each unit completes.
*
* Only one task is active at a time, so the task can simply wait until this
* pool is idle to know that all of its units have completed.
*
* When shutting down, no attempt is made to interrupt the currently
* executing work units, since they are assumed to be small.
*/
public static class WorkerThreadPool {
    private final ThreadPoolExecutor pool;

    /**
     * Creates a pool of 10 threads with a queue of 50 waiting work units.
     * When all threads are busy and the queue is full, a submitted unit
     * runs on the submitting thread instead of being rejected.
     */
    public WorkerThreadPool() {
        this.pool = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(50),
                new VitroBackgroundThread.Factory(
                        "SearchIndexer_ThreadPool"),
                // The default policy throws RejectedExecutionException
                // once more than 60 units are pending. Running the unit
                // in the caller throttles the submitter instead.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Runs the work unit on a pool thread; the task is notified when the
     * unit completes.
     */
    public void submit(Runnable workUnit, Task task) {
        pool.execute(new WorkUnitWrapper(workUnit, task));
    }

    /**
     * Polls once a second, for up to 60 seconds, until no work unit is
     * running or waiting. Returns early, with the interrupt flag set, if
     * the calling thread is interrupted.
     */
    public void waitUntilIdle() {
        for (int i = 0; i < 60; i++) {
            // Units still waiting in the queue also count as outstanding.
            if (pool.getActiveCount() == 0 && pool.getQueue().isEmpty()) {
                return;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Further sleeps would fail immediately, so stop waiting.
                Thread.currentThread().interrupt();
                return;
            }
        }
        log.warn("SearchIndexer thread pool did not become idle "
                + "within 1 minute.");
    }

    /**
     * Stops accepting work units and waits up to one minute for the
     * submitted units to finish. Running units are not interrupted.
     */
    public void shutdown() {
        pool.shutdown();
        try {
            boolean terminated = pool.awaitTermination(1, MINUTES);
            if (!terminated) {
                log.warn("SearchIndexer thread pool did not shut down "
                        + "within 1 minute.");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs a work unit, then notifies its task even if the unit threw. */
    private static class WorkUnitWrapper implements Runnable {
        private final Runnable workUnit;
        private final Task task;

        public WorkUnitWrapper(Runnable workUnit, Task task) {
            this.workUnit = workUnit;
            this.task = task;
        }

        @Override
        public void run() {
            try {
                workUnit.run();
            } finally {
                task.notifyWorkUnitCompletion(workUnit);
            }
        }
    }
}
}

View file

@ -6,7 +6,6 @@ import static edu.cornell.mannlib.vitro.webapp.modelaccess.ModelNames.DISPLAY;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
@ -26,11 +25,8 @@ import edu.cornell.mannlib.vitro.webapp.dao.jena.ModelContext;
import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelAccess;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine;
import edu.cornell.mannlib.vitro.webapp.search.SearchIndexer;
import edu.cornell.mannlib.vitro.webapp.search.documentBuilding.IndividualToSearchDocument;
import edu.cornell.mannlib.vitro.webapp.search.indexing.IndexBuilder;
import edu.cornell.mannlib.vitro.webapp.search.indexing.SearchReindexingListener;
import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier;
import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExcluder;
import edu.cornell.mannlib.vitro.webapp.searchindex.indexing.IndexingUriFinder;
import edu.cornell.mannlib.vitro.webapp.startup.ComponentStartupStatusImpl;
import edu.cornell.mannlib.vitro.webapp.startup.StartupStatus;
@ -40,12 +36,12 @@ import edu.cornell.mannlib.vitro.webapp.utils.developer.Key;
import edu.cornell.mannlib.vitro.webapp.utils.developer.listeners.DeveloperDisabledModelChangeListener;
/**
* TODO
* A silly implementation that just wraps the old IndexBuilder with a new SearchIndexerImpl.
* TODO A silly implementation that just wraps the old IndexBuilder with a new
* SearchIndexerImpl.
*/
public class SearchIndexerSetup implements ServletContextListener {
private static final Log log = LogFactory.getLog(SearchIndexerSetup.class);
private ServletContext ctx;
private OntModel displayModel;
private ConfigurationBeanLoader beanLoader;
@ -55,79 +51,55 @@ public class SearchIndexerSetup implements ServletContextListener {
this.ctx = sce.getServletContext();
this.displayModel = ModelAccess.on(ctx).getOntModel(DISPLAY);
this.beanLoader = new ConfigurationBeanLoader(displayModel, ctx);
ServletContext context = sce.getServletContext();
StartupStatus ss = StartupStatus.getBean(context);
SearchEngine searchEngine = ApplicationUtils.instance().getSearchEngine();
SearchEngine searchEngine = ApplicationUtils.instance()
.getSearchEngine();
try {
IndividualToSearchDocument indToSearchDoc = setupTranslation();
/* setup search indexer */
SearchIndexer searchIndexer = new SearchIndexer(searchEngine, indToSearchDoc);
// This is where the builder gets the list of places to try to
// get objects to index. It is filtered so that non-public text
// does not get into the search index.
WebappDaoFactory wadf = ModelAccess.on(context)
.getWebappDaoFactory();
VitroFilters vf = VitroFilterUtils.getPublicFilter(context);
wadf = new WebappDaoFactoryFiltering(wadf, vf);
// make objects that will find additional URIs for context nodes etc
List<IndexingUriFinder> uriFinders = loadUriFinders();
// Make the IndexBuilder
IndexBuilder builder = new IndexBuilder(searchIndexer, wadf,
uriFinders);
// Save it to the servlet context so we can access it later in the
// webapp.
context.setAttribute(IndexBuilder.class.getName(), builder);
// Create listener to notify index builder of changes to model
// (can be disabled by developer setting.)
ModelContext.registerListenerForChanges(context,
new DeveloperDisabledModelChangeListener(
new SearchReindexingListener(builder),
Key.SEARCH_INDEX_SUPPRESS_MODEL_CHANGE_LISTENER));
ss.info(this, "Setup of search indexer completed.");
ApplicationUtils.instance().getSearchIndexer().startup(ApplicationUtils.instance(), new ComponentStartupStatusImpl(this, ss));
} catch (Throwable e) {
ss.fatal(this, "could not setup search engine", e);
{ // >>>>> TODO
try {
// /* setup search indexer */
// SearchIndexer searchIndexer = new SearchIndexer(searchEngine,
// indToSearchDoc);
//
// // Make the IndexBuilder
// IndexBuilder builder = new IndexBuilder(searchIndexer, wadf,
// uriFinders);
//
// // Create listener to notify index builder of changes to model
// // (can be disabled by developer setting.)
// ModelContext
// .registerListenerForChanges(
// context,
// new DeveloperDisabledModelChangeListener(
// new SearchReindexingListener(builder),
// Key.SEARCH_INDEX_SUPPRESS_MODEL_CHANGE_LISTENER));
//
// ss.info(this, "Setup of search indexer completed.");
//
} catch (Throwable e) {
ss.fatal(this, "could not setup search engine", e);
}
}
ApplicationUtils
.instance()
.getSearchIndexer()
.startup(ApplicationUtils.instance(),
new ComponentStartupStatusImpl(this, ss));
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
IndexBuilder builder = (IndexBuilder) sce.getServletContext()
.getAttribute(IndexBuilder.class.getName());
if (builder != null)
builder.stopIndexingThread();
ApplicationUtils.instance().getSearchIndexer()
.shutdown(ApplicationUtils.instance());
}
private IndividualToSearchDocument setupTranslation() {
try {
Set<SearchIndexExcluder> excluders = beanLoader.loadAll(SearchIndexExcluder.class);
log.debug("Excludes: (" + excluders.size() + ") " + excluders);
Set<DocumentModifier> modifiers = beanLoader.loadAll(DocumentModifier.class);
log.debug("Modifiers: (" + modifiers.size() + ") " + modifiers);
return new IndividualToSearchDocument(new ArrayList<>(excluders), new ArrayList<>(modifiers));
} catch (ConfigurationBeanLoaderException e) {
throw new RuntimeException("Failed to configure the SearchIndexer", e);
}
}
private List<IndexingUriFinder> loadUriFinders() {
try {
return new ArrayList<>(beanLoader.loadAll(IndexingUriFinder.class));
} catch (ConfigurationBeanLoaderException e) {
throw new RuntimeException("Failed to configure the SearchIndexer", e);
{ // >>>>> TODO
IndexBuilder builder = (IndexBuilder) sce.getServletContext()
.getAttribute(IndexBuilder.class.getName());
if (builder != null)
builder.stopIndexingThread();
}
}
}

View file

@ -20,7 +20,7 @@ import edu.cornell.mannlib.vitro.webapp.utils.configuration.Property;
*/
public class ExcludeBasedOnType implements SearchIndexExcluder {
private static final String SKIP_MSG = "skipping due to type.";
private static final String SKIP_MSG = "skipping due to type: ";
private final Set<String> typeURIs = new HashSet<>();
@ -35,7 +35,7 @@ public class ExcludeBasedOnType implements SearchIndexExcluder {
return DONT_EXCLUDE;
}
if (typeURIinExcludeList(ind.getVClass())) {
return SKIP_MSG;
return SKIP_MSG + ind.getVClass();
}
List<VClass> vclasses = new ArrayList<>();
@ -44,7 +44,7 @@ public class ExcludeBasedOnType implements SearchIndexExcluder {
for (VClass vclz : vclasses) {
if (typeURIinExcludeList(vclz))
return SKIP_MSG;
return SKIP_MSG + vclz;
}
return DONT_EXCLUDE;

View file

@ -0,0 +1,41 @@
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
package edu.cornell.mannlib.vitro.webapp.searchindex.tasks;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task;
/**
 * Placeholder for the rebuild-index task.
 *
 * TODO Not yet implemented: every method throws
 * UnsupportedOperationException.
 */
public class RebuildIndexTask implements Task {

    /**
     * @throws UnsupportedOperationException
     *             always; the task is not implemented yet.
     */
    @Override
    public void run() {
        throw new UnsupportedOperationException(
                "RebuildIndexTask.run() not implemented.");
    }

    /**
     * @throws UnsupportedOperationException
     *             always; the task is not implemented yet.
     */
    @Override
    public SearchIndexerStatus getStatus() {
        throw new UnsupportedOperationException(
                "RebuildIndexTask.getStatus() not implemented.");
    }

    /**
     * @throws UnsupportedOperationException
     *             always; the task is not implemented yet.
     */
    @Override
    public void notifyWorkUnitCompletion(Runnable workUnit) {
        throw new UnsupportedOperationException(
                "RebuildIndexTask.notifyWorkUnitCompletion() not implemented.");
    }
}

View file

@ -0,0 +1,195 @@
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
package edu.cornell.mannlib.vitro.webapp.searchindex.tasks;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.ALLTEXT;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.ALLTEXTUNSTEMMED;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.CLASSGROUP_URI;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.DOCID;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.INDEXEDTIME;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.MOST_SPECIFIC_TYPE_URIS;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.NAME_LOWERCASE_SINGLE_VALUED;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.NAME_RAW;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.RDFTYPE;
import static edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;
import org.jsoup.Jsoup;
import com.hp.hpl.jena.vocabulary.OWL;
import com.hp.hpl.jena.vocabulary.RDFS;
import edu.cornell.mannlib.vitro.webapp.application.ApplicationUtils;
import edu.cornell.mannlib.vitro.webapp.beans.DataPropertyStatement;
import edu.cornell.mannlib.vitro.webapp.beans.Individual;
import edu.cornell.mannlib.vitro.webapp.beans.ObjectPropertyStatement;
import edu.cornell.mannlib.vitro.webapp.beans.VClass;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchInputDocument;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerUtils;
import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier;
/**
 * A unit of work that builds the search document for one individual and adds
 * it to the search engine.
 *
 * Any exception raised while building or adding the document is logged and
 * swallowed, so a failure on one individual does not propagate to the thread
 * that runs the work unit.
 */
public class UpdateDocumentWorkUnit implements Runnable {
    private static final Log log = LogFactory
            .getLog(UpdateDocumentWorkUnit.class);

    private static final String URI_OWL_THING = OWL.Thing.getURI();
    private static final String URI_DIFFERENT_FROM = OWL.differentFrom.getURI();
    private static final String URI_RDFS_LABEL = RDFS.label.getURI();

    private final Individual ind;
    private final List<DocumentModifier> modifiers;
    private final SearchEngine searchEngine;

    /**
     * @param ind
     *            the individual to be indexed.
     * @param modifiers
     *            applied, in iteration order, after the standard fields have
     *            been added. The collection is copied.
     */
    public UpdateDocumentWorkUnit(Individual ind,
            Collection<DocumentModifier> modifiers) {
        this.ind = ind;
        this.modifiers = new ArrayList<>(modifiers);
        this.searchEngine = ApplicationUtils.instance().getSearchEngine();
    }

    /** The individual that this work unit indexes. */
    public Individual getInd() {
        return ind;
    }

    /**
     * Build the document, let the modifiers adjust it, stamp it with the
     * indexing time, and add it to the search engine.
     */
    @Override
    public void run() {
        try {
            SearchInputDocument doc = searchEngine.createInputDocument();

            addIdAndUri(doc);
            addLabel(doc);
            addClasses(doc);
            addMostSpecificTypes(doc);
            addObjectPropertyText(doc);
            addDataPropertyText(doc);
            addEntityBoost(doc);

            for (DocumentModifier modifier : modifiers) {
                modifier.modifyDocument(ind, doc);
            }

            addIndexedTime(doc);

            searchEngine.add(doc);
        } catch (Exception e) {
            log.warn("Failed to add '" + ind + "' to the search index.", e);
        }
    }

    private void addIdAndUri(SearchInputDocument doc) {
        doc.addField(DOCID, SearchIndexerUtils.getIdForUri(ind.getURI()));
        doc.addField(URI, ind.getURI());
    }

    /** Use the rdfs:label as the name, or the local name if there is none. */
    private void addLabel(SearchInputDocument doc) {
        String name = ind.getRdfsLabel();
        if (name == null) {
            name = ind.getLocalName();
        }

        doc.addField(NAME_RAW, name);
        doc.addField(NAME_LOWERCASE_SINGLE_VALUED, name);
    }

    /**
     * For each class that the individual belongs to, record the class URI, the
     * class group URI, the class Name, and the class boost.
     *
     * Classes with no URI, and owl:Thing, are ignored.
     */
    private void addClasses(SearchInputDocument doc) {
        List<VClass> vclasses = ind.getVClasses(false);
        if (vclasses == null) {
            return;
        }

        for (VClass clz : vclasses) {
            String classUri = clz.getURI();
            if (classUri == null || URI_OWL_THING.equals(classUri)) {
                continue;
            }
            doc.addField(RDFTYPE, classUri);

            String classGroupUri = clz.getGroupURI();
            if (classGroupUri != null) {
                doc.addField(CLASSGROUP_URI, classGroupUri);
            }

            addToAlltext(doc, clz.getName());

            Float boost = clz.getSearchBoost();
            if (boost != null) {
                doc.setDocumentBoost(doc.getDocumentBoost() + boost);
            }
        }
    }

    private void addMostSpecificTypes(SearchInputDocument doc) {
        List<String> mstURIs = ind.getMostSpecificTypeURIs();
        if (mstURIs != null) {
            for (String typeURI : mstURIs) {
                if (StringUtils.isNotBlank(typeURI)) {
                    doc.addField(MOST_SPECIFIC_TYPE_URIS, typeURI);
                }
            }
        }
    }

    /**
     * Add the label of each related object to the full-text fields, skipping
     * owl:differentFrom statements.
     */
    private void addObjectPropertyText(SearchInputDocument doc) {
        List<ObjectPropertyStatement> stmts = ind.getObjectPropertyStatements();
        if (stmts == null) {
            return;
        }

        for (ObjectPropertyStatement stmt : stmts) {
            if (URI_DIFFERENT_FROM.equals(stmt.getPropertyURI())) {
                continue;
            }
            // A statement whose object can't be resolved must not abort
            // the indexing of the whole individual.
            Individual object = stmt.getObject();
            if (object == null) {
                continue;
            }
            addToAlltext(doc, object.getRdfsLabel());
        }
    }

    /**
     * Add the value of each data property to the full-text fields, skipping
     * rdfs:label statements.
     */
    private void addDataPropertyText(SearchInputDocument doc) {
        List<DataPropertyStatement> stmts = ind.getDataPropertyStatements();
        if (stmts == null) {
            return;
        }

        for (DataPropertyStatement stmt : stmts) {
            // Constant-first comparison: safe if the property URI is null.
            if (URI_RDFS_LABEL.equals(stmt.getDatapropURI())) {
                continue;
            }
            addToAlltext(doc, stmt.getData());
        }
    }

    /**
     * If the individual has a non-zero search boost, set it as the document
     * boost.
     *
     * NOTE(review): this replaces the boost that addClasses() accumulated
     * earlier in run(), rather than adding to it -- confirm that is intended.
     */
    private void addEntityBoost(SearchInputDocument doc) {
        Float boost = ind.getSearchBoost();
        if (boost != null && !boost.equals(0.0F)) {
            doc.setDocumentBoost(boost);
        }
    }

    private void addIndexedTime(SearchInputDocument doc) {
        doc.addField(INDEXEDTIME, (Object) new DateTime().getMillis());
    }

    /**
     * Strip any markup from the text, and add what remains to both full-text
     * fields. Blank text is ignored.
     */
    private void addToAlltext(SearchInputDocument doc, String raw) {
        if (StringUtils.isBlank(raw)) {
            return;
        }
        String clean = Jsoup.parse(raw).text();
        if (StringUtils.isBlank(clean)) {
            return;
        }
        doc.addField(ALLTEXT, clean);
        doc.addField(ALLTEXTUNSTEMMED, clean);
    }
}

View file

@ -0,0 +1,199 @@
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
package edu.cornell.mannlib.vitro.webapp.searchindex.tasks;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.START_PROCESSING_URIS;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STOP_PROCESSING_URIS;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State.PROCESSING_URIS;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.cornell.mannlib.vitro.webapp.application.ApplicationUtils;
import edu.cornell.mannlib.vitro.webapp.beans.Individual;
import edu.cornell.mannlib.vitro.webapp.beans.VClass;
import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngineException;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.UriCounts;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerUtils;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.ListenerList;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.Task;
import edu.cornell.mannlib.vitro.webapp.searchindex.SearchIndexerImpl.WorkerThreadPool;
import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier;
import edu.cornell.mannlib.vitro.webapp.searchindex.exclusions.SearchIndexExcluder;
/**
 * Given a list of URIs, remove the ones that don't belong in the index and
 * update the ones that do belong.
 *
 * A URI doesn't belong in the index if there is no individual with that URI, or
 * if the individual has no VClasses assigned to it, or if the individual is
 * excluded by one of the excluders.
 *
 * Deletions are done synchronously, but updates are scheduled to run on the
 * thread pool.
 *
 * Duplicate URIs are processed only once.
 */
public class UpdateUrisTask implements Task {
    private static final Log log = LogFactory.getLog(UpdateUrisTask.class);

    private final Set<String> uris;
    private final IndividualDao indDao;
    private final List<SearchIndexExcluder> excluders;
    private final List<DocumentModifier> modifiers;
    private final ListenerList listeners;
    private final WorkerThreadPool pool;

    private final Status status;
    private final SearchEngine searchEngine;

    /**
     * @param uris
     *            the URIs to process; copied into a set, so duplicates are
     *            dropped.
     * @param excluders
     *            consulted in order; the first one to object excludes the
     *            individual. The collection is copied.
     * @param modifiers
     *            passed to each update work unit. The collection is copied.
     * @param indDao
     *            used to look up the individual for each URI.
     * @param listeners
     *            receive the start and stop events.
     * @param pool
     *            runs the update work units.
     */
    public UpdateUrisTask(Collection<String> uris,
            Collection<SearchIndexExcluder> excluders,
            Collection<DocumentModifier> modifiers, IndividualDao indDao,
            ListenerList listeners, WorkerThreadPool pool) {
        this.uris = new HashSet<>(uris);
        this.excluders = new ArrayList<>(excluders);
        this.modifiers = new ArrayList<>(modifiers);
        this.indDao = indDao;
        this.listeners = listeners;
        this.pool = pool;

        // Count the de-duplicated set, not the argument: otherwise
        // duplicates would leave "remaining" stuck above zero.
        this.status = new Status(this.uris.size());

        this.searchEngine = ApplicationUtils.instance().getSearchEngine();
    }

    /**
     * Process each URI, wait for the scheduled updates to finish, and tell
     * the listeners.
     *
     * If the thread is interrupted, the remaining URIs are skipped, but the
     * stop event is still fired so listeners see a matching start and stop.
     */
    @Override
    public void run() {
        listeners.fireEvent(new Event(START_PROCESSING_URIS, status
                .getSearchIndexerStatus()));

        for (String uri : uris) {
            if (isInterrupted()) {
                log.info("Interrupted: " + status.getSearchIndexerStatus());
                break;
            }

            Individual ind = getIndividual(uri);
            if (ind == null || hasNoClass(ind) || isExcluded(ind)) {
                deleteDocument(uri);
            } else {
                updateDocument(ind);
            }
        }
        pool.waitUntilIdle();

        listeners.fireEvent(new Event(STOP_PROCESSING_URIS, status
                .getSearchIndexerStatus()));
    }

    /** Check the interrupt flag of the current thread without clearing it. */
    private boolean isInterrupted() {
        return Thread.currentThread().isInterrupted();
    }

    private Individual getIndividual(String uri) {
        Individual ind = indDao.getIndividualByURI(uri);
        if (ind == null) {
            log.debug("Found no individual for '" + uri + "'");
        }
        return ind;
    }

    private boolean hasNoClass(Individual ind) {
        List<VClass> vclasses = ind.getVClasses(false);
        if (vclasses == null || vclasses.isEmpty()) {
            log.debug("Individual " + ind + " has no classes.");
            return true;
        }
        return false;
    }

    /**
     * An individual is excluded if any excluder returns something other than
     * DONT_EXCLUDE. Compare by value, not by identity, so an excluder need
     * not return the very same String instance.
     */
    private boolean isExcluded(Individual ind) {
        for (SearchIndexExcluder excluder : excluders) {
            String message = excluder.checkForExclusion(ind);
            if (!java.util.Objects.equals(message,
                    SearchIndexExcluder.DONT_EXCLUDE)) {
                log.debug("Excluded " + ind + " because " + message);
                return true;
            }
        }
        return false;
    }

    /** A delete is fast enough to be done synchronously. */
    private void deleteDocument(String uri) {
        try {
            searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri));
            status.incrementDeletes();
            log.debug("deleted '" + uri + "' from search index.");
        } catch (SearchEngineException e) {
            log.warn("Failed to delete '" + uri + "' from search index", e);
        }
    }

    /** An update is handed to the thread pool; this method doesn't wait. */
    private void updateDocument(Individual ind) {
        Runnable workUnit = new UpdateDocumentWorkUnit(ind, modifiers);
        pool.submit(workUnit, this);
        log.debug("scheduled update to " + ind);
    }

    /**
     * Called when a work unit submitted by this task has finished running.
     * Count it first, so that logging can never prevent the count.
     */
    @Override
    public void notifyWorkUnitCompletion(Runnable workUnit) {
        status.incrementUpdates();
        if (log.isDebugEnabled()
                && workUnit instanceof UpdateDocumentWorkUnit) {
            log.debug("completed update to "
                    + ((UpdateDocumentWorkUnit) workUnit).getInd());
        }
    }

    @Override
    public SearchIndexerStatus getStatus() {
        return status.getSearchIndexerStatus();
    }

    // ----------------------------------------------------------------------
    // helper classes
    // ----------------------------------------------------------------------

    /**
     * A thread-safe collection of status information. All methods are
     * synchronized.
     */
    private static class Status {
        private final int total;
        private int updated = 0;
        private int deleted = 0;
        private Date since = new Date();

        public Status(int total) {
            this.total = total;
        }

        public synchronized void incrementUpdates() {
            updated++;
            since = new Date();
        }

        public synchronized void incrementDeletes() {
            deleted++;
            since = new Date();
        }

        /** A snapshot of the counts, as of the most recent change. */
        public synchronized SearchIndexerStatus getSearchIndexerStatus() {
            int remaining = total - updated - deleted;
            return new SearchIndexerStatus(PROCESSING_URIS, since,
                    new UriCounts(deleted, updated, remaining, total));
        }
    }
}