The default error message of `closed_error` is "connection is closed". It lacks the host ID and the IP address of the connected node, which makes debugging harder. It can also be made more specific when `closed_error` is thrown because the local node is shutting down.

Fixes #16923
Closes scylladb/scylladb#27699
317 lines
15 KiB
C++
317 lines
15 KiB
C++
// SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
// Copyright 2021-present ScyllaDB
|
|
|
|
#pragma once
|
|
|
|
#include <seastar/rpc/rpc.hh>
|
|
#include "messaging_service.hh"
|
|
#include "serializer.hh"
|
|
#include "serializer_impl.hh"
|
|
#include "seastarx.hh"
|
|
#include "utils/exceptions.hh"
|
|
|
|
namespace netw {
|
|
|
|
struct serializer;
|
|
|
|
// thunk from rpc serializers to generate serializers
|
|
template <typename T, typename Output>
|
|
void write(serializer, Output& out, const T& data) {
|
|
ser::serialize(out, data);
|
|
}
|
|
template <typename T, typename Input>
|
|
T read(serializer, Input& in, std::type_identity<T> type) {
|
|
return ser::deserialize(in, type);
|
|
}
|
|
|
|
template <typename Output, typename T>
|
|
void write(serializer s, Output& out, const foreign_ptr<T>& v) {
|
|
return write(s, out, *v);
|
|
}
|
|
template <typename Input, typename T>
|
|
foreign_ptr<T> read(serializer s, Input& in, std::type_identity<foreign_ptr<T>>) {
|
|
return make_foreign(read(s, in, std::type_identity<T>()));
|
|
}
|
|
|
|
template <typename Output, typename T>
|
|
void write(serializer s, Output& out, const lw_shared_ptr<T>& v) {
|
|
return write(s, out, *v);
|
|
}
|
|
template <typename Input, typename T>
|
|
lw_shared_ptr<T> read(serializer s, Input& in, std::type_identity<lw_shared_ptr<T>>) {
|
|
return make_lw_shared<T>(read(s, in, std::type_identity<T>()));
|
|
}
|
|
|
|
using rpc_protocol = rpc::protocol<serializer, messaging_verb>;
|
|
|
|
// Thin owner of the rpc::protocol instance. It exposes only the subset of
// the protocol interface that messaging_service actually uses.
class messaging_service::rpc_protocol_wrapper {
    rpc_protocol _impl;
public:
    explicit rpc_protocol_wrapper(serializer&& s) : _impl(std::move(s)) {}

    rpc_protocol& protocol() {
        return _impl;
    }

    // Build a typed client-side call stub for verb `t`.
    template<typename Func>
    auto make_client(messaging_verb t) {
        return _impl.make_client<Func>(t);
    }

    // Register `func` as the server-side handler for verb `t`.
    template<typename Func>
    auto register_handler(messaging_verb t, Func&& func) {
        return _impl.register_handler(t, std::forward<Func>(func));
    }

    // Same, but the handler runs under scheduling group `sg`.
    template<typename Func>
    auto register_handler(messaging_verb t, scheduling_group sg, Func&& func) {
        return _impl.register_handler(t, sg, std::forward<Func>(func));
    }

    future<> unregister_handler(messaging_verb t) {
        return _impl.unregister_handler(t);
    }

    void set_logger(::seastar::logger* logger) {
        _impl.set_logger(logger);
    }

    bool has_handler(messaging_verb msg_id) {
        return _impl.has_handler(msg_id);
    }

