[VIVO-1031] Add affinity for caches to ensure only one task can run at a time. Fix background thread use in Visualisations, add a startup servlet to get the RDF service, add comments

grahamtriggs 2015-10-18 14:28:55 +01:00
parent 34ead82676
commit 6d74adccd8
8 changed files with 627 additions and 306 deletions
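The headline change is the Affinity mechanism: cache-rebuilding tasks that share an Affinity object are serialized, so only one runs at a time, and queued tasks are released shortest-expected-duration first. As a rough mental model only (the commit's own Affinity, in the diff below, uses wait/notify and releases the queued thread with the smallest expected execution time), the effect on two rebuild tasks is similar to guarding each task body with a shared single-permit semaphore:

import java.util.concurrent.Semaphore;

// Toy model: tasks sharing one "affinity" never run concurrently.
public class AffinityDemo {
    // One permit == at most one cache rebuild in flight at a time
    private static final Semaphore affinity = new Semaphore(1);

    static Runnable task(String name, long millis) {
        return () -> {
            try {
                affinity.acquire();       // plays the role of Affinity.requestStart()
                System.out.println(name + " running");
                Thread.sleep(millis);     // stands in for a long SPARQL query
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                affinity.release();       // plays the role of Affinity.complete()
            }
        };
    }

    public static void main(String[] args) {
        new Thread(task("organizationLabels", 200)).start();
        new Thread(task("personToPublication", 200)).start(); // runs only after the first finishes
    }
}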

View file

@ -55,6 +55,8 @@ edu.cornell.mannlib.vitro.webapp.auth.policy.RestrictHomeMenuItemEditingPolicy$S
edu.cornell.mannlib.vitro.webapp.services.shortview.ShortViewServiceSetup
edu.cornell.mannlib.vitro.webapp.visualization.setup.VisualizationSetup
edu.ucsf.vitro.opensocial.OpenSocialSmokeTests
# For multiple language support

View file

@ -3,21 +3,14 @@
package edu.cornell.mannlib.vitro.webapp.visualization.mapofscience;
import java.io.IOException;
import java.io.InputStream;
import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import com.hp.hpl.jena.query.QuerySolution;
import com.hp.hpl.jena.query.ResultSet;
import com.hp.hpl.jena.query.ResultSetFactory;
import com.hp.hpl.jena.rdf.model.Resource;
import edu.cornell.mannlib.vitro.webapp.beans.Individual;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.visualization.constants.QueryConstants;
import edu.cornell.mannlib.vitro.webapp.visualization.utilities.CachingRDFServiceExecutor;
import edu.cornell.mannlib.vitro.webapp.visualization.utilities.OrgUtils;
import edu.cornell.mannlib.vitro.webapp.visualization.utilities.VisualizationCaches;
import mapping.ScienceMapping;
@ -97,8 +90,8 @@ public class MapOfScienceVisualizationRequestHandler implements VisualizationReq
RDFService rdfService = vitroRequest.getRDFService();
Map<String, Set<String>> personToPublicationMap = VisualizationCaches.cachedPersonToPublication.get(rdfService);
Map<String, String> publicationToJournalMap = cachedPublicationToJournal.get(rdfService);
Map<String, Set<String>> personToPublicationMap = VisualizationCaches.personToPublication.get(rdfService);
Map<String, String> publicationToJournalMap = VisualizationCaches.publicationToJournal.get(rdfService);
if (!personToPublicationMap.containsKey(subjectEntityURI)) {
if (VisConstants.DataVisMode.JSON.equals(dataOuputFormat)) {
@ -166,7 +159,7 @@ public class MapOfScienceVisualizationRequestHandler implements VisualizationReq
RDFService rdfService = vitroRequest.getRDFService();
Map<String, String> orgLabelMap = VisualizationCaches.cachedOrganizationLabels.get(rdfService);
Map<String, String> orgLabelMap = VisualizationCaches.organizationLabels.get(rdfService);
if (orgLabelMap.get(subjectEntityURI) == null) {
if (VisConstants.DataVisMode.JSON.equals(dataOuputFormat)) {
@ -176,10 +169,10 @@ public class MapOfScienceVisualizationRequestHandler implements VisualizationReq
}
}
Map<String, Set<String>> subOrgMap = VisualizationCaches.cachedOrganizationSubOrgs.get(rdfService);
Map<String, Set<String>> organisationToPeopleMap = VisualizationCaches.cachedOrganisationToPeopleMap.get(rdfService);
Map<String, Set<String>> personToPublicationMap = VisualizationCaches.cachedPersonToPublication.get(rdfService);
Map<String, String> publicationToJournalMap = cachedPublicationToJournal.get(rdfService);
Map<String, Set<String>> subOrgMap = VisualizationCaches.organizationSubOrgs.get(rdfService);
Map<String, Set<String>> organisationToPeopleMap = VisualizationCaches.organisationToPeopleMap.get(rdfService);
Map<String, Set<String>> personToPublicationMap = VisualizationCaches.personToPublication.get(rdfService);
Map<String, String> publicationToJournalMap = VisualizationCaches.publicationToJournal.get(rdfService);
Set<String> orgPublications = new HashSet<String>();
Set<String> orgPublicationsPeople = new HashSet<String>();
@ -540,44 +533,6 @@ public class MapOfScienceVisualizationRequestHandler implements VisualizationReq
return null;
}
private static CachingRDFServiceExecutor<Map<String, String>> cachedPublicationToJournal =
new CachingRDFServiceExecutor<>(
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, String>>() {
@Override
protected Map<String, String> callWithService(RDFService rdfService) throws Exception {
String query = QueryConstants.getSparqlPrefixQuery() +
"SELECT ?document ?journalLabel\n" +
"WHERE\n" +
"{\n" +
" ?document a bibo:Document .\n" +
" ?document core:hasPublicationVenue ?journal . \n" +
" ?journal rdfs:label ?journalLabel . \n" +
"}\n";
Map<String, String> map = new HashMap<>();
InputStream is = null;
ResultSet rs = null;
try {
is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON);
rs = ResultSetFactory.fromJSON(is);
while (rs.hasNext()) {
QuerySolution qs = rs.next();
String document = qs.getResource("document").getURI();
String journalLabel = qs.getLiteral("journalLabel").getString();
map.put(document, journalLabel);
}
} finally {
silentlyClose(is);
}
return map;
}
}
);
private static class JournalPublicationCounts {
Map<String, Integer> map = new HashMap<String, Integer>();
int noJournalCount = 0;
@ -606,13 +561,4 @@ public class MapOfScienceVisualizationRequestHandler implements VisualizationReq
return total;
}
}
private static void silentlyClose(InputStream is) {
try {
if (is != null) {
is.close();
}
} catch (Throwable t) {
}
}
}

View file

@ -9,6 +9,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import edu.cornell.mannlib.vitro.webapp.visualization.utilities.VisualizationCaches;
import org.apache.commons.logging.Log;
import com.google.gson.Gson;
@ -83,6 +84,8 @@ public class ModelConstructorRequestHandler implements
private Map<String, String> regenerateConstructedModels(VitroRequest vitroRequest,
Dataset dataSource) {
VisualizationCaches.rebuildAll(vitroRequest.getRDFService());
List<ConstructedModel> refreshedModels = new ArrayList<ConstructedModel>();
Set<String> currentModelIdentifiers = new HashSet<String>(ConstructedModelTracker.getAllModels().keySet());
@ -117,6 +120,7 @@ public class ModelConstructorRequestHandler implements
fileData.put(DataVisualizationController.FILE_CONTENT_KEY,
json.toJson(refreshedModels));
return fileData;
}

View file

@ -0,0 +1,43 @@
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
package edu.cornell.mannlib.vitro.webapp.visualization.setup;
import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelAccess;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.visualization.utilities.CachingRDFServiceExecutor;
import edu.cornell.mannlib.vitro.webapp.visualization.utilities.VisualizationCaches;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
/**
 * Setup class for Visualizations, in order to record an RDFService obtained from the servlet context
 *
 * If this class is missing, caches cannot be refreshed in the background
 */
public class VisualizationSetup implements ServletContextListener {
@Override
public void contextInitialized(ServletContextEvent sce) {
ServletContext ctx = sce.getServletContext();
RDFService rdfService = ModelAccess.on(ctx).getRDFService();
CachingRDFServiceExecutor.setBackgroundRDFService(rdfService);
/**
 * Currently disabled, but if you want the Visualization caches to be "warmed" during startup,
 * uncomment the line below.
 *
 * NB: Caches will refresh in the background; this won't pause startup
 */
// VisualizationCaches.rebuildAll();
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
}
}

View file

@ -2,7 +2,6 @@
package edu.cornell.mannlib.vitro.webapp.visualization.temporalgraph;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@ -14,7 +13,6 @@ import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.visualization.utilities.CounterUtils;
import edu.cornell.mannlib.vitro.webapp.visualization.utilities.OrgUtils;
import edu.cornell.mannlib.vitro.webapp.visualization.utilities.VisualizationCaches;
import edu.cornell.mannlib.vitro.webapp.visualization.valueobjects.Individual;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@ -28,15 +26,10 @@ import edu.cornell.mannlib.vitro.webapp.controller.freemarker.responsevalues.Res
import edu.cornell.mannlib.vitro.webapp.controller.freemarker.responsevalues.TemplateResponseValues;
import edu.cornell.mannlib.vitro.webapp.controller.visualization.DataVisualizationController;
import edu.cornell.mannlib.vitro.webapp.controller.visualization.VisualizationFrameworkConstants;
import edu.cornell.mannlib.vitro.webapp.visualization.constants.VOConstants;
import edu.cornell.mannlib.vitro.webapp.visualization.constants.VisConstants;
import edu.cornell.mannlib.vitro.webapp.visualization.exceptions.MalformedQueryParametersException;
import edu.cornell.mannlib.vitro.webapp.visualization.valueobjects.Activity;
import edu.cornell.mannlib.vitro.webapp.visualization.valueobjects.Entity;
import edu.cornell.mannlib.vitro.webapp.visualization.valueobjects.SubEntity;
import edu.cornell.mannlib.vitro.webapp.visualization.valueobjects.json.JsonObject;
import edu.cornell.mannlib.vitro.webapp.visualization.valueobjects.json.SubjectEntityJSON;
import edu.cornell.mannlib.vitro.webapp.visualization.visutils.SelectOnModelUtilities;
import edu.cornell.mannlib.vitro.webapp.visualization.visutils.UtilityFunctions;
import edu.cornell.mannlib.vitro.webapp.visualization.visutils.VisualizationRequestHandler;
@ -143,8 +136,8 @@ public class TemporalGrantVisualizationRequestHandler implements
RDFService rdfService = vitroRequest.getRDFService();
Map<String, String> orgLabelMap = VisualizationCaches.cachedOrganizationLabels.get(rdfService);
Map<String, String> personLabelMap = VisualizationCaches.cachedPersonLabels.get(rdfService);
Map<String, String> orgLabelMap = VisualizationCaches.organizationLabels.get(rdfService);
Map<String, String> personLabelMap = VisualizationCaches.personLabels.get(rdfService);
if (orgLabelMap.get(subjectEntityURI) == null) {
if (VisConstants.DataVisMode.JSON.equals(visMode)) {
@ -154,12 +147,12 @@ public class TemporalGrantVisualizationRequestHandler implements
}
}
Map<String, Set<String>> subOrgMap = VisualizationCaches.cachedOrganizationSubOrgs.get(rdfService);
Map<String, Set<String>> organisationToPeopleMap = VisualizationCaches.cachedOrganisationToPeopleMap.get(rdfService);
Map<String, String> orgMostSpecificLabelMap = VisualizationCaches.cachedOrganizationToMostSpecificLabel.get(rdfService);
Map<String, String> personMostSpecificLabelMap = VisualizationCaches.cachedPersonToMostSpecificLabel.get(rdfService);
Map<String, Set<String>> personToGrantMap = VisualizationCaches.cachedPersonToGrant.get(rdfService);
Map<String, String> grantToYearMap = VisualizationCaches.cachedGrantToYear.get(rdfService);
Map<String, Set<String>> subOrgMap = VisualizationCaches.organizationSubOrgs.get(rdfService);
Map<String, Set<String>> organisationToPeopleMap = VisualizationCaches.organisationToPeopleMap.get(rdfService);
Map<String, String> orgMostSpecificLabelMap = VisualizationCaches.organizationToMostSpecificLabel.get(rdfService);
Map<String, String> personMostSpecificLabelMap = VisualizationCaches.personToMostSpecificLabel.get(rdfService);
Map<String, Set<String>> personToGrantMap = VisualizationCaches.personToGrant.get(rdfService);
Map<String, String> grantToYearMap = VisualizationCaches.grantToYear.get(rdfService);
Set<String> orgGrants = new HashSet<String>();
Set<String> orgGrantsPeople = new HashSet<String>();

View file

@ -76,8 +76,8 @@ public class TemporalPublicationVisualizationRequestHandler implements
RDFService rdfService = vitroRequest.getRDFService();
Map<String, String> orgLabelMap = VisualizationCaches.cachedOrganizationLabels.get(rdfService);
Map<String, String> personLabelMap = VisualizationCaches.cachedPersonLabels.get(rdfService);
Map<String, String> orgLabelMap = VisualizationCaches.organizationLabels.get(rdfService);
Map<String, String> personLabelMap = VisualizationCaches.personLabels.get(rdfService);
if (orgLabelMap.get(subjectEntityURI) == null) {
if (VisConstants.DataVisMode.JSON.equals(visMode)) {
@ -87,12 +87,12 @@ public class TemporalPublicationVisualizationRequestHandler implements
}
}
Map<String, Set<String>> subOrgMap = VisualizationCaches.cachedOrganizationSubOrgs.get(rdfService);
Map<String, String> orgMostSpecificLabelMap = VisualizationCaches.cachedOrganizationToMostSpecificLabel.get(rdfService);
Map<String, String> personMostSpecificLabelMap = VisualizationCaches.cachedPersonToMostSpecificLabel.get(rdfService);
Map<String, Set<String>> organisationToPeopleMap = VisualizationCaches.cachedOrganisationToPeopleMap.get(rdfService);
Map<String, Set<String>> personToPublicationMap = VisualizationCaches.cachedPersonToPublication.get(rdfService);
Map<String, String> publicationToYearMap = VisualizationCaches.cachedPublicationToYear.get(rdfService);
Map<String, Set<String>> subOrgMap = VisualizationCaches.organizationSubOrgs.get(rdfService);
Map<String, String> orgMostSpecificLabelMap = VisualizationCaches.organizationToMostSpecificLabel.get(rdfService);
Map<String, String> personMostSpecificLabelMap = VisualizationCaches.personToMostSpecificLabel.get(rdfService);
Map<String, Set<String>> organisationToPeopleMap = VisualizationCaches.organisationToPeopleMap.get(rdfService);
Map<String, Set<String>> personToPublicationMap = VisualizationCaches.personToPublication.get(rdfService);
Map<String, String> publicationToYearMap = VisualizationCaches.publicationToYear.get(rdfService);
Set<String> orgPublications = new HashSet<String>();
Set<String> orgPublicationsPeople = new HashSet<String>();

View file

@ -1,87 +1,323 @@
package edu.cornell.mannlib.vitro.webapp.visualization.utilities;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Utility class that populates and returns a cache.
 * Once the cache is populated, it can return the cached results whilst refreshing in the background.
 *
 * @param <T>
 */
public class CachingRDFServiceExecutor<T> {
/**
* Cache information
*/
private T cachedResults;
private long lastCacheTime;
private RDFServiceCallable<T> resultBuilder;
/**
* Background task tracker
*/
private FutureTask<T> backgroundTask = null;
private long backgroundTaskStartTime = -1;
/**
* RDF Service to be used by background threads
*/
private static RDFService backgroundRDFService = null;
public CachingRDFServiceExecutor(RDFServiceCallable<T> resultBuilder) {
this.resultBuilder = resultBuilder;
}
/**
 * Return the cached results if present, or start the task.
 * Will wait for completion if the cache is not already populated, otherwise the refresh will happen in the background.
 *
 * @param rdfService an RDF service to use, in foreground mode, if the background service is missing
 * @return the cached results
 */
public synchronized T get(RDFService rdfService) {
// First, check if there are results from the previous background task, and update the cache
if (backgroundTask != null && backgroundTask.isDone()) {
completeBackgroundTask();
}
// If we have cached results
if (cachedResults != null) {
if (!resultBuilder.invalidateCache(System.currentTimeMillis() - lastCacheTime)) {
// If the background service exists, and the cache is considered invalid
if (backgroundRDFService != null && resultBuilder.invalidateCache(System.currentTimeMillis() - lastCacheTime)) {
// Determine how long we are prepared to wait for an answer
long waitFor = (backgroundTask == null ? 1000 : 500);
// Start the background task to refresh the cache
startBackgroundTask(rdfService);
// See if we expect it to complete in time, and if so, wait for it
if (isExpectedToCompleteIn(waitFor)) {
completeBackgroundTask(waitFor);
}
}
} else {
// No cached results, so fetch the results using any available RDF service
if (rdfService != null) {
startBackgroundTask(rdfService);
} else if (backgroundRDFService != null) {
startBackgroundTask(backgroundRDFService);
} else {
throw new RuntimeException("Can't execute without an RDF Service");
}
// As there are no cached results, wait for an answer regardless of the RDF service used
completeBackgroundTask();
}
return cachedResults;
}
/**
* (Re)build the current cache
*
* @param rdfService an RDF service to use, if the background RDF service is not set
*/
public synchronized void build(RDFService rdfService) {
// First, check if there are results from the previous background task, and update the cache
if (backgroundTask != null && backgroundTask.isDone()) {
completeBackgroundTask();
}
try {
if (backgroundTask == null) {
resultBuilder.setRDFService(rdfService);
backgroundTask = new FutureTask<T>(resultBuilder);
// If we have a background RDF service, we can launch the task in the background and leave it
if (backgroundRDFService != null) {
startBackgroundTask(backgroundRDFService);
} else if (rdfService != null) {
// No background service, so use the passed RDF service, and wait for completion
startBackgroundTask(rdfService);
completeBackgroundTask();
}
}
Thread thread = new Thread(backgroundTask);
/**
 * Determine if a task is likely to complete within the time frame specified
 *
 * @param interval - time in milliseconds that you want the task to complete in
 * @return true if the task is likely to complete
 */
private boolean isExpectedToCompleteIn(long interval) {
// If there is no background task, there is nothing to complete
if (backgroundTask == null) {
return false;
}
// If the task has already completed, then return true
if (backgroundTask.isDone()) {
return true;
}
// Get the current time
long now = System.currentTimeMillis();
// If the task has started, and has a previous execution time
if (resultBuilder.startedAt > -1 && resultBuilder.executionTime > -1) {
// Estimate a finish time, based on when the task started, and how long it last took
long expectedFinish = resultBuilder.startedAt + resultBuilder.executionTime;
// If we expect it to complete before the interval passes, return true
if (expectedFinish < (now + interval)) {
return true;
}
}
// We expect the task to take longer than the timeout, so return false
return false;
}
/**
* Create and start a background thread using the configured task
* @param rdfService
*/
private void startBackgroundTask(RDFService rdfService) {
// Ensure that there isn't already a task
if (backgroundTask == null && rdfService != null) {
// Set an RDF service to use
resultBuilder.setRDFService(backgroundRDFService != null ? backgroundRDFService : rdfService);
// Create the background task, and record the time
backgroundTask = new FutureTask<T>(resultBuilder);
backgroundTaskStartTime = System.currentTimeMillis();
// Start a background thread, ensuring that it can be terminated by the host
Thread thread = new VitroBackgroundThread(backgroundTask, resultBuilder.getClass().getName());
thread.setDaemon(true);
thread.start();
if (cachedResults == null || resultBuilder.executionTime < 2000) {
completeBackgroundTask();
}
} else if (backgroundTask.isDone()) {
completeBackgroundTask();
}
} catch (InterruptedException e) {
abortBackgroundTask();
} catch (ExecutionException e) {
abortBackgroundTask();
throw new RuntimeException("Background RDF thread threw an exception", e.getCause());
}
return cachedResults;
}
/**
* Abort the current background task
*/
private void abortBackgroundTask() {
// Ensure that we have a background task
if (backgroundTask != null) {
// Cancel the background task and clear the start time
backgroundTask.cancel(true);
backgroundTask = null;
backgroundTaskStartTime = -1;
}
}
private void completeBackgroundTask() throws InterruptedException, ExecutionException {
/**
* Complete the background task
*/
private void completeBackgroundTask() {
completeBackgroundTask(-1);
}
/**
* Complete the background task
* @param waitFor - maximum time to wait for the results, -1 if forever
*/
private void completeBackgroundTask(long waitFor) {
try {
// If we have a background task
if (backgroundTask != null) {
// Update the cached results
if (waitFor < 0) {
cachedResults = backgroundTask.get();
lastCacheTime = System.currentTimeMillis();
} else {
cachedResults = backgroundTask.get(waitFor, TimeUnit.MILLISECONDS);
}
// Set the time of the cache equal to the start time of the task that generated the results
lastCacheTime = backgroundTaskStartTime;
// Clear the background task information
backgroundTask = null;
backgroundTaskStartTime = -1;
}
} catch (InterruptedException e) {
// Task was interrupted, so abort it
abortBackgroundTask();
} catch (ExecutionException e) {
// There was a problem inside the task, so abort and throw an exception
try {
abortBackgroundTask();
} finally {
throw new RuntimeException("Background RDF thread threw an exception", e.getCause());
}
} catch (TimeoutException e) {
// Ignore a timeout waiting for the results
}
}
/**
* Set the RDF service to be used for background threads (called from a startup servlet)
* @param rdfService
*/
public static void setBackgroundRDFService(RDFService rdfService) {
backgroundRDFService = rdfService;
}
/**
* Class to be implemented by user to provide the means of generating the results
* @param <T>
*/
public static abstract class RDFServiceCallable<T> implements Callable<T> {
// The RDF Service
private RDFService rdfService;
// Start and execution times
private long startedAt = -1;
private long executionTime = -1;
// Affinity object to prevent tasks with the same affinity from running in parallel
private Affinity affinity = null;
/**
* Default constructor
*/
public RDFServiceCallable() { }
/**
* Constructor that allows an affinity object to be supplied
* @param affinity
*/
public RDFServiceCallable(Affinity affinity) { this.affinity = affinity; }
/**
* Set the RDF service to be used
* @param rdfService
*/
final void setRDFService(RDFService rdfService) {
this.rdfService = rdfService;
}
/**
* Entry point for the background threads, ensuring the right start / cleanup is done
* @return
* @throws Exception
*/
@Override
final public T call() throws Exception {
long start = System.currentTimeMillis();
T val = callWithService(rdfService);
executionTime = System.currentTimeMillis() - start;
return val;
try {
// If we have an affinity object
if (affinity != null) {
// Ask for permission to start processing
affinity.requestStart(executionTime);
}
// Record the start time
startedAt = System.currentTimeMillis();
// Call the user implementation, passing the RDF service
T val = callWithService(rdfService);
// Record how long it took to execute
executionTime = System.currentTimeMillis() - startedAt;
// Return the results
return val;
} finally {
// Ensure that we reset the start time
startedAt = -1;
// Tell any affinity object that we have completed
if (affinity != null) {
affinity.complete();
}
}
}
/**
* Method for users to implement, to return the results
* @param rdfService
* @return
* @throws Exception
*/
protected abstract T callWithService(RDFService rdfService) throws Exception;
/**
* Method to determine if the cache should be invalidated for the current results
* Default implementation dynamically adjusts the cache time based on the efficiency of creating results
* @param timeCached
* @return
*/
boolean invalidateCache(long timeCached) {
if (executionTime > -1) {
/*
@ -102,7 +338,98 @@ public class CachingRDFServiceExecutor<T> {
return timeCached > Math.min(executionTime * 120, 86400000);
}
return false;
}
}
/**
* Affinity class that serializes background processing for tasks given the same affinity
*/
public static class Affinity {
private int maxThreads = 1;
// Map of queued threads to the time they expect to need to execute
private Map<Thread, Long> threadToExecutionTime = new HashMap<>();
// Set of threads that are currently executing
private Set<Thread> executingThreads = new HashSet<>();
/**
 * Called by a background thread to determine if it is allowed to start
 * @param expectedExecutionTime time that the thread expects to take (usually the last execution time)
 */
private void requestStart(long expectedExecutionTime) {
// Ask if the task needs to be queued
if (queueThis(Thread.currentThread(), expectedExecutionTime)) {
// Synchronize the thread to call wait
synchronized (Thread.currentThread()) {
try {
// Make the thread wait until it is notified to continue
Thread.currentThread().wait();
} catch(InterruptedException e) {
}
}
}
}
/**
* Adds a thread to the map, returns whether the thread needs to wait
* @param thread
* @param time
* @return true if the thread needs to wait, false if it can continue
*/
private synchronized boolean queueThis(Thread thread, Long time) {
// If we have fewer than the max threads running
if (executingThreads.size() < maxThreads) {
// Add thread to executing set
executingThreads.add(thread);
// Not queued - we can continue
return false;
} else {
// Add the thread to the map
threadToExecutionTime.put(thread, time);
// Let the caller know that we are queued
return true;
}
}
/**
 * Called when a thread completes, releasing the next queued thread (if any)
 */
private synchronized void complete() {
// Check that we are tracking this thread
if (executingThreads.contains(Thread.currentThread())) {
// Remove the thread from the executing set
executingThreads.remove(Thread.currentThread());
// While there are queued threads, and we have not exhausted the maximum threads
while (threadToExecutionTime.size() > 0 && executingThreads.size() < maxThreads) {
Thread nextToRelease = null;
long executionTime = -1;
// Find the thread that expects to take the least time
for (Thread thread : threadToExecutionTime.keySet()) {
long thisTime = threadToExecutionTime.get(thread);
if (nextToRelease == null) {
nextToRelease = thread;
executionTime = thisTime;
} else if (thisTime < executionTime) {
nextToRelease = thread;
executionTime = thisTime;
}
}
// Synchronize on the thread we are releasing, and notify it to continue
synchronized (nextToRelease) {
threadToExecutionTime.remove(nextToRelease);
executingThreads.add(nextToRelease);
nextToRelease.notify();
}
}
}
}
}
}
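Taken together, get() implements a serve-stale-while-revalidating cache: the first caller blocks while the cache is populated, later callers receive the cached value immediately, and a stale cache is refreshed on a daemon background thread. A minimal self-contained sketch of that core behaviour, leaving out the affinity, the execution-time estimates, and the bounded waits above:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Toy model of "serve stale, refresh in background" caching.
class RefreshingCache<T> {
    private T cached;
    private long cachedAt = -1;
    private boolean refreshing = false;
    private final long maxAgeMs;
    private final Supplier<T> loader;
    private final ExecutorService background = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true); // like VitroBackgroundThread, must not keep the host alive
        return t;
    });

    RefreshingCache(Supplier<T> loader, long maxAgeMs) {
        this.loader = loader;
        this.maxAgeMs = maxAgeMs;
    }

    synchronized T get() {
        if (cached == null) {
            // First call: nothing to serve, so load in the foreground
            cached = loader.get();
            cachedAt = System.currentTimeMillis();
        } else if (!refreshing && System.currentTimeMillis() - cachedAt > maxAgeMs) {
            // Stale: kick off a refresh, but do not block the caller
            refreshing = true;
            background.submit(() -> {
                T fresh = loader.get();
                synchronized (this) {
                    cached = fresh;
                    cachedAt = System.currentTimeMillis();
                    refreshing = false;
                }
            });
        }
        return cached; // possibly stale, but never blocks once populated
    }
}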

View file

@ -1,24 +1,56 @@
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
package edu.cornell.mannlib.vitro.webapp.visualization.utilities;
import com.hp.hpl.jena.query.QuerySolution;
import com.hp.hpl.jena.query.ResultSet;
import com.hp.hpl.jena.query.ResultSetFactory;
import com.hp.hpl.jena.rdf.model.Resource;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
import edu.cornell.mannlib.vitro.webapp.visualization.constants.QueryConstants;
import edu.cornell.mannlib.vitro.webapp.visualization.visutils.UtilityFunctions;
import org.joda.time.DateTime;
import java.io.InputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* Holder for the caches we are using in the visualizations
*/
final public class VisualizationCaches {
public static final CachingRDFServiceExecutor<Map<String, String>> cachedOrganizationLabels =
// Affinity object to ensure that only one background thread runs at a time when updating the caches
private static final CachingRDFServiceExecutor.Affinity visualizationAffinity = new CachingRDFServiceExecutor.Affinity();
/**
* Rebuild all the caches
*/
public static void rebuildAll() { rebuildAll(null); }
/**
* Rebuild all the caches
* @param rdfService if not null, use this service in foreground, otherwise may use the background thread
*/
public static void rebuildAll(RDFService rdfService) {
organizationLabels.build(rdfService);
organizationSubOrgs.build(rdfService);
organizationToMostSpecificLabel.build(rdfService);
organisationToPeopleMap.build(rdfService);
personLabels.build(rdfService);
personToMostSpecificLabel.build(rdfService);
personToPublication.build(rdfService);
publicationToJournal.build(rdfService);
publicationToYear.build(rdfService);
personToGrant.build(rdfService);
grantToYear.build(rdfService);
}
/**
* Cache of organization labels (uri -> label)
*/
public static final CachingRDFServiceExecutor<Map<String, String>> organizationLabels =
new CachingRDFServiceExecutor<>(
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, String>>() {
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, String>>(visualizationAffinity) {
@Override
protected Map<String, String> callWithService(RDFService rdfService) throws Exception {
String query = QueryConstants.getSparqlPrefixQuery() +
@ -29,33 +61,29 @@ final public class VisualizationCaches {
" ?org rdfs:label ?orgLabel .\n" +
"}\n";
Map<String, String> map = new HashMap<>();
final Map<String, String> map = new HashMap<>();
InputStream is = null;
ResultSet rs = null;
try {
is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON);
rs = ResultSetFactory.fromJSON(is);
while (rs.hasNext()) {
QuerySolution qs = rs.next();
rdfService.sparqlSelectQuery(query, new ResultSetConsumer() {
@Override
protected void processQuerySolution(QuerySolution qs) {
String org = qs.getResource("org").getURI();
String orgLabel = qs.getLiteral("orgLabel").getString();
map.put(org, orgLabel);
}
} finally {
silentlyClose(is);
}
});
return map;
}
}
);
public static final CachingRDFServiceExecutor<Map<String, Set<String>>> cachedOrganizationSubOrgs =
/**
* Cache of organization to sub organizations (uri -> list of uris)
*/
public static final CachingRDFServiceExecutor<Map<String, Set<String>>> organizationSubOrgs =
new CachingRDFServiceExecutor<>(
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, Set<String>>>() {
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, Set<String>>>(visualizationAffinity) {
@Override
protected Map<String, Set<String>> callWithService(RDFService rdfService) throws Exception {
String query = QueryConstants.getSparqlPrefixQuery() +
@ -66,16 +94,11 @@ final public class VisualizationCaches {
" ?org <http://purl.obolibrary.org/obo/BFO_0000051> ?subOrg .\n" +
"}\n";
Map<String, Set<String>> map = new HashMap<>();
final Map<String, Set<String>> map = new HashMap<>();
InputStream is = null;
ResultSet rs = null;
try {
is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON);
rs = ResultSetFactory.fromJSON(is);
while (rs.hasNext()) {
QuerySolution qs = rs.next();
rdfService.sparqlSelectQuery(query, new ResultSetConsumer() {
@Override
protected void processQuerySolution(QuerySolution qs) {
String org = qs.getResource("org").getURI();
String subOrg = qs.getResource("subOrg").getURI();
@ -88,16 +111,17 @@ final public class VisualizationCaches {
subOrgs.add(subOrg);
}
}
} finally {
silentlyClose(is);
}
});
return map;
}
}
);
public static final CachingRDFServiceExecutor<Map<String, String>> cachedOrganizationToMostSpecificLabel =
/**
* Organization most specific type label (uri -> string)
*/
public static final CachingRDFServiceExecutor<Map<String, String>> organizationToMostSpecificLabel =
new CachingRDFServiceExecutor<>(
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, String>>() {
@Override
@ -111,32 +135,28 @@ final public class VisualizationCaches {
" ?type rdfs:label ?typeLabel .\n" +
"}\n";
Map<String, String> map = new HashMap<>();
final Map<String, String> map = new HashMap<>();
InputStream is = null;
ResultSet rs = null;
try {
is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON);
rs = ResultSetFactory.fromJSON(is);
while (rs.hasNext()) {
QuerySolution qs = rs.next();
rdfService.sparqlSelectQuery(query, new ResultSetConsumer() {
@Override
protected void processQuerySolution(QuerySolution qs) {
String org = qs.getResource("org").getURI();
String typeLabel = qs.getLiteral("typeLabel").getString();
map.put(org, String.valueOf(typeLabel));
}
} finally {
silentlyClose(is);
}
});
return map;
}
}
);
public static final CachingRDFServiceExecutor<Map<String, Set<String>>> cachedOrganisationToPeopleMap =
/**
* Map of people within an organisation (org uri -> list of person uri)
*/
public static final CachingRDFServiceExecutor<Map<String, Set<String>>> organisationToPeopleMap =
new CachingRDFServiceExecutor<Map<String, Set<String>>>(
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, Set<String>>>() {
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, Set<String>>>(visualizationAffinity) {
@Override
protected Map<String, Set<String>> callWithService(RDFService rdfService) throws Exception {
String query = QueryConstants.getSparqlPrefixQuery() +
@ -145,22 +165,16 @@ final public class VisualizationCaches {
"{\n" +
" ?organisation a foaf:Organization .\n" +
" ?organisation core:relatedBy ?position .\n" +
" ?position a core:Position .\n" +
" ?position core:relates ?person .\n" +
" ?person a foaf:Person .\n" +
"}\n";
// TODO Critical section?
final Map<String, Set<String>> orgToPeopleMap = new HashMap<String, Set<String>>();
Map<String, Set<String>> orgToPeopleMap = new HashMap<String, Set<String>>();
InputStream is = null;
ResultSet rs = null;
try {
is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON);
rs = ResultSetFactory.fromJSON(is);
while (rs.hasNext()) {
QuerySolution qs = rs.next();
rdfService.sparqlSelectQuery(query, new ResultSetConsumer() {
@Override
protected void processQuerySolution(QuerySolution qs) {
String org = qs.getResource("organisation").getURI();
String person = qs.getResource("person").getURI();
@ -173,18 +187,19 @@ final public class VisualizationCaches {
people.add(person);
}
}
} finally {
silentlyClose(is);
}
});
return orgToPeopleMap;
}
}
);
public static final CachingRDFServiceExecutor<Map<String, String>> cachedPersonLabels =
/**
* Display labels for people (uri -> label)
*/
public static final CachingRDFServiceExecutor<Map<String, String>> personLabels =
new CachingRDFServiceExecutor<>(
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, String>>() {
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, String>>(visualizationAffinity) {
@Override
protected Map<String, String> callWithService(RDFService rdfService) throws Exception {
String query = QueryConstants.getSparqlPrefixQuery() +
@ -195,33 +210,62 @@ final public class VisualizationCaches {
" ?person rdfs:label ?personLabel .\n" +
"}\n";
Map<String, String> map = new HashMap<>();
final Map<String, String> map = new HashMap<>();
InputStream is = null;
ResultSet rs = null;
try {
is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON);
rs = ResultSetFactory.fromJSON(is);
while (rs.hasNext()) {
QuerySolution qs = rs.next();
rdfService.sparqlSelectQuery(query, new ResultSetConsumer() {
@Override
protected void processQuerySolution(QuerySolution qs) {
String person = qs.getResource("person").getURI();
String personLabel = qs.getLiteral("personLabel").getString();
map.put(person, personLabel);
}
} finally {
silentlyClose(is);
}
});
return map;
}
}
);
public static final CachingRDFServiceExecutor<Map<String, Set<String>>> cachedPersonToPublication =
/**
* Most specific type for person (uri -> label)
*/
public static final CachingRDFServiceExecutor<Map<String, String>> personToMostSpecificLabel =
new CachingRDFServiceExecutor<>(
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, String>>(visualizationAffinity) {
@Override
protected Map<String, String> callWithService(RDFService rdfService) throws Exception {
String query = QueryConstants.getSparqlPrefixQuery() +
"SELECT ?person ?typeLabel\n" +
"WHERE\n" +
"{\n" +
" ?person a foaf:Person .\n" +
" ?person vitro:mostSpecificType ?type .\n" +
" ?type rdfs:label ?typeLabel .\n" +
"}\n";
final Map<String, String> map = new HashMap<>();
rdfService.sparqlSelectQuery(query, new ResultSetConsumer() {
@Override
protected void processQuerySolution(QuerySolution qs) {
String person = qs.getResource("person").getURI();
String typeLabel = qs.getLiteral("typeLabel").getString();
map.put(person, String.valueOf(typeLabel));
}
});
return map;
}
}
);
/**
* Person to publication Map (person uri -> list of publication uri)
*/
public static final CachingRDFServiceExecutor<Map<String, Set<String>>> personToPublication =
new CachingRDFServiceExecutor<Map<String, Set<String>>>(
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, Set<String>>>() {
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, Set<String>>>(visualizationAffinity) {
@Override
protected Map<String, Set<String>> callWithService(RDFService rdfService) throws Exception {
String query = QueryConstants.getSparqlPrefixQuery() +
@ -235,17 +279,11 @@ final public class VisualizationCaches {
" ?document a bibo:Document .\n" +
"}\n";
Map<String, Set<String>> map = new HashMap<String, Set<String>>();
InputStream is = null;
ResultSet rs = null;
try {
is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON);
rs = ResultSetFactory.fromJSON(is);
while (rs.hasNext()) {
QuerySolution qs = rs.next();
final Map<String, Set<String>> map = new HashMap<String, Set<String>>();
rdfService.sparqlSelectQuery(query, new ResultSetConsumer() {
@Override
protected void processQuerySolution(QuerySolution qs) {
Resource person = qs.getResource("person");
Resource document = qs.getResource("document");
@ -262,18 +300,53 @@ final public class VisualizationCaches {
}
}
}
} finally {
silentlyClose(is);
}
});
return map;
}
}
);
public static final CachingRDFServiceExecutor<Map<String, String>> cachedPublicationToYear =
/**
* Publication to journal (publication uri -> journal label)
*/
public static final CachingRDFServiceExecutor<Map<String, String>> publicationToJournal =
new CachingRDFServiceExecutor<>(
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, String>>() {
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, String>>(visualizationAffinity) {
@Override
protected Map<String, String> callWithService(RDFService rdfService) throws Exception {
String query = QueryConstants.getSparqlPrefixQuery() +
"SELECT ?document ?journalLabel\n" +
"WHERE\n" +
"{\n" +
" ?document a bibo:Document .\n" +
" ?document core:hasPublicationVenue ?journal . \n" +
" ?journal rdfs:label ?journalLabel . \n" +
"}\n";
final Map<String, String> map = new HashMap<>();
rdfService.sparqlSelectQuery(query, new ResultSetConsumer() {
@Override
protected void processQuerySolution(QuerySolution qs) {
String document = qs.getResource("document").getURI();
String journalLabel = qs.getLiteral("journalLabel").getString();
map.put(document, journalLabel);
}
});
return map;
}
}
);
/**
* Publication to year (publication uri -> year)
*/
public static final CachingRDFServiceExecutor<Map<String, String>> publicationToYear =
new CachingRDFServiceExecutor<>(
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, String>>(visualizationAffinity) {
@Override
protected Map<String, String> callWithService(RDFService rdfService) throws Exception {
String query = QueryConstants.getSparqlPrefixQuery() +
@ -285,16 +358,11 @@ final public class VisualizationCaches {
" ?dateTimeValue core:dateTime ?publicationDate . \n" +
"}\n";
Map<String, String> map = new HashMap<>();
final Map<String, String> map = new HashMap<>();
InputStream is = null;
ResultSet rs = null;
try {
is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON);
rs = ResultSetFactory.fromJSON(is);
while (rs.hasNext()) {
QuerySolution qs = rs.next();
rdfService.sparqlSelectQuery(query, new ResultSetConsumer() {
@Override
protected void processQuerySolution(QuerySolution qs) {
String document = qs.getResource("document").getURI();
String pubDate = qs.getLiteral("publicationDate").getString();
if (pubDate != null) {
@ -306,18 +374,19 @@ final public class VisualizationCaches {
}
}
}
} finally {
silentlyClose(is);
}
});
return map;
}
}
);
public static final CachingRDFServiceExecutor<Map<String, Set<String>>> cachedPersonToGrant =
/**
* Person to grant (person uri -> grant uri)
*/
public static final CachingRDFServiceExecutor<Map<String, Set<String>>> personToGrant =
new CachingRDFServiceExecutor<Map<String, Set<String>>>(
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, Set<String>>>() {
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, Set<String>>>(visualizationAffinity) {
@Override
protected Map<String, Set<String>> callWithService(RDFService rdfService) throws Exception {
String query = QueryConstants.getSparqlPrefixQuery() +
@ -331,17 +400,11 @@ final public class VisualizationCaches {
" ?grant a core:Grant .\n" +
"}\n";
Map<String, Set<String>> map = new HashMap<String, Set<String>>();
InputStream is = null;
ResultSet rs = null;
try {
is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON);
rs = ResultSetFactory.fromJSON(is);
while (rs.hasNext()) {
QuerySolution qs = rs.next();
final Map<String, Set<String>> map = new HashMap<String, Set<String>>();
rdfService.sparqlSelectQuery(query, new ResultSetConsumer() {
@Override
protected void processQuerySolution(QuerySolution qs) {
Resource person = qs.getResource("person");
Resource grant = qs.getResource("grant");
@ -358,18 +421,19 @@ final public class VisualizationCaches {
}
}
}
} finally {
silentlyClose(is);
}
});
return map;
}
}
);
public static final CachingRDFServiceExecutor<Map<String, String>> cachedGrantToYear =
/**
* Grant to year (grant uri -> year)
*/
public static final CachingRDFServiceExecutor<Map<String, String>> grantToYear =
new CachingRDFServiceExecutor<>(
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, String>>() {
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, String>>(visualizationAffinity) {
@Override
protected Map<String, String> callWithService(RDFService rdfService) throws Exception {
String query = QueryConstants.getSparqlPrefixQuery() +
@ -382,16 +446,11 @@ final public class VisualizationCaches {
" ?startDate core:dateTime ?startDateTimeValue . \n" +
"}\n";
Map<String, String> map = new HashMap<>();
final Map<String, String> map = new HashMap<>();
InputStream is = null;
ResultSet rs = null;
try {
is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON);
rs = ResultSetFactory.fromJSON(is);
while (rs.hasNext()) {
QuerySolution qs = rs.next();
rdfService.sparqlSelectQuery(query, new ResultSetConsumer() {
@Override
protected void processQuerySolution(QuerySolution qs) {
String grant = qs.getResource("grant").getURI();
String startDate = qs.getLiteral("startDateTimeValue").getString();
if (startDate != null) {
@ -403,18 +462,19 @@ final public class VisualizationCaches {
}
}
}
} finally {
silentlyClose(is);
}
});
return map;
}
}
);
public static final CachingRDFServiceExecutor<Map<String, String>> cachedGrantToRoleYear =
/**
* Grant to year of start in role (grant uri -> year)
*/
public static final CachingRDFServiceExecutor<Map<String, String>> grantToRoleYear =
new CachingRDFServiceExecutor<>(
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, String>>() {
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, String>>(visualizationAffinity) {
@Override
protected Map<String, String> callWithService(RDFService rdfService) throws Exception {
String query = QueryConstants.getSparqlPrefixQuery() +
@ -428,16 +488,11 @@ final public class VisualizationCaches {
" ?startDate core:dateTime ?startDateTimeValue . \n" +
"}\n";
Map<String, String> map = new HashMap<>();
final Map<String, String> map = new HashMap<>();
InputStream is = null;
ResultSet rs = null;
try {
is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON);
rs = ResultSetFactory.fromJSON(is);
while (rs.hasNext()) {
QuerySolution qs = rs.next();
rdfService.sparqlSelectQuery(query, new ResultSetConsumer() {
@Override
protected void processQuerySolution(QuerySolution qs) {
String grant = qs.getResource("grant").getURI();
String startDate = qs.getLiteral("startDateTimeValue").getString();
if (startDate != null) {
@ -449,59 +504,10 @@ final public class VisualizationCaches {
}
}
}
} finally {
silentlyClose(is);
}
});
return map;
}
}
);
public static final CachingRDFServiceExecutor<Map<String, String>> cachedPersonToMostSpecificLabel =
new CachingRDFServiceExecutor<>(
new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, String>>() {
@Override
protected Map<String, String> callWithService(RDFService rdfService) throws Exception {
String query = QueryConstants.getSparqlPrefixQuery() +
"SELECT ?person ?typeLabel\n" +
"WHERE\n" +
"{\n" +
" ?person a foaf:Person .\n" +
" ?person vitro:mostSpecificType ?type .\n" +
" ?type rdfs:label ?typeLabel .\n" +
"}\n";
Map<String, String> map = new HashMap<>();
InputStream is = null;
ResultSet rs = null;
try {
is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON);
rs = ResultSetFactory.fromJSON(is);
while (rs.hasNext()) {
QuerySolution qs = rs.next();
String person = qs.getResource("person").getURI();
String typeLabel = qs.getLiteral("typeLabel").getString();
map.put(person, String.valueOf(typeLabel));
}
} finally {
silentlyClose(is);
}
return map;
}
}
);
private static void silentlyClose(InputStream is) {
try {
if (is != null) {
is.close();
}
} catch (Throwable t) {
}
}
}
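Anyone adding another visualization cache after this commit would follow the same pattern. The sketch below is illustrative: the cache name and SPARQL query are hypothetical, but the API calls (an RDFServiceCallable constructed with the shared visualizationAffinity, and sparqlSelectQuery driven by a ResultSetConsumer) are exactly the ones introduced in this diff:

public static final CachingRDFServiceExecutor<Map<String, String>> grantLabels =
        new CachingRDFServiceExecutor<>(
                new CachingRDFServiceExecutor.RDFServiceCallable<Map<String, String>>(visualizationAffinity) {
                    @Override
                    protected Map<String, String> callWithService(RDFService rdfService) throws Exception {
                        String query = QueryConstants.getSparqlPrefixQuery() +
                                "SELECT ?grant ?grantLabel\n" +
                                "WHERE\n" +
                                "{\n" +
                                "    ?grant a core:Grant .\n" +
                                "    ?grant rdfs:label ?grantLabel .\n" +
                                "}\n";

                        final Map<String, String> map = new HashMap<>();

                        rdfService.sparqlSelectQuery(query, new ResultSetConsumer() {
                            @Override
                            protected void processQuerySolution(QuerySolution qs) {
                                map.put(qs.getResource("grant").getURI(),
                                        qs.getLiteral("grantLabel").getString());
                            }
                        });

                        return map;
                    }
                }
        );

A request handler would then read it with grantLabels.get(rdfService), and a grantLabels.build(rdfService) line in rebuildAll() would include it in the startup warm-up, exactly as the handlers and caches above do.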