From 83c69642f7508e52472b2e709f5cd5438b21452e Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 18 Jan 2023 18:02:09 +0200 Subject: [PATCH 1/5] gossiper: failure_detector_loop_for_node: check if abort_requested in loop condition The same as the loop condition in the direct_failure_detector. Signed-off-by: Benny Halevy --- gms/gossiper.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index f408d02ace..2b5215b018 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -960,7 +960,7 @@ future<> gossiper::failure_detector_loop_for_node(locator::host_id host_id, gene auto echo_interval = std::chrono::seconds(2); auto max_duration = echo_interval + std::chrono::milliseconds(_gcfg.failure_detector_timeout_ms()); auto node = _address_map.get(host_id); - while (is_enabled()) { + while (is_enabled() && !_abort_source.abort_requested()) { bool failed = false; try { logger.debug("failure_detector_loop: Send echo to node {}/{}, status = started", host_id, node); From e06d226d08ca4f25966b3e3e57254550d7f687d4 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 18 Jan 2023 18:02:09 +0200 Subject: [PATCH 2/5] gossiper: failure_detector_loop_for_node: ignore abort_requested_exception Aborting the failure detector happens normally when the node shuts down. There's no need to log anything about it, as long as we abort the function cleanly. 
Signed-off-by: Benny Halevy --- gms/gossiper.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 2b5215b018..8d28bf16c4 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -992,7 +992,7 @@ future<> gossiper::failure_detector_loop_for_node(locator::host_id host_id, gene host_id, node, _live_endpoints, _live_endpoints_version, live_endpoints_version); co_return; } else { - co_await sleep_abortable(echo_interval, _abort_source); + co_await sleep_abortable(echo_interval, _abort_source).handle_exception_type([] (const abort_requested_exception&) {}); } } co_return; From 0b97806771832618f008ae2d5efd7256ddd1146f Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 10 Apr 2025 17:37:02 +0300 Subject: [PATCH 3/5] idl, message: make with_timeout and cancellable verb attributes composable And define `send_message_timeout_cancellable` in rpc_protocol_impl.hh using the newly introduced rpc_handler entry point in seastar that accepts both timeout and cancellable params. Note that the interface to the user still uses abort_source while internally the function allocates a seastar::rpc::cancellable object. It is possible to provide an interface that will accept a rpc::cancellable& from the caller, but the existing messaging api uses abort_source. Changing it may be considered in the future. Signed-off-by: Benny Halevy --- idl-compiler.py | 4 ---- message/rpc_protocol_impl.hh | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/idl-compiler.py b/idl-compiler.py index e5b9f43bb2..412e89efc4 100755 --- a/idl-compiler.py +++ b/idl-compiler.py @@ -479,11 +479,9 @@ class RpcVerb(ASTBase): - [[with_timeout]] - an additional time_point parameter is supplied to the handler function and send* method uses send_message_*_timeout variant of internal function to actually send the message. - Incompatible with [[cancellable]]. 
- [[cancellable]] - an additional abort_source& parameter is supplied to the handler function and send* method uses send_message_*_cancellable variant of internal function to actually send the message. - Incompatible with [[with_timeout]]. - [[one_way]] - the handler function is annotated by future return type to designate that a client doesn't need to wait for an answer. @@ -697,8 +695,6 @@ def rpc_verb_parse_action(tokens): one_way = not raw_attrs.empty() and 'one_way' in raw_attrs.attr_items if one_way and 'return_values' in tokens: raise Exception(f"Invalid return type specification for one-way RPC verb '{name}'") - if with_timeout and cancellable: - raise Exception(f"Error in verb {name}: [[with_timeout]] cannot be used together with [[cancellable]] in the same verb") return RpcVerb(name=name, parameters=params, return_values=tokens.get('return_values'), with_client_info=with_client_info, with_timeout=with_timeout, cancellable=cancellable, one_way=one_way, ip=ip) diff --git a/message/rpc_protocol_impl.hh b/message/rpc_protocol_impl.hh index 4b0114a700..3627d93b29 100644 --- a/message/rpc_protocol_impl.hh +++ b/message/rpc_protocol_impl.hh @@ -250,6 +250,41 @@ auto send_message_cancellable(messaging_service* ms, messaging_verb verb, locato return send_message_cancellable(ms, verb, std::optional{id}, ms->addr_for_host_id(id), as, std::forward(msg)...); } +template +auto send_message_timeout_cancellable(messaging_service* ms, messaging_verb verb, locator::host_id host_id, Timeout timeout, abort_source& as, MsgOut&&... 
msg) { + auto rpc_handler = ms->rpc()->make_client(verb); + using futurator = futurize>; + if (ms->is_shutting_down()) { + return futurator::make_exception_future(rpc::closed_error()); + } + auto rpc_client_ptr = ms->get_rpc_client(verb, ms->addr_for_host_id(host_id), host_id); + auto& rpc_client = *rpc_client_ptr; + + auto c = std::make_unique(); + auto& c_ref = *c; + auto sub = as.subscribe([c = std::move(c)] () noexcept { + c->cancel(); + }); + if (!sub) { + return futurator::make_exception_future(abort_requested_exception{}); + } + + return rpc_handler(rpc_client, timeout, c_ref, std::forward(msg)...).handle_exception([ms = ms->shared_from_this(), host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr), sub = std::move(sub)] (std::exception_ptr&& eptr) { + ms->increment_dropped_messages(verb); + if (try_catch(eptr)) { + // This is a transport error + ms->remove_error_rpc_client(verb, host_id); + return futurator::make_exception_future(std::move(eptr)); + } else if (try_catch(eptr)) { + // Translate low-level canceled_error into high-level abort_requested_exception. + return futurator::make_exception_future(abort_requested_exception{}); + } else { + // This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error. + return futurator::make_exception_future(std::move(eptr)); + } + }); +} + // Send one way message for verb template auto send_message_oneway(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) { From fa1c3e86a9bf90899d38160465bd13248b7897d1 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 13 Dec 2022 13:03:47 +0200 Subject: [PATCH 4/5] gossiper: add send_echo helper Call send_gossip_echo using a centralized helper. A following patch will make it abortable. 
Signed-off-by: Benny Halevy --- gms/gossiper.cc | 10 +++++++--- gms/gossiper.hh | 1 + 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 8d28bf16c4..ae1b43d9de 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -954,6 +954,10 @@ future> gossiper::get_unreachable_members_synchronized() }); } +future<> gossiper::send_echo(locator::host_id host_id, std::chrono::milliseconds timeout_ms, int64_t generation_number, bool notify_up) { + return ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, host_id, netw::messaging_service::clock_type::now() + timeout_ms, generation_number, notify_up); +} + future<> gossiper::failure_detector_loop_for_node(locator::host_id host_id, generation_type gossip_generation, uint64_t live_endpoints_version) { auto last = gossiper::clk::now(); auto diff = gossiper::clk::duration(0); @@ -964,7 +968,7 @@ future<> gossiper::failure_detector_loop_for_node(locator::host_id host_id, gene bool failed = false; try { logger.debug("failure_detector_loop: Send echo to node {}/{}, status = started", host_id, node); - co_await ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, host_id, netw::messaging_service::clock_type::now() + max_duration, gossip_generation.value(), false); + co_await send_echo(host_id, max_duration, gossip_generation.value(), false); logger.debug("failure_detector_loop: Send echo to node {}/{}, status = ok", host_id, node); } catch (...) { failed = true; @@ -1688,7 +1692,7 @@ future<> gossiper::notify_nodes_on_up(std::unordered_set dsts) if (dst != _gcfg.host_id) { try { auto generation = my_endpoint_state().get_heart_beat_state().get_generation(); - co_await ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, dst, netw::messaging_service::clock_type::now() + std::chrono::seconds(10), generation.value(), true); + co_await send_echo(dst, std::chrono::seconds(10), generation.value(), true); } catch (...) 
{ logger.warn("Failed to notify node {} that I am UP: {}", dst, std::current_exception()); } @@ -1724,7 +1728,7 @@ void gossiper::mark_alive(endpoint_state_ptr node) { // Enter the _background_msg gate so stop() would wait on it auto gh = _background_msg.hold(); logger.debug("Sending a EchoMessage to {}/{}, with generation_number={}", id, addr, generation); - (void) ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, id, netw::messaging_service::clock_type::now() + std::chrono::seconds(15), generation.value(), false).then([this, id] { + (void) send_echo(id, std::chrono::seconds(15), generation.value(), false).then([this, id] { logger.trace("Got EchoMessage Reply"); return real_mark_alive(id); }).handle_exception([addr, gh = std::move(gh), unmark_pending = std::move(unmark_pending), id] (auto ep) { diff --git a/gms/gossiper.hh b/gms/gossiper.hh index bd3f579a94..bfce37a625 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -109,6 +109,7 @@ private: future<> handle_shutdown_msg(locator::host_id from, std::optional generation_number_opt); future<> do_send_ack_msg(locator::host_id from, gossip_digest_syn syn_msg); future<> do_send_ack2_msg(locator::host_id from, utils::chunked_vector ack_msg_digest); + future<> send_echo(locator::host_id host_id, std::chrono::milliseconds timeout_ms, int64_t generation_number, bool notify_up); future handle_get_endpoint_states_msg(gossip_get_endpoint_states_request request); static constexpr uint32_t _default_cpuid = 0; void do_sort(utils::chunked_vector& g_digest_list) const; From 4bd0845fce71fe6b6a4a014f76917eb2590c59a8 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 13 Dec 2022 13:03:47 +0200 Subject: [PATCH 5/5] gossiper: make send_gossip_echo cancellable Currently send_gossip_echo has a 22 seconds timeout during which _abort_source is ignored. Mark the verb as cancellable so it can be canceled on shutdown / abort. 
Signed-off-by: Benny Halevy --- gms/gossiper.cc | 2 +- idl/gossip.idl.hh | 2 +- test/manual/message.cc | 16 +++++++--------- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index ae1b43d9de..babc63d9c9 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -955,7 +955,7 @@ future> gossiper::get_unreachable_members_synchronized() } future<> gossiper::send_echo(locator::host_id host_id, std::chrono::milliseconds timeout_ms, int64_t generation_number, bool notify_up) { - return ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, host_id, netw::messaging_service::clock_type::now() + timeout_ms, generation_number, notify_up); + return ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, host_id, netw::messaging_service::clock_type::now() + timeout_ms, _abort_source, generation_number, notify_up); } future<> gossiper::failure_detector_loop_for_node(locator::host_id host_id, generation_type gossip_generation, uint64_t live_endpoints_version) { diff --git a/idl/gossip.idl.hh b/idl/gossip.idl.hh index 77b43792e8..714a11bbe2 100644 --- a/idl/gossip.idl.hh +++ b/idl/gossip.idl.hh @@ -9,7 +9,7 @@ #include "gms/gossip_digest_syn.hh" namespace gms { -verb [[with_client_info, with_timeout]] gossip_echo (int64_t generation_number [[version 4.6.0]], bool notify_up [[version 6.1.0]]) +verb [[with_client_info, with_timeout, cancellable]] gossip_echo (int64_t generation_number [[version 4.6.0]], bool notify_up [[version 6.1.0]]) verb [[with_client_info, one_way]] gossip_shutdown (gms::inet_address from, int64_t generation_number [[version 4.6.0]]) verb [[with_client_info, one_way, ip]] gossip_digest_syn (gms::gossip_digest_syn syn) verb [[with_client_info, one_way]] gossip_digest_ack (gms::gossip_digest_ack ask) diff --git a/test/manual/message.cc b/test/manual/message.cc index fd7e3d8ad9..50f463e8a3 100644 --- a/test/manual/message.cc +++ b/test/manual/message.cc @@ -13,6 +13,7 @@ #include #include #include +#include #include #include 
#include "db/config.hh" @@ -157,15 +158,12 @@ public: future<> test_echo() { test_logger.info("=== {} ===", __func__); int64_t gen = 0x1; - return ser::gossip_rpc_verbs::send_gossip_echo(&ms, _server_id, netw::messaging_service::clock_type::now() + std::chrono::seconds(10), gen, false).then_wrapped([] (auto&& f) { - try { - f.get(); - return make_ready_future<>(); - } catch (std::runtime_error& e) { - test_logger.error("test_echo: {}", e.what()); - } - return make_ready_future<>(); - }); + abort_source as; + try { + co_await ser::gossip_rpc_verbs::send_gossip_echo(&ms, _server_id, netw::messaging_service::clock_type::now() + std::chrono::seconds(10), as, gen, false); + } catch (...) { + test_logger.error("test_echo: {}", std::current_exception()); + } } };