Merge 'raft: test read_barriers in randomized_nemesis_test' from Kamil Braun

Introduce a new operation, `raft_read`, which calls `read_barrier`
on a server, reads the state of the server's state machine, and returns
that state.

Extend the generator in `basic_generator_test` to generate `raft_read`s.
Only do it if forwarding is enabled (although it may make sense to test
read barriers in a non-forwarding scenario as well - we may think about it
and do it in a follow-up).

Check the consistency of the read results by comparing them with the model
and using the result to extend the model with any newly observed elements.

The patchset includes some fixes for correctness (#10578)
and liveness (handling aborts correctly).

Closes #10561

* github.com:scylladb/scylla:
  test: raft: randomized_nemesis_test: check consistency of reads
  test: raft: randomized_nemesis_test: perform linearizable reads using read_barriers
  test: raft: randomized_nemesis_test: add flags for disabling nemeses
  raft: server: in `abort()`, abort read barriers before waiting for rpc abort
  raft: server: handle aborts correctly in `read_barrier`
  raft: fsm: don't advance commit index further than match_idx during read_quorum
This commit is contained in:
Tomasz Grabiec
2022-05-26 16:46:35 +02:00
3 changed files with 246 additions and 39 deletions

View File

@@ -1044,7 +1044,7 @@ void fsm::broadcast_read_quorum(read_id id) {
if (p.id == _my_id) {
handle_read_quorum_reply(_my_id, read_quorum_reply{_current_term, _commit_idx, id});
} else {
send_to(p.id, read_quorum{_current_term, _commit_idx, id});
send_to(p.id, read_quorum{_current_term, std::min(p.match_idx, _commit_idx), id});
}
}
}

View File

