diff --git a/test/boost/error_injection_test.cc b/test/boost/error_injection_test.cc index 711a96d637..44c69605c7 100644 --- a/test/boost/error_injection_test.cc +++ b/test/boost/error_injection_test.cc @@ -344,6 +344,102 @@ SEASTAR_TEST_CASE(test_inject_message) { } } +SEASTAR_TEST_CASE(test_inject_unshared_message) { + testing::scoped_no_abort_on_internal_error abort_guard; + utils::error_injection errinj; + + auto timeout = db::timeout_clock::now() + 5s; + + errinj.enable("injection1"); + { + // Test receiving enough unshared messages + auto f1 = errinj.inject("injection1", std::bind_front([] (auto timeout, auto& handler) -> future<> { + co_await handler.wait_for_message(timeout); + co_await handler.wait_for_message(timeout); + }, timeout), false); + auto f2 = errinj.inject("injection1", std::bind_front([] (auto timeout, auto& handler) -> future<> { + co_await handler.wait_for_message(timeout); + co_await handler.wait_for_message(timeout); + }, timeout), false); + + for (size_t i = 0; i < 4; ++i) { + errinj.receive_message("injection1"); + } + + BOOST_REQUIRE_NO_THROW(co_await std::move(f1)); + BOOST_REQUIRE_NO_THROW(co_await std::move(f2)); + } + errinj.disable("injection1"); + + errinj.enable("injection2"); + { + // Test receiving enough unshared messages before waiting for them + errinj.receive_message("injection2"); + errinj.receive_message("injection2"); + + auto f1 = errinj.inject("injection2", [timeout] (auto& handler) { + return handler.wait_for_message(timeout); + }, false); + auto f2 = errinj.inject("injection2", [timeout] (auto& handler) { + return handler.wait_for_message(timeout); + }, false); + + BOOST_REQUIRE_NO_THROW(co_await std::move(f1)); + BOOST_REQUIRE_NO_THROW(co_await std::move(f2)); + } + errinj.disable("injection2"); + + errinj.enable("injection3"); + { + // Test receiving not enough unshared messages + auto timeout_1s = db::timeout_clock::now() + 1s; + + auto f1 = errinj.inject("injection3", std::bind_front([] (auto timeout, auto& handler) -> future<> { + co_await handler.wait_for_message(timeout); + co_await handler.wait_for_message(timeout); + }, timeout_1s), false); + auto f2 = errinj.inject("injection3", std::bind_front([] (auto timeout, auto& handler) -> future<> { + co_await handler.wait_for_message(timeout); + co_await handler.wait_for_message(timeout); + }, timeout_1s), false); + + for (size_t i = 0; i < 3; ++i) { + errinj.receive_message("injection3"); + } + + BOOST_REQUIRE_THROW(co_await when_all_succeed(std::move(f1), std::move(f2)).discard_result(), std::runtime_error); + } + errinj.disable("injection3"); + + errinj.enable("injection4"); + { + // Test handlers sharing messages are independent of the not sharing ones + auto f1 = errinj.inject("injection4", std::bind_front([] (auto timeout, auto& handler) -> future<> { + co_await handler.wait_for_message(timeout); + co_await handler.wait_for_message(timeout); + }, timeout), true); + auto f2 = errinj.inject("injection4", std::bind_front([] (auto timeout, auto& handler) -> future<> { + co_await handler.wait_for_message(timeout); + co_await handler.wait_for_message(timeout); + }, timeout), true); + auto f3 = errinj.inject("injection4", [timeout] (auto& handler) { + return handler.wait_for_message(timeout); + }, false); + auto f4 = errinj.inject("injection4", [timeout] (auto& handler) { + return handler.wait_for_message(timeout); + }, false); + + errinj.receive_message("injection4"); + errinj.receive_message("injection4"); + + BOOST_REQUIRE_NO_THROW(co_await std::move(f1)); + BOOST_REQUIRE_NO_THROW(co_await std::move(f2)); + BOOST_REQUIRE_NO_THROW(co_await std::move(f3)); + BOOST_REQUIRE_NO_THROW(co_await std::move(f4)); + } + errinj.disable("injection4"); +} + SEASTAR_TEST_CASE(test_inject_with_parameters) { utils::error_injection errinj; diff --git a/utils/error_injection.hh b/utils/error_injection.hh index 40bc28345b..06a9ee0e9c 100644 --- a/utils/error_injection.hh +++ b/utils/error_injection.hh @@ -98,10 +98,11 @@ using error_injection_parameters = std::unordered_map; * }, f); * Expected use case: emulate custom errors like timeouts. * - * 4. inject(name, future<> func(injection_handler&)) + * 4. inject(name, future<> func(injection_handler&), share_messages) * Inserts code that can wait for an event. * Requires func to be a function taking an injection_handler reference and - * returning a future<>. + * returning a future<>. Depending on the share_messages value, + * handlers can share events or not. * Expected use case: wait for an event from tests. */ @@ -116,6 +117,7 @@ class error_injection { */ struct injection_shared_data { size_t received_message_count{0}; + size_t shared_read_message_count{0}; condition_variable received_message_cv; error_injection_parameters parameters; @@ -129,13 +131,27 @@ public: * The injection handler class is used to wait for events inside the injected code. * If multiple inject (with handler) are called concurrently for the same injection_name, * all of them will have separate handlers. + * + * Handlers can be of two types depending on the share_messages value passed to inject + * (with handler): + * 1. By default, handlers share received messages. It means that every message can be + * received by all handlers (even if they start waiting in the future). + * 2. When handlers do not share received messages, only one can receive a specific + * message. Other handlers will wait for new messages. + * + * For a single injection, these two types of handlers are independent. A handler of one + * type never impacts a handler of the second type. + * + * In most cases, using the default type is sufficient or required. The second type + * allows waiting for new messages during every execution of the injected code. */ class injection_handler: public bi::list_base_hook> { lw_shared_ptr _shared_data; size_t _read_messages_counter{0}; + bool _share_messages; - explicit injection_handler(lw_shared_ptr shared_data) - : _shared_data(std::move(shared_data)) {} + explicit injection_handler(lw_shared_ptr shared_data, bool share_messages) + : _shared_data(std::move(shared_data)), _share_messages(share_messages) {} public: template @@ -146,6 +162,15 @@ public: try { co_await _shared_data->received_message_cv.wait(timeout, [&] { + if (!_share_messages) { + bool wakes_up = _shared_data->shared_read_message_count < _shared_data->received_message_count; + if (wakes_up) { + // Increase shared_read_message_count here, so other sharing handlers don't wake up. + ++_shared_data->shared_read_message_count; + } + return wakes_up; + } + return _read_messages_counter < _shared_data->received_message_count; }); } @@ -162,10 +187,14 @@ public: on_internal_error(errinj_logger, "injection_shared_data is not initialized"); } - if (_read_messages_counter < _shared_data->received_message_count) { + if (_share_messages && _read_messages_counter < _shared_data->received_message_count) { ++_read_messages_counter; return true; } + if (!_share_messages && _shared_data->shared_read_message_count < _shared_data->received_message_count) { + ++_shared_data->shared_read_message_count; + return true; + } return false; } @@ -192,9 +221,17 @@ private: * which is created once when enabling an injection on a given shard, and all injection_handlers, * that are created separately for each firing of this injection. * - the counter is incremented when receiving a message from the REST endpoint and the condition variable is signaled. + * + * Handlers sharing messages: * - each injection_handler (separate for each firing) stores its own private counter, _read_messages_counter. * - that private counter is incremented whenever we wait for a message, and compared to the received counter. * We sleep on the condition variable if not enough messages were received. + * + * Handlers not sharing messages: + * - injection_shared_data stores a counter, shared_read_message_count, which is shared by all handlers. + * - that shared counter is incremented whenever a handler finishes waiting for a message. While waiting for + * a message, a handler compares this counter to the received counter. It sleeps on the condition variable if + * they are equal. */ struct injection_data { bool one_shot; @@ -393,7 +430,8 @@ public: // \brief Inject exception // \param func function returning a future and taking an injection handler - future<> inject(const std::string_view& name, waiting_handler_fun func) { + // \param share_messages if true, injection handlers share received messages + future<> inject(const std::string_view& name, waiting_handler_fun func, bool share_messages = true) { auto* data = get_data(name); if (!data) { co_return; @@ -407,7 +445,7 @@ public: } errinj_logger.debug("Triggering injection \"{}\" with injection handler", name); - injection_handler handler(data->shared_data); + injection_handler handler(data->shared_data, share_messages); data->handlers.push_back(handler); auto disable_one_shot = defer([this, one_shot, name = sstring(name)] { @@ -535,7 +573,7 @@ public: // \brief Inject exception // \param func function returning a future and taking an injection handler [[gnu::always_inline]] - future<> inject(const std::string_view& name, waiting_handler_fun func) { + future<> inject(const std::string_view& name, waiting_handler_fun func, bool share_messages = true) { return make_ready_future<>(); }