fail with exception, if crypto worker is waiting too long (1 second) to be able to write to the output stream

This commit is contained in:
Sebastian Stenzel
2015-12-06 14:41:27 +01:00
parent fc06595977
commit 973a2fb395
3 changed files with 25 additions and 11 deletions

View File

@@ -24,6 +24,7 @@ import java.security.SecureRandom;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.crypto.AEADBadTagException;
import javax.crypto.BadPaddingException;
@@ -499,9 +500,14 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
try {
executor.waitUntilDone(1, TimeUnit.SECONDS);
} catch (ExecutionException e) {
final Throwable cause = e.getCause();
Throwable cause = e;
while (cause instanceof ExecutionException) {
cause = cause.getCause();
}
if (cause instanceof IOException) {
throw (IOException) cause;
} else if (cause instanceof TimeoutException) {
throw new DecryptFailedException(cause);
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
@@ -724,9 +730,14 @@ public class Aes256Cryptor implements Cryptor, AesCryptographicConfiguration {
try {
executor.waitUntilDone(1, TimeUnit.SECONDS);
} catch (ExecutionException e) {
final Throwable cause = e.getCause();
Throwable cause = e;
while (cause instanceof ExecutionException) {
cause = cause.getCause();
}
if (cause instanceof IOException) {
throw (IOException) cause;
} else if (cause instanceof TimeoutException) {
throw new EncryptFailedException(cause);
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {

View File

@@ -4,6 +4,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -14,10 +16,10 @@ abstract class CryptoWorker implements Callable<Void> {
static final BlocksData POISON = new BlocksData(ByteBuffer.allocate(0), -1L, 0);
final Lock lock;
final Condition blockDone;
final AtomicLong currentBlock;
final BlockingQueue<BlocksData> queue;
private final Lock lock;
private final Condition blockDone;
private final AtomicLong currentBlock;
private final BlockingQueue<BlocksData> queue;
public CryptoWorker(Lock lock, Condition blockDone, AtomicLong currentBlock, BlockingQueue<BlocksData> queue) {
this.lock = lock;
@@ -27,7 +29,7 @@ abstract class CryptoWorker implements Callable<Void> {
}
@Override
public final Void call() throws IOException {
public final Void call() throws IOException, TimeoutException {
try {
while (!Thread.currentThread().isInterrupted()) {
final BlocksData blocksData = queue.take();
@@ -38,7 +40,9 @@ abstract class CryptoWorker implements Callable<Void> {
lock.lock();
try {
while (currentBlock.get() != blocksData.startBlockNum) {
blockDone.await();
if (!blockDone.await(1, TimeUnit.SECONDS)) {
throw new TimeoutException("Waited too long to write block " + blocksData.startBlockNum + "; Current block " + currentBlock.get());
}
}
assert currentBlock.get() == blocksData.startBlockNum;
// yay, its my turn!
@@ -51,7 +55,7 @@ abstract class CryptoWorker implements Callable<Void> {
}
}
} catch (InterruptedException e) {
// will happen for executorService.shutdownNow()
// will happen for executorService.shutdownNow() or future.cancel()
Thread.currentThread().interrupt();
}
return null;

View File

@@ -68,8 +68,7 @@ class CryptoWorkerExecutor {
try {
final boolean success = inputQueue.offer(data, timeout, unit);
if (!success) {
LOG.error("inputQueue is full.");
inputQueue.clear();
LOG.warn("Cancelling crypto workers due to timeout. Apparently the work queue not being drained by the workers any longer.");
allWork.cancel(true);
}
return success;