Working on a memory leak in Solr indexing. Changed ContextNodeFields to be single-threaded.

This commit is contained in:
briancaruso 2011-06-28 23:47:44 +00:00
parent 7cc5501f70
commit 6f65d712cc
36 changed files with 1863 additions and 662 deletions

View file

@ -88,21 +88,12 @@ public interface IndividualDao extends ObjectSourceIface {
/**
* Returns an Iterator over all Individuals in the model that are user-viewable.
*/
public abstract Iterator<Individual> getAllOfThisTypeIterator();
/**
* Returns an Iterator over all Individuals in the model that are user-viewable and of the specified VClass URI.
* @param vClassURI
* @return
*/
public abstract Iterator<Individual> getAllOfThisVClassIterator(String vClassURI);
public abstract Iterator<String> getAllOfThisTypeIterator();
/**
* Returns an Iterator over all Individuals in the model that are user-viewable and have been updated since the specified time.
*/
public abstract Iterator<Individual> getUpdatedSinceIterator(long updatedSince);
int getCountOfIndividualsInVClass(String vclassURI );
public abstract Iterator<String> getUpdatedSinceIterator(long updatedSince);
public boolean isIndividualOfClass(String vclassURI, String indURI);

View file

@ -125,22 +125,7 @@ class IndividualDaoFiltering extends BaseFiltering implements IndividualDao{
/* All of the methods that return iterator don't wrap the Individual in
* a IndividualFiltering so they might cause problems */
public Iterator getAllOfThisTypeIterator() {
return filterAndWrap(innerIndividualDao.getAllOfThisTypeIterator(),
filters);
}
public Iterator getAllOfThisVClassIterator(String classURI) {
return filterAndWrap(
innerIndividualDao.getAllOfThisVClassIterator(classURI),
filters);
}
public Iterator getUpdatedSinceIterator(long updatedSince) {
return filterAndWrap(
innerIndividualDao.getUpdatedSinceIterator(updatedSince),
filters);
}
private class ToFilteredIndividual extends UnaryFunctor<Individual, Individual>{
private final VitroFilters filters;
@ -151,19 +136,21 @@ class IndividualDaoFiltering extends BaseFiltering implements IndividualDao{
public Individual fn(Individual arg) {
return new IndividualFiltering(arg,filters);
}
}
public int getCountOfIndividualsInVClass(String vclassURI) {
Iterator<Individual> it = innerIndividualDao.getAllOfThisVClassIterator(vclassURI);
if( it == null ) return 0;
Iterator<Individual> itFiltered = Filter.filter(it,filters.getIndividualFilter());
return (int)(Summarize.count(itFiltered,filters.getIndividualFilter()));
}
}
/* ******************* unfiltered methods ****************** */
public Iterator<String> getAllOfThisTypeIterator() {
return innerIndividualDao.getAllOfThisTypeIterator();
}
public Iterator<String> getUpdatedSinceIterator(long updatedSince) {
return innerIndividualDao.getUpdatedSinceIterator(updatedSince);
}
public Collection<DataPropertyStatement> getExternalIds(String individualURI) {
return innerIndividualDao.getExternalIds(individualURI);
}

View file

@ -17,6 +17,7 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;
@ -839,138 +840,14 @@ public class IndividualDaoJena extends JenaBaseDao implements IndividualDao {
return null;
}
public Iterator<Individual> getAllOfThisTypeIterator() {
final List<com.hp.hpl.jena.ontology.Individual> list =
new LinkedList<com.hp.hpl.jena.ontology.Individual>();
getOntModel().enterCriticalSection(Lock.READ);
try {
ClosableIterator<com.hp.hpl.jena.ontology.Individual> allIndIt = getOntModel().listIndividuals();
try {
while (allIndIt.hasNext()) {
com.hp.hpl.jena.ontology.Individual ind = allIndIt.next();
boolean userVisible = true;
//Check for non-user visible types, maybe this should be an annotation?
ClosableIterator<Resource> typeIt = ind.listRDFTypes(false);
try {
while (typeIt.hasNext()) {
Resource typeRes = typeIt.next();
String type = typeRes.getURI();
// brute forcing this until we implement a better strategy
if (VitroVocabulary.PORTAL.equals(type) ||
VitroVocabulary.LINK.equals(type) ||
VitroVocabulary.KEYWORD.equals(type) ||
VitroVocabulary.KEYWORD_INDIVIDUALRELATION.equals(type) ||
VitroVocabulary.CLASSGROUP.equals(type) ||
VitroVocabulary.PROPERTYGROUP.equals(type) ||
VitroVocabulary.APPLICATION.equals(type)) {
userVisible = false;
break;
}
if( OWL.ObjectProperty.getURI().equals(type) ||
OWL.DatatypeProperty.getURI().equals(type) ||
OWL.AnnotationProperty.getURI().equals(type) ||
RDF.type.getURI().equals(type) ){
userVisible = false;
break;
}
}
} finally {
typeIt.close();
}
if (userVisible) {
list.add(ind);
}
}
} finally {
allIndIt.close();
}
} finally {
getOntModel().leaveCriticalSection();
}
if (list.size() >0){
return new Iterator<Individual>(){
Iterator<com.hp.hpl.jena.ontology.Individual> innerIt = list.iterator();
public boolean hasNext() {
return innerIt.hasNext();
}
public Individual next() {
return new IndividualJena(innerIt.next(), (WebappDaoFactoryJena) getWebappDaoFactory());
}
public void remove() {
//not used
}
};
}
else
return null;
public Iterator<String> getAllOfThisTypeIterator() {
//this is implemented in IndivdiualSDB
throw new NotImplementedException();
}
public Iterator<Individual> getAllOfThisVClassIterator(String vClassURI) {
getOntModel().enterCriticalSection(Lock.READ);
try {
List<Individual> ents = new LinkedList<Individual>();
OntClass cls = getOntModel().getOntClass(vClassURI);
Iterator<? extends OntResource> indIt = cls.listInstances();
while (indIt.hasNext()) {
OntResource ind = indIt.next();
ents.add(new IndividualJena(ind, (WebappDaoFactoryJena) getWebappDaoFactory()));
}
return ents.iterator();
} finally {
getOntModel().leaveCriticalSection();
}
}
public Iterator<Individual> getUpdatedSinceIterator(long updatedSince){
List<Individual> ents = new ArrayList<Individual>();
Date since = new DateTime(updatedSince).toDate();
String sinceStr = xsdDateTimeFormat.format(since);
getOntModel().enterCriticalSection(Lock.READ);
try {
Property modTimeProp = MODTIME;
if (modTimeProp == null)
modTimeProp = getOntModel().getProperty(VitroVocabulary.MODTIME);
if (modTimeProp == null)
return null; // throw an exception?
String queryStr = "PREFIX vitro: <"+ VitroVocabulary.vitroURI+"> " +
"PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>" +
"SELECT ?ent " +
"WHERE { " +
" ?ent vitro:modTime ?modTime ." +
" FILTER (xsd:dateTime(?modTime) >= \""+sinceStr+"\"^^xsd:dateTime) " +
"}";
Query query = QueryFactory.create(queryStr);
QueryExecution qe = QueryExecutionFactory.create(query,getOntModel());
ResultSet results = qe.execSelect();
while (results.hasNext()) {
QuerySolution qs = (QuerySolution) results.next();
Resource res = (Resource) qs.get("?ent");
com.hp.hpl.jena.ontology.Individual ent = getOntModel().getIndividual(res.getURI());
if (ent != null) {
boolean userVisible = false;
ClosableIterator<Resource> typeIt = ent.listRDFTypes(true);
try {
while (typeIt.hasNext()) {
Resource typeRes = typeIt.next();
if (typeRes.getNameSpace() == null || (!NONUSER_NAMESPACES.contains(typeRes.getNameSpace()))) {
userVisible = true;
break;
}
}
} finally {
typeIt.close();
}
if (userVisible) {
ents.add(new IndividualJena(ent, (WebappDaoFactoryJena) getWebappDaoFactory()));
}
}
}
} finally {
getOntModel().leaveCriticalSection();
}
return ents.iterator();
public Iterator<String> getUpdatedSinceIterator(long updatedSince){
//this is implemented in IndivdiualSDB
throw new NotImplementedException();
}
public boolean isIndividualOfClass(String vclassURI, String indURI) {

View file

@ -423,7 +423,7 @@ public class IndividualDaoSDB extends IndividualDaoJena {
}
@Override
public Iterator<Individual> getAllOfThisTypeIterator() {
public Iterator<String> getAllOfThisTypeIterator() {
final List<String> list =
new LinkedList<String>();
@ -455,7 +455,7 @@ public class IndividualDaoSDB extends IndividualDaoJena {
w.close();
}
return getIndividualIterator(list);
return list.iterator();
}
@ -485,30 +485,10 @@ public class IndividualDaoSDB extends IndividualDaoJena {
}
else
return null;
}
@Override
public Iterator<Individual> getAllOfThisVClassIterator(String vClassURI) {
getOntModel().enterCriticalSection(Lock.READ);
try {
List<String> individualURIs = new ArrayList<String>();
OntClass cls = getOntModel().getOntClass(vClassURI);
Iterator indIt = cls.listInstances();
while (indIt.hasNext()) {
com.hp.hpl.jena.ontology.Individual ind =
(com.hp.hpl.jena.ontology.Individual) indIt.next();
if (ind.getURI() != null) {
individualURIs.add(ind.getURI());
}
}
return getIndividualIterator(individualURIs);
} finally {
getOntModel().leaveCriticalSection();
}
}
}
@Override
public Iterator<Individual> getUpdatedSinceIterator(long updatedSince){
public Iterator<String> getUpdatedSinceIterator(long updatedSince){
List<String> individualURIs = new ArrayList<String>();
Date since = new DateTime(updatedSince).toDate();
String sinceStr = xsdDateTimeFormat.format(since);
@ -540,7 +520,7 @@ public class IndividualDaoSDB extends IndividualDaoJena {
} finally {
getOntModel().leaveCriticalSection();
}
return getIndividualIterator(individualURIs);
return individualURIs.iterator();
}
}

View file

@ -0,0 +1,9 @@
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
package edu.cornell.mannlib.vitro.webapp.search;
public class IndexConstants {
public static String INDEX_REBUILD_REQUESTED_AT_STARTUP = "INDEX_REBUILD_REQUESTED_AT_STARTUP";
public static String SEARCH_DATAPROPERTY_BLACKLIST ="SEARCH_DATAPROPERTY_BLACKLIST";
public static String SEARCH_OBJECTPROPERTY_BLACKLIST = "SEARCH_OBJECTPROPERTY_BLACKLIST";
}

View file

@ -3,13 +3,11 @@
package edu.cornell.mannlib.vitro.webapp.search.beans;
import java.util.Iterator;
import edu.cornell.mannlib.vitro.webapp.beans.Individual;
public interface ObjectSourceIface {
Iterator<Individual> getAllOfThisTypeIterator();
Iterator<String> getAllOfThisTypeIterator();
Iterator<Individual> getUpdatedSinceIterator(long msSinceEpoc);
Iterator<String> getUpdatedSinceIterator(long msSinceEpoc);
}

View file

@ -39,7 +39,6 @@ import edu.cornell.mannlib.vitro.webapp.controller.ajax.VitroAjaxController;
import edu.cornell.mannlib.vitro.webapp.search.SearchException;
import edu.cornell.mannlib.vitro.webapp.search.lucene.Entity2LuceneDoc.VitroLuceneTermNames;
import edu.cornell.mannlib.vitro.webapp.search.lucene.LuceneIndexFactory;
import edu.cornell.mannlib.vitro.webapp.search.lucene.LuceneSetup;
/**
* AutocompleteController generates autocomplete content
@ -145,11 +144,11 @@ public class AutocompleteController extends VitroAjaxController {
}
private Analyzer getAnalyzer(ServletContext servletContext) throws SearchException {
Object obj = servletContext.getAttribute(LuceneSetup.ANALYZER);
if( obj == null || !(obj instanceof Analyzer) )
// //Object obj = servletContext.getAttribute(LuceneSetup.ANALYZER);
// if( obj == null || !(obj instanceof Analyzer) )
throw new SearchException("Could not get analyzer");
else
return (Analyzer)obj;
// else
// return (Analyzer)obj;
}
private Query getQuery(VitroRequest vreq, Analyzer analyzer,

View file

@ -67,7 +67,6 @@ import edu.cornell.mannlib.vitro.webapp.search.lucene.CustomSimilarity;
import edu.cornell.mannlib.vitro.webapp.search.lucene.Entity2LuceneDoc;
import edu.cornell.mannlib.vitro.webapp.search.lucene.Entity2LuceneDoc.VitroLuceneTermNames;
import edu.cornell.mannlib.vitro.webapp.search.lucene.LuceneIndexFactory;
import edu.cornell.mannlib.vitro.webapp.search.lucene.LuceneSetup;
import edu.cornell.mannlib.vitro.webapp.web.templatemodels.LinkTemplateModel;
import edu.cornell.mannlib.vitro.webapp.web.templatemodels.individual.ListedIndividualTemplateModel;
import freemarker.template.Configuration;
@ -585,11 +584,11 @@ public class PagedSearchController extends FreemarkerHttpServlet implements Sear
}
private Analyzer getAnalyzer(ServletContext servletContext) throws SearchException {
Object obj = servletContext.getAttribute(LuceneSetup.ANALYZER);
if( obj == null || !(obj instanceof Analyzer) )
// Object obj = servletContext.getAttribute(LuceneSetup.ANALYZER);
// if( obj == null || !(obj instanceof Analyzer) )
throw new SearchException("Could not get analyzer");
else
return (Analyzer)obj;
// else
// return (Analyzer)obj;
}
private Query getQuery(VitroRequest request,
@ -777,16 +776,18 @@ public class PagedSearchController extends FreemarkerHttpServlet implements Sear
@SuppressWarnings("unchecked")
private HashSet<String> getDataPropertyBlacklist(){
HashSet<String>dpBlacklist = (HashSet<String>)
getServletContext().getAttribute(LuceneSetup.SEARCH_DATAPROPERTY_BLACKLIST);
return dpBlacklist;
// HashSet<String>dpBlacklist = (HashSet<String>)
// getServletContext().getAttribute(LuceneSetup.SEARCH_DATAPROPERTY_BLACKLIST);
// return dpBlacklist;
return null;
}
@SuppressWarnings("unchecked")
private HashSet<String> getObjectPropertyBlacklist(){
HashSet<String>opBlacklist = (HashSet<String>)
getServletContext().getAttribute(LuceneSetup.SEARCH_OBJECTPROPERTY_BLACKLIST);
return opBlacklist;
// HashSet<String>opBlacklist = (HashSet<String>)
// getServletContext().getAttribute(LuceneSetup.SEARCH_OBJECTPROPERTY_BLACKLIST);
// return opBlacklist;
return null;
}

View file

@ -44,12 +44,12 @@ import edu.cornell.mannlib.vitro.webapp.dao.IndividualDao;
import edu.cornell.mannlib.vitro.webapp.dao.VClassDao;
import edu.cornell.mannlib.vitro.webapp.dao.VClassGroupDao;
import edu.cornell.mannlib.vitro.webapp.dao.VitroVocabulary;
import edu.cornell.mannlib.vitro.webapp.search.IndexConstants;
import edu.cornell.mannlib.vitro.webapp.search.SearchException;
import edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames;
import edu.cornell.mannlib.vitro.webapp.search.beans.VitroHighlighter;
import edu.cornell.mannlib.vitro.webapp.search.beans.VitroQuery;
import edu.cornell.mannlib.vitro.webapp.search.beans.VitroQueryFactory;
import edu.cornell.mannlib.vitro.webapp.search.lucene.LuceneSetup;
import edu.cornell.mannlib.vitro.webapp.search.solr.SolrSetup;
import edu.cornell.mannlib.vitro.webapp.web.templatemodels.LinkTemplateModel;
import edu.cornell.mannlib.vitro.webapp.web.templatemodels.individual.ListedIndividualTemplateModel;
@ -603,14 +603,14 @@ public class SolrPagedSearchController extends FreemarkerHttpServlet {
@SuppressWarnings({ "unchecked", "unused" })
private HashSet<String> getDataPropertyBlacklist(){
HashSet<String>dpBlacklist = (HashSet<String>)
getServletContext().getAttribute(LuceneSetup.SEARCH_DATAPROPERTY_BLACKLIST);
getServletContext().getAttribute(IndexConstants.SEARCH_DATAPROPERTY_BLACKLIST);
return dpBlacklist;
}
@SuppressWarnings({ "unchecked", "unused" })
private HashSet<String> getObjectPropertyBlacklist(){
HashSet<String>opBlacklist = (HashSet<String>)
getServletContext().getAttribute(LuceneSetup.SEARCH_OBJECTPROPERTY_BLACKLIST);
getServletContext().getAttribute(IndexConstants.SEARCH_OBJECTPROPERTY_BLACKLIST);
return opBlacklist;
}

View file

@ -7,24 +7,15 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Stack;
import java.util.Queue;
import javax.servlet.ServletContext;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.cornell.mannlib.vitro.webapp.beans.Individual;
import edu.cornell.mannlib.vitro.webapp.beans.VClass;
import edu.cornell.mannlib.vitro.webapp.dao.VClassDao;
import edu.cornell.mannlib.vitro.webapp.dao.VitroVocabulary;
import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory;
import edu.cornell.mannlib.vitro.webapp.search.beans.ObjectSourceIface;
import edu.cornell.mannlib.vitro.webapp.search.solr.CalculateParameters;
/**
@ -41,32 +32,35 @@ import edu.cornell.mannlib.vitro.webapp.search.solr.CalculateParameters;
*
*/
public class IndexBuilder extends Thread {
private List<ObjectSourceIface> sourceList = new LinkedList<ObjectSourceIface>();
private IndexerIface indexer = null;
private ServletContext context = null;
private WebappDaoFactory wdf;
private final IndexerIface indexer;
private final ServletContext context;
/* changedUris should only be accessed from synchronized blocks */
private HashSet<String> changedUris = null;
private List<Individual> updatedInds = null;
private List<Individual> deletedInds = null;
private List<String> updatedInds = null;
private List<String> deletedInds = null;
private boolean reindexRequested = false;
protected boolean stopRequested = false;
protected long reindexInterval = 1000 * 60 /* msec */ ;
protected int numberOfThreads = 10;
public static final boolean UPDATE_DOCS = false;
public static final boolean NEW_DOCS = true;
private static final Log log = LogFactory.getLog(IndexBuilder.class);
public IndexBuilder(ServletContext context,
public IndexBuilder(
ServletContext context,
IndexerIface indexer,
List<ObjectSourceIface> sources){
WebappDaoFactory wdf){
super("IndexBuilder");
this.indexer = indexer;
this.sourceList = sources;
this.context = context;
this.wdf = wdf;
this.context = context;
this.changedUris = new HashSet<String>();
this.start();
@ -74,22 +68,17 @@ public class IndexBuilder extends Thread {
protected IndexBuilder(){
//for testing only
this( null, null, Collections.<ObjectSourceIface>emptyList());
this( null, null, null);
}
public void addObjectSource(ObjectSourceIface osi) {
if (osi != null)
sourceList.add(osi);
public void setWdf(WebappDaoFactory wdf){
this.wdf = wdf;
}
public boolean isIndexing(){
return indexer.isIndexing();
}
public List<ObjectSourceIface> getObjectSourceList() {
return sourceList;
}
public synchronized void doIndexRebuild() {
//set flag for full index rebuild
this.reindexRequested = true;
@ -106,10 +95,16 @@ public class IndexBuilder extends Thread {
this.notifyAll();
}
/**
* Use this method to add URIs that need to be indexed.
*/
public synchronized void addToChangedUris(String uri){
changedUris.add(uri);
}
/**
* Use this method to add URIs that need to be indexed.
*/
public synchronized void addToChangedUris(Collection<String> uris){
changedUris.addAll(uris);
}
@ -124,7 +119,8 @@ public class IndexBuilder extends Thread {
public synchronized void stopIndexingThread() {
stopRequested = true;
this.notifyAll();
this.notifyAll();
this.interrupt();
}
@Override
@ -145,10 +141,14 @@ public class IndexBuilder extends Thread {
} catch (InterruptedException e) {
log.debug("woken up",e);
}catch(Throwable e){
log.error(e,e);
if( log != null )//may be null on shutdown
log.error(e,e);
}
}
log.info("Stopping IndexBuilder thread");
if(log != null )//may be null on shutdown
log.info("Stopping IndexBuilder thread");
}
@ -164,65 +164,48 @@ public class IndexBuilder extends Thread {
*/
private void makeAddAndDeleteLists( Collection<String> uris){
/* clear updateInds and deletedUris. This is the only method that should set these. */
this.updatedInds = new ArrayList<Individual>();
this.deletedInds = new ArrayList<Individual>();
WebappDaoFactory wdf = (WebappDaoFactory)context.getAttribute("webappDaoFactory");
this.updatedInds = new ArrayList<String>();
this.deletedInds = new ArrayList<String>();
for( String uri: uris){
if( uri != null ){
Individual ind = wdf.getIndividualDao().getIndividualByURI(uri);
if( ind != null)
this.updatedInds.add(ind);
this.updatedInds.add(uri);
else{
log.debug("found delete in changed uris");
this.deletedInds.add(ind);
this.deletedInds.add(uri);
}
}
}
this.updatedInds = addDepResourceClasses(updatedInds);
}
}
/**
* This rebuilds the whole index.
*/
protected void indexRebuild() {
log.info("Rebuild of search index is starting.");
List<Iterator<Individual>> listOfIterators = new LinkedList<Iterator<Individual>>();
for (ObjectSourceIface objectSource: sourceList) {
if (objectSource != null) {
listOfIterators.add(((objectSource)
.getAllOfThisTypeIterator()));
}
}
// clear out changed uris since we are doing a full index rebuild
getAndEmptyChangedUris();
if (listOfIterators.size() == 0)
log.warn("Warning: no ObjectSources found.");
doBuild(listOfIterators, Collections.<Individual>emptyList() );
log.debug("Getting all URIs in the model");
Iterator<String> uris = wdf.getIndividualDao().getAllOfThisTypeIterator();
doBuild(uris, Collections.<String>emptyList() );
if( log != null ) //log might be null if system is shutting down.
log.info("Rebuild of search index is complete.");
}
protected void updatedIndex() {
log.debug("Starting updateIndex()");
long since = indexer.getModified() - 60000;
List<Iterator<Individual>> listOfIterators =
new LinkedList<Iterator<Individual>>();
for (ObjectSourceIface objectSource: sourceList) {
if (objectSource != null) {
listOfIterators.add(((objectSource)
.getUpdatedSinceIterator(since)));
}
}
//long since = indexer.getModified() - 60000;
//List updatedUris = wdf.getIndividualDao().getUpdatedSinceIterator(since);
makeAddAndDeleteLists( getAndEmptyChangedUris());
listOfIterators.add( (new IndexBuilder.BuilderObjectSource(updatedInds)).getUpdatedSinceIterator(0) );
makeAddAndDeleteLists( getAndEmptyChangedUris() );
doBuild( listOfIterators, deletedInds );
doBuild( updatedInds.iterator(), deletedInds );
log.debug("Ending updateIndex()");
}
@ -240,7 +223,7 @@ public class IndexBuilder extends Thread {
* to false, and a check is made before adding, it will work fine; but
* checking if an object is on the index is slow.
*/
private void doBuild(List<Iterator<Individual>> sourceIterators, Collection<Individual> deletes ){
private void doBuild(Iterator<String> updates, Collection<String> deletes ){
boolean aborted = false;
boolean newDocs = reindexRequested;
boolean forceNewIndex = reindexRequested;
@ -253,19 +236,12 @@ public class IndexBuilder extends Thread {
reindexRequested = false;
if( ! forceNewIndex ){
for(Individual deleteMe : deletes ){
for(String deleteMe : deletes ){
indexer.removeFromIndex(deleteMe);
}
}
//get an iterator for all of the sources of indexable objects
for (Iterator<Individual> sourceIterator: sourceIterators) {
if (sourceIterator == null) {
log.warn("skipping null iterator");
} else {
indexForSource(sourceIterator, newDocs);
}
}
indexUriList(updates, newDocs);
} catch (AbortIndexing abort){
if( log != null)
log.debug("aborting the indexing because thread stop was requested");
@ -286,140 +262,100 @@ public class IndexBuilder extends Thread {
* Use the back end indexer to index each object that the Iterator returns.
* @throws AbortIndexing
*/
private void indexForSource(Iterator<Individual> individuals , boolean newDocs) throws AbortIndexing{
int count = 0;
int numOfThreads = 10;
List<IndexWorkerThread> workers = new ArrayList<IndexWorkerThread>();
boolean distributing = true;
private void indexUriList(Iterator<String> updateUris , boolean newDocs) throws AbortIndexing{
//make a copy of numberOfThreads so the local copy is safe during this method.
int numberOfThreads = this.numberOfThreads;
IndexWorkerThread.setStartTime(System.currentTimeMillis());
for(int i = 0; i< numOfThreads ;i++){
workers.add(new IndexWorkerThread(indexer,i,distributing)); // made a pool of workers
}
//make lists of work URIs for workers
List<List<String>> workLists = makeWorkerUriLists(updateUris, numberOfThreads);
//setup workers with work
List<IndexWorkerThread> workers = new ArrayList<IndexWorkerThread>();
for(int i = 0; i< numberOfThreads ;i++){
Iterator<Individual> workToDo = new UriToIndividualIterator(workLists.get(i), wdf);
workers.add( new IndexWorkerThread(indexer, i, workToDo) );
}
log.debug("Starting the building and indexing of documents in worker threads");
// starting worker threads
for(int i =0; i < numberOfThreads; i++){
workers.get(i).start();
}
log.info("Indexing worker pool ready for indexing.");
// starting worker threads
for(int i =0; i < numOfThreads; i++){
workers.get(i).start();
}
while(individuals.hasNext()){
if( stopRequested )
throw new AbortIndexing();
Individual ind = null;
try{
ind = individuals.next();
workers.get(count%numOfThreads).addToQueue(ind); // adding individual to worker queue.
}catch(Throwable ex){
if( stopRequested || log == null){//log might be null if system is shutting down.
throw new AbortIndexing();
}
String uri = ind!=null?ind.getURI():"null";
log.warn("Error indexing individual " + uri + " " + ex.getMessage());
}
count++;
}
for(int i =0 ; i < numOfThreads; i ++){
workers.get(i).setDistributing(false);
}
for(int i =0; i < numOfThreads; i++){
//waiting for all the work to finish
for(int i =0; i < numberOfThreads; i++){
try{
workers.get(i).join();
}catch(InterruptedException e){
log.error(e,e);
//this thread will get interrupted if the system is trying to shut down.
if( log != null )
log.debug(e,e);
for( IndexWorkerThread thread: workers){
thread.requestStop();
}
return;
}
}
IndexWorkerThread.resetCount();
IndexWorkerThread.resetCount();
}
/**
* For a list of individuals, this builds a list of dependent resources and returns it.
*/
private List<Individual> addDepResourceClasses(List<Individual> inds) {
WebappDaoFactory wdf = (WebappDaoFactory)context.getAttribute("webappDaoFactory");
VClassDao vClassDao = wdf.getVClassDao();
Iterator<Individual> it = inds.iterator();
VClass depResVClass = new VClass(VitroVocabulary.DEPENDENT_RESORUCE);
while(it.hasNext()){
Individual ind = it.next();
List<VClass> classes = ind.getVClasses();
boolean isDepResource = false;
for( VClass clazz : classes){
if( !isDepResource && VitroVocabulary.DEPENDENT_RESORUCE.equals( clazz.getURI() ) ){
isDepResource = true;
break;
}
}
if( ! isDepResource ){
for( VClass clazz : classes){
List<String> superClassUris = vClassDao.getAllSuperClassURIs(clazz.getURI());
for( String uri : superClassUris){
if( VitroVocabulary.DEPENDENT_RESORUCE.equals( uri ) ){
isDepResource = true;
break;
}
}
if( isDepResource )
break;
}
}
if( isDepResource){
classes.add(depResVClass);
ind.setVClasses(classes, true);
}
}
return inds;
}
/* maybe ObjectSourceIface should be replaced with just an iterator. */
private class BuilderObjectSource implements ObjectSourceIface {
private final List<Individual> individuals;
public BuilderObjectSource( List<Individual> individuals){
this.individuals=individuals;
}
protected class UriToIndividualIterator implements Iterator<Individual>{
private final Iterator<String> uris;
private final WebappDaoFactory wdf;
public UriToIndividualIterator( Iterator<String> uris, WebappDaoFactory wdf){
this.uris= uris;
this.wdf = wdf;
}
public UriToIndividualIterator( List<String> uris, WebappDaoFactory wdf){
this.uris= uris.iterator();
this.wdf = wdf;
}
@Override
public Iterator<Individual> getAllOfThisTypeIterator() {
return new Iterator<Individual>(){
final Iterator<Individual> it = individuals.iterator();
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public Individual next() {
return it.next();
}
@Override
public void remove() { /* not implemented */}
};
}
public boolean hasNext() {
return uris.hasNext();
}
/** may return null */
@Override
public Iterator<Individual> getUpdatedSinceIterator(long msSinceEpoc) {
return getAllOfThisTypeIterator();
public Individual next() {
String uri = uris.next();
return wdf.getIndividualDao().getIndividualByURI(uri);
}
@Override
public void remove() {
throw new IllegalAccessError("");
}
}
private static List<List<String>> makeWorkerUriLists(Iterator<String> uris,int workers){
List<List<String>> work = new ArrayList<List<String>>(workers);
for(int i =0; i< workers; i++){
work.add( new ArrayList<String>() );
}
int counter = 0;
while(uris.hasNext()){
work.get( counter % workers ).add( uris.next() );
counter ++;
}
return work;
}
private class AbortIndexing extends Exception {
// Just a vanilla exception
}
}

View file

@ -1,89 +1,72 @@
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
package edu.cornell.mannlib.vitro.webapp.search.indexing;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.cornell.mannlib.vitro.webapp.beans.Individual;
import edu.cornell.mannlib.vitro.webapp.search.IndexingException;
import edu.cornell.mannlib.vitro.webapp.search.solr.IndividualToSolrDocument;
class IndexWorkerThread extends Thread{
protected final int threadNum;
protected IndividualToSolrDocument individualToSolrDoc;
private IndexerIface indexer = null;
private Log log = LogFactory.getLog(IndexWorkerThread.class);
private static long count=0;
private Queue<Individual> indQueue = new LinkedList<Individual>();
private int threadNum;
private static long starttime = 0;
private boolean distributing;
protected final IndexerIface indexer;
protected final Iterator<Individual> individualsToIndex;
protected boolean stopRequested = false;
public IndexWorkerThread(IndexerIface indexer, int threadNum,boolean distributing){
private Log log = LogFactory.getLog(IndexWorkerThread.class);
private static long count=0;
private static long starttime = 0;
public IndexWorkerThread(IndexerIface indexer, int threadNum , Iterator<Individual> individualsToIndex){
super("IndexWorkerThread"+threadNum);
this.indexer = indexer;
this.threadNum = threadNum;
this.distributing = distributing;
/*synchronized(this){
if(starttime == 0)
starttime = System.currentTimeMillis();
}*/
this.individualsToIndex = individualsToIndex;
}
public void requestStop(){
stopRequested = true;
}
public void addToQueue(Individual ind){
synchronized(indQueue){
indQueue.offer(ind);
indQueue.notify();
}
}
public boolean isQueueEmpty(){
return indQueue.isEmpty();
}
public void setDistributing(boolean distributing){
this.distributing = distributing;
}
public void run(){
while(this.distributing){
synchronized(indQueue){
try{
while(indQueue.isEmpty() && this.distributing){
try{
log.debug("Worker number " + threadNum + " waiting on some work to be alloted.");
indQueue.wait(1000);
}catch(InterruptedException ie){
log.error(ie,ie);
}
}
Thread.sleep(50); //wait a bit to let a bit more work to come into the queue
log.debug("work found for Woker number " + threadNum);
addDocsToIndex();
} catch (InterruptedException e) {
log.debug("Worker number " + threadNum + " woken up",e);
}
catch(Throwable e){
log.error(e,e);
}
}
}
public void run(){
while( ! stopRequested ){
//do the actual indexing work
log.debug("work found for Woker number " + threadNum);
addDocsToIndex();
// done so shut this thread down.
stopRequested = true;
}
log.info("Worker number " + threadNum + " exiting.");
}
protected void addDocsToIndex() throws IndexingException{
protected void addDocsToIndex() {
while(!indQueue.isEmpty()){
indexer.index(indQueue.poll());
while( individualsToIndex.hasNext() ){
//need to stop right away if requested to
if( stopRequested ) return;
//build the document and add it to the index
Individual ind = null;
try {
ind = individualsToIndex.next();
indexer.index( ind );
} catch (IndexingException e) {
if( ind != null )
log.error("Could not index individual " + ind.getURI() , e );
else
log.warn("Could not index, individual was null");
}
synchronized(this){
count++;
if( log.isInfoEnabled() ){

View file

@ -46,7 +46,7 @@ public interface IndexerIface {
* @param obj
* @throws IndexingException
*/
public void removeFromIndex(Individual ind) throws IndexingException;
public void removeFromIndex(String uri) throws IndexingException;
public void prepareForRebuild() throws IndexingException;

View file

@ -479,5 +479,11 @@ public class LuceneIndexer implements IndexerIface {
public void index(Individual ind) throws IndexingException {
// TODO Auto-generated method stub
}
}
@Override
public void removeFromIndex(String uri) throws IndexingException {
// TODO Auto-generated method stub
}
}

View file

@ -144,7 +144,8 @@ public class LuceneSetup implements javax.servlet.ServletContextListener {
List<ObjectSourceIface> sources = new ArrayList<ObjectSourceIface>();
sources.add(wadf.getIndividualDao());
IndexBuilder builder = new IndexBuilder(context, indexer, sources);
//IndexBuilder builder = new IndexBuilder(context, indexer, sources);
IndexBuilder builder = new IndexBuilder(context, indexer, wadf);
// here we add the IndexBuilder with the LuceneIndexer
// to the servlet context so we can access it later in the webapp.

View file

@ -109,7 +109,8 @@ public class LuceneSetupCJK implements javax.servlet.ServletContextListener {
List sources = new ArrayList();
sources.add(wadf.getIndividualDao());
IndexBuilder builder = new IndexBuilder(context,indexer,sources);
//IndexBuilder builder = new IndexBuilder(context,indexer,sources);
IndexBuilder builder = new IndexBuilder(context, indexer, wadf);
// here we add the IndexBuilder with the LuceneIndexer
// to the servlet context so we can access it later in the webapp.

View file

@ -3,7 +3,6 @@
package edu.cornell.mannlib.vitro.webapp.search.solr;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
@ -26,14 +25,10 @@ import com.hp.hpl.jena.query.QuerySolution;
import com.hp.hpl.jena.query.QuerySolutionMap;
import com.hp.hpl.jena.query.ResultSet;
import com.hp.hpl.jena.query.Syntax;
import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.Property;
import com.hp.hpl.jena.rdf.model.RDFNode;
import com.hp.hpl.jena.rdf.model.Resource;
import com.hp.hpl.jena.rdf.model.ResourceFactory;
import com.hp.hpl.jena.rdf.model.StmtIterator;
import com.hp.hpl.jena.shared.Lock;
import com.hp.hpl.jena.ontology.OntModel;
import edu.cornell.mannlib.vitro.webapp.beans.Individual;
import edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames;

View file

@ -5,13 +5,16 @@ package edu.cornell.mannlib.vitro.webapp.search.solr;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import com.hp.hpl.jena.query.Dataset;
import com.hp.hpl.jena.query.Query;
import com.hp.hpl.jena.query.QueryExecution;
import com.hp.hpl.jena.query.QueryExecutionFactory;
@ -20,6 +23,7 @@ import com.hp.hpl.jena.query.QuerySolution;
import com.hp.hpl.jena.query.QuerySolutionMap;
import com.hp.hpl.jena.query.ResultSet;
import com.hp.hpl.jena.query.Syntax;
import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.RDFNode;
import com.hp.hpl.jena.rdf.model.Resource;
import com.hp.hpl.jena.rdf.model.ResourceFactory;
@ -29,49 +33,91 @@ import edu.cornell.mannlib.vitro.webapp.beans.Individual;
import edu.cornell.mannlib.vitro.webapp.search.VitroSearchTermNames;
public class ContextNodeFields implements DocumentModifier{
private static final String prefix = "prefix owl: <http://www.w3.org/2002/07/owl#> "
+ " prefix vitroDisplay: <http://vitro.mannlib.cornell.edu/ontologies/display/1.1#> "
+ " prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> "
+ " prefix core: <http://vivoweb.org/ontology/core#> "
+ " prefix foaf: <http://xmlns.com/foaf/0.1/> "
+ " prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> "
+ " prefix localNav: <http://vitro.mannlib.cornell.edu/ns/localnav#> "
+ " prefix bibo: <http://purl.org/ontology/bibo/> ";
private Model model;
private static ExecutorService threadPool = null;
private static final int THREAD_POOL_SIZE = 10;
private static final List<String> singleValuedQueriesForAgent = new ArrayList<String>();
private static final List<String> singleValuedQueriesForInformationResource = new ArrayList<String>();
private static final List<String> multiValuedQueriesForAgent = new ArrayList<String>();
private static final String multiValuedQueryForInformationResource;
// private static StringBuffer objectProperties = new StringBuffer();
private static final List<String> multiValuedQueriesForAgent = new ArrayList<String>();
private static final String multiValuedQueryForInformationResource;
private Log log = LogFactory.getLog(ContextNodeFields.class);
private Dataset dataset;
private Log log = LogFactory.getLog(ContextNodeFields.class);
public ContextNodeFields(Model model){
// synchronized( ContextNodeFields.class){
// if( threadPool == null ){
// threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE, new ContextNodeFieldsThreadFactory());
// }
// }
this.model = model;
}
@Override
public void modifyDocument(Individual individual, SolrInputDocument doc, StringBuffer addUri) {
log.debug("retrieving context node values..");
public ContextNodeFields(Dataset dataset){
this.dataset = dataset;
StringBuffer objectProperties = singleThreadExecute( individual, multiValuedQueriesForAgent);
//change fields of solr document
SolrInputField targetField = doc.getField(VitroSearchTermNames.targetInfo);
targetField.addValue(" " + runQuery(individual, multiValuedQueryForInformationResource), targetField.getBoost());
SolrInputField field = doc.getField(VitroSearchTermNames.ALLTEXT);
field.addValue(objectProperties, field.getBoost());
log.debug("context node values are retrieved");
}
/**
 * Runs each context-node query for the given individual on the calling
 * thread and concatenates all of the resulting property-value text.
 * The buffer is seeded with a single space so appended values never fuse
 * with preceding document text.
 */
private StringBuffer singleThreadExecute(Individual individual, List<String> queries ){
    StringBuffer accumulated = new StringBuffer(" ");
    for( int i = 0; i < queries.size(); i++ ){
        accumulated.append( runQuery( individual, queries.get(i) ) );
    }
    return accumulated;
}
/* TODO: consider a constructor like this:
* public ContextNodeFields(OntModel fullModel, List<String> queries )
*/
/*
*TODO:
* consider reducing the code in this class using a method like the following:
*/
public StringBuffer runQuery( Individual individual, String query ){
StringBuffer propertyValues = new StringBuffer();
/**
 * Experimental, may not work: runs each context-node query as a separate
 * task on the shared thread pool and concatenates the property values.
 * Results are appended in submission order, so the output text is
 * deterministic regardless of task completion order.
 */
private StringBuffer multiThreadExecute(Individual individual, List<String> queries ){
    int queryCount = queries.size();
    List<QueryRunner> tasks = new ArrayList<QueryRunner>(queryCount);
    List<Future<QueryRunner>> completedTasks = new ArrayList<Future<QueryRunner>>(queryCount);

    //Make a task for each query and start it.
    for(String query : queries ){
        QueryRunner queryTask = new QueryRunner(individual, query);
        tasks.add(queryTask);
        // submit(task, result): the Future completes with the task itself,
        // which is then queried for its accumulated property text.
        completedTasks.add( threadPool.submit( queryTask , queryTask));
    }

    //Wait for each thread to finish and collect results
    StringBuffer objectProperties = new StringBuffer(" ");
    for(int i = 0 ; i < queryCount; i++){
        try {
            // Block until task i has finished before reading its values.
            completedTasks.get(i).get();
            objectProperties.append( tasks.get(i).getPropertyValues() ) ;
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt status is swallowed here and not
            // restored via Thread.currentThread().interrupt() — confirm.
            log.error("Thread interrupted");
        } catch (ExecutionException e) {
            log.error("problem during execution",e);
            e.printStackTrace();
        }
    }
    return objectProperties;
}
public StringBuffer runQuery( Individual individual, String query ){
StringBuffer propertyValues = new StringBuffer();
QuerySolutionMap initialBinding = new QuerySolutionMap();
Resource uriResource = ResourceFactory.createResource(individual.getURI());
initialBinding.add("uri", uriResource);
Query sparqlQuery = QueryFactory.create( query, Syntax.syntaxARQ);
dataset.getLock().enterCriticalSection(Lock.READ);
model.getLock().enterCriticalSection(Lock.READ);
try{
QueryExecution qExec = QueryExecutionFactory.create(sparqlQuery, dataset, initialBinding);
QueryExecution qExec = QueryExecutionFactory.create(sparqlQuery, model, initialBinding);
try{
ResultSet results = qExec.execSelect();
while(results.hasNext()){
@ -93,59 +139,21 @@ public class ContextNodeFields implements DocumentModifier{
qExec.close();
}
}finally{
dataset.getLock().leaveCriticalSection();
model.getLock().leaveCriticalSection();
}
return propertyValues;
}
}
/**
 * Adds context-node text to the document for this individual: runs one
 * worker thread per multi-valued agent query and appends the collected
 * text to the ALLTEXT field, then runs the information-resource query on
 * the calling thread and appends its text to the targetInfo field.
 */
@Override
public void modifyDocument(Individual individual, SolrInputDocument doc, StringBuffer addUri) {
    log.debug("retrieving context node values..");
    SolrInputField field = doc.getField(VitroSearchTermNames.ALLTEXT);
    SolrInputField targetField = doc.getField(VitroSearchTermNames.targetInfo);
    StringBuffer objectProperties = new StringBuffer();
    objectProperties.append(" ");
    // One worker thread per agent query.
    int threadCount = multiValuedQueriesForAgent.size();
    QueryRunner[] threads = new QueryRunner[threadCount];
    //Make a thread for each query and start it.
    for(int i= 0; i < threadCount; i++){
        QueryRunner t = new QueryRunner(individual, multiValuedQueriesForAgent.get(i));
        t.start();
        threads[i] = t;
    }
    //Wait for each thread to finish and collect results
    for(int i = 0 ; i < threadCount ; i++){
        try {
            threads[i].join();
            objectProperties.append( threads[i].getPropertyValues() ) ;
            threads[i] = null;  // drop the reference so the thread can be collected
        } catch (InterruptedException e) {
            // NOTE(review): interrupt status is not restored with
            // Thread.currentThread().interrupt() — confirm intended.
            log.error("Thread " + threads[i].getName() + " interrupted!");
        }
    }
    // Information-resource query runs synchronously on the calling thread.
    targetField.addValue(" " + runQuery(individual, multiValuedQueryForInformationResource), targetField.getBoost());
    field.addValue(objectProperties, field.getBoost());
    log.debug("context node values are retrieved");
}
private static final String prefix = "prefix owl: <http://www.w3.org/2002/07/owl#> "
+ " prefix vitroDisplay: <http://vitro.mannlib.cornell.edu/ontologies/display/1.1#> "
+ " prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> "
+ " prefix core: <http://vivoweb.org/ontology/core#> "
+ " prefix foaf: <http://xmlns.com/foaf/0.1/> "
+ " prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> "
+ " prefix localNav: <http://vitro.mannlib.cornell.edu/ns/localnav#> "
+ " prefix bibo: <http://purl.org/ontology/bibo/> ";
//single valued queries for foaf:Agent
static {
@ -390,63 +398,43 @@ public class ContextNodeFields implements DocumentModifier{
private class QueryRunner extends Thread{
private Individual ind;
private String query;
private StringBuffer propertyValues = new StringBuffer();
public String getPropertyValues(){
return propertyValues.toString();
}
private class QueryRunner implements Runnable {
private final Individual ind;
private final String query;
private final StringBuffer propertyValues = new StringBuffer();
public QueryRunner(Individual ind, String query){
this.ind = ind;
this.query = query;
}
public String getPropertyValues(){
return propertyValues.toString();
}
public void run(){
// StringBuffer propertyValues = new StringBuffer();
public void run(){
propertyValues.append(runQuery(ind, query));
// QuerySolutionMap initialBinding = new QuerySolutionMap();
// Resource uriResource = ResourceFactory.createResource(ind.getURI());
// initialBinding.add("uri", uriResource);
//
// Query sparqlQuery = QueryFactory.create( query, Syntax.syntaxARQ);
// dataset.getLock().enterCriticalSection(Lock.READ);
// try{
// QueryExecution qExec = QueryExecutionFactory.create(sparqlQuery, dataset, initialBinding);
// try{
// ResultSet results = qExec.execSelect();
// while(results.hasNext()){
// QuerySolution soln = results.nextSolution();
// Iterator<String> iter = soln.varNames() ;
// while( iter.hasNext()){
// String name = iter.next();
// RDFNode node = soln.get( name );
// if( node != null ){
// propertyValues.append(" " + node.toString());
// }else{
// log.debug(name + " is null");
// }
// }
// }
// }catch(Throwable t){
// log.error(t,t);
// } finally{
// qExec.close();
// }
// }finally{
// dataset.getLock().leaveCriticalSection();
// }
//
//objectProperties.append(propertyValues.toString());
}
}
// count for thread names
private static int threadCounter = 0;

/**
 * Returns a unique, descriptive name for a context-node worker thread.
 *
 * Bug fix: the previous code synchronized on the mutable Integer field
 * itself, but that field is reassigned to a new boxed object on every
 * increment, so two threads could each acquire "the lock" on different
 * objects and race on the counter.  Lock on the class object instead,
 * which is a single stable monitor.
 */
private static String getNewThreadName(){
    synchronized(ContextNodeFields.class){
        int i = threadCounter;
        threadCounter = threadCounter + 1;
        return "IndexBuilder-ContextNodeFields-" + i;
    }
}
/**
 * ThreadFactory that gives pool threads descriptive, unique names.
 */
private class ContextNodeFieldsThreadFactory implements ThreadFactory{
    @Override
    public Thread newThread(Runnable r) {
        // Bug fix: the previous code called new Thread(String), which used
        // the generated name as the thread name but DROPPED the Runnable —
        // threads created by the pool would have executed no work at all.
        // Pass the task along with the generated name.
        return new Thread( r, getNewThreadName() );
    }
}
}

View file

@ -9,7 +9,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.document.Document;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.joda.time.DateTime;
@ -41,9 +41,7 @@ public class IndividualToSolrDocument {
public List<DocumentModifier> documentModifiers = new ArrayList<DocumentModifier>();
private static List<String> contextNodeClassNames = new ArrayList<String>();
public IndividualToSolrDocument(
ClassProhibitedFromSearch classesProhibitedFromSearch,
IndividualProhibitedFromSearch individualProhibitedFromSearch){
@ -73,7 +71,6 @@ public class IndividualToSolrDocument {
classPublicNames.append("");
SolrInputDocument doc = new SolrInputDocument();
//DocId
String id = ind.getURI();
log.debug("translating " + id);
@ -150,8 +147,8 @@ public class IndividualToSolrDocument {
doc.addField(term.PROHIBITED_FROM_TEXT_RESULTS, prohibited?"1":"0");
//lucene DocID
doc.addField(term.DOCID, entClassName + id);
//DocID
doc.addField(term.DOCID, getIdForUri( ind.getURI() ) );
//vitro id
doc.addField(term.URI, id);
@ -292,14 +289,28 @@ public class IndividualToSolrDocument {
/**
 * Not supported for Solr documents; use getIdForUri(String) instead.
 *
 * @throws Error always — this method is intentionally unimplemented.
 */
public Object getIndexId(Object obj) {
    // Fixed class-name typo in the error message ("Indiviudal...").
    throw new Error("IndividualToSolrDocument.getIndexId() is unimplemented");
}
/**
 * Builds the Solr document id for an individual's URI by prefixing the
 * entity class name.  Null in, null out.
 */
public String getIdForUri(String uri){
    return ( uri == null ) ? null : entClassName + uri;
}
/**
 * Builds a Solr query string that matches exactly the document whose
 * DOCID field equals the id derived from the given URI.
 */
public String getQueryForId(String uri ){
    return term.DOCID + ':' + getIdForUri(uri);
}
public Individual unTranslate(Object result) {
Individual ent = null;
if( result != null && result instanceof Document){
Document hit = (Document) result;
String id = hit.get(VitroSearchTermNames.URI);
if( result != null && result instanceof SolrDocument){
SolrDocument hit = (SolrDocument) result;
String uri= (String) hit.getFirstValue(term.URI);
ent = new IndividualImpl();
ent.setURI(id);
ent.setURI(uri);
}
return ent;
}

View file

@ -3,28 +3,23 @@
package edu.cornell.mannlib.vitro.webapp.search.solr;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.response.*;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.client.solrj.impl.StreamingUpdateSolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
import edu.cornell.mannlib.vitro.webapp.beans.Individual;
import edu.cornell.mannlib.vitro.webapp.search.IndexingException;
import edu.cornell.mannlib.vitro.webapp.search.docbuilder.Obj2DocIface;
import edu.cornell.mannlib.vitro.webapp.search.indexing.IndexerIface;
import edu.cornell.mannlib.vitro.webapp.search.solr.CalculateParameters;
public class SolrIndexer implements IndexerIface {
private final static Log log = LogFactory.getLog(SolrIndexer.class);
@ -41,6 +36,43 @@ public class SolrIndexer implements IndexerIface {
@Override
public void index(Individual ind) throws IndexingException {
if( ! indexing )
throw new IndexingException("SolrIndexer: must call " +
"startIndexing() before index().");
if( ind == null )
log.debug("Individual to index was null, ignoring.");
try{
if( urisIndexed.contains(ind.getURI()) ){
log.debug("already indexed " + ind.getURI() );
return;
}else{
SolrInputDocument solrDoc = null;
synchronized(this){
urisIndexed.add(ind.getURI());
}
log.debug("indexing " + ind.getURI());
solrDoc = individualToSolrDoc.translate(ind);
if( solrDoc != null){
//sending each doc individually is inefficient
// Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
// docs.add( solrDoc );
UpdateResponse res = server.add( solrDoc );
log.debug("response after adding docs to server: "+ res);
}else{
log.debug("removing from index " + ind.getURI());
//TODO: how do we delete document?
//writer.deleteDocuments((Term)obj2doc.getIndexId(ind));
}
}
} catch (IOException ex) {
throw new IndexingException(ex.getMessage());
} catch (SolrServerException ex) {
throw new IndexingException(ex.getMessage());
}
if( ! indexing )
throw new IndexingException("SolrIndexer: must call " +
"startIndexing() before index().");
@ -73,7 +105,7 @@ public class SolrIndexer implements IndexerIface {
} catch (IOException ex) {
throw new IndexingException(ex.getMessage());
} catch (SolrServerException ex) {
throw new IndexingException(ex.getMessage());
throw new IndexingException(ex.getMessage());
}
}
@ -84,14 +116,21 @@ public class SolrIndexer implements IndexerIface {
@Override
public void prepareForRebuild() throws IndexingException {
// TODO Auto-generated method stub
// TODO Auto-generated method stub
}
@Override
public void removeFromIndex(Individual ind) throws IndexingException {
// TODO Auto-generated method stub
public void removeFromIndex(String uri) throws IndexingException {
if( uri != null ){
try {
server.deleteByQuery( individualToSolrDoc.getQueryForId(uri));
log.debug("deleted " + " " + uri);
} catch (SolrServerException e) {
log.error( "could not delete individual " + uri, e);
} catch (IOException e) {
log.error( "could not delete individual " + uri, e);
}
}
}
@Override

View file

@ -2,6 +2,7 @@
package edu.cornell.mannlib.vitro.webapp.search.solr;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
@ -12,6 +13,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.impl.XMLResponseParser;
import com.hp.hpl.jena.ontology.OntModel;
import com.hp.hpl.jena.query.Dataset;
@ -26,11 +28,10 @@ import edu.cornell.mannlib.vitro.webapp.dao.jena.JenaBaseDao;
import edu.cornell.mannlib.vitro.webapp.dao.jena.ModelContext;
import edu.cornell.mannlib.vitro.webapp.dao.jena.SearchReindexingListener;
import edu.cornell.mannlib.vitro.webapp.dao.jena.WebappDaoFactoryJena;
import edu.cornell.mannlib.vitro.webapp.search.IndexConstants;
import edu.cornell.mannlib.vitro.webapp.search.beans.IndividualProhibitedFromSearchImpl;
import edu.cornell.mannlib.vitro.webapp.search.beans.ObjectSourceIface;
import edu.cornell.mannlib.vitro.webapp.search.beans.ProhibitedFromSearch;
import edu.cornell.mannlib.vitro.webapp.search.indexing.IndexBuilder;
import edu.cornell.mannlib.vitro.webapp.search.lucene.LuceneSetup;
import edu.cornell.mannlib.vitro.webapp.servlet.setup.AbortStartup;
public class SolrSetup implements javax.servlet.ServletContextListener{
@ -56,28 +57,30 @@ public class SolrSetup implements javax.servlet.ServletContextListener{
);
return;
}
CommonsHttpSolrServer server;
server = new CommonsHttpSolrServer( solrServerUrl );
CommonsHttpSolrServer server;
server = new CommonsHttpSolrServer(new URL( solrServerUrl ),null,new XMLResponseParser(),false);
//server = new CommonsHttpSolrServer(new URL( solrServerUrl ));
server.setSoTimeout(10000); // socket read timeout
server.setConnectionTimeout(10000);
server.setDefaultMaxConnectionsPerHost(100);
server.setMaxTotalConnections(100);
server.setMaxRetries(1);
context.setAttribute(LOCAL_SOLR_SERVER, server);
/* setup the individual to solr doc translation */
//first we need a ent2luceneDoc translator
OntModel displayOntModel = (OntModel) sce.getServletContext().getAttribute("displayOntModel");
OntModel abox = ModelContext.getBaseOntModelSelector(context).getABoxModel();
OntModel abox = ModelContext.getBaseOntModelSelector(context).getABoxModel();
OntModel inferences = (OntModel)context.getAttribute( JenaBaseDao.INFERENCE_ONT_MODEL_ATTRIBUTE_NAME);
Dataset dataset = WebappDaoFactoryJena.makeInMemoryDataset(abox, inferences);
OntModel jenaOntModel = ModelContext.getJenaOntModel(context);
List<DocumentModifier> modifiers = new ArrayList<DocumentModifier>();
// modifiers.add(new CalculateParameters(ModelContext.getJenaOntModel(context)));
modifiers.add(new CalculateParameters(dataset));
modifiers.add(new ContextNodeFields(dataset));
modifiers.add(new ContextNodeFields(jenaOntModel));
IndividualToSolrDocument indToSolrDoc = new IndividualToSolrDocument(
new ProhibitedFromSearch(DisplayVocabulary.PRIMARY_LUCENE_INDEX_URI, displayOntModel),
@ -88,7 +91,7 @@ public class SolrSetup implements javax.servlet.ServletContextListener{
SolrIndexer solrIndexer = new SolrIndexer(server, indToSolrDoc);
if( solrIndexer.isIndexEmpty() ){
log.info("solr index is empty, requesting rebuild");
sce.getServletContext().setAttribute(LuceneSetup.INDEX_REBUILD_REQUESTED_AT_STARTUP, Boolean.TRUE);
sce.getServletContext().setAttribute(IndexConstants.INDEX_REBUILD_REQUESTED_AT_STARTUP, Boolean.TRUE);
}
// This is where the builder gets the list of places to try to
@ -96,11 +99,9 @@ public class SolrSetup implements javax.servlet.ServletContextListener{
// does not get into the search index.
WebappDaoFactory wadf = (WebappDaoFactory) context.getAttribute("webappDaoFactory");
VitroFilters vf = VitroFilterUtils.getPublicFilter(context);
wadf = new WebappDaoFactoryFiltering(wadf, vf);
List<ObjectSourceIface> sources = new ArrayList<ObjectSourceIface>();
sources.add(wadf.getIndividualDao());
wadf = new WebappDaoFactoryFiltering(wadf, vf);
IndexBuilder builder = new IndexBuilder(context, solrIndexer, sources);
IndexBuilder builder = new IndexBuilder(context, solrIndexer, wadf);
// to the servlet context so we can access it later in the webapp.
context.setAttribute(IndexBuilder.class.getName(), builder);
@ -109,8 +110,8 @@ public class SolrSetup implements javax.servlet.ServletContextListener{
SearchReindexingListener srl = new SearchReindexingListener(builder);
ModelContext.registerListenerForChanges(ctx, srl);
if( sce.getServletContext().getAttribute(LuceneSetup.INDEX_REBUILD_REQUESTED_AT_STARTUP) instanceof Boolean &&
(Boolean)sce.getServletContext().getAttribute(LuceneSetup.INDEX_REBUILD_REQUESTED_AT_STARTUP) ){
if( sce.getServletContext().getAttribute(IndexConstants.INDEX_REBUILD_REQUESTED_AT_STARTUP) instanceof Boolean &&
(Boolean)sce.getServletContext().getAttribute(IndexConstants.INDEX_REBUILD_REQUESTED_AT_STARTUP) ){
log.info("Rebuild of solr index required before startup.");
builder.doIndexRebuild();
int n = 0;
@ -132,6 +133,9 @@ public class SolrSetup implements javax.servlet.ServletContextListener{
@Override
public void contextDestroyed(ServletContextEvent sce) {
IndexBuilder builder = (IndexBuilder)sce.getServletContext().getAttribute(IndexBuilder.class.getName());
if( builder != null )
builder.stopIndexingThread();
}

View file

@ -32,7 +32,7 @@ import edu.cornell.mannlib.vitro.webapp.dao.jena.ModelContext;
import edu.cornell.mannlib.vitro.webapp.dao.jena.OntModelSelector;
import edu.cornell.mannlib.vitro.webapp.ontology.update.KnowledgeBaseUpdater;
import edu.cornell.mannlib.vitro.webapp.ontology.update.UpdateSettings;
import edu.cornell.mannlib.vitro.webapp.search.lucene.LuceneSetup;
import edu.cornell.mannlib.vitro.webapp.search.IndexConstants;
/**
* Invokes process to test whether the knowledge base needs any updating
@ -116,7 +116,7 @@ public class UpdateKnowledgeBase implements ServletContextListener {
try {
if (ontologyUpdater.updateRequired()) {
ctx.setAttribute(LuceneSetup.INDEX_REBUILD_REQUESTED_AT_STARTUP, Boolean.TRUE);
ctx.setAttribute(IndexConstants.INDEX_REBUILD_REQUESTED_AT_STARTUP, Boolean.TRUE);
//doMiscAppMetadataReplacements(ctx.getRealPath(MISC_REPLACEMENTS_FILE), oms);
reloadDisplayModel(ctx);
}