Threading improvements

This commit is contained in:
Graham Triggs 2016-07-28 22:22:20 +01:00
parent f43db39084
commit 4d8bfe1a65

View file

@ -5,7 +5,6 @@ package edu.cornell.mannlib.vitro.webapp.visualization.utilities;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread; import edu.cornell.mannlib.vitro.webapp.utils.threads.VitroBackgroundThread;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -18,7 +17,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
/** /**
* Utilicy class that populates and returns a cache. * Utility class that populates and returns a cache.
* Once the cache is populated, it can return the cached results whilst refreshing in the background. * Once the cache is populated, it can return the cached results whilst refreshing in the background.
* *
* @param <T> * @param <T>
@ -30,7 +29,7 @@ public class CachingRDFServiceExecutor<T> {
private T cachedResults; private T cachedResults;
private long lastCacheTime; private long lastCacheTime;
private RDFServiceCallable<T> resultBuilder; private final RDFServiceCallable<T> resultBuilder;
/** /**
* Background task tracker * Background task tracker
@ -82,7 +81,7 @@ public class CachingRDFServiceExecutor<T> {
} }
} }
} else { } else {
// No cached results, so fetch the results using any availabe RDF service // No cached results, so fetch the results using any available RDF service
if (rdfService != null) { if (rdfService != null) {
startBackgroundTask(rdfService); startBackgroundTask(rdfService);
} else if (backgroundRDFService != null) { } else if (backgroundRDFService != null) {
@ -125,8 +124,8 @@ public class CachingRDFServiceExecutor<T> {
startBackgroundTask(backgroundRDFService); startBackgroundTask(backgroundRDFService);
completeBackgroundTaskAsync(); completeBackgroundTaskAsync();
} else if (rdfService != null) { } else if (rdfService != null) {
// No background service, so use the paassed RDF service, and wait for completion // No background service, so use the passed RDF service, and wait for completion
startBackgroundTask(backgroundRDFService); startBackgroundTask(rdfService);
completeBackgroundTask(); completeBackgroundTask();
} }
} }
@ -210,11 +209,7 @@ public class CachingRDFServiceExecutor<T> {
backgroundCompletion = new Thread(new Runnable() { backgroundCompletion = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
try { completeBackgroundTask(-1);
Thread.sleep(500);
completeBackgroundTask(-1);
} catch (InterruptedException e) {
}
} }
}); });
backgroundCompletion.setDaemon(true); backgroundCompletion.setDaemon(true);
@ -258,11 +253,8 @@ public class CachingRDFServiceExecutor<T> {
abortBackgroundTask(); abortBackgroundTask();
} catch (ExecutionException e) { } catch (ExecutionException e) {
// There was a problem inside the task, so abort and throw an exception // There was a problem inside the task, so abort and throw an exception
try { abortBackgroundTask();
abortBackgroundTask(); throw new RuntimeException("Background RDF thread through an exception", e.getCause());
} finally {
throw new RuntimeException("Background RDF thread through an exception", e.getCause());
}
} catch (TimeoutException e) { } catch (TimeoutException e) {
// Ignore a timeout waiting for the results // Ignore a timeout waiting for the results
} }
@ -312,7 +304,7 @@ public class CachingRDFServiceExecutor<T> {
/** /**
* Entry point for the background threads, ensuring the right start / cleanup is done * Entry point for the background threads, ensuring the right start / cleanup is done
* @throws Exception * @throws Exception Any exception
*/ */
@Override @Override
final public T call() throws Exception { final public T call() throws Exception {
@ -348,7 +340,7 @@ public class CachingRDFServiceExecutor<T> {
/** /**
* Method for users to implement, to return the results * Method for users to implement, to return the results
* @param rdfService An RDFService * @param rdfService An RDFService
* @throws Exception * @throws Exception Any exception
*/ */
protected abstract T callWithService(RDFService rdfService) throws Exception; protected abstract T callWithService(RDFService rdfService) throws Exception;
@ -386,25 +378,38 @@ public class CachingRDFServiceExecutor<T> {
* Affinity class that serializes background processing for tasks given the same affinity * Affinity class that serializes background processing for tasks given the same affinity
*/ */
public static class Affinity { public static class Affinity {
private int maxThreads = 1; private final int maxThreads = 1;
static class ThreadTimers {
ThreadTimers(long started, long expectedDuration) {
this.started = started;
this.expectedDuration = expectedDuration;
}
final long started;
final long expectedDuration;
}
// Map of executing threads, and the time they expect to need to execute // Map of executing threads, and the time they expect to need to execute
private Map<Thread, Long> threadToExecutionTime = new HashMap<>(); private final Map<Thread, ThreadTimers> threadToExecutionTime = new HashMap<>();
private Set<Thread> executingThreads = new HashSet<>(); private final Set<Thread> executingThreads = new HashSet<>();
/** /**
* Called by a background thread to determine if it is allowed to start * Called by a background thread to determine if it is allowed to start
* @param expectedExecutionTime time that the thread expects to take (usualling the last execution time) * @param expectedExecutionTime time that the thread expects to take (usually the last execution time)
*/ */
private void requestStart(long expectedExecutionTime) { private void requestStart(long expectedExecutionTime) {
Thread executingThread = Thread.currentThread();
// Ask if the task needs to be queued // Ask if the task needs to be queued
if (queueThis(Thread.currentThread(), expectedExecutionTime)) { if (queueThis(executingThread, expectedExecutionTime)) {
// Synchronize the thread to call wait // Synchronize the thread to call wait
synchronized (Thread.currentThread()) { synchronized (executingThread) {
try { try {
// Make the thread wait until it is notified to continue // Make the thread wait until it is notified to continue
Thread.currentThread().wait(); executingThread.wait();
} catch(InterruptedException e) { } catch(InterruptedException e) {
// No need to execute
} }
} }
} }
@ -426,7 +431,7 @@ public class CachingRDFServiceExecutor<T> {
return false; return false;
} else { } else {
// Add the thread to the map // Add the thread to the map
threadToExecutionTime.put(thread, time); threadToExecutionTime.put(thread, new ThreadTimers(System.currentTimeMillis(), time));
// Let the caller know that we are queued // Let the caller know that we are queued
return true; return true;
@ -447,25 +452,46 @@ public class CachingRDFServiceExecutor<T> {
while (threadToExecutionTime.size() > 0 && executingThreads.size() < maxThreads) { while (threadToExecutionTime.size() > 0 && executingThreads.size() < maxThreads) {
Thread nextToRelease = null; Thread nextToRelease = null;
long executionTime = -1; long executionTime = -1;
long started = -1;
long current = System.currentTimeMillis();
boolean favourStartTime = false;
// Find the thread that expects to take the least time // Find the thread that expects to take the least time
for (Thread thread : threadToExecutionTime.keySet()) { for (Thread thread : threadToExecutionTime.keySet()) {
long thisTime = threadToExecutionTime.get(thread); ThreadTimers thisThreadTimer = threadToExecutionTime.get(thread);
// If there are threads that have been waiting over 2 seconds, favour the oldest thread
if (thisThreadTimer.started + 2000 < current) {
favourStartTime = true;
}
if (nextToRelease == null) { if (nextToRelease == null) {
nextToRelease = thread; nextToRelease = thread;
executionTime = thisTime; executionTime = thisThreadTimer.expectedDuration;
} else if (thisTime < executionTime) { started = thisThreadTimer.started;
nextToRelease = thread; } else {
executionTime = thisTime; if (favourStartTime) {
// Find the oldest thread
if (thisThreadTimer.started < started) {
nextToRelease = thread;
executionTime = thisThreadTimer.expectedDuration;
started = thisThreadTimer.started;
}
} else if (thisThreadTimer.expectedDuration < executionTime) {
nextToRelease = thread;
executionTime = thisThreadTimer.expectedDuration;
started = thisThreadTimer.started;
}
} }
} }
// Synchronize on the thread we are releasing, and notify it to continue // Notify the Thread we are releasing to continue
synchronized (nextToRelease) { if (nextToRelease != null) {
threadToExecutionTime.remove(nextToRelease); threadToExecutionTime.remove(nextToRelease);
executingThreads.add(nextToRelease); executingThreads.add(nextToRelease);
nextToRelease.notify(); synchronized (nextToRelease) {
nextToRelease.notifyAll();
}
} }
} }
} }