diff --git a/message/messaging_service.cc b/message/messaging_service.cc index cb3c544c5c..6f5c5821c8 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -135,12 +135,14 @@ struct messaging_service::rpc_protocol_wrapper : public rpc_protocol { using rpc // This should be integrated into messaging_service proper. class messaging_service::rpc_protocol_client_wrapper { std::unique_ptr _p; + ::shared_ptr _credentials; public: rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, ipv4_addr addr, ipv4_addr local = ipv4_addr()) : _p(std::make_unique(proto, std::move(opts), addr, local)) { } rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, ipv4_addr addr, ipv4_addr local, ::shared_ptr c) : _p(std::make_unique(proto, std::move(opts), seastar::tls::socket(c), addr, local)) + , _credentials(c) {} auto get_stats() const { return _p->get_stats(); } future<> stop() { return _p->stop(); } @@ -148,6 +150,19 @@ public: return _p->error(); } operator rpc_protocol::client&() { return *_p; } + + /** + * #3787 Must ensure we use the right type of socker. I.e. tls or not. + * See above, we retain credentials object so we here can know if we + * are tls or not. + */ + template + future> make_stream_sink() { + if (_credentials) { + return _p->make_stream_sink(seastar::tls::socket(_credentials)); + } + return _p->make_stream_sink(); + } }; struct messaging_service::rpc_protocol_server_wrapper : public rpc_protocol::server { using rpc_protocol::server::server; }; @@ -639,8 +654,9 @@ rpc::sink messaging_service::make_sink_for_stream_mutation_fragments(rp future, rpc::source> messaging_service::make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, msg_addr id) { - rpc_protocol::client& rpc_client = *get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id); - return rpc_client.make_stream_sink().then([this, plan_id, schema_id, cf_id, estimated_partitions, &rpc_client] (rpc::sink sink) mutable { + auto wrapper = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id); + rpc_protocol::client& rpc_client = *wrapper; + return wrapper->make_stream_sink().then([this, plan_id, schema_id, cf_id, estimated_partitions, &rpc_client] (rpc::sink sink) mutable { auto rpc_handler = rpc()->make_client (utils::UUID, utils::UUID, utils::UUID, uint64_t, rpc::sink)>(messaging_verb::STREAM_MUTATION_FRAGMENTS); return rpc_handler(rpc_client , plan_id, schema_id, cf_id, estimated_partitions, sink).then([sink] (rpc::source source) mutable { return make_ready_future, rpc::source>(std::move(sink), std::move(source));