diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 9aca3cc5df..96b72e27bd 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -993,8 +993,8 @@ messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_sche } auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id); return rpc_client->make_stream_sink().then([this, session, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink sink) mutable { - auto rpc_handler = rpc()->make_client (streaming::plan_id, table_schema_version, table_id, uint64_t, streaming::stream_reason, service::session_id, rpc::sink)>(messaging_verb::STREAM_MUTATION_FRAGMENTS); - return rpc_handler(*rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, session, sink).then_wrapped([sink, rpc_client] (future> source) mutable { + auto rpc_handler = rpc()->make_client (streaming::plan_id, table_schema_version, table_id, uint64_t, streaming::stream_reason, rpc::sink, service::session_id)>(messaging_verb::STREAM_MUTATION_FRAGMENTS); + return rpc_handler(*rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, sink, session).then_wrapped([sink, rpc_client] (future> source) mutable { return (source.failed() ? sink.close() : make_ready_future<>()).then([sink = std::move(sink), source = std::move(source)] () mutable { return make_ready_future(value_type(std::move(sink), source.get())); }); @@ -1002,7 +1002,7 @@ messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_sche }); } -void messaging_service::register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional, rpc::optional, rpc::source> source)>&& func) { +void messaging_service::register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional, rpc::source> source, rpc::optional)>&& func) { register_handler(this, messaging_verb::STREAM_MUTATION_FRAGMENTS, std::move(func)); } diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 99ab26d1d9..0211b8d047 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -382,7 +382,7 @@ public: // Wrapper for STREAM_MUTATION_FRAGMENTS // The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, -2 means error and table is dropped, other status code value are reserved for future use. - void register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::optional, rpc::source> source)>&& func); + void register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::source> source, rpc::optional)>&& func); future<> unregister_stream_mutation_fragments(); rpc::sink make_sink_for_stream_mutation_fragments(rpc::source>& source); future, rpc::source>> make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, service::session_id session, msg_addr id); diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 3faec8c661..4347e8bc64 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -110,8 +110,8 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { }); ms.register_stream_mutation_fragments([this, &as] (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, - rpc::optional session, - rpc::source> source) { + rpc::source> source, + rpc::optional session) { auto from = netw::messaging_service::get_source(cinfo); auto reason = reason_opt ? *reason_opt: stream_reason::unspecified; service::frozen_topology_guard topo_guard = session.value_or(service::default_session_id);