diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index c1c92be071..73c17c0fed 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -161,15 +161,13 @@ raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, _raft_gr.address_map().set(my_addr); auto state_machine = std::make_unique(_client, _mm, _qp.proxy()); auto rpc = std::make_unique(*state_machine, _ms, _raft_gr.address_map(), gid, my_id, - [this] (gms::inet_address addr, raft::server_id raft_id, bool added) { - // FIXME: we should eventually switch to UUID-based (not IP-based) node identification/communication scheme. - // See #6403. - auto fd_id = _raft_gr.get_fd_pinger().allocate_id(addr); + [this] (raft::server_id raft_id, bool added) { + auto fd_id = _raft_gr.get_fd_pinger().allocate_id(raft_id); if (added) { - group0_log.info("Added {} (address: {}) to group 0 RPC map", raft_id, addr); + group0_log.info("Added Raft server {} to failure detector (endpoint id: {})", raft_id, fd_id); _raft_gr.direct_fd().add_endpoint(fd_id); } else { - group0_log.info("Removed {} (address: {}) from group 0 RPC map", raft_id, addr); + group0_log.info("Removed Raft server {} from failure detector (endpoint id: {})", raft_id, fd_id); _raft_gr.direct_fd().remove_endpoint(fd_id); } }); diff --git a/service/raft/raft_group_registry.cc b/service/raft/raft_group_registry.cc index 50b4d7ded2..fc21eed3a7 100644 --- a/service/raft/raft_group_registry.cc +++ b/service/raft/raft_group_registry.cc @@ -22,51 +22,43 @@ logging::logger rslog("raft_group_registry"); class raft_group_registry::direct_fd_proxy : public raft::failure_detector, public direct_failure_detector::listener { direct_fd_pinger& _fd_pinger; - raft_address_map<>& _address_map; - std::unordered_set _alive_set; + std::unordered_set _alive_set; public: - direct_fd_proxy(direct_fd_pinger& fd_pinger, raft_address_map<>& address_map) - : _fd_pinger(fd_pinger), _address_map(address_map) { + direct_fd_proxy(direct_fd_pinger& fd_pinger) : _fd_pinger(fd_pinger) { } future<> mark_alive(direct_failure_detector::pinger::endpoint_id id) override { - static const auto msg = "marking address {} as alive for raft groups"; + static const auto msg = "marking Raft server {} as alive for raft groups"; - auto addr = co_await _fd_pinger.get_address(id); - _alive_set.insert(addr); + auto raft_id = co_await _fd_pinger.get_raft_id(id); + _alive_set.insert(raft_id); // The listener should be registered on every shard. // Write the message on INFO level only on shard 0 so we don't spam the logs. if (this_shard_id() == 0) { - rslog.info(msg, addr); + rslog.info(msg, raft_id); } else { - rslog.debug(msg, addr); + rslog.debug(msg, raft_id); } } future<> mark_dead(direct_failure_detector::pinger::endpoint_id id) override { - static const auto msg = "marking address {} as dead for raft groups"; + static const auto msg = "marking Raft server {} as dead for raft groups"; - auto addr = co_await _fd_pinger.get_address(id); - _alive_set.erase(addr); + auto raft_id = co_await _fd_pinger.get_raft_id(id); + _alive_set.erase(raft_id); // As above. if (this_shard_id() == 0) { - rslog.info(msg, addr); + rslog.info(msg, raft_id); } else { - rslog.debug(msg, addr); + rslog.debug(msg, raft_id); } } bool is_alive(raft::server_id srv) override { - // We could yield between updating the list of servers in raft/fsm - // and updating the raft_address_map, e.g. in case of a set_configuration. - // If tick_leader happens before the raft_address_map is updated, - // is_alive will be called with server_id that is not in the map yet. - - const auto address = _address_map.find(srv); - return address && _alive_set.contains(*address); + return _alive_set.contains(srv); } direct_fd_pinger& get_fd_pinger() { return _fd_pinger; } @@ -78,7 +70,7 @@ raft_group_registry::raft_group_registry(bool is_enabled, raft_address_map<>& ad , _ms(ms) , _srv_address_mappings{address_map} , _direct_fd(fd) - , _direct_fd_proxy(make_shared(pinger, _srv_address_mappings)) + , _direct_fd_proxy(make_shared(pinger)) { } @@ -336,40 +328,46 @@ direct_fd_pinger& raft_group_registry::get_fd_pinger() { return _direct_fd_proxy->get_fd_pinger(); } -direct_failure_detector::pinger::endpoint_id direct_fd_pinger::allocate_id(gms::inet_address addr) { +direct_failure_detector::pinger::endpoint_id direct_fd_pinger::allocate_id(raft::server_id srv) { assert(this_shard_id() == 0); - auto it = _addr_to_id.find(addr); - if (it == _addr_to_id.end()) { + auto it = _raft_id_to_ep_id.find(srv); + if (it == _raft_id_to_ep_id.end()) { auto id = _next_allocated_id++; - _id_to_addr.emplace(id, addr); - it = _addr_to_id.emplace(addr, id).first; - rslog.debug("direct_fd_pinger: assigned endpoint ID {} to address {}", id, addr); + _ep_id_to_raft_id.emplace(id, srv); + it = _raft_id_to_ep_id.emplace(srv, id).first; + rslog.debug("direct_fd_pinger: assigned endpoint ID {} to Raft server {}", id, srv); } return it->second; } -future direct_fd_pinger::get_address(direct_failure_detector::pinger::endpoint_id id) { - auto it = _id_to_addr.find(id); - if (it == _id_to_addr.end()) { +future direct_fd_pinger::get_raft_id(direct_failure_detector::pinger::endpoint_id id) { + auto it = _ep_id_to_raft_id.find(id); + if (it == _ep_id_to_raft_id.end()) { // Fetch the address from shard 0. By precondition it must be there. - auto addr = co_await container().invoke_on(0, [id] (direct_fd_pinger& pinger) { - auto it = pinger._id_to_addr.find(id); - if (it == pinger._id_to_addr.end()) { - on_internal_error(rslog, format("direct_fd_pinger: endpoint id {} has no corresponding address", id)); + auto srv = co_await container().invoke_on(0, [id] (direct_fd_pinger& pinger) { + auto it = pinger._ep_id_to_raft_id.find(id); + if (it == pinger._ep_id_to_raft_id.end()) { + on_internal_error(rslog, format("direct_fd_pinger: endpoint id {} has no corresponding Raft server", id)); } return it->second; }); - it = _id_to_addr.emplace(id, addr).first; + it = _ep_id_to_raft_id.emplace(id, srv).first; } co_return it->second; } future direct_fd_pinger::ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) { + auto raft_id = co_await get_raft_id(id); + auto addr = _address_map.find(raft_id); + if (!addr) { + co_return false; + } + try { - co_await _echo_pinger.ping(co_await get_address(id), as); + co_await _echo_pinger.ping(*addr, as); } catch (seastar::rpc::closed_error&) { co_return false; } diff --git a/service/raft/raft_group_registry.hh b/service/raft/raft_group_registry.hh index 62030c44ab..4333a83c99 100644 --- a/service/raft/raft_group_registry.hh +++ b/service/raft/raft_group_registry.hh @@ -122,7 +122,7 @@ public: }; // Implementation of `direct_failure_detector::pinger` which uses gossip echo messages for pinging. -// Stores a mapping between `direct_failure_detector::pinger::endpoint_id`s and `inet_address`es. +// Stores a mapping between `direct_failure_detector::pinger::endpoint_id`s and `raft::server_id`s. // The actual pinging is performed by `echo_pinger`. class direct_fd_pinger : public seastar::peering_sharded_service, public direct_failure_detector::pinger { gms::echo_pinger& _echo_pinger; @@ -132,12 +132,12 @@ class direct_fd_pinger : public seastar::peering_sharded_service _id_to_addr; + // when `ping` or `get_raft_id` is called with an unknown ID on a different shard, it will fetch the ID from shard 0. + std::unordered_map _ep_id_to_raft_id; - // Used to quickly check if given address already has an assigned ID. + // Used to quickly check if given Raft ID already has an assigned endpoint ID. // Used only on shard 0, not replicated. - std::unordered_map _addr_to_id; + std::unordered_map _raft_id_to_ep_id; public: direct_fd_pinger(gms::echo_pinger& pinger, raft_address_map<>& address_map) @@ -146,12 +146,12 @@ public: direct_fd_pinger(const direct_fd_pinger&) = delete; direct_fd_pinger(direct_fd_pinger&&) = delete; - // Allocate a new endpoint_id for `addr`, or if one already exists, return it. + // Allocate a new endpoint_id for `srv`, or if one already exists, return it. // Call only on shard 0. - direct_failure_detector::pinger::endpoint_id allocate_id(gms::inet_address addr); + direct_failure_detector::pinger::endpoint_id allocate_id(raft::server_id srv); // Precondition: `id` was returned from `allocate_id` on shard 0 earlier. - future get_address(direct_failure_detector::pinger::endpoint_id id); + future get_raft_id(direct_failure_detector::pinger::endpoint_id id); future ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) override; }; diff --git a/service/raft/raft_rpc.cc b/service/raft/raft_rpc.cc index 71216be7f8..c5745ee3fd 100644 --- a/service/raft/raft_rpc.cc +++ b/service/raft/raft_rpc.cc @@ -22,7 +22,7 @@ static seastar::logger rlogger("raft_rpc"); raft_rpc::raft_rpc(raft_state_machine& sm, netw::messaging_service& ms, raft_address_map<>& address_map, raft::group_id gid, raft::server_id srv_id, - noncopyable_function on_server_update) + noncopyable_function on_server_update) : _sm(sm), _group_id(std::move(gid)), _server_id(srv_id), _messaging(ms) , _address_map(address_map), _on_server_update(std::move(on_server_update)) {} @@ -163,13 +163,12 @@ void raft_rpc::add_server(raft::server_address addr) { // Entries explicitly managed via `rpc::add_server` and `rpc::remove_server` should never expire // while entries learnt upon receiving an rpc message should be expirable. _address_map.set(addr.id, inet_addr, false); - _on_server_update(inet_addr, addr.id, true); + _on_server_update(addr.id, true); } void raft_rpc::remove_server(raft::server_id id) { - if (auto inet_addr = _address_map.set_expiring_flag(id)) { - _on_server_update(*inet_addr, id, false); - } + _on_server_update(id, false); + _address_map.set_expiring_flag(id); } future<> raft_rpc::abort() { diff --git a/service/raft/raft_rpc.hh b/service/raft/raft_rpc.hh index 9fe7b04627..44fc83b6e9 100644 --- a/service/raft/raft_rpc.hh +++ b/service/raft/raft_rpc.hh @@ -26,7 +26,7 @@ class raft_rpc : public raft::rpc { raft::server_id _server_id; netw::messaging_service& _messaging; raft_address_map<>& _address_map; - noncopyable_function _on_server_update; + noncopyable_function _on_server_update; seastar::gate _shutdown_gate; raft_ticker_type::time_point timeout() { @@ -37,7 +37,7 @@ public: explicit raft_rpc(raft_state_machine& sm, netw::messaging_service& ms, raft_address_map<>& address_map, raft::group_id gid, raft::server_id srv_id, // Called when a server is added or removed from the RPC configuration. - noncopyable_function on_server_update); + noncopyable_function on_server_update); future send_snapshot(raft::server_id server_id, const raft::install_snapshot& snap, seastar::abort_source& as) override; future<> send_append_entries(raft::server_id id, const raft::append_request& append_request) override;