diff --git a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java index fbc8748d1..35aa1943e 100644 --- a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java +++ b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java @@ -13,13 +13,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.cryptomator.common.UncheckedInterruptedException; @@ -30,35 +26,16 @@ import org.cryptomator.common.UncheckedInterruptedException; */ class FifoParallelDataProcessor { - private final BlockingQueue processedData = new PriorityBlockingQueue<>(); - private final AtomicLong jobSequence = new AtomicLong(); - private final BlockingQueue workQueue; + private final BlockingQueue> processedData; private final ExecutorService executorService; /** * @param numThreads How many jobs can run in parallel. - * @param workQueueSize Maximum number of jobs accepted without blocking, when no results are polled from {@link #processedData()}. + * @param workAhead Maximum number of jobs accepted in {@link #submit(Callable)} without blocking until results are polled from {@link #processedData()}. */ - public FifoParallelDataProcessor(int numThreads, int workQueueSize) { - this.workQueue = new ArrayBlockingQueue<>(workQueueSize); - this.executorService = new ThreadPoolExecutor(numThreads, numThreads, 1, TimeUnit.SECONDS, workQueue, this::rejectedExecution); - } - - /** - * Enqueues tasks into the blocking queue, if they can not be executed immediately. - * - * @see ThreadPoolExecutor#execute(Runnable) - * @see RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor) - */ - private void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - if (executor.isShutdown()) { - throw new RejectedExecutionException("Executor has been shut down."); - } - try { - this.workQueue.put(r); - } catch (InterruptedException e) { - throw new UncheckedInterruptedException(e); - } + public FifoParallelDataProcessor(int numThreads, int workAhead) { + this.processedData = new ArrayBlockingQueue<>(workAhead); + this.executorService = Executors.newFixedThreadPool(numThreads); } /** @@ -70,7 +47,7 @@ class FifoParallelDataProcessor { void submit(Callable processingJob) throws InterruptedException { try { Future future = executorService.submit(processingJob); - processedData.offer(new SequencedFutureResult(future, jobSequence.getAndIncrement())); + processedData.put(future); } catch (UncheckedInterruptedException e) { throw e.getCause(); } @@ -94,36 +71,15 @@ class FifoParallelDataProcessor { * @throws InterruptedException If the calling thread was interrupted while waiting for the next result. */ T processedData() throws InterruptedException { - return processedData.take().get(); - } - - private class SequencedFutureResult implements Comparable { - - private final Future result; - private final long sequenceNumber; - - public SequencedFutureResult(Future result, long sequenceNumber) { - this.result = result; - this.sequenceNumber = sequenceNumber; - } - - public T get() throws InterruptedException { - try { - return result.get(); - } catch (ExecutionException e) { - if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException) e.getCause(); - } else { - throw new RuntimeException(e); - } + try { + return processedData.take().get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else { + throw new RuntimeException(e); } } - - @Override - public int compareTo(SequencedFutureResult other) { - return Long.compare(this.sequenceNumber, other.sequenceNumber); - } - } } diff --git a/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java b/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java index 28a6e4745..045cbe950 100644 --- a/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java +++ b/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java @@ -63,36 +63,33 @@ public class FifoParallelDataProcessorTest { @Test public void testBlockingBehaviour() throws InterruptedException { FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(1, 1); - processor.submit(new IntegerJob(100, 1)); // runs immediatley - processor.submit(new IntegerJob(100, 2)); // #1 in queue + processor.submitPreprocessed(1); // #1 in queue Thread t1 = new Thread(() -> { try { - processor.submit(new IntegerJob(10, 3)); // #2 in queue + processor.submitPreprocessed(2); // #2 in queue } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); t1.start(); t1.join(10); - // job 3 should not have been submitted by now, thus t1 is still alive + // job 2 should not have been submitted by now, thus t1 is still alive Assert.assertTrue(t1.isAlive()); Assert.assertEquals(1, (int) processor.processedData()); Assert.assertEquals(2, (int) processor.processedData()); - Assert.assertEquals(3, (int) processor.processedData()); - Assert.assertFalse(t1.isAlive()); + t1.join(); } @Test public void testInterruptionDuringSubmission() throws InterruptedException { FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(1, 1); - processor.submit(new IntegerJob(100, 1)); // runs immediatley - processor.submit(new IntegerJob(100, 2)); // #1 in queue + processor.submitPreprocessed(1); // #1 in queue final AtomicBoolean interruptedExceptionThrown = new AtomicBoolean(false); Thread t1 = new Thread(() -> { try { - processor.submit(new IntegerJob(10, 3)); // #2 in queue + processor.submitPreprocessed(2); // #2 in queue } catch (InterruptedException e) { interruptedExceptionThrown.set(true); Thread.currentThread().interrupt(); @@ -102,11 +99,10 @@ public class FifoParallelDataProcessorTest { t1.join(10); t1.interrupt(); t1.join(10); - // job 3 should not have been submitted by now, thus t1 is still alive + // job 2 should not have been submitted by now, thus t1 is still alive Assert.assertFalse(t1.isAlive()); Assert.assertTrue(interruptedExceptionThrown.get()); Assert.assertEquals(1, (int) processor.processedData()); - Assert.assertEquals(2, (int) processor.processedData()); } private static class IntegerJob implements Callable { diff --git a/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FileContentCryptorTest.java b/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FileContentCryptorTest.java index c59ce7b4e..a0bca22c1 100644 --- a/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FileContentCryptorTest.java +++ b/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FileContentCryptorTest.java @@ -104,7 +104,7 @@ public class FileContentCryptorTest { Assert.assertArrayEquals("cleartext message".getBytes(), result); } - @Test + @Test(timeout = 20000) // assuming a minimum speed of 10mb/s during encryption and decryption 20s should be enough public void testEncryptionAndDecryptionSpeed() throws InterruptedException, IOException { final byte[] keyBytes = new byte[32]; final SecretKey encryptionKey = new SecretKeySpec(keyBytes, "AES"); @@ -112,49 +112,59 @@ public class FileContentCryptorTest { final FileContentCryptor cryptor = new FileContentCryptorImpl(encryptionKey, macKey, RANDOM_MOCK); final Path tmpFile = Files.createTempFile("encrypted", ".tmp"); + final Thread fileWriter; final ByteBuffer header; final long encStart = System.nanoTime(); - try (FileContentEncryptor encryptor = cryptor.createFileContentEncryptor(Optional.empty()); FileChannel fc = FileChannel.open(tmpFile, StandardOpenOption.WRITE)) { - final ByteBuffer cleartext = ByteBuffer.allocate(32768); // 32k - ByteBuffer ciphertext; - for (int i = 0; i < 4096; i++) { // 128M total + try (FileContentEncryptor encryptor = cryptor.createFileContentEncryptor(Optional.empty())) { + fileWriter = new Thread(() -> { + try (FileChannel fc = FileChannel.open(tmpFile, StandardOpenOption.WRITE)) { + ByteBuffer ciphertext; + while ((ciphertext = encryptor.ciphertext()) != FileContentCryptor.EOF) { + fc.write(ciphertext); + } + } catch (Exception e) { + e.printStackTrace(); + } + }); + fileWriter.start(); + + final ByteBuffer cleartext = ByteBuffer.allocate(100000); // 100k + for (int i = 0; i < 1000; i++) { // 100M total cleartext.rewind(); encryptor.append(cleartext); - if (i > Runtime.getRuntime().availableProcessors()) { - ciphertext = encryptor.ciphertext(); - Assert.assertEquals(32 * 1024 + 32, ciphertext.remaining()); - fc.write(ciphertext); - } } encryptor.append(FileContentCryptor.EOF); - while ((ciphertext = encryptor.ciphertext()) != FileContentCryptor.EOF) { - fc.write(ciphertext); - } header = encryptor.getHeader(); } + fileWriter.join(); final long encEnd = System.nanoTime(); - LOG.debug("Encryption of 128M took {}ms", (encEnd - encStart) / 1000 / 1000); + LOG.debug("Encryption of 100M took {}ms", (encEnd - encStart) / 1000 / 1000); + final Thread fileReader; final long decStart = System.nanoTime(); - try (FileContentDecryptor decryptor = cryptor.createFileContentDecryptor(header); FileChannel fc = FileChannel.open(tmpFile, StandardOpenOption.READ)) { - final ByteBuffer ciphertext = ByteBuffer.allocate(654321); - ByteBuffer cleartext; - for (int i = 0; fc.read(ciphertext) != -1; i++) { - ciphertext.flip(); - decryptor.append(ciphertext); - ciphertext.clear(); - if (i > Runtime.getRuntime().availableProcessors()) { - cleartext = decryptor.cleartext(); - Assert.assertTrue(cleartext.hasRemaining()); + try (FileContentDecryptor decryptor = cryptor.createFileContentDecryptor(header)) { + fileReader = new Thread(() -> { + try (FileChannel fc = FileChannel.open(tmpFile, StandardOpenOption.READ)) { + ByteBuffer ciphertext = ByteBuffer.allocate(654321); + while (fc.read(ciphertext) != -1) { + ciphertext.flip(); + decryptor.append(ciphertext); + ciphertext.clear(); + } + decryptor.append(FileContentCryptor.EOF); + } catch (Exception e) { + e.printStackTrace(); } - } - decryptor.append(FileContentCryptor.EOF); + }); + fileReader.start(); + while (decryptor.cleartext() != FileContentCryptor.EOF) { // no-op } } + fileReader.join(); final long decEnd = System.nanoTime(); - LOG.debug("Decryption of 128M took {}ms", (decEnd - decStart) / 1000 / 1000); + LOG.debug("Decryption of 100M took {}ms", (decEnd - decStart) / 1000 / 1000); Files.delete(tmpFile); } }