mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-21 23:32:15 +00:00
error_injection: add non-shared mode to wait_for_message
Add a 'share' parameter to wait_for_message (default true, preserving existing behavior). When share=false, each handler invocation requires its own dedicated message to proceed — a message consumed by one handler is not visible to others. Use share=false for the pause_before_barrier_and_drain injection in raft_topology_cmd_handler. The topology coordinator sends multiple barrier_and_drain RPCs during a single topology transition (one per state change). With share=true a single message_injection call releases all handlers. With share=false the test can release them one at a time, controlling exactly which topology state the write handler's ERM captures.
This commit is contained in:
@@ -4473,7 +4473,10 @@ future<> storage_service::local_topology_barrier() {
|
||||
throw std::runtime_error("raft_topology_barrier_and_drain_fail_before injected exception");
|
||||
});
|
||||
|
||||
co_await utils::get_local_injector().inject("pause_before_barrier_and_drain", utils::wait_for_message(std::chrono::minutes(5)));
|
||||
// share=false: each barrier_and_drain invocation needs its own message
|
||||
// to proceed. Without this, a single message_injection call would release
|
||||
// all past and future handlers sharing the same injection.
|
||||
co_await utils::get_local_injector().inject("pause_before_barrier_and_drain", utils::wait_for_message(std::chrono::minutes(5), nullptr, false));
|
||||
if (_topology_state_machine._topology.tstate == topology::transition_state::write_both_read_old) {
|
||||
for (auto& n : _topology_state_machine._topology.transition_nodes) {
|
||||
if (!_address_map.find(locator::host_id{n.first.uuid()})) {
|
||||
|
||||
@@ -45,11 +45,16 @@ using error_injection_parameters = std::unordered_map<sstring, sstring>;
|
||||
// in class error_injection below). Parameters:
|
||||
// timeout - the timeout after which the pause is aborted
|
||||
// as (optional) - abort_source used to abort the pause
|
||||
// share - if true (default), injection handlers share received messages:
|
||||
// every message can be received by all handlers (even if they start
|
||||
// waiting in the future). If false, only one handler can receive a
|
||||
// specific message; other handlers will wait for new messages.
|
||||
struct wait_for_message {
|
||||
std::chrono::milliseconds timeout;
|
||||
abort_source* as = nullptr;
|
||||
wait_for_message(std::chrono::milliseconds tmo) noexcept : timeout(tmo) {}
|
||||
wait_for_message(std::chrono::milliseconds tmo, abort_source* a) noexcept : timeout(tmo), as(a) {}
|
||||
abort_source* as;
|
||||
bool share;
|
||||
wait_for_message(std::chrono::milliseconds tmo, abort_source* a = nullptr, bool share_ = true) noexcept
|
||||
: timeout(tmo), as(a), share(share_) {}
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -552,7 +557,7 @@ public:
|
||||
errinj_logger.info("{}: waiting for message", name);
|
||||
co_await handler.wait_for_message(std::chrono::steady_clock::now() + wfm.timeout, wfm.as);
|
||||
errinj_logger.info("{}: message received", name);
|
||||
});
|
||||
}, wfm.share);
|
||||
}
|
||||
|
||||
template <typename T = std::string_view>
|
||||
|
||||
Reference in New Issue
Block a user