From c3652a22a04d542a7a01e8363c40a0412ff5deed Mon Sep 17 00:00:00 2001 From: Sebastian Stenzel Date: Fri, 1 Jan 2016 14:05:41 +0100 Subject: [PATCH] reject execution when shut down --- .../impl/FifoParallelDataProcessor.java | 39 ++++++++----------- .../impl/FifoParallelDataProcessorTest.java | 13 +++++++ 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java index a4efa5745..fbc8748d1 100644 --- a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java +++ b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java @@ -15,6 +15,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -22,12 +24,9 @@ import java.util.concurrent.atomic.AtomicLong; import org.cryptomator.common.UncheckedInterruptedException; /** - * Executes long-running computations and returns the result strictly in order - * of the job submissions, no matter how long each job takes. + * Executes long-running computations and returns the result strictly in order of the job submissions, no matter how long each job takes. * - * The internally used thread pool is shut down automatically as soon as this - * FifiParallelDataProcessor is no longer referenced (see Finalization behaviour - * of {@link ThreadPoolExecutor}). + * The internally used thread pool is shut down automatically as soon as this FifiParallelDataProcessor is no longer referenced (see Finalization behaviour of {@link ThreadPoolExecutor}). */ class FifoParallelDataProcessor { @@ -37,11 +36,8 @@ class FifoParallelDataProcessor { private final ExecutorService executorService; /** - * @param numThreads - * How many jobs can run in parallel. - * @param workQueueSize - * Maximum number of jobs accepted without blocking, when no - * results are polled from {@link #processedData()}. + * @param numThreads How many jobs can run in parallel. + * @param workQueueSize Maximum number of jobs accepted without blocking, when no results are polled from {@link #processedData()}. */ public FifoParallelDataProcessor(int numThreads, int workQueueSize) { this.workQueue = new ArrayBlockingQueue<>(workQueueSize); @@ -49,12 +45,15 @@ class FifoParallelDataProcessor { } /** - * Enqueues tasks into the blocking queue, if they can not be executed - * immediately. + * Enqueues tasks into the blocking queue, if they can not be executed immediately. * * @see ThreadPoolExecutor#execute(Runnable) + * @see RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor) */ private void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + if (executor.isShutdown()) { + throw new RejectedExecutionException("Executor has been shut down."); + } try { this.workQueue.put(r); } catch (InterruptedException e) { @@ -63,11 +62,9 @@ class FifoParallelDataProcessor { } /** - * Enqueues a job for execution. The results of multiple submissions can be - * polled in FIFO order using {@link #processedData()}. + * Enqueues a job for execution. The results of multiple submissions can be polled in FIFO order using {@link #processedData()}. * - * @param processingJob - * A task, that will compute a result. + * @param processingJob A task, that will compute a result. * @throws InterruptedException */ void submit(Callable processingJob) throws InterruptedException { @@ -80,8 +77,7 @@ class FifoParallelDataProcessor { } /** - * Submits already pre-processed data, that can be polled in FIFO order from - * {@link #processedData()}. + * Submits already pre-processed data, that can be polled in FIFO order from {@link #processedData()}. * * @throws InterruptedException */ @@ -92,13 +88,10 @@ class FifoParallelDataProcessor { } /** - * Result of previously {@link #submit(Callable) submitted} jobs in the same - * order as they have been submitted. Blocks if the job didn't finish yet. + * Result of previously {@link #submit(Callable) submitted} jobs in the same order as they have been submitted. Blocks if the job didn't finish yet. * * @return Next job result - * @throws InterruptedException - * If the calling thread was interrupted while waiting for the - * next result. + * @throws InterruptedException If the calling thread was interrupted while waiting for the next result. */ T processedData() throws InterruptedException { return processedData.take().get(); diff --git a/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java b/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java index 418c4155c..28a6e4745 100644 --- a/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java +++ b/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java @@ -8,7 +8,10 @@ *******************************************************************************/ package org.cryptomator.crypto.engine.impl; +import java.lang.reflect.Field; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Assert; @@ -29,6 +32,16 @@ public class FifoParallelDataProcessorTest { processor.processedData(); } + @Test(expected = RejectedExecutionException.class) + public void testRejectExecutionAfterShutdown() throws InterruptedException, ReflectiveOperationException, SecurityException { + FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(1, 1); + Field field = FifoParallelDataProcessor.class.getDeclaredField("executorService"); + field.setAccessible(true); + ExecutorService executorService = (ExecutorService) field.get(processor); + executorService.shutdownNow(); + processor.submit(new IntegerJob(0, 1)); + } + @Test public void testStrictFifoOrder() throws InterruptedException { FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(4, 10);