mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
messaging: improve the error messages of closed_errors
The default error message of `closed_error` is "connection is closed". It lacks the host ID and the IP address of the connected node, which makes debugging harder. Also, it can be more specific when `closed_error` is thrown because the local node is shutting down.

Fixes #16923
Closes scylladb/scylladb#27699
This commit is contained in:
committed by
Avi Kivity
parent
567c28dd0d
commit
4e63e74438
@@ -1292,7 +1292,7 @@ future<std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation
|
||||
messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, service::session_id session, locator::host_id id) {
|
||||
using value_type = std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>;
|
||||
if (is_shutting_down()) {
|
||||
return make_exception_future<value_type>(rpc::closed_error());
|
||||
return make_exception_future<value_type>(rpc::closed_error("local node is shutting down"));
|
||||
}
|
||||
auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, addr_for_host_id(id), id);
|
||||
return rpc_client->make_stream_sink<netw::serializer, frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>().then([this, session, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd> sink) mutable {
|
||||
@@ -1321,7 +1321,7 @@ rpc::sink<streaming::stream_blob_cmd_data> messaging_service::make_sink_for_stre
|
||||
future<std::tuple<rpc::sink<streaming::stream_blob_cmd_data>, rpc::source<streaming::stream_blob_cmd_data>>>
|
||||
messaging_service::make_sink_and_source_for_stream_blob(streaming::stream_blob_meta meta, locator::host_id id) {
|
||||
if (is_shutting_down()) {
|
||||
co_await coroutine::return_exception(rpc::closed_error());
|
||||
co_await coroutine::return_exception(rpc::closed_error("local node is shutting down"));
|
||||
}
|
||||
auto rpc_client = get_rpc_client(messaging_verb::STREAM_BLOB, addr_for_host_id(id), id);
|
||||
auto sink = co_await rpc_client->make_stream_sink<netw::serializer, streaming::stream_blob_cmd_data>();
|
||||
@@ -1370,7 +1370,7 @@ future<std::tuple<rpc::sink<repair_hash_with_cmd>, rpc::source<repair_row_on_wir
|
||||
messaging_service::make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, shard_id dst_cpu_id, locator::host_id id) {
|
||||
auto verb = messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM;
|
||||
if (is_shutting_down()) {
|
||||
return make_exception_future<std::tuple<rpc::sink<repair_hash_with_cmd>, rpc::source<repair_row_on_wire_with_cmd>>>(rpc::closed_error());
|
||||
return make_exception_future<std::tuple<rpc::sink<repair_hash_with_cmd>, rpc::source<repair_row_on_wire_with_cmd>>>(rpc::closed_error("local node is shutting down"));
|
||||
}
|
||||
auto rpc_client = get_rpc_client(verb, addr_for_host_id(id), id);
|
||||
return do_make_sink_source<repair_hash_with_cmd, repair_row_on_wire_with_cmd>(verb, repair_meta_id, dst_cpu_id, std::move(rpc_client), rpc());
|
||||
@@ -1392,7 +1392,7 @@ future<std::tuple<rpc::sink<repair_row_on_wire_with_cmd>, rpc::source<repair_str
|
||||
messaging_service::make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(uint32_t repair_meta_id, shard_id dst_cpu_id, locator::host_id id) {
|
||||
auto verb = messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM;
|
||||
if (is_shutting_down()) {
|
||||
return make_exception_future<std::tuple<rpc::sink<repair_row_on_wire_with_cmd>, rpc::source<repair_stream_cmd>>>(rpc::closed_error());
|
||||
return make_exception_future<std::tuple<rpc::sink<repair_row_on_wire_with_cmd>, rpc::source<repair_stream_cmd>>>(rpc::closed_error("local node is shutting down"));
|
||||
}
|
||||
auto rpc_client = get_rpc_client(verb, addr_for_host_id(id), id);
|
||||
return do_make_sink_source<repair_row_on_wire_with_cmd, repair_stream_cmd>(verb, repair_meta_id, dst_cpu_id, std::move(rpc_client), rpc());
|
||||
@@ -1414,7 +1414,7 @@ future<std::tuple<rpc::sink<repair_stream_cmd>, rpc::source<repair_hash_with_cmd
|
||||
messaging_service::make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, shard_id dst_cpu_id, locator::host_id id) {
|
||||
auto verb = messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM;
|
||||
if (is_shutting_down()) {
|
||||
return make_exception_future<std::tuple<rpc::sink<repair_stream_cmd>, rpc::source<repair_hash_with_cmd>>>(rpc::closed_error());
|
||||
return make_exception_future<std::tuple<rpc::sink<repair_stream_cmd>, rpc::source<repair_hash_with_cmd>>>(rpc::closed_error("local node is shutting down"));
|
||||
}
|
||||
auto rpc_client = get_rpc_client(verb, addr_for_host_id(id), id);
|
||||
return do_make_sink_source<repair_stream_cmd, repair_hash_with_cmd>(verb, repair_meta_id, dst_cpu_id, std::move(rpc_client), rpc());
|
||||
|
||||
@@ -127,20 +127,21 @@ auto send_message(messaging_service* ms, messaging_verb verb, std::optional<loca
|
||||
auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
|
||||
using futurator = futurize<std::invoke_result_t<decltype(rpc_handler), rpc_protocol::client&, MsgOut...>>;
|
||||
if (ms->is_shutting_down()) {
|
||||
return futurator::make_exception_future(rpc::closed_error());
|
||||
return futurator::make_exception_future(rpc::closed_error("local node is shutting down"));
|
||||
}
|
||||
auto rpc_client_ptr = ms->get_rpc_client(verb, id, host_id);
|
||||
auto& rpc_client = *rpc_client_ptr;
|
||||
return rpc_handler(rpc_client, std::forward<MsgOut>(msg)...).handle_exception([ms = ms->shared_from_this(), id, host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr)] (std::exception_ptr&& eptr) {
|
||||
ms->increment_dropped_messages(verb);
|
||||
if (try_catch<rpc::closed_error>(eptr)) {
|
||||
if (const auto* exp = try_catch<rpc::closed_error>(eptr)) {
|
||||
// This is a transport error
|
||||
if (host_id) {
|
||||
ms->remove_error_rpc_client(verb, *host_id);
|
||||
} else {
|
||||
ms->remove_error_rpc_client(verb, id);
|
||||
}
|
||||
return futurator::make_exception_future(std::move(eptr));
|
||||
return futurator::make_exception_future(rpc::closed_error(fmt::format("got error from node {}/{}: {}",
|
||||
host_id.value_or(locator::host_id{}), id.addr, exp->what())));
|
||||
} else {
|
||||
// This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
|
||||
return futurator::make_exception_future(std::move(eptr));
|
||||
@@ -165,20 +166,21 @@ auto send_message_timeout(messaging_service* ms, messaging_verb verb, std::optio
|
||||
auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
|
||||
using futurator = futurize<std::invoke_result_t<decltype(rpc_handler), rpc_protocol::client&, MsgOut...>>;
|
||||
if (ms->is_shutting_down()) {
|
||||
return futurator::make_exception_future(rpc::closed_error());
|
||||
return futurator::make_exception_future(rpc::closed_error("local node is shutting down"));
|
||||
}
|
||||
auto rpc_client_ptr = ms->get_rpc_client(verb, id, host_id);
|
||||
auto& rpc_client = *rpc_client_ptr;
|
||||
return rpc_handler(rpc_client, timeout, std::forward<MsgOut>(msg)...).handle_exception([ms = ms->shared_from_this(), id, host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr)] (std::exception_ptr&& eptr) {
|
||||
ms->increment_dropped_messages(verb);
|
||||
if (try_catch<rpc::closed_error>(eptr)) {
|
||||
if (const auto* exp = try_catch<rpc::closed_error>(eptr)) {
|
||||
// This is a transport error
|
||||
if (host_id) {
|
||||
ms->remove_error_rpc_client(verb, *host_id);
|
||||
} else {
|
||||
ms->remove_error_rpc_client(verb, id);
|
||||
}
|
||||
return futurator::make_exception_future(std::move(eptr));
|
||||
return futurator::make_exception_future(rpc::closed_error(fmt::format("got error from node {}/{}: {}",
|
||||
host_id.value_or(locator::host_id{}), id.addr, exp->what())));
|
||||
} else {
|
||||
// This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
|
||||
return futurator::make_exception_future(std::move(eptr));
|
||||
@@ -206,7 +208,7 @@ auto send_message_cancellable(messaging_service* ms, messaging_verb verb, std::o
|
||||
auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
|
||||
using futurator = futurize<std::invoke_result_t<decltype(rpc_handler), rpc_protocol::client&, MsgOut...>>;
|
||||
if (ms->is_shutting_down()) {
|
||||
return futurator::make_exception_future(rpc::closed_error());
|
||||
return futurator::make_exception_future(rpc::closed_error("local node is shutting down"));
|
||||
}
|
||||
auto rpc_client_ptr = ms->get_rpc_client(verb, id, host_id);
|
||||
auto& rpc_client = *rpc_client_ptr;
|
||||
@@ -222,14 +224,15 @@ auto send_message_cancellable(messaging_service* ms, messaging_verb verb, std::o
|
||||
|
||||
return rpc_handler(rpc_client, c_ref, std::forward<MsgOut>(msg)...).handle_exception([ms = ms->shared_from_this(), id, host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr), sub = std::move(sub)] (std::exception_ptr&& eptr) {
|
||||
ms->increment_dropped_messages(verb);
|
||||
if (try_catch<rpc::closed_error>(eptr)) {
|
||||
if (const auto* exp = try_catch<rpc::closed_error>(eptr)) {
|
||||
// This is a transport error
|
||||
if (host_id) {
|
||||
ms->remove_error_rpc_client(verb, *host_id);
|
||||
} else {
|
||||
ms->remove_error_rpc_client(verb, id);
|
||||
}
|
||||
return futurator::make_exception_future(std::move(eptr));
|
||||
return futurator::make_exception_future(rpc::closed_error(fmt::format("got error from node {}/{}: {}",
|
||||
host_id.value_or(locator::host_id{}), id.addr, exp->what())));
|
||||
} else if (try_catch<rpc::canceled_error>(eptr)) {
|
||||
// Translate low-level canceled_error into high-level abort_requested_exception.
|
||||
return futurator::make_exception_future(abort_requested_exception{});
|
||||
@@ -255,9 +258,10 @@ auto send_message_timeout_cancellable(messaging_service* ms, messaging_verb verb
|
||||
auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
|
||||
using futurator = futurize<std::invoke_result_t<decltype(rpc_handler), rpc_protocol::client&, MsgOut...>>;
|
||||
if (ms->is_shutting_down()) {
|
||||
return futurator::make_exception_future(rpc::closed_error());
|
||||
return futurator::make_exception_future(rpc::closed_error("local node is shutting down"));
|
||||
}
|
||||
auto rpc_client_ptr = ms->get_rpc_client(verb, ms->addr_for_host_id(host_id), host_id);
|
||||
auto address = ms->addr_for_host_id(host_id);
|
||||
auto rpc_client_ptr = ms->get_rpc_client(verb, address, host_id);
|
||||
auto& rpc_client = *rpc_client_ptr;
|
||||
|
||||
auto c = std::make_unique<seastar::rpc::cancellable>();
|
||||
@@ -269,12 +273,13 @@ auto send_message_timeout_cancellable(messaging_service* ms, messaging_verb verb
|
||||
return futurator::make_exception_future(abort_requested_exception{});
|
||||
}
|
||||
|
||||
return rpc_handler(rpc_client, timeout, c_ref, std::forward<MsgOut>(msg)...).handle_exception([ms = ms->shared_from_this(), host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr), sub = std::move(sub)] (std::exception_ptr&& eptr) {
|
||||
return rpc_handler(rpc_client, timeout, c_ref, std::forward<MsgOut>(msg)...).handle_exception([ms = ms->shared_from_this(), host_id, address, verb, rpc_client_ptr = std::move(rpc_client_ptr), sub = std::move(sub)] (std::exception_ptr&& eptr) {
|
||||
ms->increment_dropped_messages(verb);
|
||||
if (try_catch<rpc::closed_error>(eptr)) {
|
||||
if (const auto* exp = try_catch<rpc::closed_error>(eptr)) {
|
||||
// This is a transport error
|
||||
ms->remove_error_rpc_client(verb, host_id);
|
||||
return futurator::make_exception_future(std::move(eptr));
|
||||
return futurator::make_exception_future(rpc::closed_error(fmt::format("got error from node {}/{}: {}",
|
||||
host_id, address.addr, exp->what())));
|
||||
} else if (try_catch<rpc::canceled_error>(eptr)) {
|
||||
// Translate low-level canceled_error into high-level abort_requested_exception.
|
||||
return futurator::make_exception_future(abort_requested_exception{});
|
||||
|
||||
Reference in New Issue
Block a user