streaming: save a continuation lambda

In the following patches, an additional preemption point will be
added to the coroutine lambda in register_stream_mutation_fragments.

Assign a lambda to a variable to prolong the captures lifetime.

(cherry picked from commit 44748d624d)
This commit is contained in:
Aleksandra Martyniuk
2025-03-04 14:09:59 +01:00
parent 67b0ea99a0
commit b57774dea6

View File

@@ -209,12 +209,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
auto op = table.stream_in_progress();
auto sharder_ptr = std::make_unique<dht::auto_refreshing_sharder>(table.shared_from_this());
auto& sharder = *sharder_ptr;
//FIXME: discarded future.
(void)mutation_writer::distribute_reader_and_consume_on_shards(s, sharder,
make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)),
make_streaming_consumer(estimated_partitions, reason, topo_guard),
std::move(op)
).then_wrapped([s, plan_id, from, sink, estimated_partitions, log_done, sh_ptr = std::move(sharder_ptr)] (future<uint64_t> f) mutable {
auto result_handling_cont = [s, plan_id, from, sink, estimated_partitions, log_done, sh_ptr = std::move(sharder_ptr)] (future<uint64_t> f) mutable -> future<> {
int32_t status = 0;
uint64_t received_partitions = 0;
if (f.failed()) {
@@ -241,13 +236,21 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
return sink(status).finally([sink] () mutable {
return sink.close();
});
}).handle_exception([s, plan_id, from, sink] (std::exception_ptr ep) {
auto level = seastar::log_level::error;
if (try_catch<seastar::rpc::closed_error>(ep)) {
level = seastar::log_level::debug;
}
sslog.log(level, "[Stream #{}] Failed to handle STREAM_MUTATION_FRAGMENTS (respond phase) for ks={}, cf={}, peer={}: {}",
plan_id, s->ks_name(), s->cf_name(), from, ep);
};
//FIXME: discarded future.
(void)do_with(std::move(result_handling_cont), [&] (auto& result_handling) {
return mutation_writer::distribute_reader_and_consume_on_shards(s, sharder,
make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)),
make_streaming_consumer(estimated_partitions, reason, topo_guard),
std::move(op)
).then_wrapped(std::ref(result_handling)).handle_exception([s, plan_id, from, sink] (std::exception_ptr ep) {
auto level = seastar::log_level::error;
if (try_catch<seastar::rpc::closed_error>(ep)) {
level = seastar::log_level::debug;
}
sslog.log(level, "[Stream #{}] Failed to handle STREAM_MUTATION_FRAGMENTS (respond phase) for ks={}, cf={}, peer={}: {}",
plan_id, s->ks_name(), s->cf_name(), from, ep);
});
});
} catch (...) {
return sink.close().then([sink, eptr = std::current_exception()] () -> future<rpc::sink<int>> {