diff --git a/gms/gossiper.cc b/gms/gossiper.cc
index b4c7d4e663..77358ab526 100644
--- a/gms/gossiper.cc
+++ b/gms/gossiper.cc
@@ -97,7 +97,8 @@ gossiper::gossiper(abort_source& as, feature_service& features, const locator::s
         , _sys_ks(sys_ks)
         , _failure_detector_timeout_ms(cfg.failure_detector_timeout_in_ms)
         , _force_gossip_generation(cfg.force_gossip_generation)
-        , _gcfg(std::move(gcfg)) {
+        , _gcfg(std::move(gcfg))
+        , _direct_fd_pinger(*this) {
     // Gossiper's stuff below runs only on CPU0
     if (this_shard_id() != 0) {
         return;
@@ -940,6 +941,8 @@ void gossiper::run() {
                 }
             }).get();
         }
+
+        _direct_fd_pinger.update_generation_number(endpoint_state_map[get_broadcast_address()].get_heart_beat_state().get_generation()).get();
     }).then_wrapped([this] (auto&& f) {
         try {
             f.get();
@@ -1827,6 +1830,7 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map>
@@ -2597,4 +2601,80 @@ inet_address_vector_replica_set gossiper::endpoint_filter(const sstring& local_r
     return result;
 }
 
+// Raise the cached gossip generation number to `n` on every shard.
+// Does nothing when `n` is not greater than the value cached on the calling shard.
+future<> gossiper::direct_fd_pinger::update_generation_number(int64_t n) {
+    if (n <= _generation_number) {
+        return make_ready_future<>();
+    }
+
+    return _gossiper.container().invoke_on_all([n] (gossiper& g) {
+        g._direct_fd_pinger._generation_number = n;
+    });
+}
+
+// Return the endpoint ID assigned to `addr`, allocating the next free one on first use.
+// Asserts that it is running on shard 0.
+direct_failure_detector::pinger::endpoint_id gossiper::direct_fd_pinger::allocate_id(gms::inet_address addr) {
+    assert(this_shard_id() == 0);
+
+    auto it = _addr_to_id.find(addr);
+    if (it == _addr_to_id.end()) {
+        auto id = _next_allocated_id++;
+        _id_to_addr.emplace(id, addr);
+        it = _addr_to_id.emplace(addr, id).first;
+        logger.debug("gossiper::direct_fd_pinger: assigned endpoint ID {} to address {}", id, addr);
+    }
+
+    return it->second;
+}
+
+// Translate an endpoint ID back to its address. A miss in this shard's map is
+// resolved by asking shard 0, and the answer is cached in this shard's map.
+future<gms::inet_address> gossiper::direct_fd_pinger::get_address(direct_failure_detector::pinger::endpoint_id id) {
+    auto it = _id_to_addr.find(id);
+    if (it == _id_to_addr.end()) {
+        // Fetch the address from shard 0. By precondition it must be there.
+        auto addr = co_await _gossiper.container().invoke_on(0, [id] (gossiper& g) {
+            auto it = g._direct_fd_pinger._id_to_addr.find(id);
+            if (it == g._direct_fd_pinger._id_to_addr.end()) {
+                on_internal_error(logger, format("gossiper::direct_fd_pinger: endpoint id {} has no corresponding address", id));
+            }
+            return it->second;
+        });
+        it = _id_to_addr.emplace(id, addr).first;
+    }
+
+    co_return it->second;
+}
+
+// Send a gossip echo, carrying the cached generation number, to the endpoint identified by `id`.
+// Resolves to false when the send fails with `seastar::rpc::closed_error`, true when it completes;
+// any other exception propagates to the caller.
+future<bool> gossiper::direct_fd_pinger::ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) {
+    try {
+        co_await _gossiper._messaging.send_gossip_echo(netw::msg_addr(co_await get_address(id)), _generation_number, as);
+    } catch (const seastar::rpc::closed_error&) {
+        co_return false;
+    }
+    co_return true;
+}
+
 } // namespace gms
+
+// Current time expressed as a raw tick count of `base` since its epoch.
+direct_failure_detector::clock::timepoint_t direct_fd_clock::now() noexcept {
+    return base::now().time_since_epoch().count();
+}
+
+// Sleep until `tp` (a tick count in the same units as `now()`); returns a ready future if `tp` is not in the future.
+// The sleep is started with `sleep_abortable`, passing `as`.
+future<> direct_fd_clock::sleep_until(direct_failure_detector::clock::timepoint_t tp, abort_source& as) {
+    auto t = base::time_point{base::duration{tp}};
+    auto n = base::now();
+    if (t <= n) {
+        return make_ready_future<>();
+    }
+
+    return sleep_abortable(t - n, as);
+}
diff --git a/gms/gossiper.hh b/gms/gossiper.hh
index f7b9a389f9..f4fd96071b 100644
--- a/gms/gossiper.hh
+++ b/gms/gossiper.hh
@@ -30,6 +30,7 @@
 #include "utils/updateable_value.hh"
 #include "utils/in.hh"
 #include "message/messaging_service_fwd.hh"
+#include "direct_failure_detector/failure_detector.hh"
 #include
 #include
 #include
@@ -617,6 +618,50 @@ private:
     future<> failure_detector_loop();
     future<> failure_detector_loop_for_node(gms::inet_address node, int64_t gossip_generation, uint64_t live_endpoints_version);
     future<> update_live_endpoints_version();
+
+public:
+    // Implementation of `direct_failure_detector::pinger` which uses gossip echo messages for pinging.
+    // The gossip echo message must be provided with this node's gossip generation number.
+    // It's an integer incremented when the node restarts or when the gossip subsystem restarts.
+    // We cache the generation number inside `direct_fd_pinger` on every shard and update it in the `gossiper` main loop.
+    //
+    // We also store a mapping between `direct_failure_detector::pinger::endpoint_id`s and `inet_address`es.
+    class direct_fd_pinger : public direct_failure_detector::pinger {
+        friend class gossiper;
+        gossiper& _gossiper;
+
+        // Only used on shard 0 by `allocate_id`.
+        direct_failure_detector::pinger::endpoint_id _next_allocated_id{0};
+
+        // The mappings are created on shard 0 and lazily replicated to other shards:
+        // when `ping` or `get_address` is called with an unknown ID on a different shard, it will fetch the address for that ID from shard 0.
+        std::unordered_map<direct_failure_detector::pinger::endpoint_id, inet_address> _id_to_addr;
+
+        // Used to quickly check if given address already has an assigned ID.
+        // Used only on shard 0, not replicated.
+        std::unordered_map<inet_address, direct_failure_detector::pinger::endpoint_id> _addr_to_id;
+
+        // This node's gossip generation number, updated by gossiper's loop and replicated to every shard.
+        int64_t _generation_number{0}; // starts at 0 so `update_generation_number` never compares against an indeterminate value
+
+        future<> update_generation_number(int64_t n);
+
+        direct_fd_pinger(gossiper& g) : _gossiper(g) {}
+    public:
+        // Allocate a new endpoint_id for `addr`, or if one already exists, return it.
+        // Call only on shard 0.
+        direct_failure_detector::pinger::endpoint_id allocate_id(gms::inet_address addr);
+
+        // Precondition: `id` was returned from `allocate_id` on shard 0 earlier.
+        future<gms::inet_address> get_address(direct_failure_detector::pinger::endpoint_id id);
+
+        future<bool> ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) override;
+    };
+
+    direct_fd_pinger& get_direct_fd_pinger() { return _direct_fd_pinger; }
+
+private:
+    direct_fd_pinger _direct_fd_pinger;
 };
 
 inline future get_all_endpoint_states(gossiper& g) {
@@ -683,3 +728,11 @@ struct gossip_get_endpoint_states_response {
 };
 
 } // namespace gms
