From 295aec26335191d2c49cea701049e62c5aefec47 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 14 Mar 2022 19:18:10 +0100 Subject: [PATCH] service: raft: raft_group_registry: use direct failure detector notifications for raft server liveness On each shard, we register a listener for the new direct failure detector service. The listener maintains a set of live addresses; on mark_alive it adds a server to the set and on mark_dead it removes it. This set is then used to implement the `raft::failure_detector` interface, consisting of `is_alive()` function, which simply checks set membership. There is some complexity in between, because we need to translate direct_failure_detector endpoint_ids to inet_addresses and raft::server_ids to inet_addreses, but all building blocks are already there. --- service/raft/raft_group_registry.cc | 15 ++++++++++++--- service/raft/raft_group_registry.hh | 17 +++++++++-------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/service/raft/raft_group_registry.cc b/service/raft/raft_group_registry.cc index 9873ae4756..87dc6db023 100644 --- a/service/raft/raft_group_registry.cc +++ b/service/raft/raft_group_registry.cc @@ -7,8 +7,8 @@ */ #include "service/raft/raft_group_registry.hh" #include "service/raft/raft_rpc.hh" -#include "service/raft/raft_gossip_failure_detector.hh" #include "message/messaging_service.hh" +#include "gms/gossiper.hh" #include "serializer_impl.hh" #include "idl/raft.dist.hh" #include "gms/feature_service.hh" @@ -68,8 +68,8 @@ raft_group_registry::raft_group_registry(bool is_enabled, netw::messaging_servic gms::gossiper& gossiper, gms::feature_service& feat, direct_failure_detector::failure_detector& fd) : _is_enabled(is_enabled) , _ms(ms) - , _fd(make_shared(gossiper, _srv_address_mappings)) , _direct_fd(fd) + , _direct_fd_proxy(make_shared(gossiper.get_direct_fd_pinger(), _srv_address_mappings)) , _raft_support_listener(feat.uses_raft_cluster_mgmt.when_enabled([] { // TODO: join group 0 on upgrade })) @@ -216,7 +216,10 @@ seastar::future<> raft_group_registry::start() { // Once a Raft server starts, it soon times out // and starts an election, so RPC must be ready by // then to send VoteRequest messages. - co_return init_rpc_verbs(); + init_rpc_verbs(); + + _direct_fd_subscription.emplace(co_await _direct_fd.register_listener(*_direct_fd_proxy, + direct_fd_clock::base::duration{std::chrono::seconds{1}}.count())); } seastar::future<> raft_group_registry::stop() { @@ -292,4 +295,10 @@ unsigned raft_group_registry::shard_for_group(const raft::group_id& gid) const { return 0; // schema raft server is always owned by shard 0 } +shared_ptr raft_group_registry::failure_detector() { + return _direct_fd_proxy; +} + +raft_group_registry::~raft_group_registry() = default; + } // end of namespace service diff --git a/service/raft/raft_group_registry.hh b/service/raft/raft_group_registry.hh index be045b122a..fed2b855e6 100644 --- a/service/raft/raft_group_registry.hh +++ b/service/raft/raft_group_registry.hh @@ -16,18 +16,14 @@ #include "raft/server.hh" #include "service/raft/raft_address_map.hh" #include "gms/feature.hh" +#include "direct_failure_detector/failure_detector.hh" namespace gms { class gossiper; class feature_service; } -namespace direct_failure_detector { -class failure_detector; -} - namespace service { class raft_rpc; class raft_sys_table_storage; -class raft_gossip_failure_detector; struct raft_group_not_found: public raft::error { raft::group_id gid; @@ -60,10 +56,14 @@ private: std::unordered_map _servers; // inet_address:es for remote raft servers known to us raft_address_map<> _srv_address_mappings; - // Shard-local failure detector instance shared among all raft groups - seastar::shared_ptr _fd; direct_failure_detector::failure_detector& _direct_fd; + // Listens to notifications from direct failure detector. + // Implements the `raft::failure_detector` interface. Used by all raft groups to check server liveness. + class direct_fd_proxy; + seastar::shared_ptr _direct_fd_proxy; + // Direct failure detector listener subscription for `_direct_fd_proxy`. + std::optional _direct_fd_subscription; void init_rpc_verbs(); seastar::future<> uninit_rpc_verbs(); @@ -79,6 +79,7 @@ private: public: raft_group_registry(bool is_enabled, netw::messaging_service& ms, gms::gossiper& gs, gms::feature_service& feat, direct_failure_detector::failure_detector& fd); + ~raft_group_registry(); // Called manually at start seastar::future<> start(); @@ -103,7 +104,7 @@ public: // arm the associated timer to tick the server. future<> start_server_for_group(raft_server_for_group grp); unsigned shard_for_group(const raft::group_id& gid) const; - shared_ptr& failure_detector() { return _fd; } + shared_ptr failure_detector(); raft_address_map<>& address_map() { return _srv_address_mappings; } direct_failure_detector::failure_detector& direct_fd() { return _direct_fd; }