diff --git a/service/storage_service.cc b/service/storage_service.cc index eaa20eeb5a..7387a57abe 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -4750,12 +4750,13 @@ future storage_service::raft_topology_cmd_handler(raft } break; case raft_topology_cmd::command::stream_ranges: { + co_await with_scheduling_group(_db.local().get_streaming_scheduling_group(), coroutine::lambda([&] () -> future<> { const auto& rs = _topology_state_machine._topology.find(raft_server.id())->second; auto tstate = _topology_state_machine._topology.tstate; if (!rs.ring || (tstate != topology::transition_state::write_both_read_old && rs.state != node_state::normal && rs.state != node_state::rebuilding)) { rtlogger.warn("got stream_ranges request while my tokens state is {} and node state is {}", tstate, rs.state); - break; + co_return; } utils::get_local_injector().inject("stream_ranges_fail", @@ -4906,6 +4907,8 @@ future storage_service::raft_topology_cmd_handler(raft raft_server.id(), rs.state)); break; } + co_return; + })); } break; case raft_topology_cmd::command::wait_for_ip: { diff --git a/streaming/consumer.cc b/streaming/consumer.cc index f513cde7b9..30f44a5bf4 100644 --- a/streaming/consumer.cc +++ b/streaming/consumer.cc @@ -29,6 +29,11 @@ std::function (flat_mutation_reader_v2)> make_streaming_consumer(sstrin return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (flat_mutation_reader_v2 reader) -> future<> { std::exception_ptr ex; try { + if (current_scheduling_group() != db.local().get_streaming_scheduling_group()) { + on_internal_error(sstables::sstlog, format("The stream consumer is not running in streaming group current_scheduling_group={}", + current_scheduling_group().name())); + } + auto cf = db.local().find_column_family(reader.schema()).shared_from_this(); auto guard = service::topology_guard(*cf, frozen_guard); auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *cf, reason);