diff --git a/raft/raft.hh b/raft/raft.hh index f4ade0753b..59eab0fba0 100644 --- a/raft/raft.hh +++ b/raft/raft.hh @@ -587,6 +587,7 @@ public: // An endpoint on the leader to change configuration, // as requested by a remote follower. + // If the future resolves successfully, a dummy entry was committed after the configuration change. virtual future execute_modify_config(server_id from, std::vector add, std::vector del) = 0; diff --git a/raft/server.cc b/raft/server.cc index 14cee15963..66bd7370df 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -501,10 +501,12 @@ future server_impl::execute_modify_config(server_id from, cfg.erase(server_address{.id = to_remove}); } co_await enter_joint_configuration(cfg); - // Wait for transition joint->non-joint outside on the - // client. const log_entry& e = _fsm->add_entry(log_entry::dummy()); - co_return add_entry_reply{entry_id{.term = e.term, .idx = e.idx}}; + auto eid = entry_id{.term = e.term, .idx = e.idx}; + co_await wait_for_entry(eid, wait_type::committed); + // `modify_config` doesn't actually need the entry id for anything + // but we reuse the `add_entry` RPC verb which requires it. + co_return add_entry_reply{eid}; } catch (raft::error& e) { if (is_uncertainty(e)) { @@ -527,7 +529,7 @@ future<> server_impl::modify_config(std::vector add, std::vector } auto reply = co_await execute_modify_config(leader, std::move(add), std::move(del)); if (std::holds_alternative(reply)) { - co_return co_await wait_for_entry(std::get(reply), wait_type::committed); + co_return; } throw raft::not_a_leader{_fsm->current_leader()}; } @@ -547,7 +549,10 @@ future<> server_impl::modify_config(std::vector add, std::vector } }(); if (std::holds_alternative(reply)) { - co_return co_await wait_for_entry(std::get(reply), wait_type::committed); + // Do not wait for the entry locally. The reply means that the leader committed it, + // and there is no reason to wait for our local commit index to match. + // See also #9981. 
+ co_return; } leader = std::get(reply).leader; } diff --git a/raft/server.hh b/raft/server.hh index 39b790412b..e1a3398c8d 100644 --- a/raft/server.hh +++ b/raft/server.hh @@ -50,6 +50,11 @@ public: // May fail because of an internal error or because leader changed and an entry was either // replaced by the new leader or the server lost track of it. The former will result in // dropped_entry exception the later in commit_status_unknown. + // + // If forwarding is enabled and this is a follower, and the returned future resolves without exception, + // this means that the entry is committed/applied locally (depending on the wait type). + // Applied locally means the local state machine replica applied this command; + // committed locally means simply that the commit index is beyond this entry's index. virtual future<> add_entry(command command, wait_type type) = 0; // Set a new cluster configuration. If the configuration is @@ -78,8 +83,8 @@ public: // A simplified wrapper around set_configuration() which adds // and deletes servers. Unlike set_configuration(), - // works on a follower as well as on a leader (forwards the - // request to the current leader). If the added servers are + // works on a follower (if forwarding is enabled) as well as on a leader + // (forwards the request to the current leader). If the added servers are // already part of the configuration, or deleted are not // present, does nothing. The implementation forwards the // list of added or removed servers to the leader, where they @@ -88,6 +93,12 @@ public: // the log limiter semaphore. // This makes it possible to retry this command without // adverse effects to the configuration. + // + // If forwarding is enabled and this is a follower, and the returned future resolves without exception, + // this means that a dummy entry appended after the non-joint configuration entry was committed by the leader. 
+ // The local commit index is not necessarily up-to-date yet and the state of the local state machine + // replica may still come from before the configuration entry. + // (exception: if no server was actually added or removed, then nothing gets committed and the leader responds immediately). virtual future<> modify_config(std::vector add, std::vector del) = 0; diff --git a/test/raft/randomized_nemesis_test.cc b/test/raft/randomized_nemesis_test.cc index a04f081723..647d837ed4 100644 --- a/test/raft/randomized_nemesis_test.cc +++ b/test/raft/randomized_nemesis_test.cc @@ -1076,6 +1076,47 @@ future reconfigure( } } +future modify_config( + const std::vector& added, + std::vector deleted, + raft::logical_clock::time_point timeout, + logical_timer& timer, + raft::server& server) { + std::vector added_set; + for (auto id : added) { + added_set.push_back(raft::server_address { .id = id }); + } + + try { + co_await timer.with_timeout(timeout, [&server, added_set = std::move(added_set), deleted = std::move(deleted)] () mutable { + return server.modify_config(std::move(added_set), std::move(deleted)); + }()); + co_return std::monostate{}; + } catch (raft::not_a_leader e) { + co_return e; + } catch (raft::dropped_entry e) { + co_return e; + } catch (raft::commit_status_unknown e) { + co_return e; + } catch (raft::conf_change_in_progress e) { + co_return e; + } catch (raft::stopped_error e) { + co_return e; + } catch (logical_timer::timed_out e) { + (void)e.get_future().discard_result() + .handle_exception([] (std::exception_ptr eptr) { + try { + std::rethrow_exception(eptr); + } catch (const raft::dropped_entry&) { + } catch (const raft::commit_status_unknown&) { + } catch (const raft::not_a_leader&) { + } catch (const raft::stopped_error&) { + } + }); + co_return timed_out_error{}; + } +} + // Contains a `raft::server` and other facilities needed for it and the underlying // modules (persistence, rpc, etc.) to run, and to communicate with the external environment. 
template @@ -1199,6 +1240,21 @@ public: } } + future modify_config( + const std::vector& added, + std::vector deleted, + raft::logical_clock::time_point timeout, + logical_timer& timer) { + assert(_started); + try { + co_return co_await with_gate(_gate, [this, &added, deleted = std::move(deleted), timeout, &timer] { + return ::modify_config(added, std::move(deleted), timeout, timer, *_server); + }); + } catch (const gate_closed_exception&) { + co_return raft::stopped_error{}; + } + } + bool is_leader() const { return _server->is_leader(); } @@ -1562,6 +1618,32 @@ public: co_return res; } + future modify_config( + raft::server_id id, + const std::vector& added, + std::vector deleted, + raft::logical_clock::time_point timeout, + logical_timer& timer) { + auto& n = _routes.at(id); + if (!n._server) { + // A 'remote' caller doesn't know in general if the server is down or just slow to respond. + // Simulate this by timing out the call. + co_await timer.sleep_until(timeout); + co_return timed_out_error{}; + } + + auto srv = n._server.get(); + auto res = co_await srv->modify_config(added, std::move(deleted), timeout, timer); + + if (srv != n._server.get()) { + // The server stopped while the call was happening. + // As above, we simulate a 'remote' call by timing it out in this case. + co_await timer.sleep_until(timeout); + co_return timed_out_error{}; + } + co_return res; + } + std::optional get_configuration(raft::server_id id) { auto& n = _routes.at(id); if (!n._server) { @@ -1928,6 +2010,42 @@ SEASTAR_TEST_CASE(snapshotting_preserves_config_test) { }); } +// Regression test for #9981. 
+SEASTAR_TEST_CASE(removed_follower_with_forwarding_learns_about_removal) { + logical_timer timer; + environment_config cfg { + .rnd{0}, + .network_delay{1, 1}, + .fd_convict_threshold = 10_t, + }; + co_await with_env_and_ticker(cfg, [&timer] (environment& env, ticker& t) -> future<> { + t.start([&] (uint64_t tick) { + env.tick_network(); + timer.tick(); + if (tick % 10 == 0) { + env.tick_servers(); + } + }, 10'000); + + raft::server::configuration cfg { + .enable_forwarding = true, + }; + + auto id1 = co_await env.new_server(true, cfg); + assert(co_await wait_for_leader{}(env, {id1}, timer, timer.now() + 1000_t) == id1); + + auto id2 = co_await env.new_server(true, cfg); + assert(std::holds_alternative( + co_await env.reconfigure(id1, {id1, id2}, timer.now() + 100_t, timer))); + + // Server 2 forwards the entry that removes it to server 1. + // We want server 2 to eventually learn from server 1 that it was removed, + // so the call finishes (no timeout). + assert(std::holds_alternative( + co_await env.modify_config(id2, {}, {id2}, timer.now() + 100_t, timer))); + }); +} + // Given a function `F` which takes a `raft::server_id` argument and returns a variant type // which contains `not_a_leader`, repeatedly calls `F` until it returns something else than // `not_a_leader` or until we reach a limit, whichever happens first.