Using disting ExecutorService and ScheduledExecutorService since the latter doesn't scale beyond its pre-defined number of core threads

fixes #1051
This commit is contained in:
Sebastian Stenzel
2020-01-15 13:07:19 +01:00
parent eb047e29a0
commit 833bb085e6
3 changed files with 42 additions and 15 deletions

View File

@@ -5,7 +5,6 @@
*******************************************************************************/
package org.cryptomator.common;
import dagger.Binds;
import dagger.Module;
import dagger.Provides;
import javafx.beans.binding.Binding;
@@ -19,6 +18,8 @@ import org.cryptomator.common.vaults.VaultComponent;
import org.cryptomator.common.vaults.VaultListManager;
import org.cryptomator.frontend.webdav.WebDavServer;
import org.fxmisc.easybind.EasyBind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Named;
import javax.inject.Singleton;
@@ -27,12 +28,18 @@ import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Module(subcomponents = {VaultComponent.class})
public abstract class CommonsModule {
private static final int NUM_SCHEDULER_THREADS = 4;
private static final Logger LOG = LoggerFactory.getLogger(CommonsModule.class);
private static final int NUM_SCHEDULER_THREADS = 2;
private static final int NUM_CORE_BG_THREADS = 6;
private static final long BG_THREAD_KEEPALIVE_SECONDS = 60l;
@Provides
@Singleton
@@ -69,18 +76,38 @@ public abstract class CommonsModule {
static ScheduledExecutorService provideScheduledExecutorService(ShutdownHook shutdownHook) {
final AtomicInteger threadNumber = new AtomicInteger(1);
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(NUM_SCHEDULER_THREADS, r -> {
String name = String.format("App Scheduled Executor %02d", threadNumber.getAndIncrement());
Thread t = new Thread(r);
t.setName("Background Thread " + threadNumber.getAndIncrement());
t.setName(name);
t.setUncaughtExceptionHandler(CommonsModule::handleUncaughtExceptionInBackgroundThread);
t.setDaemon(true);
LOG.debug("Starting {}", t.getName());
return t;
});
shutdownHook.runOnShutdown(executorService::shutdown);
return executorService;
}
@Binds
@Provides
@Singleton
abstract ExecutorService bindExecutorService(ScheduledExecutorService executor);
static ExecutorService provideExecutorService(ShutdownHook shutdownHook) {
final AtomicInteger threadNumber = new AtomicInteger(1);
ExecutorService executorService = new ThreadPoolExecutor(NUM_CORE_BG_THREADS, Integer.MAX_VALUE, BG_THREAD_KEEPALIVE_SECONDS, TimeUnit.SECONDS, new SynchronousQueue<>(), r -> {
String name = String.format("App Background Thread %03d", threadNumber.getAndIncrement());
Thread t = new Thread(r);
t.setName(name);
t.setUncaughtExceptionHandler(CommonsModule::handleUncaughtExceptionInBackgroundThread);
t.setDaemon(true);
LOG.debug("Starting {}", t.getName());
return t;
});
shutdownHook.runOnShutdown(executorService::shutdown);
return executorService;
}
private static void handleUncaughtExceptionInBackgroundThread(Thread thread, Throwable throwable) {
LOG.error("Uncaught exception in " + thread.getName(), throwable);
}
@Provides
@Singleton

View File

@@ -32,7 +32,6 @@ import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -46,16 +45,17 @@ public class SettingsProvider implements Supplier<Settings> {
private static final Logger LOG = LoggerFactory.getLogger(SettingsProvider.class);
private static final long SAVE_DELAY_MS = 1000;
private final ScheduledExecutorService saveScheduler = Executors.newSingleThreadScheduledExecutor();
private final AtomicReference<ScheduledFuture<?>> scheduledSaveCmd = new AtomicReference<>();
private final AtomicReference<Settings> settings = new AtomicReference<>();
private final SettingsJsonAdapter settingsJsonAdapter = new SettingsJsonAdapter();
private final Environment env;
private final ScheduledExecutorService scheduler;
private final Gson gson;
@Inject
public SettingsProvider(Environment env) {
public SettingsProvider(Environment env, ScheduledExecutorService scheduler) {
this.env = env;
this.scheduler = scheduler;
this.gson = new GsonBuilder() //
.setPrettyPrinting().setLenient().disableHtmlEscaping() //
.registerTypeAdapter(Settings.class, settingsJsonAdapter) //
@@ -98,7 +98,7 @@ public class SettingsProvider implements Supplier<Settings> {
final Optional<Path> settingsPath = env.getSettingsPath().findFirst(); // alway save to preferred (first) path
settingsPath.ifPresent(path -> {
Runnable saveCommand = () -> this.save(settings, path);
ScheduledFuture<?> scheduledTask = saveScheduler.schedule(saveCommand, SAVE_DELAY_MS, TimeUnit.MILLISECONDS);
ScheduledFuture<?> scheduledTask = scheduler.schedule(saveCommand, SAVE_DELAY_MS, TimeUnit.MILLISECONDS);
ScheduledFuture<?> previouslyScheduledTask = scheduledSaveCmd.getAndSet(scheduledTask);
if (previouslyScheduledTask != null) {
previouslyScheduledTask.cancel(false);

View File

@@ -73,21 +73,21 @@ public class Tasks {
return new TaskImpl<>(callable, successHandler, errorHandlers, finallyHandler);
}
public Task<T> runOnce(ExecutorService executorService) {
public Task<T> runOnce(ExecutorService executor) {
Task<T> task = build();
executorService.submit(task);
executor.submit(task);
return task;
}
public Task<T> scheduleOnce(ScheduledExecutorService executorService, long delay, TimeUnit unit) {
public Task<T> scheduleOnce(ScheduledExecutorService scheduler, long delay, TimeUnit unit) {
Task<T> task = build();
executorService.schedule(task, delay, unit);
scheduler.schedule(task, delay, unit);
return task;
}
public ScheduledService<T> schedulePeriodically(ExecutorService executorService, Duration initialDelay, Duration period) {
public ScheduledService<T> schedulePeriodically(ExecutorService executor, Duration initialDelay, Duration period) {
ScheduledService<T> service = new RestartingService<>(this::build);
service.setExecutor(executorService);
service.setExecutor(executor);
service.setDelay(initialDelay);
service.setPeriod(period);
Platform.runLater(service::start);