Merging from 1.3 maint branch.

briancaruso 2011-07-20 15:42:26 +00:00
parent 6ecc1cc704
commit 5ff4bc5d6d
5 changed files with 95 additions and 171 deletions

IndexBuilder.java

@@ -78,6 +78,9 @@ public class IndexBuilder extends Thread {
     /** Length of pause between a model change an the start of indexing. */
     protected long reindexInterval = 1000 * 60 /* msec */ ;
+    /** Length of pause between when work comes into queue to when indexing starts */
+    protected long waitAfterNewWorkInterval = 500; //msec
     /** Number of threads to use during indexing. */
     protected int numberOfThreads = 10;
@@ -155,7 +158,7 @@ public class IndexBuilder extends Thread {
     /**
      * This is called when the system shuts down.
      */
     public synchronized void stopIndexingThread() {
         stopRequested = true;
         this.notifyAll();
         this.interrupt();
@@ -169,17 +172,17 @@ public class IndexBuilder extends Thread {
                 log.debug("full re-index requested");
                 indexRebuild();
             }else if( !stopRequested && isThereWorkToDo() ){
-                Thread.sleep(500); //wait a bit to let a bit more work to come into the queue
+                Thread.sleep(waitAfterNewWorkInterval); //wait a bit to let a bit more work to come into the queue
                 log.debug("work found for IndexBuilder, starting update");
                 updatedIndex();
-            }else if( !stopRequested && ! isThereWorkToDo() ){
+            } else {
                 log.debug("there is no indexing working to do, waiting for work");
                 synchronized (this) { this.wait(reindexInterval); }
             }
         } catch (InterruptedException e) {
             log.debug("woken up",e);
         }catch(Throwable e){
-            if( log != null )//may be null on shutdown
+            if( ! stopRequested && log != null )//may be null on shutdown
                 log.error(e,e);
         }
     }
@@ -187,30 +190,37 @@ public class IndexBuilder extends Thread {
         if( indexer != null)
             indexer.abortIndexingAndCleanUp();
-        if(log != null )//may be null on shutdown
+        if(! stopRequested && log != null )//may be null on shutdown
             log.info("Stopping IndexBuilder thread");
     }
     /* ******************** non-public methods ************************* */
-    private synchronized Collection<String> getAndEmptyChangedStatements(){
+    private List<Statement> getAndEmptyChangedStatements(){
         List<Statement> localChangedStmt = null;
         synchronized( changedStmtQueue ){
             localChangedStmt = new ArrayList<Statement>(changedStmtQueue.size());
             localChangedStmt.addAll( changedStmtQueue );
             changedStmtQueue.clear();
         }
+        return localChangedStmt;
+    }
+    /**
+     * For a collection of statements, find the URIs that need to be updated in
+     * the index.
+     */
+    private Collection<String> statementsToUris( Collection<Statement> localChangedStmt ){
         Collection<String> urisToUpdate = new HashSet<String>();
         for( Statement stmt : localChangedStmt){
             if( stmt == null )
                 continue;
             for( StatementToURIsToUpdate stu : stmtToURIsToIndexFunctions ){
                 urisToUpdate.addAll( stu.findAdditionalURIsToIndex(stmt) );
             }
         }
         return urisToUpdate;
     }
     /**
@@ -264,7 +274,7 @@ public class IndexBuilder extends Thread {
     protected void updatedIndex() {
         log.debug("Starting updateIndex()");
-        makeAddAndDeleteLists( getAndEmptyChangedStatements() );
+        makeAddAndDeleteLists( statementsToUris(getAndEmptyChangedStatements()) );
         this.numberOfThreads = Math.max( MAX_UPDATE_THREADS, updatedUris.size() / 20);
         doBuild( updatedUris.iterator(), deletedUris );
@@ -289,13 +299,13 @@ public class IndexBuilder extends Thread {
      * to false, and a check is made before adding, it will work fine; but
      * checking if an object is on the index is slow.
      */
     private void doBuild(Iterator<String> updates, Collection<String> deletes ){
-        boolean aborted = false;
         boolean updateRequested = ! reindexRequested;
         try {
-            if( reindexRequested )
+            if( reindexRequested ){
                 indexer.prepareForRebuild();
+            }
             indexer.startIndexing();
             reindexRequested = false;
@@ -311,29 +321,21 @@ public class IndexBuilder extends Thread {
                     }
                 }
             }
             indexUriList(updates);
-        } catch (AbortIndexing abort){
-            if( log != null)
-                log.debug("aborting the indexing because thread stop was requested");
-            aborted = true;
         } catch (Exception e) {
-            log.error(e,e);
-        }
-        if( aborted ){
-            indexer.abortIndexingAndCleanUp();
-        }else{
-            indexer.endIndexing();
+            if( log != null) log.debug("Exception during indexing",e);
         }
+        indexer.endIndexing();
     }
     /**
      * Use the back end indexer to index each object that the Iterator returns.
      * @throws AbortIndexing
      */
