threading code committed.

This commit is contained in:
anupsawant 2011-06-21 20:20:25 +00:00
parent 98b1fa6567
commit c69a3f9f93
10 changed files with 194 additions and 126 deletions

View file

@ -41,7 +41,8 @@ log4j.logger.edu.cornell.mannlib.vitro.webapp.dao.jena.RDBGraphGenerator=WARN
#log4j.logger.edu.cornell.mannlib.vitro.webapp.search.solr.ContextNodeFields=DEBUG
log4j.logger.edu.cornell.mannlib.vitro.webapp.search.indexing.IndexBuilder=INFO
log4j.logger.edu.cornell.mannlib.vitro.webapp.search.indexing.IndexBuilder=DEBUG
log4j.logger.edu.cornell.mannlib.vitro.webapp.search.indexing.IndexThread=DEBUG
log4j.logger.edu.cornell.mannlib.vitro.webapp.search.indexing.IndexWorkerThread=INFO
log4j.logger.edu.cornell.mannlib.vitro.webapp.search.solr.SolrIndexer=INFO
# suppress odd warnings from libraries
log4j.logger.org.openjena.riot=FATAL
log4j.logger.org.directwebremoting=FATAL

View file

@ -9,8 +9,11 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Stack;
import java.util.Queue;
import javax.servlet.ServletContext;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -21,6 +24,7 @@ import edu.cornell.mannlib.vitro.webapp.dao.VClassDao;
import edu.cornell.mannlib.vitro.webapp.dao.VitroVocabulary;
import edu.cornell.mannlib.vitro.webapp.dao.WebappDaoFactory;
import edu.cornell.mannlib.vitro.webapp.search.beans.ObjectSourceIface;
import edu.cornell.mannlib.vitro.webapp.search.solr.CalculateParameters;
/**
@ -285,8 +289,27 @@ public class IndexBuilder extends Thread {
private void indexForSource(Iterator<Individual> individuals , boolean newDocs) throws AbortIndexing{
long starttime = System.currentTimeMillis();
long count = 0;
// long starttime = System.currentTimeMillis();
int count = 0;
int numOfThreads = 10;
List<IndexWorkerThread> workers = new ArrayList<IndexWorkerThread>();
boolean distributing = true;
for(int i = 0; i< numOfThreads ;i++){
workers.add(new IndexWorkerThread(indexer,i,distributing)); // made a pool of workers
}
log.info("Indexing worker pool ready for indexing.");
// starting worker threads
for(int i =0; i < numOfThreads; i++){
workers.get(i).start();
}
while(individuals.hasNext()){
if( stopRequested )
throw new AbortIndexing();
@ -295,7 +318,10 @@ public class IndexBuilder extends Thread {
try{
ind = individuals.next();
indexer.index(ind, newDocs);
//indexer.index(ind);
workers.get(count%numOfThreads).addToQueue(ind); // adding individual to worker queue.
}catch(Throwable ex){
if( stopRequested || log == null){//log might be null if system is shutting down.
throw new AbortIndexing();
@ -304,21 +330,34 @@ public class IndexBuilder extends Thread {
log.warn("Error indexing individual " + uri + " " + ex.getMessage());
}
count++;
if( log.isDebugEnabled() ){
/* if( log.isDebugEnabled() ){
if( (count % 100 ) == 0 && count > 0 ){
long dt = (System.currentTimeMillis() - starttime);
log.debug("individuals indexed: " + count + " in " + dt + " msec " +
" time pre individual = " + (dt / count) + " msec" );
}
} */
}
for(int i =0 ; i < numOfThreads; i ++){
workers.get(i).setDistributing(false);
}
for(int i =0; i < numOfThreads; i++){
try{
workers.get(i).join();
}catch(InterruptedException e){
log.error(e,e);
}
}
log.info(
/* log.info(
"individuals indexed: " + count + " in " + (System.currentTimeMillis() - starttime) + " msec" +
(count!=0?(" time per individual = " + (System.currentTimeMillis() - starttime)/ count + " msec"):"")
);
);*/
}
/**
* For a list of individuals, this builds a list of dependent resources and returns it.
*/

View file

