From 80e118532563a7f99557eae902308f0c78de38d6 Mon Sep 17 00:00:00 2001 From: Sebastian Stenzel Date: Sun, 20 Dec 2015 12:42:02 +0100 Subject: [PATCH] Adding blocking behaviour when appending data to be en/decrypted. Using composite instead of inheritance for FileContentDecryptorImpl and FileContentEncryptorImpl --- .../crypto/engine/FileContentDecryptor.java | 2 +- .../crypto/engine/FileContentEncryptor.java | 2 +- .../impl/AbstractFileContentProcessor.java | 60 -------- .../engine/impl/BytesWithSequenceNumber.java | 31 ----- .../impl/FifoParallelDataProcessor.java | 128 ++++++++++++++++++ .../engine/impl/FileContentDecryptorImpl.java | 25 ++-- .../engine/impl/FileContentEncryptorImpl.java | 25 ++-- .../crypto/fs/CryptoReadableFile.java | 24 ++-- .../crypto/fs/CryptoWritableFile.java | 8 +- .../impl/FifoParallelDataProcessorTest.java | 96 +++++++++++++ 10 files changed, 268 insertions(+), 133 deletions(-) delete mode 100644 main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/AbstractFileContentProcessor.java delete mode 100644 main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/BytesWithSequenceNumber.java create mode 100644 main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java create mode 100644 main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java diff --git a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/FileContentDecryptor.java b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/FileContentDecryptor.java index ca1949c07..6bd1f9e6c 100644 --- a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/FileContentDecryptor.java +++ b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/FileContentDecryptor.java @@ -21,7 +21,7 @@ public interface FileContentDecryptor extends Destroyable, Closeable { * @param cleartext Cleartext data or {@link FileContentCryptor#EOF} to indicate the end of a ciphertext. * @see #skipToPosition(long) */ - void append(ByteBuffer ciphertext); + void append(ByteBuffer ciphertext) throws InterruptedException; /** * Returns the next decrypted cleartext in byte-by-byte FIFO order, meaning in the order ciphertext has been appended to this encryptor. diff --git a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/FileContentEncryptor.java b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/FileContentEncryptor.java index bc5804010..ac60d8bb0 100644 --- a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/FileContentEncryptor.java +++ b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/FileContentEncryptor.java @@ -23,7 +23,7 @@ public interface FileContentEncryptor extends Destroyable, Closeable { * * @param cleartext Cleartext data or {@link FileContentCryptor#EOF} to indicate the end of a cleartext. */ - void append(ByteBuffer cleartext); + void append(ByteBuffer cleartext) throws InterruptedException; /** * Returns the next ciphertext in byte-by-byte FIFO order, meaning in the order cleartext has been appended to this encryptor. diff --git a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/AbstractFileContentProcessor.java b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/AbstractFileContentProcessor.java deleted file mode 100644 index acb754cd8..000000000 --- a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/AbstractFileContentProcessor.java +++ /dev/null @@ -1,60 +0,0 @@ -package org.cryptomator.crypto.engine.impl; - -import java.io.Closeable; -import java.nio.ByteBuffer; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.lang3.concurrent.ConcurrentUtils; - -abstract class AbstractFileContentProcessor implements Closeable { - - private static final int NUM_WORKERS = Runtime.getRuntime().availableProcessors(); - private static final int READ_AHEAD = 0; - - private final BlockingQueue processedData = new PriorityBlockingQueue<>(); - private final BlockingQueue workQueue = new ArrayBlockingQueue<>(NUM_WORKERS + READ_AHEAD); - private final ExecutorService executorService = new ThreadPoolExecutor(1, NUM_WORKERS, 1, TimeUnit.SECONDS, workQueue); - private final AtomicLong jobSequence = new AtomicLong(); - - /** - * Enqueues a job for execution. The results of multiple submissions can be polled in FIFO order using {@link #processedData()}. - * - * @param processingJob A ByteBuffer-generating task. - */ - protected void submit(Callable processingJob) { - Future result = executorService.submit(processingJob); - processedData.offer(new BytesWithSequenceNumber(result, jobSequence.getAndIncrement())); - } - - /** - * Submits already processed data, that can be polled in FIFO order from {@link #processedData()}. - */ - protected void submitPreprocessed(ByteBuffer preprocessedData) { - Future resolvedFuture = ConcurrentUtils.constantFuture(preprocessedData); - processedData.offer(new BytesWithSequenceNumber(resolvedFuture, jobSequence.getAndIncrement())); - } - - /** - * Result of previously {@link #submit(Callable) submitted} jobs in the same order as they have been submitted. Blocks if the job didn't finish yet. - * - * @return Next job result - * @throws InterruptedException If the calling thread was interrupted while waiting for the next result. - */ - protected ByteBuffer processedData() throws InterruptedException { - return processedData.take().get(); - } - - @Override - public void close() { - executorService.shutdown(); - } - -} diff --git a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/BytesWithSequenceNumber.java b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/BytesWithSequenceNumber.java deleted file mode 100644 index b835c475f..000000000 --- a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/BytesWithSequenceNumber.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.cryptomator.crypto.engine.impl; - -import java.nio.ByteBuffer; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -class BytesWithSequenceNumber implements Comparable { - - private final Future byteBuffer; - private final long sequenceNumber; - - public BytesWithSequenceNumber(Future byteBuffer, long sequenceNumber) { - this.byteBuffer = byteBuffer; - this.sequenceNumber = sequenceNumber; - } - - public ByteBuffer get() throws InterruptedException { - try { - return byteBuffer.get(); - } catch (ExecutionException e) { - assert e.getCause() instanceof RuntimeException; - throw (RuntimeException) e.getCause(); - } - } - - @Override - public int compareTo(BytesWithSequenceNumber other) { - return Long.compare(this.sequenceNumber, other.sequenceNumber); - } - -} \ No newline at end of file diff --git a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java new file mode 100644 index 000000000..f63045016 --- /dev/null +++ b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessor.java @@ -0,0 +1,128 @@ +package org.cryptomator.crypto.engine.impl; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Executes long-running computations and returns the result strictly in order of the job submissions, no matter how long each job takes. + * + * The internally used thread pool is shut down automatically as soon as this FifiParallelDataProcessor is no longer referenced (see Finalization behaviour of {@link ThreadPoolExecutor}). + */ +class FifoParallelDataProcessor { + + private final BlockingQueue processedData = new PriorityBlockingQueue<>(); + private final AtomicLong jobSequence = new AtomicLong(); + private final BlockingQueue workQueue; + private final ExecutorService executorService; + + /** + * @param numThreads How many jobs can run in parallel. + * @param workQueueSize Maximum number of jobs accepted without blocking, when no results are polled from {@link #processedData()}. + */ + public FifoParallelDataProcessor(int numThreads, int workQueueSize) { + this.workQueue = new ArrayBlockingQueue<>(workQueueSize); + this.executorService = new ThreadPoolExecutor(numThreads, numThreads, 1, TimeUnit.SECONDS, workQueue, this::rejectedExecution); + } + + /** + * Enqueues tasks into the blocking queue, if they can not be executed immediately. + * + * @see ThreadPoolExecutor#execute(Runnable) + */ + private void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + this.workQueue.put(r); + } catch (InterruptedException e) { + throw new SneakyInterruptedException(e); + } + } + + /** + * Enqueues a job for execution. The results of multiple submissions can be polled in FIFO order using {@link #processedData()}. + * + * @param processingJob A task, that will compute a result. + * @throws InterruptedException + */ + void submit(Callable processingJob) throws InterruptedException { + try { + Future future = executorService.submit(processingJob); + processedData.offer(new SequencedFutureResult(future, jobSequence.getAndIncrement())); + } catch (SneakyInterruptedException e) { + throw e.getCause(); + } + } + + /** + * Submits already pre-processed data, that can be polled in FIFO order from {@link #processedData()}. + * + * @throws InterruptedException + */ + void submitPreprocessed(T preprocessedData) throws InterruptedException { + this.submit(() -> { + return preprocessedData; + }); + } + + /** + * Result of previously {@link #submit(Callable) submitted} jobs in the same order as they have been submitted. Blocks if the job didn't finish yet. + * + * @return Next job result + * @throws InterruptedException If the calling thread was interrupted while waiting for the next result. + */ + T processedData() throws InterruptedException { + return processedData.take().get(); + } + + private class SequencedFutureResult implements Comparable { + + private final Future result; + private final long sequenceNumber; + + public SequencedFutureResult(Future result, long sequenceNumber) { + this.result = result; + this.sequenceNumber = sequenceNumber; + } + + public T get() throws InterruptedException { + try { + return result.get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else { + throw new RuntimeException(e); + } + } + } + + @Override + public int compareTo(SequencedFutureResult other) { + return Long.compare(this.sequenceNumber, other.sequenceNumber); + } + + } + + private static class SneakyInterruptedException extends RuntimeException { + + private static final long serialVersionUID = 331817765088138556L; + + public SneakyInterruptedException(InterruptedException cause) { + super(cause); + } + + @Override + public InterruptedException getCause() { + return (InterruptedException) super.getCause(); + } + + } + +} diff --git a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentDecryptorImpl.java b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentDecryptorImpl.java index cf8ef471b..4706a1691 100644 --- a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentDecryptorImpl.java +++ b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentDecryptorImpl.java @@ -23,7 +23,7 @@ import org.cryptomator.crypto.engine.FileContentCryptor; import org.cryptomator.crypto.engine.FileContentDecryptor; import org.cryptomator.io.ByteBuffers; -class FileContentDecryptorImpl extends AbstractFileContentProcessor implements FileContentDecryptor { +class FileContentDecryptorImpl implements FileContentDecryptor { private static final String AES = "AES"; private static final int AES_BLOCK_LENGTH_IN_BYTES = 16; @@ -31,7 +31,10 @@ class FileContentDecryptorImpl extends AbstractFileContentProcessor implements F private static final String HMAC_SHA256 = "HmacSHA256"; private static final int CHUNK_SIZE = 32 * 1024; private static final int MAC_SIZE = 32; + private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors(); + private static final int READ_AHEAD = 2; + private final FifoParallelDataProcessor dataProcessor = new FifoParallelDataProcessor<>(NUM_THREADS, NUM_THREADS + READ_AHEAD); private final ThreadLocal hmacSha256; private final SecretKey contentKey; private final byte[] nonce; @@ -101,7 +104,7 @@ class FileContentDecryptorImpl extends AbstractFileContentProcessor implements F } @Override - public void append(ByteBuffer ciphertext) { + public void append(ByteBuffer ciphertext) throws InterruptedException { if (ciphertext == FileContentCryptor.EOF) { submitCiphertextBuffer(); submitEof(); @@ -113,26 +116,26 @@ class FileContentDecryptorImpl extends AbstractFileContentProcessor implements F } } - private void submitCiphertextBufferIfFull() { + private void submitCiphertextBufferIfFull() throws InterruptedException { if (!ciphertextBuffer.hasRemaining()) { submitCiphertextBuffer(); ciphertextBuffer = ByteBuffer.allocate(CHUNK_SIZE + MAC_SIZE); } } - private void submitCiphertextBuffer() { + private void submitCiphertextBuffer() throws InterruptedException { ciphertextBuffer.flip(); Callable encryptionJob = new DecryptionJob(ciphertextBuffer, chunkNumber++); - submit(encryptionJob); + dataProcessor.submit(encryptionJob); } - private void submitEof() { - submitPreprocessed(FileContentCryptor.EOF); + private void submitEof() throws InterruptedException { + dataProcessor.submitPreprocessed(FileContentCryptor.EOF); } @Override public ByteBuffer cleartext() throws InterruptedException { - return processedData(); + return dataProcessor.processedData(); } @Override @@ -154,12 +157,6 @@ class FileContentDecryptorImpl extends AbstractFileContentProcessor implements F } } - @Override - public void close() { - this.destroy(); - super.close(); - } - private class DecryptionJob implements Callable { private final ByteBuffer ciphertextChunk; diff --git a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentEncryptorImpl.java b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentEncryptorImpl.java index df195a1f2..4d4de84be 100644 --- a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentEncryptorImpl.java +++ b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/engine/impl/FileContentEncryptorImpl.java @@ -24,14 +24,17 @@ import org.cryptomator.crypto.engine.FileContentCryptor; import org.cryptomator.crypto.engine.FileContentEncryptor; import org.cryptomator.io.ByteBuffers; -class FileContentEncryptorImpl extends AbstractFileContentProcessor implements FileContentEncryptor { +class FileContentEncryptorImpl implements FileContentEncryptor { private static final String AES = "AES"; private static final int AES_BLOCK_LENGTH_IN_BYTES = 16; private static final String AES_CBC = "AES/CBC/PKCS5Padding"; private static final String HMAC_SHA256 = "HmacSHA256"; private static final int CHUNK_SIZE = 32 * 1024; + private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors(); + private static final int READ_AHEAD = 2; + private final FifoParallelDataProcessor dataProcessor = new FifoParallelDataProcessor<>(NUM_THREADS, NUM_THREADS + READ_AHEAD); private final ThreadLocal hmacSha256; private final SecretKey headerKey; private final SecretKey contentKey; @@ -98,7 +101,7 @@ class FileContentEncryptorImpl extends AbstractFileContentProcessor implements F } @Override - public void append(ByteBuffer cleartext) { + public void append(ByteBuffer cleartext) throws InterruptedException { cleartextBytesEncrypted.add(cleartext.remaining()); if (cleartext == FileContentCryptor.EOF) { submitCleartextBuffer(); @@ -111,26 +114,26 @@ class FileContentEncryptorImpl extends AbstractFileContentProcessor implements F } } - private void submitCleartextBufferIfFull() { + private void submitCleartextBufferIfFull() throws InterruptedException { if (!cleartextBuffer.hasRemaining()) { submitCleartextBuffer(); cleartextBuffer = ByteBuffer.allocate(CHUNK_SIZE); } } - private void submitCleartextBuffer() { + private void submitCleartextBuffer() throws InterruptedException { cleartextBuffer.flip(); Callable encryptionJob = new EncryptionJob(cleartextBuffer, chunkNumber++); - submit(encryptionJob); + dataProcessor.submit(encryptionJob); } - private void submitEof() { - submitPreprocessed(FileContentCryptor.EOF); + private void submitEof() throws InterruptedException { + dataProcessor.submitPreprocessed(FileContentCryptor.EOF); } @Override public ByteBuffer ciphertext() throws InterruptedException { - return processedData(); + return dataProcessor.processedData(); } @Override @@ -152,12 +155,6 @@ class FileContentEncryptorImpl extends AbstractFileContentProcessor implements F } } - @Override - public void close() { - this.destroy(); - super.close(); - } - private class EncryptionJob implements Callable { private final ByteBuffer cleartextChunk; diff --git a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/fs/CryptoReadableFile.java b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/fs/CryptoReadableFile.java index 64b3ee6de..475a7fdba 100644 --- a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/fs/CryptoReadableFile.java +++ b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/fs/CryptoReadableFile.java @@ -92,16 +92,20 @@ class CryptoReadableFile implements ReadableFile { public Void call() { file.read(EMPTY_BUFFER, startpos); int bytesRead = -1; - do { - ByteBuffer ciphertext = ByteBuffer.allocate(READ_BUFFER_SIZE); - file.read(ciphertext); - ciphertext.flip(); - bytesRead = ciphertext.remaining(); - if (bytesRead > 0) { - decryptor.append(ciphertext); - } - } while (bytesRead > 0); - decryptor.append(FileContentCryptor.EOF); + try { + do { + ByteBuffer ciphertext = ByteBuffer.allocate(READ_BUFFER_SIZE); + file.read(ciphertext); + ciphertext.flip(); + bytesRead = ciphertext.remaining(); + if (bytesRead > 0) { + decryptor.append(ciphertext); + } + } while (bytesRead > 0); + decryptor.append(FileContentCryptor.EOF); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } return null; } diff --git a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/fs/CryptoWritableFile.java b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/fs/CryptoWritableFile.java index 028d63be5..0b10792e7 100644 --- a/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/fs/CryptoWritableFile.java +++ b/main/filesystem-crypto/src/main/java/org/cryptomator/crypto/fs/CryptoWritableFile.java @@ -40,8 +40,12 @@ class CryptoWritableFile implements WritableFile { final ByteBuffer cleartextCopy = ByteBuffer.allocate(source.remaining()); ByteBuffers.copy(source, cleartextCopy); cleartextCopy.flip(); - encryptor.append(cleartextCopy); - file.write(source); + try { + encryptor.append(cleartextCopy); + file.write(source); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } @Override diff --git a/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java b/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java new file mode 100644 index 000000000..d519cf416 --- /dev/null +++ b/main/filesystem-crypto/src/test/java/org/cryptomator/crypto/engine/impl/FifoParallelDataProcessorTest.java @@ -0,0 +1,96 @@ +package org.cryptomator.crypto.engine.impl; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Assert; +import org.junit.Test; + +public class FifoParallelDataProcessorTest { + + @Test + public void testStrictFifoOrder() throws InterruptedException { + FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(4, 10); + processor.submit(new IntegerJob(100, 1)); + processor.submit(new IntegerJob(50, 2)); + processor.submitPreprocessed(3); + processor.submit(new IntegerJob(10, 4)); + processor.submit(new IntegerJob(10, 5)); + processor.submitPreprocessed(6); + + Assert.assertEquals(1, (int) processor.processedData()); + Assert.assertEquals(2, (int) processor.processedData()); + Assert.assertEquals(3, (int) processor.processedData()); + Assert.assertEquals(4, (int) processor.processedData()); + Assert.assertEquals(5, (int) processor.processedData()); + Assert.assertEquals(6, (int) processor.processedData()); + } + + @Test + public void testBlockingBehaviour() throws InterruptedException { + FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(1, 1); + processor.submit(new IntegerJob(100, 1)); // runs immediatley + processor.submit(new IntegerJob(100, 2)); // #1 in queue + + Thread t1 = new Thread(() -> { + try { + processor.submit(new IntegerJob(10, 3)); // #2 in queue + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + t1.start(); + t1.join(10); + // job 3 should not have been submitted by now, thus t1 is still alive + Assert.assertTrue(t1.isAlive()); + Assert.assertEquals(1, (int) processor.processedData()); + Assert.assertEquals(2, (int) processor.processedData()); + Assert.assertEquals(3, (int) processor.processedData()); + Assert.assertFalse(t1.isAlive()); + } + + @Test + public void testInterruptionDuringSubmission() throws InterruptedException { + FifoParallelDataProcessor processor = new FifoParallelDataProcessor<>(1, 1); + processor.submit(new IntegerJob(100, 1)); // runs immediatley + processor.submit(new IntegerJob(100, 2)); // #1 in queue + + final AtomicBoolean interruptedExceptionThrown = new AtomicBoolean(false); + Thread t1 = new Thread(() -> { + try { + processor.submit(new IntegerJob(10, 3)); // #2 in queue + } catch (InterruptedException e) { + interruptedExceptionThrown.set(true); + Thread.currentThread().interrupt(); + } + }); + t1.start(); + t1.join(10); + t1.interrupt(); + t1.join(10); + // job 3 should not have been submitted by now, thus t1 is still alive + Assert.assertFalse(t1.isAlive()); + Assert.assertTrue(interruptedExceptionThrown.get()); + Assert.assertEquals(1, (int) processor.processedData()); + Assert.assertEquals(2, (int) processor.processedData()); + } + + private static class IntegerJob implements Callable { + + private final long waitMillis; + private final int result; + + public IntegerJob(long waitMillis, int result) { + this.waitMillis = waitMillis; + this.result = result; + } + + @Override + public Integer call() throws Exception { + Thread.sleep(waitMillis); + return result; + } + + } + +}