Threading improvements (part 2)

This commit is contained in:
Graham Triggs 2016-07-28 22:39:05 +01:00
parent 4d8bfe1a65
commit 2ca805b4ff

View file

@ -11,6 +11,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
@ -380,18 +381,19 @@ public class CachingRDFServiceExecutor<T> {
public static class Affinity {
private final int maxThreads = 1;
static class ThreadTimers {
ThreadTimers(long started, long expectedDuration) {
static class ThreadControl {
ThreadControl(long started, long expectedDuration) {
this.started = started;
this.expectedDuration = expectedDuration;
}
final long started;
final long expectedDuration;
final CountDownLatch latch = new CountDownLatch(1);
}
// Map of executing threads, and the time they expect to need to execute
private final Map<Thread, ThreadTimers> threadToExecutionTime = new HashMap<>();
private final Map<Thread, ThreadControl> threadToExecutionTime = new HashMap<>();
private final Set<Thread> executingThreads = new HashSet<>();
/**
@ -402,15 +404,13 @@ public class CachingRDFServiceExecutor<T> {
Thread executingThread = Thread.currentThread();
// Ask if the task needs to be queued
if (queueThis(executingThread, expectedExecutionTime)) {
// Synchronize the thread to call wait
synchronized (executingThread) {
CountDownLatch latch = queueThis(executingThread, expectedExecutionTime);
// We got a latch from the queue, so wait for it to clear
if (latch != null) {
try {
// Make the thread wait until it is notified to continue
executingThread.wait();
latch.await();
} catch (InterruptedException e) {
// No need to execute
}
}
}
}
@ -421,20 +421,22 @@ public class CachingRDFServiceExecutor<T> {
* @param time start time of the thread
* @return true if the thread needs to wait, false if it can continue
*/
private synchronized boolean queueThis(Thread thread, Long time) {
private synchronized CountDownLatch queueThis(Thread thread, Long time) {
// If we have fewer that the max threads running
if (executingThreads.size() < maxThreads) {
// Add thread to executing set
executingThreads.add(thread);
// Not queued - we can continue
return false;
return null;
} else {
// Add the thread to the map
threadToExecutionTime.put(thread, new ThreadTimers(System.currentTimeMillis(), time));
ThreadControl control = new ThreadControl(System.currentTimeMillis(), time);
// Let the caller know that we are queued
return true;
// Add the thread to the map
threadToExecutionTime.put(thread, control);
// Give the caller a handle to the latch for the queued thread
return control.latch;
}
}
@ -451,36 +453,32 @@ public class CachingRDFServiceExecutor<T> {
// If there are still threads to execute, and we have not exhausted maximum threads
while (threadToExecutionTime.size() > 0 && executingThreads.size() < maxThreads) {
Thread nextToRelease = null;
long executionTime = -1;
long started = -1;
ThreadControl nextToReleaseControl = null;
long current = System.currentTimeMillis();
boolean favourStartTime = false;
// Find the thread that expects to take the least time
for (Thread thread : threadToExecutionTime.keySet()) {
ThreadTimers thisThreadTimer = threadToExecutionTime.get(thread);
ThreadControl threadControl = threadToExecutionTime.get(thread);
// If there are threads that have been waiting over 2 seconds, favour the oldest thread
if (thisThreadTimer.started + 2000 < current) {
if (threadControl.started + 2000 < current) {
favourStartTime = true;
}
if (nextToRelease == null) {
nextToRelease = thread;
executionTime = thisThreadTimer.expectedDuration;
started = thisThreadTimer.started;
nextToReleaseControl = threadControl;
} else {
if (favourStartTime) {
// Find the oldest thread
if (thisThreadTimer.started < started) {
if (threadControl.started < nextToReleaseControl.started) {
nextToRelease = thread;
executionTime = thisThreadTimer.expectedDuration;
started = thisThreadTimer.started;
nextToReleaseControl = threadControl;
}
} else if (thisThreadTimer.expectedDuration < executionTime) {
} else if (threadControl.expectedDuration < nextToReleaseControl.expectedDuration) {
nextToRelease = thread;
executionTime = thisThreadTimer.expectedDuration;
started = thisThreadTimer.started;
nextToReleaseControl = threadControl;
}
}
}
@ -489,9 +487,7 @@ public class CachingRDFServiceExecutor<T> {
if (nextToRelease != null) {
threadToExecutionTime.remove(nextToRelease);
executingThreads.add(nextToRelease);
synchronized (nextToRelease) {
nextToRelease.notifyAll();
}
nextToReleaseControl.latch.countDown();
}
}
}