diff --git a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java index c091458db..3089aba16 100644 --- a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java +++ b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java @@ -497,7 +497,7 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration { // wait for decryption workers to finish: try { - executor.waitUntilDone(); + executor.waitUntilDone(1, TimeUnit.SECONDS); } catch (ExecutionException e) { final Throwable cause = e.getCause(); if (cause instanceof IOException) { @@ -721,7 +721,7 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration { // wait for encryption workers to finish: try { - executor.waitUntilDone(); + executor.waitUntilDone(1, TimeUnit.SECONDS); } catch (ExecutionException e) { final Throwable cause = e.getCause(); if (cause instanceof IOException) { diff --git a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorker.java b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorker.java index 04f86a54e..a7f121669 100644 --- a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorker.java +++ b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorker.java @@ -32,7 +32,6 @@ abstract class CryptoWorker implements Callable { while (!Thread.currentThread().isInterrupted()) { final BlocksData blocksData = queue.take(); if (blocksData == POISON) { - // put poison back in for other threads: break; } final ByteBuffer processedBytes = this.process(blocksData); @@ -52,6 +51,7 @@ abstract class CryptoWorker implements Callable { } } } catch (InterruptedException e) { + // will happen for executorService.shutdownNow() Thread.currentThread().interrupt(); } return null; diff --git a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java index ab8b391db..9005e4e81 100644 --- a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java +++ b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java @@ -6,6 +6,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -27,7 +28,7 @@ class CryptoWorkerExecutor { private final BlockingQueue inputQueue; private final ExecutorService executorService; private final CompletionService completionService; - private boolean acceptWork; + private volatile boolean acceptWork; /** * Starts as many {@link CryptoWorker} as specified in the constructor, that start working immediately on the items submitted via {@link #offer(BlocksData, long, TimeUnit)}. @@ -64,7 +65,7 @@ class CryptoWorkerExecutor { if (!success) { this.acceptWork = false; inputQueue.clear(); - poisonWorkers(); + poisonWorkers(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } return success; } catch (InterruptedException e) { @@ -78,15 +79,22 @@ class CryptoWorkerExecutor { /** * Graceful shutdown of this executor, waiting for all jobs to finish (normally or by throwing exceptions). * + * @param timeout Maximum time spent per worker to wait for a graceful shutdown (technically worst case is: 2 * numWorkers * time) + * @param unit Timeout unit * @throws ExecutionException If any of the workers failed. */ - public void waitUntilDone() throws ExecutionException { + public void waitUntilDone(long timeout, TimeUnit unit) throws ExecutionException { this.acceptWork = false; try { - poisonWorkers(); + // fail fast, if workers are done before being poisoned (i.e. exceptionally): + for (Future task = completionService.poll(); task != null; task = completionService.poll()) { + task.get(); // this will most likely throw an ExecutionException + } + // if we got to this point without any exception, all workers are still running, so lets poison them: + poisonWorkers(timeout, unit); // now workers will one after another finish their work, potentially throwing an ExecutionException: - for (int i = 0; i < numWorkers; i++) { - completionService.take().get(); + for (Future task = completionService.poll(timeout, unit); task != null; task = completionService.poll(timeout, unit)) { + task.get(); } } catch (InterruptedException e) { LOG.error("Interrupted thread.", e); @@ -97,10 +105,12 @@ class CryptoWorkerExecutor { } } - private void poisonWorkers() throws InterruptedException { + private void poisonWorkers(long timeout, TimeUnit unit) throws InterruptedException { // add enough poison for each worker: for (int i = 0; i < numWorkers; i++) { - inputQueue.put(CryptoWorker.POISON); + if (!inputQueue.offer(CryptoWorker.POISON, timeout, unit)) { + break; + } } }