error_injection: allow injection handlers to not share messages

For a single injection, all created injection handlers share all
received messages. In particular, it means that one received message
unblocks all handlers waiting for the first message. This behavior
is often desired, for example, if multiple fibers execute the
injected code and we want to unblock them all with a single message.
However, there is a problem if we want to block every execution
of the injected code. Apart from the first created handler, all
handlers will be instantly unblocked by messages from the past that
have already unblocked the first handler.

In one of the following commits, we add a test that needs to block
the CDC generation publisher's loop twice. Since it looks like there
are no good workarounds for this arguably general problem, we extend
injections with handlers in a way that solves it. We introduce the
new `share_messages` parameter. Depending on its value, handlers
will share messages or not. The details are described in the new
comments in `error_injection.hh`.

We also add some basic unit tests for the new funcionality.
This commit is contained in:
Patryk Jędrzejczak
2024-03-18 15:36:50 +01:00
parent 628017c810
commit c5c4cc7d00
2 changed files with 142 additions and 8 deletions

View File

@@ -344,6 +344,102 @@ SEASTAR_TEST_CASE(test_inject_message) {
}
}
SEASTAR_TEST_CASE(test_inject_unshared_message) {
testing::scoped_no_abort_on_internal_error abort_guard;
utils::error_injection<true> errinj;
auto timeout = db::timeout_clock::now() + 5s;
errinj.enable("injection1");
{
// Test receiving enough unshared messages
auto f1 = errinj.inject("injection1", std::bind_front([] (auto timeout, auto& handler) -> future<> {
co_await handler.wait_for_message(timeout);
co_await handler.wait_for_message(timeout);
}, timeout), false);
auto f2 = errinj.inject("injection1", std::bind_front([] (auto timeout, auto& handler) -> future<> {
co_await handler.wait_for_message(timeout);
co_await handler.wait_for_message(timeout);
}, timeout), false);
for (size_t i = 0; i < 4; ++i) {
errinj.receive_message("injection1");
}
BOOST_REQUIRE_NO_THROW(co_await std::move(f1));
BOOST_REQUIRE_NO_THROW(co_await std::move(f2));
}
errinj.disable("injection1");
errinj.enable("injection2");
{
// Test receiving enough unshared messages before waiting for them
errinj.receive_message("injection2");
errinj.receive_message("injection2");
auto f1 = errinj.inject("injection2", [timeout] (auto& handler) {
return handler.wait_for_message(timeout);
}, false);
auto f2 = errinj.inject("injection2", [timeout] (auto& handler) {
return handler.wait_for_message(timeout);
}, false);
BOOST_REQUIRE_NO_THROW(co_await std::move(f1));
BOOST_REQUIRE_NO_THROW(co_await std::move(f2));
}
errinj.disable("injection2");
errinj.enable("injection3");
{
// Test receiving not enough unshared messages
auto timeout_1s = db::timeout_clock::now() + 1s;
auto f1 = errinj.inject("injection3", std::bind_front([] (auto timeout, auto& handler) -> future<> {
co_await handler.wait_for_message(timeout);
co_await handler.wait_for_message(timeout);
}, timeout_1s), false);
auto f2 = errinj.inject("injection3", std::bind_front([] (auto timeout, auto& handler) -> future<> {
co_await handler.wait_for_message(timeout);
co_await handler.wait_for_message(timeout);
}, timeout_1s), false);
for (size_t i = 0; i < 3; ++i) {
errinj.receive_message("injection3");
}
BOOST_REQUIRE_THROW(co_await when_all_succeed(std::move(f1), std::move(f2)).discard_result(), std::runtime_error);
}
errinj.disable("injection3");
errinj.enable("injection4");
{
// Test handlers sharing messages are independent of the not sharing ones
auto f1 = errinj.inject("injection4", std::bind_front([] (auto timeout, auto& handler) -> future<> {
co_await handler.wait_for_message(timeout);
co_await handler.wait_for_message(timeout);
}, timeout), true);
auto f2 = errinj.inject("injection4", std::bind_front([] (auto timeout, auto& handler) -> future<> {
co_await handler.wait_for_message(timeout);
co_await handler.wait_for_message(timeout);
}, timeout), true);
auto f3 = errinj.inject("injection4", [timeout] (auto& handler) {
return handler.wait_for_message(timeout);
}, false);
auto f4 = errinj.inject("injection4", [timeout] (auto& handler) {
return handler.wait_for_message(timeout);
}, false);
errinj.receive_message("injection4");
errinj.receive_message("injection4");
BOOST_REQUIRE_NO_THROW(co_await std::move(f1));
BOOST_REQUIRE_NO_THROW(co_await std::move(f2));
BOOST_REQUIRE_NO_THROW(co_await std::move(f3));
BOOST_REQUIRE_NO_THROW(co_await std::move(f4));
}
errinj.disable("injection4");
}
SEASTAR_TEST_CASE(test_inject_with_parameters) {
utils::error_injection<true> errinj;

View File

@@ -98,10 +98,11 @@ using error_injection_parameters = std::unordered_map<sstring, sstring>;
* }, f);
* Expected use case: emulate custom errors like timeouts.
*
* 4. inject(name, future<> func(injection_handler&))
* 4. inject(name, future<> func(injection_handler&), share_messages)
* Inserts code that can wait for an event.
* Requires func to be a function taking an injection_handler reference and
* returning a future<>.
* returning a future<>. Depending on the share_messages value,
* handlers can share events or not.
* Expected use case: wait for an event from tests.
*/
@@ -116,6 +117,7 @@ class error_injection {
*/
struct injection_shared_data {
size_t received_message_count{0};
size_t shared_read_message_count{0};
condition_variable received_message_cv;
error_injection_parameters parameters;
@@ -129,13 +131,27 @@ public:
* The injection handler class is used to wait for events inside the injected code.
* If multiple inject (with handler) are called concurrently for the same injection_name,
* all of them will have separate handlers.
*
* Handlers can be of two types depending on the share_messages value passed to inject
* (with handler):
* 1. By default, handlers share received messages. It means that every message can be
* received by all handlers (even if they start waiting in the future).
* 2. When handlers do not share received messages, only one can receive a specific
* message. Other handlers will wait for new messages.
*
* For a single injection, these two types of handlers are independent. A handler of one
* type never impacts a handler of the second type.
*
* In most cases, using the default type is sufficient or required. The second type
* allows waiting for new messages during every execution of the injected code.
*/
class injection_handler: public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
lw_shared_ptr<injection_shared_data> _shared_data;
size_t _read_messages_counter{0};
bool _share_messages;
explicit injection_handler(lw_shared_ptr<injection_shared_data> shared_data)
: _shared_data(std::move(shared_data)) {}
explicit injection_handler(lw_shared_ptr<injection_shared_data> shared_data, bool share_messages)
: _shared_data(std::move(shared_data)), _share_messages(share_messages) {}
public:
template <typename Clock, typename Duration>
@@ -146,6 +162,15 @@ public:
try {
co_await _shared_data->received_message_cv.wait(timeout, [&] {
if (!_share_messages) {
bool wakes_up = _shared_data->shared_read_message_count < _shared_data->received_message_count;
if (wakes_up) {
// Increase shared_read_message_count here, so other sharing handlers don't wake up.
++_shared_data->shared_read_message_count;
}
return wakes_up;
}
return _read_messages_counter < _shared_data->received_message_count;
});
}
@@ -162,10 +187,14 @@ public:
on_internal_error(errinj_logger, "injection_shared_data is not initialized");
}
if (_read_messages_counter < _shared_data->received_message_count) {
if (_share_messages && _read_messages_counter < _shared_data->received_message_count) {
++_read_messages_counter;
return true;
}
if (!_share_messages && _shared_data->shared_read_message_count < _shared_data->received_message_count) {
++_shared_data->shared_read_message_count;
return true;
}
return false;
}
@@ -192,9 +221,17 @@ private:
* which is created once when enabling an injection on a given shard, and all injection_handlers,
* that are created separately for each firing of this injection.
* - the counter is incremented when receiving a message from the REST endpoint and the condition variable is signaled.
*
* Handlers sharing messages:
* - each injection_handler (separate for each firing) stores its own private counter, _read_messages_counter.
* - that private counter is incremented whenever we wait for a message, and compared to the received counter.
* We sleep on the condition variable if not enough messages were received.
*
* Handlers not sharing messages:
* - injection_shared_data stores a counter, shared_read_message_count, which is shared by all handlers.
* - that shared counter is incremented whenever a handler finishes waiting for a message. While waiting for
* a message, a handler compares this counter to the received counter. It sleeps on the condition variable if
* they are equal.
*/
struct injection_data {
bool one_shot;
@@ -393,7 +430,8 @@ public:
// \brief Inject exception
// \param func function returning a future and taking an injection handler
future<> inject(const std::string_view& name, waiting_handler_fun func) {
// \param share_messages if true, injection handlers share received messages
future<> inject(const std::string_view& name, waiting_handler_fun func, bool share_messages = true) {
auto* data = get_data(name);
if (!data) {
co_return;
@@ -407,7 +445,7 @@ public:
}
errinj_logger.debug("Triggering injection \"{}\" with injection handler", name);
injection_handler handler(data->shared_data);
injection_handler handler(data->shared_data, share_messages);
data->handlers.push_back(handler);
auto disable_one_shot = defer([this, one_shot, name = sstring(name)] {
@@ -535,7 +573,7 @@ public:
// \brief Inject exception
// \param func function returning a future and taking an injection handler
[[gnu::always_inline]]
future<> inject(const std::string_view& name, waiting_handler_fun func) {
future<> inject(const std::string_view& name, waiting_handler_fun func, bool share_messages = true) {
return make_ready_future<>();
}