From 3cb50c861d5585d7b00af5d6e893449abbecbebd Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 10 Oct 2018 00:32:49 +0000 Subject: [PATCH] messaging_service: Make rpc streaming sink respect tls connection Fixes #3787 Message service streaming sink was created using direct call to rpc::client::make_sink. This in turn needs a new socker, which it creates completely ignoring what underlying transport is active for the client in question. Fix by retaining the tls credential pointer in the client wrapper, and using this in a sink method to determine whether to create a new tls socker, or just go ahead with a plain one. Message-Id: <20181010003249.30526-1-calle@scylladb.com> --- message/messaging_service.cc | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index cb3c544c5c..6f5c5821c8 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -135,12 +135,14 @@ struct messaging_service::rpc_protocol_wrapper : public rpc_protocol { using rpc // This should be integrated into messaging_service proper. class messaging_service::rpc_protocol_client_wrapper { std::unique_ptr _p; + ::shared_ptr _credentials; public: rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, ipv4_addr addr, ipv4_addr local = ipv4_addr()) : _p(std::make_unique(proto, std::move(opts), addr, local)) { } rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, ipv4_addr addr, ipv4_addr local, ::shared_ptr c) : _p(std::make_unique(proto, std::move(opts), seastar::tls::socket(c), addr, local)) + , _credentials(c) {} auto get_stats() const { return _p->get_stats(); } future<> stop() { return _p->stop(); } @@ -148,6 +150,19 @@ public: return _p->error(); } operator rpc_protocol::client&() { return *_p; } + + /** + * #3787 Must ensure we use the right type of socker. I.e. tls or not. + * See above, we retain credentials object so we here can know if we + * are tls or not. + */ + template + future> make_stream_sink() { + if (_credentials) { + return _p->make_stream_sink(seastar::tls::socket(_credentials)); + } + return _p->make_stream_sink(); + } }; struct messaging_service::rpc_protocol_server_wrapper : public rpc_protocol::server { using rpc_protocol::server::server; }; @@ -639,8 +654,9 @@ rpc::sink messaging_service::make_sink_for_stream_mutation_fragments(rp future, rpc::source> messaging_service::make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, msg_addr id) { - rpc_protocol::client& rpc_client = *get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id); - return rpc_client.make_stream_sink().then([this, plan_id, schema_id, cf_id, estimated_partitions, &rpc_client] (rpc::sink sink) mutable { + auto wrapper = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id); + rpc_protocol::client& rpc_client = *wrapper; + return wrapper->make_stream_sink().then([this, plan_id, schema_id, cf_id, estimated_partitions, &rpc_client] (rpc::sink sink) mutable { auto rpc_handler = rpc()->make_client (utils::UUID, utils::UUID, utils::UUID, uint64_t, rpc::sink)>(messaging_verb::STREAM_MUTATION_FRAGMENTS); return rpc_handler(rpc_client , plan_id, schema_id, cf_id, estimated_partitions, sink).then([sink] (rpc::source source) mutable { return make_ready_future, rpc::source>(std::move(sink), std::move(source));