diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/visualization/utilities/CachingRDFServiceExecutor.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/visualization/utilities/CachingRDFServiceExecutor.java index 2b9e9f8a..f0bf5049 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/visualization/utilities/CachingRDFServiceExecutor.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/visualization/utilities/CachingRDFServiceExecutor.java @@ -5,7 +5,6 @@ package edu.cornell.mannlib.vitro.webapp.visualization.utilities; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; -import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -18,7 +17,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** - * Utilicy class that populates and returns a cache. + * Utility class that populates and returns a cache. * Once the cache is populated, it can return the cached results whilst refreshing in the background. * * @param @@ -30,7 +29,7 @@ public class CachingRDFServiceExecutor { private T cachedResults; private long lastCacheTime; - private RDFServiceCallable resultBuilder; + private final RDFServiceCallable resultBuilder; /** * Background task tracker @@ -82,7 +81,7 @@ public class CachingRDFServiceExecutor { } } } else { - // No cached results, so fetch the results using any availabe RDF service + // No cached results, so fetch the results using any available RDF service if (rdfService != null) { startBackgroundTask(rdfService); } else if (backgroundRDFService != null) { @@ -125,8 +124,8 @@ public class CachingRDFServiceExecutor { startBackgroundTask(backgroundRDFService); completeBackgroundTaskAsync(); } else if (rdfService != null) { - // No background service, so use the paassed RDF service, and wait for completion - startBackgroundTask(backgroundRDFService); + // No background service, so use the passed RDF service, and wait for completion + startBackgroundTask(rdfService); completeBackgroundTask(); } } @@ -210,11 +209,7 @@ public class CachingRDFServiceExecutor { backgroundCompletion = new Thread(new Runnable() { @Override public void run() { - try { - Thread.sleep(500); - completeBackgroundTask(-1); - } catch (InterruptedException e) { - } + completeBackgroundTask(-1); } }); backgroundCompletion.setDaemon(true); @@ -258,11 +253,8 @@ public class CachingRDFServiceExecutor { abortBackgroundTask(); } catch (ExecutionException e) { // There was a problem inside the task, so abort and throw an exception - try { - abortBackgroundTask(); - } finally { - throw new RuntimeException("Background RDF thread through an exception", e.getCause()); - } + abortBackgroundTask(); + throw new RuntimeException("Background RDF thread through an exception", e.getCause()); } catch (TimeoutException e) { // Ignore a timeout waiting for the results } @@ -312,7 +304,7 @@ public class CachingRDFServiceExecutor { /** * Entry point for the background threads, ensuring the right start / cleanup is done - * @throws Exception + * @throws Exception Any exception */ @Override final public T call() throws Exception { @@ -348,7 +340,7 @@ public class CachingRDFServiceExecutor { /** * Method for users to implement, to return the results * @param rdfService An RDFService - * @throws Exception + * @throws Exception Any exception */ protected abstract T callWithService(RDFService rdfService) throws Exception; @@ -386,25 +378,38 @@ public class CachingRDFServiceExecutor { * Affinity class that serializes background processing for tasks given the same affinity */ public static class Affinity { - private int maxThreads = 1; + private final int maxThreads = 1; + + static class ThreadTimers { + ThreadTimers(long started, long expectedDuration) { + this.started = started; + this.expectedDuration = expectedDuration; + } + + final long started; + final long expectedDuration; + } // Map of executing threads, and the time they expect to need to execute - private Map threadToExecutionTime = new HashMap<>(); - private Set executingThreads = new HashSet<>(); + private final Map threadToExecutionTime = new HashMap<>(); + private final Set executingThreads = new HashSet<>(); /** * Called by a background thread to determine if it is allowed to start - * @param expectedExecutionTime time that the thread expects to take (usualling the last execution time) + * @param expectedExecutionTime time that the thread expects to take (usually the last execution time) */ private void requestStart(long expectedExecutionTime) { + Thread executingThread = Thread.currentThread(); + // Ask if the task needs to be queued - if (queueThis(Thread.currentThread(), expectedExecutionTime)) { + if (queueThis(executingThread, expectedExecutionTime)) { // Synchronize the thread to call wait - synchronized (Thread.currentThread()) { + synchronized (executingThread) { try { // Make the thread wait until it is notified to continue - Thread.currentThread().wait(); + executingThread.wait(); } catch(InterruptedException e) { + // No need to execute } } } @@ -426,7 +431,7 @@ public class CachingRDFServiceExecutor { return false; } else { // Add the thread to the map - threadToExecutionTime.put(thread, time); + threadToExecutionTime.put(thread, new ThreadTimers(System.currentTimeMillis(), time)); // Let the caller know that we are queued return true; @@ -447,25 +452,46 @@ public class CachingRDFServiceExecutor { while (threadToExecutionTime.size() > 0 && executingThreads.size() < maxThreads) { Thread nextToRelease = null; long executionTime = -1; + long started = -1; + long current = System.currentTimeMillis(); + boolean favourStartTime = false; // Find the thread that expects to take the least time for (Thread thread : threadToExecutionTime.keySet()) { - long thisTime = threadToExecutionTime.get(thread); + ThreadTimers thisThreadTimer = threadToExecutionTime.get(thread); + + // If there are threads that have been waiting over 2 seconds, favour the oldest thread + if (thisThreadTimer.started + 2000 < current) { + favourStartTime = true; + } if (nextToRelease == null) { nextToRelease = thread; - executionTime = thisTime; - } else if (thisTime < executionTime) { - nextToRelease = thread; - executionTime = thisTime; + executionTime = thisThreadTimer.expectedDuration; + started = thisThreadTimer.started; + } else { + if (favourStartTime) { + // Find the oldest thread + if (thisThreadTimer.started < started) { + nextToRelease = thread; + executionTime = thisThreadTimer.expectedDuration; + started = thisThreadTimer.started; + } + } else if (thisThreadTimer.expectedDuration < executionTime) { + nextToRelease = thread; + executionTime = thisThreadTimer.expectedDuration; + started = thisThreadTimer.started; + } } } - // Synchronize on the thread we are releasing, and notify it to continue - synchronized (nextToRelease) { + // Notify the Thread we are releasing to continue + if (nextToRelease != null) { threadToExecutionTime.remove(nextToRelease); executingThreads.add(nextToRelease); - nextToRelease.notify(); + synchronized (nextToRelease) { + nextToRelease.notifyAll(); + } } } }