diff --git a/gms/gossiper.cc b/gms/gossiper.cc index f408d02ace..babc63d9c9 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -954,17 +954,21 @@ future> gossiper::get_unreachable_members_synchronized() }); } +future<> gossiper::send_echo(locator::host_id host_id, std::chrono::milliseconds timeout_ms, int64_t generation_number, bool notify_up) { + return ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, host_id, netw::messaging_service::clock_type::now() + timeout_ms, _abort_source, generation_number, notify_up); +} + future<> gossiper::failure_detector_loop_for_node(locator::host_id host_id, generation_type gossip_generation, uint64_t live_endpoints_version) { auto last = gossiper::clk::now(); auto diff = gossiper::clk::duration(0); auto echo_interval = std::chrono::seconds(2); auto max_duration = echo_interval + std::chrono::milliseconds(_gcfg.failure_detector_timeout_ms()); auto node = _address_map.get(host_id); - while (is_enabled()) { + while (is_enabled() && !_abort_source.abort_requested()) { bool failed = false; try { logger.debug("failure_detector_loop: Send echo to node {}/{}, status = started", host_id, node); - co_await ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, host_id, netw::messaging_service::clock_type::now() + max_duration, gossip_generation.value(), false); + co_await send_echo(host_id, max_duration, gossip_generation.value(), false); logger.debug("failure_detector_loop: Send echo to node {}/{}, status = ok", host_id, node); } catch (...) { failed = true; @@ -992,7 +996,7 @@ future<> gossiper::failure_detector_loop_for_node(locator::host_id host_id, gene host_id, node, _live_endpoints, _live_endpoints_version, live_endpoints_version); co_return; } else { - co_await sleep_abortable(echo_interval, _abort_source); + co_await sleep_abortable(echo_interval, _abort_source).handle_exception_type([] (const abort_requested_exception&) {}); } } co_return; @@ -1688,7 +1692,7 @@ future<> gossiper::notify_nodes_on_up(std::unordered_set dsts) if (dst != _gcfg.host_id) { try { auto generation = my_endpoint_state().get_heart_beat_state().get_generation(); - co_await ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, dst, netw::messaging_service::clock_type::now() + std::chrono::seconds(10), generation.value(), true); + co_await send_echo(dst, std::chrono::seconds(10), generation.value(), true); } catch (...) { logger.warn("Failed to notify node {} that I am UP: {}", dst, std::current_exception()); } @@ -1724,7 +1728,7 @@ void gossiper::mark_alive(endpoint_state_ptr node) { // Enter the _background_msg gate so stop() would wait on it auto gh = _background_msg.hold(); logger.debug("Sending a EchoMessage to {}/{}, with generation_number={}", id, addr, generation); - (void) ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, id, netw::messaging_service::clock_type::now() + std::chrono::seconds(15), generation.value(), false).then([this, id] { + (void) send_echo(id, std::chrono::seconds(15), generation.value(), false).then([this, id] { logger.trace("Got EchoMessage Reply"); return real_mark_alive(id); }).handle_exception([addr, gh = std::move(gh), unmark_pending = std::move(unmark_pending), id] (auto ep) { diff --git a/gms/gossiper.hh b/gms/gossiper.hh index bd3f579a94..bfce37a625 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -109,6 +109,7 @@ private: future<> handle_shutdown_msg(locator::host_id from, std::optional generation_number_opt); future<> do_send_ack_msg(locator::host_id from, gossip_digest_syn syn_msg); future<> do_send_ack2_msg(locator::host_id from, utils::chunked_vector ack_msg_digest); + future<> send_echo(locator::host_id host_id, std::chrono::milliseconds timeout_ms, int64_t generation_number, bool notify_up); future handle_get_endpoint_states_msg(gossip_get_endpoint_states_request request); static constexpr uint32_t _default_cpuid = 0; void do_sort(utils::chunked_vector& g_digest_list) const; diff --git a/idl-compiler.py b/idl-compiler.py index e5b9f43bb2..412e89efc4 100755 --- a/idl-compiler.py +++ b/idl-compiler.py @@ -479,11 +479,9 @@ class RpcVerb(ASTBase): - [[with_timeout]] - an additional time_point parameter is supplied to the handler function and send* method uses send_message_*_timeout variant of internal function to actually send the message. - Incompatible with [[cancellable]]. - [[cancellable]] - an additional abort_source& parameter is supplied to the handler function and send* method uses send_message_*_cancellable variant of internal function to actually send the message. - Incompatible with [[with_timeout]]. - [[one_way]] - the handler function is annotated by future return type to designate that a client doesn't need to wait for an answer. @@ -697,8 +695,6 @@ def rpc_verb_parse_action(tokens): one_way = not raw_attrs.empty() and 'one_way' in raw_attrs.attr_items if one_way and 'return_values' in tokens: raise Exception(f"Invalid return type specification for one-way RPC verb '{name}'") - if with_timeout and cancellable: - raise Exception(f"Error in verb {name}: [[with_timeout]] cannot be used together with [[cancellable]] in the same verb") return RpcVerb(name=name, parameters=params, return_values=tokens.get('return_values'), with_client_info=with_client_info, with_timeout=with_timeout, cancellable=cancellable, one_way=one_way, ip=ip) diff --git a/idl/gossip.idl.hh b/idl/gossip.idl.hh index 77b43792e8..714a11bbe2 100644 --- a/idl/gossip.idl.hh +++ b/idl/gossip.idl.hh @@ -9,7 +9,7 @@ #include "gms/gossip_digest_syn.hh" namespace gms { -verb [[with_client_info, with_timeout]] gossip_echo (int64_t generation_number [[version 4.6.0]], bool notify_up [[version 6.1.0]]) +verb [[with_client_info, with_timeout, cancellable]] gossip_echo (int64_t generation_number [[version 4.6.0]], bool notify_up [[version 6.1.0]]) verb [[with_client_info, one_way]] gossip_shutdown (gms::inet_address from, int64_t generation_number [[version 4.6.0]]) verb [[with_client_info, one_way, ip]] gossip_digest_syn (gms::gossip_digest_syn syn) verb [[with_client_info, one_way]] gossip_digest_ack (gms::gossip_digest_ack ask) diff --git a/message/rpc_protocol_impl.hh b/message/rpc_protocol_impl.hh index 4b0114a700..3627d93b29 100644 --- a/message/rpc_protocol_impl.hh +++ b/message/rpc_protocol_impl.hh @@ -250,6 +250,41 @@ auto send_message_cancellable(messaging_service* ms, messaging_verb verb, locato return send_message_cancellable(ms, verb, std::optional{id}, ms->addr_for_host_id(id), as, std::forward(msg)...); } +template +auto send_message_timeout_cancellable(messaging_service* ms, messaging_verb verb, locator::host_id host_id, Timeout timeout, abort_source& as, MsgOut&&... msg) { + auto rpc_handler = ms->rpc()->make_client(verb); + using futurator = futurize>; + if (ms->is_shutting_down()) { + return futurator::make_exception_future(rpc::closed_error()); + } + auto rpc_client_ptr = ms->get_rpc_client(verb, ms->addr_for_host_id(host_id), host_id); + auto& rpc_client = *rpc_client_ptr; + + auto c = std::make_unique(); + auto& c_ref = *c; + auto sub = as.subscribe([c = std::move(c)] () noexcept { + c->cancel(); + }); + if (!sub) { + return futurator::make_exception_future(abort_requested_exception{}); + } + + return rpc_handler(rpc_client, timeout, c_ref, std::forward(msg)...).handle_exception([ms = ms->shared_from_this(), host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr), sub = std::move(sub)] (std::exception_ptr&& eptr) { + ms->increment_dropped_messages(verb); + if (try_catch(eptr)) { + // This is a transport error + ms->remove_error_rpc_client(verb, host_id); + return futurator::make_exception_future(std::move(eptr)); + } else if (try_catch(eptr)) { + // Translate low-level canceled_error into high-level abort_requested_exception. + return futurator::make_exception_future(abort_requested_exception{}); + } else { + // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error. + return futurator::make_exception_future(std::move(eptr)); + } + }); +} + // Send one way message for verb template auto send_message_oneway(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) { diff --git a/test/manual/message.cc b/test/manual/message.cc index fd7e3d8ad9..50f463e8a3 100644 --- a/test/manual/message.cc +++ b/test/manual/message.cc @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include "db/config.hh" @@ -157,15 +158,12 @@ public: future<> test_echo() { test_logger.info("=== {} ===", __func__); int64_t gen = 0x1; - return ser::gossip_rpc_verbs::send_gossip_echo(&ms, _server_id, netw::messaging_service::clock_type::now() + std::chrono::seconds(10), gen, false).then_wrapped([] (auto&& f) { - try { - f.get(); - return make_ready_future<>(); - } catch (std::runtime_error& e) { - test_logger.error("test_echo: {}", e.what()); - } - return make_ready_future<>(); - }); + abort_source as; + try { + co_await ser::gossip_rpc_verbs::send_gossip_echo(&ms, _server_id, netw::messaging_service::clock_type::now() + std::chrono::seconds(10), as, gen, false); + } catch (...) { + test_logger.error("test_echo: {}", std::current_exception()); + } } };