@@ -35,6 +35,11 @@ struct active_read {
optimized_optional<abort_source::subscription> abort;
};
// A single waiter for a log index to be applied; stored in `_awaited_indexes`
// keyed by the index it is waiting for.
struct awaited_index {
// Resolved with a value by `signal_applied()` once the awaited index has been applied;
// failed with `request_aborted` when the caller's abort source fires,
// or with `stopped_error` when the server is aborted.
promise<> promise;
// Subscription to the caller's abort source. Left disengaged when the caller
// passed no abort source to `wait_for_apply`.
optimized_optional<abort_source::subscription> abort;
};
static const seastar::metrics::label server_id_label("id");
static const seastar::metrics::label log_entry_type("log_entry_type");
static const seastar::metrics::label message_type("message_type");
@@ -99,7 +104,7 @@ private:
// Index of the last entry applied to `_state_machine`.
index_t _applied_idx;
std::list<active_read> _reads;
std::multimap<index_t, promise<>> _awaited_indexes;
std::multimap<index_t, awaited_index> _awaited_indexes;
// Set to true when abort() is called
bool _aborted = false;
@@ -260,7 +265,7 @@ private:
// Wait for a read barrier index to be applied. The index
// is typically already committed, so we don't worry about the
// term.
future<> wait_for_apply(index_t idx);
future<> wait_for_apply(index_t idx, abort_source*);
// Set configuration but don't wait for transition joint ->
// non_joint.
future<> enter_joint_configuration(server_address_set c_new, seastar::abort_source* as);
@@ -714,7 +719,7 @@ void server_impl::signal_applied() {
if (it->first > _applied_idx) {
break;
}
it->second.set_value();
it->second.promise.set_value();
it = _awaited_indexes.erase(it);
}
}
@@ -1060,16 +1065,30 @@ term_t server_impl::get_current_term() const {
return _fsm->get_current_term();
}
future<> server_impl::wait_for_apply(index_t idx) {
future<> server_impl::wait_for_apply(index_t idx, abort_source* as) {
if (as && as->abort_requested()) {
throw request_aborted();
}
if (idx > _applied_idx) {
// The index is not applied yet. Wait for it.
// This will be signalled when read_idx is applied
auto it = _awaited_indexes.emplace(idx, promise<>());
co_await it->second.get_future();
auto it = _awaited_indexes.emplace(idx, awaited_index{{}, {}});
if (as) {
it->second.abort = as->subscribe([this, it] () noexcept {
it->second.promise.set_exception(request_aborted());
_awaited_indexes.erase(it);
});
assert(it->second.abort);
}
co_await it->second.promise.get_future();
}
}
future<read_barrier_reply> server_impl::execute_read_barrier(server_id from, seastar::abort_source* as) {
if (_aborted) {
throw stopped_error();
}
logger.trace("[{}] execute_read_barrier start", _id);
std::optional<std::pair<read_id, index_t>> rid;
@@ -1088,7 +1107,7 @@ future<read_barrier_reply> server_impl::execute_read_barrier(server_id from, sea
return make_exception_future<read_barrier_reply>(request_aborted());
}
_reads.push_back({rid->first, rid->second, {}, {}});
auto read = --_reads.end();
auto read = std::prev(_reads.end());
if (as) {
read->abort = as->subscribe([this, read] () noexcept {
read->promise.set_exception(request_aborted());
@@ -1129,7 +1148,7 @@ future<> server_impl::read_barrier(seastar::abort_source* as) {
// committed any entries yet, so wait for any entry to be
// committed (if none were since the start of the attempt) and retry.
logger.trace("[{}] read_barrier leader not ready", _id);
co_await wait_for_apply(++applied);
co_await wait_for_apply(++applied, as);
} else if (std::holds_alternative<raft::not_a_leader>(res)) {
leader = std::get<not_a_leader>(res).leader;
} else {
@@ -1139,7 +1158,7 @@ future<> server_impl::read_barrier(seastar::abort_source* as) {
}
logger.trace("[{}] read_barrier read index {}, append index {}", _id, read_idx, _applied_idx);
co_return co_await wait_for_apply(read_idx);
co_return co_await wait_for_apply(read_idx, as);
}
void server_impl::abort_snapshot_transfer(server_id id) {
@@ -1188,8 +1207,6 @@ future<> server_impl::abort() {
// Destroy entry waiters before waiting for `abort_rpc`,
// since the RPC implementation may wait for forwarded `modify_config` calls to finish
// (and `modify_config` does not finish until the configuration entry is committed or an error occurs).
// FIXME: probably need to do the same with read barriers (`_reads`)
// (not doing it yet because I want to catch the problem first in nemesis test)
for (auto& ac: _awaited_commits) {
ac.second.done.set_exception(stopped_error());
}
@@ -1199,12 +1216,6 @@ future<> server_impl::abort() {
_awaited_commits.clear();
_awaited_applies.clear();
co_await seastar::when_all_succeed(std::move(abort_rpc), std::move(abort_sm), std::move(abort_persistence)).discard_result();
if (_leader_promise) {
_leader_promise->set_exception(stopped_error());
}
// Complete all read attempts with not_a_leader
for (auto& r: _reads) {
r.promise.set_value(raft::not_a_leader{server_id{}});
@@ -1213,10 +1224,16 @@ future<> server_impl::abort() {
// Abort all read_barriers with an exception
for (auto& i : _awaited_indexes) {
i.second.set_exception(stopped_error());
i.second.promise.set_exception(stopped_error());
}
_awaited_indexes.clear();
co_await seastar::when_all_succeed(std::move(abort_rpc), std::move(abort_sm), std::move(abort_persistence)).discard_result();
if (_leader_promise) {
_leader_promise->set_exception(stopped_error());
}
abort_snapshot_transfers();
auto snp_futures = _aborted_snapshot_transfers | boost::adaptors::map_values;

View File

@@ -334,6 +334,42 @@ future<call_result_t<M>> call(
});
}
// Result of a linearizable read: the observed state on success,
// or the error that prevented the read from completing.
template <PureStateMachine M>
using read_result_t = std::variant<typename M::state_t, timed_out_error, raft::stopped_error>;

// Performs a linearizable read by calling a `read_barrier` and then reading the local state of the server's state machine.
// Only to be used in forwarding mode.
// See `call` for the meanings of `timeout`, `timer`, `server` and `sm`.
//
// Returns:
//  - the state machine's state if the read barrier completed before `timeout`,
//  - `raft::stopped_error` if the barrier failed with `raft::stopped_error`,
//  - `timed_out_error` if the barrier timed out or was aborted (`raft::request_aborted`).
// Any other exception is treated as a test bug: it is logged and triggers an assertion failure.
template <PureStateMachine M>
future<read_result_t<M>> read(
        raft::logical_clock::time_point timeout,
        logical_timer& timer,
        raft::server& server,
        impure_state_machine<M>& sm) {
    // FIXME: using lambda as workaround for clang bug #50345.
    auto impl = [] (raft::logical_clock::time_point timeout, logical_timer& timer,
                    raft::server& server, impure_state_machine<M>& sm) -> future<read_result_t<M>> {
        try {
            // `with_timeout` hands us an abort source that we pass to the barrier,
            // so the barrier can be cancelled through it.
            co_await with_timeout(timer, timeout, [&] (abort_source& as) {
                return server.read_barrier(&as);
            });
            // The barrier has completed: the local state is now safe to return.
            co_return sm.state();
        } catch (const raft::stopped_error& e) {
            co_return e;
        } catch (const seastar::timed_out_error& e) {
            co_return e;
        } catch (const raft::request_aborted&) {
            // An aborted barrier is reported to the caller as a timeout.
            co_return timed_out_error{};
        } catch (...) {
            tlogger.error("unexpected exception from `read`: {}", std::current_exception());
            assert(false);
        }
    };

    return impl(timeout, timer, server, sm);
}
// Allows a Raft server to communicate with other servers.
// The implementation is mostly boilerplate. It assumes that there exists a method of message passing
// given by a `send_message_t` function (passed in the constructor) for sending and by the `receive`
@@ -742,7 +778,12 @@ public:
static const raft::logical_clock::duration execute_read_barrier_on_leader_timeout = 20_t;
// TODO: catch aborts from the abort_source as well
co_return co_await _timer.with_timeout(_timer.now() + execute_read_barrier_on_leader_timeout, std::move(f));
try {
co_return co_await _timer.with_timeout(_timer.now() + execute_read_barrier_on_leader_timeout, std::move(f));
} catch (logical_timer::timed_out<raft::read_barrier_reply>& e) {
(void) e.get_future().discard_result().handle_exception_type([] (const broken_promise&) { });
throw timed_out_error{};
}
// co_await ensures that `guard` is destroyed before we leave `_gate`
});
}
@@ -1440,6 +1481,19 @@ public:
}
}
// Performs a linearizable read on this server while holding `_gate`.
// Returns `raft::stopped_error` if the gate has already been closed.
// The server must have been started.
future<read_result_t<M>> read(
        raft::logical_clock::time_point timeout,
        logical_timer& timer) {
    assert(_started);

    auto guarded_read = [this, timeout, &timer] {
        return ::read(timeout, timer, *_server, _sm);
    };

    try {
        auto outcome = co_await with_gate(_gate, std::move(guarded_read));
        co_return std::move(outcome);
    } catch (const gate_closed_exception&) {
        co_return raft::stopped_error{};
    }
}
future<reconfigure_result_t> reconfigure(
const std::vector<raft::server_id>& ids,
raft::logical_clock::time_point timeout,
@@ -1784,6 +1838,28 @@ public:
co_return res;
}
// Performs a linearizable read on the server identified by `id`.
// If there is no server object under `id` when the read starts, or the server object
// has been replaced by the time the read finishes, waits until `timeout`
// and reports `timed_out_error` (as in `call`).
future<read_result_t<M>> read(
        raft::server_id id,
        raft::logical_clock::time_point timeout,
        logical_timer& timer) {
    auto& route = _routes.at(id);
    auto contacted = route._server.get();

    if (contacted) {
        auto outcome = co_await contacted->read(timeout, timer);
        if (route._server.get() == contacted) {
            // Still the same server object we asked: the result is valid.
            co_return std::move(outcome);
        }
    }

    // As in `call`.
    co_await timer.sleep_until(timeout);
    co_return timed_out_error{};
}
future<reconfigure_result_t> reconfigure(
raft::server_id id,
const std::vector<raft::server_id>& ids,
@@ -2364,6 +2440,41 @@ struct raft_call {
}
};
// Operation: a linearizable read issued against one Raft server.
// Intended for forwarding mode only; the request is never bounced to another server.
template <PureStateMachine M>
struct raft_read {
    // Identifier used to match the read's result with its invocation.
    int32_t read_id;
    // How long to wait for the read, counted from the moment it is executed.
    raft::logical_clock::duration timeout;

    using result_type = std::pair<int32_t, read_result_t<M>>;

    struct state_type {
        environment<M>& env;
        // Servers from which the contact point is picked.
        const std::unordered_set<raft::server_id>& known;
        logical_timer& timer;
    };

    // Picks a pseudo-random server out of `s.known`, performs the read on it
    // and returns the read's id paired with the outcome.
    future<result_type> execute(state_type& s, const operation::context& ctx) {
        assert(!s.known.empty());
        static std::mt19937 engine{0};

        std::uniform_int_distribution<size_t> pick{0, s.known.size() - 1};
        const auto contact = *std::next(s.known.begin(), pick(engine));

        tlogger.debug("read start tid {} start time {} current time {} contact {}", ctx.thread, ctx.start, s.timer.now(), contact);
        auto outcome = co_await s.env.read(contact, s.timer.now() + timeout, s.timer);
        tlogger.debug("read end tid {} start time {} current time {} contact {}", ctx.thread, ctx.start, s.timer.now(), contact);

        co_return result_type{read_id, std::move(outcome)};
    }

    friend std::ostream& operator<<(std::ostream& os, const raft_read& r) {
        os << format("raft_read{{id:{}, timeout:{}}}", r.read_id, r.timeout);
        return os;
    }
};
// An operation that partitions the network in half.
// During the partition, no server from one half can contact any server from the other;
// the partition is symmetric.
@@ -2743,6 +2854,10 @@ struct append_reg_model {
std::unordered_set<elem_t> returned;
std::unordered_set<elem_t> in_progress;
// For each read, the element observed at the end of the model sequence
// at the moment the read has started.
std::unordered_map<int32_t, elem_t> reads;
void invocation(elem_t x) {
assert(!index.contains(x));
assert(!in_progress.contains(x));
@@ -2756,7 +2871,7 @@ struct append_reg_model {
try {
completion(x, prev);
} catch (inconsistency& e) {
e.what += format("\nwhen completing elem: {}\nprev: {}\nmodel: {}", x, prev, seq);
e.what += format("\nwhen completing append: {}\nprev: {}\nmodel: {}", x, prev, seq);
throw;
}
returned.insert(x);
@@ -2769,6 +2884,38 @@ struct append_reg_model {
in_progress.erase(x);
}
// Registers the start of read `id`, remembering the element that is currently
// last in the model sequence. Each read id may be started only once.
void start_read(int32_t id) {
    const bool inserted = reads.emplace(id, seq.back().elem).second;
    assert(inserted);
}
// Checks the result of a successful read `id` against the model and extends
// the model with what the read observed.
//
// `id` must have been registered earlier with `start_read`.
// Throws `inconsistency` if:
//  - the element that was last in the model when the read started is missing from `result`, or
//  - completing the model with the last element of `result` detects an inconsistency.
void read_success(int32_t id, append_seq result) {
    auto read = reads.find(id);
    assert(read != reads.end());

    // The read must observe at least everything the model contained when it started,
    // so the last model element from that moment has to appear in the result.
    size_t idx = 0;
    for (; idx < result.size(); ++idx) {
        if (result[idx] == read->second) {
            break;
        }
    }

    if (idx == result.size()) {
        throw inconsistency{format(
            "read {} observed last model elem {} at start not present in result: {}",
            id, read->second, result)};
    }

    try {
        // Treat the result like a completed append of its last element.
        auto [prev, x] = result.pop();
        completion(x, prev);
    } catch (inconsistency& e) {
        e.what += format(
            "\nwhen completing read id: {}, last model elem at start: {}\nread result: {}",
            id, read->second, result);
        // Propagate the enriched error; without this the detected inconsistency was silently dropped.
        throw;
    }
}
private:
void completion(elem_t x, append_seq prev) {
if (prev.empty()) {
@@ -2860,6 +3007,7 @@ std::ostream& operator<<(std::ostream& os, const AppendReg::ret& r) {
SEASTAR_TEST_CASE(basic_generator_test) {
using op_type = operation::invocable<operation::either_of<
raft_call<AppendReg>,
raft_read<AppendReg>,
network_majority_grudge<AppendReg>,
reconfiguration<AppendReg>,
stop_crash<AppendReg>
@@ -2905,6 +3053,10 @@ SEASTAR_TEST_CASE(basic_generator_test) {
// threshold is 1024 log commands and we perform only 500 ops.
bool frequent_snapshotting = bdist(random_engine);
bool nemesis_partitions = true;
bool nemesis_reconfigurations = true;
bool nemesis_crashes = true;
// TODO: randomize the snapshot thresholds between different servers for more chaos.
auto srv_cfg = frequent_snapshotting
? raft::server::configuration {
@@ -2969,6 +3121,12 @@ SEASTAR_TEST_CASE(basic_generator_test) {
.timer = timer
};
raft_read<AppendReg>::state_type read_state {
.env = env,
.known = known_config,
.timer = timer
};
network_majority_grudge<AppendReg>::state_type network_majority_grudge_state {
.env = env,
.known = known_config,
@@ -2993,6 +3151,7 @@ SEASTAR_TEST_CASE(basic_generator_test) {
auto init_state = op_type::state_type{
std::move(db_call_state),
std::move(read_state),
std::move(network_majority_grudge_state),
std::move(reconfiguration_state),
std::move(crash_state)
@@ -3011,30 +3170,46 @@ SEASTAR_TEST_CASE(basic_generator_test) {
// ~= [4s, 8s] -> ~1/2 partitions should cause an election
// we will set request timeout 600_t ~= 6s and partition every 1200_t ~= 12s
auto gen = op_limit(500,
auto num_ops = 500;
auto gen = op_limit(num_ops,
pin(partition_thread,
stagger(seed, timer.now() + 200_t, 1200_t, 1200_t,
random(seed, [] (std::mt19937& engine) {
static std::uniform_int_distribution<raft::logical_clock::rep> dist{400, 800};
return op_type{network_majority_grudge<AppendReg>{raft::logical_clock::duration{dist(engine)}}};
})
op_limit(nemesis_partitions ? num_ops : 0,
stagger(seed, timer.now() + 200_t, 1200_t, 1200_t,
random(seed, [] (std::mt19937& engine) {
static std::uniform_int_distribution<raft::logical_clock::rep> dist{400, 800};
return op_type{network_majority_grudge<AppendReg>{raft::logical_clock::duration{dist(engine)}}};
})
)
),
pin(reconfig_thread,
stagger(seed, timer.now() + 1000_t, 500_t, 500_t,
constant([] () { return op_type{reconfiguration<AppendReg>{500_t}}; })
op_limit(nemesis_reconfigurations ? num_ops : 0,
stagger(seed, timer.now() + 1000_t, 500_t, 500_t,
constant([] () { return op_type{reconfiguration<AppendReg>{500_t}}; })
)
),
pin(crash_thread,
stagger(seed, timer.now() + 200_t, 100_t, 200_t,
random(seed, [] (std::mt19937& engine) {
static std::uniform_int_distribution<raft::logical_clock::rep> dist{0, 100};
return op_type{stop_crash<AppendReg>{raft::logical_clock::duration{dist(engine)}}};
})
op_limit(nemesis_crashes ? num_ops : 0,
stagger(seed, timer.now() + 200_t, 100_t, 200_t,
random(seed, [] (std::mt19937& engine) {
static std::uniform_int_distribution<raft::logical_clock::rep> dist{0, 100};
return op_type{stop_crash<AppendReg>{raft::logical_clock::duration{dist(engine)}}};
})
)
),
stagger(seed, timer.now(), 0_t, 50_t,
sequence(1, [] (int32_t i) {
assert(i > 0);
return op_type{raft_call<AppendReg>{AppendReg::append{i}, 200_t}};
})
either(
stagger(seed, timer.now(), 0_t, 50_t,
sequence(1, [] (int32_t i) {
assert(i > 0);
return op_type{raft_call<AppendReg>{AppendReg::append{i}, 200_t}};
})
),
op_limit(forwarding ? num_ops : 0 /* only produce raft_reads in forwarding mode */,
stagger(seed, timer.now(), 0_t, 200_t,
sequence(1, [] (int32_t i) {
return op_type{raft_read<AppendReg>{i, 200_t}};
})
)
)
)
)
)
@@ -3060,6 +3235,9 @@ SEASTAR_TEST_CASE(basic_generator_test) {
if (auto call_op = std::get_if<raft_call<AppendReg>>(&o.op)) {
++_stats.invocations;
_model.invocation(call_op->input.x);
} else if (auto read_op = std::get_if<raft_read<AppendReg>>(&o.op)) {
++_stats.invocations;
_model.start_read(read_op->read_id);
}
}
@@ -3089,6 +3267,18 @@ SEASTAR_TEST_CASE(basic_generator_test) {
++_stats.failures;
}
), *call_res);
} else if (auto read_res = std::get_if<raft_read<AppendReg>::result_type>(res)) {
std::visit(make_visitor(
[this, id = read_res->first] (AppendReg::state_t& s) {
tlogger.debug("read completion id: {} digest: {}", id, s.digest());
++_stats.successes;
_model.read_success(id, std::move(s));
},
[this] (auto&) {
++_stats.failures;
}
), read_res->second);
} else {
tlogger.debug("completion {}", c);
}