Merge branch 'feature/searchIndexerTry' into develop

This commit is contained in:
Jim Blake 2015-01-21 12:32:48 -05:00
commit 2fbf467ab4
23 changed files with 451 additions and 202 deletions

View file

@ -68,7 +68,7 @@
# developer.searchIndex.showDocuments = false
# developer.searchIndex.uriOrNameRestriction = .*
# developer.searchIndex.documentRestriction = .*
# developer.searchIndex.logIndexingBreakdownTimings = .*
# developer.searchIndex.logIndexingBreakdownTimings = false
# developer.searchIndex.suppressModelChangeListener = false
# developer.searchDeletions.enable = false

View file

@ -53,8 +53,8 @@ public class ObjectPropertyStatementDaoJena extends JenaBaseDao implements Objec
private static final Log log = LogFactory.getLog(ObjectPropertyStatementDaoJena.class);
private DatasetWrapperFactory dwf;
private RDFService rdfService;
protected DatasetWrapperFactory dwf;
protected RDFService rdfService;
public ObjectPropertyStatementDaoJena(RDFService rdfService,
DatasetWrapperFactory dwf,

View file

@ -2,164 +2,205 @@
package edu.cornell.mannlib.vitro.webapp.dao.jena;
import static edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService.ModelSerializationFormat.N3;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.hp.hpl.jena.ontology.OntModel;
import com.hp.hpl.jena.ontology.OntModelSpec;
import com.hp.hpl.jena.query.Dataset;
import com.hp.hpl.jena.query.QueryExecution;
import com.hp.hpl.jena.query.QueryExecutionFactory;
import com.hp.hpl.jena.query.QueryFactory;
import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.rdf.model.Property;
import com.hp.hpl.jena.rdf.model.Resource;
import com.hp.hpl.jena.rdf.model.RDFNode;
import com.hp.hpl.jena.rdf.model.Statement;
import com.hp.hpl.jena.shared.Lock;
import com.hp.hpl.jena.util.iterator.ClosableIterator;
import com.hp.hpl.jena.vocabulary.RDF;
import edu.cornell.mannlib.vitro.webapp.beans.Individual;
import edu.cornell.mannlib.vitro.webapp.beans.ObjectProperty;
import edu.cornell.mannlib.vitro.webapp.beans.ObjectPropertyStatement;
import edu.cornell.mannlib.vitro.webapp.beans.ObjectPropertyStatementImpl;
import edu.cornell.mannlib.vitro.webapp.dao.ObjectPropertyStatementDao;
import edu.cornell.mannlib.vitro.webapp.dao.jena.IndividualSDB.IndividualNotFoundException;
import edu.cornell.mannlib.vitro.webapp.dao.jena.WebappDaoFactorySDB.SDBDatasetMode;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;
import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceUtils;
public class ObjectPropertyStatementDaoSDB extends
ObjectPropertyStatementDaoJena implements ObjectPropertyStatementDao {
private static final Log log = LogFactory
.getLog(ObjectPropertyStatementDaoSDB.class);
private static final Log log = LogFactory.getLog(ObjectPropertyStatementDaoSDB.class);
// Get the types of the base entity.
private static final String SUBJECT_TYPE_QUERY = ""
+ "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> \n"
+ "CONSTRUCT { \n" //
+ " ?uri rdf:type ?type . \n" //
+ "} WHERE { \n" //
+ " ?uri rdf:type ?type . \n" //
+ "} \n";
private DatasetWrapperFactory dwf;
private SDBDatasetMode datasetMode;
private WebappDaoFactorySDB wadf;
// Get the types of all objects of properties.
private static final String OBJECT_TYPE_QUERY = ""
+ "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> \n"
+ "CONSTRUCT { \n" //
+ " ?uri ?p ?o . \n" //
+ " ?o rdf:type ?type . \n" //
+ "} WHERE { \n" //
+ " ?uri ?p ?o . \n" //
+ " ?o rdf:type ?type . \n" //
+ "} \n";
public ObjectPropertyStatementDaoSDB(
RDFService rdfService,
DatasetWrapperFactory dwf,
SDBDatasetMode datasetMode,
// Get the labels of all objects of properties.
private static final String OBJECT_LABEL_QUERY = ""
+ "PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> \n"
+ "CONSTRUCT { \n" //
+ " ?uri ?p ?o . \n" //
+ " ?o rdfs:label ?label . \n" //
+ "} WHERE { \n" //
+ " ?uri ?p ?o . \n" //
+ " ?o rdfs:label ?label . \n" //
+ "} \n";
private final WebappDaoFactorySDB wadf;
private final SDBDatasetMode datasetMode;
public ObjectPropertyStatementDaoSDB(RDFService rdfService,
DatasetWrapperFactory dwf, SDBDatasetMode datasetMode,
WebappDaoFactorySDB wadf) {
super(rdfService, dwf, wadf);
this.dwf = dwf;
this.datasetMode = datasetMode;
this.wadf = wadf;
this.datasetMode = datasetMode;
}
@Override
public Individual fillExistingObjectPropertyStatements(Individual entity) {
if (entity.getURI() == null)
if (entity == null || entity.getURI() == null)
return entity;
else {
Map<String, ObjectProperty> uriToObjectProperty = new HashMap<String,ObjectProperty>();
String query = "CONSTRUCT { \n" +
" <" + entity.getURI() + "> ?p ?o . \n" +
// " ?o a ?oType . \n" +
// " ?o <" + RDFS.label.getURI() + "> ?oLabel . \n" +
// " ?o <" + VitroVocabulary.MONIKER + "> ?oMoniker \n" +
"} WHERE { \n" +
" { <" + entity.getURI() + "> ?p ?o } \n" +
// " UNION { <" + entity.getURI() + "> ?p ?o . ?o a ?oType } \n" +
// " UNION { <" + entity.getURI() + "> ?p ?o . \n" +
// " ?o <" + RDFS.label.getURI() + "> ?oLabel } \n" +
// " UNION { <" + entity.getURI() + "> ?p ?o . \n " +
// " ?o <" + VitroVocabulary.MONIKER + "> ?oMoniker } \n" +
"}";
long startTime = System.currentTimeMillis();
Model m = null;
DatasetWrapper w = dwf.getDatasetWrapper();
Dataset dataset = w.getDataset();
dataset.getLock().enterCriticalSection(Lock.READ);
QueryExecution qexec = null;
try {
qexec = QueryExecutionFactory.create(QueryFactory.create(query), dataset);
m = qexec.execConstruct();
} finally {
if(qexec != null) qexec.close();
dataset.getLock().leaveCriticalSection();
w.close();
}
if (log.isDebugEnabled()) {
log.debug("Time (ms) to query for related individuals: " + (System.currentTimeMillis() - startTime));
if (System.currentTimeMillis() - startTime > 1000) {
//log.debug(query);
log.debug("Results size (statements): " + m.size());
}
}
List<ObjectPropertyStatement> objectPropertyStatements = new ArrayList<>();
String subjectUri = entity.getURI();
OntModel ontModel = ModelFactory.createOntologyModel(OntModelSpec.OWL_MEM, m);
ontModel.enterCriticalSection(Lock.READ);
try {
Resource ind = ontModel.getResource(entity.getURI());
List<ObjectPropertyStatement> objPropertyStmtList = new ArrayList<ObjectPropertyStatement>();
ClosableIterator<Statement> propIt = ind.listProperties();
try {
while (propIt.hasNext()) {
Statement st = propIt.next();
if (st.getObject().isResource() && !(NONUSER_NAMESPACES.contains(st.getPredicate().getNameSpace()))) {
try {
ObjectPropertyStatement objPropertyStmt = new ObjectPropertyStatementImpl();
objPropertyStmt.setSubjectURI(entity.getURI());
objPropertyStmt.setSubject(entity);
objPropertyStmt.setObjectURI(((Resource)st.getObject()).getURI());
Model m = getInfoForObjectsOfThisEntity(subjectUri);
objPropertyStmt.setPropertyURI(st.getPredicate().getURI());
Property prop = st.getPredicate();
if( uriToObjectProperty.containsKey(prop.getURI())){
objPropertyStmt.setProperty(uriToObjectProperty.get(prop.getURI()));
}else{
ObjectProperty p = getWebappDaoFactory().getObjectPropertyDao().getObjectPropertyByURI(prop.getURI());
if( p != null ){
uriToObjectProperty.put(prop.getURI(), p);
objPropertyStmt.setProperty(uriToObjectProperty.get(prop.getURI()));
}else{
//if ObjectProperty not found in ontology, skip it
Set<String> subjectTypes = getTypes(m, subjectUri);
for (ObjectPropertyPair pair : getRawObjectPropertyPairs(m,
subjectUri)) {
String predicateUri = pair.getPredicateUri();
String objectUri = pair.getObjectUri();
Set<String> objectTypes = getTypes(m, objectUri);
ObjectProperty prop = findRawProperty(predicateUri);
if (prop == null) {
continue;
}
}
if (objPropertyStmt.getObjectURI() != null) {
//this might throw IndividualNotFoundException
Individual objInd = new IndividualSDB(
objPropertyStmt.getObjectURI(),
this.dwf,
datasetMode,
wadf);
objPropertyStmt.setObject(objInd);
}
//only add statement to list if it has its values filled out
if ( (objPropertyStmt.getSubjectURI() != null)
&& (objPropertyStmt.getPropertyURI() != null)
&& (objPropertyStmt.getObject() != null) ) {
objPropertyStmtList.add(objPropertyStmt);
}
} catch (IndividualNotFoundException t) {
log.debug(t,t);
continue;
} catch (Throwable t){
log.error(t,t);
continue;
}
}
}
} finally {
propIt.close();
}
entity.setObjectPropertyStatements(objPropertyStmtList);
} finally {
ontModel.leaveCriticalSection();
Individual object = new IndividualSDB(objectUri, dwf,
datasetMode, wadf, m);
objectPropertyStatements.add(createStatement(entity, prop,
object));
}
entity.setObjectPropertyStatements(objectPropertyStatements);
return entity;
}
}
/**
* Build a model describing this entity: its rdf:types, the objects related
* to it (with the predicates that relate them), and the rdf:types and
* rdfs:labels of those related objects.
*
* Runs three CONSTRUCT queries against the RDFService. If any query fails,
* the failure is logged and the model built so far is returned — the
* result may therefore be partial.
*/
private Model getInfoForObjectsOfThisEntity(String subjectUri) {
Model m = ModelFactory.createDefaultModel();
try {
// Types of the subject itself.
m.add(RDFServiceUtils.parseModel(
rdfService.sparqlConstructQuery(
substituteUri(subjectUri, SUBJECT_TYPE_QUERY), N3),
N3));
// Statements about the subject, plus the types of each object.
m.add(RDFServiceUtils.parseModel(
rdfService.sparqlConstructQuery(
substituteUri(subjectUri, OBJECT_TYPE_QUERY), N3),
N3));
// Statements about the subject, plus the labels of each object.
m.add(RDFServiceUtils.parseModel(
rdfService.sparqlConstructQuery(
substituteUri(subjectUri, OBJECT_LABEL_QUERY), N3),
N3));
} catch (RDFServiceException e) {
// Best effort: log and return whatever was gathered before the failure.
log.warn("Failed to fill object property statements for '"
+ subjectUri + "'", e);
}
return m;
}
/**
* Replace every occurrence of the "?uri" variable in the query text with a
* bracketed URI reference (note the trailing space, which keeps tokens
* separated in the generated SPARQL).
*/
private String substituteUri(String uri, String query) {
	String uriReference = "<" + uri + "> ";
	return query.replace("?uri", uriReference);
}
/**
* Pull the set of rdf:type URIs for the given resource out of the model.
* Non-URI type nodes (e.g. blank nodes) are silently skipped.
*/
private Set<String> getTypes(Model m, String uri) {
Set<String> typeUris = new HashSet<>();
for (RDFNode typeNode : m.listObjectsOfProperty(m.createResource(uri),
RDF.type).toSet()) {
if (typeNode.isURIResource()) {
typeUris.add(typeNode.asResource().getURI());
}
}
return typeUris;
}
/**
* List the (predicate URI, object URI) pairs for all statements whose
* subject is subjectUri, skipping predicates in the non-user namespaces
* and any statement whose object is not a URI resource (literals and
* blank nodes are not object-property values).
*/
private List<ObjectPropertyPair> getRawObjectPropertyPairs(Model m,
String subjectUri) {
List<ObjectPropertyPair> list = new ArrayList<>();
for (Statement stmt : m.listStatements(m.createResource(subjectUri),
null, (RDFNode) null).toList()) {
// Skip system/bookkeeping predicates.
if (wadf.getNonuserNamespaces().contains(
stmt.getPredicate().getNameSpace())) {
continue;
}
// Object properties only: the object must be a URI resource.
if (!stmt.getObject().isURIResource()) {
continue;
}
list.add(new ObjectPropertyPair(stmt.getPredicate().getURI(), stmt
.getObject().asResource().getURI()));
}
return list;
}
/**
* Look up the ObjectProperty bean for this predicate URI. May return null
* if the predicate is not a recognized object property; callers are
* expected to check for null and skip the statement.
*/
private ObjectProperty findRawProperty(String predicateUri) {
return wadf.getObjectPropertyDao().getObjectPropertyByURI(predicateUri);
}
/**
* Assemble an ObjectPropertyStatement bean from its three parts: the
* subject individual, the property, and the object individual.
*/
private ObjectPropertyStatement createStatement(Individual entity,
		ObjectProperty prop, Individual object) {
	ObjectPropertyStatementImpl stmt = new ObjectPropertyStatementImpl();
	stmt.setObject(object);
	stmt.setProperty(prop);
	stmt.setSubject(entity);
	return stmt;
}
// ----------------------------------------------------------------------
// Helper classes
// ----------------------------------------------------------------------
/**
* An immutable (predicate URI, object URI) pair taken from a raw statement
* about the subject entity.
*/
private static class ObjectPropertyPair {
// URI of the predicate relating the subject to the object.
private final String predicateUri;
// URI of the related object individual.
private final String objectUri;
public ObjectPropertyPair(String predicateUri, String objectUri) {
this.predicateUri = predicateUri;
this.objectUri = objectUri;
}
public String getPredicateUri() {
return predicateUri;
}
public String getObjectUri() {
return objectUri;
}
}
}

View file

@ -9,6 +9,11 @@ import edu.cornell.mannlib.vitro.webapp.modules.Application;
/**
* The principal interface for the SearchEngine. All search-related objects are
* created by these methods.
*
* All methods that throw SearchEngineException should attempt to distinguish
* whether the exception is caused because the SearchEngine is not responding.
* In that case, they should throw a SearchEngineNotRespondingException, so the
* client code can choose to respond accordingly.
*/
public interface SearchEngine extends Application.Module {

View file

@ -0,0 +1,27 @@
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
package edu.cornell.mannlib.vitro.webapp.modules.searchEngine;
/**
* Indicates that a request to the SearchEngine has timed out, or given some
* other indication that no response will be coming.
*/
public class SearchEngineNotRespondingException extends SearchEngineException {
	// Exceptions are Serializable; declare an explicit version ID so the
	// implicitly computed one doesn't change if this class is edited.
	private static final long serialVersionUID = 1L;

	/** Creates an exception with neither a detail message nor a cause. */
	public SearchEngineNotRespondingException() {
		super();
	}

	/** Creates an exception with a detail message. */
	public SearchEngineNotRespondingException(String message) {
		super(message);
	}

	/** Creates an exception wrapping an underlying cause. */
	public SearchEngineNotRespondingException(Throwable cause) {
		super(cause);
	}

	/** Creates an exception with a detail message and an underlying cause. */
	public SearchEngineNotRespondingException(String message, Throwable cause) {
		super(message, cause);
	}
}

View file

@ -12,6 +12,9 @@ import edu.cornell.mannlib.vitro.webapp.modules.Application;
/**
* Interface for the code that controls the contents of the search index.
*
* If calls are made to schedule tasks prior to startup(), they will be queued,
* since the indexer is created in paused mode.
*
* The only calls that are valid after shutdown are shutdown(), getStatus() and
* removeListener().
*/
@ -26,6 +29,8 @@ public interface SearchIndexer extends Application.Module {
* We accumulate a batch of affected URIs, removing duplicates if they
* occur, and then submit them for updates.
*
* If called before startup or while paused, this task will be queued.
*
* @param urls
* if null or empty, this call has no effect.
* @throws IllegalStateException
@ -43,6 +48,8 @@ public interface SearchIndexer extends Application.Module {
* A URI belongs in the index if it refers to an existing individual in the
* model, and is not excluded.
*
* If called before startup or while paused, this task will be queued.
*
* @param uris
* if null or empty, this call has no effect.
* @throws IllegalStateException
@ -57,6 +64,8 @@ public interface SearchIndexer extends Application.Module {
* If a rebuild is already pending or in progress, this method has no
* effect.
*
* If called before startup or while paused, this task will be queued.
*
* @throws IllegalStateException
* if called after shutdown()
*/
@ -64,7 +73,12 @@ public interface SearchIndexer extends Application.Module {
/**
* Stop processing new tasks. Requests will be queued until a call to
* unpause().
* unpause(). Fires a PAUSED event to listeners.
*
* The SearchIndexer is paused when created. When fully initialized, it
* should be unpaused.
*
* If already paused, this call has no effect.
*
* @throws IllegalStateException
* if called after shutdown()
@ -73,9 +87,13 @@ public interface SearchIndexer extends Application.Module {
/**
* Resume processing new tasks. Any requests that were received since the
* call to pause() will now be scheduled for processing.
* call to pause() will now be scheduled for processing. Fires an UNPAUSED
* event to listeners.
*
* Has no effect if called after shutdown().
* The SearchIndexer is paused when created. When fully initialized, it
* should be unpaused.
*
* Has no effect if called after shutdown() or if not paused.
*/
void unpause();
@ -132,6 +150,8 @@ public interface SearchIndexer extends Application.Module {
public enum Type {
STARTUP, PROGRESS,
PAUSE, UNPAUSE,
START_PROCESSING_URIS, STOP_PROCESSING_URIS,
START_PROCESSING_STATEMENTS, STOP_PROCESSING_STATEMENTS,

View file

@ -102,19 +102,25 @@ public class SearchIndexerStatus {
}
public static class UriCounts extends Counts {
private final int excluded;
private final int deleted;
private final int updated;
private final int remaining;
private final int total;
public UriCounts(int deleted, int updated, int remaining, int total) {
public UriCounts(int excluded, int deleted, int updated, int remaining, int total) {
super(Type.URI_COUNTS);
this.excluded = excluded;
this.deleted = deleted;
this.updated = updated;
this.remaining = remaining;
this.total = total;
}
public int getExcluded() {
return excluded;
}
public int getDeleted() {
return deleted;
}
@ -133,7 +139,7 @@ public class SearchIndexerStatus {
@Override
public String toString() {
return "[deleted=" + deleted + ", updated=" + updated
return "[excluded=" + excluded + ", deleted=" + deleted + ", updated=" + updated
+ ", remaining=" + remaining + ", total=" + total + "]";
}
}

View file

@ -16,10 +16,13 @@ import org.apache.commons.logging.LogFactory;
import edu.cornell.mannlib.vitro.webapp.application.ApplicationUtils;
import edu.cornell.mannlib.vitro.webapp.auth.permissions.SimplePermission;
import edu.cornell.mannlib.vitro.webapp.auth.policy.PolicyHelper;
import edu.cornell.mannlib.vitro.webapp.auth.requestedAction.AuthorizationRequest;
import edu.cornell.mannlib.vitro.webapp.auth.requestedAction.RequestedAction;
import edu.cornell.mannlib.vitro.webapp.controller.VitroRequest;
import edu.cornell.mannlib.vitro.webapp.controller.freemarker.FreemarkerHttpServlet;
import edu.cornell.mannlib.vitro.webapp.controller.freemarker.UrlBuilder;
import edu.cornell.mannlib.vitro.webapp.controller.freemarker.responsevalues.RedirectResponseValues;
import edu.cornell.mannlib.vitro.webapp.controller.freemarker.responsevalues.ResponseValues;
import edu.cornell.mannlib.vitro.webapp.controller.freemarker.responsevalues.TemplateResponseValues;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
@ -99,10 +102,6 @@ public class IndexController extends FreemarkerHttpServlet {
@Override
public void doGet(HttpServletRequest req, HttpServletResponse resp)
throws IOException, ServletException {
if (!isAuthorizedToDisplayPage(req, resp, REQUIRED_ACTIONS)) {
return;
}
switch (RequestType.fromRequest(req)) {
case STATUS:
showStatus(req, resp);
@ -118,12 +117,17 @@ public class IndexController extends FreemarkerHttpServlet {
return "Rebuild Search Index";
}
@Override
protected AuthorizationRequest requiredActions(VitroRequest vreq) {
return REQUIRED_ACTIONS;
}
@Override
protected ResponseValues processRequest(VitroRequest vreq) {
switch (RequestType.fromRequest(vreq)) {
case REBUILD:
requestRebuild();
return showDisplay();
return new RedirectResponseValues(PAGE_URL);
default:
return showDisplay();
}
@ -138,13 +142,22 @@ public class IndexController extends FreemarkerHttpServlet {
private void showStatus(HttpServletRequest req, HttpServletResponse resp)
throws IOException {
if (!PolicyHelper.isAuthorizedForActions(req, REQUIRED_ACTIONS)) {
resp.setStatus(HttpServletResponse.SC_FORBIDDEN);
resp.getWriter().write(
"You are not authorized to access this page.");
return;
}
try {
Map<String, Object> body = new HashMap<>();
body.put("statusUrl", UrlBuilder.getUrl(PAGE_URL, "status", "true"));
body.put("rebuildUrl",
UrlBuilder.getUrl(PAGE_URL, "rebuild", "true"));
body.put("status", buildStatusMap(indexer.getStatus()));
if (history != null) {
body.put("history", history.toMaps());
}
String rendered = FreemarkerProcessingServiceSetup.getService(
getServletContext()).renderTemplate(STATUS_TEMPLATE_NAME,

View file

@ -28,7 +28,7 @@ import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatu
public class IndexHistory implements SearchIndexer.Listener {
private static final Log log = LogFactory.getLog(IndexHistory.class);
private final static int MAX_EVENTS = 10;
private final static int MAX_EVENTS = 20;
private final Deque<Event> events = new LinkedList<>();
@ -84,6 +84,7 @@ public class IndexHistory implements SearchIndexer.Listener {
}
private void addCounts(UriCounts counts, Map<String, Object> map) {
map.put("excluded", counts.getExcluded());
map.put("updated", counts.getUpdated());
map.put("deleted", counts.getDeleted());
map.put("remaining", counts.getRemaining());

View file

@ -3,6 +3,7 @@
package edu.cornell.mannlib.vitro.webapp.searchengine.solr;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -19,6 +20,7 @@ import edu.cornell.mannlib.vitro.webapp.modules.Application;
import edu.cornell.mannlib.vitro.webapp.modules.ComponentStartupStatus;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngineException;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngineNotRespondingException;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchInputDocument;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchQuery;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchResponse;
@ -72,8 +74,8 @@ public class SolrSearchEngine implements SearchEngine {
try {
server.ping();
} catch (SolrServerException | IOException e) {
throw new SearchEngineException(
"Solr server did not respont to ping.", e);
throw appropriateException("Solr server did not respond to ping.",
e);
}
}
@ -93,8 +95,8 @@ public class SolrSearchEngine implements SearchEngine {
try {
server.add(SolrConversionUtils.convertToSolrInputDocuments(docs));
} catch (SolrServerException | IOException e) {
throw new SearchEngineException(
"Solr server failed to add documents " + docs, e);
throw appropriateException("Solr server failed to add documents "
+ docs, e);
}
}
@ -103,8 +105,7 @@ public class SolrSearchEngine implements SearchEngine {
try {
server.commit();
} catch (SolrServerException | IOException e) {
throw new SearchEngineException("Failed to commit to Solr server.",
e);
throw appropriateException("Failed to commit to Solr server.", e);
}
}
@ -113,8 +114,7 @@ public class SolrSearchEngine implements SearchEngine {
try {
server.commit(wait, wait);
} catch (SolrServerException | IOException e) {
throw new SearchEngineException("Failed to commit to Solr server.",
e);
throw appropriateException("Failed to commit to Solr server.", e);
}
}
@ -128,7 +128,7 @@ public class SolrSearchEngine implements SearchEngine {
try {
server.deleteById(new ArrayList<>(ids));
} catch (SolrServerException | IOException e) {
throw new SearchEngineException(
throw appropriateException(
"Solr server failed to delete documents: " + ids, e);
}
}
@ -138,7 +138,7 @@ public class SolrSearchEngine implements SearchEngine {
try {
server.deleteByQuery(query);
} catch (SolrServerException | IOException e) {
throw new SearchEngineException(
throw appropriateException(
"Solr server failed to delete documents: " + query, e);
}
}
@ -162,7 +162,7 @@ public class SolrSearchEngine implements SearchEngine {
QueryResponse response = server.query(solrQuery);
return SolrConversionUtils.convertToSearchResponse(response);
} catch (SolrServerException e) {
throw new SearchEngineException(
throw appropriateException(
"Solr server failed to execute the query" + query, e);
}
}
@ -172,4 +172,22 @@ public class SolrSearchEngine implements SearchEngine {
SearchResponse response = query(createQuery("*:*"));
return (int) response.getResults().getNumFound();
}
/**
* If there is a SocketTimeoutException in the causal chain for this
* exception, then wrap it in a SearchEngineNotRespondingException instead
* of a generic SearchEngineException.
*/
private SearchEngineException appropriateException(String message,
Exception e) {
Throwable cause = e;
while (cause != null) {
if (cause instanceof SocketTimeoutException) {
return new SearchEngineNotRespondingException(message, e);
}
cause = cause.getCause();
}
return new SearchEngineException(message, e);
}
}

View file

@ -2,6 +2,9 @@
package edu.cornell.mannlib.vitro.webapp.searchindex;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.PAUSE;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.UNPAUSE;
import java.io.ByteArrayInputStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
@ -20,6 +23,7 @@ import com.hp.hpl.jena.rdf.model.StmtIterator;
import edu.cornell.mannlib.vitro.webapp.dao.jena.event.EditEvent;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;
@ -27,13 +31,18 @@ import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;
* When a change is heard, wait for an interval to see if more changes come in.
* When changes stop coming in for a specified interval, send what has
* accumulated.
*
* When the SearchIndexer pauses, stop sending changes until the SearchIndexer
* unpauses.
*/
public class IndexingChangeListener implements ChangeListener {
public class IndexingChangeListener implements ChangeListener,
SearchIndexer.Listener {
private static final Log log = LogFactory
.getLog(IndexingChangeListener.class);
private final SearchIndexer searchIndexer;
private final Ticker ticker;
private volatile boolean paused = true;
/** All access to the list must be synchronized. */
private final List<Statement> changes;
@ -42,17 +51,33 @@ public class IndexingChangeListener implements ChangeListener {
this.searchIndexer = searchIndexer;
this.ticker = new Ticker();
this.changes = new ArrayList<>();
searchIndexer.addListener(this);
}
private synchronized void noteChange(Statement stmt) {
changes.add(stmt);
if (!paused) {
ticker.start();
}
}
@Override
public void receiveSearchIndexerEvent(Event event) {
if (event.getType() == PAUSE) {
paused = true;
} else if (event.getType() == UNPAUSE) {
paused = false;
respondToTicker();
}
}
private synchronized void respondToTicker() {
if (!paused && !changes.isEmpty()) {
searchIndexer.scheduleUpdatesForStatements(changes);
changes.clear();
}
}
public void shutdown() {
ticker.shutdown();
@ -160,7 +185,7 @@ public class IndexingChangeListener implements ChangeListener {
if (queue.isShutdown()) {
log.warn("Attempt to start ticker after shutdown request.");
} else {
queue.schedule(new TickerResponse(), 500, TimeUnit.MILLISECONDS);
queue.schedule(new TickerResponse(), 1, TimeUnit.SECONDS);
running = true;
}
}

View file

@ -3,9 +3,11 @@
package edu.cornell.mannlib.vitro.webapp.searchindex;
import static edu.cornell.mannlib.vitro.webapp.modelaccess.ModelNames.DISPLAY;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.PAUSE;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.SHUTDOWN_COMPLETE;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.SHUTDOWN_REQUESTED;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.STARTUP;
import static edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type.UNPAUSE;
import static edu.cornell.mannlib.vitro.webapp.utils.developer.Key.SEARCH_INDEX_LOG_INDEXING_BREAKDOWN_TIMINGS;
import static edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread.WorkLevel.IDLE;
import static edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread.WorkLevel.WORKING;
@ -39,6 +41,7 @@ import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelAccess;
import edu.cornell.mannlib.vitro.webapp.modules.Application;
import edu.cornell.mannlib.vitro.webapp.modules.ComponentStartupStatus;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.State;
import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier;
@ -235,11 +238,13 @@ public class SearchIndexerImpl implements SearchIndexer {
@Override
public void pause() {
scheduler.pause();
listeners.fireEvent(new Event(PAUSE, getStatus()));
}
@Override
public void unpause() {
scheduler.unpause();
listeners.fireEvent(new Event(UNPAUSE, getStatus()));
}
@Override
@ -263,8 +268,8 @@ public class SearchIndexerImpl implements SearchIndexer {
if (status.getState() != State.SHUTDOWN) {
listeners.fireEvent(new Event(SHUTDOWN_REQUESTED, status));
pool.shutdown();
taskQueue.shutdown();
pool.shutdown();
for (DocumentModifier dm : modifiers) {
try {
@ -321,7 +326,7 @@ public class SearchIndexerImpl implements SearchIndexer {
private static class Scheduler {
private final TaskQueue taskQueue;
private final List<Task> deferredQueue;
private volatile boolean paused;
private volatile boolean paused = true;
public Scheduler(TaskQueue taskQueue) {
this.taskQueue = taskQueue;
@ -461,6 +466,9 @@ public class SearchIndexerImpl implements SearchIndexer {
*
* The task is notified as each unit completes.
*
* If no thread is available for a work unit, the thread of the task itself
* will run it. This provides automatic throttling.
*
* Only one task is active at a time, so the task can simply wait until this
* pool is idle to know that all of its units have completed.
*
@ -474,14 +482,21 @@ public class SearchIndexerImpl implements SearchIndexer {
this.pool = new ThreadPoolExecutor(threadPoolSize, threadPoolSize,
10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50),
new VitroBackgroundThread.Factory(
"SearchIndexer_ThreadPool"));
"SearchIndexer_ThreadPool"),
new ThreadPoolExecutor.CallerRunsPolicy());
}
public void submit(Runnable workUnit, Task task) {
try {
pool.execute(new WorkUnitWrapper(workUnit, task));
} catch (RejectedExecutionException e) {
log.warn("Work unit was rejected: " + workUnit + " for " + task);
if (pool.isShutdown()) {
log.warn("Work unit was rejected: " + workUnit + " for "
+ task);
} else {
log.error("Work unit was rejected: " + workUnit + " for "
+ task, e);
}
}
}

View file

@ -15,6 +15,7 @@ import org.apache.commons.logging.LogFactory;
import edu.cornell.mannlib.vitro.webapp.application.ApplicationUtils;
import edu.cornell.mannlib.vitro.webapp.modules.Application;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngineNotRespondingException;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event.Type;
@ -32,7 +33,7 @@ import edu.cornell.mannlib.vitro.webapp.utils.developer.listeners.DeveloperDisab
* Start the SearchIndexer. Create a listener on the RDFService and link it to
* the indexer.
*
* Create a history object as a listener and make it avaiable to the
* Create a history object as a listener and make it available to the
* IndexController.
*
* Create a listener that will call commit() on the SearchEngine every time it
@ -61,8 +62,10 @@ public class SearchIndexerSetup implements ServletContextListener {
listener = new IndexingChangeListener(searchIndexer);
// Wrap it so it can be disabled by a developer flag.
listenerWrapper = new DeveloperDisabledChangeListener(listener,
Key.SEARCH_INDEX_SUPPRESS_MODEL_CHANGE_LISTENER);
RDFServiceUtils.getRDFServiceFactory(ctx).registerListener(
listenerWrapper);
@ -75,6 +78,7 @@ public class SearchIndexerSetup implements ServletContextListener {
searchIndexer
.startup(app, new ComponentStartupStatusImpl(this, ss));
searchIndexer.unpause();
ss.info(this, "Setup of search indexer completed.");
} catch (RDFServiceException e) {
@ -117,6 +121,9 @@ public class SearchIndexerSetup implements ServletContextListener {
private void commitChanges() {
try {
searchEngine.commit();
} catch (SearchEngineNotRespondingException e) {
log.error("Failed to commit the changes: "
+ "the search engine is not responding.");
} catch (Exception e) {
log.error("Failed to commit the changes.", e);
}

View file

@ -9,12 +9,18 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.hp.hpl.jena.rdf.model.Statement;
/**
* The basic implementation.
*/
public class IndexingUriFinderListBasic implements IndexingUriFinderList {
private static final Log log = LogFactory
.getLog(IndexingUriFinderListBasic.class);
private final List<IndexingUriFinder> finders;
public IndexingUriFinderListBasic(
@ -40,7 +46,14 @@ public class IndexingUriFinderListBasic implements IndexingUriFinderList {
public Set<String> findAdditionalUris(Statement stmt) {
Set<String> uris = new HashSet<>();
for (IndexingUriFinder uriFinder : finders) {
uris.addAll(uriFinder.findAdditionalURIsToIndex(stmt));
List<String> additions = uriFinder.findAdditionalURIsToIndex(stmt);
for (String addition : additions) {
if (addition == null) {
log.warn("Finder " + uriFinder + " returned a null URI.");
} else {
uris.add(addition);
}
}
}
return uris;
}

View file

@ -17,6 +17,7 @@ import edu.cornell.mannlib.vitro.webapp.application.ApplicationUtils;
import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngineException;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngineNotRespondingException;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.RebuildCounts;
@ -105,6 +106,9 @@ public class RebuildIndexTask implements Task {
try {
searchEngine.deleteByQuery(query);
searchEngine.commit();
} catch (SearchEngineNotRespondingException e) {
log.warn("Failed to delete outdated documents from the search index: "
+ "the search engine is not responding.");
} catch (SearchEngineException e) {
log.warn("Failed to delete outdated documents "
+ "from the search index", e);
@ -114,8 +118,12 @@ public class RebuildIndexTask implements Task {
private int getDocumentCount() {
try {
return searchEngine.documentCount();
} catch (SearchEngineNotRespondingException e) {
log.warn("Failed to get document count from the search index: "
+ "the search engine is not responding.");
return 0;
} catch (SearchEngineException e) {
log.warn("Failed to get docoument count from the search index.", e);
log.warn("Failed to get document count from the search index.", e);
return 0;
}
}

View file

@ -31,6 +31,7 @@ import edu.cornell.mannlib.vitro.webapp.beans.Individual;
import edu.cornell.mannlib.vitro.webapp.beans.ObjectPropertyStatement;
import edu.cornell.mannlib.vitro.webapp.beans.VClass;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngineNotRespondingException;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchInputDocument;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerUtils;
import edu.cornell.mannlib.vitro.webapp.searchindex.documentBuilding.DocumentModifier;
@ -65,6 +66,9 @@ public class UpdateDocumentWorkUnit implements Runnable {
modifiers.modifyDocument(ind, doc);
addIndexedTime(doc);
searchEngine.add(doc);
} catch (SearchEngineNotRespondingException e) {
log.warn("Failed to add '" + ind + "' to the search index: "
+ "the search engine is not responding.");
} catch (Exception e) {
log.warn("Failed to add '" + ind + "' to the search index.", e);
}

View file

@ -77,7 +77,7 @@ public class UpdateStatementsTask implements Task {
this.uris = Collections.synchronizedSet(new HashSet<String>());
this.status = new Status(changes.size(), 200, listeners);
this.status = new Status(changes.size(), 500, listeners);
}
@Override

View file

@ -21,6 +21,7 @@ import edu.cornell.mannlib.vitro.webapp.beans.Individual;
import edu.cornell.mannlib.vitro.webapp.beans.VClass;
import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngineNotRespondingException;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexer.Event;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus;
import edu.cornell.mannlib.vitro.webapp.modules.searchIndexer.SearchIndexerStatus.UriCounts;
@ -66,11 +67,10 @@ public class UpdateUrisTask implements Task {
this.listeners = listeners;
this.pool = pool;
this.status = new Status(uris.size(), 200, listeners);
this.status = new Status(uris.size(), 500, listeners);
this.searchEngine = ApplicationUtils.instance().getSearchEngine();
}
@Override
@ -84,10 +84,14 @@ public class UpdateUrisTask implements Task {
if (isInterrupted()) {
log.info("Interrupted: " + status.getSearchIndexerStatus());
break;
} else if (uri == null) {
// Nothing to do
} else {
Individual ind = getIndividual(uri);
if (ind == null || isExcluded(ind)) {
if (ind == null) {
deleteDocument(uri);
} else if (isExcluded(ind)) {
excludeDocument(uri);
} else {
updateDocument(ind);
}
@ -128,11 +132,28 @@ public class UpdateUrisTask implements Task {
searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri));
status.incrementDeletes();
log.debug("deleted '" + uri + "' from search index.");
} catch (SearchEngineNotRespondingException e) {
log.warn("Failed to delete '" + uri + "' from search index: "
+ "the search engine is not responding.");
} catch (Exception e) {
log.warn("Failed to delete '" + uri + "' from search index", e);
}
}
/** An exclusion is just a delete for different reasons. */
private void excludeDocument(String uri) {
try {
searchEngine.deleteById(SearchIndexerUtils.getIdForUri(uri));
status.incrementExclusions();
log.debug("excluded '" + uri + "' from search index.");
} catch (SearchEngineNotRespondingException e) {
log.warn("Failed to exclude '" + uri + "' from search index: "
+ "the search engine is not responding.", e);
} catch (Exception e) {
log.warn("Failed to exclude '" + uri + "' from search index", e);
}
}
private void updateDocument(Individual ind) {
Runnable workUnit = new UpdateDocumentWorkUnit(ind, modifiers);
pool.submit(workUnit, this);
@ -165,6 +186,7 @@ public class UpdateUrisTask implements Task {
private final ListenerList listeners;
private int updated = 0;
private int deleted = 0;
private int excluded = 0;
private Date since = new Date();
public Status(int total, int progressInterval, ListenerList listeners) {
@ -184,6 +206,11 @@ public class UpdateUrisTask implements Task {
since = new Date();
}
public synchronized void incrementExclusions() {
excluded++;
since = new Date();
}
private void maybeFireProgressEvent() {
if (updated > 0 && updated % progressInterval == 0) {
listeners.fireEvent(new Event(PROGRESS,
@ -192,9 +219,9 @@ public class UpdateUrisTask implements Task {
}
public synchronized SearchIndexerStatus getSearchIndexerStatus() {
int remaining = total - updated - deleted;
int remaining = total - updated - deleted - excluded;
return new SearchIndexerStatus(PROCESSING_URIS, since,
new UriCounts(deleted, updated, remaining, total));
new UriCounts(excluded, deleted, updated, remaining, total));
}
}

View file

@ -13,10 +13,7 @@ import org.apache.commons.logging.LogFactory;
import edu.cornell.mannlib.vitro.webapp.application.ApplicationUtils;
import edu.cornell.mannlib.vitro.webapp.controller.VitroRequest;
import edu.cornell.mannlib.vitro.webapp.controller.freemarker.IndividualListQueryResults;
import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngine;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchEngineException;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchQuery;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchQuery.Order;
import edu.cornell.mannlib.vitro.webapp.modules.searchEngine.SearchResponse;

View file

@ -9,8 +9,12 @@ function updateSearchIndexerStatus() {
url: searchIndexerStatusUrl,
dataType: "html",
complete: function(xhr, status) {
if (xhr.status == 200) {
updatePanelContents(xhr.responseText);
setTimeout(updateSearchIndexerStatus,5000);
} else {
displayErrorMessage(xhr.status + " " + xhr.statusText);
}
}
});
}
@ -19,4 +23,9 @@ function updatePanelContents(contents) {
document.getElementById("searchIndexerStatus").innerHTML = contents;
}
function displayErrorMessage(message) {
document.getElementById("searchIndexerError").innerHTML = "<h3>" + message + "</h3>";
}
$(document).ready(updateSearchIndexerStatus());

View file

@ -20,12 +20,19 @@ table.threadInfo th {
<h2>${i18n().background_threads}</h2>
<section id="show-threads" role="region">
<table class="threadInfo" summary="Status of background threads.">
<tr>
<th>${i18n().name}</th>
<th>${i18n().work_level}</th>
<th>${i18n().since}</th>
<th>${i18n().flags}</th>
</tr>
<#list threads as threadInfo>
<table class="threadInfo ${threadInfo.workLevel}" summary="Thread ${threadInfo.name}">
<tr><th>${i18n().name}</th><td>${threadInfo.name}</td></tr>
<tr><th>${i18n().work_level}</th><td>${threadInfo.workLevel}</td></tr>
<tr><th>${i18n().since}</th><td>${threadInfo.since}</td></tr>
<tr><th>${i18n().flags}</th><td>${threadInfo.flags}</td></tr>
</table>
<tr>
<td>${threadInfo.name}</td>
<td>${threadInfo.workLevel}</td>
<td>${threadInfo.since}</td>
<td>${threadInfo.flags}</td>
</#list>
</table>
</section>

View file

@ -7,6 +7,8 @@
<h2>${i18n().search_index_status}</h2>
<div id="searchIndexerError" />
<div id="searchIndexerStatus">
Search Indexer Status
</div>

View file

@ -40,9 +40,13 @@
<h3>History</h3>
<table class="history">
<tr> <th>Event</th> <th>Status</th> <th>Since</th> <th>Counts</th> </tr>
<#if history?has_content >
<#list history as ie>
<@showIndexerEvent ie />
</#list>
<#else>
<tr><td colspan="4">Search indexer history is not available.</td></tr>
</#if>
</table>
</section>
@ -64,7 +68,7 @@
<#macro showIndexerCounts countsType, counts>
<#if countsType == "URI_COUNTS">
Updated: ${counts.updated}, deleted: ${counts.deleted}, remaining: ${counts.remaining}, total: ${counts.total}
Updated: ${counts.updated}, excluded: ${counts.excluded}, deleted: ${counts.deleted}, remaining: ${counts.remaining}, total: ${counts.total}
<#elseif countsType == "STATEMENT_COUNTS">
Processed: ${counts.processed}, remaining: ${counts.remaining}, total: ${counts.total}
<#elseif countsType == "REBUILD_COUNTS">