From a1ebfcf0060bd6af416a31b855a399179fb92054 Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Tue, 14 Nov 2023 10:28:56 +0100 Subject: [PATCH 1/5] raft: add server::is_alive Add a method which reports whether given raft server is running. In following commits, the information about whether the local raft group 0 is running or not will be included in the response to the failure detector ping, and the is_alive method will be used there. --- raft/server.cc | 12 ++++++++++++ raft/server.hh | 5 +++++ 2 files changed, 17 insertions(+) diff --git a/raft/server.cc b/raft/server.cc index 47f9a11f41..ffc2336570 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -84,6 +84,7 @@ public: raft::configuration get_configuration() const override; future<> start() override; future<> abort(sstring reason) override; + bool is_alive() const override; term_t get_current_term() const override; future<> read_barrier(seastar::abort_source* as = nullptr) override; void wait_until_candidate() override; @@ -123,6 +124,9 @@ private: // Set to abort reason when abort() is called std::optional _aborted; + // Becomes true during start(), becomes false on abort() or a background error + bool _is_alive = false; + // Signaled when apply index is changed condition_variable _applied_index_changed; @@ -367,6 +371,8 @@ future<> server_impl::start() { _rpc->on_configuration_change(get_rpc_config(), {}); } + _is_alive = true; + // start fiber to persist entries added to in-memory log _io_status = io_fiber(stable_idx); // start fiber to apply committed entries @@ -1412,6 +1418,7 @@ void server_impl::check_not_aborted() { } void server_impl::handle_background_error(const char* fiber_name) { + _is_alive = false; const auto e = std::current_exception(); logger.error("[{}] {} fiber stopped because of the error: {}", _id, fiber_name, e); if (_config.on_background_error) { @@ -1420,6 +1427,7 @@ void server_impl::handle_background_error(const char* fiber_name) { } future<> server_impl::abort(sstring reason) { + _is_alive = false; _aborted = std::move(reason); logger.trace("[{}]: abort() called", _id); _fsm->stop(); @@ -1497,6 +1505,10 @@ future<> server_impl::abort(sstring reason) { co_await seastar::when_all_succeed(all_with_gate.begin(), all_with_gate.end()).discard_result(); } +bool server_impl::is_alive() const { + return _is_alive; +} + future<> server_impl::set_configuration(config_member_set c_new, seastar::abort_source* as) { check_not_aborted(); const auto& cfg = _fsm->get_configuration(); diff --git a/raft/server.hh b/raft/server.hh index 4bd3872de5..c040e6d922 100644 --- a/raft/server.hh +++ b/raft/server.hh @@ -190,6 +190,11 @@ public: // replication. virtual future<> abort(sstring reason = "") = 0; + // Returns whether the server is running. + // A server becomes alive after start() and becomes dead after abort() + // is called or an error happens in one of the internal fibers. + virtual bool is_alive() const = 0; + // Return Raft protocol current term. virtual term_t get_current_term() const = 0; From a8ee4d543a693fe358862559902d6e65aa08fc4f Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Mon, 30 Oct 2023 16:05:11 +0100 Subject: [PATCH 2/5] raft: transfer information about group0 liveness in direct_fd_ping Add a new variant of the reply to the direct_fd_ping which specifies whether the local group0 is alive or not, and start actively using it. There is no need to introduce a cluster feature. Due to how our serialization framework works, nodes which do not recognize the new variant will treat it as the existing std::monostate. The std::monostate means "the node and group0 is alive"; nodes before the changes in this commit would send a std::monostate anyway, so this is completely transparent for the old nodes. --- idl/raft.idl.hh | 6 +++++- service/raft/group0_fwd.hh | 6 +++++- service/raft/raft_group_registry.cc | 24 +++++++++++++++++++----- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/idl/raft.idl.hh b/idl/raft.idl.hh index 658a190812..81845257c5 100644 --- a/idl/raft.idl.hh +++ b/idl/raft.idl.hh @@ -121,8 +121,12 @@ struct wrong_destination { raft::server_id reached_id; }; +struct group_liveness_info { + bool group0_alive; +}; + struct direct_fd_ping_reply { - std::variant result; + std::variant result; }; verb [[with_client_info, cancellable]] direct_fd_ping (raft::server_id dst_id) -> service::direct_fd_ping_reply; diff --git a/service/raft/group0_fwd.hh b/service/raft/group0_fwd.hh index 815b86abaf..bc42a5c42e 100644 --- a/service/raft/group0_fwd.hh +++ b/service/raft/group0_fwd.hh @@ -75,8 +75,12 @@ struct wrong_destination { raft::server_id reached_id; }; +struct group_liveness_info { + bool group0_alive; +}; + struct direct_fd_ping_reply { - std::variant result; + std::variant result; }; } // namespace service diff --git a/service/raft/raft_group_registry.cc b/service/raft/raft_group_registry.cc index 4b3c9bcc51..3b7f4378f8 100644 --- a/service/raft/raft_group_registry.cc +++ b/service/raft/raft_group_registry.cc @@ -297,15 +297,27 @@ void raft_group_registry::init_rpc_verbs() { // XXX: update address map here as well? if (_my_id != dst) { - co_return direct_fd_ping_reply { + return make_ready_future(direct_fd_ping_reply { .result = wrong_destination { .reached_id = _my_id, }, - }; + }); } - co_return direct_fd_ping_reply { - .result = std::monostate{}, - }; + + return container().invoke_on(0, [] (raft_group_registry& me) -> future { + bool group0_alive = false; + if (me._group0_id) { + auto* group0_server = me.find_server(*me._group0_id); + if (group0_server && group0_server->is_alive()) { + group0_alive = true; + } + } + co_return direct_fd_ping_reply { + .result = service::group_liveness_info{ + .group0_alive = group0_alive, + } + }; + }); }); } @@ -513,6 +525,8 @@ future direct_fd_pinger::ping(direct_failure_detector::pinger::endpoint_id rslog.trace("ping(id = {}, ip_addr = {}): wrong destination (reached {})", dst_id, *addr, wrong_dst->reached_id); co_return false; + } else if (auto* info = std::get_if(&reply.result)) { + co_return info->group0_alive; } } catch (seastar::rpc::closed_error&) { co_return false; From 3e32ee2d36bab3732e440817368dc039b861a3ea Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Tue, 14 Nov 2023 10:51:11 +0100 Subject: [PATCH 3/5] raft: pass raft::failure_detector to raft_rpc In following commits, raft_rpc will drop outgoing messages if the destination is not seen as alive by the failure detector. --- service/raft/raft_group0.cc | 6 +++--- service/raft/raft_rpc.cc | 4 ++-- service/raft/raft_rpc.hh | 3 ++- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index e3f78eea3b..be56cdb904 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -101,8 +101,8 @@ class group0_rpc: public service::raft_rpc { public: explicit group0_rpc(direct_failure_detector::failure_detector& direct_fd, raft_state_machine& sm, netw::messaging_service& ms, - raft_address_map& address_map, raft::group_id gid, raft::server_id srv_id) - : raft_rpc(sm, ms, address_map, gid, srv_id) + raft_address_map& address_map, shared_ptr raft_fd, raft::group_id gid, raft::server_id srv_id) + : raft_rpc(sm, ms, address_map, std::move(raft_fd), gid, srv_id) , _direct_fd(direct_fd) {} @@ -199,7 +199,7 @@ const raft::server_id& raft_group0::load_my_id() { raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, raft::server_id my_id, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled) { auto state_machine = std::make_unique(_client, mm, qp.proxy(), ss, _raft_gr.address_map(), topology_change_enabled); - auto rpc = std::make_unique(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), gid, my_id); + auto rpc = std::make_unique(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), _raft_gr.failure_detector(), gid, my_id); // Keep a reference to a specific RPC class. auto& rpc_ref = *rpc; auto storage = std::make_unique(qp, gid, my_id); diff --git a/service/raft/raft_rpc.cc b/service/raft/raft_rpc.cc index 1c1da9d7de..3c84213e91 100644 --- a/service/raft/raft_rpc.cc +++ b/service/raft/raft_rpc.cc @@ -27,9 +27,9 @@ raft_ticker_type::time_point timeout() { } raft_rpc::raft_rpc(raft_state_machine& sm, netw::messaging_service& ms, - raft_address_map& address_map, raft::group_id gid, raft::server_id my_id) + raft_address_map& address_map, shared_ptr failure_detector, raft::group_id gid, raft::server_id my_id) : _sm(sm), _group_id(std::move(gid)), _my_id(my_id), _messaging(ms) - , _address_map(address_map) + , _address_map(address_map), _failure_detector(std::move(failure_detector)) {} diff --git a/service/raft/raft_rpc.hh b/service/raft/raft_rpc.hh index f84d50393e..3ae85aab82 100644 --- a/service/raft/raft_rpc.hh +++ b/service/raft/raft_rpc.hh @@ -28,10 +28,11 @@ protected: raft::server_id _my_id; // Raft server id of this node. netw::messaging_service& _messaging; raft_address_map& _address_map; + shared_ptr _failure_detector; seastar::gate _shutdown_gate; explicit raft_rpc(raft_state_machine& sm, netw::messaging_service& ms, - raft_address_map& address_map, raft::group_id gid, raft::server_id my_id); + raft_address_map& address_map, shared_ptr failure_detector, raft::group_id gid, raft::server_id my_id); private: template void From ab42932ba487ea8f2315fd9a335df459836632dd Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Tue, 14 Nov 2023 10:51:47 +0100 Subject: [PATCH 4/5] raft: rpc: drop RPCs if the destination is not alive If the failure detector sees the destination as dead, there is no use to send the RPC so drop it silently. This only affects two-way RPCs and "request" one-way RPCs. The one-way RPCs used as responses to other one-way RPCs are not affected. --- service/raft/raft_rpc.cc | 31 ++++++++++++++++++++++--------- service/raft/raft_rpc.hh | 4 +++- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/service/raft/raft_rpc.cc b/service/raft/raft_rpc.cc index 3c84213e91..7daab55c1b 100644 --- a/service/raft/raft_rpc.cc +++ b/service/raft/raft_rpc.cc @@ -33,10 +33,15 @@ raft_rpc::raft_rpc(raft_state_machine& sm, netw::messaging_service& ms, {} -template void +template void raft_rpc::one_way_rpc(sloc loc, raft::server_id id, Verb&& verb, Msg&& msg) { (void)with_gate(_shutdown_gate, [this, loc = std::move(loc), id, &verb, &msg] () mutable { + if (rpc_kind == one_way_kind::request && !_failure_detector->is_alive(id)) { + rlogger.debug("{}:{}: {} dropping outgoing message to {} - node is not seen as alive by the failure detector", + loc.file_name(), loc.line(), loc.function_name(), id); + return make_ready_future<>(); + } auto ip_addr = _address_map.find(id); if (!ip_addr) { rlogger.debug("{}:{}: {} dropping outgoing message to {} - IP address not found", @@ -60,9 +65,13 @@ template auto raft_rpc::two_way_rpc(sloc loc, raft::server_id id, Verb&& verb, Args&&... args) { - auto ip_addr = _address_map.find(id); - using Fut = decltype(verb(&_messaging, netw::msg_addr(*ip_addr), db::no_timeout, _group_id, _my_id, id, std::forward(args)...)); + using Fut = decltype(verb(&_messaging, netw::msg_addr(gms::inet_address()), db::no_timeout, _group_id, _my_id, id, std::forward(args)...)); using Ret = typename Fut::value_type; + if (!_failure_detector->is_alive(id)) { + const auto msg = format("Failed to send {} to {}: node is not seen as alive by the failure detector", loc.function_name(), id); + return make_exception_future(raft::transport_error(msg)); + } + auto ip_addr = _address_map.find(id); if (!ip_addr) { const auto msg = format("Failed to send {} {}: ip address not found", loc.function_name(), id); return make_exception_future(raft::transport_error(msg)); @@ -80,6 +89,10 @@ future raft_rpc::send_snapshot(raft::server_id id, const r } future<> raft_rpc::send_append_entries(raft::server_id id, const raft::append_request& append_request) { + if (!_failure_detector->is_alive(id)) { + rlogger.debug("Failed to send append_entires to {}: node is not seen as alive by the failure detector", id); + co_return; + } auto ip_addr = _address_map.find(id); if (!ip_addr) { const auto msg = format("Failed to send append_entires to {}: ip address not found", id); @@ -90,27 +103,27 @@ future<> raft_rpc::send_append_entries(raft::server_id id, const raft::append_re } void raft_rpc::send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_append_entries_reply, reply); + one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_append_entries_reply, reply); } void raft_rpc::send_vote_request(raft::server_id id, const raft::vote_request& vote_request) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_vote_request, vote_request); + one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_vote_request, vote_request); } void raft_rpc::send_vote_reply(raft::server_id id, const raft::vote_reply& vote_reply) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_vote_reply, vote_reply); + one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_vote_reply, vote_reply); } void raft_rpc::send_timeout_now(raft::server_id id, const raft::timeout_now& timeout_now) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_timeout_now, timeout_now); + one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_timeout_now, timeout_now); } void raft_rpc::send_read_quorum(raft::server_id id, const raft::read_quorum& read_quorum) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_read_quorum, read_quorum); + one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_read_quorum, read_quorum); } void raft_rpc::send_read_quorum_reply(raft::server_id id, const raft::read_quorum_reply& read_quorum_reply) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_read_quorum_reply, read_quorum_reply); + one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_read_quorum_reply, read_quorum_reply); } future raft_rpc::send_add_entry(raft::server_id id, const raft::command& cmd) { diff --git a/service/raft/raft_rpc.hh b/service/raft/raft_rpc.hh index 3ae85aab82..73f0c23c40 100644 --- a/service/raft/raft_rpc.hh +++ b/service/raft/raft_rpc.hh @@ -35,7 +35,9 @@ protected: raft_address_map& address_map, shared_ptr failure_detector, raft::group_id gid, raft::server_id my_id); private: - template void + enum class one_way_kind { request, reply }; + + template void one_way_rpc(std::source_location loc, raft::server_id id, Verb&& verb, Msg&& msg); template auto From c58ff554d89f4b1829308692857d6fa0ab4b98e0 Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Thu, 23 Nov 2023 00:14:59 +0100 Subject: [PATCH 5/5] raft: rpc: introduce destination_not_alive_error Add a new destination_not_alive_error, thrown from two-way RPCs in case when the RPC is not issued because the destination is not reported as alive by the failure detector. In snapshot transfer code, lower the verbosity of the message printed in case it fails on the new error. This is done to prevent flakiness in the CI - in case of slow runs, nodes might get spuriously marked as dead if they are busy, and a message with the "error" verbosity can cause some tests to fail. --- configure.py | 2 +- raft/raft.hh | 9 +++++++++ raft/server.cc | 8 +++++++- service/raft/raft_rpc.cc | 3 +-- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/configure.py b/configure.py index ffa8fd5ad5..a06c3bdfee 100755 --- a/configure.py +++ b/configure.py @@ -1323,7 +1323,7 @@ scylla_tests_dependencies = scylla_core + alternator + idls + scylla_tests_gener 'test/lib/key_utils.cc', ] -scylla_raft_dependencies = scylla_raft_core + ['utils/uuid.cc', 'utils/error_injection.cc'] +scylla_raft_dependencies = scylla_raft_core + ['utils/uuid.cc', 'utils/error_injection.cc', 'utils/exceptions.cc'] scylla_tools = ['tools/scylla-types.cc', 'tools/scylla-sstable.cc', 'tools/scylla-nodetool.cc', 'tools/schema_loader.cc', 'tools/utils.cc', 'tools/lua_sstable_consumer.cc'] scylla_perfs = ['test/perf/perf_fast_forward.cc', diff --git a/raft/raft.hh b/raft/raft.hh index 39338e10bb..0103203296 100644 --- a/raft/raft.hh +++ b/raft/raft.hh @@ -300,6 +300,15 @@ struct transport_error: public error { using error::error; }; +// Can be thrown by two-way RPCs when the destination is not seen as alive by the failure detector. +// If thrown, the request was not sent at all. +struct destination_not_alive_error: public transport_error { + server_id destination; + + destination_not_alive_error(server_id destination, seastar::compat::source_location l = seastar::compat::source_location::current()) + : transport_error(fmt::format("Failed to send {} to {}: node is not seen as alive by the failure detector", l.function_name(), destination)) {} +}; + struct command_is_too_big_error: public error { size_t command_size; size_t limit; diff --git a/raft/server.cc b/raft/server.cc index ffc2336570..fc9f207caa 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -28,6 +28,8 @@ #include "log.hh" #include "raft.hh" +#include "utils/exceptions.hh" + using namespace std::chrono_literals; namespace raft { @@ -1149,7 +1151,11 @@ void server_impl::send_snapshot(server_id dst, install_snapshot&& snp) { _snapshot_transfers.erase(dst); auto reply = raft::snapshot_reply{.current_term = _fsm->get_current_term(), .success = false}; if (f.failed()) { - logger.error("[{}] Transferring snapshot to {} failed with: {}", _id, dst, f.get_exception()); + auto eptr = f.get_exception(); + const log_level lvl = try_catch(eptr) != nullptr + ? log_level::debug + : log_level::error; + logger.log(lvl, "[{}] Transferring snapshot to {} failed with: {}", _id, dst, eptr); } else { logger.trace("[{}] Transferred snapshot to {}", _id, dst); reply = f.get(); diff --git a/service/raft/raft_rpc.cc b/service/raft/raft_rpc.cc index 7daab55c1b..68af2dedd5 100644 --- a/service/raft/raft_rpc.cc +++ b/service/raft/raft_rpc.cc @@ -68,8 +68,7 @@ raft_rpc::two_way_rpc(sloc loc, raft::server_id id, using Fut = decltype(verb(&_messaging, netw::msg_addr(gms::inet_address()), db::no_timeout, _group_id, _my_id, id, std::forward(args)...)); using Ret = typename Fut::value_type; if (!_failure_detector->is_alive(id)) { - const auto msg = format("Failed to send {} to {}: node is not seen as alive by the failure detector", loc.function_name(), id); - return make_exception_future(raft::transport_error(msg)); + return make_exception_future(raft::destination_not_alive_error(id, loc)); } auto ip_addr = _address_map.find(id); if (!ip_addr) {