Merge 'Generalize "breakpoint" type of error injection' from Pavel Emelyanov

This pattern is: if requested (by a test), suspend code execution until the requestor (the test) explicitly wakes it up. For that, the injected place should inject a lambda that is called with a so-called "handler" at hand and tries to read a message from the handler. In many cases the inner lambda additionally prints a message into the logs that the test waits upon to make sure the injection was stepped on. At the end of the day this "breakpoint" is injected like

```
    co_await inject("foo", [] (auto& handler) {
        log.info("foo waiting");
        co_await handler.wait_for_message(timeout);
    });
```

This PR makes breakpoints shorter and more unified, like this

```
    co_await inject("foo", wait_for_message(timeout));
```

where `wait_for_message` is a wrapper structure used to pick the new `inject()` overload.

Closes scylladb/scylladb#21342

* github.com:scylladb/scylladb:
  sstables: Use inject(wait_for_message_overload)
  treewide,error_injection: Use inject(wait_for_message) and fix tests
  treewide,error_injection: Use inject(wait_for_message) overload
  error_injection: Add inject() overload with wait_for_message wrapper
This commit is contained in:
Nadav Har'El
2024-10-31 21:56:27 +02:00
15 changed files with 54 additions and 85 deletions

View File

@@ -459,12 +459,7 @@ future<> compaction_task_executor::update_history(table_state& t, const sstables
if (_cm._sys_ks) {
auto sys_ks = _cm._sys_ks; // hold pointer on sys_ks
co_await utils::get_local_injector().inject("update_history_wait", [](auto& handler) -> future<> {
cmlog.info("update_history_wait: waiting");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{120});
cmlog.info("update_history_wait: released");
});
co_await utils::get_local_injector().inject("update_history_wait", utils::wait_for_message(120s));
std::unordered_map<int32_t, int64_t> rows_merged;
for (size_t id=0; id<res.stats.reader_statistics.rows_merged_histogram.size(); ++id) {
if (res.stats.reader_statistics.rows_merged_histogram[id] <= 0) {
@@ -1251,8 +1246,7 @@ protected:
virtual future<compaction_manager::compaction_stats_opt> do_run() override {
if (!is_system_keyspace(_status.keyspace)) {
co_await utils::get_local_injector().inject("compaction_regular_compaction_task_executor_do_run",
[] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 10s); });
co_await utils::get_local_injector().inject("compaction_regular_compaction_task_executor_do_run", utils::wait_for_message(10s));
}
co_await coroutine::switch_to(_cm.compaction_sg());

View File

@@ -378,8 +378,7 @@ future<> global_major_compaction_task_impl::run() {
}
future<> major_keyspace_compaction_task_impl::run() {
co_await utils::get_local_injector().inject("compaction_major_keyspace_compaction_task_impl_run",
[] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 10s); });
co_await utils::get_local_injector().inject("compaction_major_keyspace_compaction_task_impl_run", utils::wait_for_message(10s));
if (_cv) {
co_await wait_for_your_turn(*_cv, *_current_task, _status.id);

View File

@@ -89,10 +89,7 @@ future<> backup_task_impl::do_backup() {
}
}).finally([gh = std::move(gh)] {});
co_await coroutine::maybe_yield();
co_await utils::get_local_injector().inject("backup_task_pause", [] (auto& handler) {
snap_log.info("backup task: waiting");
return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2));
});
co_await utils::get_local_injector().inject("backup_task_pause", utils::wait_for_message(std::chrono::minutes(2)));
if (impl::_as.abort_requested()) {
ex = impl::_as.abort_requested_exception_ptr();
break;

View File

@@ -2356,8 +2356,7 @@ static future<> announce_with_raft(
future<> view_builder::mark_view_build_started(sstring ks_name, sstring view_name) {
co_await write_view_build_status(
[&] () -> future<> {
co_await utils::get_local_injector().inject("view_builder_pause_add_new_view",
[] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); });
co_await utils::get_local_injector().inject("view_builder_pause_add_new_view", utils::wait_for_message(5min));
const sstring query_string = format("INSERT INTO {}.{} (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)",
db::system_keyspace::NAME, db::system_keyspace::VIEW_BUILD_STATUS_V2);
auto host_id = _db.get_token_metadata().get_my_id();
@@ -2366,8 +2365,7 @@ future<> view_builder::mark_view_build_started(sstring ks_name, sstring view_nam
"view builder: mark view build STARTED");
},
[&] () -> future<> {
co_await utils::get_local_injector().inject("view_builder_pause_add_new_view",
[] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); });
co_await utils::get_local_injector().inject("view_builder_pause_add_new_view", utils::wait_for_message(5min));
co_await _sys_dist_ks.start_view_build(std::move(ks_name), std::move(view_name));
}
);
@@ -2376,8 +2374,7 @@ future<> view_builder::mark_view_build_started(sstring ks_name, sstring view_nam
future<> view_builder::mark_view_build_success(sstring ks_name, sstring view_name) {
co_await write_view_build_status(
[&] () -> future<> {
co_await utils::get_local_injector().inject("view_builder_pause_mark_success",
[] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); });
co_await utils::get_local_injector().inject("view_builder_pause_mark_success", utils::wait_for_message(5min));
const sstring query_string = format("UPDATE {}.{} SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?",
db::system_keyspace::NAME, db::system_keyspace::VIEW_BUILD_STATUS_V2);
auto host_id = _db.get_token_metadata().get_my_id();
@@ -2386,8 +2383,7 @@ future<> view_builder::mark_view_build_success(sstring ks_name, sstring view_nam
"view builder: mark view build SUCCESS");
},
[&] () -> future<> {
co_await utils::get_local_injector().inject("view_builder_pause_mark_success",
[] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); });
co_await utils::get_local_injector().inject("view_builder_pause_mark_success", utils::wait_for_message(5min));
co_await _sys_dist_ks.finish_view_build(std::move(ks_name), std::move(view_name));
}
);
@@ -2654,8 +2650,7 @@ future<> view_builder::migrate_to_v2(locator::token_metadata_ptr tmptr, db::syst
{},
cql3::query_processor::cache_internal::no);
co_await utils::get_local_injector().inject("view_builder_pause_in_migrate_v2",
[] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); });
co_await utils::get_local_injector().inject("view_builder_pause_in_migrate_v2", utils::wait_for_message(5min));
auto col_names = boost::copy_range<std::vector<sstring>>(schema->all_columns() | boost::adaptors::transformed([] (const auto& col) {return col.name_as_cql_string(); }));
auto col_names_str = boost::algorithm::join(col_names, ", ");

