diff --git a/raft/README.md b/raft/README.md index 764bac7216..b98b483f67 100644 --- a/raft/README.md +++ b/raft/README.md @@ -145,6 +145,11 @@ this: groups, hence failure detection RPC could run once on network peer level, not sepately for each Raft instance. The library expects an accurate `failure_detector` instance from a complying implementation. +- Since Raft leader no longer sends RPC every 0.1 second there can be a + situation when a follower may not know who the leader is for a long time + (if the leader is idle). Add an extension that allows a follower to actively + search for a leader by sending specially crafted append reply RPC to all voters. + A leader will reply with an empty append message to such a message. ### Pre-voting and protection against disruptive leaders diff --git a/raft/fsm.cc b/raft/fsm.cc index f742a2e0ce..c1d0c07083 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -158,6 +158,7 @@ void fsm::become_leader() { _state.emplace(_config.max_log_size, *this); leader_state().log_limiter_semaphore->consume(_log.in_memory_size()); _last_election_time = _clock.now(); + _ping_leader = false; // a new leader needs to commit at lease one entry to make sure that // all existing entries in its log are committed as well. Also it should // send append entries RPC as soon as possible to establish its leadership @@ -178,6 +179,7 @@ void fsm::become_follower(server_id leader) { // assigned. The exchange here guarantis that. std::exchange(_state, follower{.current_leader = leader}); if (leader != server_id{}) { + _ping_leader = false; _last_election_time = _clock.now(); } } @@ -583,6 +585,25 @@ void fsm::tick() { _current_term, _last_election_time, _clock.now()); become_candidate(_config.enable_prevoting); } + + if (is_follower() && !current_leader() && _ping_leader) { + // We are a follower but a leader is not known. 
It will not be known + // until a communication from a leader which (for an idle leader) may + // not happen any time soon since we use external failure detector and + // our leader does not send periodic empty append messages. By sending + // a special append message reject we solicit a reply from a leader. + // Non leaders will ignore the append reply. + auto& cfg = get_configuration(); + // If conf is joint it means a leader will send us a non joint one eventually + if (!cfg.is_joint() && cfg.current.contains(raft::server_address{_my_id})) { + for (auto s : cfg.current) { + if (s.can_vote && s.id != _my_id && _failure_detector.is_alive(s.id)) { + logger.trace("tick[{}]: searching for a leader. Pinging {}", _my_id, s.id); + send_to(s.id, append_reply{_current_term, _commit_idx, append_reply::rejected{index_t{0}, index_t{0}}}); + } + } + } + } } void fsm::append_entries(server_id from, append_request&& request) { @@ -687,6 +708,15 @@ void fsm::append_entries_reply(server_id from, append_reply&& reply) { logger.trace("append_entries_reply[{}->{}]: rejected match={} index={}", _my_id, from, progress.match_idx, rejected.non_matching_idx); + // If non_matching_idx and last_idx are zero it means that a follower is looking for a leader + // as such message cannot be a result of real mismatch. 
+ // Send an empty append message to notify it that we are the leader + if (rejected.non_matching_idx == index_t{0} && rejected.last_idx == index_t{0}) { + logger.trace("append_entries_reply[{}->{}]: send empty append message", _my_id, from); + replicate_to(progress, true); + return; + } + // check reply validity if (progress.is_stray_reject(rejected)) { logger.trace("append_entries_reply[{}->{}]: drop stray append reject", _my_id, from); diff --git a/raft/fsm.hh b/raft/fsm.hh index c5864c1258..1908cc13b0 100644 --- a/raft/fsm.hh +++ b/raft/fsm.hh @@ -165,6 +165,9 @@ class fsm { fsm_config _config; // This is set to true when leadership transfer process is aborted due to a timeout bool _abort_leadership_transfer = false; + // Set if we want to actively search for a leader. + // Can be true only if the leader is not known + bool _ping_leader = false; // Stores the last state observed by get_output(). // Is updated with the actual state of the FSM after @@ -383,12 +386,18 @@ public: } } + // Ask to search for a leader if one is not known. + void ping_leader() { + assert(!current_leader()); + _ping_leader = true; + } + // Call this function to wait for the number of log entries to // go below max_log_size. // On abort throws `semaphore_aborted`. future<> wait_max_log_size(seastar::abort_source* as); - // Return current configuration. Throws if not a leader. + // Return current configuration. const configuration& get_configuration() const; // Add an entry to in-memory log. The entry has to be @@ -607,6 +616,7 @@ void fsm::step(server_id from, Message&& msg) { // leader to avoid starting an election if the // leader becomes idle. follower_state().current_leader = from; + _ping_leader = false; } // 3.4. 
Leader election diff --git a/raft/server.cc b/raft/server.cc index 8e8523ef78..b5e6374235 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -332,6 +332,12 @@ future<> server_impl::start() { } future<> server_impl::wait_for_leader(seastar::abort_source* as) { + if (_fsm->current_leader()) { + co_return; + } + + logger.trace("[{}] the leader is unknown, waiting through uncertainty", id()); + _fsm->ping_leader(); if (!_leader_promise) { _leader_promise.emplace(); } @@ -480,7 +486,6 @@ future<> server_impl::add_entry(command command, wait_type type, seastar::abort_ throw request_aborted(); } if (leader == server_id{}) { - logger.trace("[{}] the leader is unknown, waiting through uncertainty", id()); co_await wait_for_leader(as); leader = _fsm->current_leader(); } else { diff --git a/service/raft/raft_group_registry.cc b/service/raft/raft_group_registry.cc index a17c6b3353..481fedc28f 100644 --- a/service/raft/raft_group_registry.cc +++ b/service/raft/raft_group_registry.cc @@ -24,23 +24,8 @@ raft_group_registry::raft_group_registry(bool is_enabled, netw::messaging_servic : _is_enabled(is_enabled) , _ms(ms) , _fd(make_shared(gossiper, _srv_address_mappings)) - , _raft_support_listener(feat.cluster_supports_raft_cluster_mgmt().when_enabled([this, &feat, &gossiper] { - // If the `USES_RAFT_CLUSTER_MANAGEMENT` feature was already enabled, do nothing. - // - // This can happen if the current node is either a fresh node in an empty cluster - // or a restarting node, which is already part of an existing group0. - if (!_is_enabled || feat.cluster_uses_raft_cluster_mgmt()) { - return; - } - // When the cluster fully supports raft-based cluster management, - // we can re-enable support for the second gossip feature to trigger - // actual use of raft-based cluster management procedures. - feat.support(gms::features::USES_RAFT_CLUSTER_MANAGEMENT).get(); - if (this_shard_id() == 0) { - // When the supported feature set is dynamically extended, re-advertise it through the gossip. 
- gossiper.add_local_application_state(gms::application_state::SUPPORTED_FEATURES, - gms::versioned_value::supported_features(feat.supported_feature_set())).get(); - } + , _raft_support_listener(feat.cluster_supports_raft_cluster_mgmt().when_enabled([] { + // TODO: join group 0 on upgrade })) { } diff --git a/test/raft/fsm_test.cc b/test/raft/fsm_test.cc index 5062b90b5c..8a2f9f6360 100644 --- a/test/raft/fsm_test.cc +++ b/test/raft/fsm_test.cc @@ -2270,3 +2270,37 @@ BOOST_AUTO_TEST_CASE(test_append_entry_inside_snapshot) { communicate(A, B, C); BOOST_CHECK(!C.get_log().empty()); } + +BOOST_AUTO_TEST_CASE(test_ping_leader) { + discrete_failure_detector fd; + server_id A_id = id(), B_id = id(), C_id = id(); + raft::configuration cfg(raft::server_address_set{ + raft::server_address{A_id}, + raft::server_address{B_id}, + raft::server_address{C_id, false}}); + + raft::log log(raft::snapshot_descriptor{.idx = index_t{0}, .config = cfg}); + auto A = create_follower(A_id, log, fd); + auto B = create_follower(B_id, log, fd); + auto C = create_follower(C_id, log, fd); + election_timeout(A); + communicate(A, B, C); + BOOST_CHECK(A.is_leader()); + // Check that non voter forgot a leader after election timeout. + // It does not have to be this way, but currently our impl behaves this + // way. + fd.mark_all_dead(); + election_timeout(C); + BOOST_CHECK(!C.current_leader()); + // Check that without any new input a node will not find out who leader is + // after network repairs. + fd.mark_all_alive(); + communicate(A, B, C); + BOOST_CHECK(!C.current_leader()); + // Check that if we request leader ping then a node is able to find out + // the leader after communicating with the cluster. + C.ping_leader(); + C.tick(); + communicate(A, B, C); + BOOST_CHECK(C.current_leader()); +}