diff --git a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java index 3089aba16..104dbe659 100644 --- a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java +++ b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java @@ -506,6 +506,7 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration { throw (RuntimeException) cause; } else { LOG.error("Unexpected exception", e); + throw new RuntimeException(cause); } } finally { destroyQuietly(fileKey); @@ -730,6 +731,7 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration { throw (RuntimeException) cause; } else { LOG.error("Unexpected exception", e); + throw new RuntimeException(cause); } } finally { destroyQuietly(fileKey); diff --git a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java index 35158c6fd..57bde5bb9 100644 --- a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java +++ b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java @@ -32,7 +32,6 @@ class CryptoWorkerExecutor { private final BlockingQueue inputQueue; private final ExecutorService executorService; private final Future allWork; - private volatile boolean acceptWork; /** * Starts as many {@link CryptoWorker} as specified in the constructor, that start working immediately on the items submitted via {@link #offer(BlocksData, long, TimeUnit)}. @@ -44,7 +43,6 @@ class CryptoWorkerExecutor { this.currentBlock = new AtomicLong(); this.inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead this.executorService = Executors.newFixedThreadPool(numWorkers); - this.acceptWork = true; // start workers: final CompletionService completionService = new ExecutorCompletionService<>(executorService); @@ -64,15 +62,15 @@ class CryptoWorkerExecutor { * @return true if the work has been added in time. false in any other case. */ public boolean offer(BlocksData data, long timeout, TimeUnit unit) { - if (!acceptWork) { + if (allWork.isDone()) { return false; } try { final boolean success = inputQueue.offer(data, timeout, unit); if (!success) { - this.acceptWork = false; + LOG.error("inputQueue is full."); inputQueue.clear(); - poisonWorkers(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); + allWork.cancel(true); } return success; } catch (InterruptedException e) { @@ -91,32 +89,36 @@ class CryptoWorkerExecutor { * @throws ExecutionException If any of the workers failed. */ public void waitUntilDone(long timeout, TimeUnit unit) throws ExecutionException { - this.acceptWork = false; try { if (allWork.isDone()) { // Work is done before workers being poisoned? This will most likely throw an ExecutionException: allWork.get(); + } else if (!poisonWorkers(timeout, unit)) { + // Attempt to enqueue poison pill for all workers failed: + allWork.cancel(true); } else { - // Work not done yet, enqueue poison pill and wait for workers to finish: - poisonWorkers(timeout, unit); + // All poisons enqueued successfully. Now wait for termination by poison or exception: allWork.get(); } } catch (InterruptedException e) { LOG.error("Interrupted thread.", e); Thread.currentThread().interrupt(); + } catch (CancellationException e) { + throw new ExecutionException("Work canceled", e); } finally { - // shutdown either after normal decryption or if ANY worker threw an exception: + // in any case (normal or exceptional execution): shutdown executor including all workers and supervisor: executorService.shutdownNow(); } } - private void poisonWorkers(long timeout, TimeUnit unit) throws InterruptedException { - // add enough poison for each worker: + private boolean poisonWorkers(long timeout, TimeUnit unit) throws InterruptedException { + // add enough poison for each worker; each worker will consume excatly one: for (int i = 0; i < numWorkers; i++) { if (!inputQueue.offer(CryptoWorker.POISON, timeout, unit)) { - break; + return false; } } + return true; } @FunctionalInterface