diff --git a/main.cc b/main.cc index e004b961cf..b13af6bc5c 100644 --- a/main.cc +++ b/main.cc @@ -84,6 +84,32 @@ void register_tracing_keyspace_backend(backend_registry& br); } +class stop_signal { + bool _caught = false; + condition_variable _cond; +private: + void signaled() { + _caught = true; + _cond.broadcast(); + } +public: + stop_signal() { + engine().handle_signal(SIGINT, [this] { signaled(); }); + engine().handle_signal(SIGTERM, [this] { signaled(); }); + } + ~stop_signal() { + // There's no way to unregister a handler yet, so register a no-op handler instead. + engine().handle_signal(SIGINT, [] {}); + engine().handle_signal(SIGTERM, [] {}); + } + future<> wait() { + return _cond.wait([this] { return _caught; }); + } + bool stopping() const { + return _caught; + } +}; + template V get_or_default(const std::unordered_map& ss, const K2& key, const V2& def = V()) { const auto iter = ss.find(key); @@ -307,6 +333,7 @@ int main(int ac, char** av) { app_template::config app_cfg; app_cfg.name = "Scylla"; app_cfg.default_task_quota = 500us; + app_cfg.auto_handle_sigint_sigterm = false; app_template app(std::move(app_cfg)); auto ext = std::make_shared(); @@ -347,7 +374,7 @@ int main(int ac, char** av) { directories dirs; sharded feature_service; - return app.run_deprecated(ac, av, [&] { + return app.run(ac, av, [&] () -> future { fmt::print("Scylla version {} starting ...\n", scylla_version()); auto&& opts = app.configuration(); @@ -375,6 +402,8 @@ int main(int ac, char** av) { return seastar::async([cfg, ext, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs, &pctx, &prometheus_server, &return_value, &cf_cache_hitrate_calculator, &feature_service] { + try { + ::stop_signal stop_signal; // we can move this earlier to support SIGINT during initialization read_config(opts, *cfg).get(); configurable::init_all(opts, *cfg, *ext).get(); @@ -426,13 +455,14 @@ int main(int ac, char** av) { }(); supervisor::notify("starting prometheus API server"); uint16_t pport = cfg->prometheus_port(); + std::any stop_prometheus; if (pport) { pctx.metric_help = "Scylla server statistics"; pctx.prefix = cfg->prometheus_prefix(); prometheus_server.start("prometheus").get(); - engine().at_exit([&prometheus_server] { - return prometheus_server.stop(); - }); + stop_prometheus = ::make_shared(defer([&prometheus_server] { + prometheus_server.stop().get(); + })); prometheus::start(prometheus_server, pctx); with_scheduling_group(maintenance_scheduling_group, [&] { return prometheus_server.listen(ipv4_addr{prom_addr.addr_list.front(), pport}).handle_exception([pport, &cfg] (auto ep) { @@ -541,22 +571,19 @@ int main(int ac, char** av) { dbcfg.memtable_to_cache_scheduling_group = make_sched_group("memtable_to_cache", 200); dbcfg.available_memory = memory::stats().total_memory(); db.start(std::ref(*cfg), dbcfg).get(); - engine().at_exit([&db, &return_value] { + auto stop_database_and_sstables = defer([&db] { // #293 - do not stop anything - not even db (for real) //return db.stop(); // call stop on each db instance, but leave the shareded pointers alive. startlog.info("Shutdown database started"); - return stop_database(db).then([&db] { + stop_database(db).then([&db] { return db.invoke_on_all([](auto& db) { return db.stop(); }); }).then([] { startlog.info("Shutdown database finished"); return sstables::await_background_jobs_on_all_shards(); - }).then([&return_value] { - startlog.info("Scylla version {} shutdown complete.", scylla_version()); - ::_exit(return_value); - }); + }).get(); }); verify_seastar_io_scheduler(opts.count("max-io-requests"), opts.count("io-properties") || opts.count("io-properties-file"), db.local().get_config().developer_mode()).get(); @@ -814,18 +841,18 @@ int main(int ac, char** av) { auto lb = make_shared(db, gms::get_local_gossiper()); lb->start_broadcasting(); service::get_local_storage_service().set_load_broadcaster(lb); - engine().at_exit([lb = std::move(lb)] () mutable { return lb->stop_broadcasting(); }); + auto stop_load_broadcater = defer([lb = std::move(lb)] () { lb->stop_broadcasting().get(); }); supervisor::notify("starting cf cache hit rate calculator"); cf_cache_hitrate_calculator.start(std::ref(db), std::ref(cf_cache_hitrate_calculator)).get(); - engine().at_exit([&cf_cache_hitrate_calculator] { return cf_cache_hitrate_calculator.stop(); }); + auto stop_cache_hitrate_calculator = defer([&cf_cache_hitrate_calculator] { return cf_cache_hitrate_calculator.stop().get(); }); cf_cache_hitrate_calculator.local().run_on(engine().cpu_id()); supervisor::notify("starting view update backlog broker"); static sharded view_backlog_broker; view_backlog_broker.start(std::ref(proxy), std::ref(gms::get_gossiper())).get(); view_backlog_broker.invoke_on_all(&service::view_update_backlog_broker::start).get(); - engine().at_exit([] { - return view_backlog_broker.stop(); + auto stop_view_backlog_broker = defer([] { + view_backlog_broker.stop().get(); }); api::set_server_cache(ctx); @@ -877,35 +904,43 @@ int main(int ac, char** av) { api::set_server_done(ctx).get(); supervisor::notify("serving"); // Register at_exit last, so that storage_service::drain_on_shutdown will be called first - engine().at_exit([] { - return repair_shutdown(service::get_local_storage_service().db()); + auto stop_repair = defer([] { + repair_shutdown(service::get_local_storage_service().db()).get(); }); - engine().at_exit([] { - return view_update_generator.stop(); + auto stop_view_update_generator = defer([] { + view_update_generator.stop().get(); }); - engine().at_exit([] { - return service::get_local_storage_service().drain_on_shutdown(); + auto do_drain = defer([] { + service::get_local_storage_service().drain_on_shutdown().get(); }); - engine().at_exit([] { - return view_builder.stop(); + auto stop_view_builder = defer([] { + view_builder.stop().get(); }); - engine().at_exit([&db] { - return db.invoke_on_all([](auto& db) { + auto stop_compaction_manager = defer([&db] { + db.invoke_on_all([](auto& db) { return db.get_compaction_manager().stop(); - }); + }).get(); }); - }).then_wrapped([&return_value] (auto && f) { - try { - f.get(); - startlog.info("Scylla version {} initialization completed.", scylla_version()); - } catch (...) { - return_value = 1; - engine_exit(std::current_exception()); - } + startlog.info("Scylla version {} initialization completed.", scylla_version()); + stop_signal.wait().get(); + startlog.info("Signal received; shutting down"); + // At this point, all objects destructors and all shutdown hooks registered with defer() are executed + } catch (...) { + startlog.info("Startup failed: {}", std::current_exception()); + // We should be returning 1 here, but the system is not yet prepared for orderly rollback of main() objects + // and thread_local variables. + _exit(1); + return 1; + } + startlog.info("Scylla version {} shutdown complete.", scylla_version()); + // We should be returning 0 here, but the system is not yet prepared for orderly rollback of main() objects + // and thread_local variables. + _exit(0); + return 0; }); }); } catch (...) {