diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 8ea5edde2f..65e12bfe25 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -172,7 +172,7 @@ void stream_session::init_messaging_service_handler() { return make_exception_future>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later", utils::fb_utilities::get_broadcast_address()))); } - return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(), [from, estimated_partitions, plan_id, schema_id, &cf, source, reason] () mutable { + return service::get_schema_for_write(schema_id, from).then([from, estimated_partitions, plan_id, schema_id, &cf, source, reason] (schema_ptr s) mutable { auto sink = ms().make_sink_for_stream_mutation_fragments(source); struct stream_mutation_fragments_cmd_status { @@ -270,7 +270,6 @@ void stream_session::init_messaging_service_handler() { }); return make_ready_future>(sink); }); - }); }); ms().register_stream_mutation_done([] (const rpc::client_info& cinfo, UUID plan_id, dht::token_range_vector ranges, UUID cf_id, unsigned dst_cpu_id) { const auto& from = cinfo.retrieve_auxiliary("baddr");