From f103f75ed857fe0d16f5971a57fc4a2d5efd9300 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 1 Feb 2024 10:11:22 +0800 Subject: [PATCH 1/2] storage_service: Run stream_ranges cmd in streaming group Otherwise it will inherit the rpc verb's scheduling group which is gossip. As a result, it causes the streaming runs in the wrong scheduling group. Fixes #17090 --- service/storage_service.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 39a34113c3..2ea0393c30 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -4654,12 +4654,13 @@ future storage_service::raft_topology_cmd_handler(raft } break; case raft_topology_cmd::command::stream_ranges: { + co_await with_scheduling_group(_db.local().get_streaming_scheduling_group(), coroutine::lambda([&] () -> future<> { const auto& rs = _topology_state_machine._topology.find(raft_server.id())->second; auto tstate = _topology_state_machine._topology.tstate; if (!rs.ring || (tstate != topology::transition_state::write_both_read_old && rs.state != node_state::normal && rs.state != node_state::rebuilding)) { rtlogger.warn("got stream_ranges request while my tokens state is {} and node state is {}", tstate, rs.state); - break; + co_return; } utils::get_local_injector().inject("stream_ranges_fail", @@ -4811,6 +4812,8 @@ future storage_service::raft_topology_cmd_handler(raft raft_server.id(), rs.state)); break; } + co_return; + })); } break; case raft_topology_cmd::command::wait_for_ip: { From e1fc91bea929321232a5caa538dac440f5886067 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 1 Feb 2024 10:23:19 +0800 Subject: [PATCH 2/2] streaming: Verify stream consumer runs inside streaming group This will catch schedule group leaks by accident. Refs: 17090 --- streaming/consumer.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/streaming/consumer.cc b/streaming/consumer.cc index f513cde7b9..30f44a5bf4 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -29,6 +29,11 @@ std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstrin return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (flat_mutation_reader_v2 reader) -> future<> { std::exception_ptr ex; try { + if (current_scheduling_group() != db.local().get_streaming_scheduling_group()) { + on_internal_error(sstables::sstlog, format("The stream consumer is not running in streaming group current_scheduling_group={}", + current_scheduling_group().name())); + } + auto cf = db.local().find_column_family(reader.schema()).shared_from_this(); auto guard = service::topology_guard(*cf, frozen_guard); auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *cf, reason);