From 2725b6b920a5986290c112ce6b1d3626d4e47995 Mon Sep 17 00:00:00 2001 From: Sebastian Stenzel Date: Mon, 15 Feb 2016 16:35:30 +0100 Subject: [PATCH] not spawning new threadpools for each individual encryption/decryption job anymore --- .../impl/FifoParallelDataProcessor.java | 5 ++--- .../engine/impl/FileContentDecryptorImpl.java | 5 ++++- .../engine/impl/FileContentEncryptorImpl.java | 5 ++++- .../impl/FifoParallelDataProcessorTest.java | 21 ++++++++++++++----- 4 files changed, 26 insertions(+), 10 deletions(-) diff --git a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java index dba921b82..811ca7556 100644 --- a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java +++ b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java @@ -13,7 +13,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; @@ -31,9 +30,9 @@ class FifoParallelDataProcessor { * @param numThreads How many jobs can run in parallel. * @param workAhead Maximum number of jobs accepted in {@link #submit(Callable)} without blocking until results are polled from {@link #processedData()}. */ - public FifoParallelDataProcessor(int numThreads, int workAhead) { + public FifoParallelDataProcessor(int workAhead, ExecutorService executorService) { this.processedData = new ArrayBlockingQueue<>(workAhead); - this.executorService = Executors.newFixedThreadPool(numThreads); + this.executorService = executorService; } /** diff --git a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentDecryptorImpl.java b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentDecryptorImpl.java index af9e46061..0af4c1597 100644 --- a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentDecryptorImpl.java +++ b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentDecryptorImpl.java @@ -19,6 +19,8 @@ import java.security.InvalidKeyException; import java.security.MessageDigest; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import javax.crypto.Cipher; import javax.crypto.Mac; @@ -37,8 +39,9 @@ class FileContentDecryptorImpl implements FileContentDecryptor { private static final String HMAC_SHA256 = "HmacSHA256"; private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors(); private static final int READ_AHEAD = 2; + private static final ExecutorService SHARED_DECRYPTION_EXECUTOR = Executors.newFixedThreadPool(NUM_THREADS); - private final FifoParallelDataProcessor dataProcessor = new FifoParallelDataProcessor<>(NUM_THREADS, NUM_THREADS + READ_AHEAD); + private final FifoParallelDataProcessor dataProcessor = new FifoParallelDataProcessor<>(NUM_THREADS + READ_AHEAD, SHARED_DECRYPTION_EXECUTOR); private final ThreadLocal hmacSha256; private final FileHeader header; private final boolean authenticate; diff --git a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentEncryptorImpl.java b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentEncryptorImpl.java index e2927c224..88ea63c16 100644 --- a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentEncryptorImpl.java +++ b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentEncryptorImpl.java @@ -18,6 +18,8 @@ import java.security.InvalidKeyException; import java.security.SecureRandom; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.LongAdder; import javax.crypto.Cipher; @@ -37,8 +39,9 @@ class FileContentEncryptorImpl implements FileContentEncryptor { private static final String HMAC_SHA256 = "HmacSHA256"; private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors(); private static final int READ_AHEAD = 2; + private static final ExecutorService SHARED_DECRYPTION_EXECUTOR = Executors.newFixedThreadPool(NUM_THREADS); - private final FifoParallelDataProcessor dataProcessor = new FifoParallelDataProcessor<>(NUM_THREADS, NUM_THREADS + READ_AHEAD); + private final FifoParallelDataProcessor dataProcessor = new FifoParallelDataProcessor<>(NUM_THREADS + READ_AHEAD, SHARED_DECRYPTION_EXECUTOR); private final ThreadLocalMac hmacSha256; private final SecretKey headerKey; private final FileHeader header; diff --git a/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java b/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java index 56d0a4c87..69937f3cb 100644 --- a/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java +++ b/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java @@ -12,6 +12,7 @@ import java.lang.reflect.Field; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -22,7 +23,8 @@ public class FifoParallelDataProcessorTest { @Test(expected = ExecutionException.class) public void testRethrowsExceptionAsExecutionException() throws InterruptedException, ExecutionException { - FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(1, 1); + ExecutorService exec = Executors.newSingleThreadExecutor(); + FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(1, exec); try { processor.submit(() -> { throw new Exception("will be wrapped in a ExecutionException during 'processedData()'"); @@ -31,21 +33,25 @@ public class FifoParallelDataProcessorTest { Assert.fail("Exception must not yet be thrown."); } processor.processedData(); + exec.shutdownNow(); } @Test(expected = RejectedExecutionException.class) public void testRejectExecutionAfterShutdown() throws InterruptedException, ReflectiveOperationException, SecurityException { - FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(1, 1); + ExecutorService exec = Executors.newSingleThreadExecutor(); + FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(1, exec); Field field = FifoParallelDataProcessor.class.getDeclaredField("executorService"); field.setAccessible(true); ExecutorService executorService = (ExecutorService) field.get(processor); executorService.shutdownNow(); processor.submit(new IntegerJob(0, 1)); + exec.shutdownNow(); } @Test public void testStrictFifoOrder() throws InterruptedException, ExecutionException { - FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(4, 10); + ExecutorService exec = Executors.newFixedThreadPool(4); + FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(10, exec); processor.submit(new IntegerJob(100, 1)); processor.submit(new IntegerJob(50, 2)); processor.submitPreprocessed(3); @@ -59,11 +65,13 @@ public class FifoParallelDataProcessorTest { Assert.assertEquals(4, (int) processor.processedData()); Assert.assertEquals(5, (int) processor.processedData()); Assert.assertEquals(6, (int) processor.processedData()); + exec.shutdownNow(); } @Test public void testBlockingBehaviour() throws InterruptedException, ExecutionException { - FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(1, 1); + ExecutorService exec = Executors.newSingleThreadExecutor(); + FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(1, exec); processor.submitPreprocessed(1); // #1 in queue Thread t1 = new Thread(() -> { @@ -80,11 +88,13 @@ public class FifoParallelDataProcessorTest { Assert.assertEquals(1, (int) processor.processedData()); Assert.assertEquals(2, (int) processor.processedData()); t1.join(); + exec.shutdownNow(); } @Test public void testInterruptionDuringSubmission() throws InterruptedException, ExecutionException { - FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(1, 1); + ExecutorService exec = Executors.newSingleThreadExecutor(); + FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(1, exec); processor.submitPreprocessed(1); // #1 in queue final AtomicBoolean interruptedExceptionThrown = new AtomicBoolean(false); @@ -104,6 +114,7 @@ public class FifoParallelDataProcessorTest { Assert.assertFalse(t1.isAlive()); Assert.assertTrue(interruptedExceptionThrown.get()); Assert.assertEquals(1, (int) processor.processedData()); + exec.shutdownNow(); } private static class IntegerJob implements Callable {