// SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 // Copyright 2021-present ScyllaDB #pragma once #include #include "messaging_service.hh" #include "serializer.hh" #include "serializer_impl.hh" #include "seastarx.hh" #include "utils/exceptions.hh" namespace netw { struct serializer; // thunk from rpc serializers to generate serializers template void write(serializer, Output& out, const T& data) { ser::serialize(out, data); } template T read(serializer, Input& in, std::type_identity type) { return ser::deserialize(in, type); } template void write(serializer s, Output& out, const foreign_ptr& v) { return write(s, out, *v); } template foreign_ptr read(serializer s, Input& in, std::type_identity>) { return make_foreign(read(s, in, std::type_identity())); } template void write(serializer s, Output& out, const lw_shared_ptr& v) { return write(s, out, *v); } template lw_shared_ptr read(serializer s, Input& in, std::type_identity>) { return make_lw_shared(read(s, in, std::type_identity())); } using rpc_protocol = rpc::protocol; class messaging_service::rpc_protocol_wrapper { rpc_protocol _impl; public: explicit rpc_protocol_wrapper(serializer &&s) : _impl(std::move(s)) {} rpc_protocol &protocol() { return _impl; } template auto make_client(messaging_verb t) { return _impl.make_client(t); } template auto register_handler(messaging_verb t, Func &&func) { return _impl.register_handler(t, std::forward(func)); } template auto register_handler(messaging_verb t, scheduling_group sg, Func &&func) { return _impl.register_handler(t, sg, std::forward(func)); } future<> unregister_handler(messaging_verb t) { return _impl.unregister_handler(t); } void set_logger(::seastar::logger *logger) { _impl.set_logger(logger); } bool has_handler(messaging_verb msg_id) { return _impl.has_handler(msg_id); } bool has_handlers() const noexcept { return _impl.has_handlers(); } }; // This wrapper pretends to be rpc_protocol::client, but also handles // stopping it before destruction, in case it wasn't stopped already. // This should be integrated into messaging_service proper. class messaging_service::rpc_protocol_client_wrapper { std::unique_ptr _p; ::shared_ptr _credentials; public: rpc_protocol_client_wrapper(rpc_protocol &proto, rpc::client_options opts, socket_address addr, socket_address local = {}) : _p(std::make_unique(proto, std::move(opts), addr, local)) { } rpc_protocol_client_wrapper(rpc_protocol &proto, rpc::client_options opts, socket_address addr, socket_address local, ::shared_ptr c) : _p( std::make_unique(proto, std::move(opts), seastar::tls::socket(c), addr, local)), _credentials(c) {} auto get_stats() const { return _p->get_stats(); } future<> stop() { return _p->stop(); } bool error() { return _p->error(); } operator rpc_protocol::client &() { return *_p; } /** * #3787 Must ensure we use the right type of socket. I.e. tls or not. * See above, we retain credentials object so we here can know if we * are tls or not. */ template future> make_stream_sink() { if (_credentials) { return _p->make_stream_sink(seastar::tls::socket(_credentials)); } return _p->make_stream_sink(); } }; // Register a handler (a callback lambda) for verb template void register_handler(messaging_service *ms, messaging_verb verb, Func &&func) { ms->rpc()->register_handler(verb, ms->scheduling_group_for_verb(verb), std::move(func)); } // Send a message for verb template auto send_message(messaging_service* ms, messaging_verb verb, std::optional host_id, msg_addr id, MsgOut&&... msg) { auto rpc_handler = ms->rpc()->make_client(verb); using futurator = futurize>; if (ms->is_shutting_down()) { return futurator::make_exception_future(rpc::closed_error()); } auto rpc_client_ptr = ms->get_rpc_client(verb, id, host_id); auto& rpc_client = *rpc_client_ptr; return rpc_handler(rpc_client, std::forward(msg)...).handle_exception([ms = ms->shared_from_this(), id, host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr)] (std::exception_ptr&& eptr) { ms->increment_dropped_messages(verb); if (try_catch(eptr)) { // This is a transport error if (host_id) { ms->remove_error_rpc_client(verb, *host_id); } else { ms->remove_error_rpc_client(verb, id); } return futurator::make_exception_future(std::move(eptr)); } else { // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error. return futurator::make_exception_future(std::move(eptr)); } }); } template auto send_message(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) { return send_message(ms, verb, std::nullopt, id, std::forward(msg)...); } // Send a message for verb template auto send_message(messaging_service* ms, messaging_verb verb, locator::host_id hid, MsgOut&&... msg) { return send_message(ms, verb, std::optional{hid}, ms->addr_for_host_id(hid), std::forward(msg)...); } // TODO: Remove duplicated code in send_message template auto send_message_timeout(messaging_service* ms, messaging_verb verb, std::optional host_id, msg_addr id, Timeout timeout, MsgOut&&... msg) { auto rpc_handler = ms->rpc()->make_client(verb); using futurator = futurize>; if (ms->is_shutting_down()) { return futurator::make_exception_future(rpc::closed_error()); } auto rpc_client_ptr = ms->get_rpc_client(verb, id, host_id); auto& rpc_client = *rpc_client_ptr; return rpc_handler(rpc_client, timeout, std::forward(msg)...).handle_exception([ms = ms->shared_from_this(), id, host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr)] (std::exception_ptr&& eptr) { ms->increment_dropped_messages(verb); if (try_catch(eptr)) { // This is a transport error if (host_id) { ms->remove_error_rpc_client(verb, *host_id); } else { ms->remove_error_rpc_client(verb, id); } return futurator::make_exception_future(std::move(eptr)); } else { // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error. return futurator::make_exception_future(std::move(eptr)); } }); } template auto send_message_timeout(messaging_service* ms, messaging_verb verb, msg_addr id, Timeout timeout, MsgOut&&... msg) { return send_message_timeout(ms, verb, std::nullopt, id, timeout, std::forward(msg)...); } // Send a message for verb template auto send_message_timeout(messaging_service* ms, messaging_verb verb, locator::host_id hid, MsgOut&&... msg) { return send_message_timeout(ms, verb, std::optional{hid}, ms->addr_for_host_id(hid), std::forward(msg)...); } // Requesting abort on the provided abort_source drops the message from the outgoing queue (if it's still there) // and causes the returned future to resolve exceptionally with `abort_requested_exception`. // TODO: Remove duplicated code in send_message template auto send_message_cancellable(messaging_service* ms, messaging_verb verb, std::optional host_id, msg_addr id, abort_source& as, MsgOut&&... msg) { auto rpc_handler = ms->rpc()->make_client(verb); using futurator = futurize>; if (ms->is_shutting_down()) { return futurator::make_exception_future(rpc::closed_error()); } auto rpc_client_ptr = ms->get_rpc_client(verb, id, host_id); auto& rpc_client = *rpc_client_ptr; auto c = std::make_unique(); auto& c_ref = *c; auto sub = as.subscribe([c = std::move(c)] () noexcept { c->cancel(); }); if (!sub) { return futurator::make_exception_future(abort_requested_exception{}); } return rpc_handler(rpc_client, c_ref, std::forward(msg)...).handle_exception([ms = ms->shared_from_this(), id, host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr), sub = std::move(sub)] (std::exception_ptr&& eptr) { ms->increment_dropped_messages(verb); if (try_catch(eptr)) { // This is a transport error if (host_id) { ms->remove_error_rpc_client(verb, *host_id); } else { ms->remove_error_rpc_client(verb, id); } return futurator::make_exception_future(std::move(eptr)); } else if (try_catch(eptr)) { // Translate low-level canceled_error into high-level abort_requested_exception. return futurator::make_exception_future(abort_requested_exception{}); } else { // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error. return futurator::make_exception_future(std::move(eptr)); } }); } template auto send_message_cancellable(messaging_service* ms, messaging_verb verb, msg_addr id, abort_source& as, MsgOut&&... msg) { return send_message_cancellable(ms, verb, std::nullopt, id, as, std::forward(msg)...); } template auto send_message_cancellable(messaging_service* ms, messaging_verb verb, locator::host_id id, abort_source& as, MsgOut&&... msg) { return send_message_cancellable(ms, verb, std::optional{id}, ms->addr_for_host_id(id), as, std::forward(msg)...); } template auto send_message_timeout_cancellable(messaging_service* ms, messaging_verb verb, locator::host_id host_id, Timeout timeout, abort_source& as, MsgOut&&... msg) { auto rpc_handler = ms->rpc()->make_client(verb); using futurator = futurize>; if (ms->is_shutting_down()) { return futurator::make_exception_future(rpc::closed_error()); } auto rpc_client_ptr = ms->get_rpc_client(verb, ms->addr_for_host_id(host_id), host_id); auto& rpc_client = *rpc_client_ptr; auto c = std::make_unique(); auto& c_ref = *c; auto sub = as.subscribe([c = std::move(c)] () noexcept { c->cancel(); }); if (!sub) { return futurator::make_exception_future(abort_requested_exception{}); } return rpc_handler(rpc_client, timeout, c_ref, std::forward(msg)...).handle_exception([ms = ms->shared_from_this(), host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr), sub = std::move(sub)] (std::exception_ptr&& eptr) { ms->increment_dropped_messages(verb); if (try_catch(eptr)) { // This is a transport error ms->remove_error_rpc_client(verb, host_id); return futurator::make_exception_future(std::move(eptr)); } else if (try_catch(eptr)) { // Translate low-level canceled_error into high-level abort_requested_exception. return futurator::make_exception_future(abort_requested_exception{}); } else { // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error. return futurator::make_exception_future(std::move(eptr)); } }); } // Send one way message for verb template auto send_message_oneway(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) { return send_message(ms, std::move(verb), std::move(id), std::forward(msg)...); } // Send one way message for verb template auto send_message_oneway_timeout(messaging_service* ms, messaging_verb verb, msg_addr id, Timeout timeout, MsgOut&&... msg) { return send_message_timeout(ms, std::move(verb), std::move(id), timeout, std::forward(msg)...); } template auto send_message_oneway(messaging_service* ms, messaging_verb verb, locator::host_id id, MsgOut&&... msg) { return send_message(ms, std::move(verb), std::move(id), std::forward(msg)...); } // Send one way message for verb template auto send_message_oneway_timeout(messaging_service* ms, messaging_verb verb, locator::host_id id, Timeout timeout, MsgOut&&... msg) { return send_message_timeout(ms, std::move(verb), std::move(id), timeout, std::forward(msg)...); } } // namespace netw