service: storage_service: s/client_shutdown_hooks/protocol_servers/

Replace the simple client shutdown hook registry mechanism with a more
powerful registry of the protocol servers themselves. This allows
enumerating the protocol servers at runtime, checking whether they are
running or not and starting/stopping them.
This commit is contained in:
Botond Dénes
2021-10-21 17:06:18 +03:00
parent e9c9a39c06
commit 3f56c49a9e
3 changed files with 22 additions and 22 deletions

14
main.cc
View File

@@ -1328,9 +1328,7 @@ int main(int ac, char** av) {
cql_transport::controller cql_server_ctl(auth_service, mm_notifier, gossiper.local(), qp, service_memory_limiter, sl_controller, lifecycle_notifier, *cfg);
ss.local().register_client_shutdown_hook("native transport", [&cql_server_ctl] {
cql_server_ctl.stop_server().get();
});
ss.local().register_protocol_server(cql_server_ctl);
std::any stop_cql;
if (cfg->start_native_transport()) {
@@ -1352,9 +1350,7 @@ int main(int ac, char** av) {
::thrift_controller thrift_ctl(db, auth_service, qp, service_memory_limiter, ss);
ss.local().register_client_shutdown_hook("rpc server", [&thrift_ctl] {
thrift_ctl.stop_server().get();
});
ss.local().register_protocol_server(thrift_ctl);
std::any stop_rpc;
if (cfg->start_rpc()) {
@@ -1379,11 +1375,8 @@ int main(int ac, char** av) {
with_scheduling_group(dbcfg.statement_scheduling_group, [&alternator_ctl] () mutable {
return alternator_ctl.start_server();
}).get();
ss.local().register_client_shutdown_hook("alternator", [&alternator_ctl] {
alternator_ctl.stop_server().get();
});
}
ss.local().register_protocol_server(alternator_ctl);
redis_service redis(proxy, auth_service, mm, *cfg, gossiper);
if (cfg->redis_port() || cfg->redis_ssl_port()) {
@@ -1391,6 +1384,7 @@ int main(int ac, char** av) {
return redis.start_server();
}).get();
}
ss.local().register_protocol_server(redis);
seastar::set_abort_on_ebadf(cfg->abort_on_ebadf());
api::set_server_done(ctx).get();

View File

@@ -1276,7 +1276,7 @@ future<> storage_service::stop_transport() {
(void) seastar::async([this] {
slogger.info("Stop transport: starts");
shutdown_client_servers();
shutdown_protocol_servers();
slogger.info("Stop transport: shutdown rpc and cql server done");
_gossiper.container().invoke_on_all(&gms::gossiper::shutdown).get();
@@ -3080,17 +3080,17 @@ void storage_service::add_expire_time_if_found(inet_address endpoint, int64_t ex
}
}
void storage_service::shutdown_client_servers() {
for (auto& [name, hook] : _client_shutdown_hooks) {
slogger.info("Shutting down {}", name);
void storage_service::shutdown_protocol_servers() {
for (auto& server : _protocol_servers) {
slogger.info("Shutting down {} server", server->name());
try {
hook();
server->stop_server().get();
} catch (...) {
slogger.error("Unexpected error shutting down {}: {}",
name, std::current_exception());
slogger.error("Unexpected error shutting down {} server: {}",
server->name(), std::current_exception());
throw;
}
slogger.info("Shutting down {} was successful", name);
slogger.info("Shutting down {} server was successful", server->name());
}
}

View File

@@ -60,6 +60,7 @@
#include <seastar/core/distributed.hh>
#include "utils/disk-error-handler.hh"
#include "service/migration_listener.hh"
#include "protocol_server.hh"
#include "gms/feature_service.hh"
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/rwlock.hh>
@@ -174,7 +175,7 @@ private:
bool _stream_manager_stopped = false;
seastar::metrics::metric_groups _metrics;
using client_shutdown_hook = noncopyable_function<void()>;
std::vector<std::pair<std::string, client_shutdown_hook>> _client_shutdown_hooks;
std::vector<protocol_server*> _protocol_servers;
std::vector<std::any> _listeners;
std::unordered_map<utils::UUID, node_ops_meta_data> _node_ops;
@@ -336,14 +337,19 @@ public:
// should only be called via JMX
future<bool> is_gossip_running();
void register_client_shutdown_hook(std::string name, client_shutdown_hook hook) {
_client_shutdown_hooks.push_back({std::move(name), std::move(hook)});
void register_protocol_server(protocol_server& server) {
_protocol_servers.push_back(&server);
}
// All pointers are valid.
const std::vector<protocol_server*>& protocol_servers() const {
return _protocol_servers;
}
private:
future<> do_stop_ms();
future<> do_stop_stream_manager();
// Runs in thread context
void shutdown_client_servers();
void shutdown_protocol_servers();
// Tokens and the CDC streams timestamp of the replaced node.
using replacement_info = std::unordered_set<token>;