From 2ca805b4ffe0b0dcbca73d5344597c89cd2f8241 Mon Sep 17 00:00:00 2001 From: Graham Triggs Date: Thu, 28 Jul 2016 22:39:05 +0100 Subject: [PATCH] Threading improvements (part 2) --- .../utilities/CachingRDFServiceExecutor.java | 62 +++++++++---------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/visualization/utilities/CachingRDFServiceExecutor.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/visualization/utilities/CachingRDFServiceExecutor.java index f0bf5049..bd211d7d 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/visualization/utilities/CachingRDFServiceExecutor.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/visualization/utilities/CachingRDFServiceExecutor.java @@ -11,6 +11,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; @@ -380,18 +381,19 @@ public class CachingRDFServiceExecutor { public static class Affinity { private final int maxThreads = 1; - static class ThreadTimers { - ThreadTimers(long started, long expectedDuration) { + static class ThreadControl { + ThreadControl(long started, long expectedDuration) { this.started = started; this.expectedDuration = expectedDuration; } final long started; final long expectedDuration; + final CountDownLatch latch = new CountDownLatch(1); } // Map of executing threads, and the time they expect to need to execute - private final Map threadToExecutionTime = new HashMap<>(); + private final Map threadToExecutionTime = new HashMap<>(); private final Set executingThreads = new HashSet<>(); /** @@ -402,15 +404,13 @@ public class CachingRDFServiceExecutor { Thread executingThread = Thread.currentThread(); // Ask if the task needs to be queued - if (queueThis(executingThread, expectedExecutionTime)) { - // Synchronize the thread to call wait - synchronized (executingThread) { - try { - // Make the thread wait until it is notified to continue - executingThread.wait(); - } catch(InterruptedException e) { - // No need to execute - } + CountDownLatch latch = queueThis(executingThread, expectedExecutionTime); + + // We got a latch from the queue, so wait for it to clear + if (latch != null) { + try { + latch.await(); + } catch (InterruptedException e) { } } } @@ -421,20 +421,22 @@ public class CachingRDFServiceExecutor { * @param time start time of the thread * @return true if the thread needs to wait, false if it can continue */ - private synchronized boolean queueThis(Thread thread, Long time) { + private synchronized CountDownLatch queueThis(Thread thread, Long time) { // If we have fewer that the max threads running if (executingThreads.size() < maxThreads) { // Add thread to executing set executingThreads.add(thread); // Not queued - we can continue - return false; + return null; } else { - // Add the thread to the map - threadToExecutionTime.put(thread, new ThreadTimers(System.currentTimeMillis(), time)); + ThreadControl control = new ThreadControl(System.currentTimeMillis(), time); - // Let the caller know that we are queued - return true; + // Add the thread to the map + threadToExecutionTime.put(thread, control); + + // Give the caller a handle to the latch for the queued thread + return control.latch; } } @@ -451,36 +453,32 @@ public class CachingRDFServiceExecutor { // If there are still threads to execute, and we have not exhausted maximum threads while (threadToExecutionTime.size() > 0 && executingThreads.size() < maxThreads) { Thread nextToRelease = null; - long executionTime = -1; - long started = -1; + ThreadControl nextToReleaseControl = null; long current = System.currentTimeMillis(); boolean favourStartTime = false; // Find the thread that expects to take the least time for (Thread thread : threadToExecutionTime.keySet()) { - ThreadTimers thisThreadTimer = threadToExecutionTime.get(thread); + ThreadControl threadControl = threadToExecutionTime.get(thread); // If there are threads that have been waiting over 2 seconds, favour the oldest thread - if (thisThreadTimer.started + 2000 < current) { + if (threadControl.started + 2000 < current) { favourStartTime = true; } if (nextToRelease == null) { nextToRelease = thread; - executionTime = thisThreadTimer.expectedDuration; - started = thisThreadTimer.started; + nextToReleaseControl = threadControl; } else { if (favourStartTime) { // Find the oldest thread - if (thisThreadTimer.started < started) { + if (threadControl.started < nextToReleaseControl.started) { nextToRelease = thread; - executionTime = thisThreadTimer.expectedDuration; - started = thisThreadTimer.started; + nextToReleaseControl = threadControl; } - } else if (thisThreadTimer.expectedDuration < executionTime) { + } else if (threadControl.expectedDuration < nextToReleaseControl.expectedDuration) { nextToRelease = thread; - executionTime = thisThreadTimer.expectedDuration; - started = thisThreadTimer.started; + nextToReleaseControl = threadControl; } } } @@ -489,9 +487,7 @@ public class CachingRDFServiceExecutor { if (nextToRelease != null) { threadToExecutionTime.remove(nextToRelease); executingThreads.add(nextToRelease); - synchronized (nextToRelease) { - nextToRelease.notifyAll(); - } + nextToReleaseControl.latch.countDown(); } } }