messaging_service: Make rpc streaming sink respect tls connection

Fixes #3787

Message service streaming sink was created using direct call to
rpc::client::make_sink. This in turn needs a new socker, which it
creates completely ignoring what underlying transport is active for the
client in question.

Fix by retaining the tls credential pointer in the client wrapper, and
using this in a sink method to determine whether to create a new tls
socker, or just go ahead with a plain one.

Message-Id: <20181010003249.30526-1-calle@scylladb.com>
(cherry picked from commit 3cb50c861d)
This commit is contained in:
Calle Wilund
2018-10-10 00:32:49 +00:00
committed by Avi Kivity
parent 7b34d54a96
commit 76ff2e5c3d

View File

@@ -135,12 +135,14 @@ struct messaging_service::rpc_protocol_wrapper : public rpc_protocol { using rpc
// This should be integrated into messaging_service proper.
class messaging_service::rpc_protocol_client_wrapper {
std::unique_ptr<rpc_protocol::client> _p;
::shared_ptr<seastar::tls::server_credentials> _credentials;
public:
rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, ipv4_addr addr, ipv4_addr local = ipv4_addr())
: _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), addr, local)) {
}
rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, ipv4_addr addr, ipv4_addr local, ::shared_ptr<seastar::tls::server_credentials> c)
: _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), seastar::tls::socket(c), addr, local))
, _credentials(c)
{}
auto get_stats() const { return _p->get_stats(); }
future<> stop() { return _p->stop(); }
@@ -148,6 +150,19 @@ public:
return _p->error();
}
operator rpc_protocol::client&() { return *_p; }
/**
* #3787 Must ensure we use the right type of socker. I.e. tls or not.
* See above, we retain credentials object so we here can know if we
* are tls or not.
*/
template<typename Serializer, typename... Out>
future<rpc::sink<Out...>> make_stream_sink() {
if (_credentials) {
return _p->make_stream_sink<Serializer, Out...>(seastar::tls::socket(_credentials));
}
return _p->make_stream_sink<Serializer, Out...>();
}
};
struct messaging_service::rpc_protocol_server_wrapper : public rpc_protocol::server { using rpc_protocol::server::server; };
@@ -639,8 +654,9 @@ rpc::sink<int32_t> messaging_service::make_sink_for_stream_mutation_fragments(rp
future<rpc::sink<frozen_mutation_fragment>, rpc::source<int32_t>>
messaging_service::make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, msg_addr id) {
rpc_protocol::client& rpc_client = *get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id);
return rpc_client.make_stream_sink<netw::serializer, frozen_mutation_fragment>().then([this, plan_id, schema_id, cf_id, estimated_partitions, &rpc_client] (rpc::sink<frozen_mutation_fragment> sink) mutable {
auto wrapper = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id);
rpc_protocol::client& rpc_client = *wrapper;
return wrapper->make_stream_sink<netw::serializer, frozen_mutation_fragment>().then([this, plan_id, schema_id, cf_id, estimated_partitions, &rpc_client] (rpc::sink<frozen_mutation_fragment> sink) mutable {
auto rpc_handler = rpc()->make_client<rpc::source<int32_t> (utils::UUID, utils::UUID, utils::UUID, uint64_t, rpc::sink<frozen_mutation_fragment>)>(messaging_verb::STREAM_MUTATION_FRAGMENTS);
return rpc_handler(rpc_client , plan_id, schema_id, cf_id, estimated_partitions, sink).then([sink] (rpc::source<int32_t> source) mutable {
return make_ready_future<rpc::sink<frozen_mutation_fragment>, rpc::source<int32_t>>(std::move(sink), std::move(source));