Merge 'storage_service: Run stream_ranges cmd in streaming group' from Asias He

Otherwise it will inherit the rpc verb's scheduling group which is gossip. As a result, it causes the streaming runs in the wrong scheduling group.

Fixes #17090

Closes scylladb/scylladb#17097

* github.com:scylladb/scylladb:
  streaming: Verify stream consumer runs inside streaming group
  storage_service: Run stream_ranges cmd in streaming group
This commit is contained in:
Botond Dénes
2024-02-01 13:18:26 +02:00
2 changed files with 9 additions and 1 deletions

View File

@@ -4750,12 +4750,13 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
}
break;
case raft_topology_cmd::command::stream_ranges: {
co_await with_scheduling_group(_db.local().get_streaming_scheduling_group(), coroutine::lambda([&] () -> future<> {
const auto& rs = _topology_state_machine._topology.find(raft_server.id())->second;
auto tstate = _topology_state_machine._topology.tstate;
if (!rs.ring ||
(tstate != topology::transition_state::write_both_read_old && rs.state != node_state::normal && rs.state != node_state::rebuilding)) {
rtlogger.warn("got stream_ranges request while my tokens state is {} and node state is {}", tstate, rs.state);
break;
co_return;
}
utils::get_local_injector().inject("stream_ranges_fail",
@@ -4906,6 +4907,8 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
raft_server.id(), rs.state));
break;
}
co_return;
}));
}
break;
case raft_topology_cmd::command::wait_for_ip: {

View File

@@ -29,6 +29,11 @@ std::function<future<> (flat_mutation_reader_v2)> make_streaming_consumer(sstrin
return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin), frozen_guard] (flat_mutation_reader_v2 reader) -> future<> {
std::exception_ptr ex;
try {
if (current_scheduling_group() != db.local().get_streaming_scheduling_group()) {
on_internal_error(sstables::sstlog, format("The stream consumer is not running in streaming group current_scheduling_group={}",
current_scheduling_group().name()));
}
auto cf = db.local().find_column_family(reader.schema()).shared_from_this();
auto guard = service::topology_guard(*cf, frozen_guard);
auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *cf, reason);