diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 16d0128edd..ad4b345efc 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -209,12 +209,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { auto op = table.stream_in_progress(); auto sharder_ptr = std::make_unique(table.shared_from_this()); auto& sharder = *sharder_ptr; - //FIXME: discarded future. - (void)mutation_writer::distribute_reader_and_consume_on_shards(s, sharder, - make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)), - make_streaming_consumer(estimated_partitions, reason, topo_guard), - std::move(op) - ).then_wrapped([s, plan_id, from, sink, estimated_partitions, log_done, sh_ptr = std::move(sharder_ptr)] (future f) mutable { + auto result_handling_cont = [s, plan_id, from, sink, estimated_partitions, log_done, sh_ptr = std::move(sharder_ptr)] (future f) mutable -> future<> { int32_t status = 0; uint64_t received_partitions = 0; if (f.failed()) { @@ -241,13 +236,21 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { return sink(status).finally([sink] () mutable { return sink.close(); }); - }).handle_exception([s, plan_id, from, sink] (std::exception_ptr ep) { - auto level = seastar::log_level::error; - if (try_catch(ep)) { - level = seastar::log_level::debug; - } - sslog.log(level, "[Stream #{}] Failed to handle STREAM_MUTATION_FRAGMENTS (respond phase) for ks={}, cf={}, peer={}: {}", - plan_id, s->ks_name(), s->cf_name(), from, ep); + }; + //FIXME: discarded future. + (void)do_with(std::move(result_handling_cont), [&] (auto& result_handling) { + return mutation_writer::distribute_reader_and_consume_on_shards(s, sharder, + make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)), + make_streaming_consumer(estimated_partitions, reason, topo_guard), + std::move(op) + ).then_wrapped(std::ref(result_handling)).handle_exception([s, plan_id, from, sink] (std::exception_ptr ep) { + auto level = seastar::log_level::error; + if (try_catch(ep)) { + level = seastar::log_level::debug; + } + sslog.log(level, "[Stream #{}] Failed to handle STREAM_MUTATION_FRAGMENTS (respond phase) for ks={}, cf={}, peer={}: {}", + plan_id, s->ks_name(), s->cf_name(), from, ep); + }); }); } catch (...) { return sink.close().then([sink, eptr = std::current_exception()] () -> future> {