Merge 'raft: fix startup hangs' from Kamil Braun

Fix hangs on Scylla node startup with Raft enabled that were caused by:
- a deadlock when enabling the USES_RAFT feature,
- a non-voter server forgetting who the leader is and not being able to forward a `modify_config` entry to become a voter.

Read the commit messages for details.

Fixes: #10379
Refs: #10355

Closes #10380

* github.com:scylladb/scylla:
  raft: actively search for a leader if it is not known for a tick duration
  raft: server: return immediately from `wait_for_leader` if leader is known
  service: raft: don't support/advertise USES_RAFT feature
This commit is contained in:
Tomasz Grabiec
2022-04-29 19:47:09 +02:00
6 changed files with 88 additions and 19 deletions

View File

@@ -145,6 +145,11 @@ this:
groups, hence failure detection RPC could run once on network peer level,
not sepately for each Raft instance. The library expects an accurate
`failure_detector` instance from a complying implementation.
- Since Raft leader no longer sends RPC every 0.1 second there can be a
situation when a follower may not know who the leader is for a long time
(if the leader is idle). Add an extension that allows a follower to actively
search for a leader by sending specially crafted append reply RPC to all voters.
A leader will reply with an empty append message to such a message.
### Pre-voting and protection against disruptive leaders

View File

