diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 01af5a644b..dcc408449c 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -459,12 +459,7 @@ future<> compaction_task_executor::update_history(table_state& t, const sstables if (_cm._sys_ks) { auto sys_ks = _cm._sys_ks; // hold pointer on sys_ks - co_await utils::get_local_injector().inject("update_history_wait", [](auto& handler) -> future<> { - cmlog.info("update_history_wait: waiting"); - co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{120}); - cmlog.info("update_history_wait: released"); - }); - + co_await utils::get_local_injector().inject("update_history_wait", utils::wait_for_message(120s)); std::unordered_map rows_merged; for (size_t id=0; id do_run() override { if (!is_system_keyspace(_status.keyspace)) { - co_await utils::get_local_injector().inject("compaction_regular_compaction_task_executor_do_run", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 10s); }); + co_await utils::get_local_injector().inject("compaction_regular_compaction_task_executor_do_run", utils::wait_for_message(10s)); } co_await coroutine::switch_to(_cm.compaction_sg()); diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index 2eb656e6ae..0a40ef6d07 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -378,8 +378,7 @@ future<> global_major_compaction_task_impl::run() { } future<> major_keyspace_compaction_task_impl::run() { - co_await utils::get_local_injector().inject("compaction_major_keyspace_compaction_task_impl_run", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 10s); }); + co_await utils::get_local_injector().inject("compaction_major_keyspace_compaction_task_impl_run", utils::wait_for_message(10s)); if (_cv) { co_await wait_for_your_turn(*_cv, *_current_task, _status.id); diff --git a/db/snapshot/backup_task.cc b/db/snapshot/backup_task.cc index dbc895d3d0..1d6056ff74 100644 --- a/db/snapshot/backup_task.cc +++ b/db/snapshot/backup_task.cc @@ -89,10 +89,7 @@ future<> backup_task_impl::do_backup() { } }).finally([gh = std::move(gh)] {}); co_await coroutine::maybe_yield(); - co_await utils::get_local_injector().inject("backup_task_pause", [] (auto& handler) { - snap_log.info("backup task: waiting"); - return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2)); - }); + co_await utils::get_local_injector().inject("backup_task_pause", utils::wait_for_message(std::chrono::minutes(2))); if (impl::_as.abort_requested()) { ex = impl::_as.abort_requested_exception_ptr(); break; diff --git a/db/view/view.cc b/db/view/view.cc index 121ae1a795..a49ce8df37 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2356,8 +2356,7 @@ static future<> announce_with_raft( future<> view_builder::mark_view_build_started(sstring ks_name, sstring view_name) { co_await write_view_build_status( [&] () -> future<> { - co_await utils::get_local_injector().inject("view_builder_pause_add_new_view", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); }); + co_await utils::get_local_injector().inject("view_builder_pause_add_new_view", utils::wait_for_message(5min)); const sstring query_string = format("INSERT INTO {}.{} (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", db::system_keyspace::NAME, db::system_keyspace::VIEW_BUILD_STATUS_V2); auto host_id = _db.get_token_metadata().get_my_id(); @@ -2366,8 +2365,7 @@ future<> view_builder::mark_view_build_started(sstring ks_name, sstring view_nam "view builder: mark view build STARTED"); }, [&] () -> future<> { - co_await utils::get_local_injector().inject("view_builder_pause_add_new_view", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); }); + co_await utils::get_local_injector().inject("view_builder_pause_add_new_view", utils::wait_for_message(5min)); co_await _sys_dist_ks.start_view_build(std::move(ks_name), std::move(view_name)); } ); @@ -2376,8 +2374,7 @@ future<> view_builder::mark_view_build_started(sstring ks_name, sstring view_nam future<> view_builder::mark_view_build_success(sstring ks_name, sstring view_name) { co_await write_view_build_status( [&] () -> future<> { - co_await utils::get_local_injector().inject("view_builder_pause_mark_success", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); }); + co_await utils::get_local_injector().inject("view_builder_pause_mark_success", utils::wait_for_message(5min)); const sstring query_string = format("UPDATE {}.{} SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", db::system_keyspace::NAME, db::system_keyspace::VIEW_BUILD_STATUS_V2); auto host_id = _db.get_token_metadata().get_my_id(); @@ -2386,8 +2383,7 @@ future<> view_builder::mark_view_build_success(sstring ks_name, sstring view_nam "view builder: mark view build SUCCESS"); }, [&] () -> future<> { - co_await utils::get_local_injector().inject("view_builder_pause_mark_success", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); }); + co_await utils::get_local_injector().inject("view_builder_pause_mark_success", utils::wait_for_message(5min)); co_await _sys_dist_ks.finish_view_build(std::move(ks_name), std::move(view_name)); } ); @@ -2654,8 +2650,7 @@ future<> view_builder::migrate_to_v2(locator::token_metadata_ptr tmptr, db::syst {}, cql3::query_processor::cache_internal::no); - co_await utils::get_local_injector().inject("view_builder_pause_in_migrate_v2", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); }); + co_await utils::get_local_injector().inject("view_builder_pause_in_migrate_v2", utils::wait_for_message(5min)); auto col_names = boost::copy_range>(schema->all_columns() | boost::adaptors::transformed([] (const auto& col) {return col.name_as_cql_string(); })); auto col_names_str = boost::algorithm::join(col_names, ", "); diff --git a/repair/repair.cc b/repair/repair.cc index 6965258f89..0c8ec7a9a8 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1038,8 +1038,7 @@ future<> repair::shard_repair_task_impl::do_repair_ranges() { auto user_permit = _user_ranges_parallelism ? co_await seastar::get_units(*_user_ranges_parallelism, 1) : semaphore_units<>(); co_await repair_range(range, table_info); if (2 * (_ranges_complete + 1) > ranges_size()) { - co_await utils::get_local_injector().inject("repair_shard_repair_task_impl_do_repair_ranges", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 10s); }); + co_await utils::get_local_injector().inject("repair_shard_repair_task_impl_do_repair_ranges", utils::wait_for_message(10s)); } ++_ranges_complete; if (_reason == streaming::stream_reason::bootstrap) { diff --git a/service/storage_service.cc b/service/storage_service.cc index fb2283a81e..577d6ba1aa 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5718,9 +5718,7 @@ future storage_service::raft_topology_cmd_handler(raft tasks::task_info parent_info{tasks::task_id{rs.request_id}, 0}; auto task = co_await get_task_manager_module().make_and_start_task(parent_info, parent_info.id, streaming::stream_reason::decommission, _decommission_result, coroutine::lambda([this] () -> future<> { - co_await utils::get_local_injector().inject("streaming_task_impl_decommission_run", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 60s); }); - + co_await utils::get_local_injector().inject("streaming_task_impl_decommission_run", utils::wait_for_message(60s)); co_await unbootstrap(); })); co_await task->done(); @@ -6671,11 +6669,7 @@ future storage_service::join_node_request_handler(join group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, format("raft topology: placing join request for {}", params.host_id)); - co_await utils::get_local_injector().inject("join-node-before-add-entry", [] (auto& handler) -> future<> { - rtlogger.info("join-node-before-add-entry injection hit"); - co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5}); - rtlogger.info("join-node-before-add-entry injection done"); - }); + co_await utils::get_local_injector().inject("join-node-before-add-entry", utils::wait_for_message(5min)); try { // Make replaced node and ignored nodes non voters earlier for better HA @@ -6739,11 +6733,7 @@ future storage_service::join_node_response_handler(jo // the replacing node that is alive. co_await _gossiper.advertise_to_nodes({}); - co_await utils::get_local_injector().inject("join-node-response_handler-before-read-barrier", [] (auto& handler) -> future<> { - rtlogger.info("join-node-response_handler-before-read-barrier injection hit"); - co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5}); - rtlogger.info("join-node-response_handler-before-read-barrier injection done"); - }); + co_await utils::get_local_injector().inject("join-node-response_handler-before-read-barrier", utils::wait_for_message(5min)); // Do a read barrier to read/initialize the topology state co_await _group0->group0_server_with_timeouts().read_barrier(&_group0_as, raft_timeout{}); diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index b88ed56891..d8b5563fd2 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -842,11 +842,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { rtlogger.trace("do update {} reason {}", updates, reason); mixed_change change{std::move(updates)}; group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason); - co_await utils::get_local_injector().inject("wait-before-committing-rf-change-event", [] (auto& handler) -> future<> { - rtlogger.info("wait-before-committing-rf-change-event injection hit"); - co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{30}); - rtlogger.info("wait-before-committing-rf-change-event injection done"); - }); + co_await utils::get_local_injector().inject("wait-before-committing-rf-change-event", utils::wait_for_message(30s)); co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as); } break; @@ -1332,11 +1328,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // which happens outside the topology coordinator. bool has_updates = !updates.empty(); if (has_updates) { - co_await utils::get_local_injector().inject("tablet_transition_updates", [] (auto& handler) { - rtlogger.info("tablet_transition_updates: start"); - return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2)); - }); - + co_await utils::get_local_injector().inject("tablet_transition_updates", utils::wait_for_message(2min)); updates.emplace_back( topology_mutation_builder(guard.write_timestamp()) .set_version(_topo_sm._topology.version + 1) @@ -1665,12 +1657,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { "bootstrap: insert tokens and CDC generation data (UUID: {})", gen_uuid); co_await update_topology_state(std::move(guard_), {std::move(mutation), builder.build()}, reason); - co_await utils::get_local_injector().inject("topology_coordinator_pause_after_updating_cdc_generation", - [] (auto& handler) -> future<> { - rtlogger.info("topology_coordinator_pause_after_updating_cdc_generation: wait for message for 5 minutes"); - co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5}); - rtlogger.info("topology_coordinator_pause_after_updating_cdc_generation: done"); - }); + co_await utils::get_local_injector().inject("topology_coordinator_pause_after_updating_cdc_generation", utils::wait_for_message(5min)); } break; case node_state::replacing: { @@ -3010,8 +2997,7 @@ future<> topology_coordinator::run() { while (!_as.abort_requested()) { bool sleep = false; try { - co_await utils::get_local_injector().inject("topology_coordinator_pause_before_processing_backlog", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); }); + co_await utils::get_local_injector().inject("topology_coordinator_pause_before_processing_backlog", utils::wait_for_message(5min)); auto guard = co_await cleanup_group0_config_if_needed(co_await start_operation()); if (_rollback) { @@ -3028,12 +3014,7 @@ future<> topology_coordinator::run() { co_await await_event(); rtlogger.debug("topology coordinator fiber got an event"); } - co_await utils::get_local_injector().inject("wait-after-topology-coordinator-gets-event", [] (auto& handler) -> future<> { - rtlogger.info("wait-after-topology-coordinator-gets-event injection hit"); - co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{30}); - rtlogger.info("wait-after-topology-coordinator-gets-event injection done"); - }); - + co_await utils::get_local_injector().inject("wait-after-topology-coordinator-gets-event", utils::wait_for_message(30s)); } catch (...) { sleep = handle_topology_coordinator_error(std::current_exception()); } diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 95e1e6c5a0..e6adda437e 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1549,12 +1549,7 @@ future<> sstable::reload_reclaimed_components() { co_return; } - co_await utils::get_local_injector().inject("reload_reclaimed_components/pause", [] (auto& handler) { - sstlog.info("reload_reclaimed_components/pause init"); - auto ret = handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{5}); - sstlog.info("reload_reclaimed_components/pause done"); - return ret; - }); + co_await utils::get_local_injector().inject("reload_reclaimed_components/pause", utils::wait_for_message(std::chrono::seconds(5))); co_await read_filter(); _total_reclaimable_memory.reset(); @@ -1958,12 +1953,7 @@ future sstable::validate(reader_permit permit, abort_source& abort, if (errors) { co_return errors; } - co_await utils::get_local_injector().inject("sstable_validate/pause", [] (auto& handler) { - sstlog.info("sstable_validate/pause init"); - auto ret = handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{5}); - sstlog.info("sstable_validate/pause done"); - return ret; - }); + co_await utils::get_local_injector().inject("sstable_validate/pause", utils::wait_for_message(std::chrono::seconds(5))); if (_version >= sstable_version_types::mc) { co_return co_await mx::validate(shared_from_this(), std::move(permit), abort, std::move(error_handler), monitor); diff --git a/tasks/task_manager.cc b/tasks/task_manager.cc index ae67def434..d2301476e2 100644 --- a/tasks/task_manager.cc +++ b/tasks/task_manager.cc @@ -158,8 +158,7 @@ is_internal task_manager::task::impl::is_internal() const noexcept { } static future<> abort_children(task_manager::module_ptr module, task_id parent_id) noexcept { - co_await utils::get_local_injector().inject("tasks_abort_children", - [] (auto& handler) { return handler.wait_for_message(db::timeout_clock::now() + 10s); }); + co_await utils::get_local_injector().inject("tasks_abort_children", utils::wait_for_message(10s)); auto entered = module->async_gate().try_enter(); if (!entered) { diff --git a/test/object_store/test_backup.py b/test/object_store/test_backup.py index 3ad84760bd..57f9c112f0 100644 --- a/test/object_store/test_backup.py +++ b/test/object_store/test_backup.py @@ -134,7 +134,7 @@ async def test_backup_is_abortable(manager: ManagerClient, s3_server): tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', s3_server.address, s3_server.bucket_name, prefix) print(f'Started task {tid}, aborting it early') - await log.wait_for('backup task: waiting', from_mark=mark) + await log.wait_for('backup_task_pause: waiting', from_mark=mark) await manager.api.abort_task(server.ip_addr, tid) await manager.api.message_injection(server.ip_addr, "backup_task_pause") status = await manager.api.wait_task(server.ip_addr, tid) diff --git a/test/topology_custom/test_raft_no_quorum.py b/test/topology_custom/test_raft_no_quorum.py index c064816cb4..3ef5cd1be7 100644 --- a/test/topology_custom/test_raft_no_quorum.py +++ b/test/topology_custom/test_raft_no_quorum.py @@ -105,8 +105,7 @@ async def test_quorum_lost_during_node_join(manager: ManagerClient, raft_op_time logger.info(f"waiting for the leader node {servers[0]} to start handling the join request") log_file = await manager.server_open_log(servers[0].server_id) - await log_file.wait_for("join-node-before-add-entry injection hit", - timeout=60) + await log_file.wait_for("join-node-before-add-entry: waiting", timeout=60) logger.info("stopping the second node") await manager.server_stop_gracefully(servers[1].server_id) @@ -153,7 +152,7 @@ async def test_quorum_lost_during_node_join_response_handler(manager: ManagerCli logger.info(f"waiting for the third node {servers[2]} to hit join-node-response_handler-before-read-barrier") log_file = await manager.server_open_log(servers[2].server_id) - await log_file.wait_for("join-node-response_handler-before-read-barrier injection hit", timeout=60) + await log_file.wait_for("join-node-response_handler-before-read-barrier: waiting", timeout=60) logger.info("stopping the second node") await manager.server_stop_gracefully(servers[1].server_id) diff --git a/test/topology_custom/test_tablets_cql.py b/test/topology_custom/test_tablets_cql.py index 044b6a9991..6c6c6920a2 100644 --- a/test/topology_custom/test_tablets_cql.py +++ b/test/topology_custom/test_tablets_cql.py @@ -51,7 +51,7 @@ async def test_alter_dropped_tablets_keyspace(manager: ManagerClient) -> None: logger.info(f"waiting for the leader node {servers[0]} to start handling the keyspace-rf-change request") leader_log_file = await manager.server_open_log(servers[0].server_id) - await leader_log_file.wait_for("wait-after-topology-coordinator-gets-event injection hit", timeout=10) + await leader_log_file.wait_for("wait-after-topology-coordinator-gets-event: waiting", timeout=10) logger.info(f"dropping KS from the follower node {servers[1]} so that the leader, which hangs on injected sleep, " f"wakes up with the drop applied") @@ -106,7 +106,7 @@ async def test_alter_tablets_keyspace_concurrent_modification(manager: ManagerCl logger.info(f"waiting for the leader node {servers[0]} to start handling the keyspace-rf-change request") leader_log_file = await manager.server_open_log(servers[0].server_id) - await leader_log_file.wait_for("wait-before-committing-rf-change-event injection hit", timeout=10) + await leader_log_file.wait_for("wait-before-committing-rf-change-event: waiting", timeout=10) logger.info(f"creating another keyspace from the follower node {servers[1]} so that the leader, which hangs on injected sleep, " f"wakes up with a changed schema") diff --git a/test/topology_experimental_raft/test_mv_tablets_replace.py b/test/topology_experimental_raft/test_mv_tablets_replace.py index dc2d309d8b..1828b8a273 100644 --- a/test/topology_experimental_raft/test_mv_tablets_replace.py +++ b/test/topology_experimental_raft/test_mv_tablets_replace.py @@ -65,7 +65,7 @@ async def test_tablet_mv_replica_pairing_during_replace(manager: ManagerClient): replace_cfg = ReplaceConfig(replaced_id = server_to_replace.server_id, reuse_ip_addr = False, use_host_id = True) replace_task = asyncio.create_task(manager.server_add(replace_cfg)) - await coord_log.wait_for('tablet_transition_updates: start', from_mark=coord_mark) + await coord_log.wait_for('tablet_transition_updates: waiting', from_mark=coord_mark) if server_to_down.server_id != server_to_replace.server_id: await manager.server_stop(server_to_down.server_id) diff --git a/test/topology_random_failures/test_random_failures.py b/test/topology_random_failures/test_random_failures.py index 8e77cc4040..6e5e41d183 100644 --- a/test/topology_random_failures/test_random_failures.py +++ b/test/topology_random_failures/test_random_failures.py @@ -115,7 +115,7 @@ async def test_random_failures(manager: ManagerClient, coordinator_log_mark = await coordinator_log.mark() s_info = await manager.server_add(expected_server_up_state=ServerUpState.PROCESS_STARTED) await coordinator_log.wait_for( - pattern="topology_coordinator_pause_after_updating_cdc_generation: wait for message for 5 minutes", + pattern="topology_coordinator_pause_after_updating_cdc_generation: waiting", from_mark=coordinator_log_mark, ) await manager.server_pause(server_id=s_info.server_id) diff --git a/utils/error_injection.hh b/utils/error_injection.hh index ab39019b80..c589e295aa 100644 --- a/utils/error_injection.hh +++ b/utils/error_injection.hh @@ -43,6 +43,14 @@ extern logging::logger errinj_logger; using error_injection_parameters = std::unordered_map; +// Wraps the argument to breakpoint injection (see the relevant inject() overload +// in class error_injection below). The only parameter is the timeout after which +// the pause is aborted +struct wait_for_message { + std::chrono::milliseconds timeout; + wait_for_message(std::chrono::milliseconds tmo) noexcept : timeout(tmo) {} +}; + /** * Error injection class can be used to create and manage code injections * which trigger an error or a custom action in debug mode. @@ -454,6 +462,18 @@ public: co_await func(handler); } + // \brief Inject "breakpoint" + // Injects a pause in the code execution that's woken up explicitly by the injector + // request + // \param wfm -- the wait_for_message instance that describes details of the pause + future<> inject(const std::string_view& name, utils::wait_for_message wfm) { + co_await inject(name, [name, wfm] (injection_handler& handler) -> future<> { + errinj_logger.info("{}: waiting for message", name); + co_await handler.wait_for_message(std::chrono::steady_clock::now() + wfm.timeout); + errinj_logger.info("{}: message received", name); + }); + } + template std::optional inject_parameter(const std::string_view& name) { auto* data = get_data(name); @@ -606,6 +626,12 @@ public: return make_ready_future<>(); } + // \brief Inject "breakpoint" + [[gnu::always_inline]] + future<> inject(const std::string_view& name, utils::wait_for_message wfm) { + return make_ready_future<>(); + } + template [[gnu::always_inline]] std::optional inject_parameter(const std::string_view& name) {