Wait for a booting node to be marked UP before completing boot.

Currently a node does not wait to be marked UP by other nodes before
completing its boot, which creates a usability issue: during a rolling
restart it is not enough to wait for the local CQL port to be opened
before restarting the next node; it is also necessary to check that all
other nodes already see this node as alive. Otherwise, when the next
node is restarted, some nodes may see two nodes as dead instead of one.

This patch improves the situation by making sure that the boot process
does not complete until all other nodes see the booting node as alive.
This is still best effort: if some nodes are unreachable, or gossiper
propagation takes too long, the boot process continues anyway.

Fixes scylladb/scylladb#19206
This commit is contained in:
Gleb Natapov
2024-06-17 13:01:02 +03:00
parent 09556bff0e
commit 28c0a27467
5 changed files with 60 additions and 11 deletions

View File

@@ -42,6 +42,7 @@
#include <utility>
#include "gms/generation-number.hh"
#include "locator/token_metadata.hh"
#include "seastar/rpc/rpc_types.hh"
#include "utils/exceptions.hh"
#include "utils/error_injection.hh"
#include "utils/to_string.hh"
@@ -409,7 +410,7 @@ future<> gossiper::handle_ack2_msg(msg_addr from, gossip_digest_ack2 msg) {
co_await apply_state_locally(std::move(remote_ep_state_map));
}
// NOTE(review): this span is a unified-diff rendering with the +/- markers
// stripped. The next line is the pre-patch signature; the line after it is
// the post-patch replacement that adds an RPC timeout and a notify_up flag.
future<> gossiper::handle_echo_msg(gms::inet_address from, std::optional<int64_t> generation_number_opt) {
// Handle a GOSSIP_ECHO request from `from`. When notify_up is set, the reply
// is additionally delayed until this node sees `from` as alive (or the RPC
// timeout expires), so the sender can learn it has been marked UP here.
future<> gossiper::handle_echo_msg(gms::inet_address from, seastar::rpc::opt_time_point timeout, std::optional<int64_t> generation_number_opt, bool notify_up) {
bool respond = true;
// Do not respond while this node is not advertising itself via gossip.
if (!_advertise_myself) {
respond = false;
}
}
}
if (!respond) {
// NOTE(review): diff artifact — the first line below is the pre-patch form
// (return an exceptional future); the second is the post-patch form, which
// throws directly now that the function is a coroutine.
return make_exception_future(std::runtime_error("Not ready to respond gossip echo message"));
throw std::runtime_error("Not ready to respond gossip echo message");
}
if (notify_up) {
// notify_up requests require a timeout so the wait below is bounded.
if (!timeout) {
on_internal_error(logger, "UP notification should have a timeout");
}
// Liveness state is maintained on shard 0; poll there every 100ms until
// `from` is seen alive or the RPC timeout passes. Best effort: any
// failure (e.g. abort on shutdown) is logged and the echo still succeeds.
co_await container().invoke_on(0, [from, timeout] (gossiper& g) -> future<> {
try {
while (rpc::rpc_clock_type::now() < *timeout && !g.is_alive(from)) {
co_await sleep_abortable(std::chrono::milliseconds(100), g._abort_source);
}
} catch(...) {
logger.warn("handle_echo_msg: UP notification from {} failed with {}", from, std::current_exception());
}
});
}
// NOTE(review): shown as diff context; presumably replaced by a plain
// co_return in the final coroutine body — confirm against the full file.
return make_ready_future<>();
}
future<> gossiper::handle_shutdown_msg(inet_address from, std::optional<int64_t> generation_number_opt) {
@@ -513,9 +527,9 @@ void gossiper::init_messaging_service_handler() {
return gossiper.handle_ack2_msg(from, std::move(msg));
});
});
ser::gossip_rpc_verbs::register_gossip_echo(&_messaging, [this] (const rpc::client_info& cinfo, seastar::rpc::opt_time_point, rpc::optional<int64_t> generation_number_opt) {
ser::gossip_rpc_verbs::register_gossip_echo(&_messaging, [this] (const rpc::client_info& cinfo, seastar::rpc::opt_time_point timeout, rpc::optional<int64_t> generation_number_opt, rpc::optional<bool> notify_up_opt) {
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return handle_echo_msg(from, generation_number_opt);
return handle_echo_msg(from, timeout, generation_number_opt, notify_up_opt.value_or(false));
});
ser::gossip_rpc_verbs::register_gossip_shutdown(&_messaging, [this] (inet_address from, rpc::optional<int64_t> generation_number_opt) {
return background_msg("GOSSIP_SHUTDOWN", [from, generation_number_opt] (gms::gossiper& gossiper) {
@@ -951,7 +965,7 @@ future<> gossiper::failure_detector_loop_for_node(gms::inet_address node, genera
bool failed = false;
try {
logger.debug("failure_detector_loop: Send echo to node {}, status = started", node);
co_await ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, netw::msg_addr(node), netw::messaging_service::clock_type::now() + max_duration, gossip_generation.value());
co_await ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, netw::msg_addr(node), netw::messaging_service::clock_type::now() + max_duration, gossip_generation.value(), false);
logger.debug("failure_detector_loop: Send echo to node {}, status = ok", node);
} catch (...) {
failed = true;
@@ -1651,6 +1665,19 @@ void gossiper::update_timestamp_for_nodes(const std::map<inet_address, endpoint_
}
}
// Best-effort UP notification: send a gossip echo with notify_up=true to
// every node in `dsts` (except ourselves), in parallel. Each peer delays its
// reply until it sees this node as alive or its 10-second timeout expires
// (see handle_echo_msg), so completion here means the peers that answered
// have marked us UP. Failures are logged and ignored so boot can proceed
// even when some peers are unreachable.
future<> gossiper::notify_nodes_on_up(std::unordered_set<inet_address> dsts) {
co_await coroutine::parallel_for_each(dsts, [this] (inet_address dst) -> future<> {
// No point in notifying the local node.
if (dst != get_broadcast_address()) {
try {
// Send our current gossip generation so the peer can validate the echo.
auto generation = my_endpoint_state().get_heart_beat_state().get_generation();
// Last argument (notify_up=true) asks the peer to wait until it
// marks us alive before replying; 10s bounds that wait.
co_await ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, netw::msg_addr(dst), netw::messaging_service::clock_type::now() + std::chrono::seconds(10), generation.value(), true);
} catch (...) {
// Best effort only — log and continue with the remaining nodes.
logger.warn("Failed to notify node {} that I am UP: {}", dst, std::current_exception());
}
}
});
}
void gossiper::mark_alive(inet_address addr) {
// Enter the _background_msg gate so stop() would wait on it
auto inserted = _pending_mark_alive_endpoints.insert(addr).second;
@@ -1673,7 +1700,7 @@ void gossiper::mark_alive(inet_address addr) {
// Enter the _background_msg gate so stop() would wait on it
auto gh = _background_msg.hold();
logger.debug("Sending a EchoMessage to {}, with generation_number={}", id, generation);
(void) ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, id, netw::messaging_service::clock_type::now() + std::chrono::milliseconds(15000), generation.value()).then([this, addr] {
(void) ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, id, netw::messaging_service::clock_type::now() + std::chrono::milliseconds(15000), generation.value(), false).then([this, addr] {
logger.trace("Got EchoMessage Reply");
return real_mark_alive(addr);
}).handle_exception([addr, gh = std::move(gh), unmark_pending = std::move(unmark_pending)] (auto ep) {

View File

@@ -111,7 +111,7 @@ private:
future<> handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg);
future<> handle_ack_msg(msg_addr from, gossip_digest_ack ack_msg);
future<> handle_ack2_msg(msg_addr from, gossip_digest_ack2 msg);
future<> handle_echo_msg(inet_address from, std::optional<int64_t> generation_number_opt);
future<> handle_echo_msg(inet_address from, seastar::rpc::opt_time_point, std::optional<int64_t> generation_number_opt, bool notify_up);
future<> handle_shutdown_msg(inet_address from, std::optional<int64_t> generation_number_opt);
future<> do_send_ack_msg(msg_addr from, gossip_digest_syn syn_msg);
future<> do_send_ack2_msg(msg_addr from, utils::chunked_vector<gossip_digest> ack_msg_digest);
@@ -704,6 +704,8 @@ public:
void check_snitch_name_matches(sstring local_snitch_name) const;
int get_down_endpoint_count() const noexcept;
int get_up_endpoint_count() const noexcept;
// Send UP notification to all nodes in the set
future<> notify_nodes_on_up(std::unordered_set<inet_address>);
private:
future<> failure_detector_loop();
future<> failure_detector_loop_for_node(gms::inet_address node, generation_type gossip_generation, uint64_t live_endpoints_version);

View File

@@ -9,7 +9,7 @@
#include "gms/gossip_digest_syn.hh"
namespace gms {
verb [[with_client_info, with_timeout]] gossip_echo (int64_t generation_number [[version 4.6.0]])
verb [[with_client_info, with_timeout]] gossip_echo (int64_t generation_number [[version 4.6.0]], bool notify_up [[version 6.1.0]])
verb [[one_way]] gossip_shutdown (gms::inet_address from, int64_t generation_number [[version 4.6.0]])
verb [[with_client_info, one_way]] gossip_digest_syn (gms::gossip_digest_syn syn)
verb [[with_client_info, one_way]] gossip_digest_ack (gms::gossip_digest_ack ask)

View File

@@ -1824,6 +1824,17 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
// Initializes monitor only after updating local topology.
start_tablet_split_monitor();
std::unordered_set<inet_address> ips;
const auto& am = _group0->address_map();
for (auto id : _topology_state_machine._topology.normal_nodes | boost::adaptors::map_keys) {
auto ip = am.find(id);
if (ip) {
ips.insert(*ip);
}
}
co_await _gossiper.notify_nodes_on_up(std::move(ips));
co_return;
}
@@ -1996,6 +2007,15 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
}
// Other errors are handled internally by track_upgrade_progress_to_topology_coordinator
})(*this, sys_dist_ks, proxy);
std::unordered_set<inet_address> ips;
_gossiper.for_each_endpoint_state([this, &ips] (const inet_address& addr, const gms::endpoint_state&) {
if (_gossiper.is_normal(addr)) {
ips.insert(addr);
}
});
co_await _gossiper.notify_nodes_on_up(std::move(ips));
}
future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<service::storage_proxy>& proxy) {

View File

@@ -109,7 +109,7 @@ public:
return make_ready_future<rpc::no_wait_type>(netw::messaging_service::no_wait());
});
ser::gossip_rpc_verbs::register_gossip_echo(&ms, [] (const rpc::client_info& cinfo, rpc::opt_time_point, rpc::optional<int64_t> gen_opt) {
ser::gossip_rpc_verbs::register_gossip_echo(&ms, [] (const rpc::client_info& cinfo, rpc::opt_time_point, rpc::optional<int64_t> gen_opt, rpc::optional<bool> notify_up) {
test_logger.info("Server got gossip echo msg");
throw std::runtime_error("I'm throwing runtime_error exception");
return make_ready_future<>();
@@ -155,7 +155,7 @@ public:
test_logger.info("=== {} ===", __func__);
auto id = get_msg_addr();
int64_t gen = 0x1;
return ser::gossip_rpc_verbs::send_gossip_echo(&ms, id, netw::messaging_service::clock_type::now() + std::chrono::seconds(10), gen).then_wrapped([] (auto&& f) {
return ser::gossip_rpc_verbs::send_gossip_echo(&ms, id, netw::messaging_service::clock_type::now() + std::chrono::seconds(10), gen, false).then_wrapped([] (auto&& f) {
try {
f.get();
return make_ready_future<>();