diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 1bcfafbe00..cfe062fdea 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -1292,7 +1292,7 @@ future, rpc::source>; if (is_shutting_down()) { - return make_exception_future(rpc::closed_error()); + return make_exception_future(rpc::closed_error("local node is shutting down")); } auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, addr_for_host_id(id), id); return rpc_client->make_stream_sink().then([this, session, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (rpc::sink sink) mutable { @@ -1321,7 +1321,7 @@ rpc::sink messaging_service::make_sink_for_stre future, rpc::source>> messaging_service::make_sink_and_source_for_stream_blob(streaming::stream_blob_meta meta, locator::host_id id) { if (is_shutting_down()) { - co_await coroutine::return_exception(rpc::closed_error()); + co_await coroutine::return_exception(rpc::closed_error("local node is shutting down")); } auto rpc_client = get_rpc_client(messaging_verb::STREAM_BLOB, addr_for_host_id(id), id); auto sink = co_await rpc_client->make_stream_sink(); @@ -1370,7 +1370,7 @@ future, rpc::source, rpc::source>>(rpc::closed_error()); + return make_exception_future, rpc::source>>(rpc::closed_error("local node is shutting down")); } auto rpc_client = get_rpc_client(verb, addr_for_host_id(id), id); return do_make_sink_source(verb, repair_meta_id, dst_cpu_id, std::move(rpc_client), rpc()); @@ -1392,7 +1392,7 @@ future, rpc::source, rpc::source>>(rpc::closed_error()); + return make_exception_future, rpc::source>>(rpc::closed_error("local node is shutting down")); } auto rpc_client = get_rpc_client(verb, addr_for_host_id(id), id); return do_make_sink_source(verb, repair_meta_id, dst_cpu_id, std::move(rpc_client), rpc()); @@ -1414,7 +1414,7 @@ future, rpc::source, rpc::source>>(rpc::closed_error()); + return make_exception_future, rpc::source>>(rpc::closed_error("local node is shutting down")); } auto rpc_client = get_rpc_client(verb, addr_for_host_id(id), id); return do_make_sink_source(verb, repair_meta_id, dst_cpu_id, std::move(rpc_client), rpc()); diff --git a/message/rpc_protocol_impl.hh b/message/rpc_protocol_impl.hh index 3627d93b29..4061df2864 100644 --- a/message/rpc_protocol_impl.hh +++ b/message/rpc_protocol_impl.hh @@ -127,20 +127,21 @@ auto send_message(messaging_service* ms, messaging_verb verb, std::optionalrpc()->make_client(verb); using futurator = futurize>; if (ms->is_shutting_down()) { - return futurator::make_exception_future(rpc::closed_error()); + return futurator::make_exception_future(rpc::closed_error("local node is shutting down")); } auto rpc_client_ptr = ms->get_rpc_client(verb, id, host_id); auto& rpc_client = *rpc_client_ptr; return rpc_handler(rpc_client, std::forward(msg)...).handle_exception([ms = ms->shared_from_this(), id, host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr)] (std::exception_ptr&& eptr) { ms->increment_dropped_messages(verb); - if (try_catch(eptr)) { + if (const auto* exp = try_catch(eptr)) { // This is a transport error if (host_id) { ms->remove_error_rpc_client(verb, *host_id); } else { ms->remove_error_rpc_client(verb, id); } - return futurator::make_exception_future(std::move(eptr)); + return futurator::make_exception_future(rpc::closed_error(fmt::format("got error from node {}/{}: {}", + host_id.value_or(locator::host_id{}), id.addr, exp->what()))); } else { // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error. return futurator::make_exception_future(std::move(eptr)); @@ -165,20 +166,21 @@ auto send_message_timeout(messaging_service* ms, messaging_verb verb, std::optio auto rpc_handler = ms->rpc()->make_client(verb); using futurator = futurize>; if (ms->is_shutting_down()) { - return futurator::make_exception_future(rpc::closed_error()); + return futurator::make_exception_future(rpc::closed_error("local node is shutting down")); } auto rpc_client_ptr = ms->get_rpc_client(verb, id, host_id); auto& rpc_client = *rpc_client_ptr; return rpc_handler(rpc_client, timeout, std::forward(msg)...).handle_exception([ms = ms->shared_from_this(), id, host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr)] (std::exception_ptr&& eptr) { ms->increment_dropped_messages(verb); - if (try_catch(eptr)) { + if (const auto* exp = try_catch(eptr)) { // This is a transport error if (host_id) { ms->remove_error_rpc_client(verb, *host_id); } else { ms->remove_error_rpc_client(verb, id); } - return futurator::make_exception_future(std::move(eptr)); + return futurator::make_exception_future(rpc::closed_error(fmt::format("got error from node {}/{}: {}", + host_id.value_or(locator::host_id{}), id.addr, exp->what()))); } else { // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error. return futurator::make_exception_future(std::move(eptr)); @@ -206,7 +208,7 @@ auto send_message_cancellable(messaging_service* ms, messaging_verb verb, std::o auto rpc_handler = ms->rpc()->make_client(verb); using futurator = futurize>; if (ms->is_shutting_down()) { - return futurator::make_exception_future(rpc::closed_error()); + return futurator::make_exception_future(rpc::closed_error("local node is shutting down")); } auto rpc_client_ptr = ms->get_rpc_client(verb, id, host_id); auto& rpc_client = *rpc_client_ptr; @@ -222,14 +224,15 @@ auto send_message_cancellable(messaging_service* ms, messaging_verb verb, std::o return rpc_handler(rpc_client, c_ref, std::forward(msg)...).handle_exception([ms = ms->shared_from_this(), id, host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr), sub = std::move(sub)] (std::exception_ptr&& eptr) { ms->increment_dropped_messages(verb); - if (try_catch(eptr)) { + if (const auto* exp = try_catch(eptr)) { // This is a transport error if (host_id) { ms->remove_error_rpc_client(verb, *host_id); } else { ms->remove_error_rpc_client(verb, id); } - return futurator::make_exception_future(std::move(eptr)); + return futurator::make_exception_future(rpc::closed_error(fmt::format("got error from node {}/{}: {}", + host_id.value_or(locator::host_id{}), id.addr, exp->what()))); } else if (try_catch(eptr)) { // Translate low-level canceled_error into high-level abort_requested_exception. return futurator::make_exception_future(abort_requested_exception{}); @@ -255,9 +258,10 @@ auto send_message_timeout_cancellable(messaging_service* ms, messaging_verb verb auto rpc_handler = ms->rpc()->make_client(verb); using futurator = futurize>; if (ms->is_shutting_down()) { - return futurator::make_exception_future(rpc::closed_error()); + return futurator::make_exception_future(rpc::closed_error("local node is shutting down")); } - auto rpc_client_ptr = ms->get_rpc_client(verb, ms->addr_for_host_id(host_id), host_id); + auto address = ms->addr_for_host_id(host_id); + auto rpc_client_ptr = ms->get_rpc_client(verb, address, host_id); auto& rpc_client = *rpc_client_ptr; auto c = std::make_unique(); @@ -269,12 +273,13 @@ auto send_message_timeout_cancellable(messaging_service* ms, messaging_verb verb return futurator::make_exception_future(abort_requested_exception{}); } - return rpc_handler(rpc_client, timeout, c_ref, std::forward(msg)...).handle_exception([ms = ms->shared_from_this(), host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr), sub = std::move(sub)] (std::exception_ptr&& eptr) { + return rpc_handler(rpc_client, timeout, c_ref, std::forward(msg)...).handle_exception([ms = ms->shared_from_this(), host_id, address, verb, rpc_client_ptr = std::move(rpc_client_ptr), sub = std::move(sub)] (std::exception_ptr&& eptr) { ms->increment_dropped_messages(verb); - if (try_catch(eptr)) { + if (const auto* exp = try_catch(eptr)) { // This is a transport error ms->remove_error_rpc_client(verb, host_id); - return futurator::make_exception_future(std::move(eptr)); + return futurator::make_exception_future(rpc::closed_error(fmt::format("got error from node {}/{}: {}", + host_id, address.addr, exp->what()))); } else if (try_catch(eptr)) { // Translate low-level canceled_error into high-level abort_requested_exception. return futurator::make_exception_future(abort_requested_exception{});