From e7e1f4b01a98c538cd4afe58353d17192a4e96a4 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 6 Feb 2024 09:41:33 +0800 Subject: [PATCH] streaming: Fix rpc::source and rpc::optional parameter order The new rpc::optional parameter must come after any existing parameters, including the rpc::source parameters, otherwise it will break compatibility. The regression was introduced in: ``` commit fd3c089ccc8be68bbf43787a3f4dfc6a9eb25b55 Author: Tomasz Grabiec Date: Thu Oct 26 00:35:19 2023 +0200 service: range_streamer: Propagate topology_guard to receivers ``` We need to backport this patch ASAP before we release anything that contains commit fd3c089ccc8be68bbf43787a3f4dfc6a9eb25b55. Refs: #16941 Fixes: #17175 Closes scylladb/scylladb#17176 --- message/messaging_service.cc | 6 +++--- message/messaging_service.hh | 2 +- streaming/stream_session.cc | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 9aca3cc5df..96b72e27bd 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -993,8 +993,8 @@ messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_sche } auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id); return rpc_client->make_stream_sink().then([this, session, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink sink) mutable { - auto rpc_handler = rpc()->make_client (streaming::plan_id, table_schema_version, table_id, uint64_t, streaming::stream_reason, service::session_id, rpc::sink)>(messaging_verb::STREAM_MUTATION_FRAGMENTS); - return rpc_handler(*rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, session, sink).then_wrapped([sink, rpc_client] (future> source) mutable { + auto rpc_handler = rpc()->make_client (streaming::plan_id, table_schema_version, table_id, uint64_t, streaming::stream_reason, rpc::sink, service::session_id)>(messaging_verb::STREAM_MUTATION_FRAGMENTS); + return rpc_handler(*rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, sink, session).then_wrapped([sink, rpc_client] (future> source) mutable { return (source.failed() ? sink.close() : make_ready_future<>()).then([sink = std::move(sink), source = std::move(source)] () mutable { return make_ready_future(value_type(std::move(sink), source.get())); }); @@ -1002,7 +1002,7 @@ messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_sche }); } -void messaging_service::register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional, rpc::optional, rpc::source> source)>&& func) { +void messaging_service::register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional, rpc::source> source, rpc::optional)>&& func) { register_handler(this, messaging_verb::STREAM_MUTATION_FRAGMENTS, std::move(func)); } diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 99ab26d1d9..0211b8d047 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -382,7 +382,7 @@ public: // Wrapper for STREAM_MUTATION_FRAGMENTS // The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, -2 means error and table is dropped, other status code value are reserved for future use. - void register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::optional, rpc::source> source)>&& func); + void register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::source> source, rpc::optional)>&& func); future<> unregister_stream_mutation_fragments(); rpc::sink make_sink_for_stream_mutation_fragments(rpc::source>& source); future, rpc::source>> make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, service::session_id session, msg_addr id); diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 3faec8c661..4347e8bc64 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -110,8 +110,8 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { }); ms.register_stream_mutation_fragments([this, &as] (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, - rpc::optional session, - rpc::source> source) { + rpc::source> source, + rpc::optional session) { auto from = netw::messaging_service::get_source(cinfo); auto reason = reason_opt ? *reason_opt: stream_reason::unspecified; service::frozen_topology_guard topo_guard = session.value_or(service::default_session_id);