View File

@@ -1038,8 +1038,7 @@ future<> repair::shard_repair_task_impl::do_repair_ranges() {
auto user_permit = _user_ranges_parallelism ? co_await seastar::get_units(*_user_ranges_parallelism, 1) : semaphore_units<>();
co_await repair_range(range, table_info);
if (2 * (_ranges_complete + 1) > ranges_size()) {
co_await utils::get_local_injector().inject("repair_shard_repair_task_impl_do_repair_ranges",
[] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 10s); });
co_await utils::get_local_injector().inject("repair_shard_repair_task_impl_do_repair_ranges", utils::wait_for_message(10s));
}
++_ranges_complete;
if (_reason == streaming::stream_reason::bootstrap) {

View File

@@ -5718,9 +5718,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0};
auto task = co_await get_task_manager_module().make_and_start_task<node_ops::streaming_task_impl>(parent_info,
parent_info.id, streaming::stream_reason::decommission, _decommission_result, coroutine::lambda([this] () -> future<> {
co_await utils::get_local_injector().inject("streaming_task_impl_decommission_run",
[] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 60s); });
co_await utils::get_local_injector().inject("streaming_task_impl_decommission_run", utils::wait_for_message(60s));
co_await unbootstrap();
}));
co_await task->done();
@@ -6671,11 +6669,7 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
format("raft topology: placing join request for {}", params.host_id));
co_await utils::get_local_injector().inject("join-node-before-add-entry", [] (auto& handler) -> future<> {
rtlogger.info("join-node-before-add-entry injection hit");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
rtlogger.info("join-node-before-add-entry injection done");
});
co_await utils::get_local_injector().inject("join-node-before-add-entry", utils::wait_for_message(5min));
try {
// Make replaced node and ignored nodes non voters earlier for better HA
@@ -6739,11 +6733,7 @@ future<join_node_response_result> storage_service::join_node_response_handler(jo
// the replacing node that is alive.
co_await _gossiper.advertise_to_nodes({});
co_await utils::get_local_injector().inject("join-node-response_handler-before-read-barrier", [] (auto& handler) -> future<> {
rtlogger.info("join-node-response_handler-before-read-barrier injection hit");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
rtlogger.info("join-node-response_handler-before-read-barrier injection done");
});
co_await utils::get_local_injector().inject("join-node-response_handler-before-read-barrier", utils::wait_for_message(5min));
// Do a read barrier to read/initialize the topology state
co_await _group0->group0_server_with_timeouts().read_barrier(&_group0_as, raft_timeout{});

View File

@@ -842,11 +842,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
rtlogger.trace("do update {} reason {}", updates, reason);
mixed_change change{std::move(updates)};
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
co_await utils::get_local_injector().inject("wait-before-committing-rf-change-event", [] (auto& handler) -> future<> {
rtlogger.info("wait-before-committing-rf-change-event injection hit");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{30});
rtlogger.info("wait-before-committing-rf-change-event injection done");
});
co_await utils::get_local_injector().inject("wait-before-committing-rf-change-event", utils::wait_for_message(30s));
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
}
break;
@@ -1332,11 +1328,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// which happens outside the topology coordinator.
bool has_updates = !updates.empty();
if (has_updates) {
co_await utils::get_local_injector().inject("tablet_transition_updates", [] (auto& handler) {
rtlogger.info("tablet_transition_updates: start");
return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2));
});
co_await utils::get_local_injector().inject("tablet_transition_updates", utils::wait_for_message(2min));
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.set_version(_topo_sm._topology.version + 1)
@@ -1665,12 +1657,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
"bootstrap: insert tokens and CDC generation data (UUID: {})", gen_uuid);
co_await update_topology_state(std::move(guard_), {std::move(mutation), builder.build()}, reason);
co_await utils::get_local_injector().inject("topology_coordinator_pause_after_updating_cdc_generation",
[] (auto& handler) -> future<> {
rtlogger.info("topology_coordinator_pause_after_updating_cdc_generation: wait for message for 5 minutes");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
rtlogger.info("topology_coordinator_pause_after_updating_cdc_generation: done");
});
co_await utils::get_local_injector().inject("topology_coordinator_pause_after_updating_cdc_generation", utils::wait_for_message(5min));
}
break;
case node_state::replacing: {
@@ -3010,8 +2997,7 @@ future<> topology_coordinator::run() {
while (!_as.abort_requested()) {
bool sleep = false;
try {
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_processing_backlog",
[] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); });
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_processing_backlog", utils::wait_for_message(5min));
auto guard = co_await cleanup_group0_config_if_needed(co_await start_operation());
if (_rollback) {
@@ -3028,12 +3014,7 @@ future<> topology_coordinator::run() {
co_await await_event();
rtlogger.debug("topology coordinator fiber got an event");
}
co_await utils::get_local_injector().inject("wait-after-topology-coordinator-gets-event", [] (auto& handler) -> future<> {
rtlogger.info("wait-after-topology-coordinator-gets-event injection hit");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{30});
rtlogger.info("wait-after-topology-coordinator-gets-event injection done");
});
co_await utils::get_local_injector().inject("wait-after-topology-coordinator-gets-event", utils::wait_for_message(30s));
} catch (...) {
sleep = handle_topology_coordinator_error(std::current_exception());
}