+
+// XXX: find a better place to put this?
+struct direct_fd_clock : public direct_failure_detector::clock {
+    using base = std::chrono::steady_clock; // underlying clock; `timepoint_t` values are its raw tick counts
+
+    direct_failure_detector::clock::timepoint_t now() noexcept override;
+    future<> sleep_until(direct_failure_detector::clock::timepoint_t tp, abort_source& as) override;
+};
diff --git a/main.cc b/main.cc
index 4ae59e2084..dce14a46d1 100644
--- a/main.cc
+++ b/main.cc
@@ -879,6 +879,18 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
             // #293 - do not stop anything (unless snitch.on_all(start) fails)
             stop_snitch->cancel();
 
+            static direct_fd_clock fd_clock;
+            static sharded<direct_failure_detector::failure_detector> fd;
+            supervisor::notify("starting direct failure detector service");
+            fd.start(
+                sharded_parameter([] (gms::gossiper& g) { return std::ref(g.get_direct_fd_pinger()); }, std::ref(gossiper)),
+                std::ref(fd_clock),
+                direct_fd_clock::base::duration{std::chrono::milliseconds{100}}.count()).get();
+
+            auto stop_fd = defer_verbose_shutdown("direct_failure_detector", [] {
+                fd.stop().get();
+            });
+
             raft_gr.start(cfg->check_experimental(db::experimental_features_t::RAFT), std::ref(messaging), std::ref(gossiper), std::ref(feature_service)).get();
 
             // XXX: stop_raft has to happen before query_processor
diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc
index 9ed5849d3a..8defbaa969 100644
--- a/test/lib/cql_test_env.cc
+++ b/test/lib/cql_test_env.cc
@@ -592,6 +592,17 @@ public:
             sharded raft_gr;
             sharded stream_manager;
             sharded forward_service;
+            sharded<direct_failure_detector::failure_detector> fd;
+
+            direct_fd_clock fd_clock;
+            fd.start(
+                sharded_parameter([] (gms::gossiper& g) { return std::ref(g.get_direct_fd_pinger()); }, std::ref(gossiper)),
+                std::ref(fd_clock),
+                direct_fd_clock::base::duration{std::chrono::milliseconds{100}}.count()).get();
+
+            auto stop_fd = defer([&fd] {
+                fd.stop().get();
+            });
 
             raft_gr.start(cfg->check_experimental(db::experimental_features_t::RAFT), std::ref(ms), std::ref(gossiper), std::ref(feature_service)).get();