streaming: Fix rpc::source and rpc::optional parameter order

The new rpc::optional parameter must come after any existing parameters,
including the rpc::source parameters, otherwise it will break
compatibility.

The regression was introduced in:

```
commit fd3c089ccc
Author: Tomasz Grabiec <tgrabiec@scylladb.com>
Date:   Thu Oct 26 00:35:19 2023 +0200

    service: range_streamer: Propagate topology_guard to receivers
```

We need to backport this patch ASAP before we release anything that
contains commit fd3c089ccc.

Refs: #16941
Fixes: #17175

Closes scylladb/scylladb#17176
This commit is contained in:
Asias He
2024-02-06 09:41:33 +08:00
committed by Tomasz Grabiec
parent a3d4131918
commit e7e1f4b01a
3 changed files with 6 additions and 6 deletions

View File

@@ -993,8 +993,8 @@ messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_sche
}
auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id);
return rpc_client->make_stream_sink<netw::serializer, frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>().then([this, session, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd> sink) mutable {
auto rpc_handler = rpc()->make_client<rpc::source<int32_t> (streaming::plan_id, table_schema_version, table_id, uint64_t, streaming::stream_reason, service::session_id, rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>)>(messaging_verb::STREAM_MUTATION_FRAGMENTS);
return rpc_handler(*rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, session, sink).then_wrapped([sink, rpc_client] (future<rpc::source<int32_t>> source) mutable {
auto rpc_handler = rpc()->make_client<rpc::source<int32_t> (streaming::plan_id, table_schema_version, table_id, uint64_t, streaming::stream_reason, rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, service::session_id)>(messaging_verb::STREAM_MUTATION_FRAGMENTS);
return rpc_handler(*rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, sink, session).then_wrapped([sink, rpc_client] (future<rpc::source<int32_t>> source) mutable {
return (source.failed() ? sink.close() : make_ready_future<>()).then([sink = std::move(sink), source = std::move(source)] () mutable {
return make_ready_future<value_type>(value_type(std::move(sink), source.get()));
});
@@ -1002,7 +1002,7 @@ messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_sche
});
}
void messaging_service::register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason>, rpc::optional<service::session_id>, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func) {
void messaging_service::register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason>, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source, rpc::optional<service::session_id>)>&& func) {
register_handler(this, messaging_verb::STREAM_MUTATION_FRAGMENTS, std::move(func));
}

View File

@@ -382,7 +382,7 @@ public:
// Wrapper for STREAM_MUTATION_FRAGMENTS
// The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, -2 means error and table is dropped, other status code value are reserved for future use.
void register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason> reason_opt, rpc::optional<service::session_id>, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func);
void register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason> reason_opt, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source, rpc::optional<service::session_id>)>&& func);
future<> unregister_stream_mutation_fragments();
rpc::sink<int32_t> make_sink_for_stream_mutation_fragments(rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>>& source);
future<std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>> make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, service::session_id session, msg_addr id);

View File

@@ -110,8 +110,8 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
});
ms.register_stream_mutation_fragments([this, &as] (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions,
rpc::optional<stream_reason> reason_opt,
rpc::optional<service::session_id> session,
rpc::source<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>> source) {
rpc::source<frozen_mutation_fragment, rpc::optional<stream_mutation_fragments_cmd>> source,
rpc::optional<service::session_id> session) {
auto from = netw::messaging_service::get_source(cinfo);
auto reason = reason_opt ? *reason_opt: stream_reason::unspecified;
service::frozen_topology_guard topo_guard = session.value_or(service::default_session_id);