Merge "Harden storage_service::stop_transport" from Pavel E

"
Stopping transport (cql, thrift, messaging, etc.) can happen from
several places -- drain, decommission, stop, isolation. Some of
them can even run in parallel. This patch makes transport stopping
bulletproof.

tests: unit(dev)
       start-stop (dev)
       start-drain-stop (dev)
fixes: #8911
"

* 'br-stop-transport-races' of https://github.com/xemul/scylla:
  storage_service: Indentation fix
  storage_service: Make stop_transport re-entrable
  storage_service: Stop transport on decommission
This commit is contained in:
Avi Kivity
2021-06-27 14:46:46 +03:00
2 changed files with 27 additions and 18 deletions

View File

@@ -1349,23 +1349,35 @@ future<> storage_service::unregister_subscriber(endpoint_lifecycle_subscriber* s
}
future<> storage_service::stop_transport() {
return seastar::async([this] {
slogger.info("Stop transport: starts");
if (!_transport_stopped.has_value()) {
_transport_stopped.emplace();
shutdown_client_servers();
slogger.info("Stop transport: shutdown rpc and cql server done");
(void) seastar::async([this] {
slogger.info("Stop transport: starts");
gms::stop_gossiping().get();
slogger.info("Stop transport: stop_gossiping done");
shutdown_client_servers();
slogger.info("Stop transport: shutdown rpc and cql server done");
do_stop_ms().get();
slogger.info("Stop transport: shutdown messaging_service done");
gms::stop_gossiping().get();
slogger.info("Stop transport: stop_gossiping done");
do_stop_stream_manager().get();
slogger.info("Stop transport: shutdown stream_manager done");
do_stop_ms().get();
slogger.info("Stop transport: shutdown messaging_service done");
slogger.info("Stop transport: done");
});
do_stop_stream_manager().get();
slogger.info("Stop transport: shutdown stream_manager done");
}).then_wrapped([this] (future<> f) {
if (f.failed()) {
_transport_stopped->set_exception(f.get_exception());
} else {
_transport_stopped->set_value();
}
slogger.info("Stop transport: done");
});
}
return _transport_stopped->get_shared_future();
}
future<> storage_service::drain_on_shutdown() {
@@ -2010,18 +2022,14 @@ future<> storage_service::decommission() {
throw;
}
ss.shutdown_client_servers();
slogger.info("DECOMMISSIONING: shutdown rpc and cql server done");
ss.stop_transport().get();
slogger.info("DECOMMISSIONING: stopped transport");
db::get_batchlog_manager().invoke_on_all([] (auto& bm) {
return bm.stop();
}).get();
slogger.info("DECOMMISSIONING: stop batchlog_manager done");
gms::stop_gossiping().get();
slogger.info("DECOMMISSIONING: stop_gossiping done");
ss.do_stop_ms().get();
slogger.info("DECOMMISSIONING: stop messaging_service done");
// StageManager.shutdownNow();
db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::DECOMMISSIONED).get();
slogger.info("DECOMMISSIONING: set_bootstrap_state done");

View File

@@ -847,6 +847,7 @@ public:
private:
promise<> _drain_finished;
std::optional<shared_promise<>> _transport_stopped;
future<> do_drain(bool on_shutdown);
/**
* Seed data to the endpoints that will be responsible for it at the future