@@ -158,6 +158,7 @@ void fsm::become_leader() {
_state.emplace<leader>(_config.max_log_size, *this);
leader_state().log_limiter_semaphore->consume(_log.in_memory_size());
_last_election_time = _clock.now();
_ping_leader = false;
// a new leader needs to commit at lease one entry to make sure that
// all existing entries in its log are committed as well. Also it should
// send append entries RPC as soon as possible to establish its leadership
@@ -178,6 +179,7 @@ void fsm::become_follower(server_id leader) {
// assigned. The exchange here guarantis that.
std::exchange(_state, follower{.current_leader = leader});
if (leader != server_id{}) {
_ping_leader = false;
_last_election_time = _clock.now();
}
}
@@ -583,6 +585,25 @@ void fsm::tick() {
_current_term, _last_election_time, _clock.now());
become_candidate(_config.enable_prevoting);
}
if (is_follower() && !current_leader() && _ping_leader) {
// We are a follower but a leader is not known. It will not be known
// until a communication from a leader which (for an idle leader) may
// not happen any time soon since we use external failure detector and
// our leader does not send periodic empty append messages. By sending
// a special append message reject we solicit a reply from a leader.
// Non leaders will ignore the append reply.
auto& cfg = get_configuration();
// If conf is joint it means a leader will send us a non joint one eventually
if (!cfg.is_joint() && cfg.current.contains(raft::server_address{_my_id})) {
for (auto s : cfg.current) {
if (s.can_vote && s.id != _my_id && _failure_detector.is_alive(s.id)) {
logger.trace("tick[{}]: searching for a leader. Pinging {}", _my_id, s.id);
send_to(s.id, append_reply{_current_term, _commit_idx, append_reply::rejected{index_t{0}, index_t{0}}});
}
}
}
}
}
void fsm::append_entries(server_id from, append_request&& request) {
@@ -687,6 +708,15 @@ void fsm::append_entries_reply(server_id from, append_reply&& reply) {
logger.trace("append_entries_reply[{}->{}]: rejected match={} index={}",
_my_id, from, progress.match_idx, rejected.non_matching_idx);
// If non_matching_idx and last_idx are zero it means that a follower is looking for a leader
// as such message cannot be a result of real missmatch.
// Send an empty append message to notify it that we are the leader
if (rejected.non_matching_idx == index_t{0} && rejected.last_idx == index_t{0}) {
logger.trace("append_entries_reply[{}->{}]: send empty append message", _my_id, from);
replicate_to(progress, true);
return;
}
// check reply validity
if (progress.is_stray_reject(rejected)) {
logger.trace("append_entries_reply[{}->{}]: drop stray append reject", _my_id, from);

View File

@@ -165,6 +165,9 @@ class fsm {
fsm_config _config;
// This is set to true when leadership transfer process is aborted due to a timeout
bool _abort_leadership_transfer = false;
// Set if we want to actively search for a leader.
// Can be true only if the leader is not known
bool _ping_leader = false;
// Stores the last state observed by get_output().
// Is updated with the actual state of the FSM after
@@ -383,12 +386,18 @@ public:
}
}
// Ask to search for a leader if one is not known.
void ping_leader() {
assert(!current_leader());
_ping_leader = true;
}
// Call this function to wait for the number of log entries to
// go below max_log_size.
// On abort throws `semaphore_aborted`.
future<> wait_max_log_size(seastar::abort_source* as);
// Return current configuration. Throws if not a leader.
// Return current configuration.
const configuration& get_configuration() const;
// Add an entry to in-memory log. The entry has to be
@@ -607,6 +616,7 @@ void fsm::step(server_id from, Message&& msg) {
// leader to avoid starting an election if the
// leader becomes idle.
follower_state().current_leader = from;
_ping_leader = false;
}
// 3.4. Leader election

View File

@@ -332,6 +332,12 @@ future<> server_impl::start() {
}
future<> server_impl::wait_for_leader(seastar::abort_source* as) {
if (_fsm->current_leader()) {
co_return;
}
logger.trace("[{}] the leader is unknown, waiting through uncertainty", id());
_fsm->ping_leader();
if (!_leader_promise) {
_leader_promise.emplace();
}
@@ -480,7 +486,6 @@ future<> server_impl::add_entry(command command, wait_type type, seastar::abort_
throw request_aborted();
}
if (leader == server_id{}) {
logger.trace("[{}] the leader is unknown, waiting through uncertainty", id());
co_await wait_for_leader(as);
leader = _fsm->current_leader();
} else {

View File

@@ -24,23 +24,8 @@ raft_group_registry::raft_group_registry(bool is_enabled, netw::messaging_servic
: _is_enabled(is_enabled)
, _ms(ms)
, _fd(make_shared<raft_gossip_failure_detector>(gossiper, _srv_address_mappings))
, _raft_support_listener(feat.cluster_supports_raft_cluster_mgmt().when_enabled([this, &feat, &gossiper] {
// If the `USES_RAFT_CLUSTER_MANAGEMENT` feature was already enabled, do nothing.
//
// This can happen if the current node is either a fresh node in an empty cluster
// or a restarting node, which is already part of an existing group0.
if (!_is_enabled || feat.cluster_uses_raft_cluster_mgmt()) {
return;
}
// When the cluster fully supports raft-based cluster management,
// we can re-enable support for the second gossip feature to trigger
// actual use of raft-based cluster management procedures.
feat.support(gms::features::USES_RAFT_CLUSTER_MANAGEMENT).get();
if (this_shard_id() == 0) {
// When the supported feature set is dynamically extended, re-advertise it through the gossip.
gossiper.add_local_application_state(gms::application_state::SUPPORTED_FEATURES,
gms::versioned_value::supported_features(feat.supported_feature_set())).get();
}
, _raft_support_listener(feat.cluster_supports_raft_cluster_mgmt().when_enabled([] {
// TODO: join group 0 on upgrade
}))
{
}

View File

@@ -2270,3 +2270,37 @@ BOOST_AUTO_TEST_CASE(test_append_entry_inside_snapshot) {
communicate(A, B, C);
BOOST_CHECK(!C.get_log().empty());
}
BOOST_AUTO_TEST_CASE(test_ping_leader) {
discrete_failure_detector fd;
server_id A_id = id(), B_id = id(), C_id = id();
raft::configuration cfg(raft::server_address_set{
raft::server_address{A_id},
raft::server_address{B_id},
raft::server_address{C_id, false}});
raft::log log(raft::snapshot_descriptor{.idx = index_t{0}, .config = cfg});
auto A = create_follower(A_id, log, fd);
auto B = create_follower(B_id, log, fd);
auto C = create_follower(C_id, log, fd);
election_timeout(A);
communicate(A, B, C);
BOOST_CHECK(A.is_leader());
// Check that non voter forgot a leader after election timeout.
// It does not have to be this way, but currently our impl behaves this
// way.
fd.mark_all_dead();
election_timeout(C);
BOOST_CHECK(!C.current_leader());
// Check that without any new input a node will not find out who leader is
// after network repairs.
fd.mark_all_alive();
communicate(A, B, C);
BOOST_CHECK(!C.current_leader());
// Check that is we request leader ping then a node is able to find out
// the leader after communicating with the cluster.
C.ping_leader();
C.tick();
communicate(A, B, C);
BOOST_CHECK(C.current_leader());
}