diff --git a/raft/fsm.cc b/raft/fsm.cc index c1d0c07083..9f7e523b98 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -1044,7 +1044,7 @@ void fsm::broadcast_read_quorum(read_id id) { if (p.id == _my_id) { handle_read_quorum_reply(_my_id, read_quorum_reply{_current_term, _commit_idx, id}); } else { - send_to(p.id, read_quorum{_current_term, _commit_idx, id}); + send_to(p.id, read_quorum{_current_term, std::min(p.match_idx, _commit_idx), id}); } } } diff --git a/raft/server.cc b/raft/server.cc index 7025ccf3d7..56fbdf53d8 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -35,6 +35,11 @@ struct active_read { optimized_optional abort; }; +struct awaited_index { + promise<> promise; + optimized_optional abort; +}; + static const seastar::metrics::label server_id_label("id"); static const seastar::metrics::label log_entry_type("log_entry_type"); static const seastar::metrics::label message_type("message_type"); @@ -99,7 +104,7 @@ private: // Index of the last entry applied to `_state_machine`. index_t _applied_idx; std::list _reads; - std::multimap> _awaited_indexes; + std::multimap _awaited_indexes; // Set to true when abort() is called bool _aborted = false; @@ -260,7 +265,7 @@ private: // Wait for a read barrier index to be applied. The index // is typically already committed, so we don't worry about the // term. - future<> wait_for_apply(index_t idx); + future<> wait_for_apply(index_t idx, abort_source*); // Set configuration but don't wait for transition joint -> // non_joint. future<> enter_joint_configuration(server_address_set c_new, seastar::abort_source* as); @@ -714,7 +719,7 @@ void server_impl::signal_applied() { if (it->first > _applied_idx) { break; } - it->second.set_value(); + it->second.promise.set_value(); it = _awaited_indexes.erase(it); } } @@ -1060,16 +1065,30 @@ term_t server_impl::get_current_term() const { return _fsm->get_current_term(); } -future<> server_impl::wait_for_apply(index_t idx) { +future<> server_impl::wait_for_apply(index_t idx, abort_source* as) { + if (as && as->abort_requested()) { + throw request_aborted(); + } if (idx > _applied_idx) { // The index is not applied yet. Wait for it. // This will be signalled when read_idx is applied - auto it = _awaited_indexes.emplace(idx, promise<>()); - co_await it->second.get_future(); + auto it = _awaited_indexes.emplace(idx, awaited_index{{}, {}}); + if (as) { + it->second.abort = as->subscribe([this, it] () noexcept { + it->second.promise.set_exception(request_aborted()); + _awaited_indexes.erase(it); + }); + assert(it->second.abort); + } + co_await it->second.promise.get_future(); } } future server_impl::execute_read_barrier(server_id from, seastar::abort_source* as) { + if (_aborted) { + throw stopped_error(); + } + logger.trace("[{}] execute_read_barrier start", _id); std::optional> rid; @@ -1088,7 +1107,7 @@ future server_impl::execute_read_barrier(server_id from, sea return make_exception_future(request_aborted()); } _reads.push_back({rid->first, rid->second, {}, {}}); - auto read = --_reads.end(); + auto read = std::prev(_reads.end()); if (as) { read->abort = as->subscribe([this, read] () noexcept { read->promise.set_exception(request_aborted()); @@ -1129,7 +1148,7 @@ future<> server_impl::read_barrier(seastar::abort_source* as) { // committed any entries yet, so wait for any entry to be // committed (if non were since start of the attempt) and retry. logger.trace("[{}] read_barrier leader not ready", _id); - co_await wait_for_apply(++applied); + co_await wait_for_apply(++applied, as); } else if (std::holds_alternative(res)) { leader = std::get(res).leader; } else { @@ -1139,7 +1158,7 @@ future<> server_impl::read_barrier(seastar::abort_source* as) { } logger.trace("[{}] read_barrier read index {}, append index {}", _id, read_idx, _applied_idx); - co_return co_await wait_for_apply(read_idx); + co_return co_await wait_for_apply(read_idx, as); } void server_impl::abort_snapshot_transfer(server_id id) { @@ -1188,8 +1207,6 @@ future<> server_impl::abort() { // Destroy entry waiters before waiting for `abort_rpc`, // since the RPC implementation may wait for forwarded `modify_config` calls to finish // (and `modify_config` does not finish until the configuration entry is committed or an error occurs). - // FIXME: probably need to do the same with read barriers (`_reads`) - // (not doing it yet because I want to catch the problem first in nemesis test) for (auto& ac: _awaited_commits) { ac.second.done.set_exception(stopped_error()); } @@ -1199,12 +1216,6 @@ future<> server_impl::abort() { _awaited_commits.clear(); _awaited_applies.clear(); - co_await seastar::when_all_succeed(std::move(abort_rpc), std::move(abort_sm), std::move(abort_persistence)).discard_result(); - - if (_leader_promise) { - _leader_promise->set_exception(stopped_error()); - } - // Complete all read attempts with not_a_leader for (auto& r: _reads) { r.promise.set_value(raft::not_a_leader{server_id{}}); @@ -1213,10 +1224,16 @@ future<> server_impl::abort() { // Abort all read_barriers with an exception for (auto& i : _awaited_indexes) { - i.second.set_exception(stopped_error()); + i.second.promise.set_exception(stopped_error()); } _awaited_indexes.clear(); + co_await seastar::when_all_succeed(std::move(abort_rpc), std::move(abort_sm), std::move(abort_persistence)).discard_result(); + + if (_leader_promise) { + _leader_promise->set_exception(stopped_error()); + } + abort_snapshot_transfers(); auto snp_futures = _aborted_snapshot_transfers | boost::adaptors::map_values; diff --git a/test/raft/randomized_nemesis_test.cc b/test/raft/randomized_nemesis_test.cc index 9499341725..3f3ad58174 100644 --- a/test/raft/randomized_nemesis_test.cc +++ b/test/raft/randomized_nemesis_test.cc @@ -334,6 +334,42 @@ future> call( }); } +template +using read_result_t = std::variant; + +// Performs a linearizable read by calling a `read_barrier` and then reading the local state of the server's state machine. +// Only to be used in forwarding mode. +// See `call` for the meanings of `timeout`, `timer`, `server` and `sm`. +template +future> read( + raft::logical_clock::time_point timeout, + logical_timer& timer, + raft::server& server, + impure_state_machine& sm) { + // FIXME: using lambda as workaround for clang bug #50345. + auto impl = [] (raft::logical_clock::time_point timeout, logical_timer& timer, + raft::server& server, impure_state_machine& sm) -> future> { + try { + co_await with_timeout(timer, timeout, [&] (abort_source& as) { + return server.read_barrier(&as); + }); + + co_return sm.state(); + } catch (raft::stopped_error e) { + co_return e; + } catch (seastar::timed_out_error e) { + co_return e; + } catch (raft::request_aborted&) { + co_return timed_out_error{}; + } catch (...) { + tlogger.error("unexpected exception from `read`: {}", std::current_exception()); + assert(false); + } + }; + + return impl(timeout, timer, server, sm); +} + // Allows a Raft server to communicate with other servers. // The implementation is mostly boilerplate. It assumes that there exists a method of message passing // given by a `send_message_t` function (passed in the constructor) for sending and by the `receive` @@ -742,7 +778,12 @@ public: static const raft::logical_clock::duration execute_read_barrier_on_leader_timeout = 20_t; // TODO: catch aborts from the abort_source as well - co_return co_await _timer.with_timeout(_timer.now() + execute_read_barrier_on_leader_timeout, std::move(f)); + try { + co_return co_await _timer.with_timeout(_timer.now() + execute_read_barrier_on_leader_timeout, std::move(f)); + } catch (logical_timer::timed_out& e) { + (void) e.get_future().discard_result().handle_exception_type([] (const broken_promise&) { }); + throw timed_out_error{}; + } // co_await ensures that `guard` is destroyed before we leave `_gate` }); } @@ -1440,6 +1481,19 @@ public: } } + future> read( + raft::logical_clock::time_point timeout, + logical_timer& timer) { + assert(_started); + try { + co_return co_await with_gate(_gate, [this, timeout, &timer] { + return ::read(timeout, timer, *_server, _sm); + }); + } catch (const gate_closed_exception&) { + co_return raft::stopped_error{}; + } + } + future reconfigure( const std::vector& ids, raft::logical_clock::time_point timeout, @@ -1784,6 +1838,28 @@ public: co_return res; } + future> read( + raft::server_id id, + raft::logical_clock::time_point timeout, + logical_timer& timer) { + auto& n = _routes.at(id); + if (!n._server) { + // As in `call`. + co_await timer.sleep_until(timeout); + co_return timed_out_error{}; + } + + auto srv = n._server.get(); + auto res = co_await srv->read(timeout, timer); + + if (srv != n._server.get()) { + // As in `call`. + co_await timer.sleep_until(timeout); + co_return timed_out_error{}; + } + co_return res; + } + future reconfigure( raft::server_id id, const std::vector& ids, @@ -2364,6 +2440,41 @@ struct raft_call { } }; +// An operation representing a linearizable read from a Raft server. +// To be used only in forwarding mode. Doesn't bounce. +template +struct raft_read { + int32_t read_id; + raft::logical_clock::duration timeout; + + using result_type = std::pair>; + + struct state_type { + environment& env; + const std::unordered_set& known; + logical_timer& timer; + }; + + future execute(state_type& s, const operation::context& ctx) { + assert(s.known.size() > 0); + static std::mt19937 engine{0}; + + auto it = s.known.begin(); + std::advance(it, std::uniform_int_distribution{0, s.known.size() - 1}(engine)); + auto contact = *it; + + tlogger.debug("read start tid {} start time {} current time {} contact {}", ctx.thread, ctx.start, s.timer.now(), contact); + auto res = co_await s.env.read(contact, s.timer.now() + timeout, s.timer); + tlogger.debug("read end tid {} start time {} current time {} contact {}", ctx.thread, ctx.start, s.timer.now(), contact); + + co_return result_type{read_id, std::move(res)}; + } + + friend std::ostream& operator<<(std::ostream& os, const raft_read& r) { + return os << format("raft_read{{id:{}, timeout:{}}}", r.read_id, r.timeout); + } +}; + // An operation that partitions the network in half. // During the partition, no server from one half can contact any server from the other; // the partition is symmetric. @@ -2743,6 +2854,10 @@ struct append_reg_model { std::unordered_set returned; std::unordered_set in_progress; + // For each read, the element observed at the end of the model sequence + // at the moment the read has started. + std::unordered_map reads; + void invocation(elem_t x) { assert(!index.contains(x)); assert(!in_progress.contains(x)); @@ -2756,7 +2871,7 @@ struct append_reg_model { try { completion(x, prev); } catch (inconsistency& e) { - e.what += format("\nwhen completing elem: {}\nprev: {}\nmodel: {}", x, prev, seq); + e.what += format("\nwhen completing append: {}\nprev: {}\nmodel: {}", x, prev, seq); throw; } returned.insert(x); @@ -2769,6 +2884,38 @@ struct append_reg_model { in_progress.erase(x); } + void start_read(int32_t id) { + auto [_, inserted] = reads.emplace(id, seq.back().elem); + assert(inserted); + } + + void read_success(int32_t id, append_seq result) { + auto read = reads.find(id); + assert(read != reads.end()); + + size_t idx = 0; + for (; idx < result.size(); ++idx) { + if (result[idx] == read->second) { + break; + } + } + + if (idx == result.size()) { + throw inconsistency{format( + "read {} observed last model elem {} at start not present in result: {}", + id, read->second, result)}; + } + + try { + auto [prev, x] = result.pop(); + completion(x, prev); + } catch (inconsistency& e) { + e.what += format( + "\nwhen completing read id: {}, last model elem at start: {}\nread result: {}", + id, read->second, result); + } + } + private: void completion(elem_t x, append_seq prev) { if (prev.empty()) { @@ -2860,6 +3007,7 @@ std::ostream& operator<<(std::ostream& os, const AppendReg::ret& r) { SEASTAR_TEST_CASE(basic_generator_test) { using op_type = operation::invocable, + raft_read, network_majority_grudge, reconfiguration, stop_crash @@ -2905,6 +3053,10 @@ SEASTAR_TEST_CASE(basic_generator_test) { // threshold is 1024 log commands and we perform only 500 ops. bool frequent_snapshotting = bdist(random_engine); + bool nemesis_partitions = true; + bool nemesis_reconfigurations = true; + bool nemesis_crashes = true; + // TODO: randomize the snapshot thresholds between different servers for more chaos. auto srv_cfg = frequent_snapshotting ? raft::server::configuration { @@ -2969,6 +3121,12 @@ SEASTAR_TEST_CASE(basic_generator_test) { .timer = timer }; + raft_read::state_type read_state { + .env = env, + .known = known_config, + .timer = timer + }; + network_majority_grudge::state_type network_majority_grudge_state { .env = env, .known = known_config, @@ -2993,6 +3151,7 @@ SEASTAR_TEST_CASE(basic_generator_test) { auto init_state = op_type::state_type{ std::move(db_call_state), + std::move(read_state), std::move(network_majority_grudge_state), std::move(reconfiguration_state), std::move(crash_state) @@ -3011,30 +3170,46 @@ SEASTAR_TEST_CASE(basic_generator_test) { // ~= [4s, 8s] -> ~1/2 partitions should cause an election // we will set request timeout 600_t ~= 6s and partition every 1200_t ~= 12s - auto gen = op_limit(500, + auto num_ops = 500; + auto gen = op_limit(num_ops, pin(partition_thread, - stagger(seed, timer.now() + 200_t, 1200_t, 1200_t, - random(seed, [] (std::mt19937& engine) { - static std::uniform_int_distribution dist{400, 800}; - return op_type{network_majority_grudge{raft::logical_clock::duration{dist(engine)}}}; - }) + op_limit(nemesis_partitions ? num_ops : 0, + stagger(seed, timer.now() + 200_t, 1200_t, 1200_t, + random(seed, [] (std::mt19937& engine) { + static std::uniform_int_distribution dist{400, 800}; + return op_type{network_majority_grudge{raft::logical_clock::duration{dist(engine)}}}; + }) + ) ), pin(reconfig_thread, - stagger(seed, timer.now() + 1000_t, 500_t, 500_t, - constant([] () { return op_type{reconfiguration{500_t}}; }) + op_limit(nemesis_reconfigurations ? num_ops : 0, + stagger(seed, timer.now() + 1000_t, 500_t, 500_t, + constant([] () { return op_type{reconfiguration{500_t}}; }) + ) ), pin(crash_thread, - stagger(seed, timer.now() + 200_t, 100_t, 200_t, - random(seed, [] (std::mt19937& engine) { - static std::uniform_int_distribution dist{0, 100}; - return op_type{stop_crash{raft::logical_clock::duration{dist(engine)}}}; - }) + op_limit(nemesis_crashes ? num_ops : 0, + stagger(seed, timer.now() + 200_t, 100_t, 200_t, + random(seed, [] (std::mt19937& engine) { + static std::uniform_int_distribution dist{0, 100}; + return op_type{stop_crash{raft::logical_clock::duration{dist(engine)}}}; + }) + ) ), - stagger(seed, timer.now(), 0_t, 50_t, - sequence(1, [] (int32_t i) { - assert(i > 0); - return op_type{raft_call{AppendReg::append{i}, 200_t}}; - }) + either( + stagger(seed, timer.now(), 0_t, 50_t, + sequence(1, [] (int32_t i) { + assert(i > 0); + return op_type{raft_call{AppendReg::append{i}, 200_t}}; + }) + ), + op_limit(forwarding ? num_ops : 0 /* only produce raft_reads in forwarding mode */, + stagger(seed, timer.now(), 0_t, 200_t, + sequence(1, [] (int32_t i) { + return op_type{raft_read{i, 200_t}}; + }) + ) + ) ) ) ) @@ -3060,6 +3235,9 @@ SEASTAR_TEST_CASE(basic_generator_test) { if (auto call_op = std::get_if>(&o.op)) { ++_stats.invocations; _model.invocation(call_op->input.x); + } else if (auto read_op = std::get_if>(&o.op)) { + ++_stats.invocations; + _model.start_read(read_op->read_id); } } @@ -3089,6 +3267,18 @@ SEASTAR_TEST_CASE(basic_generator_test) { ++_stats.failures; } ), *call_res); + } else if (auto read_res = std::get_if::result_type>(res)) { + std::visit(make_visitor( + [this, id = read_res->first] (AppendReg::state_t& s) { + tlogger.debug("read completion id: {} digest: {}", id, s.digest()); + + ++_stats.successes; + _model.read_success(id, std::move(s)); + }, + [this] (auto&) { + ++_stats.failures; + } + ), read_res->second); } else { tlogger.debug("completion {}", c); }