From c69a3f9f93da10db4b70f15157066cfccbe50ddd Mon Sep 17 00:00:00 2001 From: anupsawant Date: Tue, 21 Jun 2011 20:20:25 +0000 Subject: [PATCH] threading code committed. --- webapp/config/default.log4j.properties | 3 +- .../webapp/search/indexing/IndexBuilder.java | 53 +++++++- .../search/indexing/IndexWorkerThread.java | 121 ++++++++++-------- .../webapp/search/indexing/IndexerIface.java | 2 +- .../webapp/search/lucene/LuceneIndexer.java | 8 +- .../search/solr/CalculateParameters.java | 69 +++++----- .../webapp/search/solr/ContextNodeFields.java | 12 +- .../webapp/search/solr/DocumentModifier.java | 3 +- .../search/solr/IndividualToSolrDocument.java | 18 +-- .../vitro/webapp/search/solr/SolrIndexer.java | 31 +++-- 10 files changed, 194 insertions(+), 126 deletions(-) diff --git a/webapp/config/default.log4j.properties b/webapp/config/default.log4j.properties index 9fc388165..2476c7b55 100644 --- a/webapp/config/default.log4j.properties +++ b/webapp/config/default.log4j.properties @@ -41,7 +41,8 @@ log4j.logger.edu.cornell.mannlib.vitro.webapp.dao.jena.RDBGraphGenerator=WARN #log4j.logger.edu.cornell.mannlib.vitro.webapp.search.solr.ContextNodeFields=DEBUG log4j.logger.edu.cornell.mannlib.vitro.webapp.search.indexing.IndexBuilder=INFO log4j.logger.edu.cornell.mannlib.vitro.webapp.search.indexing.IndexBuilder=DEBUG -log4j.logger.edu.cornell.mannlib.vitro.webapp.search.indexing.IndexThread=DEBUG +log4j.logger.edu.cornell.mannlib.vitro.webapp.search.indexing.IndexWorkerThread=INFO +log4j.logger.edu.cornell.mannlib.vitro.webapp.search.solr.SolrIndexer=INFO # suppress odd warnings from libraries log4j.logger.org.openjena.riot=FATAL log4j.logger.org.directwebremoting=FATAL \ No newline at end of file diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilder.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilder.java index c95c837d1..240df264b 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilder.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexBuilder.java @@ -9,8 +9,11 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Stack; +import java.util.Queue; import javax.servlet.ServletContext; +import org.apache.solr.client.solrj.SolrServer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -21,6 +24,7 @@ import edu.cornell.mannlib.vitro.webapp.dao.VClassDao; import edu.cornell.mannlib.vitro.webapp.dao.VitroVocabulary; import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory; import edu.cornell.mannlib.vitro.webapp.search.beans.ObjectSourceIface; +import edu.cornell.mannlib.vitro.webapp.search.solr.CalculateParameters; /** @@ -285,8 +289,27 @@ public class IndexBuilder extends Thread { private void indexForSource(Iterator individuals , boolean newDocs) throws AbortIndexing{ - long starttime = System.currentTimeMillis(); - long count = 0; + // long starttime = System.currentTimeMillis(); + int count = 0; + int numOfThreads = 10; + + + List workers = new ArrayList(); + boolean distributing = true; + + for(int i = 0; i< numOfThreads ;i++){ + workers.add(new IndexWorkerThread(indexer,i,distributing)); // made a pool of workers + } + + log.info("Indexing worker pool ready for indexing."); + + // starting worker threads + + for(int i =0; i < numOfThreads; i++){ + workers.get(i).start(); + } + + while(individuals.hasNext()){ if( stopRequested ) throw new AbortIndexing(); @@ -295,7 +318,10 @@ public class IndexBuilder extends Thread { try{ ind = individuals.next(); - indexer.index(ind, newDocs); + //indexer.index(ind); + + workers.get(count%numOfThreads).addToQueue(ind); // adding individual to worker queue. + }catch(Throwable ex){ if( stopRequested || log == null){//log might be null if system is shutting down. throw new AbortIndexing(); @@ -304,20 +330,33 @@ public class IndexBuilder extends Thread { log.warn("Error indexing individual " + uri + " " + ex.getMessage()); } count++; - if( log.isDebugEnabled() ){ + /* if( log.isDebugEnabled() ){ if( (count % 100 ) == 0 && count > 0 ){ long dt = (System.currentTimeMillis() - starttime); log.debug("individuals indexed: " + count + " in " + dt + " msec " + " time pre individual = " + (dt / count) + " msec" ); } - } + } */ } - log.info( + for(int i =0 ; i < numOfThreads; i ++){ + workers.get(i).setDistributing(false); + } + for(int i =0; i < numOfThreads; i++){ + try{ + workers.get(i).join(); + }catch(InterruptedException e){ + log.error(e,e); + } + } + + /* log.info( "individuals indexed: " + count + " in " + (System.currentTimeMillis() - starttime) + " msec" + (count!=0?(" time per individual = " + (System.currentTimeMillis() - starttime)/ count + " msec"):"") - ); + );*/ } + + /** * For a list of individuals, this builds a list of dependent resources and returns it. diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexWorkerThread.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexWorkerThread.java index 5a032cc7c..513093c27 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexWorkerThread.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexWorkerThread.java @@ -1,78 +1,97 @@ package edu.cornell.mannlib.vitro.webapp.search.indexing; -import java.util.Iterator; + import java.util.LinkedList; -import java.util.List; import java.util.Queue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import edu.cornell.mannlib.vitro.webapp.beans.Individual; +import edu.cornell.mannlib.vitro.webapp.search.IndexingException; +import edu.cornell.mannlib.vitro.webapp.search.solr.IndividualToSolrDocument; -class IndexWorkerThread implements Runnable{ +class IndexWorkerThread extends Thread{ - private IndexerIface indexer; - private static Log log = LogFactory.getLog(IndexWorkerThread.class); + protected IndividualToSolrDocument individualToSolrDoc; + private IndexerIface indexer = null; + private Log log = LogFactory.getLog(IndexWorkerThread.class); + private static long count=0; private Queue indQueue = new LinkedList(); + private int threadNum; + private static long starttime = 0; + private boolean distributing; - public IndexWorkerThread(IndexerIface indexer){ - + public IndexWorkerThread(IndexerIface indexer, int threadNum,boolean distributing){ this.indexer = indexer; + this.threadNum = threadNum; + this.distributing = distributing; + synchronized(this){ + if(starttime == 0) + starttime = System.currentTimeMillis(); + } } - public void addToQueue(Individual ind, boolean newDocs){ - + public void addToQueue(Individual ind){ + synchronized(indQueue){ + indQueue.offer(ind); + indQueue.notify(); + } } - public void shutdown() { - - + public boolean isQueueEmpty(){ + return indQueue.isEmpty(); } + public void setDistributing(boolean distributing){ + this.distributing = distributing; + } public void run(){ - - //check for work - //if work found, - // translate - // send to server - //sleep (1000) - + + while(this.distributing){ + synchronized(indQueue){ + try{ + while(indQueue.isEmpty() && this.distributing){ + try{ + log.debug("Worker number " + threadNum + " waiting on some work to be alloted."); + indQueue.wait(1000); + }catch(InterruptedException ie){ + log.error(ie,ie); + } + } + + Thread.sleep(50); //wait a bit to let a bit more work to come into the queue + log.debug("work found for Woker number " + threadNum); + addDocsToIndex(); + + } catch (InterruptedException e) { + log.debug("Worker number " + threadNum + " woken up",e); + } + catch(Throwable e){ + log.error(e,e); + } + } + } + log.info("Worker number " + threadNum + " exiting."); } - /*protected void indexInd() throws AbortIndexing{ - long starttime = System.currentTimeMillis(); - long count = 0; - Iterator individuals = firstList.iterator(); - while(individuals.hasNext()){ - if( stopRequested ) - throw new AbortIndexing(); - - Individual ind = null; - try{ - ind = individuals.next(); - indexer.index(ind, newDocs); - }catch(Throwable ex){ - if( stopRequested || log == null){//log might be null if system is shutting down. - throw new AbortIndexing(); - } - String uri = ind!=null?ind.getURI():"null"; - log.warn("Error indexing individual from separate thread" + uri + " " + ex.getMessage()); - } - count++; - if( log.isDebugEnabled() ){ - if( (count % 100 ) == 0 && count > 0 ){ - long dt = (System.currentTimeMillis() - starttime); - log.debug("individuals indexed from seperate thread: " + count + " in " + dt + " msec " + - " time pre individual from seperate thread = " + (dt / count) + " msec" ); - } - } - } + protected void addDocsToIndex() throws IndexingException{ + + while(!indQueue.isEmpty()){ + indexer.index(indQueue.poll()); + synchronized(this){ + count++; + if( log.isInfoEnabled() ){ + if( (count % 100 ) == 0 && count > 0 ){ + long dt = (System.currentTimeMillis() - starttime); + log.info("individuals indexed: " + count + " in " + dt + " msec " + + " time per individual = " + (dt / count) + " msec" ); + } + } + } + } } - private class AbortIndexing extends Exception { - // Just a vanilla exception - } */ + } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexerIface.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexerIface.java index a456cfbad..f4ee9a057 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexerIface.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/indexing/IndexerIface.java @@ -38,7 +38,7 @@ public interface IndexerIface { * @param newDoc - if true, just insert doc, if false attempt to update. * @throws IndexingException */ - public void index(Individual ind, boolean newDoc)throws IndexingException; + public void index(Individual ind)throws IndexingException; /** diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/lucene/LuceneIndexer.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/lucene/LuceneIndexer.java index e7bc2b8cc..d685c9be9 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/lucene/LuceneIndexer.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/lucene/LuceneIndexer.java @@ -473,5 +473,11 @@ public class LuceneIndexer implements IndexerIface { System.out.println("Could not clean up temp indexing dir " + currentOffLineDir); } } - } + } + + @Override + public void index(Individual ind) throws IndexingException { + // TODO Auto-generated method stub + + } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/CalculateParameters.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/CalculateParameters.java index e8a504527..bc2580f5d 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/CalculateParameters.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/CalculateParameters.java @@ -1,6 +1,7 @@ package edu.cornell.mannlib.vitro.webapp.search.solr; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.Hashtable; import java.util.Iterator; @@ -40,8 +41,7 @@ public class CalculateParameters implements DocumentModifier { private Dataset dataset; public static int totalInd=1; - public static Map betaMap = new Hashtable(); - private float phi; + protected Map betaMap = new Hashtable(); private static final String prefix = "prefix owl: " + " prefix vitroDisplay: " + " prefix rdf: " @@ -81,6 +81,10 @@ public class CalculateParameters implements DocumentModifier { new Thread(new TotalInd(this.dataset,totalCountQuery)).start(); } + public CalculateParameters(){ + super(); + } + public float calculateBeta(String uri){ float beta=0; int Conn=0; @@ -116,19 +120,12 @@ public class CalculateParameters implements DocumentModifier { StringTokenizer nodes = new StringTokenizer(adjNodes.toString()," "); String uri=null; - float beta=0; int size=0; - phi = 0.1F; + float phi = 0.1F; while(nodes.hasMoreTokens()){ size++; uri = nodes.nextToken(); - if(hasBeta(uri)){ // get if already calculated - phi += getBeta(uri); - }else{ // query if not calculated and put in map - beta = calculateBeta(uri); - setBeta(uri, beta); - phi+=beta; - } + phi += getBeta(uri); } if(size>0) phi = (float)phi/size; @@ -137,20 +134,21 @@ public class CalculateParameters implements DocumentModifier { return phi; } - public Float getBeta(String uri){ - return betaMap.get(uri); - } - public float getPhi(){ - return phi; - } - public boolean hasBeta(String uri){ - return betaMap.containsKey(uri); - } - public void setBeta(String uri, float beta){ - betaMap.put(uri, beta); + public synchronized Float getBeta(String uri){ + + float beta; + if(betaMap.containsKey(uri)){ + beta = betaMap.get(uri); + }else{ + beta = calculateBeta(uri); // or calculate & put in map + betaMap.put(uri, beta); + } + return beta; + } - public String[] getAdjacentNodes(String uri,boolean isPerson){ + + public String[] getAdjacentNodes(String uri){ List queryList = new ArrayList(); Set adjacentNodes = new HashSet(); @@ -222,9 +220,9 @@ public class CalculateParameters implements DocumentModifier { RDFNode coauthor = null; try{ while(queryItr.hasNext()){ - if(!isPerson){ + /*if(!isPerson){ queryItr.next(); // we don't want first query to execute if the ind is not a person. - } + }*/ query = QueryFactory.create(queryItr.next(),Syntax.syntaxARQ); QueryExecution qexec = QueryExecutionFactory.create(query,dataset,initialBinding); try{ @@ -276,26 +274,17 @@ public class CalculateParameters implements DocumentModifier { } @Override - public void modifyDocument(Individual individual, SolrInputDocument doc) { + public void modifyDocument(Individual individual, SolrInputDocument doc, StringBuffer addUri) { // TODO Auto-generated method stub // calculate beta value. log.debug("Parameter calculation starts.."); - float beta = 0; String uri = individual.getURI(); - if(hasBeta(uri)){ - beta = getBeta(uri); - }else{ - beta = calculateBeta(uri); // or calculate & put in map - setBeta(uri,beta); - } - - boolean isPerson = (IndividualToSolrDocument.superClassNames.contains("Person")) ? true : false ; - String adjInfo[] = getAdjacentNodes(uri,isPerson); + String adjInfo[] = getAdjacentNodes(uri); StringBuffer info = new StringBuffer(); info.append(adjInfo[0]); - info.append(IndividualToSolrDocument.addUri.toString()); - phi = calculatePhi(info); + info.append(addUri.toString()); + float phi = calculatePhi(info); for(String term: fieldsToAddBetaTo){ SolrInputField f = doc.getField( term ); @@ -314,6 +303,10 @@ public class CalculateParameters implements DocumentModifier { log.debug("Parameter calculation is done"); } + public void clearMap(){ + betaMap.clear(); + } + } class TotalInd implements Runnable{ diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/ContextNodeFields.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/ContextNodeFields.java index 0e6b7fb46..23507f48f 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/ContextNodeFields.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/ContextNodeFields.java @@ -103,7 +103,7 @@ public class ContextNodeFields implements DocumentModifier{ @Override - public void modifyDocument(Individual individual, SolrInputDocument doc) { + public void modifyDocument(Individual individual, SolrInputDocument doc, StringBuffer addUri) { log.debug("retrieving context node values.."); @@ -111,7 +111,7 @@ public class ContextNodeFields implements DocumentModifier{ SolrInputField targetField = doc.getField(VitroTermNames.targetInfo); StringBuffer objectProperties = new StringBuffer(); - if(IndividualToSolrDocument.superClassNames.contains("Agent")){ + objectProperties.append(" "); int threadCount = multiValuedQueriesForAgent.size(); @@ -135,14 +135,15 @@ public class ContextNodeFields implements DocumentModifier{ log.error("Thread " + threads[i].getName() + " interrupted!"); } } - } + - if(IndividualToSolrDocument.superClassNames.contains("InformationResource")){ + targetField.addValue(" " + runQuery(individual, multiValuedQueryForInformationResource), targetField.getBoost()); - } + field.addValue(objectProperties, field.getBoost()); log.debug("context node values are retrieved"); + } @@ -388,6 +389,7 @@ public class ContextNodeFields implements DocumentModifier{ } + private class QueryRunner extends Thread{ private Individual ind; diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/DocumentModifier.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/DocumentModifier.java index 662c017ec..4a7bcf100 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/DocumentModifier.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/DocumentModifier.java @@ -10,5 +10,6 @@ import edu.cornell.mannlib.vitro.webapp.beans.Individual; * This interface represents an object that can add to a SolrInputDocument. */ public interface DocumentModifier { - public void modifyDocument(Individual individual, SolrInputDocument doc); + public void modifyDocument(Individual individual, SolrInputDocument doc, StringBuffer addUri); + } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/IndividualToSolrDocument.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/IndividualToSolrDocument.java index 3c5d24185..b95c7b031 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/IndividualToSolrDocument.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/IndividualToSolrDocument.java @@ -38,14 +38,12 @@ public class IndividualToSolrDocument { private IndividualProhibitedFromSearch individualProhibitedFromSearch; - public static ArrayList superClassNames = null; - - public static StringBuffer addUri = null; - - private List documentModifiers = new ArrayList(); + public List documentModifiers = new ArrayList(); private static List contextNodeClassNames = new ArrayList(); + + public IndividualToSolrDocument( ClassProhibitedFromSearch classesProhibitedFromSearch, IndividualProhibitedFromSearch individualProhibitedFromSearch){ @@ -68,6 +66,8 @@ public class IndividualToSolrDocument { @SuppressWarnings("static-access") public SolrInputDocument translate(Individual ind) throws IndexingException{ long tProhibited = System.currentTimeMillis(); + ArrayList superClassNames = null; + StringBuffer addUri = null; String value; StringBuffer classPublicNames = new StringBuffer(); classPublicNames.append(""); @@ -193,7 +193,7 @@ public class IndividualToSolrDocument { } } - if(documentModifiers == null){ + if(documentModifiers == null || documentModifiers.isEmpty()){ doc.addField(term.NAME_RAW, value, NAME_BOOST); doc.addField(term.NAME_LOWERCASE, value.toLowerCase(),NAME_BOOST); doc.addField(term.NAME_UNSTEMMED, value,NAME_BOOST); @@ -210,7 +210,7 @@ public class IndividualToSolrDocument { long tMoniker = System.currentTimeMillis(); - if(documentModifiers == null){ + if(documentModifiers == null || documentModifiers.isEmpty()){ //boost for entity if(ind.getSearchBoost() != null && ind.getSearchBoost() != 0) doc.setDocumentBoost(ind.getSearchBoost()); @@ -269,10 +269,10 @@ public class IndividualToSolrDocument { doc.addField(term.ALLTEXT_PHONETIC, alltext,PHONETIC_BOOST); //run the document modifiers - if( documentModifiers != null ){ + if( documentModifiers != null && !documentModifiers.isEmpty()){ doc.addField(term.targetInfo,""); for(DocumentModifier modifier: documentModifiers){ - modifier.modifyDocument(ind, doc); + modifier.modifyDocument(ind, doc, addUri); } } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/SolrIndexer.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/SolrIndexer.java index 33ba1fa6d..0c22d41cd 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/SolrIndexer.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/search/solr/SolrIndexer.java @@ -19,6 +19,7 @@ import edu.cornell.mannlib.vitro.webapp.beans.Individual; import edu.cornell.mannlib.vitro.webapp.search.IndexingException; import edu.cornell.mannlib.vitro.webapp.search.docbuilder.Obj2DocIface; import edu.cornell.mannlib.vitro.webapp.search.indexing.IndexerIface; +import edu.cornell.mannlib.vitro.webapp.search.solr.CalculateParameters; public class SolrIndexer implements IndexerIface { private final static Log log = LogFactory.getLog(SolrIndexer.class); @@ -34,7 +35,7 @@ public class SolrIndexer implements IndexerIface { } @Override - public synchronized void index(Individual ind, boolean newDoc) throws IndexingException { + public void index(Individual ind) throws IndexingException { if( ! indexing ) throw new IndexingException("SolrIndexer: must call " + "startIndexing() before index()."); @@ -47,15 +48,19 @@ public class SolrIndexer implements IndexerIface { log.debug("already indexed " + ind.getURI() ); return; }else{ - urisIndexed.add(ind.getURI()); - log.debug("indexing " + ind.getURI()); - - SolrInputDocument solrDoc = individualToSolrDoc.translate(ind); + SolrInputDocument solrDoc = null; + synchronized(this){ + urisIndexed.add(ind.getURI()); + } + log.debug("indexing " + ind.getURI()); + // synchronized(individualToSolrDoc){ + solrDoc = individualToSolrDoc.translate(ind); + // } if( solrDoc != null){ //sending each doc individually is inefficient - Collection docs = new ArrayList(); - docs.add( solrDoc ); - UpdateResponse res = server.add( docs ); + // Collection docs = new ArrayList(); + // docs.add( solrDoc ); + UpdateResponse res = server.add( solrDoc ); log.debug("response after adding docs to server: "+ res); }else{ log.debug("removing from index " + ind.getURI()); @@ -125,12 +130,14 @@ public class SolrIndexer implements IndexerIface { } catch(IOException e){ log.error("Could not commit to solr server", e); }finally{ - if(CalculateParameters.betaMap!=null){ - CalculateParameters.betaMap.clear(); - CalculateParameters.betaMap = null; + if(!individualToSolrDoc.documentModifiers.isEmpty()){ + if(individualToSolrDoc.documentModifiers.get(0) instanceof CalculateParameters){ + CalculateParameters c = (CalculateParameters) individualToSolrDoc.documentModifiers.get(0); + c.clearMap(); + log.info("BetaMap cleared"); + } } } - try { server.optimize(); } catch (Exception e) {