not spawning new threadpools for each individual encryption/decryption job anymore

This commit is contained in:
Sebastian Stenzel
2016-02-15 16:35:30 +01:00
parent ed0540e78f
commit 2725b6b920
4 changed files with 26 additions and 10 deletions

View File

@@ -13,7 +13,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
@@ -31,9 +30,9 @@ class FifoParallelDataProcessor<T> {
* @param numThreads How many jobs can run in parallel.
* @param workAhead Maximum number of jobs accepted in {@link #submit(Callable)} without blocking until results are polled from {@link #processedData()}.
*/
public FifoParallelDataProcessor(int numThreads, int workAhead) {
public FifoParallelDataProcessor(int workAhead, ExecutorService executorService) {
this.processedData = new ArrayBlockingQueue<>(workAhead);
this.executorService = Executors.newFixedThreadPool(numThreads);
this.executorService = executorService;
}
/**

View File

@@ -19,6 +19,8 @@ import java.security.InvalidKeyException;
import java.security.MessageDigest;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.crypto.Cipher;
import javax.crypto.Mac;
@@ -37,8 +39,9 @@ class FileContentDecryptorImpl implements FileContentDecryptor {
private static final String HMAC_SHA256 = "HmacSHA256";
private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors();
private static final int READ_AHEAD = 2;
private static final ExecutorService SHARED_DECRYPTION_EXECUTOR = Executors.newFixedThreadPool(NUM_THREADS);
private final FifoParallelDataProcessor<ByteBuffer> dataProcessor = new FifoParallelDataProcessor<>(NUM_THREADS, NUM_THREADS + READ_AHEAD);
private final FifoParallelDataProcessor<ByteBuffer> dataProcessor = new FifoParallelDataProcessor<>(NUM_THREADS + READ_AHEAD, SHARED_DECRYPTION_EXECUTOR);
private final ThreadLocal<Mac> hmacSha256;
private final FileHeader header;
private final boolean authenticate;

View File

@@ -18,6 +18,8 @@ import java.security.InvalidKeyException;
import java.security.SecureRandom;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAdder;
import javax.crypto.Cipher;
@@ -37,8 +39,9 @@ class FileContentEncryptorImpl implements FileContentEncryptor {
private static final String HMAC_SHA256 = "HmacSHA256";
private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors();
private static final int READ_AHEAD = 2;
private static final ExecutorService SHARED_DECRYPTION_EXECUTOR = Executors.newFixedThreadPool(NUM_THREADS);
private final FifoParallelDataProcessor<ByteBuffer> dataProcessor = new FifoParallelDataProcessor<>(NUM_THREADS, NUM_THREADS + READ_AHEAD);
private final FifoParallelDataProcessor<ByteBuffer> dataProcessor = new FifoParallelDataProcessor<>(NUM_THREADS + READ_AHEAD, SHARED_DECRYPTION_EXECUTOR);
private final ThreadLocalMac hmacSha256;
private final SecretKey headerKey;
private final FileHeader header;

View File

@@ -12,6 +12,7 @@ import java.lang.reflect.Field;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -22,7 +23,8 @@ public class FifoParallelDataProcessorTest {
@Test(expected = ExecutionException.class)
public void testRethrowsExceptionAsExecutionException() throws InterruptedException, ExecutionException {
FifoParallelDataProcessor<Object> processor = new FifoParallelDataProcessor<>(1, 1);
ExecutorService exec = Executors.newSingleThreadExecutor();
FifoParallelDataProcessor<Object> processor = new FifoParallelDataProcessor<>(1, exec);
try {
processor.submit(() -> {
throw new Exception("will be wrapped in a ExecutionException during 'processedData()'");
@@ -31,21 +33,25 @@ public class FifoParallelDataProcessorTest {
Assert.fail("Exception must not yet be thrown.");
}
processor.processedData();
exec.shutdownNow();
}
@Test(expected = RejectedExecutionException.class)
public void testRejectExecutionAfterShutdown() throws InterruptedException, ReflectiveOperationException, SecurityException {
FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(1, 1);
ExecutorService exec = Executors.newSingleThreadExecutor();
FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(1, exec);
Field field = FifoParallelDataProcessor.class.getDeclaredField("executorService");
field.setAccessible(true);
ExecutorService executorService = (ExecutorService) field.get(processor);
executorService.shutdownNow();
processor.submit(new IntegerJob(0, 1));
exec.shutdownNow();
}
@Test
public void testStrictFifoOrder() throws InterruptedException, ExecutionException {
FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(4, 10);
ExecutorService exec = Executors.newFixedThreadPool(4);
FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(10, exec);
processor.submit(new IntegerJob(100, 1));
processor.submit(new IntegerJob(50, 2));
processor.submitPreprocessed(3);
@@ -59,11 +65,13 @@ public class FifoParallelDataProcessorTest {
Assert.assertEquals(4, (int) processor.processedData());
Assert.assertEquals(5, (int) processor.processedData());
Assert.assertEquals(6, (int) processor.processedData());
exec.shutdownNow();
}
@Test
public void testBlockingBehaviour() throws InterruptedException, ExecutionException {
FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(1, 1);
ExecutorService exec = Executors.newSingleThreadExecutor();
FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(1, exec);
processor.submitPreprocessed(1); // #1 in queue
Thread t1 = new Thread(() -> {
@@ -80,11 +88,13 @@ public class FifoParallelDataProcessorTest {
Assert.assertEquals(1, (int) processor.processedData());
Assert.assertEquals(2, (int) processor.processedData());
t1.join();
exec.shutdownNow();
}
@Test
public void testInterruptionDuringSubmission() throws InterruptedException, ExecutionException {
FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(1, 1);
ExecutorService exec = Executors.newSingleThreadExecutor();
FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(1, exec);
processor.submitPreprocessed(1); // #1 in queue
final AtomicBoolean interruptedExceptionThrown = new AtomicBoolean(false);
@@ -104,6 +114,7 @@ public class FifoParallelDataProcessorTest {
Assert.assertFalse(t1.isAlive());
Assert.assertTrue(interruptedExceptionThrown.get());
Assert.assertEquals(1, (int) processor.processedData());
exec.shutdownNow();
}
private static class IntegerJob implements Callable<Integer> {