Files
scylladb/message/rpc_protocol_impl.hh
Patryk Jędrzejczak 4e63e74438 messaging: improve the error messages of closed_errors
The default error message of `closed_error` is "connection is closed".
It lacks the host ID and the IP address of the connected node, which
makes debugging harder. Also, it can be more specific when
`closed_error` is thrown due to the local node shutting down.

Fixes #16923

Closes scylladb/scylladb#27699
2025-12-29 18:36:07 +02:00

317 lines
15 KiB
C++

// SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
// Copyright 2021-present ScyllaDB
#pragma once
#include <seastar/rpc/rpc.hh>
#include "messaging_service.hh"
#include "serializer.hh"
#include "serializer_impl.hh"
#include "seastarx.hh"
#include "utils/exceptions.hh"
namespace netw {
struct serializer;
// thunk from rpc serializers to generate serializers
template <typename T, typename Output>
void write(serializer, Output& out, const T& data) {
ser::serialize(out, data);
}
template <typename T, typename Input>
T read(serializer, Input& in, std::type_identity<T> type) {
return ser::deserialize(in, type);
}
// foreign_ptr<T> travels over the wire as the object it points to:
// write() serializes the pointee, and read() deserializes a T and wraps the
// result with make_foreign().
// NOTE(review): a null foreign_ptr would be dereferenced here; callers are
// presumably expected never to send one — confirm.
template <typename Output, typename T>
void write(serializer s, Output& out, const foreign_ptr<T>& v) {
    auto&& pointee = *v;
    write(s, out, pointee);
}
template <typename Input, typename T>
foreign_ptr<T> read(serializer s, Input& in, std::type_identity<foreign_ptr<T>>) {
    auto local = read(s, in, std::type_identity<T>{});
    return make_foreign(std::move(local));
}
// lw_shared_ptr<T> is serialized as its pointee. read() deserializes a T and
// moves it into a newly allocated lw_shared_ptr.
// NOTE(review): a null pointer would be dereferenced by write(); callers are
// presumably expected never to send one — confirm.
template <typename Output, typename T>
void write(serializer s, Output& out, const lw_shared_ptr<T>& v) {
    auto&& pointee = *v;
    write(s, out, pointee);
}
template <typename Input, typename T>
lw_shared_ptr<T> read(serializer s, Input& in, std::type_identity<lw_shared_ptr<T>>) {
    auto value = read(s, in, std::type_identity<T>{});
    return make_lw_shared<T>(std::move(value));
}
using rpc_protocol = rpc::protocol<serializer, messaging_verb>;

// Owns the rpc_protocol instance of messaging_service. Every member simply
// forwards to the wrapped protocol object.
class messaging_service::rpc_protocol_wrapper {
    rpc_protocol _impl;

public:
    explicit rpc_protocol_wrapper(serializer &&s)
            : _impl(std::move(s)) {
    }

    // Direct access to the wrapped protocol.
    rpc_protocol &protocol() {
        return _impl;
    }

    // Client-side stub for verb `t` with signature Func (see
    // seastar rpc::protocol::make_client).
    template<typename Func>
    auto make_client(messaging_verb t) {
        return _impl.make_client<Func>(t);
    }

    // Handler registration, without and with an explicit scheduling group.
    template<typename Func>
    auto register_handler(messaging_verb t, Func &&func) {
        return _impl.register_handler(t, std::forward<Func>(func));
    }
    template<typename Func>
    auto register_handler(messaging_verb t, scheduling_group sg, Func &&func) {
        return _impl.register_handler(t, sg, std::forward<Func>(func));
    }

    future<> unregister_handler(messaging_verb t) {
        return _impl.unregister_handler(t);
    }

    void set_logger(::seastar::logger *logger) {
        _impl.set_logger(logger);
    }

    bool has_handler(messaging_verb msg_id) {
        return _impl.has_handler(msg_id);
    }

