From 38f65e5a2ea96fca65b7a3cf7729635f538dbd54 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 14 Mar 2022 17:17:20 +0100 Subject: [PATCH] main: start direct failure detector service We add the new direct failure detector to the list of services started in the Scylla process. To start the service, we need an implementation of `pinger` and `clock`. `pinger` is implemented using existing GOSSIP_ECHO verb. The gossip echo message requires the node's gossip generation number. We handle this by embedding the pinger implementation inside `gossiper`, and making `gossiper` update the generation number (cached inside the pinger class) periodically. `clock` is a simple implementation which uses `std::chrono::steady_clock` and `seastar::sleep_until` underneath. Translating `steady_clock` durations to `direct_failure_detector::clock` durations happens by taking the number of ticks. The service is currently not used, just initialized; no endpoints are added and no listeners are registered yet, but the following commits change that. --- gms/gossiper.cc | 70 +++++++++++++++++++++++++++++++++++++++- gms/gossiper.hh | 53 ++++++++++++++++++++++++++++++ main.cc | 12 +++++++ test/lib/cql_test_env.cc | 11 +++++++ 4 files changed, 145 insertions(+), 1 deletion(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index b4c7d4e663..77358ab526 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -97,7 +97,8 @@ gossiper::gossiper(abort_source& as, feature_service& features, const locator::s , _sys_ks(sys_ks) , _failure_detector_timeout_ms(cfg.failure_detector_timeout_in_ms) , _force_gossip_generation(cfg.force_gossip_generation) - , _gcfg(std::move(gcfg)) { + , _gcfg(std::move(gcfg)) + , _direct_fd_pinger(*this) { // Gossiper's stuff below runs only on CPU0 if (this_shard_id() != 0) { return; @@ -940,6 +941,8 @@ void gossiper::run() { } }).get(); } + + _direct_fd_pinger.update_generation_number(endpoint_state_map[get_broadcast_address()].get_heart_beat_state().get_generation()).get(); }).then_wrapped([this] (auto&& f) { try { f.get(); @@ -1827,6 +1830,7 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map> @@ -2597,4 +2601,68 @@ inet_address_vector_replica_set gossiper::endpoint_filter(const sstring& local_r return result; } +future<> gossiper::direct_fd_pinger::update_generation_number(int64_t n) { + if (n <= _generation_number) { + return make_ready_future<>(); + } + + return _gossiper.container().invoke_on_all([n] (gossiper& g) { + g._direct_fd_pinger._generation_number = n; + }); +} + +direct_failure_detector::pinger::endpoint_id gossiper::direct_fd_pinger::allocate_id(gms::inet_address addr) { + assert(this_shard_id() == 0); + + auto it = _addr_to_id.find(addr); + if (it == _addr_to_id.end()) { + auto id = _next_allocated_id++; + _id_to_addr.emplace(id, addr); + it = _addr_to_id.emplace(addr, id).first; + logger.debug("gossiper::direct_fd_pinger: assigned endpoint ID {} to address {}", id, addr); + } + + return it->second; +} + +future gossiper::direct_fd_pinger::get_address(direct_failure_detector::pinger::endpoint_id id) { + auto it = _id_to_addr.find(id); + if (it == _id_to_addr.end()) { + // Fetch the address from shard 0. By precondition it must be there. + auto addr = co_await _gossiper.container().invoke_on(0, [id] (gossiper& g) { + auto it = g._direct_fd_pinger._id_to_addr.find(id); + if (it == g._direct_fd_pinger._id_to_addr.end()) { + on_internal_error(logger, format("gossiper::direct_fd_pinger: endpoint id {} has no corresponding address", id)); + } + return it->second; + }); + it = _id_to_addr.emplace(id, addr).first; + } + + co_return it->second; +} + +future gossiper::direct_fd_pinger::ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) { + try { + co_await _gossiper._messaging.send_gossip_echo(netw::msg_addr(co_await get_address(id)), _generation_number, as); + } catch (seastar::rpc::closed_error&) { + co_return false; + } + co_return true; +} + } // namespace gms + +direct_failure_detector::clock::timepoint_t direct_fd_clock::now() noexcept { + return base::now().time_since_epoch().count(); +} + +future<> direct_fd_clock::sleep_until(direct_failure_detector::clock::timepoint_t tp, abort_source& as) { + auto t = base::time_point{base::duration{tp}}; + auto n = base::now(); + if (t <= n) { + return make_ready_future<>(); + } + + return sleep_abortable(t - n, as); +} diff --git a/gms/gossiper.hh b/gms/gossiper.hh index f7b9a389f9..f4fd96071b 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -30,6 +30,7 @@ #include "utils/updateable_value.hh" #include "utils/in.hh" #include "message/messaging_service_fwd.hh" +#include "direct_failure_detector/failure_detector.hh" #include #include #include @@ -617,6 +618,50 @@ private: future<> failure_detector_loop(); future<> failure_detector_loop_for_node(gms::inet_address node, int64_t gossip_generation, uint64_t live_endpoints_version); future<> update_live_endpoints_version(); + +public: + // Implementation of `direct_failure_detector::pinger` which uses gossip echo messages for pinging. + // The gossip echo message must be provided this node's gossip generation number. + // It's an integer incremented when the node restarts or when the gossip subsystem restarts. + // We cache the generation number inside `direct_fd_pinger` on every shard and update it in the `gossiper` main loop. + // + // We also store a mapping between `direct_failure_detector::pinger::endpoint_id`s and `inet_address`es. + class direct_fd_pinger : public direct_failure_detector::pinger { + friend class gossiper; + gossiper& _gossiper; + + // Only used on shard 0 by `allocate_id`. + direct_failure_detector::pinger::endpoint_id _next_allocated_id{0}; + + // The mappings are created on shard 0 and lazily replicated to other shards: + // when `ping` or `get_address` is called with an unknown ID on a different shard, it will fetch the ID from shard 0. + std::unordered_map _id_to_addr; + + // Used to quickly check if given address already has an assigned ID. + // Used only on shard 0, not replicated. + std::unordered_map _addr_to_id; + + // This node's gossip generation number, updated by gossiper's loop and replicated to every shard. + int64_t _generation_number; + + future<> update_generation_number(int64_t n); + + direct_fd_pinger(gossiper& g) : _gossiper(g) {} + public: + // Allocate a new endpoint_id for `addr`, or if one already exists, return it. + // Call only on shard 0. + direct_failure_detector::pinger::endpoint_id allocate_id(gms::inet_address addr); + + // Precondition: `id` was returned from `allocate_id` on shard 0 earlier. + future get_address(direct_failure_detector::pinger::endpoint_id id); + + future ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) override; + }; + + direct_fd_pinger& get_direct_fd_pinger() { return _direct_fd_pinger; } + +private: + direct_fd_pinger _direct_fd_pinger; }; inline future get_all_endpoint_states(gossiper& g) { @@ -683,3 +728,11 @@ struct gossip_get_endpoint_states_response { }; } // namespace gms + +// XXX: find a better place to put this? +struct direct_fd_clock : public direct_failure_detector::clock { + using base = std::chrono::steady_clock; + + direct_failure_detector::clock::timepoint_t now() noexcept override; + future<> sleep_until(direct_failure_detector::clock::timepoint_t tp, abort_source& as) override; +}; diff --git a/main.cc b/main.cc index 4ae59e2084..dce14a46d1 100644 --- a/main.cc +++ b/main.cc @@ -879,6 +879,18 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // #293 - do not stop anything (unless snitch.on_all(start) fails) stop_snitch->cancel(); + static direct_fd_clock fd_clock; + static sharded fd; + supervisor::notify("starting direct failure detector service"); + fd.start( + sharded_parameter([] (gms::gossiper& g) { return std::ref(g.get_direct_fd_pinger()); }, std::ref(gossiper)), + std::ref(fd_clock), + direct_fd_clock::base::duration{std::chrono::milliseconds{100}}.count()).get(); + + auto stop_fd = defer_verbose_shutdown("direct_failure_detector", [] { + fd.stop().get(); + }); + raft_gr.start(cfg->check_experimental(db::experimental_features_t::RAFT), std::ref(messaging), std::ref(gossiper), std::ref(feature_service)).get(); // XXX: stop_raft has to happen before query_processor diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 9ed5849d3a..8defbaa969 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -592,6 +592,17 @@ public: sharded raft_gr; sharded stream_manager; sharded forward_service; + sharded fd; + + direct_fd_clock fd_clock; + fd.start( + sharded_parameter([] (gms::gossiper& g) { return std::ref(g.get_direct_fd_pinger()); }, std::ref(gossiper)), + std::ref(fd_clock), + direct_fd_clock::base::duration{std::chrono::milliseconds{100}}.count()).get(); + + auto stop_fd = defer([&fd] { + fd.stop().get(); + }); raft_gr.start(cfg->check_experimental(db::experimental_features_t::RAFT), std::ref(ms), std::ref(gossiper), std::ref(feature_service)).get();