Merge "raft: let modify_config finish on a follower that removes itself" from Kamil
When forwarding a reconfiguration request from follower to a leader in `modify_config`, there is no reason to wait for the follower's commit index to be updated. The only useful information is that the leader committed the configuration change - so `modify_config` should return as soon as we know that. There is a reason *not* to wait for the follower's commit index to be updated: if the configuration change removes the follower, the follower will never learn about it, so a local waiter will never be resolved. `execute_modify_config` - the part of `modify_config` executed on the leader - is thus modified to finish when the configuration change is fully complete (including the dummy entry appended at the end), and `modify_config` - which does the forwarding - no longer creates a local waiter, but returns as soon as the RPC call to the leader confirms that the entry was committed on the leader. We still return an `entry_id` from `execute_modify_config` but that's just an artifact of the implementation. Fixes #9981. A regression test was also added in randomized_nemesis_test. * kbr/modify-config-finishes-v1: test: raft: randomized_nemesis_test: regression test for #9981 raft: server: don't create local waiter in `modify_config`
This commit is contained in:
@@ -587,6 +587,7 @@ public:
|
||||
|
||||
// An endpoint on the leader to change configuration,
|
||||
// as requested by a remote follower.
|
||||
// If the future resolves successfully, a dummy entry was committed after the configuration change.
|
||||
virtual future<add_entry_reply> execute_modify_config(server_id from,
|
||||
std::vector<server_address> add,
|
||||
std::vector<server_id> del) = 0;
|
||||
|
||||
@@ -501,10 +501,12 @@ future<add_entry_reply> server_impl::execute_modify_config(server_id from,
|
||||
cfg.erase(server_address{.id = to_remove});
|
||||
}
|
||||
co_await enter_joint_configuration(cfg);
|
||||
// Wait for transition joint->non-joint outside on the
|
||||
// client.
|
||||
const log_entry& e = _fsm->add_entry(log_entry::dummy());
|
||||
co_return add_entry_reply{entry_id{.term = e.term, .idx = e.idx}};
|
||||
auto eid = entry_id{.term = e.term, .idx = e.idx};
|
||||
co_await wait_for_entry(eid, wait_type::committed);
|
||||
// `modify_config` doesn't actually need the entry id for anything
|
||||
// but we reuse the `add_entry` RPC verb which requires it.
|
||||
co_return add_entry_reply{eid};
|
||||
|
||||
} catch (raft::error& e) {
|
||||
if (is_uncertainty(e)) {
|
||||
@@ -527,7 +529,7 @@ future<> server_impl::modify_config(std::vector<server_address> add, std::vector
|
||||
}
|
||||
auto reply = co_await execute_modify_config(leader, std::move(add), std::move(del));
|
||||
if (std::holds_alternative<raft::entry_id>(reply)) {
|
||||
co_return co_await wait_for_entry(std::get<raft::entry_id>(reply), wait_type::committed);
|
||||
co_return;
|
||||
}
|
||||
throw raft::not_a_leader{_fsm->current_leader()};
|
||||
}
|
||||
@@ -547,7 +549,10 @@ future<> server_impl::modify_config(std::vector<server_address> add, std::vector
|
||||
}
|
||||
}();
|
||||
if (std::holds_alternative<raft::entry_id>(reply)) {
|
||||
co_return co_await wait_for_entry(std::get<raft::entry_id>(reply), wait_type::committed);
|
||||
// Do not wait for the entry locally. The reply means that the leader committed it,
|
||||
// and there is no reason to wait for our local commit index to match.
|
||||
// See also #9981.
|
||||
co_return;
|
||||
}
|
||||
leader = std::get<raft::not_a_leader>(reply).leader;
|
||||
}
|
||||
|
||||
@@ -50,6 +50,11 @@ public:
|
||||
// May fail because of an internal error or because leader changed and an entry was either
|
||||
// replaced by the new leader or the server lost track of it. The former will result in
|
||||
// dropped_entry exception, the latter in commit_status_unknown.
|
||||
//
|
||||
// If forwarding is enabled and this is a follower, and the returned future resolves without exception,
|
||||
// this means that the entry is committed/applied locally (depending on the wait type).
|
||||
// Applied locally means the local state machine replica applied this command;
|
||||
// committed locally means simply that the commit index is beyond this entry's index.
|
||||
virtual future<> add_entry(command command, wait_type type) = 0;
|
||||
|
||||
// Set a new cluster configuration. If the configuration is
|
||||
@@ -78,8 +83,8 @@ public:
|
||||
|
||||
// A simplified wrapper around set_configuration() which adds
|
||||
// and deletes servers. Unlike set_configuration(),
|
||||
// works on a follower as well as on a leader (forwards the
|
||||
// request to the current leader). If the added servers are
|
||||
// works on a follower (if forwarding is enabled) as well as on a leader
|
||||
// (forwards the request to the current leader). If the added servers are
|
||||
// already part of the configuration, or deleted are not
|
||||
// present, does nothing. The implementation forwards the
|
||||
// list of added or removed servers to the leader, where they
|
||||
@@ -88,6 +93,12 @@ public:
|
||||
// the log limiter semaphore.
|
||||
// This makes it possible to retry this command without
|
||||
// adverse effects to the configuration.
|
||||
//
|
||||
// If forwarding is enabled and this is a follower, and the returned future resolves without exception,
|
||||
// this means that a dummy entry appended after the non-joint configuration entry was committed by the leader.
|
||||
// The local commit index is not necessarily up-to-date yet and the state of the local state machine
|
||||
// replica may still come from before the configuration entry.
|
||||
// (exception: if no server was actually added or removed, then nothing gets committed and the leader responds immediately).
|
||||
virtual future<> modify_config(std::vector<server_address> add,
|
||||
std::vector<server_id> del) = 0;
|
||||
|
||||
|
||||
@@ -1076,6 +1076,47 @@ future<reconfigure_result_t> reconfigure(
|
||||
}
|
||||
}
|
||||
|
||||
// Ask `server` to apply a configuration change (adding `added`, removing `deleted`),
// bounded by `timeout` on the given logical `timer`.
// Returns std::monostate on success, the caught Raft error on a known failure mode,
// or timed_out_error if the logical timeout fires first.
future<reconfigure_result_t> modify_config(
        const std::vector<raft::server_id>& added,
        std::vector<raft::server_id> deleted,
        raft::logical_clock::time_point timeout,
        logical_timer& timer,
        raft::server& server) {
    // Translate plain server ids into the address form `modify_config` expects.
    std::vector<raft::server_address> add_addrs;
    add_addrs.reserve(added.size());
    for (auto id : added) {
        add_addrs.push_back(raft::server_address { .id = id });
    }

    try {
        // Move the argument vectors into the closure so the call owns them
        // even after this frame's locals are gone.
        co_await timer.with_timeout(timeout, [&server, add_addrs = std::move(add_addrs), deleted = std::move(deleted)] () mutable {
            return server.modify_config(std::move(add_addrs), std::move(deleted));
        }());
        co_return std::monostate{};
    } catch (raft::not_a_leader err) {
        co_return err;
    } catch (raft::dropped_entry err) {
        co_return err;
    } catch (raft::commit_status_unknown err) {
        co_return err;
    } catch (raft::conf_change_in_progress err) {
        co_return err;
    } catch (raft::stopped_error err) {
        co_return err;
    } catch (logical_timer::timed_out<void> err) {
        // The underlying future may still resolve later; consume it in the
        // background, swallowing only the failure modes we know about.
        (void)err.get_future().discard_result()
            .handle_exception([] (std::exception_ptr ep) {
                try {
                    std::rethrow_exception(ep);
                } catch (const raft::dropped_entry&) {
                } catch (const raft::commit_status_unknown&) {
                } catch (const raft::not_a_leader&) {
                } catch (const raft::stopped_error&) {
                }
            });
        co_return timed_out_error{};
    }
}
|
||||
|
||||
// Contains a `raft::server` and other facilities needed for it and the underlying
|
||||
// modules (persistence, rpc, etc.) to run, and to communicate with the external environment.
|
||||
template <PureStateMachine M>
|
||||
@@ -1199,6 +1240,21 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// Gate-guarded wrapper: runs the free-standing ::modify_config against this
// node's server, keeping the node alive for the duration of the call.
// Returns raft::stopped_error if the node is already shutting down.
future<reconfigure_result_t> modify_config(
        const std::vector<raft::server_id>& added,
        std::vector<raft::server_id> deleted,
        raft::logical_clock::time_point timeout,
        logical_timer& timer) {
    assert(_started);
    try {
        auto run = [this, &added, deleted = std::move(deleted), timeout, &timer] {
            return ::modify_config(added, std::move(deleted), timeout, timer, *_server);
        };
        co_return co_await with_gate(_gate, std::move(run));
    } catch (const gate_closed_exception&) {
        // The node stopped before (or while) we entered the gate.
        co_return raft::stopped_error{};
    }
}
|
||||
|
||||
bool is_leader() const {
|
||||
return _server->is_leader();
|
||||
}
|
||||
@@ -1562,6 +1618,32 @@ public:
|
||||
co_return res;
|
||||
}
|
||||
|
||||
// Simulate a 'remote' modify_config call to the node identified by `id`.
// A caller of a remote API cannot distinguish a dead server from a slow one,
// so a missing server — or one that restarts mid-call — is reported as a timeout.
future<reconfigure_result_t> modify_config(
        raft::server_id id,
        const std::vector<raft::server_id>& added,
        std::vector<raft::server_id> deleted,
        raft::logical_clock::time_point timeout,
        logical_timer& timer) {
    auto& node = _routes.at(id);
    if (!node._server) {
        // Server is down; from the caller's point of view the request just hangs.
        co_await timer.sleep_until(timeout);
        co_return timed_out_error{};
    }

    auto* before = node._server.get();
    auto result = co_await before->modify_config(added, std::move(deleted), timeout, timer);

    if (before != node._server.get()) {
        // The server instance was replaced during the call; the result belongs
        // to a dead incarnation, so simulate a timeout instead of returning it.
        co_await timer.sleep_until(timeout);
        co_return timed_out_error{};
    }
    co_return result;
}
|
||||
|
||||
std::optional<raft::configuration> get_configuration(raft::server_id id) {
|
||||
auto& n = _routes.at(id);
|
||||
if (!n._server) {
|
||||
@@ -1928,6 +2010,42 @@ SEASTAR_TEST_CASE(snapshotting_preserves_config_test) {
|
||||
});
|
||||
}
|
||||
|
||||
// Regression test for #9981.
|
||||
// Regression test for #9981: a follower that forwards a configuration change
// removing itself must still see its modify_config call resolve, even though
// its own commit index never catches up with the removal entry.
SEASTAR_TEST_CASE(removed_follower_with_forwarding_learns_about_removal) {
    logical_timer timer;
    environment_config cfg {
        .rnd{0},
        .network_delay{1, 1},
        .fd_convict_threshold = 10_t,
    };
    co_await with_env_and_ticker<ExReg>(cfg, [&timer] (environment<ExReg>& env, ticker& t) -> future<> {
        t.start([&] (uint64_t tick) {
            env.tick_network();
            timer.tick();
            // Advance the servers an order of magnitude slower than the network.
            if (tick % 10 == 0) {
                env.tick_servers();
            }
        }, 10'000);

        // Forwarding must be on: the scenario relies on the follower
        // relaying the reconfiguration to the leader.
        raft::server::configuration srv_cfg {
            .enable_forwarding = true,
        };

        auto id1 = co_await env.new_server(true, srv_cfg);
        assert(co_await wait_for_leader<ExReg>{}(env, {id1}, timer, timer.now() + 1000_t) == id1);

        auto id2 = co_await env.new_server(true, srv_cfg);
        assert(std::holds_alternative<std::monostate>(
            co_await env.reconfigure(id1, {id1, id2}, timer.now() + 100_t, timer)));

        // Server 2 asks server 1 (the leader) to remove server 2 itself.
        // Before the fix, server 2 waited for its local commit index and the
        // call never resolved; now it must complete without timing out.
        assert(std::holds_alternative<std::monostate>(
            co_await env.modify_config(id2, {}, {id2}, timer.now() + 100_t, timer)));
    });
}
|
||||
|
||||
// Given a function `F` which takes a `raft::server_id` argument and returns a variant type
|
||||
// which contains `not_a_leader`, repeatedly calls `F` until it returns something else than
|
||||
// `not_a_leader` or until we reach a limit, whichever happens first.
|
||||
|
||||
Reference in New Issue
Block a user