Merge pull request #27 from Tillerino/master

Single Running Instance + Double-clicking folders/files shows in GUI
This commit is contained in:
Sebastian Stenzel
2015-01-21 20:07:51 +01:00
12 changed files with 988 additions and 14 deletions

View File

@@ -63,4 +63,8 @@ interface FileNamingConventions {
*/
PathMatcher ENCRYPTED_FILE_GLOB_MATCHER = FileSystems.getDefault().getPathMatcher("glob:**/*{" + BASIC_FILE_EXT + "," + LONG_NAME_FILE_EXT + "}");
/**
* On OSX, folders with this extension are treated as a package.
*/
String FOLDER_EXTENSION = ".cryptomator";
}

View File

@@ -32,8 +32,10 @@
<commons-collections.version>4.0</commons-collections.version>
<commons-lang3.version>3.3.2</commons-lang3.version>
<commons-codec.version>1.10</commons-codec.version>
<jackson-databind.version>2.4.4</jackson-databind.version>
</properties>
<jackson-databind.version>2.4.4</jackson-databind.version>
<mockito.version>1.10.19</mockito.version>
<axetDesktop.version>2.2.3</axetDesktop.version>
</properties>
<dependencyManagement>
<dependencies>
@@ -117,6 +119,19 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.axet</groupId>
<artifactId>desktop</artifactId>
<version>${axetDesktop.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
@@ -137,6 +152,10 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</dependency>
</dependencies>
<modules>

View File

@@ -19,7 +19,7 @@
<properties>
<javafx.application.name>Cryptomator</javafx.application.name>
<exec.mainClass>org.cryptomator.ui.MainApplication</exec.mainClass>
<exec.mainClass>org.cryptomator.ui.Main</exec.mainClass>
<javafx.tools.ant.jar>${java.home}/../lib/ant-javafx.jar</javafx.tools.ant.jar>
<controlsfx.version>8.20.8</controlsfx.version>
</properties>
@@ -52,9 +52,14 @@
<!-- UI -->
<dependency>
<groupId>org.controlsfx</groupId>
<artifactId>controlsfx</artifactId>
<version>${controlsfx.version}</version>
<groupId>org.controlsfx</groupId>
<artifactId>controlsfx</artifactId>
<version>${controlsfx.version}</version>
</dependency>
<dependency>
<groupId>com.github.axet</groupId>
<artifactId>desktop</artifactId>
</dependency>
</dependencies>

View File

@@ -0,0 +1,76 @@
/*******************************************************************************
* Copyright (c) 2014 cryptomator.org
* This file is licensed under the terms of the MIT license.
* See the LICENSE.txt file for more info.
*
* Contributors:
* Tillmann Gaida - initial implementation
******************************************************************************/
package org.cryptomator.ui;
import java.io.File;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javafx.application.Application;
import org.apache.commons.lang3.SystemUtils;
import org.cryptomator.ui.util.SingleInstanceManager;
import org.cryptomator.ui.util.SingleInstanceManager.RemoteInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.axet.desktop.os.mac.AppleHandlers;
public class Main {
public static final Logger LOG = LoggerFactory.getLogger(MainApplication.class);
public static final CompletableFuture<Consumer<File>> OPEN_FILE_HANDLER = new CompletableFuture<>();
public static void main(String[] args) {
if (SystemUtils.IS_OS_MAC_OSX) {
/*
* On OSX we're in an awkward position. We need to register a
* handler in the main thread of this application. However, we can't
* even pass objects to the application, so we're forced to use a
* static CompletableFuture for the handler, which actually opens
* the file in the application.
*/
try {
AppleHandlers.getAppleHandlers().addOpenFileListener(file -> {
try {
OPEN_FILE_HANDLER.get().accept(file);
} catch (Exception e) {
LOG.error("exception handling file open event", e);
throw new RuntimeException(e);
}
});
} catch (RuntimeException e) {
// Since we're trying to call OS-specific code, we'll just have
// to hope for the best.
LOG.error("exception adding OSX file open handler", e);
}
}
/*
* Before starting the application, we check if there is already an
* instance running on this computer. If so, we send our command line
* arguments to that instance and quit.
*/
final Optional<RemoteInstance> remoteInstance = SingleInstanceManager.getRemoteInstance(MainApplication.APPLICATION_KEY);
if (remoteInstance.isPresent()) {
try (RemoteInstance instance = remoteInstance.get()) {
LOG.info("An instance of Cryptomator is already running at {}.", instance.getRemotePort());
for (int i = 0; i < args.length; i++) {
remoteInstance.get().sendMessage(args[i], 1000);
}
} catch (Exception e) {
LOG.error("Error forwarding arguments to remote instance", e);
}
} else {
Application.launch(MainApplication.class, args);
}
}
}

View File

@@ -9,8 +9,13 @@
package org.cryptomator.ui;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ResourceBundle;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javafx.application.Application;
import javafx.application.Platform;
@@ -19,25 +24,39 @@ import javafx.scene.Parent;
import javafx.scene.Scene;
import javafx.stage.Stage;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.cryptomator.crypto.aes256.Aes256Cryptor;
import org.cryptomator.ui.settings.Settings;
import org.cryptomator.ui.util.ActiveWindowStyleSupport;
import org.cryptomator.ui.util.SingleInstanceManager;
import org.cryptomator.ui.util.SingleInstanceManager.LocalInstance;
import org.cryptomator.ui.util.TrayIconUtil;
import org.cryptomator.webdav.WebDavServer;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MainApplication extends Application {
private static final Set<Runnable> SHUTDOWN_TASKS = new ConcurrentHashSet<>();
private static final CleanShutdownPerformer CLEAN_SHUTDOWN_PERFORMER = new CleanShutdownPerformer();
public static void main(String[] args) {
Application.launch(args);
Runtime.getRuntime().addShutdownHook(CLEAN_SHUTDOWN_PERFORMER);
}
public static final String APPLICATION_KEY = "CryptomatorGUI";
private static final Logger LOG = LoggerFactory.getLogger(MainApplication.class);
private ExecutorService executorService;
@Override
public void start(final Stage primaryStage) throws IOException {
Runtime.getRuntime().addShutdownHook(MainApplication.CLEAN_SHUTDOWN_PERFORMER);
executorService = Executors.newCachedThreadPool();
addShutdownTask(() -> {
executorService.shutdown();
});
WebDavServer.getInstance().start();
chooseNativeStylesheet();
final ResourceBundle rb = ResourceBundle.getBundle("localization");
@@ -55,6 +74,47 @@ public class MainApplication extends Application {
TrayIconUtil.init(primaryStage, rb, () -> {
quit();
});
for (String arg : getParameters().getUnnamed()) {
handleCommandLineArg(ctrl, arg);
}
if (org.controlsfx.tools.Platform.getCurrent().equals(org.controlsfx.tools.Platform.OSX)) {
Main.OPEN_FILE_HANDLER.complete(file -> handleCommandLineArg(ctrl, file.getAbsolutePath()));
}
LocalInstance cryptomatorGuiInstance = SingleInstanceManager.startLocalInstance(APPLICATION_KEY, executorService);
addShutdownTask(() -> {
cryptomatorGuiInstance.close();
});
cryptomatorGuiInstance.registerListener(arg -> handleCommandLineArg(ctrl, arg));
}
void handleCommandLineArg(final MainController ctrl, String arg) {
Path file = FileSystems.getDefault().getPath(arg);
if (!Files.exists(file)) {
try {
if (!Files.isDirectory(Files.createDirectories(file))) {
return;
}
} catch (IOException e) {
return;
}
// directory created.
} else if (Files.isRegularFile(file)) {
if (StringUtils.endsWithIgnoreCase(file.getFileName().toString(), Aes256Cryptor.MASTERKEY_FILE_EXT)) {
file = file.getParent();
} else {
// is a file, but not a masterkey file
return;
}
}
Path f = file;
Platform.runLater(() -> {
ctrl.addDirectory(f);
ctrl.toFront();
});
}
private void chooseNativeStylesheet() {
@@ -95,7 +155,11 @@ public class MainApplication extends Application {
@Override
public void run() {
SHUTDOWN_TASKS.forEach(r -> {
r.run();
try {
r.run();
} catch (RuntimeException e) {
LOG.error("exception while shutting down", e);
}
});
SHUTDOWN_TASKS.clear();
}

View File

@@ -11,6 +11,8 @@ package org.cryptomator.ui;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.ResourceBundle;
import java.util.stream.Collectors;
@@ -75,8 +77,19 @@ public class MainController implements Initializable, InitializationListener, Un
private void didClickAddDirectory(ActionEvent event) {
final DirectoryChooser dirChooser = new DirectoryChooser();
final File file = dirChooser.showDialog(stage);
if (file != null && file.canWrite()) {
final Directory dir = new Directory(file.toPath());
addDirectory(file.toPath());
}
/**
* adds the given directory or selects it if it is already in the list of
* directories.
*
* @param file
* non-null, writable, existing directory
*/
void addDirectory(final Path file) {
if (file != null && Files.isWritable(file)) {
final Directory dir = new Directory(file);
if (!directoryList.getItems().contains(dir)) {
directoryList.getItems().add(dir);
}
@@ -199,4 +212,13 @@ public class MainController implements Initializable, InitializationListener, Un
this.stage = stage;
}
/**
* Attempts to make the application window visible.
*/
public void toFront() {
stage.setIconified(false);
stage.show();
stage.toFront();
}
}

View File

@@ -8,6 +8,7 @@ import java.nio.file.Path;
import javafx.beans.property.ObjectProperty;
import javafx.beans.property.SimpleObjectProperty;
import org.apache.commons.lang3.StringUtils;
import org.cryptomator.crypto.Cryptor;
import org.cryptomator.crypto.SamplingDecorator;
import org.cryptomator.crypto.aes256.Aes256Cryptor;
@@ -116,7 +117,11 @@ public class Directory implements Serializable {
* @return Directory name without preceeding path components
*/
public String getName() {
return path.getFileName().toString();
String name = path.getFileName().toString();
if (StringUtils.endsWithIgnoreCase(name, Aes256Cryptor.FOLDER_EXTENSION)) {
name = name.substring(0, name.length() - Aes256Cryptor.FOLDER_EXTENSION.length());
}
return name;
}
public Cryptor getCryptor() {

View File

@@ -0,0 +1,80 @@
/*******************************************************************************
* Copyright (c) 2014 cryptomator.org
* This file is licensed under the terms of the MIT license.
* See the LICENSE.txt file for more info.
*
* Contributors:
* Tillmann Gaida - initial implementation
******************************************************************************/
package org.cryptomator.ui.util;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
/**
* Manages and broadcasts events to a set of listeners. The types of the
* listener and event are entirely unbound. Instead, a method must be supplied
* to broadcast an event to a single listener.
*
* @author Tillmann Gaida
*
* @param <LISTENER>
* The type of listener.
* @param <EVENT>
* The type of event.
*/
public class ListenerRegistry<LISTENER, EVENT> {
final BiConsumer<LISTENER, EVENT> listenerCaller;
/**
* Constructs a new registry.
*
* @param listenerCaller
* The method which broadcasts an event to a single listener.
*/
public ListenerRegistry(BiConsumer<LISTENER, EVENT> listenerCaller) {
super();
this.listenerCaller = listenerCaller;
}
/**
* The handle of a registered listener.
*/
public interface ListenerRegistration {
void unregister();
}
final AtomicLong serial = new AtomicLong();
/*
* Since this is a {@link ConcurrentSkipListMap}, we can at the same time
* add to, remove from, and iterate over it. More importantly, a Listener
* can remove itself while being called from the {@link #broadcast(Object)}
* method.
*/
final Map<Long, LISTENER> listeners = new ConcurrentSkipListMap<>();
public ListenerRegistration registerListener(LISTENER listener) {
final long s = serial.incrementAndGet();
listeners.put(s, listener);
return () -> {
listeners.remove(s);
};
}
/**
* Broadcasts the given event to all registered listeners. If a listener
* causes an unchecked exception, that exception is thrown immediately
* without calling the other listeners.
*
* @param event
*/
public void broadcast(EVENT event) {
for (LISTENER listener : listeners.values()) {
listenerCaller.accept(listener, event);
}
}
}

View File

@@ -0,0 +1,365 @@
/*******************************************************************************
* Copyright (c) 2014 Sebastian Stenzel
* This file is licensed under the terms of the MIT license.
* See the LICENSE.txt file for more info.
*
* Contributors:
* Sebastian Stenzel - initial API and implementation
******************************************************************************/
package org.cryptomator.ui.util;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.prefs.Preferences;
import org.apache.commons.io.IOUtils;
import org.cryptomator.ui.Main;
import org.cryptomator.ui.util.ListenerRegistry.ListenerRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Classes and methods to manage running this application in a mode, which only
* shows one instance.
*
* @author Tillmann Gaida
*/
public class SingleInstanceManager {
private static final Logger LOG = LoggerFactory.getLogger(SingleInstanceManager.class);
/**
* Connection to a running instance
*/
public static class RemoteInstance implements Closeable {
final SocketChannel channel;
RemoteInstance(SocketChannel channel) {
super();
this.channel = channel;
}
/**
* Sends a message to the running instance.
*
* @param string
* May not be longer than 2^16 - 1 bytes.
* @param timeout
* timeout in milliseconds. this should be larger than the
* precision of {@link System#currentTimeMillis()}.
* @return true if the message was sent within the given timeout.
* @throws IOException
*/
public boolean sendMessage(String string, long timeout) throws IOException {
Objects.requireNonNull(string);
byte[] message = string.getBytes();
if (message.length >= 256 * 256) {
throw new IOException("Message too long.");
}
ByteBuffer buf = ByteBuffer.allocate(message.length + 2);
buf.put((byte) (message.length / 256));
buf.put((byte) (message.length % 256));
buf.put(message);
buf.flip();
TimeoutTask.attempt(t -> {
if (channel.write(buf) < 0) {
return true;
}
return !buf.hasRemaining();
}, timeout, 10);
return !buf.hasRemaining();
}
@Override
public void close() throws IOException {
channel.close();
}
public int getRemotePort() throws IOException {
return ((InetSocketAddress) channel.getRemoteAddress()).getPort();
}
}
public static interface MessageListener {
void handleMessage(String message);
}
/**
* Represents a socket making this the main instance of the application.
*/
public static class LocalInstance implements Closeable {
private class ChannelState {
ByteBuffer write = ByteBuffer.wrap(applicationKey.getBytes());
ByteBuffer readLength = ByteBuffer.allocate(2);
ByteBuffer readMessage = null;
}
final ListenerRegistry<MessageListener, String> registry = new ListenerRegistry<>(MessageListener::handleMessage);
final String applicationKey;
final ServerSocketChannel channel;
final Selector selector;
int port = 0;
public LocalInstance(String applicationKey, ServerSocketChannel channel, Selector selector) {
Objects.requireNonNull(applicationKey);
this.applicationKey = applicationKey;
this.channel = channel;
this.selector = selector;
}
/**
* Register a listener for
*
* @param listener
* @return
*/
public ListenerRegistration registerListener(MessageListener listener) {
Objects.requireNonNull(listener);
return registry.registerListener(listener);
}
void handleSelection(SelectionKey key) throws IOException {
if (key.isAcceptable()) {
final SocketChannel accepted = channel.accept();
if (accepted != null) {
LOG.info("accepted incoming connection");
accepted.configureBlocking(false);
accepted.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
}
if (key.attachment() == null) {
key.attach(new ChannelState());
}
ChannelState state = (ChannelState) key.attachment();
if (key.isWritable() && state.write != null) {
((WritableByteChannel) key.channel()).write(state.write);
if (!state.write.hasRemaining()) {
state.write = null;
}
LOG.debug("wrote welcome. switching to read only.");
key.interestOps(SelectionKey.OP_READ);
}
if (key.isReadable()) {
ByteBuffer buffer = state.readLength != null ? state.readLength : state.readMessage;
if (((ReadableByteChannel) key.channel()).read(buffer) < 0) {
key.cancel();
}
if (!buffer.hasRemaining()) {
buffer.flip();
if (state.readLength != null) {
int length = (buffer.get() + 256) % 256;
length = length * 256 + ((buffer.get() + 256) % 256);
state.readLength = null;
state.readMessage = ByteBuffer.allocate(length);
} else {
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);
state.readMessage = null;
state.readLength = ByteBuffer.allocate(2);
registry.broadcast(new String(bytes, "UTF-8"));
}
}
}
}
public void close() {
IOUtils.closeQuietly(selector);
IOUtils.closeQuietly(channel);
if (getSavedPort(applicationKey).orElse(-1).equals(port)) {
Preferences.userNodeForPackage(Main.class).remove(applicationKey);
}
}
void selectionLoop() {
try {
final Set<SelectionKey> keysToRemove = new HashSet<>();
while (selector.select() > 0) {
final Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey key : keys) {
if (Thread.interrupted()) {
return;
}
try {
handleSelection(key);
} catch (IOException | IllegalStateException e) {
LOG.error("exception in selector", e);
} finally {
keysToRemove.add(key);
}
}
keys.removeAll(keysToRemove);
}
} catch (ClosedSelectorException e) {
return;
} catch (Exception e) {
LOG.error("error while selecting", e);
}
}
}
/**
* Checks if there is a valid port at
* {@link Preferences#userNodeForPackage(Class)} for {@link Main} under the
* given applicationKey, tries to connect to the port at the loopback
* address and checks if the port identifies with the applicationKey.
*
* @param applicationKey
* key used to load the port and check the identity of the
* connection.
* @return
*/
public static Optional<RemoteInstance> getRemoteInstance(String applicationKey) {
Optional<Integer> port = getSavedPort(applicationKey);
if (!port.isPresent()) {
return Optional.empty();
}
SocketChannel channel = null;
boolean close = true;
try {
channel = SocketChannel.open();
channel.configureBlocking(false);
LOG.info("connecting to instance {}", port.get());
channel.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), port.get()));
SocketChannel fChannel = channel;
if (!TimeoutTask.attempt(t -> fChannel.finishConnect(), 1000, 10)) {
return Optional.empty();
}
LOG.info("connected to instance {}", port.get());
final byte[] bytes = applicationKey.getBytes();
ByteBuffer buf = ByteBuffer.allocate(bytes.length);
tryFill(channel, buf, 1000);
if (buf.hasRemaining()) {
return Optional.empty();
}
buf.flip();
for (int i = 0; i < bytes.length; i++) {
if (buf.get() != bytes[i]) {
return Optional.empty();
}
}
close = false;
return Optional.of(new RemoteInstance(channel));
} catch (Exception e) {
return Optional.empty();
} finally {
if (close) {
IOUtils.closeQuietly(channel);
}
}
}
static Optional<Integer> getSavedPort(String applicationKey) {
int port = Preferences.userNodeForPackage(Main.class).getInt(applicationKey, -1);
if (port == -1) {
LOG.info("no running instance found");
return Optional.empty();
}
return Optional.of(port);
}
/**
* Creates a server socket on a free port and saves the port in
* {@link Preferences#userNodeForPackage(Class)} for {@link Main} under the
* given applicationKey.
*
* @param applicationKey
* key used to save the port and identify upon connection.
* @param exec
* the task which is submitted is interruptable.
* @return
* @throws IOException
*/
public static LocalInstance startLocalInstance(String applicationKey, ExecutorService exec) throws IOException {
final ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);
channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
final int port = ((InetSocketAddress) channel.getLocalAddress()).getPort();
Preferences.userNodeForPackage(Main.class).putInt(applicationKey, port);
LOG.info("InstanceManager bound to port {}", port);
Selector selector = Selector.open();
channel.register(selector, SelectionKey.OP_ACCEPT);
LocalInstance instance = new LocalInstance(applicationKey, channel, selector);
exec.submit(() -> {
try {
instance.port = ((InetSocketAddress) channel.getLocalAddress()).getPort();
} catch (IOException e) {
}
instance.selectionLoop();
});
return instance;
}
/**
* tries to fill the given buffer for the given time
*
* @param channel
* @param buf
* @param timeout
* @throws ClosedChannelException
* @throws IOException
*/
public static <T extends SelectableChannel & ReadableByteChannel> void tryFill(T channel, final ByteBuffer buf, int timeout) throws IOException {
if (channel.isBlocking()) {
throw new IllegalStateException("Channel is in blocking mode.");
}
try (Selector selector = Selector.open()) {
channel.register(selector, SelectionKey.OP_READ);
TimeoutTask.attempt(remainingTime -> {
if (!buf.hasRemaining()) {
return true;
}
if (selector.select(remainingTime) > 0) {
if (channel.read(buf) < 0) {
return true;
}
}
return !buf.hasRemaining();
}, timeout, 1);
}
}
}

View File

@@ -0,0 +1,81 @@
/*******************************************************************************
* Copyright (c) 2014 cryptomator.org
* This file is licensed under the terms of the MIT license.
* See the LICENSE.txt file for more info.
*
* Contributors:
* Tillmann Gaida - initial implementation
******************************************************************************/
package org.cryptomator.ui.util;
/**
* A task which is supposed to be repeated until it succeeds.
*
* @author Tillmann Gaida
*
* @param <E>
* The type of checked exception that this task may throw.
*/
public interface TimeoutTask<E extends Exception> {
/**
* Attempts to execute the task.
*
* @param timeout
* The time remaining to finish the task.
* @return true if the task finished, false if it needs to be attempted
* again.
* @throws E
* @throws InterruptedException
*/
boolean attempt(long timeout) throws E, InterruptedException;
/**
* Attempts a task until a timeout occurs. Checks for this timeout are based
* on {@link System#currentTimeMillis()}, so they are very crude. The task
* is guaranteed to be attempted once.
*
* @param task
* the task to perform.
* @param timeout
* time in millis before this method stops attempting to finish
* the task. greater than zero.
* @param sleepTimes
* time in millis to sleep between attempts. greater than zero.
* @return true if the task was finished, false if the task never always
* returned false or as soon as the task throws an
* {@link InterruptedException}.
* @throws E
* From the task.
*/
public static <E extends Exception> boolean attempt(TimeoutTask<E> task, long timeout, long sleepTimes) throws E {
if (timeout <= 0 || sleepTimes <= 0) {
throw new IllegalArgumentException();
}
long currentTime = System.currentTimeMillis();
long tryUntil = currentTime + timeout;
for (;; currentTime = System.currentTimeMillis()) {
if (currentTime >= tryUntil) {
return false;
}
try {
if (task.attempt(tryUntil - currentTime)) {
return true;
}
currentTime = System.currentTimeMillis();
if (currentTime + sleepTimes < tryUntil) {
Thread.sleep(sleepTimes);
} else {
return false;
}
} catch (InterruptedException e) {
return false;
}
}
}
}

View File

@@ -0,0 +1,53 @@
/*******************************************************************************
* Copyright (c) 2014 cryptomator.org
* This file is licensed under the terms of the MIT license.
* See the LICENSE.txt file for more info.
*
* Contributors:
* Tillmann Gaida - initial implementation
******************************************************************************/
package org.cryptomator.ui.util;
import static org.junit.Assert.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentSkipListMap;
import org.junit.Test;
public class ListenerRegistryTest {
/**
* This test looks at how concurrent modifications affect the iterator of a
* {@link ConcurrentSkipListMap}. It shows that concurrent modifications
* work just fine, however the state of the iterator including the next
* value are advanced during retrieval of a value, so it's not possible to
* remove the next value.
*
* @throws Exception
*/
@Test
public void testConcurrentSkipListMap() throws Exception {
ConcurrentSkipListMap<Integer, Integer> map = new ConcurrentSkipListMap<>();
map.put(1, 1);
map.put(2, 2);
map.put(3, 3);
map.put(4, 4);
map.put(5, 5);
final Iterator<Integer> iterator = map.values().iterator();
assertTrue(iterator.hasNext());
assertEquals((Integer) 1, iterator.next());
map.remove(2);
assertTrue(iterator.hasNext());
// iterator returns 2 anyway.
assertEquals((Integer) 2, iterator.next());
assertTrue(iterator.hasNext());
map.remove(4);
assertEquals((Integer) 3, iterator.next());
assertTrue(iterator.hasNext());
// this time we removed 4 before retrieving 3, so it is skipped.
assertEquals((Integer) 5, iterator.next());
}
}

View File

@@ -0,0 +1,200 @@
/*******************************************************************************
* Copyright (c) 2014 cryptomator.org
* This file is licensed under the terms of the MIT license.
* See the LICENSE.txt file for more info.
*
* Contributors:
* Tillmann Gaida - initial implementation
******************************************************************************/
package org.cryptomator.ui.util;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import org.cryptomator.ui.util.SingleInstanceManager.LocalInstance;
import org.cryptomator.ui.util.SingleInstanceManager.MessageListener;
import org.cryptomator.ui.util.SingleInstanceManager.RemoteInstance;
import org.junit.Test;
public class SingleInstanceManagerTest {
@Test(timeout = 10000)
public void testTryFillTimeout() throws Exception {
try (final ServerSocket socket = new ServerSocket(0)) {
// we need to asynchronously accept the connection
final ForkJoinTask<?> forked = ForkJoinTask.adapt(() -> {
try {
socket.setSoTimeout(1000);
socket.accept();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).fork();
try (SocketChannel channel = SocketChannel.open()) {
channel.configureBlocking(false);
channel.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), socket.getLocalPort()));
TimeoutTask.attempt(t -> channel.finishConnect(), 1000, 1);
final ByteBuffer buffer = ByteBuffer.allocate(1);
SingleInstanceManager.tryFill(channel, buffer, 1000);
assertTrue(buffer.hasRemaining());
}
forked.join();
}
}
@Test(timeout = 10000)
public void testTryFill() throws Exception {
try (final ServerSocket socket = new ServerSocket(0)) {
// we need to asynchronously accept the connection
final ForkJoinTask<?> forked = ForkJoinTask.adapt(() -> {
try {
socket.setSoTimeout(1000);
socket.accept().getOutputStream().write(1);
} catch (Exception e) {
throw new RuntimeException(e);
}
}).fork();
try (SocketChannel channel = SocketChannel.open()) {
channel.configureBlocking(false);
channel.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), socket.getLocalPort()));
TimeoutTask.attempt(t -> channel.finishConnect(), 1000, 1);
final ByteBuffer buffer = ByteBuffer.allocate(1);
SingleInstanceManager.tryFill(channel, buffer, 1000);
assertFalse(buffer.hasRemaining());
}
forked.join();
}
}
String appKey = "APPKEY";
@Test
public void testOneMessage() throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
try {
final LocalInstance server = SingleInstanceManager.startLocalInstance(appKey, exec);
final Optional<RemoteInstance> r = SingleInstanceManager.getRemoteInstance(appKey);
CountDownLatch latch = new CountDownLatch(1);
final MessageListener listener = spy(new MessageListener() {
@Override
public void handleMessage(String message) {
latch.countDown();
}
});
server.registerListener(listener);
assertTrue(r.isPresent());
String message = "Is this thing on?";
assertTrue(r.get().sendMessage(message, 1000));
System.out.println("wrote message");
latch.await(10, TimeUnit.SECONDS);
verify(listener).handleMessage(message);
} finally {
exec.shutdownNow();
}
}
@Test(timeout = 60000)
public void testALotOfMessages() throws Exception {
final int connectors = 256;
final int messagesPerConnector = 256;
ExecutorService exec = Executors.newSingleThreadExecutor();
ExecutorService exec2 = Executors.newFixedThreadPool(16);
try (final LocalInstance server = SingleInstanceManager.startLocalInstance(appKey, exec)) {
Set<String> sentMessages = new ConcurrentSkipListSet<>();
Set<String> receivedMessages = new HashSet<>();
CountDownLatch sendLatch = new CountDownLatch(connectors);
CountDownLatch receiveLatch = new CountDownLatch(connectors * messagesPerConnector);
server.registerListener(message -> {
receivedMessages.add(message);
receiveLatch.countDown();
});
Set<RemoteInstance> instances = Collections.synchronizedSet(new HashSet<>());
for (int i = 0; i < connectors; i++) {
exec2.submit(() -> {
try {
final Optional<RemoteInstance> r = SingleInstanceManager.getRemoteInstance(appKey);
assertTrue(r.isPresent());
instances.add(r.get());
for (int j = 0; j < messagesPerConnector; j++) {
exec2.submit(() -> {
try {
for (;;) {
final String message = UUID.randomUUID().toString();
if (!sentMessages.add(message)) {
continue;
}
r.get().sendMessage(message, 1000);
break;
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
sendLatch.countDown();
} catch (Throwable e) {
e.printStackTrace();
}
});
}
assertTrue(sendLatch.await(1, TimeUnit.MINUTES));
exec2.shutdown();
assertTrue(exec2.awaitTermination(1, TimeUnit.MINUTES));
assertTrue(receiveLatch.await(1, TimeUnit.MINUTES));
assertEquals(sentMessages, receivedMessages);
for (RemoteInstance remoteInstance : instances) {
try {
remoteInstance.close();
} catch (Exception e) {
e.printStackTrace();
}
}
} finally {
exec.shutdownNow();
exec2.shutdownNow();
}
}
}