From 1eb849c3d715475f21bb893f24409d98c331447d Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 16 May 2022 17:57:02 +0200 Subject: [PATCH 1/6] raft: fsm: don't advance commit index further than match_idx during read_quorum It's not safe to advance the commit index further than match_idx since beyond that point the follower's log may be outdated. Fixes #10578. --- raft/fsm.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft/fsm.cc b/raft/fsm.cc index c1d0c07083..9f7e523b98 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -1044,7 +1044,7 @@ void fsm::broadcast_read_quorum(read_id id) { if (p.id == _my_id) { handle_read_quorum_reply(_my_id, read_quorum_reply{_current_term, _commit_idx, id}); } else { - send_to(p.id, read_quorum{_current_term, _commit_idx, id}); + send_to(p.id, read_quorum{_current_term, std::min(p.match_idx, _commit_idx), id}); } } } From 86c5036353153af1a185aac7f5a4167a4e5ed14f Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Tue, 17 May 2022 16:35:27 +0200 Subject: [PATCH 2/6] raft: server: handle aborts correctly in `read_barrier` The `wait_for_apply` function, called from `read_barrier`, didn't handle aborts. Fix that. --- raft/server.cc | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/raft/server.cc b/raft/server.cc index 7025ccf3d7..f5dba0bf78 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -35,6 +35,11 @@ struct active_read { optimized_optional abort; }; +struct awaited_index { + promise<> promise; + optimized_optional abort; +}; + static const seastar::metrics::label server_id_label("id"); static const seastar::metrics::label log_entry_type("log_entry_type"); static const seastar::metrics::label message_type("message_type"); @@ -99,7 +104,7 @@ private: // Index of the last entry applied to `_state_machine`. 
index_t _applied_idx; std::list _reads; - std::multimap> _awaited_indexes; + std::multimap _awaited_indexes; // Set to true when abort() is called bool _aborted = false; @@ -260,7 +265,7 @@ private: // Wait for a read barrier index to be applied. The index // is typically already committed, so we don't worry about the // term. - future<> wait_for_apply(index_t idx); + future<> wait_for_apply(index_t idx, abort_source*); // Set configuration but don't wait for transition joint -> // non_joint. future<> enter_joint_configuration(server_address_set c_new, seastar::abort_source* as); @@ -714,7 +719,7 @@ void server_impl::signal_applied() { if (it->first > _applied_idx) { break; } - it->second.set_value(); + it->second.promise.set_value(); it = _awaited_indexes.erase(it); } } @@ -1060,12 +1065,22 @@ term_t server_impl::get_current_term() const { return _fsm->get_current_term(); } -future<> server_impl::wait_for_apply(index_t idx) { +future<> server_impl::wait_for_apply(index_t idx, abort_source* as) { + if (as && as->abort_requested()) { + throw request_aborted(); + } if (idx > _applied_idx) { // The index is not applied yet. Wait for it. // This will be signalled when read_idx is applied - auto it = _awaited_indexes.emplace(idx, promise<>()); - co_await it->second.get_future(); + auto it = _awaited_indexes.emplace(idx, awaited_index{{}, {}}); + if (as) { + it->second.abort = as->subscribe([this, it] () noexcept { + it->second.promise.set_exception(request_aborted()); + _awaited_indexes.erase(it); + }); + assert(it->second.abort); + } + co_await it->second.promise.get_future(); } } @@ -1129,7 +1144,7 @@ future<> server_impl::read_barrier(seastar::abort_source* as) { // committed any entries yet, so wait for any entry to be // committed (if non were since start of the attempt) and retry. 
logger.trace("[{}] read_barrier leader not ready", _id); - co_await wait_for_apply(++applied); + co_await wait_for_apply(++applied, as); } else if (std::holds_alternative(res)) { leader = std::get(res).leader; } else { @@ -1139,7 +1154,7 @@ future<> server_impl::read_barrier(seastar::abort_source* as) { } logger.trace("[{}] read_barrier read index {}, append index {}", _id, read_idx, _applied_idx); - co_return co_await wait_for_apply(read_idx); + co_return co_await wait_for_apply(read_idx, as); } void server_impl::abort_snapshot_transfer(server_id id) { @@ -1213,7 +1228,7 @@ future<> server_impl::abort() { // Abort all read_barriers with an exception for (auto& i : _awaited_indexes) { - i.second.set_exception(stopped_error()); + i.second.promise.set_exception(stopped_error()); } _awaited_indexes.clear(); From c8237d405e0ce25d7a15d102ecd5ad3623750043 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 12 May 2022 16:36:51 +0200 Subject: [PATCH 3/6] raft: server: in `abort()`, abort read barriers before waiting for rpc abort `rpc::abort` may need to wait until all read barriers finish, so abort read barrier before waiting for `rpc::abort` to finish to avoid a deadlock on shutdown. `rpc::abort` is still called before the read barriers are aborted, only waited for after. Calling it first prevents new read barriers from being started by `rpc` (see `rpc::abort` comment). Also prevent new read barriers from being started after abort starts directly on a leader by checking the `_aborted` flag at the beginning of `execute_read_barrier`. Finally, use the opportunity to remove some compiler-dependent code. 
--- raft/server.cc | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/raft/server.cc b/raft/server.cc index f5dba0bf78..56fbdf53d8 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -1085,6 +1085,10 @@ future<> server_impl::wait_for_apply(index_t idx, abort_source* as) { } future server_impl::execute_read_barrier(server_id from, seastar::abort_source* as) { + if (_aborted) { + throw stopped_error(); + } + logger.trace("[{}] execute_read_barrier start", _id); std::optional> rid; @@ -1103,7 +1107,7 @@ future server_impl::execute_read_barrier(server_id from, sea return make_exception_future(request_aborted()); } _reads.push_back({rid->first, rid->second, {}, {}}); - auto read = --_reads.end(); + auto read = std::prev(_reads.end()); if (as) { read->abort = as->subscribe([this, read] () noexcept { read->promise.set_exception(request_aborted()); @@ -1203,8 +1207,6 @@ future<> server_impl::abort() { // Destroy entry waiters before waiting for `abort_rpc`, // since the RPC implementation may wait for forwarded `modify_config` calls to finish // (and `modify_config` does not finish until the configuration entry is committed or an error occurs). 
- // FIXME: probably need to do the same with read barriers (`_reads`) - // (not doing it yet because I want to catch the problem first in nemesis test) for (auto& ac: _awaited_commits) { ac.second.done.set_exception(stopped_error()); } @@ -1214,12 +1216,6 @@ future<> server_impl::abort() { _awaited_commits.clear(); _awaited_applies.clear(); - co_await seastar::when_all_succeed(std::move(abort_rpc), std::move(abort_sm), std::move(abort_persistence)).discard_result(); - - if (_leader_promise) { - _leader_promise->set_exception(stopped_error()); - } - // Complete all read attempts with not_a_leader for (auto& r: _reads) { r.promise.set_value(raft::not_a_leader{server_id{}}); @@ -1232,6 +1228,12 @@ future<> server_impl::abort() { } _awaited_indexes.clear(); + co_await seastar::when_all_succeed(std::move(abort_rpc), std::move(abort_sm), std::move(abort_persistence)).discard_result(); + + if (_leader_promise) { + _leader_promise->set_exception(stopped_error()); + } + abort_snapshot_transfers(); auto snp_futures = _aborted_snapshot_transfers | boost::adaptors::map_values; From 4ea58078621b3cb20f3496c8773abb5894adc8dc Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 16 May 2022 16:36:10 +0200 Subject: [PATCH 4/6] test: raft: randomized_nemesis_test: add flags for disabling nemeses Makes it easier to debug stuff. --- test/raft/randomized_nemesis_test.cc | 37 ++++++++++++++++++---------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/test/raft/randomized_nemesis_test.cc b/test/raft/randomized_nemesis_test.cc index 9499341725..26d10761e1 100644 --- a/test/raft/randomized_nemesis_test.cc +++ b/test/raft/randomized_nemesis_test.cc @@ -2905,6 +2905,10 @@ SEASTAR_TEST_CASE(basic_generator_test) { // threshold is 1024 log commands and we perform only 500 ops. 
bool frequent_snapshotting = bdist(random_engine); + bool nemesis_partitions = true; + bool nemesis_reconfigurations = true; + bool nemesis_crashes = true; + // TODO: randomize the snapshot thresholds between different servers for more chaos. auto srv_cfg = frequent_snapshotting ? raft::server::configuration { @@ -3011,24 +3015,31 @@ SEASTAR_TEST_CASE(basic_generator_test) { // ~= [4s, 8s] -> ~1/2 partitions should cause an election // we will set request timeout 600_t ~= 6s and partition every 1200_t ~= 12s - auto gen = op_limit(500, + auto num_ops = 500; + auto gen = op_limit(num_ops, pin(partition_thread, - stagger(seed, timer.now() + 200_t, 1200_t, 1200_t, - random(seed, [] (std::mt19937& engine) { - static std::uniform_int_distribution dist{400, 800}; - return op_type{network_majority_grudge{raft::logical_clock::duration{dist(engine)}}}; - }) + op_limit(nemesis_partitions ? num_ops : 0, + stagger(seed, timer.now() + 200_t, 1200_t, 1200_t, + random(seed, [] (std::mt19937& engine) { + static std::uniform_int_distribution dist{400, 800}; + return op_type{network_majority_grudge{raft::logical_clock::duration{dist(engine)}}}; + }) + ) ), pin(reconfig_thread, - stagger(seed, timer.now() + 1000_t, 500_t, 500_t, - constant([] () { return op_type{reconfiguration{500_t}}; }) + op_limit(nemesis_reconfigurations ? num_ops : 0, + stagger(seed, timer.now() + 1000_t, 500_t, 500_t, + constant([] () { return op_type{reconfiguration{500_t}}; }) + ) ), pin(crash_thread, - stagger(seed, timer.now() + 200_t, 100_t, 200_t, - random(seed, [] (std::mt19937& engine) { - static std::uniform_int_distribution dist{0, 100}; - return op_type{stop_crash{raft::logical_clock::duration{dist(engine)}}}; - }) + op_limit(nemesis_crashes ? 
num_ops : 0, + stagger(seed, timer.now() + 200_t, 100_t, 200_t, + random(seed, [] (std::mt19937& engine) { + static std::uniform_int_distribution dist{0, 100}; + return op_type{stop_crash{raft::logical_clock::duration{dist(engine)}}}; + }) + ) ), stagger(seed, timer.now(), 0_t, 50_t, sequence(1, [] (int32_t i) { From 6b2b400143309c98dbce5925c5fc8799296c265c Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 12 May 2022 16:10:46 +0200 Subject: [PATCH 5/6] test: raft: randomized_nemesis_test: perform linearizable reads using read_barriers Introduce a new operation, `raft_read`, which calls `read_barrier` on a server, reads the state of the server's state machine, and returns that state. Extend the generator in `basic_generator_test` to generate `raft_read`s. Only do it if forwarding is enabled (although it may make sense to test read barriers in non-forwarding scenario as well - we may think about it and do it in a follow-up). For now, we don't check the consistency of the results of the reads. They do return the observed state, but we don't compare it yet with the model. For now we simply issue the reads concurrently with other operations to introduce some more chaos to the cluster and check liveness and consistency of existing operations. --- test/raft/randomized_nemesis_test.cc | 137 +++++++++++++++++++++++++-- 1 file changed, 131 insertions(+), 6 deletions(-) diff --git a/test/raft/randomized_nemesis_test.cc b/test/raft/randomized_nemesis_test.cc index 26d10761e1..97d8632782 100644 --- a/test/raft/randomized_nemesis_test.cc +++ b/test/raft/randomized_nemesis_test.cc @@ -334,6 +334,42 @@ future> call( }); } +template +using read_result_t = std::variant; + +// Performs a linearizable read by calling a `read_barrier` and then reading the local state of the server's state machine. +// Only to be used in forwarding mode. +// See `call` for the meanings of `timeout`, `timer`, `server` and `sm`. 
+template +future> read( + raft::logical_clock::time_point timeout, + logical_timer& timer, + raft::server& server, + impure_state_machine& sm) { + // FIXME: using lambda as workaround for clang bug #50345. + auto impl = [] (raft::logical_clock::time_point timeout, logical_timer& timer, + raft::server& server, impure_state_machine& sm) -> future> { + try { + co_await with_timeout(timer, timeout, [&] (abort_source& as) { + return server.read_barrier(&as); + }); + + co_return sm.state(); + } catch (raft::stopped_error e) { + co_return e; + } catch (seastar::timed_out_error e) { + co_return e; + } catch (raft::request_aborted&) { + co_return timed_out_error{}; + } catch (...) { + tlogger.error("unexpected exception from `read`: {}", std::current_exception()); + assert(false); + } + }; + + return impl(timeout, timer, server, sm); +} + // Allows a Raft server to communicate with other servers. // The implementation is mostly boilerplate. It assumes that there exists a method of message passing // given by a `send_message_t` function (passed in the constructor) for sending and by the `receive` @@ -742,7 +778,12 @@ public: static const raft::logical_clock::duration execute_read_barrier_on_leader_timeout = 20_t; // TODO: catch aborts from the abort_source as well - co_return co_await _timer.with_timeout(_timer.now() + execute_read_barrier_on_leader_timeout, std::move(f)); + try { + co_return co_await _timer.with_timeout(_timer.now() + execute_read_barrier_on_leader_timeout, std::move(f)); + } catch (logical_timer::timed_out& e) { + (void) e.get_future().discard_result().handle_exception_type([] (const broken_promise&) { }); + throw timed_out_error{}; + } // co_await ensures that `guard` is destroyed before we leave `_gate` }); } @@ -1440,6 +1481,19 @@ public: } } + future> read( + raft::logical_clock::time_point timeout, + logical_timer& timer) { + assert(_started); + try { + co_return co_await with_gate(_gate, [this, timeout, &timer] { + return ::read(timeout, timer, 
*_server, _sm); + }); + } catch (const gate_closed_exception&) { + co_return raft::stopped_error{}; + } + } + future reconfigure( const std::vector& ids, raft::logical_clock::time_point timeout, @@ -1784,6 +1838,28 @@ public: co_return res; } + future> read( + raft::server_id id, + raft::logical_clock::time_point timeout, + logical_timer& timer) { + auto& n = _routes.at(id); + if (!n._server) { + // As in `call`. + co_await timer.sleep_until(timeout); + co_return timed_out_error{}; + } + + auto srv = n._server.get(); + auto res = co_await srv->read(timeout, timer); + + if (srv != n._server.get()) { + // As in `call`. + co_await timer.sleep_until(timeout); + co_return timed_out_error{}; + } + co_return res; + } + future reconfigure( raft::server_id id, const std::vector& ids, @@ -2364,6 +2440,40 @@ struct raft_call { } }; +// An operation representing a linearizable read from a Raft server. +// To be used only in forwarding mode. Doesn't bounce. +template +struct raft_read { + raft::logical_clock::duration timeout; + + using result_type = read_result_t; + + struct state_type { + environment& env; + const std::unordered_set& known; + logical_timer& timer; + }; + + future execute(state_type& s, const operation::context& ctx) { + assert(s.known.size() > 0); + static std::mt19937 engine{0}; + + auto it = s.known.begin(); + std::advance(it, std::uniform_int_distribution{0, s.known.size() - 1}(engine)); + auto contact = *it; + + tlogger.debug("read start tid {} start time {} current time {} contact {}", ctx.thread, ctx.start, s.timer.now(), contact); + auto res = co_await s.env.read(contact, s.timer.now() + timeout, s.timer); + tlogger.debug("read end tid {} start time {} current time {} contact {}", ctx.thread, ctx.start, s.timer.now(), contact); + + co_return res; + } + + friend std::ostream& operator<<(std::ostream& os, const raft_read& r) { + return os << format("raft_read{{timeout:{}}}", r.timeout); + } +}; + // An operation that partitions the network in half. 
// During the partition, no server from one half can contact any server from the other; // the partition is symmetric. @@ -2860,6 +2970,7 @@ std::ostream& operator<<(std::ostream& os, const AppendReg::ret& r) { SEASTAR_TEST_CASE(basic_generator_test) { using op_type = operation::invocable, + raft_read, network_majority_grudge, reconfiguration, stop_crash @@ -2973,6 +3084,12 @@ SEASTAR_TEST_CASE(basic_generator_test) { .timer = timer }; + raft_read::state_type read_state { + .env = env, + .known = known_config, + .timer = timer + }; + network_majority_grudge::state_type network_majority_grudge_state { .env = env, .known = known_config, @@ -2997,6 +3114,7 @@ SEASTAR_TEST_CASE(basic_generator_test) { auto init_state = op_type::state_type{ std::move(db_call_state), + std::move(read_state), std::move(network_majority_grudge_state), std::move(reconfiguration_state), std::move(crash_state) @@ -3041,11 +3159,18 @@ SEASTAR_TEST_CASE(basic_generator_test) { }) ) ), - stagger(seed, timer.now(), 0_t, 50_t, - sequence(1, [] (int32_t i) { - assert(i > 0); - return op_type{raft_call{AppendReg::append{i}, 200_t}}; - }) + either( + stagger(seed, timer.now(), 0_t, 50_t, + sequence(1, [] (int32_t i) { + assert(i > 0); + return op_type{raft_call{AppendReg::append{i}, 200_t}}; + }) + ), + op_limit(forwarding ? num_ops : 0 /* only produce raft_reads in forwarding mode */, + stagger(seed, timer.now(), 0_t, 200_t, + constant([] () { return op_type{raft_read{200_t}}; }) + ) + ) ) ) ) From 6268c637393adcdd60433608dbd9b00255266cd0 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Wed, 18 May 2022 16:19:06 +0200 Subject: [PATCH 6/6] test: raft: randomized_nemesis_test: check consistency of reads The test would perform `read_barrier`s but not check the correctness of the reads: whether the state observed by a read is consistent with the model and recent enough (in short, check linearizability). This commit adds the correctness checks. 
--- test/raft/randomized_nemesis_test.cc | 64 +++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 5 deletions(-) diff --git a/test/raft/randomized_nemesis_test.cc b/test/raft/randomized_nemesis_test.cc index 97d8632782..3f3ad58174 100644 --- a/test/raft/randomized_nemesis_test.cc +++ b/test/raft/randomized_nemesis_test.cc @@ -2444,9 +2444,10 @@ struct raft_call { // To be used only in forwarding mode. Doesn't bounce. template struct raft_read { + int32_t read_id; raft::logical_clock::duration timeout; - using result_type = read_result_t; + using result_type = std::pair>; struct state_type { environment& env; @@ -2466,11 +2467,11 @@ struct raft_read { auto res = co_await s.env.read(contact, s.timer.now() + timeout, s.timer); tlogger.debug("read end tid {} start time {} current time {} contact {}", ctx.thread, ctx.start, s.timer.now(), contact); - co_return res; + co_return result_type{read_id, std::move(res)}; } friend std::ostream& operator<<(std::ostream& os, const raft_read& r) { - return os << format("raft_read{{timeout:{}}}", r.timeout); + return os << format("raft_read{{id:{}, timeout:{}}}", r.read_id, r.timeout); } }; @@ -2853,6 +2854,10 @@ struct append_reg_model { std::unordered_set returned; std::unordered_set in_progress; + // For each read, the element observed at the end of the model sequence + // at the moment the read has started. 
+ std::unordered_map reads; + void invocation(elem_t x) { assert(!index.contains(x)); assert(!in_progress.contains(x)); @@ -2866,7 +2871,7 @@ struct append_reg_model { try { completion(x, prev); } catch (inconsistency& e) { - e.what += format("\nwhen completing elem: {}\nprev: {}\nmodel: {}", x, prev, seq); + e.what += format("\nwhen completing append: {}\nprev: {}\nmodel: {}", x, prev, seq); throw; } returned.insert(x); @@ -2879,6 +2884,38 @@ struct append_reg_model { in_progress.erase(x); } + void start_read(int32_t id) { + auto [_, inserted] = reads.emplace(id, seq.back().elem); + assert(inserted); + } + + void read_success(int32_t id, append_seq result) { + auto read = reads.find(id); + assert(read != reads.end()); + + size_t idx = 0; + for (; idx < result.size(); ++idx) { + if (result[idx] == read->second) { + break; + } + } + + if (idx == result.size()) { + throw inconsistency{format( + "read {} observed last model elem {} at start not present in result: {}", + id, read->second, result)}; + } + + try { + auto [prev, x] = result.pop(); + completion(x, prev); + } catch (inconsistency& e) { + e.what += format("\nwhen completing read id: {}, last model elem at start: {}\nread result: {}", + id, read->second, result); + throw; /* re-throw with the added context, as the append path does; otherwise the inconsistency is swallowed */ + } + } + private: void completion(elem_t x, append_seq prev) { if (prev.empty()) { @@ -3168,7 +3205,9 @@ SEASTAR_TEST_CASE(basic_generator_test) { ), op_limit(forwarding ? 
num_ops : 0 /* only produce raft_reads in forwarding mode */, stagger(seed, timer.now(), 0_t, 200_t, - constant([] () { return op_type{raft_read{200_t}}; }) + sequence(1, [] (int32_t i) { + return op_type{raft_read{i, 200_t}}; + }) ) ) ) @@ -3196,6 +3235,9 @@ SEASTAR_TEST_CASE(basic_generator_test) { if (auto call_op = std::get_if>(&o.op)) { ++_stats.invocations; _model.invocation(call_op->input.x); + } else if (auto read_op = std::get_if>(&o.op)) { + ++_stats.invocations; + _model.start_read(read_op->read_id); } } @@ -3225,6 +3267,18 @@ SEASTAR_TEST_CASE(basic_generator_test) { ++_stats.failures; } ), *call_res); + } else if (auto read_res = std::get_if::result_type>(res)) { + std::visit(make_visitor( + [this, id = read_res->first] (AppendReg::state_t& s) { + tlogger.debug("read completion id: {} digest: {}", id, s.digest()); + + ++_stats.successes; + _model.read_success(id, std::move(s)); + }, + [this] (auto&) { + ++_stats.failures; + } + ), read_res->second); } else { tlogger.debug("completion {}", c); }