mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
messaging_service: Make rpc streaming sink respect tls connection
Fixes #3787 Message service streaming sink was created using direct call to rpc::client::make_sink. This in turn needs a new socker, which it creates completely ignoring what underlying transport is active for the client in question. Fix by retaining the tls credential pointer in the client wrapper, and using this in a sink method to determine whether to create a new tls socker, or just go ahead with a plain one. Message-Id: <20181010003249.30526-1-calle@scylladb.com>
This commit is contained in:
@@ -135,12 +135,14 @@ struct messaging_service::rpc_protocol_wrapper : public rpc_protocol { using rpc
|
||||
// This should be integrated into messaging_service proper.
|
||||
class messaging_service::rpc_protocol_client_wrapper {
|
||||
std::unique_ptr<rpc_protocol::client> _p;
|
||||
::shared_ptr<seastar::tls::server_credentials> _credentials;
|
||||
public:
|
||||
rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, ipv4_addr addr, ipv4_addr local = ipv4_addr())
|
||||
: _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), addr, local)) {
|
||||
}
|
||||
rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, ipv4_addr addr, ipv4_addr local, ::shared_ptr<seastar::tls::server_credentials> c)
|
||||
: _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), seastar::tls::socket(c), addr, local))
|
||||
, _credentials(c)
|
||||
{}
|
||||
auto get_stats() const { return _p->get_stats(); }
|
||||
future<> stop() { return _p->stop(); }
|
||||
@@ -148,6 +150,19 @@ public:
|
||||
return _p->error();
|
||||
}
|
||||
operator rpc_protocol::client&() { return *_p; }
|
||||
|
||||
/**
|
||||
* #3787 Must ensure we use the right type of socker. I.e. tls or not.
|
||||
* See above, we retain credentials object so we here can know if we
|
||||
* are tls or not.
|
||||
*/
|
||||
template<typename Serializer, typename... Out>
|
||||
future<rpc::sink<Out...>> make_stream_sink() {
|
||||
if (_credentials) {
|
||||
return _p->make_stream_sink<Serializer, Out...>(seastar::tls::socket(_credentials));
|
||||
}
|
||||
return _p->make_stream_sink<Serializer, Out...>();
|
||||
}
|
||||
};
|
||||
|
||||
struct messaging_service::rpc_protocol_server_wrapper : public rpc_protocol::server { using rpc_protocol::server::server; };
|
||||
@@ -639,8 +654,9 @@ rpc::sink<int32_t> messaging_service::make_sink_for_stream_mutation_fragments(rp
|
||||
|
||||
future<rpc::sink<frozen_mutation_fragment>, rpc::source<int32_t>>
|
||||
messaging_service::make_sink_and_source_for_stream_mutation_fragments(utils::UUID schema_id, utils::UUID plan_id, utils::UUID cf_id, uint64_t estimated_partitions, msg_addr id) {
|
||||
rpc_protocol::client& rpc_client = *get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id);
|
||||
return rpc_client.make_stream_sink<netw::serializer, frozen_mutation_fragment>().then([this, plan_id, schema_id, cf_id, estimated_partitions, &rpc_client] (rpc::sink<frozen_mutation_fragment> sink) mutable {
|
||||
auto wrapper = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id);
|
||||
rpc_protocol::client& rpc_client = *wrapper;
|
||||
return wrapper->make_stream_sink<netw::serializer, frozen_mutation_fragment>().then([this, plan_id, schema_id, cf_id, estimated_partitions, &rpc_client] (rpc::sink<frozen_mutation_fragment> sink) mutable {
|
||||
auto rpc_handler = rpc()->make_client<rpc::source<int32_t> (utils::UUID, utils::UUID, utils::UUID, uint64_t, rpc::sink<frozen_mutation_fragment>)>(messaging_verb::STREAM_MUTATION_FRAGMENTS);
|
||||
return rpc_handler(rpc_client , plan_id, schema_id, cf_id, estimated_partitions, sink).then([sink] (rpc::source<int32_t> source) mutable {
|
||||
return make_ready_future<rpc::sink<frozen_mutation_fragment>, rpc::source<int32_t>>(std::move(sink), std::move(source));
|
||||
|
||||
Reference in New Issue
Block a user