    bool has_handlers() const noexcept {
        return _impl.has_handlers();
    }
};
// This wrapper pretends to be rpc_protocol::client (via the conversion
// operator below). It also remembers the TLS credentials, if any, that the
// client was created with, so make_stream_sink() can open the same kind of
// socket.
// NOTE(review): this comment used to say the wrapper also stops the client
// before destruction, but no destructor is defined in this class; the client
// is only stopped through an explicit stop() call.
// This should be integrated into messaging_service proper.
class messaging_service::rpc_protocol_client_wrapper {
    // _p must stay declared before _credentials. The TLS constructor builds _p
    // from its credentials argument and only then moves that argument into
    // _credentials (members are initialized in declaration order).
    std::unique_ptr<rpc_protocol::client> _p;
    ::shared_ptr<seastar::tls::server_credentials> _credentials;
public:
    // Plain (non-TLS) client; _credentials stays null.
    rpc_protocol_client_wrapper(rpc_protocol &proto, rpc::client_options opts, socket_address addr,
            socket_address local = {})
        : _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), addr, local)) {
    }
    // TLS client: connects over seastar::tls::socket(c) and keeps `c` for
    // make_stream_sink(). `c` is a by-value sink, so it is moved into place
    // rather than copied a second time.
    rpc_protocol_client_wrapper(rpc_protocol &proto, rpc::client_options opts, socket_address addr,
            socket_address local, ::shared_ptr<seastar::tls::server_credentials> c)
        : _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), seastar::tls::socket(c), addr, local))
        , _credentials(std::move(c)) {
    }
    auto get_stats() const { return _p->get_stats(); }
    future<> stop() { return _p->stop(); }
    // Reports _p->error(). It does not modify the wrapper, so it is callable
    // on const wrappers too.
    bool error() const {
        return _p->error();
    }
    operator rpc_protocol::client &() { return *_p; }
    /**
     * #3787 Must ensure we use the right type of socket. I.e. tls or not.
     * The constructors retain the credentials object (null for non-TLS
     * clients), so here we can tell whether the connection is TLS or not.
     */
    template<typename Serializer, typename... Out>
    future<rpc::sink<Out...>> make_stream_sink() {
        if (_credentials) {
            return _p->make_stream_sink<Serializer, Out...>(seastar::tls::socket(_credentials));
        }
        return _p->make_stream_sink<Serializer, Out...>();
    }
};
// Register a handler (a callback lambda) for verb. The handler runs in the
// scheduling group that messaging_service assigns to the verb
// (scheduling_group_for_verb()).
template<typename Func>
void register_handler(messaging_service *ms, messaging_verb verb, Func &&func) {
    // Func&& is a forwarding reference. Forward it rather than std::move it,
    // so an lvalue handler owned by the caller is not silently moved from.
    ms->rpc()->register_handler(verb, ms->scheduling_group_for_verb(verb), std::forward<Func>(func));
}
// Send a message for verb.
//
// `host_id`, when engaged, identifies the destination node; `id` is its
// address.
//
// If the local messaging_service is already shutting down, no call is made
// and the returned future fails with rpc::closed_error. Otherwise the message
// is sent on the client returned by get_rpc_client().
//
// On failure, the dropped-message counter for `verb` is incremented, and then:
//  - an rpc::closed_error (transport error) evicts the client through
//    remove_error_rpc_client() (by host ID when known, otherwise by address).
//    It is re-raised as an rpc::closed_error whose message names the node.
//  - any other exception is propagated unchanged.
template <typename MsgIn, typename... MsgOut>
auto send_message(messaging_service* ms, messaging_verb verb, std::optional<locator::host_id> host_id, msg_addr id, MsgOut&&... msg) {
    auto invoke_verb = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
    using futurator = futurize<std::invoke_result_t<decltype(invoke_verb), rpc_protocol::client&, MsgOut...>>;

    if (ms->is_shutting_down()) {
        return futurator::make_exception_future(rpc::closed_error("local node is shutting down"));
    }

    auto client = ms->get_rpc_client(verb, id, host_id);
    auto reply = invoke_verb(*client, std::forward<MsgOut>(msg)...);

    // `client` is captured but unused in the body. Holding it keeps the client
    // object alive while the call, which was issued through a plain
    // reference, is in flight.
    return std::move(reply).handle_exception([ms = ms->shared_from_this(), id, host_id, verb, client = std::move(client)] (std::exception_ptr&& eptr) {
        ms->increment_dropped_messages(verb);

        const auto* closed = try_catch<rpc::closed_error>(eptr);
        if (closed == nullptr) {
            // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
            return futurator::make_exception_future(std::move(eptr));
        }

        // Transport error: drop the client that reported it.
        if (!host_id) {
            ms->remove_error_rpc_client(verb, id);
        } else {
            ms->remove_error_rpc_client(verb, *host_id);
        }
        return futurator::make_exception_future(rpc::closed_error(fmt::format("got error from node {}/{}: {}",
                host_id.value_or(locator::host_id{}), id.addr, closed->what())));
    });
}
// Send a message for verb to a node known only by its msg_addr; no host ID
// is attached to the call.
template <typename MsgIn, typename... MsgOut>
auto send_message(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) {
    return send_message<MsgIn, MsgOut...>(ms, verb, std::optional<locator::host_id>{}, id,
            std::forward<MsgOut>(msg)...);
}
// Send a message for verb to the node identified by `hid`. Its address is
// resolved with addr_for_host_id().
template <typename MsgIn, typename... MsgOut>
auto send_message(messaging_service* ms, messaging_verb verb, locator::host_id hid, MsgOut&&... msg) {
    auto addr = ms->addr_for_host_id(hid);
    return send_message<MsgIn, MsgOut...>(ms, verb, std::optional<locator::host_id>{hid}, addr,
            std::forward<MsgOut>(msg)...);
}
// TODO: Remove duplicated code in send_message
//
// Same as send_message(), except that `timeout` is passed to the rpc client
// call ahead of the message arguments.
//
// If the local messaging_service is shutting down, no call is made. On
// failure, the dropped-message counter for `verb` is incremented. An
// rpc::closed_error evicts the client and is re-raised with the node named in
// its message; any other exception is propagated unchanged.
template <typename MsgIn, typename Timeout, typename... MsgOut>
auto send_message_timeout(messaging_service* ms, messaging_verb verb, std::optional<locator::host_id> host_id, msg_addr id, Timeout timeout, MsgOut&&... msg) {
    auto invoke_verb = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
    using futurator = futurize<std::invoke_result_t<decltype(invoke_verb), rpc_protocol::client&, MsgOut...>>;

    if (ms->is_shutting_down()) {
        return futurator::make_exception_future(rpc::closed_error("local node is shutting down"));
    }

    auto client = ms->get_rpc_client(verb, id, host_id);
    auto reply = invoke_verb(*client, timeout, std::forward<MsgOut>(msg)...);

    // `client` is captured (unused in the body) so the client object outlives
    // the in-flight call.
    return std::move(reply).handle_exception([ms = ms->shared_from_this(), id, host_id, verb, client = std::move(client)] (std::exception_ptr&& eptr) {
        ms->increment_dropped_messages(verb);

        const auto* closed = try_catch<rpc::closed_error>(eptr);
        if (closed == nullptr) {
            // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
            return futurator::make_exception_future(std::move(eptr));
        }

        // Transport error: drop the client that reported it.
        if (!host_id) {
            ms->remove_error_rpc_client(verb, id);
        } else {
            ms->remove_error_rpc_client(verb, *host_id);
        }
        return futurator::make_exception_future(rpc::closed_error(fmt::format("got error from node {}/{}: {}",
                host_id.value_or(locator::host_id{}), id.addr, closed->what())));
    });
}
// Timeout variant for a node known only by its msg_addr; no host ID is
// attached to the call.
template <typename MsgIn, typename Timeout, typename... MsgOut>
auto send_message_timeout(messaging_service* ms, messaging_verb verb, msg_addr id, Timeout timeout, MsgOut&&... msg) {
    return send_message_timeout<MsgIn, Timeout, MsgOut...>(ms, verb, std::optional<locator::host_id>{}, id, timeout,
            std::forward<MsgOut>(msg)...);
}
// Send a message for verb
template <typename MsgIn, typename... MsgOut>
auto send_message_timeout(messaging_service* ms, messaging_verb verb, locator::host_id hid, MsgOut&&... msg) {
return send_message_timeout<MsgIn, MsgOut...>(ms, verb, std::optional{hid}, ms->addr_for_host_id(hid), std::forward<MsgOut>(msg)...);
}
// Requesting abort on the provided abort_source drops the message from the outgoing queue (if it's still there)
// and causes the returned future to resolve exceptionally with `abort_requested_exception`.
// TODO: Remove duplicated code in send_message
//
// Error handling otherwise mirrors send_message(): the dropped-message counter
// is incremented on any failure. An rpc::closed_error evicts the client and is
// re-raised naming the node. An rpc::canceled_error becomes
// abort_requested_exception. Anything else is propagated unchanged.
template <typename MsgIn, typename... MsgOut>
auto send_message_cancellable(messaging_service* ms, messaging_verb verb, std::optional<locator::host_id> host_id, msg_addr id, abort_source& as, MsgOut&&... msg) {
    auto invoke_verb = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
    using futurator = futurize<std::invoke_result_t<decltype(invoke_verb), rpc_protocol::client&, MsgOut...>>;

    if (ms->is_shutting_down()) {
        return futurator::make_exception_future(rpc::closed_error("local node is shutting down"));
    }

    auto client = ms->get_rpc_client(verb, id, host_id);

    // The abort subscription owns the cancellable; the rpc call only borrows
    // it. Take the reference before ownership moves into the subscription.
    auto canceller = std::make_unique<seastar::rpc::cancellable>();
    auto& cancel_handle = *canceller;
    auto abort_sub = as.subscribe([canceller = std::move(canceller)] () noexcept {
        canceller->cancel();
    });
    if (!abort_sub) {
        // Empty subscription (per seastar, abort was already requested on
        // `as`): the message is never sent.
        return futurator::make_exception_future(abort_requested_exception{});
    }

    auto reply = invoke_verb(*client, cancel_handle, std::forward<MsgOut>(msg)...);

    // `client` and `abort_sub` are captured so that both the client and the
    // cancellable stay alive until the call settles.
    return std::move(reply).handle_exception([ms = ms->shared_from_this(), id, host_id, verb, client = std::move(client), abort_sub = std::move(abort_sub)] (std::exception_ptr&& eptr) {
        ms->increment_dropped_messages(verb);

        if (const auto* closed = try_catch<rpc::closed_error>(eptr)) {
            // This is a transport error
            if (!host_id) {
                ms->remove_error_rpc_client(verb, id);
            } else {
                ms->remove_error_rpc_client(verb, *host_id);
            }
            return futurator::make_exception_future(rpc::closed_error(fmt::format("got error from node {}/{}: {}",
                    host_id.value_or(locator::host_id{}), id.addr, closed->what())));
        }

        if (try_catch<rpc::canceled_error>(eptr)) {
            // Translate low-level canceled_error into high-level abort_requested_exception.
            return futurator::make_exception_future(abort_requested_exception{});
        }

        // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
        return futurator::make_exception_future(std::move(eptr));
    });
}
// Cancellable send to a node known only by its msg_addr; no host ID is
// attached to the call.
template <typename MsgIn, typename... MsgOut>
auto send_message_cancellable(messaging_service* ms, messaging_verb verb, msg_addr id, abort_source& as, MsgOut&&... msg) {
    return send_message_cancellable<MsgIn, MsgOut...>(ms, verb, std::optional<locator::host_id>{}, id, as,
            std::forward<MsgOut>(msg)...);
}
// Cancellable send to the node identified by `id`. Its address is resolved
// with addr_for_host_id().
template <typename MsgIn, typename... MsgOut>
auto send_message_cancellable(messaging_service* ms, messaging_verb verb, locator::host_id id, abort_source& as, MsgOut&&... msg) {
    auto addr = ms->addr_for_host_id(id);
    return send_message_cancellable<MsgIn, MsgOut...>(ms, verb, std::optional<locator::host_id>{id}, addr, as,
            std::forward<MsgOut>(msg)...);
}
// Send a message for verb to the node identified by `host_id`, with both a
// timeout and an abort_source.
//
// The node's address is resolved with addr_for_host_id(). Requesting abort on
// `as` cancels the call. The dropped-message counter is incremented on any
// failure, and then:
//  - an rpc::closed_error evicts the client (by host ID) and is re-raised
//    naming the node;
//  - an rpc::canceled_error becomes abort_requested_exception;
//  - anything else is propagated unchanged.
template <typename MsgIn, typename Timeout, typename... MsgOut>
auto send_message_timeout_cancellable(messaging_service* ms, messaging_verb verb, locator::host_id host_id, Timeout timeout, abort_source& as, MsgOut&&... msg) {
    auto invoke_verb = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
    using futurator = futurize<std::invoke_result_t<decltype(invoke_verb), rpc_protocol::client&, MsgOut...>>;

    if (ms->is_shutting_down()) {
        return futurator::make_exception_future(rpc::closed_error("local node is shutting down"));
    }

    auto address = ms->addr_for_host_id(host_id);
    auto client = ms->get_rpc_client(verb, address, host_id);

    // The abort subscription owns the cancellable; the rpc call only borrows
    // it. Take the reference before ownership moves into the subscription.
    auto canceller = std::make_unique<seastar::rpc::cancellable>();
    auto& cancel_handle = *canceller;
    auto abort_sub = as.subscribe([canceller = std::move(canceller)] () noexcept {
        canceller->cancel();
    });
    if (!abort_sub) {
        // Empty subscription (per seastar, abort was already requested on
        // `as`): the message is never sent.
        return futurator::make_exception_future(abort_requested_exception{});
    }

    auto reply = invoke_verb(*client, timeout, cancel_handle, std::forward<MsgOut>(msg)...);

    // `client` and `abort_sub` are captured so that both the client and the
    // cancellable stay alive until the call settles.
    return std::move(reply).handle_exception([ms = ms->shared_from_this(), host_id, address, verb, client = std::move(client), abort_sub = std::move(abort_sub)] (std::exception_ptr&& eptr) {
        ms->increment_dropped_messages(verb);

        if (const auto* closed = try_catch<rpc::closed_error>(eptr)) {
            // This is a transport error
            ms->remove_error_rpc_client(verb, host_id);
            return futurator::make_exception_future(rpc::closed_error(fmt::format("got error from node {}/{}: {}",
                    host_id, address.addr, closed->what())));
        }

        if (try_catch<rpc::canceled_error>(eptr)) {
            // Translate low-level canceled_error into high-level abort_requested_exception.
            return futurator::make_exception_future(abort_requested_exception{});
        }

        // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
        return futurator::make_exception_future(std::move(eptr));
    });
}
// One-way variants of send_message() / send_message_timeout(). They request
// rpc::no_wait_type as the reply type, which seastar rpc uses for verbs that
// do not wait for a response.

