diff --git a/service/storage_service.cc b/service/storage_service.cc index 031c14923d..d028514f11 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -4473,7 +4473,10 @@ future<> storage_service::local_topology_barrier() { throw std::runtime_error("raft_topology_barrier_and_drain_fail_before injected exception"); }); - co_await utils::get_local_injector().inject("pause_before_barrier_and_drain", utils::wait_for_message(std::chrono::minutes(5))); + // share=false: each barrier_and_drain invocation needs its own message + // to proceed. Without this, a single message_injection call would release + // all past and future handlers sharing the same injection. + co_await utils::get_local_injector().inject("pause_before_barrier_and_drain", utils::wait_for_message(std::chrono::minutes(5), nullptr, false)); if (_topology_state_machine._topology.tstate == topology::transition_state::write_both_read_old) { for (auto& n : _topology_state_machine._topology.transition_nodes) { if (!_address_map.find(locator::host_id{n.first.uuid()})) { diff --git a/utils/error_injection.hh b/utils/error_injection.hh index bca37f188b..560c7feb7d 100644 --- a/utils/error_injection.hh +++ b/utils/error_injection.hh @@ -45,11 +45,16 @@ using error_injection_parameters = std::unordered_map; // in class error_injection below). Parameters: // timeout - the timeout after which the pause is aborted // as (optional) - abort_source used to abort the pause +// share - if true (default), injection handlers share received messages: +// every message can be received by all handlers (even if they start +// waiting in the future). If false, only one handler can receive a +// specific message; other handlers will wait for new messages. struct wait_for_message { std::chrono::milliseconds timeout; - abort_source* as = nullptr; - wait_for_message(std::chrono::milliseconds tmo) noexcept : timeout(tmo) {} - wait_for_message(std::chrono::milliseconds tmo, abort_source* a) noexcept : timeout(tmo), as(a) {} + abort_source* as; + bool share; + wait_for_message(std::chrono::milliseconds tmo, abort_source* a = nullptr, bool share_ = true) noexcept + : timeout(tmo), as(a), share(share_) {} }; /** @@ -552,7 +557,7 @@ public: errinj_logger.info("{}: waiting for message", name); co_await handler.wait_for_message(std::chrono::steady_clock::now() + wfm.timeout, wfm.as); errinj_logger.info("{}: message received", name); - }); + }, wfm.share); } template