From bdeef77f208cbd2c0c27498290832efbd65d20b2 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 10 Oct 2022 16:29:19 +0200 Subject: [PATCH] service/raft: ping `raft::server_id`s, not `gms::inet_address`es Whenever a Raft configuration change is performed, `raft::server` calls `raft_rpc::add_server`/`raft_rpc::remove_server`. Our `raft_rpc` implementation has a function, `_on_server_update`, passed in the constructor, which it called in `add_server`/`remove_server`; that function would update the set of endpoints detected by the direct failure detector. `_on_server_update` was passed an IP address and that address was added to / removed from the failure detector set (there's another translation layer between the IP addresses and internal failure detector 'endpoint ID's; but we can ignore it for the purposes of this commit). Therefore: the failure detector was pinging a certain set of IP addresses. These IP addresses were updated during Raft configuration changes. To implement the `is_alive(raft::server_id)` function (required by `raft::failure_detector` interface), we would translate the ID using the Raft address map, which is currently also updated during configuration changes, to an IP address, and check if that IP address is alive according to the direct failure detector (which maintained an `_alive_set` of type `unordered_set`). This all works well but it assumes that servers can be identified using IP addresses - it doesn't play well with the fact that servers may change their IP addresses. The only immutable identifier we have for a server is `raft::server_id`. In the future, Raft configurations will not associate IP addresses with Raft servers; instead we will assume that IP addresses can change at any time, and there will be a different mechanism that eventually updates the Raft address map with the latest IP address for each `raft::server_id`. To prepare us for that future, in this commit we no longer operate in terms of IP addresses in the failure detector, but in terms of `raft::server_id`s. Most of the commit is boilerplate, changing `gms::inet_address` to `raft::server_id` and function/variable names. The interesting changes are: - in `is_alive`, we no longer need to translate the `raft::server_id` to an IP address, because now the stored `_alive_set` already contains `raft::server_id`s instead of `gms::inet_address`es. - the `ping` function now takes a `raft::server_id` instead of `gms::inet_address`. To send the ping message, we need to translate this to IP address; we do it by the `raft_address_map` pointer introduced in an earlier commit. Thus, there is still a point where we have to translate between `raft::server_id` and `gms::inet_address`; but observe we now do it at the last possible moment - just before sending the message. If we have no translation, we consider the `ping` to have failed - it's equivalent to a network failure where no route to a given address was found. --- service/raft/raft_group0.cc | 10 ++-- service/raft/raft_group_registry.cc | 72 ++++++++++++++--------------- service/raft/raft_group_registry.hh | 16 +++---- service/raft/raft_rpc.cc | 9 ++-- service/raft/raft_rpc.hh | 4 +- 5 files changed, 53 insertions(+), 58 deletions(-) diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index c1c92be071..73c17c0fed 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -161,15 +161,13 @@ raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, _raft_gr.address_map().set(my_addr); auto state_machine = std::make_unique(_client, _mm, _qp.proxy()); auto rpc = std::make_unique(*state_machine, _ms, _raft_gr.address_map(), gid, my_id, - [this] (gms::inet_address addr, raft::server_id raft_id, bool added) { - // FIXME: we should eventually switch to UUID-based (not IP-based) node identification/communication scheme. - // See #6403. - auto fd_id = _raft_gr.get_fd_pinger().allocate_id(addr); + [this] (raft::server_id raft_id, bool added) { + auto fd_id = _raft_gr.get_fd_pinger().allocate_id(raft_id); if (added) { - group0_log.info("Added {} (address: {}) to group 0 RPC map", raft_id, addr); + group0_log.info("Added Raft server {} to failure detector (endpoint id: {})", raft_id, fd_id); _raft_gr.direct_fd().add_endpoint(fd_id); } else { - group0_log.info("Removed {} (address: {}) from group 0 RPC map", raft_id, addr); + group0_log.info("Removed Raft server {} from failure detector (endpoint id: {})", raft_id, fd_id); _raft_gr.direct_fd().remove_endpoint(fd_id); } }); diff --git a/service/raft/raft_group_registry.cc b/service/raft/raft_group_registry.cc index 50b4d7ded2..fc21eed3a7 100644 --- a/service/raft/raft_group_registry.cc +++ b/service/raft/raft_group_registry.cc @@ -22,51 +22,43 @@ logging::logger rslog("raft_group_registry"); class raft_group_registry::direct_fd_proxy : public raft::failure_detector, public direct_failure_detector::listener { direct_fd_pinger& _fd_pinger; - raft_address_map<>& _address_map; - std::unordered_set _alive_set; + std::unordered_set _alive_set; public: - direct_fd_proxy(direct_fd_pinger& fd_pinger, raft_address_map<>& address_map) - : _fd_pinger(fd_pinger), _address_map(address_map) { + direct_fd_proxy(direct_fd_pinger& fd_pinger) : _fd_pinger(fd_pinger) { } future<> mark_alive(direct_failure_detector::pinger::endpoint_id id) override { - static const auto msg = "marking address {} as alive for raft groups"; + static const auto msg = "marking Raft server {} as alive for raft groups"; - auto addr = co_await _fd_pinger.get_address(id); - _alive_set.insert(addr); + auto raft_id = co_await _fd_pinger.get_raft_id(id); + _alive_set.insert(raft_id); // The listener should be registered on every shard. // Write the message on INFO level only on shard 0 so we don't spam the logs. if (this_shard_id() == 0) { - rslog.info(msg, addr); + rslog.info(msg, raft_id); } else { - rslog.debug(msg, addr); + rslog.debug(msg, raft_id); } } future<> mark_dead(direct_failure_detector::pinger::endpoint_id id) override { - static const auto msg = "marking address {} as dead for raft groups"; + static const auto msg = "marking Raft server {} as dead for raft groups"; - auto addr = co_await _fd_pinger.get_address(id); - _alive_set.erase(addr); + auto raft_id = co_await _fd_pinger.get_raft_id(id); + _alive_set.erase(raft_id); // As above. if (this_shard_id() == 0) { - rslog.info(msg, addr); + rslog.info(msg, raft_id); } else { - rslog.debug(msg, addr); + rslog.debug(msg, raft_id); } } bool is_alive(raft::server_id srv) override { - // We could yield between updating the list of servers in raft/fsm - // and updating the raft_address_map, e.g. in case of a set_configuration. - // If tick_leader happens before the raft_address_map is updated, - // is_alive will be called with server_id that is not in the map yet. - - const auto address = _address_map.find(srv); - return address && _alive_set.contains(*address); + return _alive_set.contains(srv); } direct_fd_pinger& get_fd_pinger() { return _fd_pinger; } @@ -78,7 +70,7 @@ raft_group_registry::raft_group_registry(bool is_enabled, raft_address_map<>& ad , _ms(ms) , _srv_address_mappings{address_map} , _direct_fd(fd) - , _direct_fd_proxy(make_shared(pinger, _srv_address_mappings)) + , _direct_fd_proxy(make_shared(pinger)) { } @@ -336,40 +328,46 @@ direct_fd_pinger& raft_group_registry::get_fd_pinger() { return _direct_fd_proxy->get_fd_pinger(); } -direct_failure_detector::pinger::endpoint_id direct_fd_pinger::allocate_id(gms::inet_address addr) { +direct_failure_detector::pinger::endpoint_id direct_fd_pinger::allocate_id(raft::server_id srv) { assert(this_shard_id() == 0); - auto it = _addr_to_id.find(addr); - if (it == _addr_to_id.end()) { + auto it = _raft_id_to_ep_id.find(srv); + if (it == _raft_id_to_ep_id.end()) { auto id = _next_allocated_id++; - _id_to_addr.emplace(id, addr); - it = _addr_to_id.emplace(addr, id).first; - rslog.debug("direct_fd_pinger: assigned endpoint ID {} to address {}", id, addr); + _ep_id_to_raft_id.emplace(id, srv); + it = _raft_id_to_ep_id.emplace(srv, id).first; + rslog.debug("direct_fd_pinger: assigned endpoint ID {} to Raft server {}", id, srv); } return it->second; } -future direct_fd_pinger::get_address(direct_failure_detector::pinger::endpoint_id id) { - auto it = _id_to_addr.find(id); - if (it == _id_to_addr.end()) { +future direct_fd_pinger::get_raft_id(direct_failure_detector::pinger::endpoint_id id) { + auto it = _ep_id_to_raft_id.find(id); + if (it == _ep_id_to_raft_id.end()) { // Fetch the address from shard 0. By precondition it must be there. - auto addr = co_await container().invoke_on(0, [id] (direct_fd_pinger& pinger) { - auto it = pinger._id_to_addr.find(id); - if (it == pinger._id_to_addr.end()) { - on_internal_error(rslog, format("direct_fd_pinger: endpoint id {} has no corresponding address", id)); + auto srv = co_await container().invoke_on(0, [id] (direct_fd_pinger& pinger) { + auto it = pinger._ep_id_to_raft_id.find(id); + if (it == pinger._ep_id_to_raft_id.end()) { + on_internal_error(rslog, format("direct_fd_pinger: endpoint id {} has no corresponding Raft server", id)); } return it->second; }); - it = _id_to_addr.emplace(id, addr).first; + it = _ep_id_to_raft_id.emplace(id, srv).first; } co_return it->second; } future direct_fd_pinger::ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) { + auto raft_id = co_await get_raft_id(id); + auto addr = _address_map.find(raft_id); + if (!addr) { + co_return false; + } + try { - co_await _echo_pinger.ping(co_await get_address(id), as); + co_await _echo_pinger.ping(*addr, as); } catch (seastar::rpc::closed_error&) { co_return false; } diff --git a/service/raft/raft_group_registry.hh b/service/raft/raft_group_registry.hh index 62030c44ab..4333a83c99 100644 --- a/service/raft/raft_group_registry.hh +++ b/service/raft/raft_group_registry.hh @@ -122,7 +122,7 @@ public: }; // Implementation of `direct_failure_detector::pinger` which uses gossip echo messages for pinging. -// Stores a mapping between `direct_failure_detector::pinger::endpoint_id`s and `inet_address`es. +// Stores a mapping between `direct_failure_detector::pinger::endpoint_id`s and `raft::server_id`s. // The actual pinging is performed by `echo_pinger`. class direct_fd_pinger : public seastar::peering_sharded_service, public direct_failure_detector::pinger { gms::echo_pinger& _echo_pinger; @@ -132,12 +132,12 @@ class direct_fd_pinger : public seastar::peering_sharded_service _id_to_addr; + // when `ping` or `get_raft_id` is called with an unknown ID on a different shard, it will fetch the ID from shard 0. + std::unordered_map _ep_id_to_raft_id; - // Used to quickly check if given address already has an assigned ID. + // Used to quickly check if given Raft ID already has an assigned endpoint ID. // Used only on shard 0, not replicated. - std::unordered_map _addr_to_id; + std::unordered_map _raft_id_to_ep_id; public: direct_fd_pinger(gms::echo_pinger& pinger, raft_address_map<>& address_map) @@ -146,12 +146,12 @@ public: direct_fd_pinger(const direct_fd_pinger&) = delete; direct_fd_pinger(direct_fd_pinger&&) = delete; - // Allocate a new endpoint_id for `addr`, or if one already exists, return it. + // Allocate a new endpoint_id for `srv`, or if one already exists, return it. // Call only on shard 0. - direct_failure_detector::pinger::endpoint_id allocate_id(gms::inet_address addr); + direct_failure_detector::pinger::endpoint_id allocate_id(raft::server_id srv); // Precondition: `id` was returned from `allocate_id` on shard 0 earlier. - future get_address(direct_failure_detector::pinger::endpoint_id id); + future get_raft_id(direct_failure_detector::pinger::endpoint_id id); future ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) override; }; diff --git a/service/raft/raft_rpc.cc b/service/raft/raft_rpc.cc index 71216be7f8..c5745ee3fd 100644 --- a/service/raft/raft_rpc.cc +++ b/service/raft/raft_rpc.cc @@ -22,7 +22,7 @@ static seastar::logger rlogger("raft_rpc"); raft_rpc::raft_rpc(raft_state_machine& sm, netw::messaging_service& ms, raft_address_map<>& address_map, raft::group_id gid, raft::server_id srv_id, - noncopyable_function on_server_update) + noncopyable_function on_server_update) : _sm(sm), _group_id(std::move(gid)), _server_id(srv_id), _messaging(ms) , _address_map(address_map), _on_server_update(std::move(on_server_update)) {} @@ -163,13 +163,12 @@ void raft_rpc::add_server(raft::server_address addr) { // Entries explicitly managed via `rpc::add_server` and `rpc::remove_server` should never expire // while entries learnt upon receiving an rpc message should be expirable. _address_map.set(addr.id, inet_addr, false); - _on_server_update(inet_addr, addr.id, true); + _on_server_update(addr.id, true); } void raft_rpc::remove_server(raft::server_id id) { - if (auto inet_addr = _address_map.set_expiring_flag(id)) { - _on_server_update(*inet_addr, id, false); - } + _on_server_update(id, false); + _address_map.set_expiring_flag(id); } future<> raft_rpc::abort() { diff --git a/service/raft/raft_rpc.hh b/service/raft/raft_rpc.hh index 9fe7b04627..44fc83b6e9 100644 --- a/service/raft/raft_rpc.hh +++ b/service/raft/raft_rpc.hh @@ -26,7 +26,7 @@ class raft_rpc : public raft::rpc { raft::server_id _server_id; netw::messaging_service& _messaging; raft_address_map<>& _address_map; - noncopyable_function _on_server_update; + noncopyable_function _on_server_update; seastar::gate _shutdown_gate; raft_ticker_type::time_point timeout() { @@ -37,7 +37,7 @@ public: explicit raft_rpc(raft_state_machine& sm, netw::messaging_service& ms, raft_address_map<>& address_map, raft::group_id gid, raft::server_id srv_id, // Called when a server is added or removed from the RPC configuration. - noncopyable_function on_server_update); + noncopyable_function on_server_update); future send_snapshot(raft::server_id server_id, const raft::install_snapshot& snap, seastar::abort_source& as) override; future<> send_append_entries(raft::server_id id, const raft::append_request& append_request) override;