diff --git a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java index 9005e4e81..35158c6fd 100644 --- a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java +++ b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java @@ -1,6 +1,10 @@ package org.cryptomator.crypto.aes256; +import java.util.ArrayList; +import java.util.Collection; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; @@ -27,7 +31,7 @@ class CryptoWorkerExecutor { private final AtomicLong currentBlock; private final BlockingQueue inputQueue; private final ExecutorService executorService; - private final CompletionService completionService; + private final Future allWork; private volatile boolean acceptWork; /** @@ -40,14 +44,17 @@ class CryptoWorkerExecutor { this.currentBlock = new AtomicLong(); this.inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead this.executorService = Executors.newFixedThreadPool(numWorkers); - this.completionService = new ExecutorCompletionService<>(executorService); this.acceptWork = true; // start workers: + final CompletionService completionService = new ExecutorCompletionService<>(executorService); + final Collection> workers = new ArrayList<>(numWorkers); for (int i = 0; i < numWorkers; i++) { final CryptoWorker worker = workerFactory.createWorker(lock, blockDone, currentBlock, inputQueue); - completionService.submit(worker); + workers.add(completionService.submit(worker)); } + final Supervisor supervisor = new Supervisor(workers, completionService); + this.allWork = executorService.submit(supervisor); } /** @@ -79,22 +86,20 @@ class CryptoWorkerExecutor { /** * Graceful shutdown of this executor, waiting for all jobs to finish (normally or by throwing exceptions). * - * @param timeout Maximum time spent per worker to wait for a graceful shutdown (technically worst case is: 2 * numWorkers * time) + * @param timeout Maximum time spent per worker to wait for a graceful shutdown * @param unit Timeout unit * @throws ExecutionException If any of the workers failed. */ public void waitUntilDone(long timeout, TimeUnit unit) throws ExecutionException { this.acceptWork = false; try { - // fail fast, if workers are done before being poisoned (i.e. exceptionally): - for (Future task = completionService.poll(); task != null; task = completionService.poll()) { - task.get(); // this will most likely throw an ExecutionException - } - // if we got to this point without any exception, all workers are still running, so lets poison them: - poisonWorkers(timeout, unit); - // now workers will one after another finish their work, potentially throwing an ExecutionException: - for (Future task = completionService.poll(timeout, unit); task != null; task = completionService.poll(timeout, unit)) { - task.get(); + if (allWork.isDone()) { + // Work is done before workers being poisoned? This will most likely throw an ExecutionException: + allWork.get(); + } else { + // Work not done yet, enqueue poison pill and wait for workers to finish: + poisonWorkers(timeout, unit); + allWork.get(); } } catch (InterruptedException e) { LOG.error("Interrupted thread.", e); @@ -119,4 +124,43 @@ class CryptoWorkerExecutor { CryptoWorker createWorker(Lock lock, Condition blockDone, AtomicLong currentBlock, BlockingQueue inputQueue); } + /** + * A supervisor watches the work results of a collection of workers. The supervisor waits for all workers to finish. + * The supvervisor itself does not cause any exceptions, but if one worker fails, all other workers are cancelled immediately and the exception propagates through this supvervisor. + * Anyone waiting for the supervisor to finish will thus effectively wait for all supvervisees to finish. + */ + private static class Supervisor implements Callable { + + private final Collection> workers; + private final CompletionService completionService; + + public Supervisor(Collection> workers, CompletionService completionService) { + this.workers = workers; + this.completionService = completionService; + } + + @Override + public Void call() throws ExecutionException { + try { + for (int i = 0; i < workers.size(); i++) { + try { + // any ExecutionException thrown here will propagate up (after work is canceled in finally block) + completionService.take().get(); + } catch (CancellationException ignore) { + } + } + } catch (InterruptedException e) { + // supervisor may be interrupted when executorservice is shut down. + Thread.currentThread().interrupt(); + } finally { + // make sure, that at the end of the day all remaining workers leave the building. + for (Future worker : workers) { + worker.cancel(true); + } + } + // no exception up to this point -> all workers finished work normally. + return null; + } + } + }