From 6d74adccd802f4964bd0f482fd5c6c87aaf0ef2c Mon Sep 17 00:00:00 2001 From: grahamtriggs Date: Sun, 18 Oct 2015 14:28:55 +0100 Subject: [PATCH] [VIVO-1031] Add affinity for caches to ensure only one task can run at a time. fix background thread use in Visualisations, add startup servlet to get the rdf service, add comments --- .../WEB-INF/resources/startup_listeners.txt | 2 + ...pOfScienceVisualizationRequestHandler.java | 68 +-- .../ModelConstructorRequestHandler.java | 4 + .../setup/VisualizationSetup.java | 43 ++ ...poralGrantVisualizationRequestHandler.java | 23 +- ...ublicationVisualizationRequestHandler.java | 16 +- .../utilities/CachingRDFServiceExecutor.java | 385 +++++++++++++++-- .../utilities/VisualizationCaches.java | 392 +++++++++--------- 8 files changed, 627 insertions(+), 306 deletions(-) create mode 100644 src/edu/cornell/mannlib/vitro/webapp/visualization/setup/VisualizationSetup.java diff --git a/productMods/WEB-INF/resources/startup_listeners.txt b/productMods/WEB-INF/resources/startup_listeners.txt index 4aa41172..63c37f33 100644 --- a/productMods/WEB-INF/resources/startup_listeners.txt +++ b/productMods/WEB-INF/resources/startup_listeners.txt @@ -55,6 +55,8 @@ edu.cornell.mannlib.vitro.webapp.auth.policy.RestrictHomeMenuItemEditingPolicy$S edu.cornell.mannlib.vitro.webapp.services.shortview.ShortViewServiceSetup +edu.cornell.mannlib.vitro.webapp.visualization.setup.VisualizationSetup + edu.ucsf.vitro.opensocial.OpenSocialSmokeTests # For multiple language support diff --git a/src/edu/cornell/mannlib/vitro/webapp/visualization/mapofscience/MapOfScienceVisualizationRequestHandler.java b/src/edu/cornell/mannlib/vitro/webapp/visualization/mapofscience/MapOfScienceVisualizationRequestHandler.java index 0ffd5904..0ed035a1 100644 --- a/src/edu/cornell/mannlib/vitro/webapp/visualization/mapofscience/MapOfScienceVisualizationRequestHandler.java +++ b/src/edu/cornell/mannlib/vitro/webapp/visualization/mapofscience/MapOfScienceVisualizationRequestHandler.java @@ -3,21 +3,14 @@ package edu.cornell.mannlib.vitro.webapp.visualization.mapofscience; import java.io.IOException; -import java.io.InputStream; import java.text.DecimalFormat; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import com.hp.hpl.jena.query.QuerySolution; -import com.hp.hpl.jena.query.ResultSet; -import com.hp.hpl.jena.query.ResultSetFactory; -import com.hp.hpl.jena.rdf.model.Resource; import edu.cornell.mannlib.vitro.webapp.beans.Individual; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; -import edu.cornell.mannlib.vitro.webapp.visualization.constants.QueryConstants; -import edu.cornell.mannlib.vitro.webapp.visualization.utilities.CachingRDFServiceExecutor; import edu.cornell.mannlib.vitro.webapp.visualization.utilities.OrgUtils; import edu.cornell.mannlib.vitro.webapp.visualization.utilities.VisualizationCaches; import mapping.ScienceMapping; @@ -97,8 +90,8 @@ public class MapOfScienceVisualizationRequestHandler implements VisualizationReq RDFService rdfService = vitroRequest.getRDFService(); - Map> personToPublicationMap = VisualizationCaches.cachedPersonToPublication.get(rdfService); - Map publicationToJournalMap = cachedPublicationToJournal.get(rdfService); + Map> personToPublicationMap = VisualizationCaches.personToPublication.get(rdfService); + Map publicationToJournalMap = VisualizationCaches.publicationToJournal.get(rdfService); if (!personToPublicationMap.containsKey(subjectEntityURI)) { if (VisConstants.DataVisMode.JSON.equals(dataOuputFormat)) { @@ -166,7 +159,7 @@ public class MapOfScienceVisualizationRequestHandler implements VisualizationReq RDFService rdfService = vitroRequest.getRDFService(); - Map orgLabelMap = VisualizationCaches.cachedOrganizationLabels.get(rdfService); + Map orgLabelMap = VisualizationCaches.organizationLabels.get(rdfService); if (orgLabelMap.get(subjectEntityURI) == null) { if (VisConstants.DataVisMode.JSON.equals(dataOuputFormat)) { @@ -176,10 +169,10 @@ public class MapOfScienceVisualizationRequestHandler implements VisualizationReq } } - Map> subOrgMap = VisualizationCaches.cachedOrganizationSubOrgs.get(rdfService); - Map> organisationToPeopleMap = VisualizationCaches.cachedOrganisationToPeopleMap.get(rdfService); - Map> personToPublicationMap = VisualizationCaches.cachedPersonToPublication.get(rdfService); - Map publicationToJournalMap = cachedPublicationToJournal.get(rdfService); + Map> subOrgMap = VisualizationCaches.organizationSubOrgs.get(rdfService); + Map> organisationToPeopleMap = VisualizationCaches.organisationToPeopleMap.get(rdfService); + Map> personToPublicationMap = VisualizationCaches.personToPublication.get(rdfService); + Map publicationToJournalMap = VisualizationCaches.publicationToJournal.get(rdfService); Set orgPublications = new HashSet(); Set orgPublicationsPeople = new HashSet(); @@ -540,44 +533,6 @@ public class MapOfScienceVisualizationRequestHandler implements VisualizationReq return null; } - private static CachingRDFServiceExecutor> cachedPublicationToJournal = - new CachingRDFServiceExecutor<>( - new CachingRDFServiceExecutor.RDFServiceCallable>() { - @Override - protected Map callWithService(RDFService rdfService) throws Exception { - String query = QueryConstants.getSparqlPrefixQuery() + - "SELECT ?document ?journalLabel\n" + - "WHERE\n" + - "{\n" + - " ?document a bibo:Document .\n" + - " ?document core:hasPublicationVenue ?journal . \n" + - " ?journal rdfs:label ?journalLabel . \n" + - "}\n"; - - Map map = new HashMap<>(); - - InputStream is = null; - ResultSet rs = null; - try { - is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON); - rs = ResultSetFactory.fromJSON(is); - - while (rs.hasNext()) { - QuerySolution qs = rs.next(); - String document = qs.getResource("document").getURI(); - String journalLabel = qs.getLiteral("journalLabel").getString(); - - map.put(document, journalLabel); - } - } finally { - silentlyClose(is); - } - - return map; - } - } - ); - private static class JournalPublicationCounts { Map map = new HashMap(); int noJournalCount = 0; @@ -606,13 +561,4 @@ public class MapOfScienceVisualizationRequestHandler implements VisualizationReq return total; } } - private static void silentlyClose(InputStream is) { - try { - if (is != null) { - is.close(); - } - } catch (Throwable t) { - - } - } } diff --git a/src/edu/cornell/mannlib/vitro/webapp/visualization/modelconstructor/ModelConstructorRequestHandler.java b/src/edu/cornell/mannlib/vitro/webapp/visualization/modelconstructor/ModelConstructorRequestHandler.java index af6bc40c..4bfd9eaa 100644 --- a/src/edu/cornell/mannlib/vitro/webapp/visualization/modelconstructor/ModelConstructorRequestHandler.java +++ b/src/edu/cornell/mannlib/vitro/webapp/visualization/modelconstructor/ModelConstructorRequestHandler.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import edu.cornell.mannlib.vitro.webapp.visualization.utilities.VisualizationCaches; import org.apache.commons.logging.Log; import com.google.gson.Gson; @@ -83,6 +84,8 @@ public class ModelConstructorRequestHandler implements private Map regenerateConstructedModels(VitroRequest vitroRequest, Dataset dataSource) { + VisualizationCaches.rebuildAll(vitroRequest.getRDFService()); + List refreshedModels = new ArrayList(); Set currentModelIdentifiers = new HashSet(ConstructedModelTracker.getAllModels().keySet()); @@ -117,6 +120,7 @@ public class ModelConstructorRequestHandler implements fileData.put(DataVisualizationController.FILE_CONTENT_KEY, json.toJson(refreshedModels)); + return fileData; } diff --git a/src/edu/cornell/mannlib/vitro/webapp/visualization/setup/VisualizationSetup.java b/src/edu/cornell/mannlib/vitro/webapp/visualization/setup/VisualizationSetup.java new file mode 100644 index 00000000..2e8a485a --- /dev/null +++ b/src/edu/cornell/mannlib/vitro/webapp/visualization/setup/VisualizationSetup.java @@ -0,0 +1,43 @@ +/* $This file is distributed under the terms of the license in /doc/license.txt$ */ + +package edu.cornell.mannlib.vitro.webapp.visualization.setup; + +import edu.cornell.mannlib.vitro.webapp.modelaccess.ModelAccess; +import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; +import edu.cornell.mannlib.vitro.webapp.visualization.utilities.CachingRDFServiceExecutor; +import edu.cornell.mannlib.vitro.webapp.visualization.utilities.VisualizationCaches; + +import javax.servlet.ServletContext; +import javax.servlet.ServletContextEvent; +import javax.servlet.ServletContextListener; + +/** + * Setup class for Visualizations, in order to record a context-referenced RDFService + * + * If this class is missing, caches can not be refreshed in the background + */ +public class VisualizationSetup implements ServletContextListener { + @Override + public void contextInitialized(ServletContextEvent sce) { + ServletContext ctx = sce.getServletContext(); + + RDFService rdfService = ModelAccess.on(ctx).getRDFService(); + + CachingRDFServiceExecutor.setBackgroundRDFService(rdfService); + + /** + * Currently disabled, but if you want the Visualization caches to be "warmed" during startup, + * uncomment the line below. + * + * NB: Caches will refresh in the background, it won't pause the startup + */ + + // VisualizationCaches.rebuildAll(); + + } + + @Override + public void contextDestroyed(ServletContextEvent sce) { + + } +} diff --git a/src/edu/cornell/mannlib/vitro/webapp/visualization/temporalgraph/TemporalGrantVisualizationRequestHandler.java b/src/edu/cornell/mannlib/vitro/webapp/visualization/temporalgraph/TemporalGrantVisualizationRequestHandler.java index 11835447..00808e5c 100644 --- a/src/edu/cornell/mannlib/vitro/webapp/visualization/temporalgraph/TemporalGrantVisualizationRequestHandler.java +++ b/src/edu/cornell/mannlib/vitro/webapp/visualization/temporalgraph/TemporalGrantVisualizationRequestHandler.java @@ -2,7 +2,6 @@ package edu.cornell.mannlib.vitro.webapp.visualization.temporalgraph; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -14,7 +13,6 @@ import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.visualization.utilities.CounterUtils; import edu.cornell.mannlib.vitro.webapp.visualization.utilities.OrgUtils; import edu.cornell.mannlib.vitro.webapp.visualization.utilities.VisualizationCaches; -import edu.cornell.mannlib.vitro.webapp.visualization.valueobjects.Individual; import org.apache.commons.lang.StringEscapeUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -28,15 +26,10 @@ import edu.cornell.mannlib.vitro.webapp.controller.freemarker.responsevalues.Res import edu.cornell.mannlib.vitro.webapp.controller.freemarker.responsevalues.TemplateResponseValues; import edu.cornell.mannlib.vitro.webapp.controller.visualization.DataVisualizationController; import edu.cornell.mannlib.vitro.webapp.controller.visualization.VisualizationFrameworkConstants; -import edu.cornell.mannlib.vitro.webapp.visualization.constants.VOConstants; import edu.cornell.mannlib.vitro.webapp.visualization.constants.VisConstants; import edu.cornell.mannlib.vitro.webapp.visualization.exceptions.MalformedQueryParametersException; -import edu.cornell.mannlib.vitro.webapp.visualization.valueobjects.Activity; -import edu.cornell.mannlib.vitro.webapp.visualization.valueobjects.Entity; -import edu.cornell.mannlib.vitro.webapp.visualization.valueobjects.SubEntity; import edu.cornell.mannlib.vitro.webapp.visualization.valueobjects.json.JsonObject; import edu.cornell.mannlib.vitro.webapp.visualization.valueobjects.json.SubjectEntityJSON; -import edu.cornell.mannlib.vitro.webapp.visualization.visutils.SelectOnModelUtilities; import edu.cornell.mannlib.vitro.webapp.visualization.visutils.UtilityFunctions; import edu.cornell.mannlib.vitro.webapp.visualization.visutils.VisualizationRequestHandler; @@ -143,8 +136,8 @@ public class TemporalGrantVisualizationRequestHandler implements RDFService rdfService = vitroRequest.getRDFService(); - Map orgLabelMap = VisualizationCaches.cachedOrganizationLabels.get(rdfService); - Map personLabelMap = VisualizationCaches.cachedPersonLabels.get(rdfService); + Map orgLabelMap = VisualizationCaches.organizationLabels.get(rdfService); + Map personLabelMap = VisualizationCaches.personLabels.get(rdfService); if (orgLabelMap.get(subjectEntityURI) == null) { if (VisConstants.DataVisMode.JSON.equals(visMode)) { @@ -154,12 +147,12 @@ public class TemporalGrantVisualizationRequestHandler implements } } - Map> subOrgMap = VisualizationCaches.cachedOrganizationSubOrgs.get(rdfService); - Map> organisationToPeopleMap = VisualizationCaches.cachedOrganisationToPeopleMap.get(rdfService); - Map orgMostSpecificLabelMap = VisualizationCaches.cachedOrganizationToMostSpecificLabel.get(rdfService); - Map personMostSpecificLabelMap = VisualizationCaches.cachedPersonToMostSpecificLabel.get(rdfService); - Map> personToGrantMap = VisualizationCaches.cachedPersonToGrant.get(rdfService); - Map grantToYearMap = VisualizationCaches.cachedGrantToYear.get(rdfService); + Map> subOrgMap = VisualizationCaches.organizationSubOrgs.get(rdfService); + Map> organisationToPeopleMap = VisualizationCaches.organisationToPeopleMap.get(rdfService); + Map orgMostSpecificLabelMap = VisualizationCaches.organizationToMostSpecificLabel.get(rdfService); + Map personMostSpecificLabelMap = VisualizationCaches.personToMostSpecificLabel.get(rdfService); + Map> personToGrantMap = VisualizationCaches.personToGrant.get(rdfService); + Map grantToYearMap = VisualizationCaches.grantToYear.get(rdfService); Set orgGrants = new HashSet(); Set orgGrantsPeople = new HashSet(); diff --git a/src/edu/cornell/mannlib/vitro/webapp/visualization/temporalgraph/TemporalPublicationVisualizationRequestHandler.java b/src/edu/cornell/mannlib/vitro/webapp/visualization/temporalgraph/TemporalPublicationVisualizationRequestHandler.java index bb305fe5..452ece12 100644 --- a/src/edu/cornell/mannlib/vitro/webapp/visualization/temporalgraph/TemporalPublicationVisualizationRequestHandler.java +++ b/src/edu/cornell/mannlib/vitro/webapp/visualization/temporalgraph/TemporalPublicationVisualizationRequestHandler.java @@ -76,8 +76,8 @@ public class TemporalPublicationVisualizationRequestHandler implements RDFService rdfService = vitroRequest.getRDFService(); - Map orgLabelMap = VisualizationCaches.cachedOrganizationLabels.get(rdfService); - Map personLabelMap = VisualizationCaches.cachedPersonLabels.get(rdfService); + Map orgLabelMap = VisualizationCaches.organizationLabels.get(rdfService); + Map personLabelMap = VisualizationCaches.personLabels.get(rdfService); if (orgLabelMap.get(subjectEntityURI) == null) { if (VisConstants.DataVisMode.JSON.equals(visMode)) { @@ -87,12 +87,12 @@ public class TemporalPublicationVisualizationRequestHandler implements } } - Map> subOrgMap = VisualizationCaches.cachedOrganizationSubOrgs.get(rdfService); - Map orgMostSpecificLabelMap = VisualizationCaches.cachedOrganizationToMostSpecificLabel.get(rdfService); - Map personMostSpecificLabelMap = VisualizationCaches.cachedPersonToMostSpecificLabel.get(rdfService); - Map> organisationToPeopleMap = VisualizationCaches.cachedOrganisationToPeopleMap.get(rdfService); - Map> personToPublicationMap = VisualizationCaches.cachedPersonToPublication.get(rdfService); - Map publicationToYearMap = VisualizationCaches.cachedPublicationToYear.get(rdfService); + Map> subOrgMap = VisualizationCaches.organizationSubOrgs.get(rdfService); + Map orgMostSpecificLabelMap = VisualizationCaches.organizationToMostSpecificLabel.get(rdfService); + Map personMostSpecificLabelMap = VisualizationCaches.personToMostSpecificLabel.get(rdfService); + Map> organisationToPeopleMap = VisualizationCaches.organisationToPeopleMap.get(rdfService); + Map> personToPublicationMap = VisualizationCaches.personToPublication.get(rdfService); + Map publicationToYearMap = VisualizationCaches.publicationToYear.get(rdfService); Set orgPublications = new HashSet(); Set orgPublicationsPeople = new HashSet(); diff --git a/src/edu/cornell/mannlib/vitro/webapp/visualization/utilities/CachingRDFServiceExecutor.java b/src/edu/cornell/mannlib/vitro/webapp/visualization/utilities/CachingRDFServiceExecutor.java index 27d4a6d8..5c9c3478 100644 --- a/src/edu/cornell/mannlib/vitro/webapp/visualization/utilities/CachingRDFServiceExecutor.java +++ b/src/edu/cornell/mannlib/vitro/webapp/visualization/utilities/CachingRDFServiceExecutor.java @@ -1,87 +1,323 @@ package edu.cornell.mannlib.vitro.webapp.visualization.utilities; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; +import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +/** + * Utilicy class that populates and returns a cache. + * Once the cache is populated, it can return the cached results whilst refreshing in the background. + * + * @param + */ public class CachingRDFServiceExecutor { + /** + * Cache information + */ private T cachedResults; private long lastCacheTime; private RDFServiceCallable resultBuilder; + + /** + * Background task tracker + */ private FutureTask backgroundTask = null; + private long backgroundTaskStartTime = -1; + + /** + * RDF Service to be used by background threads + */ + private static RDFService backgroundRDFService = null; public CachingRDFServiceExecutor(RDFServiceCallable resultBuilder) { this.resultBuilder = resultBuilder; } + /** + * Return the cached results if present, or start the task. + * Will wait for completion if the cache is not already populated, otherwise the refresh will happen in the background. + * + * @param rdfService an RDF service to use, in foreground mode, if the background service is missing + * @return + */ public synchronized T get(RDFService rdfService) { - if (cachedResults != null) { - if (!resultBuilder.invalidateCache(System.currentTimeMillis() - lastCacheTime)) { - return cachedResults; - } + // First, check if there are results from the previous background task, and update the cache + if (backgroundTask != null && backgroundTask.isDone()) { + completeBackgroundTask(); } - try { - if (backgroundTask == null) { - resultBuilder.setRDFService(rdfService); - backgroundTask = new FutureTask(resultBuilder); + // If we have cached results + if (cachedResults != null) { + // If the background service exists, and the cache is considered invalid + if (backgroundRDFService != null && resultBuilder.invalidateCache(System.currentTimeMillis() - lastCacheTime)) { + // Determine how long we are prepared to wait for an answer + long waitFor = (backgroundTask == null ? 1000 : 500); - Thread thread = new Thread(backgroundTask); - thread.setDaemon(true); - thread.start(); + // Start the background task to refresh the cache + startBackgroundTask(rdfService); - if (cachedResults == null || resultBuilder.executionTime < 2000) { - completeBackgroundTask(); + // See if we expect it to complete in time, and if so, wait for it + if (isExpectedToCompleteIn(waitFor)) { + completeBackgroundTask(waitFor); } - } else if (backgroundTask.isDone()) { - completeBackgroundTask(); } - } catch (InterruptedException e) { - abortBackgroundTask(); - } catch (ExecutionException e) { - abortBackgroundTask(); - throw new RuntimeException("Background RDF thread through an exception", e.getCause()); + } else { + // No cached results, so fetch the results using any availabe RDF service + if (rdfService != null) { + startBackgroundTask(rdfService); + } else if (backgroundRDFService != null) { + startBackgroundTask(backgroundRDFService); + } else { + throw new RuntimeException("Can't execute without an RDF Service"); + } + + // As there are no cached results, wait for an answer regardless of the RDF service used + completeBackgroundTask(); } return cachedResults; } + /** + * (Re)build the current cache + * + * @param rdfService an RDF service to use, if the background RDF service is not set + */ + public synchronized void build(RDFService rdfService) { + // First, check if there are results from the previous background task, and update the cache + if (backgroundTask != null && backgroundTask.isDone()) { + completeBackgroundTask(); + } + + // If we have a background RDF service, we can launch the task in the background and leave it + if (backgroundRDFService != null) { + startBackgroundTask(backgroundRDFService); + } else if (rdfService != null) { + // No background service, so use the paassed RDF service, and wait for completion + startBackgroundTask(backgroundRDFService); + completeBackgroundTask(); + } + } + + /** + * Determine if a task is likely to complete with the time frame specified + * + * @param interval - time in milliseconds that you want the task to complete in + * @return true if the task is likely to complete + */ + private boolean isExpectedToCompleteIn(long interval) { + // If there is no background task, there is nothing to complete + if (backgroundTask == null) { + return false; + } + + // If the task has already completed, then return true + if (backgroundTask.isDone()) { + return true; + } + + // Get the current time + long now = System.currentTimeMillis(); + + // If the task has started, and has a previous execution time + if (resultBuilder.startedAt > -1 && resultBuilder.executionTime > -1) { + // Estimate a finish time, based on when the task started, and how long it last took + long expectedFinish = resultBuilder.startedAt + resultBuilder.executionTime; + + // If we expect it to complete before the interval passes, return true + if (expectedFinish < (now + interval)) { + return true; + } + + } + + // We expect the task to take longer than the timeout, so return false + return false; + } + + /** + * Create and start a background thread using the configured task + * @param rdfService + */ + private void startBackgroundTask(RDFService rdfService) { + // Ensure that there isn't already a task + if (backgroundTask == null && rdfService != null) { + // Set an RDF service to use + resultBuilder.setRDFService(backgroundRDFService != null ? backgroundRDFService : rdfService); + + // Create the background task, and record the time + backgroundTask = new FutureTask(resultBuilder); + backgroundTaskStartTime = System.currentTimeMillis(); + + // Start a background thread, ensuring that it can be terminated by the host + Thread thread = new VitroBackgroundThread(backgroundTask, resultBuilder.getClass().getName()); + thread.setDaemon(true); + thread.start(); + } + } + + /** + * Abort the current background task + */ private void abortBackgroundTask() { + // Ensure that we have a background task if (backgroundTask != null) { + // Cancel the background task and clear the start time backgroundTask.cancel(true); backgroundTask = null; + backgroundTaskStartTime = -1; } } - private void completeBackgroundTask() throws InterruptedException, ExecutionException { - if (backgroundTask != null) { - cachedResults = backgroundTask.get(); - lastCacheTime = System.currentTimeMillis(); - backgroundTask = null; + /** + * Complete the background task + */ + private void completeBackgroundTask() { + completeBackgroundTask(-1); + } + + /** + * Complete the background task + * @param waitFor - maximum time to wait for the results, -1 if forever + */ + private void completeBackgroundTask(long waitFor) { + try { + // If we have a background task + if (backgroundTask != null) { + + // Update the cached results + if (waitFor < 0) { + cachedResults = backgroundTask.get(); + } else { + cachedResults = backgroundTask.get(waitFor, TimeUnit.MILLISECONDS); + } + + // Set the time of the cache equal to the start time of the task that generated the results + lastCacheTime = backgroundTaskStartTime; + + // Clear the background task information + backgroundTask = null; + backgroundTaskStartTime = -1; + } + } catch (InterruptedException e) { + // Task was interrupted, so abort it + abortBackgroundTask(); + } catch (ExecutionException e) { + // There was a problem inside the task, so abort and throw an exception + try { + abortBackgroundTask(); + } finally { + throw new RuntimeException("Background RDF thread through an exception", e.getCause()); + } + } catch (TimeoutException e) { + // Ignore a timeout waiting for the results } } + /** + * Set the RDF service to be used for background threads (called from a startup servlet) + * @param rdfService + */ + public static void setBackgroundRDFService(RDFService rdfService) { + backgroundRDFService = rdfService; + } + + /** + * Class to be implemented by user to provide the means of generating the results + * @param + */ public static abstract class RDFServiceCallable implements Callable { + // The RDF Service private RDFService rdfService; + + // Start and execution times + private long startedAt = -1; private long executionTime = -1; + // Affinity object to prevent tasks with the same affinity from running in parallel + private Affinity affinity = null; + + /** + * Default constructor + */ + public RDFServiceCallable() { } + + /** + * Constructor that allows an affinity object to be supplied + * @param affinity + */ + public RDFServiceCallable(Affinity affinity) { this.affinity = affinity; } + + /** + * Set the RDF service to be used + * @param rdfService + */ final void setRDFService(RDFService rdfService) { this.rdfService = rdfService; } + /** + * Entry point for the background threads, ensuring the right start / cleanup is done + * @return + * @throws Exception + */ @Override final public T call() throws Exception { - long start = System.currentTimeMillis(); - T val = callWithService(rdfService); - executionTime = System.currentTimeMillis() - start; - return val; + try { + // If we have an affinity object + if (affinity != null) { + // Ask for permission to process processing + affinity.requestStart(executionTime); + } + + // Record the start time + startedAt = System.currentTimeMillis(); + + // Call the user implementation, passing the RDF service + T val = callWithService(rdfService); + + // Record how long it to to execute + executionTime = System.currentTimeMillis() - startedAt; + + // Return the results + return val; + } finally { + // Ensure that we reset the start time + startedAt = -1; + + // Tell any affinity object that we have completed + if (affinity != null) { + affinity.complete(); + } + } + } + /** + * Method for users to implement, to return the results + * @param rdfService + * @return + * @throws Exception + */ protected abstract T callWithService(RDFService rdfService) throws Exception; + /** + * Method to determine if the cache should be invalidated for the current results + * Default implementation dynamically adjusts the cache time based on the efficiency of creating results + * @param timeCached + * @return + */ boolean invalidateCache(long timeCached) { if (executionTime > -1) { /* @@ -102,7 +338,98 @@ public class CachingRDFServiceExecutor { return timeCached > Math.min(executionTime * 120, 86400000); } + return false; } } + + /** + * Affinity class that serializes background processing for tasks given the same affinity + */ + public static class Affinity { + private int maxThreads = 1; + + // Map of executing threads, and the time they expect to need to execute + private Map threadToExecutionTime = new HashMap<>(); + private Set executingThreads = new HashSet<>(); + + /** + * Called by a background thread to determine if it is allowed to start + * @param expectedExecutionTime time that the thread expects to take (usualling the last execution time) + */ + private void requestStart(long expectedExecutionTime) { + // Ask if the task needs to be queued + if (queueThis(Thread.currentThread(), expectedExecutionTime)) { + // Synchronize the thread to call wait + synchronized (Thread.currentThread()) { + try { + // Make the thread wait until it is notified to continue + Thread.currentThread().wait(); + } catch(InterruptedException e) { + } + } + } + } + + /** + * Adds a thread to the map, returns whether the thread needs to wait + * @param thread + * @param time + * @return true if the thread needs to wait, false if it can continue + */ + private synchronized boolean queueThis(Thread thread, Long time) { + // If we have fewer that the max threads running + if (executingThreads.size() < maxThreads) { + // Add thread to executing set + executingThreads.add(thread); + + // Not queued - we can continue + return false; + } else { + // Add the thread to the map + threadToExecutionTime.put(thread, time); + + // Let the caller know that we are queued + return true; + } + } + + /** + * Complete a thread + */ + private synchronized void complete() { + // Check that we are tracking this thread + if (executingThreads.contains(Thread.currentThread())) { + + // Remove the thread from the map + executingThreads.remove(Thread.currentThread()); + + // If there are still threads to execute, and we have not exhausted maximum threads + while (threadToExecutionTime.size() > 0 && executingThreads.size() < maxThreads) { + Thread nextToRelease = null; + long executionTime = -1; + + // Find the thread that expects to take the least time + for (Thread thread : threadToExecutionTime.keySet()) { + long thisTime = threadToExecutionTime.get(thread); + + if (nextToRelease == null) { + nextToRelease = thread; + executionTime = thisTime; + } else if (thisTime < executionTime) { + nextToRelease = thread; + executionTime = thisTime; + } + } + + // Synchronize on the thread we are releasing, and notify it to continue + synchronized (nextToRelease) { + threadToExecutionTime.remove(nextToRelease); + executingThreads.add(nextToRelease); + nextToRelease.notify(); + } + } + } + } + } } diff --git a/src/edu/cornell/mannlib/vitro/webapp/visualization/utilities/VisualizationCaches.java b/src/edu/cornell/mannlib/vitro/webapp/visualization/utilities/VisualizationCaches.java index 3b4980c9..3b9f39b8 100644 --- a/src/edu/cornell/mannlib/vitro/webapp/visualization/utilities/VisualizationCaches.java +++ b/src/edu/cornell/mannlib/vitro/webapp/visualization/utilities/VisualizationCaches.java @@ -1,24 +1,56 @@ +/* $This file is distributed under the terms of the license in /doc/license.txt$ */ + package edu.cornell.mannlib.vitro.webapp.visualization.utilities; import com.hp.hpl.jena.query.QuerySolution; -import com.hp.hpl.jena.query.ResultSet; -import com.hp.hpl.jena.query.ResultSetFactory; import com.hp.hpl.jena.rdf.model.Resource; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; +import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer; import edu.cornell.mannlib.vitro.webapp.visualization.constants.QueryConstants; import edu.cornell.mannlib.vitro.webapp.visualization.visutils.UtilityFunctions; import org.joda.time.DateTime; -import java.io.InputStream; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +/** + * Holder for the caches we are using in the visualizations + */ final public class VisualizationCaches { - public static final CachingRDFServiceExecutor> cachedOrganizationLabels = + // Affinity object to ensure that only one background thread can be running at once when updating the caches + private static final CachingRDFServiceExecutor.Affinity visualizationAffinity = new CachingRDFServiceExecutor.Affinity(); + + /** + * Rebuild all the caches + */ + public static void rebuildAll() { rebuildAll(null); } + + /** + * Rebuild all the caches + * @param rdfService if not null, use this service in foreground, otherwise may use the background thread + */ + public static void rebuildAll(RDFService rdfService) { + organizationLabels.build(rdfService); + organizationSubOrgs.build(rdfService); + organizationToMostSpecificLabel.build(rdfService); + organisationToPeopleMap.build(rdfService); + personLabels.build(rdfService); + personToMostSpecificLabel.build(rdfService); + personToPublication.build(rdfService); + publicationToJournal.build(rdfService); + publicationToYear.build(rdfService); + personToGrant.build(rdfService); + grantToYear.build(rdfService); + } + + /** + * Cache of organization labels (uri -> label) + */ + public static final CachingRDFServiceExecutor> organizationLabels = new CachingRDFServiceExecutor<>( - new CachingRDFServiceExecutor.RDFServiceCallable>() { + new CachingRDFServiceExecutor.RDFServiceCallable>(visualizationAffinity) { @Override protected Map callWithService(RDFService rdfService) throws Exception { String query = QueryConstants.getSparqlPrefixQuery() + @@ -29,33 +61,29 @@ final public class VisualizationCaches { " ?org rdfs:label ?orgLabel .\n" + "}\n"; - Map map = new HashMap<>(); + final Map map = new HashMap<>(); - InputStream is = null; - ResultSet rs = null; - try { - is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON); - rs = ResultSetFactory.fromJSON(is); - - while (rs.hasNext()) { - QuerySolution qs = rs.next(); + rdfService.sparqlSelectQuery(query, new ResultSetConsumer() { + @Override + protected void processQuerySolution(QuerySolution qs) { String org = qs.getResource("org").getURI(); String orgLabel = qs.getLiteral("orgLabel").getString(); map.put(org, orgLabel); } - } finally { - silentlyClose(is); - } + }); return map; } } ); - public static final CachingRDFServiceExecutor>> cachedOrganizationSubOrgs = + /** + * Cache of organization to sub organizations (uri -> list of uris) + */ + public static final CachingRDFServiceExecutor>> organizationSubOrgs = new CachingRDFServiceExecutor<>( - new CachingRDFServiceExecutor.RDFServiceCallable>>() { + new CachingRDFServiceExecutor.RDFServiceCallable>>(visualizationAffinity) { @Override protected Map> callWithService(RDFService rdfService) throws Exception { String query = QueryConstants.getSparqlPrefixQuery() + @@ -66,16 +94,11 @@ final public class VisualizationCaches { " ?org ?subOrg .\n" + "}\n"; - Map> map = new HashMap<>(); + final Map> map = new HashMap<>(); - InputStream is = null; - ResultSet rs = null; - try { - is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON); - rs = ResultSetFactory.fromJSON(is); - - while (rs.hasNext()) { - QuerySolution qs = rs.next(); + rdfService.sparqlSelectQuery(query, new ResultSetConsumer() { + @Override + protected void processQuerySolution(QuerySolution qs) { String org = qs.getResource("org").getURI(); String subOrg = qs.getResource("subOrg").getURI(); @@ -88,16 +111,17 @@ final public class VisualizationCaches { subOrgs.add(subOrg); } } - } finally { - silentlyClose(is); - } + }); return map; } } ); - public static final CachingRDFServiceExecutor> cachedOrganizationToMostSpecificLabel = + /** + * Organization most specific type label (uri -> string) + */ + public static final CachingRDFServiceExecutor> organizationToMostSpecificLabel = new CachingRDFServiceExecutor<>( new CachingRDFServiceExecutor.RDFServiceCallable>() { @Override @@ -111,32 +135,28 @@ final public class VisualizationCaches { " ?type rdfs:label ?typeLabel .\n" + "}\n"; - Map map = new HashMap<>(); + final Map map = new HashMap<>(); - InputStream is = null; - ResultSet rs = null; - try { - is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON); - rs = ResultSetFactory.fromJSON(is); - - while (rs.hasNext()) { - QuerySolution qs = rs.next(); + rdfService.sparqlSelectQuery(query, new ResultSetConsumer() { + @Override + protected void processQuerySolution(QuerySolution qs) { String org = qs.getResource("org").getURI(); String typeLabel = qs.getLiteral("typeLabel").getString(); map.put(org, String.valueOf(typeLabel)); } - } finally { - silentlyClose(is); - } + }); return map; } } ); - public static final CachingRDFServiceExecutor>> cachedOrganisationToPeopleMap = + /** + * Map of people within an organisation (org uri -> list of person uri) + */ + public static final CachingRDFServiceExecutor>> organisationToPeopleMap = new CachingRDFServiceExecutor>>( - new CachingRDFServiceExecutor.RDFServiceCallable>>() { + new CachingRDFServiceExecutor.RDFServiceCallable>>(visualizationAffinity) { @Override protected Map> callWithService(RDFService rdfService) throws Exception { String query = QueryConstants.getSparqlPrefixQuery() + @@ -145,22 +165,16 @@ final public class VisualizationCaches { "{\n" + " ?organisation a foaf:Organization .\n" + " ?organisation core:relatedBy ?position .\n" + + " ?position a core:Position .\n" + " ?position core:relates ?person .\n" + " ?person a foaf:Person .\n" + "}\n"; - // TODO Critical section? + final Map> orgToPeopleMap = new HashMap>(); - Map> orgToPeopleMap = new HashMap>(); - - InputStream is = null; - ResultSet rs = null; - try { - is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON); - rs = ResultSetFactory.fromJSON(is); - - while (rs.hasNext()) { - QuerySolution qs = rs.next(); + rdfService.sparqlSelectQuery(query, new ResultSetConsumer() { + @Override + protected void processQuerySolution(QuerySolution qs) { String org = qs.getResource("organisation").getURI(); String person = qs.getResource("person").getURI(); @@ -173,18 +187,19 @@ final public class VisualizationCaches { people.add(person); } } - } finally { - silentlyClose(is); - } + }); return orgToPeopleMap; } } ); - public static final CachingRDFServiceExecutor> cachedPersonLabels = + /** + * Display labels for people (uri -> label) + */ + public static final CachingRDFServiceExecutor> personLabels = new CachingRDFServiceExecutor<>( - new CachingRDFServiceExecutor.RDFServiceCallable>() { + new CachingRDFServiceExecutor.RDFServiceCallable>(visualizationAffinity) { @Override protected Map callWithService(RDFService rdfService) throws Exception { String query = QueryConstants.getSparqlPrefixQuery() + @@ -195,33 +210,62 @@ final public class VisualizationCaches { " ?person rdfs:label ?personLabel .\n" + "}\n"; - Map map = new HashMap<>(); + final Map map = new HashMap<>(); - InputStream is = null; - ResultSet rs = null; - try { - is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON); - rs = ResultSetFactory.fromJSON(is); - - while (rs.hasNext()) { - QuerySolution qs = rs.next(); + rdfService.sparqlSelectQuery(query, new ResultSetConsumer() { + @Override + protected void processQuerySolution(QuerySolution qs) { String person = qs.getResource("person").getURI(); String personLabel = qs.getLiteral("personLabel").getString(); map.put(person, personLabel); } - } finally { - silentlyClose(is); - } + }); return map; } } ); - public static final CachingRDFServiceExecutor>> cachedPersonToPublication = + /** + * Most specific type for person (uri -> label) + */ + public static final CachingRDFServiceExecutor> personToMostSpecificLabel = + new CachingRDFServiceExecutor<>( + new CachingRDFServiceExecutor.RDFServiceCallable>(visualizationAffinity) { + @Override + protected Map callWithService(RDFService rdfService) throws Exception { + String query = QueryConstants.getSparqlPrefixQuery() + + "SELECT ?person ?typeLabel\n" + + "WHERE\n" + + "{\n" + + " ?person a foaf:Person .\n" + + " ?person vitro:mostSpecificType ?type .\n" + + " ?type rdfs:label ?typeLabel .\n" + + "}\n"; + + final Map map = new HashMap<>(); + + rdfService.sparqlSelectQuery(query, new ResultSetConsumer() { + @Override + protected void processQuerySolution(QuerySolution qs) { + String person = qs.getResource("person").getURI(); + String typeLabel = qs.getLiteral("typeLabel").getString(); + map.put(person, String.valueOf(typeLabel)); + } + }); + + return map; + } + } + ); + + /** + * Person to publication Map (person uri -> list of publication uri) + */ + public static final CachingRDFServiceExecutor>> personToPublication = new CachingRDFServiceExecutor>>( - new CachingRDFServiceExecutor.RDFServiceCallable>>() { + new CachingRDFServiceExecutor.RDFServiceCallable>>(visualizationAffinity) { @Override protected Map> callWithService(RDFService rdfService) throws Exception { String query = QueryConstants.getSparqlPrefixQuery() + @@ -235,17 +279,11 @@ final public class VisualizationCaches { " ?document a bibo:Document .\n" + "}\n"; - Map> map = new HashMap>(); - - InputStream is = null; - ResultSet rs = null; - try { - is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON); - rs = ResultSetFactory.fromJSON(is); - - while (rs.hasNext()) { - QuerySolution qs = rs.next(); + final Map> map = new HashMap>(); + rdfService.sparqlSelectQuery(query, new ResultSetConsumer() { + @Override + protected void processQuerySolution(QuerySolution qs) { Resource person = qs.getResource("person"); Resource document = qs.getResource("document"); @@ -262,18 +300,53 @@ final public class VisualizationCaches { } } } - } finally { - silentlyClose(is); - } + }); return map; } } ); - public static final CachingRDFServiceExecutor> cachedPublicationToYear = + /** + * Publication to journal (publication uri -> journal label) + */ + public static final CachingRDFServiceExecutor> publicationToJournal = new CachingRDFServiceExecutor<>( - new CachingRDFServiceExecutor.RDFServiceCallable>() { + new CachingRDFServiceExecutor.RDFServiceCallable>(visualizationAffinity) { + @Override + protected Map callWithService(RDFService rdfService) throws Exception { + String query = QueryConstants.getSparqlPrefixQuery() + + "SELECT ?document ?journalLabel\n" + + "WHERE\n" + + "{\n" + + " ?document a bibo:Document .\n" + + " ?document core:hasPublicationVenue ?journal . \n" + + " ?journal rdfs:label ?journalLabel . \n" + + "}\n"; + + final Map map = new HashMap<>(); + + rdfService.sparqlSelectQuery(query, new ResultSetConsumer() { + @Override + protected void processQuerySolution(QuerySolution qs) { + String document = qs.getResource("document").getURI(); + String journalLabel = qs.getLiteral("journalLabel").getString(); + + map.put(document, journalLabel); + } + }); + + return map; + } + } + ); + + /** + * Publication to year (publication uri -> year) + */ + public static final CachingRDFServiceExecutor> publicationToYear = + new CachingRDFServiceExecutor<>( + new CachingRDFServiceExecutor.RDFServiceCallable>(visualizationAffinity) { @Override protected Map callWithService(RDFService rdfService) throws Exception { String query = QueryConstants.getSparqlPrefixQuery() + @@ -285,16 +358,11 @@ final public class VisualizationCaches { " ?dateTimeValue core:dateTime ?publicationDate . \n" + "}\n"; - Map map = new HashMap<>(); + final Map map = new HashMap<>(); - InputStream is = null; - ResultSet rs = null; - try { - is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON); - rs = ResultSetFactory.fromJSON(is); - - while (rs.hasNext()) { - QuerySolution qs = rs.next(); + rdfService.sparqlSelectQuery(query, new ResultSetConsumer() { + @Override + protected void processQuerySolution(QuerySolution qs) { String document = qs.getResource("document").getURI(); String pubDate = qs.getLiteral("publicationDate").getString(); if (pubDate != null) { @@ -306,18 +374,19 @@ final public class VisualizationCaches { } } } - } finally { - silentlyClose(is); - } + }); return map; } } ); - public static final CachingRDFServiceExecutor>> cachedPersonToGrant = + /** + * Person to grant (person uri -> grant uri) + */ + public static final CachingRDFServiceExecutor>> personToGrant = new CachingRDFServiceExecutor>>( - new CachingRDFServiceExecutor.RDFServiceCallable>>() { + new CachingRDFServiceExecutor.RDFServiceCallable>>(visualizationAffinity) { @Override protected Map> callWithService(RDFService rdfService) throws Exception { String query = QueryConstants.getSparqlPrefixQuery() + @@ -331,17 +400,11 @@ final public class VisualizationCaches { " ?grant a core:Grant .\n" + "}\n"; - Map> map = new HashMap>(); - - InputStream is = null; - ResultSet rs = null; - try { - is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON); - rs = ResultSetFactory.fromJSON(is); - - while (rs.hasNext()) { - QuerySolution qs = rs.next(); + final Map> map = new HashMap>(); + rdfService.sparqlSelectQuery(query, new ResultSetConsumer() { + @Override + protected void processQuerySolution(QuerySolution qs) { Resource person = qs.getResource("person"); Resource grant = qs.getResource("grant"); @@ -358,18 +421,19 @@ final public class VisualizationCaches { } } } - } finally { - silentlyClose(is); - } + }); return map; } } ); - public static final CachingRDFServiceExecutor> cachedGrantToYear = + /** + * Grant to year (grant uri -> year) + */ + public static final CachingRDFServiceExecutor> grantToYear = new CachingRDFServiceExecutor<>( - new CachingRDFServiceExecutor.RDFServiceCallable>() { + new CachingRDFServiceExecutor.RDFServiceCallable>(visualizationAffinity) { @Override protected Map callWithService(RDFService rdfService) throws Exception { String query = QueryConstants.getSparqlPrefixQuery() + @@ -382,16 +446,11 @@ final public class VisualizationCaches { " ?startDate core:dateTime ?startDateTimeValue . \n" + "}\n"; - Map map = new HashMap<>(); + final Map map = new HashMap<>(); - InputStream is = null; - ResultSet rs = null; - try { - is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON); - rs = ResultSetFactory.fromJSON(is); - - while (rs.hasNext()) { - QuerySolution qs = rs.next(); + rdfService.sparqlSelectQuery(query, new ResultSetConsumer() { + @Override + protected void processQuerySolution(QuerySolution qs) { String grant = qs.getResource("grant").getURI(); String startDate = qs.getLiteral("startDateTimeValue").getString(); if (startDate != null) { @@ -403,18 +462,19 @@ final public class VisualizationCaches { } } } - } finally { - silentlyClose(is); - } + }); return map; } } ); - public static final CachingRDFServiceExecutor> cachedGrantToRoleYear = + /** + * Grant to year of start in role (grant uri -> year) + */ + public static final CachingRDFServiceExecutor> grantToRoleYear = new CachingRDFServiceExecutor<>( - new CachingRDFServiceExecutor.RDFServiceCallable>() { + new CachingRDFServiceExecutor.RDFServiceCallable>(visualizationAffinity) { @Override protected Map callWithService(RDFService rdfService) throws Exception { String query = QueryConstants.getSparqlPrefixQuery() + @@ -428,16 +488,11 @@ final public class VisualizationCaches { " ?startDate core:dateTime ?startDateTimeValue . \n" + "}\n"; - Map map = new HashMap<>(); + final Map map = new HashMap<>(); - InputStream is = null; - ResultSet rs = null; - try { - is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON); - rs = ResultSetFactory.fromJSON(is); - - while (rs.hasNext()) { - QuerySolution qs = rs.next(); + rdfService.sparqlSelectQuery(query, new ResultSetConsumer() { + @Override + protected void processQuerySolution(QuerySolution qs) { String grant = qs.getResource("grant").getURI(); String startDate = qs.getLiteral("startDateTimeValue").getString(); if (startDate != null) { @@ -449,59 +504,10 @@ final public class VisualizationCaches { } } } - } finally { - silentlyClose(is); - } + }); return map; } } ); - - public static final CachingRDFServiceExecutor> cachedPersonToMostSpecificLabel = - new CachingRDFServiceExecutor<>( - new CachingRDFServiceExecutor.RDFServiceCallable>() { - @Override - protected Map callWithService(RDFService rdfService) throws Exception { - String query = QueryConstants.getSparqlPrefixQuery() + - "SELECT ?person ?typeLabel\n" + - "WHERE\n" + - "{\n" + - " ?person a foaf:Person .\n" + - " ?person vitro:mostSpecificType ?type .\n" + - " ?type rdfs:label ?typeLabel .\n" + - "}\n"; - - Map map = new HashMap<>(); - - InputStream is = null; - ResultSet rs = null; - try { - is = rdfService.sparqlSelectQuery(query, RDFService.ResultFormat.JSON); - rs = ResultSetFactory.fromJSON(is); - - while (rs.hasNext()) { - QuerySolution qs = rs.next(); - String person = qs.getResource("person").getURI(); - String typeLabel = qs.getLiteral("typeLabel").getString(); - map.put(person, String.valueOf(typeLabel)); - } - } finally { - silentlyClose(is); - } - - return map; - } - } - ); - - private static void silentlyClose(InputStream is) { - try { - if (is != null) { - is.close(); - } - } catch (Throwable t) { - - } - } }