// Send one way message for verb to a node known by its msg_addr.
template <typename... MsgOut>
auto send_message_oneway(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) {
    return send_message<rpc::no_wait_type>(ms, std::move(verb), std::move(id),
            std::forward<MsgOut>(msg)...);
}

// Send one way message for verb to a node known by its msg_addr, with a timeout.
template <typename Timeout, typename... MsgOut>
auto send_message_oneway_timeout(messaging_service* ms, messaging_verb verb, msg_addr id, Timeout timeout, MsgOut&&... msg) {
    return send_message_timeout<rpc::no_wait_type>(ms, std::move(verb), std::move(id), timeout,
            std::forward<MsgOut>(msg)...);
}

// Send one way message for verb to the node identified by a host ID.
template <typename... MsgOut>
auto send_message_oneway(messaging_service* ms, messaging_verb verb, locator::host_id id, MsgOut&&... msg) {
    return send_message<rpc::no_wait_type>(ms, std::move(verb), std::move(id),
            std::forward<MsgOut>(msg)...);
}

// Send one way message for verb to the node identified by a host ID, with a timeout.
template <typename Timeout, typename... MsgOut>
auto send_message_oneway_timeout(messaging_service* ms, messaging_verb verb, locator::host_id id, Timeout timeout, MsgOut&&... msg) {
    return send_message_timeout<rpc::no_wait_type>(ms, std::move(verb), std::move(id), timeout,
            std::forward<MsgOut>(msg)...);
}
} // namespace netw