mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 11:10:40 +00:00
Merge 'raft: send group0 RPCs only if the destination group0 server is seen as alive' from Piotr Dulikowski
In topology-on-raft mode, the events "new node starts its group0 server"
and "new node is added to group0 configuration" are not synchronized
with each other. Therefore, the cluster may start
sending commands to the new node before the node starts its server. This
can lead to harmless but ugly log messages such as:
INFO 2023-09-27 15:42:42,611 [shard 0:stat] rpc - client
127.0.0.1:56352 msg_id 2: exception "Raft group
b8542540-5d3b-11ee-99b8-1052801f2975 not found" in no_wait handler
ignored
In order to solve this, the failure detector verb is extended to report
information about whether group0 is alive. The raft rpc layer will drop
messages to nodes whose group0 is not seen as alive.
Tested by adding a delay before group0 is started on the joining node,
running all topology tests and grepping for the aforementioned log
messages.
Fixes: scylladb/scylladb#15853
Fixes: scylladb/scylladb#15167
Closes scylladb/scylladb#16071
* github.com:scylladb/scylladb:
raft: rpc: introduce destination_not_alive_error
raft: rpc: drop RPCs if the destination is not alive
raft: pass raft::failure_detector to raft_rpc
raft: transfer information about group0 liveness in direct_fd_ping
raft: add server::is_alive
This commit is contained in:
@@ -1323,7 +1323,7 @@ scylla_tests_dependencies = scylla_core + alternator + idls + scylla_tests_gener
|
||||
'test/lib/key_utils.cc',
|
||||
]
|
||||
|
||||
scylla_raft_dependencies = scylla_raft_core + ['utils/uuid.cc', 'utils/error_injection.cc']
|
||||
scylla_raft_dependencies = scylla_raft_core + ['utils/uuid.cc', 'utils/error_injection.cc', 'utils/exceptions.cc']
|
||||
|
||||
scylla_tools = ['tools/scylla-types.cc', 'tools/scylla-sstable.cc', 'tools/scylla-nodetool.cc', 'tools/schema_loader.cc', 'tools/utils.cc', 'tools/lua_sstable_consumer.cc']
|
||||
scylla_perfs = ['test/perf/perf_fast_forward.cc',
|
||||
|
||||
@@ -121,8 +121,12 @@ struct wrong_destination {
|
||||
raft::server_id reached_id;
|
||||
};
|
||||
|
||||
// Liveness information carried in a direct_fd_ping reply.
struct group_liveness_info {
|
||||
// The replying node's view of whether its group0 server is alive.
bool group0_alive;
|
||||
};
|
||||
|
||||
struct direct_fd_ping_reply {
|
||||
std::variant<std::monostate, service::wrong_destination> result;
|
||||
std::variant<std::monostate, service::wrong_destination, service::group_liveness_info> result;
|
||||
};
|
||||
|
||||
verb [[with_client_info, cancellable]] direct_fd_ping (raft::server_id dst_id) -> service::direct_fd_ping_reply;
|
||||
|
||||
@@ -300,6 +300,15 @@ struct transport_error: public error {
|
||||
using error::error;
|
||||
};
|
||||
|
||||
// Can be thrown by two-way RPCs when the destination is not seen as alive by the failure detector.
|
||||
// If thrown, the request was not sent at all.
|
||||
struct destination_not_alive_error: public transport_error {
|
||||
server_id destination;
|
||||
|
||||
destination_not_alive_error(server_id destination, seastar::compat::source_location l = seastar::compat::source_location::current())
|
||||
: transport_error(fmt::format("Failed to send {} to {}: node is not seen as alive by the failure detector", l.function_name(), destination)) {}
|
||||
};
|
||||
|
||||
struct command_is_too_big_error: public error {
|
||||
size_t command_size;
|
||||
size_t limit;
|
||||
|
||||
@@ -28,6 +28,8 @@
|
||||
#include "log.hh"
|
||||
#include "raft.hh"
|
||||
|
||||
#include "utils/exceptions.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace raft {
|
||||
@@ -84,6 +86,7 @@ public:
|
||||
raft::configuration get_configuration() const override;
|
||||
future<> start() override;
|
||||
future<> abort(sstring reason) override;
|
||||
bool is_alive() const override;
|
||||
term_t get_current_term() const override;
|
||||
future<> read_barrier(seastar::abort_source* as = nullptr) override;
|
||||
void wait_until_candidate() override;
|
||||
@@ -123,6 +126,9 @@ private:
|
||||
// Set to abort reason when abort() is called
|
||||
std::optional<sstring> _aborted;
|
||||
|
||||
// Becomes true during start(), becomes false on abort() or a background error
|
||||
bool _is_alive = false;
|
||||
|
||||
// Signaled when apply index is changed
|
||||
condition_variable _applied_index_changed;
|
||||
|
||||
@@ -367,6 +373,8 @@ future<> server_impl::start() {
|
||||
_rpc->on_configuration_change(get_rpc_config(), {});
|
||||
}
|
||||
|
||||
_is_alive = true;
|
||||
|
||||
// start fiber to persist entries added to in-memory log
|
||||
_io_status = io_fiber(stable_idx);
|
||||
// start fiber to apply committed entries
|
||||
@@ -1143,7 +1151,11 @@ void server_impl::send_snapshot(server_id dst, install_snapshot&& snp) {
|
||||
_snapshot_transfers.erase(dst);
|
||||
auto reply = raft::snapshot_reply{.current_term = _fsm->get_current_term(), .success = false};
|
||||
if (f.failed()) {
|
||||
logger.error("[{}] Transferring snapshot to {} failed with: {}", _id, dst, f.get_exception());
|
||||
auto eptr = f.get_exception();
|
||||
const log_level lvl = try_catch<raft::destination_not_alive_error>(eptr) != nullptr
|
||||
? log_level::debug
|
||||
: log_level::error;
|
||||
logger.log(lvl, "[{}] Transferring snapshot to {} failed with: {}", _id, dst, eptr);
|
||||
} else {
|
||||
logger.trace("[{}] Transferred snapshot to {}", _id, dst);
|
||||
reply = f.get();
|
||||
@@ -1412,6 +1424,7 @@ void server_impl::check_not_aborted() {
|
||||
}
|
||||
|
||||
void server_impl::handle_background_error(const char* fiber_name) {
|
||||
_is_alive = false;
|
||||
const auto e = std::current_exception();
|
||||
logger.error("[{}] {} fiber stopped because of the error: {}", _id, fiber_name, e);
|
||||
if (_config.on_background_error) {
|
||||
@@ -1420,6 +1433,7 @@ void server_impl::handle_background_error(const char* fiber_name) {
|
||||
}
|
||||
|
||||
future<> server_impl::abort(sstring reason) {
|
||||
_is_alive = false;
|
||||
_aborted = std::move(reason);
|
||||
logger.trace("[{}]: abort() called", _id);
|
||||
_fsm->stop();
|
||||
@@ -1497,6 +1511,10 @@ future<> server_impl::abort(sstring reason) {
|
||||
co_await seastar::when_all_succeed(all_with_gate.begin(), all_with_gate.end()).discard_result();
|
||||
}
|
||||
|
||||
// Returns the liveness flag: set to true in start(), reset to false in abort()
// and in handle_background_error().
bool server_impl::is_alive() const {
|
||||
return _is_alive;
|
||||
}
|
||||
|
||||
future<> server_impl::set_configuration(config_member_set c_new, seastar::abort_source* as) {
|
||||
check_not_aborted();
|
||||
const auto& cfg = _fsm->get_configuration();
|
||||
|
||||
@@ -190,6 +190,11 @@ public:
|
||||
// replication.
|
||||
virtual future<> abort(sstring reason = "") = 0;
|
||||
|
||||
// Returns whether the server is running.
|
||||
// A server becomes alive after start() and becomes dead after abort()
|
||||
// is called or an error happens in one of the internal fibers.
|
||||
virtual bool is_alive() const = 0;
|
||||
|
||||
// Return Raft protocol current term.
|
||||
virtual term_t get_current_term() const = 0;
|
||||
|
||||
|
||||
@@ -75,8 +75,12 @@ struct wrong_destination {
|
||||
raft::server_id reached_id;
|
||||
};
|
||||
|
||||
// One of the alternatives of direct_fd_ping_reply::result: reports the state of
// raft groups on the node that answered the ping.
struct group_liveness_info {
|
||||
// True when the answering node found its group0 server and that server reported is_alive().
bool group0_alive;
|
||||
};
|
||||
|
||||
struct direct_fd_ping_reply {
|
||||
std::variant<std::monostate, wrong_destination> result;
|
||||
std::variant<std::monostate, wrong_destination, group_liveness_info> result;
|
||||
};
|
||||
|
||||
} // namespace service
|
||||
|
||||
@@ -101,8 +101,8 @@ class group0_rpc: public service::raft_rpc {
|
||||
public:
|
||||
explicit group0_rpc(direct_failure_detector::failure_detector& direct_fd,
|
||||
raft_state_machine& sm, netw::messaging_service& ms,
|
||||
raft_address_map& address_map, raft::group_id gid, raft::server_id srv_id)
|
||||
: raft_rpc(sm, ms, address_map, gid, srv_id)
|
||||
raft_address_map& address_map, shared_ptr<raft::failure_detector> raft_fd, raft::group_id gid, raft::server_id srv_id)
|
||||
: raft_rpc(sm, ms, address_map, std::move(raft_fd), gid, srv_id)
|
||||
, _direct_fd(direct_fd)
|
||||
{}
|
||||
|
||||
@@ -199,7 +199,7 @@ const raft::server_id& raft_group0::load_my_id() {
|
||||
raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, raft::server_id my_id, service::storage_service& ss, cql3::query_processor& qp,
|
||||
service::migration_manager& mm, bool topology_change_enabled) {
|
||||
auto state_machine = std::make_unique<group0_state_machine>(_client, mm, qp.proxy(), ss, _raft_gr.address_map(), topology_change_enabled);
|
||||
auto rpc = std::make_unique<group0_rpc>(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), gid, my_id);
|
||||
auto rpc = std::make_unique<group0_rpc>(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), _raft_gr.failure_detector(), gid, my_id);
|
||||
// Keep a reference to a specific RPC class.
|
||||
auto& rpc_ref = *rpc;
|
||||
auto storage = std::make_unique<raft_sys_table_storage>(qp, gid, my_id);
|
||||
|
||||
@@ -297,15 +297,27 @@ void raft_group_registry::init_rpc_verbs() {
|
||||
// XXX: update address map here as well?
|
||||
|
||||
if (_my_id != dst) {
|
||||
co_return direct_fd_ping_reply {
|
||||
return make_ready_future<direct_fd_ping_reply>(direct_fd_ping_reply {
|
||||
.result = wrong_destination {
|
||||
.reached_id = _my_id,
|
||||
},
|
||||
};
|
||||
});
|
||||
}
|
||||
co_return direct_fd_ping_reply {
|
||||
.result = std::monostate{},
|
||||
};
|
||||
|
||||
return container().invoke_on(0, [] (raft_group_registry& me) -> future<direct_fd_ping_reply> {
|
||||
bool group0_alive = false;
|
||||
if (me._group0_id) {
|
||||
auto* group0_server = me.find_server(*me._group0_id);
|
||||
if (group0_server && group0_server->is_alive()) {
|
||||
group0_alive = true;
|
||||
}
|
||||
}
|
||||
co_return direct_fd_ping_reply {
|
||||
.result = service::group_liveness_info{
|
||||
.group0_alive = group0_alive,
|
||||
}
|
||||
};
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -513,6 +525,8 @@ future<bool> direct_fd_pinger::ping(direct_failure_detector::pinger::endpoint_id
|
||||
rslog.trace("ping(id = {}, ip_addr = {}): wrong destination (reached {})",
|
||||
dst_id, *addr, wrong_dst->reached_id);
|
||||
co_return false;
|
||||
} else if (auto* info = std::get_if<group_liveness_info>(&reply.result)) {
|
||||
co_return info->group0_alive;
|
||||
}
|
||||
} catch (seastar::rpc::closed_error&) {
|
||||
co_return false;
|
||||
|
||||
@@ -27,16 +27,21 @@ raft_ticker_type::time_point timeout() {
|
||||
}
|
||||
|
||||
raft_rpc::raft_rpc(raft_state_machine& sm, netw::messaging_service& ms,
|
||||
raft_address_map& address_map, raft::group_id gid, raft::server_id my_id)
|
||||
raft_address_map& address_map, shared_ptr<raft::failure_detector> failure_detector, raft::group_id gid, raft::server_id my_id)
|
||||
: _sm(sm), _group_id(std::move(gid)), _my_id(my_id), _messaging(ms)
|
||||
, _address_map(address_map)
|
||||
, _address_map(address_map), _failure_detector(std::move(failure_detector))
|
||||
{}
|
||||
|
||||
|
||||
template <typename Verb, typename Msg> void
|
||||
template <raft_rpc::one_way_kind rpc_kind, typename Verb, typename Msg> void
|
||||
raft_rpc::one_way_rpc(sloc loc, raft::server_id id,
|
||||
Verb&& verb, Msg&& msg) {
|
||||
(void)with_gate(_shutdown_gate, [this, loc = std::move(loc), id, &verb, &msg] () mutable {
|
||||
if (rpc_kind == one_way_kind::request && !_failure_detector->is_alive(id)) {
|
||||
rlogger.debug("{}:{}: {} dropping outgoing message to {} - node is not seen as alive by the failure detector",
|
||||
loc.file_name(), loc.line(), loc.function_name(), id);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto ip_addr = _address_map.find(id);
|
||||
if (!ip_addr) {
|
||||
rlogger.debug("{}:{}: {} dropping outgoing message to {} - IP address not found",
|
||||
@@ -60,9 +65,12 @@ template <typename Verb, typename... Args>
|
||||
auto
|
||||
raft_rpc::two_way_rpc(sloc loc, raft::server_id id,
|
||||
Verb&& verb, Args&&... args) {
|
||||
auto ip_addr = _address_map.find(id);
|
||||
using Fut = decltype(verb(&_messaging, netw::msg_addr(*ip_addr), db::no_timeout, _group_id, _my_id, id, std::forward<Args>(args)...));
|
||||
using Fut = decltype(verb(&_messaging, netw::msg_addr(gms::inet_address()), db::no_timeout, _group_id, _my_id, id, std::forward<Args>(args)...));
|
||||
using Ret = typename Fut::value_type;
|
||||
if (!_failure_detector->is_alive(id)) {
|
||||
return make_exception_future<Ret>(raft::destination_not_alive_error(id, loc));
|
||||
}
|
||||
auto ip_addr = _address_map.find(id);
|
||||
if (!ip_addr) {
|
||||
const auto msg = format("Failed to send {} {}: ip address not found", loc.function_name(), id);
|
||||
return make_exception_future<Ret>(raft::transport_error(msg));
|
||||
@@ -80,6 +88,10 @@ future<raft::snapshot_reply> raft_rpc::send_snapshot(raft::server_id id, const r
|
||||
}
|
||||
|
||||
future<> raft_rpc::send_append_entries(raft::server_id id, const raft::append_request& append_request) {
|
||||
if (!_failure_detector->is_alive(id)) {
|
||||
rlogger.debug("Failed to send append_entires to {}: node is not seen as alive by the failure detector", id);
|
||||
co_return;
|
||||
}
|
||||
auto ip_addr = _address_map.find(id);
|
||||
if (!ip_addr) {
|
||||
const auto msg = format("Failed to send append_entires to {}: ip address not found", id);
|
||||
@@ -90,27 +102,27 @@ future<> raft_rpc::send_append_entries(raft::server_id id, const raft::append_re
|
||||
}
|
||||
|
||||
void raft_rpc::send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) {
|
||||
one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_append_entries_reply, reply);
|
||||
one_way_rpc<one_way_kind::reply>(sloc::current(), id, ser::raft_rpc_verbs::send_raft_append_entries_reply, reply);
|
||||
}
|
||||
|
||||
void raft_rpc::send_vote_request(raft::server_id id, const raft::vote_request& vote_request) {
|
||||
one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_vote_request, vote_request);
|
||||
one_way_rpc<one_way_kind::request>(sloc::current(), id, ser::raft_rpc_verbs::send_raft_vote_request, vote_request);
|
||||
}
|
||||
|
||||
void raft_rpc::send_vote_reply(raft::server_id id, const raft::vote_reply& vote_reply) {
|
||||
one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_vote_reply, vote_reply);
|
||||
one_way_rpc<one_way_kind::reply>(sloc::current(), id, ser::raft_rpc_verbs::send_raft_vote_reply, vote_reply);
|
||||
}
|
||||
|
||||
void raft_rpc::send_timeout_now(raft::server_id id, const raft::timeout_now& timeout_now) {
|
||||
one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_timeout_now, timeout_now);
|
||||
one_way_rpc<one_way_kind::request>(sloc::current(), id, ser::raft_rpc_verbs::send_raft_timeout_now, timeout_now);
|
||||
}
|
||||
|
||||
void raft_rpc::send_read_quorum(raft::server_id id, const raft::read_quorum& read_quorum) {
|
||||
one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_read_quorum, read_quorum);
|
||||
one_way_rpc<one_way_kind::request>(sloc::current(), id, ser::raft_rpc_verbs::send_raft_read_quorum, read_quorum);
|
||||
}
|
||||
|
||||
void raft_rpc::send_read_quorum_reply(raft::server_id id, const raft::read_quorum_reply& read_quorum_reply) {
|
||||
one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_read_quorum_reply, read_quorum_reply);
|
||||
one_way_rpc<one_way_kind::reply>(sloc::current(), id, ser::raft_rpc_verbs::send_raft_read_quorum_reply, read_quorum_reply);
|
||||
}
|
||||
|
||||
future<raft::add_entry_reply> raft_rpc::send_add_entry(raft::server_id id, const raft::command& cmd) {
|
||||
|
||||
@@ -28,13 +28,16 @@ protected:
|
||||
raft::server_id _my_id; // Raft server id of this node.
|
||||
netw::messaging_service& _messaging;
|
||||
raft_address_map& _address_map;
|
||||
shared_ptr<raft::failure_detector> _failure_detector;
|
||||
seastar::gate _shutdown_gate;
|
||||
|
||||
explicit raft_rpc(raft_state_machine& sm, netw::messaging_service& ms,
|
||||
raft_address_map& address_map, raft::group_id gid, raft::server_id my_id);
|
||||
raft_address_map& address_map, shared_ptr<raft::failure_detector> failure_detector, raft::group_id gid, raft::server_id my_id);
|
||||
|
||||
private:
|
||||
template <typename Verb, typename Msg> void
|
||||
enum class one_way_kind { request, reply };
|
||||
|
||||
template <one_way_kind rpc_kind, typename Verb, typename Msg> void
|
||||
one_way_rpc(std::source_location loc, raft::server_id id, Verb&& verb, Msg&& msg);
|
||||
|
||||
template <typename Verb, typename... Args> auto
|
||||
|
||||
Reference in New Issue
Block a user