main: RAII-ify shutdown

Instead of app-template::run_deprecated() and at_exit() hooks, use
app_template::run() and RAII (via defer()) to stop services. This makes it
easier to add services that do support shutdown correctly.

Ref #2737
Message-Id: <20190420175733.29454-1-avi@scylladb.com>
This commit is contained in:
Avi Kivity
2019-04-20 20:57:33 +03:00
committed by Tomasz Grabiec
parent 21fbf59fa8
commit b19792405f

101
main.cc
View File

@@ -84,6 +84,32 @@ void register_tracing_keyspace_backend(backend_registry& br);
}
class stop_signal {
bool _caught = false;
condition_variable _cond;
private:
void signaled() {
_caught = true;
_cond.broadcast();
}
public:
stop_signal() {
engine().handle_signal(SIGINT, [this] { signaled(); });
engine().handle_signal(SIGTERM, [this] { signaled(); });
}
~stop_signal() {
// There's no way to unregister a handler yet, so register a no-op handler instead.
engine().handle_signal(SIGINT, [] {});
engine().handle_signal(SIGTERM, [] {});
}
future<> wait() {
return _cond.wait([this] { return _caught; });
}
bool stopping() const {
return _caught;
}
};
template<typename K, typename V, typename... Args, typename K2, typename V2 = V>
V get_or_default(const std::unordered_map<K, V, Args...>& ss, const K2& key, const V2& def = V()) {
const auto iter = ss.find(key);
@@ -307,6 +333,7 @@ int main(int ac, char** av) {
app_template::config app_cfg;
app_cfg.name = "Scylla";
app_cfg.default_task_quota = 500us;
app_cfg.auto_handle_sigint_sigterm = false;
app_template app(std::move(app_cfg));
auto ext = std::make_shared<db::extensions>();
@@ -347,7 +374,7 @@ int main(int ac, char** av) {
directories dirs;
sharded<gms::feature_service> feature_service;
return app.run_deprecated(ac, av, [&] {
return app.run(ac, av, [&] () -> future<int> {
fmt::print("Scylla version {} starting ...\n", scylla_version());
auto&& opts = app.configuration();
@@ -375,6 +402,8 @@ int main(int ac, char** av) {
return seastar::async([cfg, ext, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs, &pctx, &prometheus_server, &return_value, &cf_cache_hitrate_calculator,
&feature_service] {
try {
::stop_signal stop_signal; // we can move this earlier to support SIGINT during initialization
read_config(opts, *cfg).get();
configurable::init_all(opts, *cfg, *ext).get();
@@ -426,13 +455,14 @@ int main(int ac, char** av) {
}();
supervisor::notify("starting prometheus API server");
uint16_t pport = cfg->prometheus_port();
std::any stop_prometheus;
if (pport) {
pctx.metric_help = "Scylla server statistics";
pctx.prefix = cfg->prometheus_prefix();
prometheus_server.start("prometheus").get();
engine().at_exit([&prometheus_server] {
return prometheus_server.stop();
});
stop_prometheus = ::make_shared(defer([&prometheus_server] {
prometheus_server.stop().get();
}));
prometheus::start(prometheus_server, pctx);
with_scheduling_group(maintenance_scheduling_group, [&] {
return prometheus_server.listen(ipv4_addr{prom_addr.addr_list.front(), pport}).handle_exception([pport, &cfg] (auto ep) {
@@ -541,22 +571,19 @@ int main(int ac, char** av) {
dbcfg.memtable_to_cache_scheduling_group = make_sched_group("memtable_to_cache", 200);
dbcfg.available_memory = memory::stats().total_memory();
db.start(std::ref(*cfg), dbcfg).get();
engine().at_exit([&db, &return_value] {
auto stop_database_and_sstables = defer([&db] {
// #293 - do not stop anything - not even db (for real)
//return db.stop();
// call stop on each db instance, but leave the shareded<database> pointers alive.
startlog.info("Shutdown database started");
return stop_database(db).then([&db] {
stop_database(db).then([&db] {
return db.invoke_on_all([](auto& db) {
return db.stop();
});
}).then([] {
startlog.info("Shutdown database finished");
return sstables::await_background_jobs_on_all_shards();
}).then([&return_value] {
startlog.info("Scylla version {} shutdown complete.", scylla_version());
::_exit(return_value);
});
}).get();
});
verify_seastar_io_scheduler(opts.count("max-io-requests"), opts.count("io-properties") || opts.count("io-properties-file"),
db.local().get_config().developer_mode()).get();
@@ -814,18 +841,18 @@ int main(int ac, char** av) {
auto lb = make_shared<service::load_broadcaster>(db, gms::get_local_gossiper());
lb->start_broadcasting();
service::get_local_storage_service().set_load_broadcaster(lb);
engine().at_exit([lb = std::move(lb)] () mutable { return lb->stop_broadcasting(); });
auto stop_load_broadcater = defer([lb = std::move(lb)] () { lb->stop_broadcasting().get(); });
supervisor::notify("starting cf cache hit rate calculator");
cf_cache_hitrate_calculator.start(std::ref(db), std::ref(cf_cache_hitrate_calculator)).get();
engine().at_exit([&cf_cache_hitrate_calculator] { return cf_cache_hitrate_calculator.stop(); });
auto stop_cache_hitrate_calculator = defer([&cf_cache_hitrate_calculator] { return cf_cache_hitrate_calculator.stop().get(); });
cf_cache_hitrate_calculator.local().run_on(engine().cpu_id());
supervisor::notify("starting view update backlog broker");
static sharded<service::view_update_backlog_broker> view_backlog_broker;
view_backlog_broker.start(std::ref(proxy), std::ref(gms::get_gossiper())).get();
view_backlog_broker.invoke_on_all(&service::view_update_backlog_broker::start).get();
engine().at_exit([] {
return view_backlog_broker.stop();
auto stop_view_backlog_broker = defer([] {
view_backlog_broker.stop().get();
});
api::set_server_cache(ctx);
@@ -877,35 +904,43 @@ int main(int ac, char** av) {
api::set_server_done(ctx).get();
supervisor::notify("serving");
// Register at_exit last, so that storage_service::drain_on_shutdown will be called first
engine().at_exit([] {
return repair_shutdown(service::get_local_storage_service().db());
auto stop_repair = defer([] {
repair_shutdown(service::get_local_storage_service().db()).get();
});
engine().at_exit([] {
return view_update_generator.stop();
auto stop_view_update_generator = defer([] {
view_update_generator.stop().get();
});
engine().at_exit([] {
return service::get_local_storage_service().drain_on_shutdown();
auto do_drain = defer([] {
service::get_local_storage_service().drain_on_shutdown().get();
});
engine().at_exit([] {
return view_builder.stop();
auto stop_view_builder = defer([] {
view_builder.stop().get();
});
engine().at_exit([&db] {
return db.invoke_on_all([](auto& db) {
auto stop_compaction_manager = defer([&db] {
db.invoke_on_all([](auto& db) {
return db.get_compaction_manager().stop();
});
}).get();
});
}).then_wrapped([&return_value] (auto && f) {
try {
f.get();
startlog.info("Scylla version {} initialization completed.", scylla_version());
} catch (...) {
return_value = 1;
engine_exit(std::current_exception());
}
startlog.info("Scylla version {} initialization completed.", scylla_version());
stop_signal.wait().get();
startlog.info("Signal received; shutting down");
// At this point, all objects destructors and all shutdown hooks registered with defer() are executed
} catch (...) {
startlog.info("Startup failed: {}", std::current_exception());
// We should be returning 1 here, but the system is not yet prepared for orderly rollback of main() objects
// and thread_local variables.
_exit(1);
return 1;
}
startlog.info("Scylla version {} shutdown complete.", scylla_version());
// We should be returning 0 here, but the system is not yet prepared for orderly rollback of main() objects
// and thread_local variables.
_exit(0);
return 0;
});
});
} catch (...) {