View File

@@ -1549,12 +1549,7 @@ future<> sstable::reload_reclaimed_components() {
co_return;
}
co_await utils::get_local_injector().inject("reload_reclaimed_components/pause", [] (auto& handler) {
sstlog.info("reload_reclaimed_components/pause init");
auto ret = handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{5});
sstlog.info("reload_reclaimed_components/pause done");
return ret;
});
co_await utils::get_local_injector().inject("reload_reclaimed_components/pause", utils::wait_for_message(std::chrono::seconds(5)));
co_await read_filter();
_total_reclaimable_memory.reset();
@@ -1958,12 +1953,7 @@ future<uint64_t> sstable::validate(reader_permit permit, abort_source& abort,
if (errors) {
co_return errors;
}
co_await utils::get_local_injector().inject("sstable_validate/pause", [] (auto& handler) {
sstlog.info("sstable_validate/pause init");
auto ret = handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{5});
sstlog.info("sstable_validate/pause done");
return ret;
});
co_await utils::get_local_injector().inject("sstable_validate/pause", utils::wait_for_message(std::chrono::seconds(5)));
if (_version >= sstable_version_types::mc) {
co_return co_await mx::validate(shared_from_this(), std::move(permit), abort, std::move(error_handler), monitor);

View File

@@ -158,8 +158,7 @@ is_internal task_manager::task::impl::is_internal() const noexcept {
}
static future<> abort_children(task_manager::module_ptr module, task_id parent_id) noexcept {
co_await utils::get_local_injector().inject("tasks_abort_children",
[] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 10s); });
co_await utils::get_local_injector().inject("tasks_abort_children", utils::wait_for_message(10s));
auto entered = module->async_gate().try_enter();
if (!entered) {

View File

@@ -134,7 +134,7 @@ async def test_backup_is_abortable(manager: ManagerClient, s3_server):
tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', s3_server.address, s3_server.bucket_name, prefix)
print(f'Started task {tid}, aborting it early')
await log.wait_for('backup task: waiting', from_mark=mark)
await log.wait_for('backup_task_pause: waiting', from_mark=mark)
await manager.api.abort_task(server.ip_addr, tid)
await manager.api.message_injection(server.ip_addr, "backup_task_pause")
status = await manager.api.wait_task(server.ip_addr, tid)

View File

@@ -105,8 +105,7 @@ async def test_quorum_lost_during_node_join(manager: ManagerClient, raft_op_time
logger.info(f"waiting for the leader node {servers[0]} to start handling the join request")
log_file = await manager.server_open_log(servers[0].server_id)
await log_file.wait_for("join-node-before-add-entry injection hit",
timeout=60)
await log_file.wait_for("join-node-before-add-entry: waiting", timeout=60)
logger.info("stopping the second node")
await manager.server_stop_gracefully(servers[1].server_id)
@@ -153,7 +152,7 @@ async def test_quorum_lost_during_node_join_response_handler(manager: ManagerCli
logger.info(f"waiting for the third node {servers[2]} to hit join-node-response_handler-before-read-barrier")
log_file = await manager.server_open_log(servers[2].server_id)
await log_file.wait_for("join-node-response_handler-before-read-barrier injection hit", timeout=60)
await log_file.wait_for("join-node-response_handler-before-read-barrier: waiting", timeout=60)
logger.info("stopping the second node")
await manager.server_stop_gracefully(servers[1].server_id)

View File

@@ -51,7 +51,7 @@ async def test_alter_dropped_tablets_keyspace(manager: ManagerClient) -> None:
logger.info(f"waiting for the leader node {servers[0]} to start handling the keyspace-rf-change request")
leader_log_file = await manager.server_open_log(servers[0].server_id)
await leader_log_file.wait_for("wait-after-topology-coordinator-gets-event injection hit", timeout=10)
await leader_log_file.wait_for("wait-after-topology-coordinator-gets-event: waiting", timeout=10)
logger.info(f"dropping KS from the follower node {servers[1]} so that the leader, which hangs on injected sleep, "
f"wakes up with the drop applied")
@@ -106,7 +106,7 @@ async def test_alter_tablets_keyspace_concurrent_modification(manager: ManagerCl
logger.info(f"waiting for the leader node {servers[0]} to start handling the keyspace-rf-change request")
leader_log_file = await manager.server_open_log(servers[0].server_id)
await leader_log_file.wait_for("wait-before-committing-rf-change-event injection hit", timeout=10)
await leader_log_file.wait_for("wait-before-committing-rf-change-event: waiting", timeout=10)
logger.info(f"creating another keyspace from the follower node {servers[1]} so that the leader, which hangs on injected sleep, "
f"wakes up with a changed schema")

View File

@@ -65,7 +65,7 @@ async def test_tablet_mv_replica_pairing_during_replace(manager: ManagerClient):
replace_cfg = ReplaceConfig(replaced_id = server_to_replace.server_id, reuse_ip_addr = False, use_host_id = True)
replace_task = asyncio.create_task(manager.server_add(replace_cfg))
await coord_log.wait_for('tablet_transition_updates: start', from_mark=coord_mark)
await coord_log.wait_for('tablet_transition_updates: waiting', from_mark=coord_mark)
if server_to_down.server_id != server_to_replace.server_id:
await manager.server_stop(server_to_down.server_id)

View File

@@ -115,7 +115,7 @@ async def test_random_failures(manager: ManagerClient,
coordinator_log_mark = await coordinator_log.mark()
s_info = await manager.server_add(expected_server_up_state=ServerUpState.PROCESS_STARTED)
await coordinator_log.wait_for(
pattern="topology_coordinator_pause_after_updating_cdc_generation: wait for message for 5 minutes",
pattern="topology_coordinator_pause_after_updating_cdc_generation: waiting",
from_mark=coordinator_log_mark,
)
await manager.server_pause(server_id=s_info.server_id)

View File

@@ -43,6 +43,14 @@ extern logging::logger errinj_logger;
using error_injection_parameters = std::unordered_map<sstring, sstring>;
// Wraps the argument to breakpoint injection (see the relevant inject() overload
// in class error_injection below). The only parameter is the timeout after which
// the pause is aborted
struct wait_for_message {
std::chrono::milliseconds timeout;
wait_for_message(std::chrono::milliseconds tmo) noexcept : timeout(tmo) {}
};
/**
* Error injection class can be used to create and manage code injections
* which trigger an error or a custom action in debug mode.
@@ -454,6 +462,18 @@ public:
co_await func(handler);
}
// \brief Inject "breakpoint"
// Injects a pause in the code execution that's woken up explicitly by the injector
// request
// \param wfm -- the wait_for_message instance that describes details of the pause
future<> inject(const std::string_view& name, utils::wait_for_message wfm) {
co_await inject(name, [name, wfm] (injection_handler& handler) -> future<> {
errinj_logger.info("{}: waiting for message", name);
co_await handler.wait_for_message(std::chrono::steady_clock::now() + wfm.timeout);
errinj_logger.info("{}: message received", name);
});
}
template <typename T = std::string_view>
std::optional<T> inject_parameter(const std::string_view& name) {
auto* data = get_data(name);
@@ -606,6 +626,12 @@ public:
return make_ready_future<>();
}
// \brief Inject "breakpoint"
[[gnu::always_inline]]
future<> inject(const std::string_view& name, utils::wait_for_message wfm) {
return make_ready_future<>();
}
template <typename T>
[[gnu::always_inline]]
std::optional<T> inject_parameter(const std::string_view& name) {