main: start direct failure detector service

We add the new direct failure detector to the list of services started
in the Scylla process.

To start the service, we need an implementation of `pinger` and `clock`.

`pinger` is implemented using existing GOSSIP_ECHO verb. The gossip echo
message requires the node's gossip generation number. We handle this by
embedding the pinger implementation inside `gossiper`, and making
`gossiper` update the generation number (cached inside the pinger class)
periodically.

`clock` is a simple implementation which uses `std::chrono::steady_clock`
and `seastar::sleep_until` underneath. Translating `steady_clock`
durations to `direct_failure_detector::clock` durations happens by taking
the number of ticks.

The service is currently not used, just initialized; no endpoints are
added and no listeners are registered yet, but the following commits
change that.
This commit is contained in:
Kamil Braun
2022-03-14 17:17:20 +01:00
parent 9551256e81
commit 38f65e5a2e
4 changed files with 145 additions and 1 deletions

View File

@@ -97,7 +97,8 @@ gossiper::gossiper(abort_source& as, feature_service& features, const locator::s
, _sys_ks(sys_ks)
, _failure_detector_timeout_ms(cfg.failure_detector_timeout_in_ms)
, _force_gossip_generation(cfg.force_gossip_generation)
, _gcfg(std::move(gcfg)) {
, _gcfg(std::move(gcfg))
, _direct_fd_pinger(*this) {
// Gossiper's stuff below runs only on CPU0
if (this_shard_id() != 0) {
return;
@@ -940,6 +941,8 @@ void gossiper::run() {
}
}).get();
}
_direct_fd_pinger.update_generation_number(endpoint_state_map[get_broadcast_address()].get_heart_beat_state().get_generation()).get();
}).then_wrapped([this] (auto&& f) {
try {
f.get();
@@ -1827,6 +1830,7 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
g._enabled = true;
g._failure_detector_loop_done = g.failure_detector_loop();
});
co_await _direct_fd_pinger.update_generation_number(generation_nbr);
}
future<std::unordered_map<gms::inet_address, int32_t>>
@@ -2597,4 +2601,68 @@ inet_address_vector_replica_set gossiper::endpoint_filter(const sstring& local_r
return result;
}
future<> gossiper::direct_fd_pinger::update_generation_number(int64_t n) {
if (n <= _generation_number) {
return make_ready_future<>();
}
return _gossiper.container().invoke_on_all([n] (gossiper& g) {
g._direct_fd_pinger._generation_number = n;
});
}
direct_failure_detector::pinger::endpoint_id gossiper::direct_fd_pinger::allocate_id(gms::inet_address addr) {
assert(this_shard_id() == 0);
auto it = _addr_to_id.find(addr);
if (it == _addr_to_id.end()) {
auto id = _next_allocated_id++;
_id_to_addr.emplace(id, addr);
it = _addr_to_id.emplace(addr, id).first;
logger.debug("gossiper::direct_fd_pinger: assigned endpoint ID {} to address {}", id, addr);
}
return it->second;
}
future<gms::inet_address> gossiper::direct_fd_pinger::get_address(direct_failure_detector::pinger::endpoint_id id) {
auto it = _id_to_addr.find(id);
if (it == _id_to_addr.end()) {
// Fetch the address from shard 0. By precondition it must be there.
auto addr = co_await _gossiper.container().invoke_on(0, [id] (gossiper& g) {
auto it = g._direct_fd_pinger._id_to_addr.find(id);
if (it == g._direct_fd_pinger._id_to_addr.end()) {
on_internal_error(logger, format("gossiper::direct_fd_pinger: endpoint id {} has no corresponding address", id));
}
return it->second;
});
it = _id_to_addr.emplace(id, addr).first;
}
co_return it->second;
}
future<bool> gossiper::direct_fd_pinger::ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) {
try {
co_await _gossiper._messaging.send_gossip_echo(netw::msg_addr(co_await get_address(id)), _generation_number, as);
} catch (seastar::rpc::closed_error&) {
co_return false;
}
co_return true;
}
} // namespace gms
direct_failure_detector::clock::timepoint_t direct_fd_clock::now() noexcept {
return base::now().time_since_epoch().count();
}
future<> direct_fd_clock::sleep_until(direct_failure_detector::clock::timepoint_t tp, abort_source& as) {
auto t = base::time_point{base::duration{tp}};
auto n = base::now();
if (t <= n) {
return make_ready_future<>();
}
return sleep_abortable(t - n, as);
}

View File

