From 324a08295dbdaa768e777d250bc64fdc49101cbe Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Fri, 8 May 2026 22:42:20 +0200 Subject: [PATCH] error_injection: add non-shared mode to wait_for_message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a 'share' parameter to wait_for_message (default true, preserving existing behavior). When share=false, each handler invocation requires its own dedicated message to proceed — a message consumed by one handler is not visible to others. Use share=false for the pause_before_barrier_and_drain injection in raft_topology_cmd_handler. The topology coordinator sends multiple barrier_and_drain RPCs during a single topology transition (one per state change). With share=true a single message_injection call releases all handlers. With share=false the test can release them one at a time, controlling exactly which topology state the write handler's ERM captures. --- service/storage_service.cc | 5 ++++- utils/error_injection.hh | 13 +++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 031c14923d..d028514f11 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -4473,7 +4473,10 @@ future<> storage_service::local_topology_barrier() { throw std::runtime_error("raft_topology_barrier_and_drain_fail_before injected exception"); }); - co_await utils::get_local_injector().inject("pause_before_barrier_and_drain", utils::wait_for_message(std::chrono::minutes(5))); + // share=false: each barrier_and_drain invocation needs its own message + // to proceed. Without this, a single message_injection call would release + // all past and future handlers sharing the same injection. + co_await utils::get_local_injector().inject("pause_before_barrier_and_drain", utils::wait_for_message(std::chrono::minutes(5), nullptr, false)); if (_topology_state_machine._topology.tstate == topology::transition_state::write_both_read_old) { for (auto& n : _topology_state_machine._topology.transition_nodes) { if (!_address_map.find(locator::host_id{n.first.uuid()})) { diff --git a/utils/error_injection.hh b/utils/error_injection.hh index bca37f188b..560c7feb7d 100644 --- a/utils/error_injection.hh +++ b/utils/error_injection.hh @@ -45,11 +45,16 @@ using error_injection_parameters = std::unordered_map; // in class error_injection below). Parameters: // timeout - the timeout after which the pause is aborted // as (optional) - abort_source used to abort the pause +// share - if true (default), injection handlers share received messages: +// every message can be received by all handlers (even if they start +// waiting in the future). If false, only one handler can receive a +// specific message; other handlers will wait for new messages. struct wait_for_message { std::chrono::milliseconds timeout; - abort_source* as = nullptr; - wait_for_message(std::chrono::milliseconds tmo) noexcept : timeout(tmo) {} - wait_for_message(std::chrono::milliseconds tmo, abort_source* a) noexcept : timeout(tmo), as(a) {} + abort_source* as; + bool share; + wait_for_message(std::chrono::milliseconds tmo, abort_source* a = nullptr, bool share_ = true) noexcept + : timeout(tmo), as(a), share(share_) {} }; /** @@ -552,7 +557,7 @@ public: errinj_logger.info("{}: waiting for message", name); co_await handler.wait_for_message(std::chrono::steady_clock::now() + wfm.timeout, wfm.as); errinj_logger.info("{}: message received", name); - }); + }, wfm.share); } template