diff --git a/configure.py b/configure.py index 0736d93b8c..afa12475db 100755 --- a/configure.py +++ b/configure.py @@ -1323,7 +1323,7 @@ scylla_tests_dependencies = scylla_core + alternator + idls + scylla_tests_gener 'test/lib/key_utils.cc', ] -scylla_raft_dependencies = scylla_raft_core + ['utils/uuid.cc', 'utils/error_injection.cc'] +scylla_raft_dependencies = scylla_raft_core + ['utils/uuid.cc', 'utils/error_injection.cc', 'utils/exceptions.cc'] scylla_tools = ['tools/scylla-types.cc', 'tools/scylla-sstable.cc', 'tools/scylla-nodetool.cc', 'tools/schema_loader.cc', 'tools/utils.cc', 'tools/lua_sstable_consumer.cc'] scylla_perfs = ['test/perf/perf_fast_forward.cc', diff --git a/idl/raft.idl.hh b/idl/raft.idl.hh index 658a190812..81845257c5 100644 --- a/idl/raft.idl.hh +++ b/idl/raft.idl.hh @@ -121,8 +121,12 @@ struct wrong_destination { raft::server_id reached_id; }; +struct group_liveness_info { + bool group0_alive; +}; + struct direct_fd_ping_reply { - std::variant result; + std::variant result; }; verb [[with_client_info, cancellable]] direct_fd_ping (raft::server_id dst_id) -> service::direct_fd_ping_reply; diff --git a/raft/raft.hh b/raft/raft.hh index 39338e10bb..0103203296 100644 --- a/raft/raft.hh +++ b/raft/raft.hh @@ -300,6 +300,15 @@ struct transport_error: public error { using error::error; }; +// Can be thrown by two-way RPCs when the destination is not seen as alive by the failure detector. +// If thrown, the request was not sent at all. 
+struct destination_not_alive_error: public transport_error { + server_id destination; // ID of the server the request was addressed to (set from the constructor argument) + + destination_not_alive_error(server_id destination, seastar::compat::source_location l = seastar::compat::source_location::current()) + : transport_error(fmt::format("Failed to send {} to {}: node is not seen as alive by the failure detector", l.function_name(), destination)), destination(destination) {} +}; + struct command_is_too_big_error: public error { size_t command_size; size_t limit; diff --git a/raft/server.cc b/raft/server.cc index 47f9a11f41..fc9f207caa 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -28,6 +28,8 @@ #include "log.hh" #include "raft.hh" +#include "utils/exceptions.hh" + using namespace std::chrono_literals; namespace raft { @@ -84,6 +86,7 @@ public: raft::configuration get_configuration() const override; future<> start() override; future<> abort(sstring reason) override; + bool is_alive() const override; term_t get_current_term() const override; future<> read_barrier(seastar::abort_source* as = nullptr) override; void wait_until_candidate() override; @@ -123,6 +126,9 @@ private: // Set to abort reason when abort() is called std::optional _aborted; + // Becomes true during start(), becomes false on abort() or a background error + bool _is_alive = false; + // Signaled when apply index is changed condition_variable _applied_index_changed; @@ -367,6 +373,8 @@ future<> server_impl::start() { _rpc->on_configuration_change(get_rpc_config(), {}); } + _is_alive = true; + // start fiber to persist entries added to in-memory log _io_status = io_fiber(stable_idx); // start fiber to apply committed entries @@ -1143,7 +1151,11 @@ void server_impl::send_snapshot(server_id dst, install_snapshot&& snp) { _snapshot_transfers.erase(dst); auto reply = raft::snapshot_reply{.current_term = _fsm->get_current_term(), .success = false}; if (f.failed()) { - logger.error("[{}] Transferring snapshot to {} failed with: {}", _id, dst, f.get_exception()); + auto eptr = f.get_exception(); + const 
log_level lvl = try_catch(eptr) != nullptr + ? log_level::debug + : log_level::error; + logger.log(lvl, "[{}] Transferring snapshot to {} failed with: {}", _id, dst, eptr); } else { logger.trace("[{}] Transferred snapshot to {}", _id, dst); reply = f.get(); @@ -1412,6 +1424,7 @@ void server_impl::check_not_aborted() { } void server_impl::handle_background_error(const char* fiber_name) { + _is_alive = false; const auto e = std::current_exception(); logger.error("[{}] {} fiber stopped because of the error: {}", _id, fiber_name, e); if (_config.on_background_error) { @@ -1420,6 +1433,7 @@ void server_impl::handle_background_error(const char* fiber_name) { } future<> server_impl::abort(sstring reason) { + _is_alive = false; _aborted = std::move(reason); logger.trace("[{}]: abort() called", _id); _fsm->stop(); @@ -1497,6 +1511,10 @@ future<> server_impl::abort(sstring reason) { co_await seastar::when_all_succeed(all_with_gate.begin(), all_with_gate.end()).discard_result(); } +bool server_impl::is_alive() const { + return _is_alive; +} + future<> server_impl::set_configuration(config_member_set c_new, seastar::abort_source* as) { check_not_aborted(); const auto& cfg = _fsm->get_configuration(); diff --git a/raft/server.hh b/raft/server.hh index 4bd3872de5..c040e6d922 100644 --- a/raft/server.hh +++ b/raft/server.hh @@ -190,6 +190,11 @@ public: // replication. virtual future<> abort(sstring reason = "") = 0; + // Returns whether the server is running. + // A server becomes alive after start() and becomes dead after abort() + // is called or an error happens in one of the internal fibers. + virtual bool is_alive() const = 0; + // Return Raft protocol current term. 
virtual term_t get_current_term() const = 0; diff --git a/service/raft/group0_fwd.hh b/service/raft/group0_fwd.hh index 815b86abaf..bc42a5c42e 100644 --- a/service/raft/group0_fwd.hh +++ b/service/raft/group0_fwd.hh @@ -75,8 +75,12 @@ struct wrong_destination { raft::server_id reached_id; }; +struct group_liveness_info { + bool group0_alive; +}; + struct direct_fd_ping_reply { - std::variant result; + std::variant result; }; } // namespace service diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index e3f78eea3b..be56cdb904 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -101,8 +101,8 @@ class group0_rpc: public service::raft_rpc { public: explicit group0_rpc(direct_failure_detector::failure_detector& direct_fd, raft_state_machine& sm, netw::messaging_service& ms, - raft_address_map& address_map, raft::group_id gid, raft::server_id srv_id) - : raft_rpc(sm, ms, address_map, gid, srv_id) + raft_address_map& address_map, shared_ptr raft_fd, raft::group_id gid, raft::server_id srv_id) + : raft_rpc(sm, ms, address_map, std::move(raft_fd), gid, srv_id) , _direct_fd(direct_fd) {} @@ -199,7 +199,7 @@ const raft::server_id& raft_group0::load_my_id() { raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, raft::server_id my_id, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled) { auto state_machine = std::make_unique(_client, mm, qp.proxy(), ss, _raft_gr.address_map(), topology_change_enabled); - auto rpc = std::make_unique(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), gid, my_id); + auto rpc = std::make_unique(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), _raft_gr.failure_detector(), gid, my_id); // Keep a reference to a specific RPC class. 
auto& rpc_ref = *rpc; auto storage = std::make_unique(qp, gid, my_id); diff --git a/service/raft/raft_group_registry.cc b/service/raft/raft_group_registry.cc index 4b3c9bcc51..3b7f4378f8 100644 --- a/service/raft/raft_group_registry.cc +++ b/service/raft/raft_group_registry.cc @@ -297,15 +297,27 @@ void raft_group_registry::init_rpc_verbs() { // XXX: update address map here as well? if (_my_id != dst) { - co_return direct_fd_ping_reply { + return make_ready_future(direct_fd_ping_reply { .result = wrong_destination { .reached_id = _my_id, }, - }; + }); } - co_return direct_fd_ping_reply { - .result = std::monostate{}, - }; + + return container().invoke_on(0, [] (raft_group_registry& me) -> future { + bool group0_alive = false; + if (me._group0_id) { + auto* group0_server = me.find_server(*me._group0_id); + if (group0_server && group0_server->is_alive()) { + group0_alive = true; + } + } + co_return direct_fd_ping_reply { + .result = service::group_liveness_info{ + .group0_alive = group0_alive, + } + }; + }); }); } @@ -513,6 +525,8 @@ future direct_fd_pinger::ping(direct_failure_detector::pinger::endpoint_id rslog.trace("ping(id = {}, ip_addr = {}): wrong destination (reached {})", dst_id, *addr, wrong_dst->reached_id); co_return false; + } else if (auto* info = std::get_if(&reply.result)) { + co_return info->group0_alive; } } catch (seastar::rpc::closed_error&) { co_return false; diff --git a/service/raft/raft_rpc.cc b/service/raft/raft_rpc.cc index 1c1da9d7de..68af2dedd5 100644 --- a/service/raft/raft_rpc.cc +++ b/service/raft/raft_rpc.cc @@ -27,16 +27,21 @@ raft_ticker_type::time_point timeout() { } raft_rpc::raft_rpc(raft_state_machine& sm, netw::messaging_service& ms, - raft_address_map& address_map, raft::group_id gid, raft::server_id my_id) + raft_address_map& address_map, shared_ptr failure_detector, raft::group_id gid, raft::server_id my_id) : _sm(sm), _group_id(std::move(gid)), _my_id(my_id), _messaging(ms) - , _address_map(address_map) + , 
_address_map(address_map), _failure_detector(std::move(failure_detector)) {} -template void +template void raft_rpc::one_way_rpc(sloc loc, raft::server_id id, Verb&& verb, Msg&& msg) { (void)with_gate(_shutdown_gate, [this, loc = std::move(loc), id, &verb, &msg] () mutable { + if (rpc_kind == one_way_kind::request && !_failure_detector->is_alive(id)) { + rlogger.debug("{}:{}: {} dropping outgoing message to {} - node is not seen as alive by the failure detector", + loc.file_name(), loc.line(), loc.function_name(), id); + return make_ready_future<>(); + } auto ip_addr = _address_map.find(id); if (!ip_addr) { rlogger.debug("{}:{}: {} dropping outgoing message to {} - IP address not found", @@ -60,9 +65,12 @@ template auto raft_rpc::two_way_rpc(sloc loc, raft::server_id id, Verb&& verb, Args&&... args) { - auto ip_addr = _address_map.find(id); - using Fut = decltype(verb(&_messaging, netw::msg_addr(*ip_addr), db::no_timeout, _group_id, _my_id, id, std::forward(args)...)); + using Fut = decltype(verb(&_messaging, netw::msg_addr(gms::inet_address()), db::no_timeout, _group_id, _my_id, id, std::forward(args)...)); using Ret = typename Fut::value_type; + if (!_failure_detector->is_alive(id)) { + return make_exception_future(raft::destination_not_alive_error(id, loc)); + } + auto ip_addr = _address_map.find(id); if (!ip_addr) { const auto msg = format("Failed to send {} {}: ip address not found", loc.function_name(), id); return make_exception_future(raft::transport_error(msg)); @@ -80,6 +88,10 @@ future raft_rpc::send_snapshot(raft::server_id id, const r } future<> raft_rpc::send_append_entries(raft::server_id id, const raft::append_request& append_request) { + if (!_failure_detector->is_alive(id)) { + rlogger.debug("Failed to send append_entires to {}: node is not seen as alive by the failure detector", id); + co_return; + } auto ip_addr = _address_map.find(id); if (!ip_addr) { const auto msg = format("Failed to send append_entires to {}: ip address not found", id); @@ 
-90,27 +102,27 @@ future<> raft_rpc::send_append_entries(raft::server_id id, const raft::append_re } void raft_rpc::send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_append_entries_reply, reply); + one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_append_entries_reply, reply); } void raft_rpc::send_vote_request(raft::server_id id, const raft::vote_request& vote_request) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_vote_request, vote_request); + one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_vote_request, vote_request); } void raft_rpc::send_vote_reply(raft::server_id id, const raft::vote_reply& vote_reply) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_vote_reply, vote_reply); + one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_vote_reply, vote_reply); } void raft_rpc::send_timeout_now(raft::server_id id, const raft::timeout_now& timeout_now) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_timeout_now, timeout_now); + one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_timeout_now, timeout_now); } void raft_rpc::send_read_quorum(raft::server_id id, const raft::read_quorum& read_quorum) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_read_quorum, read_quorum); + one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_read_quorum, read_quorum); } void raft_rpc::send_read_quorum_reply(raft::server_id id, const raft::read_quorum_reply& read_quorum_reply) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_read_quorum_reply, read_quorum_reply); + one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_read_quorum_reply, read_quorum_reply); } future raft_rpc::send_add_entry(raft::server_id id, const raft::command& cmd) { diff --git a/service/raft/raft_rpc.hh b/service/raft/raft_rpc.hh index 
f84d50393e..73f0c23c40 100644 --- a/service/raft/raft_rpc.hh +++ b/service/raft/raft_rpc.hh @@ -28,13 +28,16 @@ protected: raft::server_id _my_id; // Raft server id of this node. netw::messaging_service& _messaging; raft_address_map& _address_map; + shared_ptr _failure_detector; seastar::gate _shutdown_gate; explicit raft_rpc(raft_state_machine& sm, netw::messaging_service& ms, - raft_address_map& address_map, raft::group_id gid, raft::server_id my_id); + raft_address_map& address_map, shared_ptr failure_detector, raft::group_id gid, raft::server_id my_id); private: - template void + enum class one_way_kind { request, reply }; + + template void one_way_rpc(std::source_location loc, raft::server_id id, Verb&& verb, Msg&& msg); template auto