@ -1,78 +1,97 @@
package edu.cornell.mannlib.vitro.webapp.search.indexing;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.cornell.mannlib.vitro.webapp.beans.Individual;
import edu.cornell.mannlib.vitro.webapp.search.IndexingException;
import edu.cornell.mannlib.vitro.webapp.search.solr.IndividualToSolrDocument;
class IndexWorkerThread implements Runnable{
class IndexWorkerThread extends Thread{
private IndexerIface indexer;
private static Log log = LogFactory.getLog(IndexWorkerThread.class);
protected IndividualToSolrDocument individualToSolrDoc;
private IndexerIface indexer = null;
private Log log = LogFactory.getLog(IndexWorkerThread.class);
private static long count=0;
private Queue<Individual> indQueue = new LinkedList<Individual>();
private int threadNum;
private static long starttime = 0;
private boolean distributing;
public IndexWorkerThread(IndexerIface indexer){
public IndexWorkerThread(IndexerIface indexer, int threadNum,boolean distributing){
this.indexer = indexer;
this.threadNum = threadNum;
this.distributing = distributing;
synchronized(this){
if(starttime == 0)
starttime = System.currentTimeMillis();
}
}
public void addToQueue(Individual ind, boolean newDocs){
/**
 * Hands one individual to this worker.
 * The individual is appended to the worker's queue while holding the
 * queue's monitor, and one thread waiting on that monitor is notified.
 *
 * @param ind individual to be indexed by this worker
 */
public void addToQueue(Individual ind){
    final Queue<Individual> pending = indQueue;
    synchronized(pending){
        pending.offer(ind);
        pending.notify();
    }
}
public void shutdown() {
/**
 * Reports whether this worker's queue currently holds no individuals.
 * The queue is read without taking its monitor.
 *
 * @return true when nothing is queued for this worker
 */
public boolean isQueueEmpty(){
    final boolean nothingQueued = indQueue.isEmpty();
    return nothingQueued;
}
/**
 * Tells this worker whether more individuals may still be handed to it.
 * The run loop keeps going only while the flag is true.
 *
 * @param distributing false once no further individuals will be queued
 */
public void setDistributing(boolean distributing){
    // The flag is a plain (non-volatile) field that run() re-checks while
    // holding indQueue's monitor. Writing it under the same monitor makes the
    // change visible to the worker, and the notification wakes a worker that
    // is parked in indQueue.wait() immediately instead of after its timeout.
    // addToQueue() already takes this monitor from the same caller, so no new
    // lock ordering is introduced.
    synchronized(indQueue){
        this.distributing = distributing;
        indQueue.notifyAll();
    }
}
public void run(){
//check for work
//if work found,
// translate
// send to server
//sleep (1000)
}
/*protected void indexInd() throws AbortIndexing{
long starttime = System.currentTimeMillis();
long count = 0;
Iterator<Individual> individuals = firstList.iterator();
while(individuals.hasNext()){
if( stopRequested )
throw new AbortIndexing();
Individual ind = null;
while(this.distributing){
synchronized(indQueue){
try{
ind = individuals.next();
indexer.index(ind, newDocs);
}catch(Throwable ex){
if( stopRequested || log == null){//log might be null if system is shutting down.
throw new AbortIndexing();
while(indQueue.isEmpty() && this.distributing){
try{
log.debug("Worker number " + threadNum + " waiting on some work to be alloted.");
indQueue.wait(1000);
}catch(InterruptedException ie){
log.error(ie,ie);
}
String uri = ind!=null?ind.getURI():"null";
log.warn("Error indexing individual from separate thread" + uri + " " + ex.getMessage());
}
Thread.sleep(50); //wait a bit to let a bit more work to come into the queue
log.debug("work found for Woker number " + threadNum);
addDocsToIndex();
} catch (InterruptedException e) {
log.debug("Worker number " + threadNum + " woken up",e);
}
catch(Throwable e){
log.error(e,e);
}
}
}
log.info("Worker number " + threadNum + " exiting.");
}
protected void addDocsToIndex() throws IndexingException{
while(!indQueue.isEmpty()){
indexer.index(indQueue.poll());
synchronized(this){
count++;
if( log.isDebugEnabled() ){
if( log.isInfoEnabled() ){
if( (count % 100 ) == 0 && count > 0 ){
long dt = (System.currentTimeMillis() - starttime);
log.debug("individuals indexed from seperate thread: " + count + " in " + dt + " msec " +
" time pre individual from seperate thread = " + (dt / count) + " msec" );
log.info("individuals indexed: " + count + " in " + dt + " msec " +
" time per individual = " + (dt / count) + " msec" );
}
}
}
}
private class AbortIndexing extends Exception {
// Just a vanilla exception
} */
}
}

View file

@ -38,7 +38,7 @@ public interface IndexerIface {
* @param newDoc - if true, just insert doc, if false attempt to update.
* @throws IndexingException
*/
public void index(Individual ind, boolean newDoc)throws IndexingException;
public void index(Individual ind)throws IndexingException;
/**

View file

@ -474,4 +474,10 @@ public class LuceneIndexer implements IndexerIface {
}
}
}
/**
 * Indexes a single individual.
 * NOTE(review): this is still the auto-generated stub; the body is empty, so
 * this method does nothing with the individual it receives. Confirm whether
 * a real implementation is intended before this indexer is used.
 *
 * @param ind the individual to index (currently ignored)
 * @throws IndexingException declared in the signature; never thrown by this body
 */
@Override
public void index(Individual ind) throws IndexingException {
// TODO Auto-generated method stub
}
}

View file

@ -1,6 +1,7 @@
package edu.cornell.mannlib.vitro.webapp.search.solr;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
@ -40,8 +41,7 @@ public class CalculateParameters implements DocumentModifier {
private Dataset dataset;
public static int totalInd=1;
public static Map<String,Float> betaMap = new Hashtable<String,Float>();
private float phi;
protected Map<String,Float> betaMap = new Hashtable<String,Float>();
private static final String prefix = "prefix owl: <http://www.w3.org/2002/07/owl#> "
+ " prefix vitroDisplay: <http://vitro.mannlib.cornell.edu/ontologies/display/1.1#> "
+ " prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> "
@ -81,6 +81,10 @@ public class CalculateParameters implements DocumentModifier {
new Thread(new TotalInd(this.dataset,totalCountQuery)).start();
}
public CalculateParameters(){
super();
}
public float calculateBeta(String uri){
float beta=0;
int Conn=0;
@ -116,19 +120,12 @@ public class CalculateParameters implements DocumentModifier {
StringTokenizer nodes = new StringTokenizer(adjNodes.toString()," ");
String uri=null;
float beta=0;
int size=0;
phi = 0.1F;
float phi = 0.1F;
while(nodes.hasMoreTokens()){
size++;
uri = nodes.nextToken();
if(hasBeta(uri)){ // get if already calculated
phi += getBeta(uri);
}else{ // query if not calculated and put in map
beta = calculateBeta(uri);
setBeta(uri, beta);
phi+=beta;
}
}
if(size>0)
phi = (float)phi/size;
@ -137,20 +134,21 @@ public class CalculateParameters implements DocumentModifier {
return phi;
}
public Float getBeta(String uri){
return betaMap.get(uri);
}
public float getPhi(){
return phi;
}
public boolean hasBeta(String uri){
return betaMap.containsKey(uri);
}
public void setBeta(String uri, float beta){
public synchronized Float getBeta(String uri){
float beta;
if(betaMap.containsKey(uri)){
beta = betaMap.get(uri);
}else{
beta = calculateBeta(uri); // or calculate & put in map
betaMap.put(uri, beta);
}
return beta;
public String[] getAdjacentNodes(String uri,boolean isPerson){
}
public String[] getAdjacentNodes(String uri){
List<String> queryList = new ArrayList<String>();
Set<String> adjacentNodes = new HashSet<String>();
@ -222,9 +220,9 @@ public class CalculateParameters implements DocumentModifier {
RDFNode coauthor = null;
try{
while(queryItr.hasNext()){
if(!isPerson){
/*if(!isPerson){
queryItr.next(); // we don't want first query to execute if the ind is not a person.
}
}*/
query = QueryFactory.create(queryItr.next(),Syntax.syntaxARQ);
QueryExecution qexec = QueryExecutionFactory.create(query,dataset,initialBinding);
try{
@ -276,26 +274,17 @@ public class CalculateParameters implements DocumentModifier {
}
@Override
public void modifyDocument(Individual individual, SolrInputDocument doc) {
public void modifyDocument(Individual individual, SolrInputDocument doc, StringBuffer addUri) {
// TODO Auto-generated method stub
// calculate beta value.
log.debug("Parameter calculation starts..");
float beta = 0;
String uri = individual.getURI();
if(hasBeta(uri)){
beta = getBeta(uri);
}else{
beta = calculateBeta(uri); // or calculate & put in map
setBeta(uri,beta);
}
boolean isPerson = (IndividualToSolrDocument.superClassNames.contains("Person")) ? true : false ;
String adjInfo[] = getAdjacentNodes(uri,isPerson);
String adjInfo[] = getAdjacentNodes(uri);
StringBuffer info = new StringBuffer();
info.append(adjInfo[0]);
info.append(IndividualToSolrDocument.addUri.toString());
phi = calculatePhi(info);
info.append(addUri.toString());
float phi = calculatePhi(info);
for(String term: fieldsToAddBetaTo){
SolrInputField f = doc.getField( term );
@ -314,6 +303,10 @@ public class CalculateParameters implements DocumentModifier {
log.debug("Parameter calculation is done");
}
/**
 * Discards every cached beta value held in the beta map.
 */
public synchronized void clearMap(){
    // getBeta() is synchronized and performs containsKey() followed by get()
    // with unboxing. Holding the same lock here prevents a clear from landing
    // between those two calls, which would make get() return null and the
    // unboxing throw a NullPointerException.
    betaMap.clear();
}
}
class TotalInd implements Runnable{

View file

@ -103,7 +103,7 @@ public class ContextNodeFields implements DocumentModifier{
@Override
public void modifyDocument(Individual individual, SolrInputDocument doc) {
public void modifyDocument(Individual individual, SolrInputDocument doc, StringBuffer addUri) {
log.debug("retrieving context node values..");
@ -111,7 +111,7 @@ public class ContextNodeFields implements DocumentModifier{
SolrInputField targetField = doc.getField(VitroTermNames.targetInfo);
StringBuffer objectProperties = new StringBuffer();
if(IndividualToSolrDocument.superClassNames.contains("Agent")){
objectProperties.append(" ");
int threadCount = multiValuedQueriesForAgent.size();
@ -135,15 +135,16 @@ public class ContextNodeFields implements DocumentModifier{
log.error("Thread " + threads[i].getName() + " interrupted!");
}
}
}
if(IndividualToSolrDocument.superClassNames.contains("InformationResource")){
targetField.addValue(" " + runQuery(individual, multiValuedQueryForInformationResource), targetField.getBoost());
}
field.addValue(objectProperties, field.getBoost());
log.debug("context node values are retrieved");
}
//single valued queries for foaf:Agent
@ -388,6 +389,7 @@ public class ContextNodeFields implements DocumentModifier{
}
private class QueryRunner extends Thread{
private Individual ind;

View file

@ -10,5 +10,6 @@ import edu.cornell.mannlib.vitro.webapp.beans.Individual;
* This interface represents an object that can add to a SolrInputDocument.
*/
public interface DocumentModifier {
public void modifyDocument(Individual individual, SolrInputDocument doc);
public void modifyDocument(Individual individual, SolrInputDocument doc, StringBuffer addUri);
}

View file

@ -38,14 +38,12 @@ public class IndividualToSolrDocument {
private IndividualProhibitedFromSearch individualProhibitedFromSearch;
public static ArrayList<String> superClassNames = null;
public static StringBuffer addUri = null;
private List<DocumentModifier> documentModifiers = new ArrayList<DocumentModifier>();
public List<DocumentModifier> documentModifiers = new ArrayList<DocumentModifier>();
private static List<String> contextNodeClassNames = new ArrayList<String>();
public IndividualToSolrDocument(
ClassProhibitedFromSearch classesProhibitedFromSearch,
IndividualProhibitedFromSearch individualProhibitedFromSearch){
@ -68,6 +66,8 @@ public class IndividualToSolrDocument {
@SuppressWarnings("static-access")
public SolrInputDocument translate(Individual ind) throws IndexingException{
long tProhibited = System.currentTimeMillis();
ArrayList<String> superClassNames = null;
StringBuffer addUri = null;
String value;
StringBuffer classPublicNames = new StringBuffer();
classPublicNames.append("");
@ -193,7 +193,7 @@ public class IndividualToSolrDocument {
}
}
if(documentModifiers == null){
if(documentModifiers == null || documentModifiers.isEmpty()){
doc.addField(term.NAME_RAW, value, NAME_BOOST);
doc.addField(term.NAME_LOWERCASE, value.toLowerCase(),NAME_BOOST);
doc.addField(term.NAME_UNSTEMMED, value,NAME_BOOST);
@ -210,7 +210,7 @@ public class IndividualToSolrDocument {
long tMoniker = System.currentTimeMillis();
if(documentModifiers == null){
if(documentModifiers == null || documentModifiers.isEmpty()){
//boost for entity
if(ind.getSearchBoost() != null && ind.getSearchBoost() != 0)
doc.setDocumentBoost(ind.getSearchBoost());
@ -269,10 +269,10 @@ public class IndividualToSolrDocument {
doc.addField(term.ALLTEXT_PHONETIC, alltext,PHONETIC_BOOST);
//run the document modifiers
if( documentModifiers != null ){
if( documentModifiers != null && !documentModifiers.isEmpty()){
doc.addField(term.targetInfo,"");
for(DocumentModifier modifier: documentModifiers){
modifier.modifyDocument(ind, doc);
modifier.modifyDocument(ind, doc, addUri);
}
}
}

View file

@ -19,6 +19,7 @@ import edu.cornell.mannlib.vitro.webapp.beans.Individual;
import edu.cornell.mannlib.vitro.webapp.search.IndexingException;
import edu.cornell.mannlib.vitro.webapp.search.docbuilder.Obj2DocIface;
import edu.cornell.mannlib.vitro.webapp.search.indexing.IndexerIface;
import edu.cornell.mannlib.vitro.webapp.search.solr.CalculateParameters;
public class SolrIndexer implements IndexerIface {
private final static Log log = LogFactory.getLog(SolrIndexer.class);
@ -34,7 +35,7 @@ public class SolrIndexer implements IndexerIface {
}
@Override
public synchronized void index(Individual ind, boolean newDoc) throws IndexingException {
public void index(Individual ind) throws IndexingException {
if( ! indexing )
throw new IndexingException("SolrIndexer: must call " +
"startIndexing() before index().");
@ -47,15 +48,19 @@ public class SolrIndexer implements IndexerIface {
log.debug("already indexed " + ind.getURI() );
return;
}else{
SolrInputDocument solrDoc = null;
synchronized(this){
urisIndexed.add(ind.getURI());
}
log.debug("indexing " + ind.getURI());
SolrInputDocument solrDoc = individualToSolrDoc.translate(ind);
// synchronized(individualToSolrDoc){
solrDoc = individualToSolrDoc.translate(ind);
// }
if( solrDoc != null){
//sending each doc individually is inefficient
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
docs.add( solrDoc );
UpdateResponse res = server.add( docs );
// Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
// docs.add( solrDoc );
UpdateResponse res = server.add( solrDoc );
log.debug("response after adding docs to server: "+ res);
}else{
log.debug("removing from index " + ind.getURI());
@ -125,12 +130,14 @@ public class SolrIndexer implements IndexerIface {
} catch(IOException e){
log.error("Could not commit to solr server", e);
}finally{
if(CalculateParameters.betaMap!=null){
CalculateParameters.betaMap.clear();
CalculateParameters.betaMap = null;
if(!individualToSolrDoc.documentModifiers.isEmpty()){
if(individualToSolrDoc.documentModifiers.get(0) instanceof CalculateParameters){
CalculateParameters c = (CalculateParameters) individualToSolrDoc.documentModifiers.get(0);
c.clearMap();
log.info("BetaMap cleared");
}
}
}
try {
server.optimize();
} catch (Exception e) {