streaming: Always close the rpc::sink

rpc::sink::~sink aborts if not closed. There is a try/catch clause
which ensures that close() is called, but there was code after sink is
created which is not covered by it. Move sink construction past that
code.
This commit is contained in:
Tomasz Grabiec
2023-12-06 18:33:00 +01:00
parent 0e42fe4c3c
commit 063095ea50

View File

@@ -121,7 +121,6 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
}
return _mm.local().get_schema_for_write(schema_id, from, _ms.local(), as).then([this, from, estimated_partitions, plan_id, cf_id, source, reason] (schema_ptr s) mutable {
return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout, {}).then([this, from, estimated_partitions, plan_id, cf_id, source, reason, s] (reader_permit permit) mutable {
auto sink = _ms.local().make_sink_for_stream_mutation_fragments(source);
struct stream_mutation_fragments_cmd_status {
bool got_cmd = false;
bool got_end_of_stream = false;
@@ -163,6 +162,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
}
});
};
auto sink = _ms.local().make_sink_for_stream_mutation_fragments(source);
try {
// Make sure the table with cf_id is still present at this point.
// Close the sink in case the table is dropped.