diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 58a61d7606..62998390ea 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -121,7 +121,6 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { } return _mm.local().get_schema_for_write(schema_id, from, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason] (schema_ptr s) mutable { return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, s] (reader_permit permit) mutable { - auto sink = _ms.local().make_sink_for_stream_mutation_fragments(source); struct stream_mutation_fragments_cmd_status { bool got_cmd = false; bool got_end_of_stream = false; @@ -163,6 +162,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { } }); }; + auto sink = _ms.local().make_sink_for_stream_mutation_fragments(source); try { // Make sure the table with cf_id is still present at this point. // Close the sink in case the table is dropped.