    bool has_handlers() const noexcept {
        return _impl.has_handlers();
    }
};
|
|
|
|
// This wrapper pretends to be rpc_protocol::client, but also handles
|
|
// stopping it before destruction, in case it wasn't stopped already.
|
|
// This should be integrated into messaging_service proper.
|
|
class messaging_service::rpc_protocol_client_wrapper {
|
|
std::unique_ptr<rpc_protocol::client> _p;
|
|
::shared_ptr<seastar::tls::server_credentials> _credentials;
|
|
public:
|
|
rpc_protocol_client_wrapper(rpc_protocol &proto, rpc::client_options opts, socket_address addr,
|
|
socket_address local = {})
|
|
: _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), addr, local)) {
|
|
}
|
|
|
|
rpc_protocol_client_wrapper(rpc_protocol &proto, rpc::client_options opts, socket_address addr,
|
|
socket_address local, ::shared_ptr<seastar::tls::server_credentials> c)
|
|
: _p(
|
|
std::make_unique<rpc_protocol::client>(proto, std::move(opts), seastar::tls::socket(c), addr, local)),
|
|
_credentials(c) {}
|
|
|
|
auto get_stats() const { return _p->get_stats(); }
|
|
|
|
future<> stop() { return _p->stop(); }
|
|
|
|
bool error() {
|
|
return _p->error();
|
|
}
|
|
|
|
operator rpc_protocol::client &() { return *_p; }
|
|
|
|
/**
|
|
* #3787 Must ensure we use the right type of socket. I.e. tls or not.
|
|
* See above, we retain credentials object so we here can know if we
|
|
* are tls or not.
|
|
*/
|
|
template<typename Serializer, typename... Out>
|
|
future<rpc::sink<Out...>> make_stream_sink() {
|
|
if (_credentials) {
|
|
return _p->make_stream_sink<Serializer, Out...>(seastar::tls::socket(_credentials));
|
|
}
|
|
return _p->make_stream_sink<Serializer, Out...>();
|
|
}
|
|
};
|
|
|
|
// Register a handler (a callback lambda) for verb
|
|
template<typename Func>
|
|
void register_handler(messaging_service *ms, messaging_verb verb, Func &&func) {
|
|
ms->rpc()->register_handler(verb, ms->scheduling_group_for_verb(verb), std::move(func));
|
|
}
|
|
|
|
// Send a message for verb to the node identified by (host_id, id) and
// return a future that resolves with the reply (MsgIn).
//
// Error handling:
//  * If the local node is shutting down, fail immediately with a
//    closed_error that says so (more specific than the generic
//    "connection is closed").
//  * On rpc::closed_error (transport failure), the failed client is
//    dropped from the cache and the error is rethrown enriched with the
//    peer's host ID and IP address to aid debugging.
//  * Any other exception (e.g. thrown by the remote handler) is
//    propagated unchanged.
template <typename MsgIn, typename... MsgOut>
auto send_message(messaging_service* ms, messaging_verb verb, std::optional<locator::host_id> host_id, msg_addr id, MsgOut&&... msg) {
    auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
    using futurator = futurize<std::invoke_result_t<decltype(rpc_handler), rpc_protocol::client&, MsgOut...>>;
    // Fail fast with a specific message when we are the side going down.
    if (ms->is_shutting_down()) {
        return futurator::make_exception_future(rpc::closed_error("local node is shutting down"));
    }
    auto rpc_client_ptr = ms->get_rpc_client(verb, id, host_id);
    auto& rpc_client = *rpc_client_ptr;
    // rpc_client_ptr is captured in the continuation so the connection
    // stays alive until the call resolves.
    return rpc_handler(rpc_client, std::forward<MsgOut>(msg)...).handle_exception([ms = ms->shared_from_this(), id, host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr)] (std::exception_ptr&& eptr) {
        ms->increment_dropped_messages(verb);
        if (const auto* exp = try_catch<rpc::closed_error>(eptr)) {
            // This is a transport error
            if (host_id) {
                ms->remove_error_rpc_client(verb, *host_id);
            } else {
                ms->remove_error_rpc_client(verb, id);
            }
            // Rethrow with the peer's host ID (null UUID when unknown) and
            // IP address attached to the original error text.
            return futurator::make_exception_future(rpc::closed_error(fmt::format("got error from node {}/{}: {}",
                    host_id.value_or(locator::host_id{}), id.addr, exp->what())));
        } else {
            // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
            return futurator::make_exception_future(std::move(eptr));
        }
    });
}
|
|
|
|
template <typename MsgIn, typename... MsgOut>
|
|
auto send_message(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) {
|
|
return send_message<MsgIn, MsgOut...>(ms, verb, std::nullopt, id, std::forward<MsgOut>(msg)...);
|
|
}
|
|
|
|
// Send a message for verb
|
|
template <typename MsgIn, typename... MsgOut>
|
|
auto send_message(messaging_service* ms, messaging_verb verb, locator::host_id hid, MsgOut&&... msg) {
|
|
return send_message<MsgIn, MsgOut...>(ms, verb, std::optional{hid}, ms->addr_for_host_id(hid), std::forward<MsgOut>(msg)...);
|
|
}
|
|
|
|
// Send a message for verb with a per-call timeout; otherwise behaves like
// send_message above: shutdown yields a specific closed_error, transport
// failures drop the cached client and are rethrown with the peer's host ID
// and IP address, everything else is propagated unchanged.
// TODO: Remove duplicated code in send_message
template <typename MsgIn, typename Timeout, typename... MsgOut>
auto send_message_timeout(messaging_service* ms, messaging_verb verb, std::optional<locator::host_id> host_id, msg_addr id, Timeout timeout, MsgOut&&... msg) {
    auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
    using futurator = futurize<std::invoke_result_t<decltype(rpc_handler), rpc_protocol::client&, MsgOut...>>;
    // Fail fast with a specific message when we are the side going down.
    if (ms->is_shutting_down()) {
        return futurator::make_exception_future(rpc::closed_error("local node is shutting down"));
    }
    auto rpc_client_ptr = ms->get_rpc_client(verb, id, host_id);
    auto& rpc_client = *rpc_client_ptr;
    // rpc_client_ptr is captured in the continuation so the connection
    // stays alive until the call resolves.
    return rpc_handler(rpc_client, timeout, std::forward<MsgOut>(msg)...).handle_exception([ms = ms->shared_from_this(), id, host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr)] (std::exception_ptr&& eptr) {
        ms->increment_dropped_messages(verb);
        if (const auto* exp = try_catch<rpc::closed_error>(eptr)) {
            // This is a transport error
            if (host_id) {
                ms->remove_error_rpc_client(verb, *host_id);
            } else {
                ms->remove_error_rpc_client(verb, id);
            }
            // Rethrow with the peer's host ID (null UUID when unknown) and
            // IP address attached to the original error text.
            return futurator::make_exception_future(rpc::closed_error(fmt::format("got error from node {}/{}: {}",
                    host_id.value_or(locator::host_id{}), id.addr, exp->what())));
        } else {
            // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
            return futurator::make_exception_future(std::move(eptr));
        }
    });
}
|
|
|
|
template <typename MsgIn, typename Timeout, typename... MsgOut>
|
|
auto send_message_timeout(messaging_service* ms, messaging_verb verb, msg_addr id, Timeout timeout, MsgOut&&... msg) {
|
|
return send_message_timeout<MsgIn, Timeout, MsgOut...>(ms, verb, std::nullopt, id, timeout, std::forward<MsgOut>(msg)...);
|
|
}
|
|
|
|
|
|
// Send a message for verb
|
|
template <typename MsgIn, typename... MsgOut>
|
|
auto send_message_timeout(messaging_service* ms, messaging_verb verb, locator::host_id hid, MsgOut&&... msg) {
|
|
return send_message_timeout<MsgIn, MsgOut...>(ms, verb, std::optional{hid}, ms->addr_for_host_id(hid), std::forward<MsgOut>(msg)...);
|
|
}
|
|
|
|
// Requesting abort on the provided abort_source drops the message from the outgoing queue (if it's still there)
// and causes the returned future to resolve exceptionally with `abort_requested_exception`.
// TODO: Remove duplicated code in send_message
template <typename MsgIn, typename... MsgOut>
auto send_message_cancellable(messaging_service* ms, messaging_verb verb, std::optional<locator::host_id> host_id, msg_addr id, abort_source& as, MsgOut&&... msg) {
    auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
    using futurator = futurize<std::invoke_result_t<decltype(rpc_handler), rpc_protocol::client&, MsgOut...>>;
    // Fail fast with a specific message when we are the side going down.
    if (ms->is_shutting_down()) {
        return futurator::make_exception_future(rpc::closed_error("local node is shutting down"));
    }
    auto rpc_client_ptr = ms->get_rpc_client(verb, id, host_id);
    auto& rpc_client = *rpc_client_ptr;

    // Ownership of the cancellable moves into the abort subscription; the
    // reference stays valid as long as `sub` lives (captured below, so it
    // outlives the in-flight call).
    auto c = std::make_unique<seastar::rpc::cancellable>();
    auto& c_ref = *c;
    auto sub = as.subscribe([c = std::move(c)] () noexcept {
        c->cancel();
    });
    if (!sub) {
        // Abort was requested before we could even send the message.
        return futurator::make_exception_future(abort_requested_exception{});
    }

    return rpc_handler(rpc_client, c_ref, std::forward<MsgOut>(msg)...).handle_exception([ms = ms->shared_from_this(), id, host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr), sub = std::move(sub)] (std::exception_ptr&& eptr) {
        ms->increment_dropped_messages(verb);
        if (const auto* exp = try_catch<rpc::closed_error>(eptr)) {
            // This is a transport error
            if (host_id) {
                ms->remove_error_rpc_client(verb, *host_id);
            } else {
                ms->remove_error_rpc_client(verb, id);
            }
            // Rethrow with the peer's host ID (null UUID when unknown) and
            // IP address attached to the original error text.
            return futurator::make_exception_future(rpc::closed_error(fmt::format("got error from node {}/{}: {}",
                    host_id.value_or(locator::host_id{}), id.addr, exp->what())));
        } else if (try_catch<rpc::canceled_error>(eptr)) {
            // Translate low-level canceled_error into high-level abort_requested_exception.
            return futurator::make_exception_future(abort_requested_exception{});
        } else {
            // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
            return futurator::make_exception_future(std::move(eptr));
        }
    });
}
|
|
|
|
template <typename MsgIn, typename... MsgOut>
|
|
auto send_message_cancellable(messaging_service* ms, messaging_verb verb, msg_addr id, abort_source& as, MsgOut&&... msg) {
|
|
return send_message_cancellable<MsgIn, MsgOut...>(ms, verb, std::nullopt, id, as, std::forward<MsgOut>(msg)...);
|
|
}
|
|
|
|
template <typename MsgIn, typename... MsgOut>
|
|
auto send_message_cancellable(messaging_service* ms, messaging_verb verb, locator::host_id id, abort_source& as, MsgOut&&... msg) {
|
|
return send_message_cancellable<MsgIn, MsgOut...>(ms, verb, std::optional{id}, ms->addr_for_host_id(id), as, std::forward<MsgOut>(msg)...);
|
|
}
|
|
|
|
// Send a message for verb with both a timeout and abort support, addressed
// by host ID. Combines the semantics of send_message_timeout and
// send_message_cancellable: shutdown and transport errors yield enriched
// closed_error messages, and a triggered abort_source resolves the future
// with abort_requested_exception.
template <typename MsgIn, typename Timeout, typename... MsgOut>
auto send_message_timeout_cancellable(messaging_service* ms, messaging_verb verb, locator::host_id host_id, Timeout timeout, abort_source& as, MsgOut&&... msg) {
    auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
    using futurator = futurize<std::invoke_result_t<decltype(rpc_handler), rpc_protocol::client&, MsgOut...>>;
    // Fail fast with a specific message when we are the side going down.
    if (ms->is_shutting_down()) {
        return futurator::make_exception_future(rpc::closed_error("local node is shutting down"));
    }
    auto address = ms->addr_for_host_id(host_id);
    auto rpc_client_ptr = ms->get_rpc_client(verb, address, host_id);
    auto& rpc_client = *rpc_client_ptr;

    // Ownership of the cancellable moves into the abort subscription; the
    // reference stays valid as long as `sub` lives (captured below, so it
    // outlives the in-flight call).
    auto c = std::make_unique<seastar::rpc::cancellable>();
    auto& c_ref = *c;
    auto sub = as.subscribe([c = std::move(c)] () noexcept {
        c->cancel();
    });
    if (!sub) {
        // Abort was requested before we could even send the message.
        return futurator::make_exception_future(abort_requested_exception{});
    }

    return rpc_handler(rpc_client, timeout, c_ref, std::forward<MsgOut>(msg)...).handle_exception([ms = ms->shared_from_this(), host_id, address, verb, rpc_client_ptr = std::move(rpc_client_ptr), sub = std::move(sub)] (std::exception_ptr&& eptr) {
        ms->increment_dropped_messages(verb);
        if (const auto* exp = try_catch<rpc::closed_error>(eptr)) {
            // This is a transport error
            ms->remove_error_rpc_client(verb, host_id);
            // Rethrow with the peer's host ID and IP address attached to
            // the original error text.
            return futurator::make_exception_future(rpc::closed_error(fmt::format("got error from node {}/{}: {}",
                    host_id, address.addr, exp->what())));
        } else if (try_catch<rpc::canceled_error>(eptr)) {
            // Translate low-level canceled_error into high-level abort_requested_exception.
            return futurator::make_exception_future(abort_requested_exception{});
        } else {
            // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
            return futurator::make_exception_future(std::move(eptr));
        }
    });
}
|
|
|
|
// Send one way message for verb
|
|
template <typename... MsgOut>
|
|
auto send_message_oneway(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) {
|
|
return send_message<rpc::no_wait_type>(ms, std::move(verb), std::move(id), std::forward<MsgOut>(msg)...);
|
|
}
|
|
|
|
// Send one way message for verb
|
|
template <typename Timeout, typename... MsgOut>
|
|
auto send_message_oneway_timeout(messaging_service* ms, messaging_verb verb, msg_addr id, Timeout timeout, MsgOut&&... msg) {
|
|
return send_message_timeout<rpc::no_wait_type>(ms, std::move(verb), std::move(id), timeout, std::forward<MsgOut>(msg)...);
|
|
}
|
|
|
|
template <typename... MsgOut>
|
|
auto send_message_oneway(messaging_service* ms, messaging_verb verb, locator::host_id id, MsgOut&&... msg) {
|
|
return send_message<rpc::no_wait_type>(ms, std::move(verb), std::move(id), std::forward<MsgOut>(msg)...);
|
|
}
|
|
|
|
// Send one way message for verb
|
|
template <typename Timeout, typename... MsgOut>
|
|
auto send_message_oneway_timeout(messaging_service* ms, messaging_verb verb, locator::host_id id, Timeout timeout, MsgOut&&... msg) {
|
|
return send_message_timeout<rpc::no_wait_type>(ms, std::move(verb), std::move(id), timeout, std::forward<MsgOut>(msg)...);
|
|
}
|
|
|
|
} // namespace netw
|