-    private void indexUriList(Iterator<String> updateUris ) throws AbortIndexing{
+    private void indexUriList(Iterator<String> updateUris ) {
         //make a copy of numberOfThreads so the local copy is safe during this method.
         int numberOfThreads = this.numberOfThreads;
         if( numberOfThreads > MAX_THREADS )
@@ -427,12 +429,6 @@ public class IndexBuilder extends Thread {
         synchronized( changedStmtQueue ){
             return reindexRequested || ! changedStmtQueue.isEmpty() ;
         }
     }
-    private class AbortIndexing extends Exception {
-        // Just a vanilla exception
-    }
 }

IndexWorkerThread.java

@@ -54,33 +54,38 @@ class IndexWorkerThread extends Thread{
         while( individualsToIndex.hasNext() ){
             //need to stop right away if requested to
             if( stopRequested ) return;
+            try{
             //build the document and add it to the index
             Individual ind = null;
             try {
                 ind = individualsToIndex.next();
                 indexer.index( ind );
             } catch (IndexingException e) {
                 if( stopRequested )
                     return;
                 if( ind != null )
                     log.error("Could not index individual " + ind.getURI() , e );
                 else
                     log.warn("Could not index, individual was null");
             }
             synchronized(this){
                 count++;
                 if( log.isInfoEnabled() ){
                     if( (count % 100 ) == 0 && count > 0 ){
                         long dt = (System.currentTimeMillis() - starttime);
                         log.info("individuals indexed: " + count + " in " + dt + " msec " +
                                 " time per individual = " + (dt / count) + " msec" );
                     }
                 }
             }
+            }catch(Throwable th){
+                //on tomcat shutdown odd exceptions get thrown and log can be null
+                if( log != null )
+                    log.debug("Exception during index building",th);
+            }
         }
     }

ContextNodeFields.java

@@ -47,11 +47,6 @@ public class ContextNodeFields implements DocumentModifier{
     private Log log = LogFactory.getLog(ContextNodeFields.class);
     public ContextNodeFields(Model model){
-        // synchronized( ContextNodeFields.class){
-        //     if( threadPool == null ){
-        //         threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE, new ContextNodeFieldsThreadFactory());
-        //     }
-        // }
         this.model = model;
     }
@@ -60,11 +55,7 @@ public class ContextNodeFields implements DocumentModifier{
         log.debug("retrieving context node values..");
         StringBuffer objectProperties = singleThreadExecute( individual, multiValuedQueriesForAgent);
-        //change fields of solr document
-        //SolrInputField targetField = doc.getField(VitroSearchTermNames.targetInfo);
-        //targetField.addValue(" " + runQuery(individual, multiValuedQueryForInformationResource), targetField.getBoost());
         SolrInputField field = doc.getField(VitroSearchTermNames.ALLTEXT);
         field.addValue(objectProperties + " " + runQuery(individual, multiValuedQueryForInformationResource), field.getBoost());
@@ -77,37 +68,7 @@ public class ContextNodeFields implements DocumentModifier{
             propertyValues.append(runQuery(individual, query));
         }
         return propertyValues;
     }
-    /** experimental, may not work */
-    private StringBuffer multiThreadExecute(Individual individual, List<String> queries ){
-        int queryCount = queries.size();
-        List<QueryRunner> tasks = new ArrayList<QueryRunner>(queryCount);
-        List<Future<QueryRunner>> completedTasks = new ArrayList<Future<QueryRunner>>(queryCount);
-        //Make a task for each query and start it.
-        for(String query : queries ){
-            QueryRunner queryTask = new QueryRunner(individual, query);
-            tasks.add(queryTask);
-            completedTasks.add( threadPool.submit( queryTask , queryTask));
-        }
-        //Wait for each thread to finish and collect results
-        StringBuffer objectProperties = new StringBuffer(" ");
-        for(int i = 0 ; i < queryCount; i++){
-            try {
-                completedTasks.get(i).get();
-                objectProperties.append( tasks.get(i).getPropertyValues() ) ;
-            } catch (InterruptedException e) {
-                log.error("Thread interrupted");
-            } catch (ExecutionException e) {
-                log.error("problem during execution",e);
-                e.printStackTrace();
-            }
-        }
-        return objectProperties;
-    }
     public StringBuffer runQuery( Individual individual, String query ){
         StringBuffer propertyValues = new StringBuffer();
@@ -394,48 +355,7 @@ public class ContextNodeFields implements DocumentModifier{
             + " OPTIONAL { ?uri core:features ?i . ?i rdfs:label ?Features . } . "
             +"}" ;
     }
-    private class QueryRunner implements Runnable {
-        private final Individual ind;
-        private final String query;
-        private final StringBuffer propertyValues = new StringBuffer();
-        public QueryRunner(Individual ind, String query){
-            this.ind = ind;
-            this.query = query;
-        }
-        public String getPropertyValues(){
-            return propertyValues.toString();
-        }
-        public void run(){
-            propertyValues.append(runQuery(ind, query));
-        }
-    }
-    // count for thread names
-    private static Integer threadCounter = 0;
-    private static String getNewThreadName(){
-        synchronized(threadCounter){
-            Integer i = threadCounter;
-            threadCounter = threadCounter + 1;
-            return "IndexBuilder-ContextNodeFields-" + i.toString();
-        }
-    }
-    private class ContextNodeFieldsThreadFactory implements ThreadFactory{
-        @Override
-        public Thread newThread(Runnable r) {
-            return new Thread( getNewThreadName() );
-        }
-    }
     public void shutdown(){
         shutdown=true;

IndividualToSolrDocument.java

@@ -118,9 +118,14 @@ public class IndividualToSolrDocument {
             return doc;
         }catch(SkipIndividualException ex){
-            //indicates that this individual should not be indexed
+            //indicates that this individual should not be indexed by returning null
             log.debug(ex);
             return null;
+        }catch(Throwable th){
+            //Odd exceptions from jena get thrown on shutdown
+            if( log != null )
+                log.debug(th);
+            return null;
         }
     }
@@ -168,6 +173,9 @@ public class IndividualToSolrDocument {
         allTextValue.append(objectNames.toString());
+        allTextValue.append(' ');
+        allTextValue.append(classPublicNames);
         try {
             String stripped = Jsoup.parse(allTextValue.toString()).text();
             allTextValue.setLength(0);

SolrIndexer.java

@@ -133,38 +133,33 @@ public class SolrIndexer implements IndexerIface {
     @Override
     public void abortIndexingAndCleanUp() {
         shutdownRequested = true;
-        try{
-            server.commit();
-        }catch(SolrServerException e){
-            if( log != null)
-                log.debug("could not commit to solr server, " +
-                        "this should not be a problem since solr will do autocommit");
-        } catch (IOException e) {
-            if( log != null)
-                log.debug("could not commit to solr server, " +
-                        "this should not be a problem since solr will do autocommit");
-        }
         try{
             individualToSolrDoc.shutdown();
         }catch(Exception e){
             if( log != null)
                 log.debug(e,e);
         }
+        endIndexing();
     }
     @Override
     public synchronized void endIndexing() {
+        try {
+            if( doingFullIndexRebuild ){
+                removeDocumentsFromBeforeRebuild( );
+            }
+        } catch (Throwable e) {
+            if( log != null)
+                log.debug("could not remove documents from before build, " ,e);
+        }
         try {
             UpdateResponse res = server.commit();
             log.debug("Response after committing to server: "+ res );
-        } catch (SolrServerException e) {
-            log.error("Could not commit to solr server", e);
-        } catch(IOException e){
-            log.error("Could not commit to solr server", e);
+        } catch (Throwable e) {
+            if( log != null)
+                log.debug("could not commit to solr server, " +
+                        "this should not be a problem since solr will do autocommit");
         }
-        if( doingFullIndexRebuild ){
-            removeDocumentsFromBeforeRebuild( );
-        }
         indexing = false;
         notifyAll();
     }