@@ -30,6 +30,7 @@
#include "utils/updateable_value.hh"
#include "utils/in.hh"
#include "message/messaging_service_fwd.hh"
#include "direct_failure_detector/failure_detector.hh"
#include <optional>
#include <algorithm>
#include <chrono>
@@ -617,6 +618,50 @@ private:
future<> failure_detector_loop();
future<> failure_detector_loop_for_node(gms::inet_address node, int64_t gossip_generation, uint64_t live_endpoints_version);
future<> update_live_endpoints_version();
public:
// Implementation of `direct_failure_detector::pinger` which uses gossip echo messages for pinging.
// The gossip echo message must be provided this node's gossip generation number.
// It's an integer incremented when the node restarts or when the gossip subsystem restarts.
// We cache the generation number inside `direct_fd_pinger` on every shard and update it in the `gossiper` main loop.
//
// We also store a mapping between `direct_failure_detector::pinger::endpoint_id`s and `inet_address`es.
class direct_fd_pinger : public direct_failure_detector::pinger {
friend class gossiper;
gossiper& _gossiper;
// Only used on shard 0 by `allocate_id`.
direct_failure_detector::pinger::endpoint_id _next_allocated_id{0};
// The mappings are created on shard 0 and lazily replicated to other shards:
// when `ping` or `get_address` is called with an unknown ID on a different shard, it will fetch the ID from shard 0.
std::unordered_map<direct_failure_detector::pinger::endpoint_id, inet_address> _id_to_addr;
// Used to quickly check if given address already has an assigned ID.
// Used only on shard 0, not replicated.
std::unordered_map<inet_address, direct_failure_detector::pinger::endpoint_id> _addr_to_id;
// This node's gossip generation number, updated by gossiper's loop and replicated to every shard.
int64_t _generation_number;
future<> update_generation_number(int64_t n);
direct_fd_pinger(gossiper& g) : _gossiper(g) {}
public:
// Allocate a new endpoint_id for `addr`, or if one already exists, return it.
// Call only on shard 0.
direct_failure_detector::pinger::endpoint_id allocate_id(gms::inet_address addr);
// Precondition: `id` was returned from `allocate_id` on shard 0 earlier.
future<gms::inet_address> get_address(direct_failure_detector::pinger::endpoint_id id);
future<bool> ping(direct_failure_detector::pinger::endpoint_id id, abort_source& as) override;
};
direct_fd_pinger& get_direct_fd_pinger() { return _direct_fd_pinger; }
private:
direct_fd_pinger _direct_fd_pinger;
};
inline future<sstring> get_all_endpoint_states(gossiper& g) {
@@ -683,3 +728,11 @@ struct gossip_get_endpoint_states_response {
};
} // namespace gms
// XXX: find a better place to put this?
struct direct_fd_clock : public direct_failure_detector::clock {
using base = std::chrono::steady_clock;
direct_failure_detector::clock::timepoint_t now() noexcept override;
future<> sleep_until(direct_failure_detector::clock::timepoint_t tp, abort_source& as) override;
};

12
main.cc
View File

@@ -879,6 +879,18 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// #293 - do not stop anything (unless snitch.on_all(start) fails)
stop_snitch->cancel();
static direct_fd_clock fd_clock;
static sharded<direct_failure_detector::failure_detector> fd;
supervisor::notify("starting direct failure detector service");
fd.start(
sharded_parameter([] (gms::gossiper& g) { return std::ref(g.get_direct_fd_pinger()); }, std::ref(gossiper)),
std::ref(fd_clock),
direct_fd_clock::base::duration{std::chrono::milliseconds{100}}.count()).get();
auto stop_fd = defer_verbose_shutdown("direct_failure_detector", [] {
fd.stop().get();
});
raft_gr.start(cfg->check_experimental(db::experimental_features_t::RAFT),
std::ref(messaging), std::ref(gossiper), std::ref(feature_service)).get();
// XXX: stop_raft has to happen before query_processor

View File

@@ -592,6 +592,17 @@ public:
sharded<service::raft_group_registry> raft_gr;
sharded<streaming::stream_manager> stream_manager;
sharded<service::forward_service> forward_service;
sharded<direct_failure_detector::failure_detector> fd;
direct_fd_clock fd_clock;
fd.start(
sharded_parameter([] (gms::gossiper& g) { return std::ref(g.get_direct_fd_pinger()); }, std::ref(gossiper)),
std::ref(fd_clock),
direct_fd_clock::base::duration{std::chrono::milliseconds{100}}.count()).get();
auto stop_fd = defer([&fd] {
fd.stop().get();
});
raft_gr.start(cfg->check_experimental(db::experimental_features_t::RAFT),
std::ref(ms), std::ref(gossiper), std::ref(feature_service)).get();