From c1432f36579b4191018969160ad6ac48ef4aae38 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Mon, 28 Oct 2024 15:34:30 +0300 Subject: [PATCH 1/4] error_injection: Add inject() overload with wait_for_message wrapper The wrapper object denotes that injection should run a handler and wait_for_message() on it. Wrapper carries the timeout used to call the mentioned method. It's currently unused, next patches will start enjoying it. Signed-off-by: Pavel Emelyanov --- utils/error_injection.hh | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/utils/error_injection.hh b/utils/error_injection.hh index ab39019b80..c589e295aa 100644 --- a/utils/error_injection.hh +++ b/utils/error_injection.hh @@ -43,6 +43,14 @@ extern logging::logger errinj_logger; using error_injection_parameters = std::unordered_map; +// Wraps the argument to breakpoint injection (see the relevant inject() overload +// in class error_injection below). The only parameter is the timeout after which +// the pause is aborted +struct wait_for_message { + std::chrono::milliseconds timeout; + wait_for_message(std::chrono::milliseconds tmo) noexcept : timeout(tmo) {} +}; + /** * Error injection class can be used to create and manage code injections * which trigger an error or a custom action in debug mode. 
@@ -454,6 +462,18 @@ public: co_await func(handler); } + // \brief Inject "breakpoint" + // Injects a pause in the code execution that's woken up explicitly by the injector + // request + // \param wfm -- the wait_for_message instance that describes details of the pause + future<> inject(const std::string_view& name, utils::wait_for_message wfm) { + co_await inject(name, [name, wfm] (injection_handler& handler) -> future<> { + errinj_logger.info("{}: waiting for message", name); + co_await handler.wait_for_message(std::chrono::steady_clock::now() + wfm.timeout); + errinj_logger.info("{}: message received", name); + }); + } + template std::optional inject_parameter(const std::string_view& name) { auto* data = get_data(name); @@ -606,6 +626,12 @@ public: return make_ready_future<>(); } + // \brief Inject "breakpoint" + [[gnu::always_inline]] + future<> inject(const std::string_view& name, utils::wait_for_message wfm) { + return make_ready_future<>(); + } + template [[gnu::always_inline]] std::optional inject_parameter(const std::string_view& name) { From 7d8cc3ccc2f89b31cd44e76856abcc94e7edcad9 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Mon, 28 Oct 2024 18:14:14 +0300 Subject: [PATCH 2/4] treewide,error_injection: Use inject(wait_for_message) overload Many places want to inject a handler that waits for external kick. Now there's convenience inject() method overload for this. It will result in extra messages in logs, but so far no code/test cares about it. 
Signed-off-by: Pavel Emelyanov --- compaction/compaction_manager.cc | 10 ++-------- compaction/task_manager_module.cc | 3 +-- db/view/view.cc | 15 +++++---------- repair/repair.cc | 3 +-- service/storage_service.cc | 4 +--- tasks/task_manager.cc | 3 +-- 6 files changed, 11 insertions(+), 27 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 2c36ad14dd..32aafed844 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -459,12 +459,7 @@ future<> compaction_task_executor::update_history(table_state& t, const sstables if (_cm._sys_ks) { auto sys_ks = _cm._sys_ks; // hold pointer on sys_ks - co_await utils::get_local_injector().inject("update_history_wait", [](auto& handler) -> future<> { - cmlog.info("update_history_wait: waiting"); - co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{120}); - cmlog.info("update_history_wait: released"); - }); - + co_await utils::get_local_injector().inject("update_history_wait", utils::wait_for_message(120s)); std::unordered_map rows_merged; for (size_t id=0; id do_run() override { if (!is_system_keyspace(_status.keyspace)) { - co_await utils::get_local_injector().inject("compaction_regular_compaction_task_executor_do_run", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 10s); }); + co_await utils::get_local_injector().inject("compaction_regular_compaction_task_executor_do_run", utils::wait_for_message(10s)); } co_await coroutine::switch_to(_cm.compaction_sg()); diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index 2eb656e6ae..0a40ef6d07 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -378,8 +378,7 @@ future<> global_major_compaction_task_impl::run() { } future<> major_keyspace_compaction_task_impl::run() { - co_await utils::get_local_injector().inject("compaction_major_keyspace_compaction_task_impl_run", - 
[] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 10s); }); + co_await utils::get_local_injector().inject("compaction_major_keyspace_compaction_task_impl_run", utils::wait_for_message(10s)); if (_cv) { co_await wait_for_your_turn(*_cv, *_current_task, _status.id); diff --git a/db/view/view.cc b/db/view/view.cc index 6b1c760728..f0a99585c8 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2355,8 +2355,7 @@ static future<> announce_with_raft( future<> view_builder::mark_view_build_started(sstring ks_name, sstring view_name) { co_await write_view_build_status( [&] () -> future<> { - co_await utils::get_local_injector().inject("view_builder_pause_add_new_view", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); }); + co_await utils::get_local_injector().inject("view_builder_pause_add_new_view", utils::wait_for_message(5min)); const sstring query_string = format("INSERT INTO {}.{} (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", db::system_keyspace::NAME, db::system_keyspace::VIEW_BUILD_STATUS_V2); auto host_id = _db.get_token_metadata().get_my_id(); @@ -2365,8 +2364,7 @@ future<> view_builder::mark_view_build_started(sstring ks_name, sstring view_nam "view builder: mark view build STARTED"); }, [&] () -> future<> { - co_await utils::get_local_injector().inject("view_builder_pause_add_new_view", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); }); + co_await utils::get_local_injector().inject("view_builder_pause_add_new_view", utils::wait_for_message(5min)); co_await _sys_dist_ks.start_view_build(std::move(ks_name), std::move(view_name)); } ); @@ -2375,8 +2373,7 @@ future<> view_builder::mark_view_build_started(sstring ks_name, sstring view_nam future<> view_builder::mark_view_build_success(sstring ks_name, sstring view_name) { co_await write_view_build_status( [&] () -> future<> { - co_await 
utils::get_local_injector().inject("view_builder_pause_mark_success", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); }); + co_await utils::get_local_injector().inject("view_builder_pause_mark_success", utils::wait_for_message(5min)); const sstring query_string = format("UPDATE {}.{} SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", db::system_keyspace::NAME, db::system_keyspace::VIEW_BUILD_STATUS_V2); auto host_id = _db.get_token_metadata().get_my_id(); @@ -2385,8 +2382,7 @@ future<> view_builder::mark_view_build_success(sstring ks_name, sstring view_nam "view builder: mark view build SUCCESS"); }, [&] () -> future<> { - co_await utils::get_local_injector().inject("view_builder_pause_mark_success", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); }); + co_await utils::get_local_injector().inject("view_builder_pause_mark_success", utils::wait_for_message(5min)); co_await _sys_dist_ks.finish_view_build(std::move(ks_name), std::move(view_name)); } ); @@ -2649,8 +2645,7 @@ future<> view_builder::migrate_to_v2(locator::token_metadata_ptr tmptr, db::syst {}, cql3::query_processor::cache_internal::no); - co_await utils::get_local_injector().inject("view_builder_pause_in_migrate_v2", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); }); + co_await utils::get_local_injector().inject("view_builder_pause_in_migrate_v2", utils::wait_for_message(5min)); auto col_names = boost::copy_range>(schema->all_columns() | boost::adaptors::transformed([] (const auto& col) {return col.name_as_cql_string(); })); auto col_names_str = boost::algorithm::join(col_names, ", "); diff --git a/repair/repair.cc b/repair/repair.cc index 6965258f89..0c8ec7a9a8 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1038,8 +1038,7 @@ future<> repair::shard_repair_task_impl::do_repair_ranges() { auto 
user_permit = _user_ranges_parallelism ? co_await seastar::get_units(*_user_ranges_parallelism, 1) : semaphore_units<>(); co_await repair_range(range, table_info); if (2 * (_ranges_complete + 1) > ranges_size()) { - co_await utils::get_local_injector().inject("repair_shard_repair_task_impl_do_repair_ranges", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 10s); }); + co_await utils::get_local_injector().inject("repair_shard_repair_task_impl_do_repair_ranges", utils::wait_for_message(10s)); } ++_ranges_complete; if (_reason == streaming::stream_reason::bootstrap) { diff --git a/service/storage_service.cc b/service/storage_service.cc index fb2283a81e..6d584c8636 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5718,9 +5718,7 @@ future storage_service::raft_topology_cmd_handler(raft tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0}; auto task = co_await get_task_manager_module().make_and_start_task(parent_info, parent_info.id, streaming::stream_reason::decommission, _decommission_result, coroutine::lambda([this] () -> future<> { - co_await utils::get_local_injector().inject("streaming_task_impl_decommission_run", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 60s); }); - + co_await utils::get_local_injector().inject("streaming_task_impl_decommission_run", utils::wait_for_message(60s)); co_await unbootstrap(); })); co_await task->done(); diff --git a/tasks/task_manager.cc b/tasks/task_manager.cc index ae67def434..d2301476e2 100644 --- a/tasks/task_manager.cc +++ b/tasks/task_manager.cc @@ -158,8 +158,7 @@ is_internal task_manager::task::impl::is_internal() const noexcept { } static future<> abort_children(task_manager::module_ptr module, task_id parent_id) noexcept { - co_await utils::get_local_injector().inject("tasks_abort_children", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 10s); }); + co_await 
utils::get_local_injector().inject("tasks_abort_children", utils::wait_for_message(10s)); auto entered = module->async_gate().try_enter(); if (!entered) { From 39cb93be3cd9ba24f3f46e594e5c0cd5f48e7852 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Mon, 28 Oct 2024 18:15:12 +0300 Subject: [PATCH 3/4] treewide,error_injection: Use inject(wait_for_message) and fix tests This is continuation of previous patch, this time also update tests that wait for specific message in logs (to make sure injection handler was called and paused the code execution). Signed-off-by: Pavel Emelyanov --- db/snapshot/backup_task.cc | 5 +--- service/storage_service.cc | 12 ++------ service/topology_coordinator.cc | 29 ++++--------------- test/object_store/test_backup.py | 2 +- test/topology_custom/test_raft_no_quorum.py | 5 ++-- test/topology_custom/test_tablets_cql.py | 4 +-- .../test_mv_tablets_replace.py | 2 +- .../test_random_failures.py | 2 +- 8 files changed, 15 insertions(+), 46 deletions(-) diff --git a/db/snapshot/backup_task.cc b/db/snapshot/backup_task.cc index dbc895d3d0..1d6056ff74 100644 --- a/db/snapshot/backup_task.cc +++ b/db/snapshot/backup_task.cc @@ -89,10 +89,7 @@ future<> backup_task_impl::do_backup() { } }).finally([gh = std::move(gh)] {}); co_await coroutine::maybe_yield(); - co_await utils::get_local_injector().inject("backup_task_pause", [] (auto& handler) { - snap_log.info("backup task: waiting"); - return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2)); - }); + co_await utils::get_local_injector().inject("backup_task_pause", utils::wait_for_message(std::chrono::minutes(2))); if (impl::_as.abort_requested()) { ex = impl::_as.abort_requested_exception_ptr(); break; diff --git a/service/storage_service.cc b/service/storage_service.cc index 6d584c8636..577d6ba1aa 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -6669,11 +6669,7 @@ future storage_service::join_node_request_handler(join group0_command g0_cmd 
= _group0->client().prepare_command(std::move(change), guard, format("raft topology: placing join request for {}", params.host_id)); - co_await utils::get_local_injector().inject("join-node-before-add-entry", [] (auto& handler) -> future<> { - rtlogger.info("join-node-before-add-entry injection hit"); - co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5}); - rtlogger.info("join-node-before-add-entry injection done"); - }); + co_await utils::get_local_injector().inject("join-node-before-add-entry", utils::wait_for_message(5min)); try { // Make replaced node and ignored nodes non voters earlier for better HA @@ -6737,11 +6733,7 @@ future storage_service::join_node_response_handler(jo // the replacing node that is alive. co_await _gossiper.advertise_to_nodes({}); - co_await utils::get_local_injector().inject("join-node-response_handler-before-read-barrier", [] (auto& handler) -> future<> { - rtlogger.info("join-node-response_handler-before-read-barrier injection hit"); - co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5}); - rtlogger.info("join-node-response_handler-before-read-barrier injection done"); - }); + co_await utils::get_local_injector().inject("join-node-response_handler-before-read-barrier", utils::wait_for_message(5min)); // Do a read barrier to read/initialize the topology state co_await _group0->group0_server_with_timeouts().read_barrier(&_group0_as, raft_timeout{}); diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index b88ed56891..d8b5563fd2 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -842,11 +842,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { rtlogger.trace("do update {} reason {}", updates, reason); mixed_change change{std::move(updates)}; group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason); - co_await 
utils::get_local_injector().inject("wait-before-committing-rf-change-event", [] (auto& handler) -> future<> { - rtlogger.info("wait-before-committing-rf-change-event injection hit"); - co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{30}); - rtlogger.info("wait-before-committing-rf-change-event injection done"); - }); + co_await utils::get_local_injector().inject("wait-before-committing-rf-change-event", utils::wait_for_message(30s)); co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as); } break; @@ -1332,11 +1328,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // which happens outside the topology coordinator. bool has_updates = !updates.empty(); if (has_updates) { - co_await utils::get_local_injector().inject("tablet_transition_updates", [] (auto& handler) { - rtlogger.info("tablet_transition_updates: start"); - return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2)); - }); - + co_await utils::get_local_injector().inject("tablet_transition_updates", utils::wait_for_message(2min)); updates.emplace_back( topology_mutation_builder(guard.write_timestamp()) .set_version(_topo_sm._topology.version + 1) @@ -1665,12 +1657,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { "bootstrap: insert tokens and CDC generation data (UUID: {})", gen_uuid); co_await update_topology_state(std::move(guard_), {std::move(mutation), builder.build()}, reason); - co_await utils::get_local_injector().inject("topology_coordinator_pause_after_updating_cdc_generation", - [] (auto& handler) -> future<> { - rtlogger.info("topology_coordinator_pause_after_updating_cdc_generation: wait for message for 5 minutes"); - co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5}); - rtlogger.info("topology_coordinator_pause_after_updating_cdc_generation: done"); - }); + co_await 
utils::get_local_injector().inject("topology_coordinator_pause_after_updating_cdc_generation", utils::wait_for_message(5min)); } break; case node_state::replacing: { @@ -3010,8 +2997,7 @@ future<> topology_coordinator::run() { while (!_as.abort_requested()) { bool sleep = false; try { - co_await utils::get_local_injector().inject("topology_coordinator_pause_before_processing_backlog", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); }); + co_await utils::get_local_injector().inject("topology_coordinator_pause_before_processing_backlog", utils::wait_for_message(5min)); auto guard = co_await cleanup_group0_config_if_needed(co_await start_operation()); if (_rollback) { @@ -3028,12 +3014,7 @@ future<> topology_coordinator::run() { co_await await_event(); rtlogger.debug("topology coordinator fiber got an event"); } - co_await utils::get_local_injector().inject("wait-after-topology-coordinator-gets-event", [] (auto& handler) -> future<> { - rtlogger.info("wait-after-topology-coordinator-gets-event injection hit"); - co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{30}); - rtlogger.info("wait-after-topology-coordinator-gets-event injection done"); - }); - + co_await utils::get_local_injector().inject("wait-after-topology-coordinator-gets-event", utils::wait_for_message(30s)); } catch (...) 
{ sleep = handle_topology_coordinator_error(std::current_exception()); } diff --git a/test/object_store/test_backup.py b/test/object_store/test_backup.py index 3ad84760bd..57f9c112f0 100644 --- a/test/object_store/test_backup.py +++ b/test/object_store/test_backup.py @@ -134,7 +134,7 @@ async def test_backup_is_abortable(manager: ManagerClient, s3_server): tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', s3_server.address, s3_server.bucket_name, prefix) print(f'Started task {tid}, aborting it early') - await log.wait_for('backup task: waiting', from_mark=mark) + await log.wait_for('backup_task_pause: waiting', from_mark=mark) await manager.api.abort_task(server.ip_addr, tid) await manager.api.message_injection(server.ip_addr, "backup_task_pause") status = await manager.api.wait_task(server.ip_addr, tid) diff --git a/test/topology_custom/test_raft_no_quorum.py b/test/topology_custom/test_raft_no_quorum.py index c064816cb4..3ef5cd1be7 100644 --- a/test/topology_custom/test_raft_no_quorum.py +++ b/test/topology_custom/test_raft_no_quorum.py @@ -105,8 +105,7 @@ async def test_quorum_lost_during_node_join(manager: ManagerClient, raft_op_time logger.info(f"waiting for the leader node {servers[0]} to start handling the join request") log_file = await manager.server_open_log(servers[0].server_id) - await log_file.wait_for("join-node-before-add-entry injection hit", - timeout=60) + await log_file.wait_for("join-node-before-add-entry: waiting", timeout=60) logger.info("stopping the second node") await manager.server_stop_gracefully(servers[1].server_id) @@ -153,7 +152,7 @@ async def test_quorum_lost_during_node_join_response_handler(manager: ManagerCli logger.info(f"waiting for the third node {servers[2]} to hit join-node-response_handler-before-read-barrier") log_file = await manager.server_open_log(servers[2].server_id) - await log_file.wait_for("join-node-response_handler-before-read-barrier injection hit", timeout=60) + await 
log_file.wait_for("join-node-response_handler-before-read-barrier: waiting", timeout=60) logger.info("stopping the second node") await manager.server_stop_gracefully(servers[1].server_id) diff --git a/test/topology_custom/test_tablets_cql.py b/test/topology_custom/test_tablets_cql.py index 044b6a9991..6c6c6920a2 100644 --- a/test/topology_custom/test_tablets_cql.py +++ b/test/topology_custom/test_tablets_cql.py @@ -51,7 +51,7 @@ async def test_alter_dropped_tablets_keyspace(manager: ManagerClient) -> None: logger.info(f"waiting for the leader node {servers[0]} to start handling the keyspace-rf-change request") leader_log_file = await manager.server_open_log(servers[0].server_id) - await leader_log_file.wait_for("wait-after-topology-coordinator-gets-event injection hit", timeout=10) + await leader_log_file.wait_for("wait-after-topology-coordinator-gets-event: waiting", timeout=10) logger.info(f"dropping KS from the follower node {servers[1]} so that the leader, which hangs on injected sleep, " f"wakes up with the drop applied") @@ -106,7 +106,7 @@ async def test_alter_tablets_keyspace_concurrent_modification(manager: ManagerCl logger.info(f"waiting for the leader node {servers[0]} to start handling the keyspace-rf-change request") leader_log_file = await manager.server_open_log(servers[0].server_id) - await leader_log_file.wait_for("wait-before-committing-rf-change-event injection hit", timeout=10) + await leader_log_file.wait_for("wait-before-committing-rf-change-event: waiting", timeout=10) logger.info(f"creating another keyspace from the follower node {servers[1]} so that the leader, which hangs on injected sleep, " f"wakes up with a changed schema") diff --git a/test/topology_experimental_raft/test_mv_tablets_replace.py b/test/topology_experimental_raft/test_mv_tablets_replace.py index dc2d309d8b..1828b8a273 100644 --- a/test/topology_experimental_raft/test_mv_tablets_replace.py +++ b/test/topology_experimental_raft/test_mv_tablets_replace.py @@ -65,7 +65,7 @@ 
async def test_tablet_mv_replica_pairing_during_replace(manager: ManagerClient): replace_cfg = ReplaceConfig(replaced_id = server_to_replace.server_id, reuse_ip_addr = False, use_host_id = True) replace_task = asyncio.create_task(manager.server_add(replace_cfg)) - await coord_log.wait_for('tablet_transition_updates: start', from_mark=coord_mark) + await coord_log.wait_for('tablet_transition_updates: waiting', from_mark=coord_mark) if server_to_down.server_id != server_to_replace.server_id: await manager.server_stop(server_to_down.server_id) diff --git a/test/topology_random_failures/test_random_failures.py b/test/topology_random_failures/test_random_failures.py index 8e77cc4040..6e5e41d183 100644 --- a/test/topology_random_failures/test_random_failures.py +++ b/test/topology_random_failures/test_random_failures.py @@ -115,7 +115,7 @@ async def test_random_failures(manager: ManagerClient, coordinator_log_mark = await coordinator_log.mark() s_info = await manager.server_add(expected_server_up_state=ServerUpState.PROCESS_STARTED) await coordinator_log.wait_for( - pattern="topology_coordinator_pause_after_updating_cdc_generation: wait for message for 5 minutes", + pattern="topology_coordinator_pause_after_updating_cdc_generation: waiting", from_mark=coordinator_log_mark, ) await manager.server_pause(server_id=s_info.server_id) From c16369323b29c137e5e749ad17d7c00a697ddee3 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Mon, 28 Oct 2024 17:32:38 +0300 Subject: [PATCH 4/4] sstables: Use inject(wait_for_message) overload This place could be in the pre-previous patch, it just can use the overload, but it seemingly has a bug. It prints _two_ messages -- that the injection handler was suspended and that it was woken up. The bug is in the 2nd message -- it's printed without waiting for the message, so it likely gets printed before wakeup itself. It seems that no tests care about it though. 
Signed-off-by: Pavel Emelyanov --- sstables/sstables.cc | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 6297f15e21..b5e0631a89 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1548,12 +1548,7 @@ future<> sstable::reload_reclaimed_components() { co_return; } - co_await utils::get_local_injector().inject("reload_reclaimed_components/pause", [] (auto& handler) { - sstlog.info("reload_reclaimed_components/pause init"); - auto ret = handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{5}); - sstlog.info("reload_reclaimed_components/pause done"); - return ret; - }); + co_await utils::get_local_injector().inject("reload_reclaimed_components/pause", utils::wait_for_message(std::chrono::seconds(5))); co_await read_filter(); _total_reclaimable_memory.reset(); @@ -1957,12 +1952,7 @@ future sstable::validate(reader_permit permit, abort_source& abort, if (errors) { co_return errors; } - co_await utils::get_local_injector().inject("sstable_validate/pause", [] (auto& handler) { - sstlog.info("sstable_validate/pause init"); - auto ret = handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{5}); - sstlog.info("sstable_validate/pause done"); - return ret; - }); + co_await utils::get_local_injector().inject("sstable_validate/pause", utils::wait_for_message(std::chrono::seconds(5))); if (_version >= sstable_version_types::mc) { co_return co_await mx::validate(shared_from_this(), std::move(permit), abort, std::move(error_handler), monitor);