service: raft: raft_group_registry: use direct failure detector notifications for raft server liveness

On each shard, we register a listener for the new direct failure detector service.
The listener maintains a set of live addresses; on mark_alive it adds a
server to the set and on mark_dead it removes it. This set is then used
to implement the `raft::failure_detector` interface, consisting of
`is_alive()` function, which simply checks set membership.

There is some complexity in between, because we need to translate
direct_failure_detector endpoint_ids to inet_addresses and raft::server_ids
to inet_addreses, but all building blocks are already there.
This commit is contained in:
Kamil Braun
2022-03-14 19:18:10 +01:00
parent 7e4bb68061
commit 295aec2633
2 changed files with 21 additions and 11 deletions

View File

@@ -7,8 +7,8 @@
*/
#include "service/raft/raft_group_registry.hh"
#include "service/raft/raft_rpc.hh"
#include "service/raft/raft_gossip_failure_detector.hh"
#include "message/messaging_service.hh"
#include "gms/gossiper.hh"
#include "serializer_impl.hh"
#include "idl/raft.dist.hh"
#include "gms/feature_service.hh"
@@ -68,8 +68,8 @@ raft_group_registry::raft_group_registry(bool is_enabled, netw::messaging_servic
gms::gossiper& gossiper, gms::feature_service& feat, direct_failure_detector::failure_detector& fd)
: _is_enabled(is_enabled)
, _ms(ms)
, _fd(make_shared<raft_gossip_failure_detector>(gossiper, _srv_address_mappings))
, _direct_fd(fd)
, _direct_fd_proxy(make_shared<direct_fd_proxy>(gossiper.get_direct_fd_pinger(), _srv_address_mappings))
, _raft_support_listener(feat.uses_raft_cluster_mgmt.when_enabled([] {
// TODO: join group 0 on upgrade
}))
@@ -216,7 +216,10 @@ seastar::future<> raft_group_registry::start() {
// Once a Raft server starts, it soon times out
// and starts an election, so RPC must be ready by
// then to send VoteRequest messages.
co_return init_rpc_verbs();
init_rpc_verbs();
_direct_fd_subscription.emplace(co_await _direct_fd.register_listener(*_direct_fd_proxy,
direct_fd_clock::base::duration{std::chrono::seconds{1}}.count()));
}
seastar::future<> raft_group_registry::stop() {
@@ -292,4 +295,10 @@ unsigned raft_group_registry::shard_for_group(const raft::group_id& gid) const {
return 0; // schema raft server is always owned by shard 0
}
shared_ptr<raft::failure_detector> raft_group_registry::failure_detector() {
return _direct_fd_proxy;
}
raft_group_registry::~raft_group_registry() = default;
} // end of namespace service

View File

@@ -16,18 +16,14 @@
#include "raft/server.hh"
#include "service/raft/raft_address_map.hh"
#include "gms/feature.hh"
#include "direct_failure_detector/failure_detector.hh"
namespace gms { class gossiper; class feature_service; }
namespace direct_failure_detector {
class failure_detector;
}
namespace service {
class raft_rpc;
class raft_sys_table_storage;
class raft_gossip_failure_detector;
struct raft_group_not_found: public raft::error {
raft::group_id gid;
@@ -60,10 +56,14 @@ private:
std::unordered_map<raft::group_id, raft_server_for_group> _servers;
// inet_address:es for remote raft servers known to us
raft_address_map<> _srv_address_mappings;
// Shard-local failure detector instance shared among all raft groups
seastar::shared_ptr<raft_gossip_failure_detector> _fd;
direct_failure_detector::failure_detector& _direct_fd;
// Listens to notifications from direct failure detector.
// Implements the `raft::failure_detector` interface. Used by all raft groups to check server liveness.
class direct_fd_proxy;
seastar::shared_ptr<direct_fd_proxy> _direct_fd_proxy;
// Direct failure detector listener subscription for `_direct_fd_proxy`.
std::optional<direct_failure_detector::subscription> _direct_fd_subscription;
void init_rpc_verbs();
seastar::future<> uninit_rpc_verbs();
@@ -79,6 +79,7 @@ private:
public:
raft_group_registry(bool is_enabled, netw::messaging_service& ms, gms::gossiper& gs,
gms::feature_service& feat, direct_failure_detector::failure_detector& fd);
~raft_group_registry();
// Called manually at start
seastar::future<> start();
@@ -103,7 +104,7 @@ public:
// arm the associated timer to tick the server.
future<> start_server_for_group(raft_server_for_group grp);
unsigned shard_for_group(const raft::group_id& gid) const;
shared_ptr<raft_gossip_failure_detector>& failure_detector() { return _fd; }
shared_ptr<raft::failure_detector> failure_detector();
raft_address_map<>& address_map() { return _srv_address_mappings; }
direct_failure_detector::failure_detector& direct_fd() { return _direct_fd; }