From ecb178d5b24d16eaae4827e5ea1b3158732b5e14 Mon Sep 17 00:00:00 2001 From: Sebastian Stenzel Date: Sun, 2 Aug 2015 15:13:56 +0200 Subject: [PATCH] simplified code --- .../crypto/aes256/Aes256Cryptor.java | 174 +++++------------- .../crypto/aes256/CryptoWorkerExecutor.java | 112 +++++++++++ 2 files changed, 162 insertions(+), 124 deletions(-) create mode 100644 main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java diff --git a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java index 3ffd812c2..990cf99b1 100644 --- a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java +++ b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/Aes256Cryptor.java @@ -21,21 +21,9 @@ import java.security.InvalidKeyException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import javax.crypto.BadPaddingException; import javax.crypto.Cipher; @@ -428,22 +416,11 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration { final Long fileSize = sensitiveHeaderContentBuf.getLong(); sensitiveHeaderContentBuf.get(fileKeyBytes); - // content decryption: - encryptedFile.position(104l); + // prepare content decryption: final SecretKey fileKey = new SecretKeySpec(fileKeyBytes, AES_KEY_ALGORITHM); - - // prepare some crypto workers: - final int numWorkers = Runtime.getRuntime().availableProcessors(); - final Lock lock = new ReentrantLock(); - final Condition blockDone = lock.newCondition(); - final AtomicLong currentBlock = new AtomicLong(); - final BlockingQueue inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead final LengthLimitingOutputStream paddingRemovingOutputStream = new LengthLimitingOutputStream(plaintextFile, fileSize); - final List workers = new ArrayList<>(); - final ExecutorService executorService = Executors.newFixedThreadPool(numWorkers); - final CompletionService completionService = new ExecutorCompletionService<>(executorService); - for (int i = 0; i < numWorkers; i++) { - final DecryptWorker worker = new DecryptWorker(lock, blockDone, currentBlock, inputQueue, authenticate, Channels.newChannel(paddingRemovingOutputStream)) { + final CryptoWorkerExecutor executor = new CryptoWorkerExecutor(Runtime.getRuntime().availableProcessors(), (lock, blockDone, currentBlock, inputQueue) -> { + return new DecryptWorker(lock, blockDone, currentBlock, inputQueue, authenticate, Channels.newChannel(paddingRemovingOutputStream)) { @Override protected Cipher initCipher(long startBlockNum) { @@ -483,48 +460,33 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration { } }; - workers.add(worker); - completionService.submit(worker); - } + }); - // reading ciphered input and MACs interleaved: + // read as many blocks from file as possible, but wait if queue is full: + encryptedFile.position(104l); + final int maxNumBlocks = 64; + int numBlocks = 1; int bytesRead = 0; long blockNumber = 0; - try { - // read as many blocks from file as possible, but wait if queue is full: - final int maxNumBlocks = 128; - int numBlocks = 0; - do { - if (numBlocks < maxNumBlocks) { - numBlocks++; - } - final int inBufSize = numBlocks * (CONTENT_MAC_BLOCK + 32); - final ByteBuffer buf = ByteBuffer.allocate(inBufSize); - bytesRead = encryptedFile.read(buf); - buf.flip(); - final int blocksRead = (int) Math.ceil(bytesRead / (double) (CONTENT_MAC_BLOCK + 32)); - final boolean consumedInTime = inputQueue.offer(new BlocksData(buf.asReadOnlyBuffer(), blockNumber, blocksRead), 1, TimeUnit.SECONDS); - if (!consumedInTime) { - // interrupt read loop and make room for some poisons: - inputQueue.clear(); - break; - } - blockNumber += numBlocks; - } while (bytesRead == numBlocks * (CONTENT_MAC_BLOCK + 32)); - - // each worker has to swallow some poison: - for (int i = 0; i < numWorkers; i++) { - inputQueue.put(CryptoWorker.POISON); + do { + if (numBlocks < maxNumBlocks) { + numBlocks++; } - } catch (InterruptedException e) { - LOG.error("Thread interrupted", e); - } + final int inBufSize = numBlocks * (CONTENT_MAC_BLOCK + 32); + final ByteBuffer buf = ByteBuffer.allocate(inBufSize); + bytesRead = encryptedFile.read(buf); + buf.flip(); + final int blocksRead = (int) Math.ceil(bytesRead / (double) (CONTENT_MAC_BLOCK + 32)); + final boolean consumedInTime = executor.offer(new BlocksData(buf.asReadOnlyBuffer(), blockNumber, blocksRead), 1, TimeUnit.SECONDS); + if (!consumedInTime) { + break; + } + blockNumber += numBlocks; + } while (bytesRead == numBlocks * (CONTENT_MAC_BLOCK + 32)); // wait for decryption workers to finish: try { - for (int i = 0; i < numWorkers; i++) { - completionService.take().get(); - } + executor.waitUntilDone(); } catch (ExecutionException e) { final Throwable cause = e.getCause(); if (cause instanceof IOException) { @@ -534,14 +496,10 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration { } else { LOG.error("Unexpected exception", e); } - } catch (InterruptedException e) { - LOG.error("Thread interrupted", e); } finally { - // shutdown either after normal decryption or if ANY worker threw an exception: - executorService.shutdownNow(); + destroyQuietly(fileKey); } - destroyQuietly(fileKey); return paddingRemovingOutputStream.getBytesWritten(); } @@ -674,24 +632,10 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration { headerBuf.limit(104); encryptedFile.write(headerBuf); - // add random length padding to obfuscate file length: - final byte[] randomPadding = this.randomData(AES_BLOCK_LENGTH); - final LengthObfuscatingInputStream in = new LengthObfuscatingInputStream(plaintextFile, randomPadding); - - // content encryption: + // prepare content encryption: final SecretKey fileKey = new SecretKeySpec(fileKeyBytes, AES_KEY_ALGORITHM); - - // prepare some crypto workers: - final int numWorkers = Runtime.getRuntime().availableProcessors(); - final Lock lock = new ReentrantLock(); - final Condition blockDone = lock.newCondition(); - final AtomicLong currentBlock = new AtomicLong(); - final BlockingQueue inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead - final List workers = new ArrayList<>(); - final ExecutorService executorService = Executors.newFixedThreadPool(numWorkers); - final CompletionService completionService = new ExecutorCompletionService<>(executorService); - for (int i = 0; i < numWorkers; i++) { - final EncryptWorker worker = new EncryptWorker(lock, blockDone, currentBlock, inputQueue, encryptedFile) { + final CryptoWorkerExecutor executor = new CryptoWorkerExecutor(Runtime.getRuntime().availableProcessors(), (lock, blockDone, currentBlock, inputQueue) -> { + return new EncryptWorker(lock, blockDone, currentBlock, inputQueue, encryptedFile) { @Override protected Cipher initCipher(long startBlockNum) { @@ -725,49 +669,35 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration { } } }; - workers.add(worker); - completionService.submit(worker); - } + }); - // writing ciphered output and MACs interleaved: + // read as many blocks from file as possible, but wait if queue is full: + final byte[] randomPadding = this.randomData(AES_BLOCK_LENGTH); + final LengthObfuscatingInputStream in = new LengthObfuscatingInputStream(plaintextFile, randomPadding); + final ReadableByteChannel channel = Channels.newChannel(in); int bytesRead = 0; long blockNumber = 0; - try { - final ReadableByteChannel channel = Channels.newChannel(in); - // read as many blocks from file as possible, but wait if queue is full: - final int maxNumBlocks = 128; - int numBlocks = 0; - do { - if (numBlocks < maxNumBlocks) { - numBlocks++; - } - final int inBufSize = numBlocks * CONTENT_MAC_BLOCK; - final ByteBuffer inBuf = ByteBuffer.allocate(inBufSize); - bytesRead = channel.read(inBuf); - inBuf.flip(); - final int blocksRead = (int) Math.ceil(bytesRead / (double) CONTENT_MAC_BLOCK); - final boolean consumedInTime = inputQueue.offer(new BlocksData(inBuf.asReadOnlyBuffer(), blockNumber, blocksRead), 1, TimeUnit.SECONDS); - if (!consumedInTime) { - // interrupt read loop and make room for some poisons: - inputQueue.clear(); - break; - } - blockNumber += numBlocks; - } while (bytesRead == numBlocks * CONTENT_MAC_BLOCK); - - // each worker has to swallow some poison: - for (int i = 0; i < numWorkers; i++) { - inputQueue.put(CryptoWorker.POISON); + final int maxNumBlocks = 64; + int numBlocks = 0; + do { + if (numBlocks < maxNumBlocks) { + numBlocks++; } - } catch (InterruptedException e) { - LOG.error("Thread interrupted", e); - } + final int inBufSize = numBlocks * CONTENT_MAC_BLOCK; + final ByteBuffer inBuf = ByteBuffer.allocate(inBufSize); + bytesRead = channel.read(inBuf); + inBuf.flip(); + final int blocksRead = (int) Math.ceil(bytesRead / (double) CONTENT_MAC_BLOCK); + final boolean consumedInTime = executor.offer(new BlocksData(inBuf.asReadOnlyBuffer(), blockNumber, blocksRead), 1, TimeUnit.SECONDS); + if (!consumedInTime) { + break; + } + blockNumber += numBlocks; + } while (bytesRead == numBlocks * CONTENT_MAC_BLOCK); // wait for encryption workers to finish: try { - for (int i = 0; i < numWorkers; i++) { - completionService.take().get(); - } + executor.waitUntilDone(); } catch (ExecutionException e) { final Throwable cause = e.getCause(); if (cause instanceof IOException) { @@ -777,13 +707,9 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration { } else { LOG.error("Unexpected exception", e); } - } catch (InterruptedException e) { - LOG.error("Thread interrupted", e); } finally { - // shutdown either after normal encryption or if ANY worker threw an exception: - executorService.shutdownNow(); + destroyQuietly(fileKey); } - destroyQuietly(fileKey); // create and write header: final long plaintextSize = in.getRealInputLength(); diff --git a/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java new file mode 100644 index 000000000..ab8b391db --- /dev/null +++ b/main/crypto-aes/src/main/java/org/cryptomator/crypto/aes256/CryptoWorkerExecutor.java @@ -0,0 +1,112 @@ +package org.cryptomator.crypto.aes256; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class CryptoWorkerExecutor { + + private static final Logger LOG = LoggerFactory.getLogger(CryptoWorkerExecutor.class); + + private final int numWorkers; + private final Lock lock; + private final Condition blockDone; + private final AtomicLong currentBlock; + private final BlockingQueue inputQueue; + private final ExecutorService executorService; + private final CompletionService completionService; + private boolean acceptWork; + + /** + * Starts as many {@link CryptoWorker} as specified in the constructor, that start working immediately on the items submitted via {@link #offer(BlocksData, long, TimeUnit)}. + */ + public CryptoWorkerExecutor(int numWorkers, WorkerFactory workerFactory) { + this.numWorkers = numWorkers; + this.lock = new ReentrantLock(); + this.blockDone = lock.newCondition(); + this.currentBlock = new AtomicLong(); + this.inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead + this.executorService = Executors.newFixedThreadPool(numWorkers); + this.completionService = new ExecutorCompletionService<>(executorService); + this.acceptWork = true; + + // start workers: + for (int i = 0; i < numWorkers; i++) { + final CryptoWorker worker = workerFactory.createWorker(lock, blockDone, currentBlock, inputQueue); + completionService.submit(worker); + } + } + + /** + * Adds work to the work queue. On timeout all workers will be shut down. + * + * @see BlockingQueue#offer(Object, long, TimeUnit) + * @return true if the work has been added in time. false in any other case. + */ + public boolean offer(BlocksData data, long timeout, TimeUnit unit) { + if (!acceptWork) { + return false; + } + try { + final boolean success = inputQueue.offer(data, timeout, unit); + if (!success) { + this.acceptWork = false; + inputQueue.clear(); + poisonWorkers(); + } + return success; + } catch (InterruptedException e) { + LOG.error("Interrupted thread.", e); + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + return false; + } + + /** + * Graceful shutdown of this executor, waiting for all jobs to finish (normally or by throwing exceptions). + * + * @throws ExecutionException If any of the workers failed. + */ + public void waitUntilDone() throws ExecutionException { + this.acceptWork = false; + try { + poisonWorkers(); + // now workers will one after another finish their work, potentially throwing an ExecutionException: + for (int i = 0; i < numWorkers; i++) { + completionService.take().get(); + } + } catch (InterruptedException e) { + LOG.error("Interrupted thread.", e); + Thread.currentThread().interrupt(); + } finally { + // shutdown either after normal decryption or if ANY worker threw an exception: + executorService.shutdownNow(); + } + } + + private void poisonWorkers() throws InterruptedException { + // add enough poison for each worker: + for (int i = 0; i < numWorkers; i++) { + inputQueue.put(CryptoWorker.POISON); + } + } + + @FunctionalInterface + interface WorkerFactory { + CryptoWorker createWorker(Lock lock, Condition blockDone, AtomicLong currentBlock, BlockingQueue inputQueue); + } + +}