From b35af73fdfd1fce016f6228c9760f6a5ae83af62 Mon Sep 17 00:00:00 2001 From: Konstantin Osipov Date: Fri, 23 Sep 2022 22:59:49 +0300 Subject: [PATCH] raft: (upgrade) do not use IP addresses from Raft config Always use raft address map to obtain the IP addresses of upgrade peers. Right now the map is populated from Raft configuration, so it's an equivalent transformation, but in the future raft address map will be populated from other sources: discovery and gossip, hence the logic of upgrade will change as well. Do not proceed with the upgrade if an address is missing from the map, since it means we failed to contact a raft member. --- service/raft/raft_group0.cc | 185 +++++++++++++++++++++--------------- 1 file changed, 107 insertions(+), 78 deletions(-) diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index 0cb11f22d8..8b4e59ccbb 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -5,6 +5,8 @@ /* * SPDX-License-Identifier: AGPL-3.0-or-later */ +#include + #include "service/raft/raft_group0.hh" #include "service/raft/raft_rpc.hh" #include "service/raft/raft_sys_table_storage.hh" @@ -432,13 +434,45 @@ future<> raft_group0::join_group0(std::vector seeds, bool as_ group0_log.info("server {} joined group 0 with group id {}", my_id, group0_id); } +struct group0_members { + const raft::server& _group0_server; + const raft_address_map<>& _address_map; + + + std::vector get_inet_addrs(std::experimental::source_location l = + std::experimental::source_location::current()) const { + const raft::config_member_set& members = _group0_server.get_configuration().current; + std::vector ret; + std::vector missing; + ret.reserve(members.size()); + for (const auto& srv: members) { + auto addr = _address_map.find(srv.addr.id); + if (!addr.has_value()) { + missing.push_back(srv.addr.id); + } else { + ret.push_back(addr.value()); + } + } + if (!missing.empty()) { + upgrade_log.info("{}: failed to resolve IP addresses of some of the cluster members ({})", + l.function_name(), missing); + ret.clear(); + } + return ret; + } + + bool is_joint() const { + return _group0_server.get_configuration().is_joint(); + } +}; + static future wait_for_peers_to_enter_synchronize_state( - const raft::server& group0_server, netw::messaging_service&, abort_source&, gate::holder pause_shutdown); + const group0_members& members0, netw::messaging_service&, abort_source&, gate::holder pause_shutdown); static future anyone_finished_upgrade( - const raft::server& group0_server, netw::messaging_service&, abort_source&); + const group0_members& members0, netw::messaging_service&, abort_source&); static future synchronize_schema( replica::database&, netw::messaging_service&, - const raft::server& group0_server, service::migration_manager&, + const group0_members& members0, service::migration_manager&, const noncopyable_function()>& can_finish_early, abort_source&); @@ -496,6 +530,7 @@ future<> raft_group0::setup_group0(db::system_keyspace& sys_ks, const std::unord group0_log.info("setup_group0: ensuring that the cluster has fully upgraded to use Raft..."); auto& group0_server = _raft_gr.group0(); + group0_members members0{group0_server, _raft_gr.address_map()}; // Perform a Raft read barrier so we know the set of group 0 members and our group 0 state is up-to-date. co_await group0_server.read_barrier(&_abort_source); @@ -511,12 +546,12 @@ future<> raft_group0::setup_group0(db::system_keyspace& sys_ks, const std::unord // // In a fully upgraded cluster this should finish immediately (if the network works well) - everyone is in `use_post_raft_procedures`. // In a cluster that is currently in the middle of `upgrade_to_group0`, this will cause us to wait until the procedure finishes. - group0_log.info("setup_group0: ensuring that the cluster has fully upgraded to use Raft..."); - if (co_await wait_for_peers_to_enter_synchronize_state(group0_server, _ms, _abort_source, _shutdown_gate.hold())) { + group0_log.info("setup_group0: waiting for peers to synchronize state..."); + if (co_await wait_for_peers_to_enter_synchronize_state(members0, _ms, _abort_source, _shutdown_gate.hold())) { // Everyone entered `synchronize` state. That means we're bootstrapping in the middle of `upgrade_to_group0`. // We need to finish upgrade as others do. - auto can_finish_early = std::bind_front(anyone_finished_upgrade, std::cref(group0_server), std::ref(_ms), std::ref(_abort_source)); - co_await synchronize_schema(_qp.db().real_database(), _ms, group0_server, _mm, can_finish_early, _abort_source); + auto can_finish_early = std::bind_front(anyone_finished_upgrade, std::cref(members0), std::ref(_ms), std::ref(_abort_source)); + co_await synchronize_schema(_qp.db().real_database(), _ms, members0, _mm, can_finish_early, _abort_source); } } @@ -865,13 +900,6 @@ persistent_discovery::persistent_discovery(discovery_peer my_addr, const peer_li } } -static std::vector get_raft_members_inet_addrs(const raft::config_member_set& members) { - std::vector ret; - for (auto& srv: members) { - ret.push_back(raft_addr_to_inet_addr(srv.addr)); - } - return ret; -} // Given a function `fun` that takes an `abort_source&` as parameter, // call `fun` with an internally constructed abort source which is aborted after the given time duration. @@ -949,17 +977,31 @@ with_timeout(abort_source& as, db::timeout_clock::duration d, F&& fun) { return impl(as, d, std::forward(fun)); } +// A helper class to sleep in a loop with an exponentially +// increasing retry period. +struct sleep_with_exponential_backoff { + std::chrono::seconds _retry_period{1}; + static constexpr std::chrono::seconds _max_retry_period{16}; + future<> operator()(abort_source& as) { + co_await sleep_abortable(_retry_period, as); + _retry_period = std::max(_retry_period * 2, _max_retry_period); + } +}; + + // Precondition: we joined group 0 and the server is running. // Assumes we don't leave group 0 while running. -static future<> wait_until_every_peer_joined_group0(db::system_keyspace& sys_ks, const raft::server& group0_server, abort_source& as) { - static constexpr auto retry_period = std::chrono::seconds{1}; +static future<> wait_until_every_peer_joined_group0(db::system_keyspace& sys_ks, const group0_members& members0, abort_source& as) { - while (true) { + for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { // We fetch both config and peers on each iteration; we don't assume that they don't change. // No new node should join while the procedure is running, but nodes may leave. - auto group0_config = group0_server.get_configuration(); - auto current_config = get_raft_members_inet_addrs(group0_config.current); + auto current_config = members0.get_inet_addrs(); + if (current_config.empty()) { + // Not all addresses are known + continue; + } std::sort(current_config.begin(), current_config.end()); auto peers = co_await sys_ks.load_peers(); @@ -969,32 +1011,29 @@ static future<> wait_until_every_peer_joined_group0(db::system_keyspace& sys_ks, std::set_difference(peers.begin(), peers.end(), current_config.begin(), current_config.end(), std::back_inserter(missing_peers)); if (missing_peers.empty()) { - if (!group0_config.is_joint()) { + if (!members0.is_joint()) { co_return; } - upgrade_log.info("group 0 configuration is joint: {}. Sleeping for a while before retrying...", group0_config); - co_await sleep_abortable(retry_period, as); + upgrade_log.info("group 0 configuration is joint: {}. Sleeping for a while before retrying...", current_config); continue; } upgrade_log.info( "group 0 configuration does not contain all peers yet." - " Missing peers: {}. Current group 0 config: {}. Current group 0 config addresses: {}. Sleeping for a while before retrying...", - missing_peers, group0_config, current_config); - - co_await sleep_abortable(retry_period, as); + " Missing peers: {}. Current group 0 config: {}. Sleeping for a while before retrying...", + missing_peers, current_config); } } // Check if anyone entered `use_post_raft_procedures`. // This is a best-effort single round-trip check; we don't retry if some nodes fail to answer. static future anyone_finished_upgrade( - const raft::server& group0_server, netw::messaging_service& ms, abort_source& as) { + const group0_members& members0, netw::messaging_service& ms, abort_source& as) { static constexpr auto max_concurrency = 10; static constexpr auto rpc_timeout = std::chrono::seconds{5}; - auto current_config = get_raft_members_inet_addrs(group0_server.get_configuration().current); + auto current_config = members0.get_inet_addrs(); bool finished = false; co_await max_concurrent_for_each(current_config, max_concurrency, [&] (const gms::inet_address& node) -> future<> { try { @@ -1015,22 +1054,23 @@ static future anyone_finished_upgrade( // Check if it's possible to reach everyone through `get_group0_upgrade_state` RPC. static future<> check_remote_group0_upgrade_state_dry_run( - const noncopyable_function>()>& get_peers, + const noncopyable_function>()>& get_inet_addrs, netw::messaging_service& ms, abort_source& as) { - static constexpr auto max_retry_period = std::chrono::seconds{16}; static constexpr auto rpc_timeout = std::chrono::seconds{5}; static constexpr auto max_concurrency = 10; - auto retry_period = std::chrono::seconds{1}; - while (true) { + for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { // Note: we strive to get a response from everyone in a 'single round-trip', // so we don't skip nodes which responded in earlier iterations. // We contact everyone in each iteration even if some of these guys already answered. // We fetch peers again on every attempt to handle the possibility of leaving nodes. - auto peers = co_await get_peers(); + auto cluster_config = co_await get_inet_addrs(); + if (cluster_config.empty()) { + continue; + } bool retry = false; - co_await max_concurrent_for_each(peers, max_concurrency, [&] (const gms::inet_address& node) -> future<> { + co_await max_concurrent_for_each(cluster_config, max_concurrency, [&] (const gms::inet_address& node) -> future<> { try { upgrade_log.info("check_remote_group0_upgrade_state_dry_run: `send_get_group0_upgrade_state({})`", node); co_await with_timeout(as, rpc_timeout, std::bind_front(send_get_group0_upgrade_state, std::ref(ms), node)); @@ -1051,11 +1091,6 @@ static future<> check_remote_group0_upgrade_state_dry_run( } upgrade_log.warn("check_remote_group0_upgrade_state_dry_run: retrying in a while..."); - - co_await sleep_abortable(retry_period, as); - if (retry_period < max_retry_period) { - retry_period *= 2; - } } } @@ -1067,8 +1102,7 @@ static future<> check_remote_group0_upgrade_state_dry_run( // Returns `true` if we finished because everybody entered `synchronize`. // Returns `false` if we finished because somebody entered `use_post_raft_procedures`. static future wait_for_peers_to_enter_synchronize_state( - const raft::server& group0_server, netw::messaging_service& ms, abort_source& as, gate::holder pause_shutdown) { - static constexpr auto retry_period = std::chrono::seconds{1}; + const group0_members& members0, netw::messaging_service& ms, abort_source& as, gate::holder pause_shutdown) { static constexpr auto rpc_timeout = std::chrono::seconds{5}; static constexpr auto max_concurrency = 10; @@ -1079,9 +1113,12 @@ static future wait_for_peers_to_enter_synchronize_state( // For production code this line is unnecessary. entered_synchronize->insert(utils::fb_utilities::get_broadcast_address()); - while (true) { + for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { // We fetch the config again on every attempt to handle the possibility of removing failed nodes. - auto current_config = get_raft_members_inet_addrs(group0_server.get_configuration().current); + auto current_config = members0.get_inet_addrs(); + if (current_config.empty()) { + continue; + } ::tracker tracker; auto retry = make_lw_shared(false); @@ -1147,27 +1184,26 @@ static future wait_for_peers_to_enter_synchronize_state( } upgrade_log.warn("wait_for_peers_to_enter_synchronize_state: retrying in a while..."); - - co_await sleep_abortable(retry_period, as); } } // Returning nullopt means we finished early (`can_finish_early` returned true). static future>> collect_schema_versions_from_group0_members( - netw::messaging_service& ms, const raft::server& group0_server, + netw::messaging_service& ms, const group0_members& members0, const noncopyable_function()>& can_finish_early, abort_source& as) { - static constexpr auto max_retry_period = std::chrono::seconds{16}; static constexpr auto rpc_timeout = std::chrono::seconds{5}; static constexpr auto max_concurrency = 10; - auto retry_period = std::chrono::seconds{1}; std::unordered_map versions; - while (true) { + for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { + // We fetch the config on each iteration; some nodes may leave. - auto group0_config = group0_server.get_configuration(); - auto current_config = get_raft_members_inet_addrs(group0_config.current); + auto current_config = members0.get_inet_addrs(); + if (current_config.empty()) { + continue; + } bool failed = false; co_await max_concurrent_for_each(current_config, max_concurrency, [&] (const gms::inet_address& node) -> future<> { @@ -1194,8 +1230,8 @@ collect_schema_versions_from_group0_members( if (failed) { upgrade_log.warn("synchronize_schema: there were some failures when collecting remote schema versions."); - } else if (group0_config.is_joint()) { - upgrade_log.warn("synchronize_schema: group 0 configuration is joint: {}.", group0_config); + } else if (members0.is_joint()) { + upgrade_log.warn("synchronize_schema: group 0 configuration is joint: {}.", current_config); } else { co_return versions; } @@ -1209,10 +1245,6 @@ collect_schema_versions_from_group0_members( upgrade_log.info( "synchronize_schema: could not finish early." " Sleeping for a while before retrying remote schema version collection..."); - co_await sleep_abortable(retry_period, as); - if (retry_period < max_retry_period) { - retry_period *= 2; - } } } @@ -1231,20 +1263,19 @@ collect_schema_versions_from_group0_members( // should cause everybody to arrive at the same result. static future synchronize_schema( replica::database& db, netw::messaging_service& ms, - const raft::server& group0_server, service::migration_manager& mm, + const group0_members& members0, + service::migration_manager& mm, const noncopyable_function()>& can_finish_early, abort_source& as) { - static constexpr auto max_retry_period = std::chrono::seconds{32}; static constexpr auto rpc_timeout = std::chrono::seconds{5}; static constexpr auto max_concurrency = 10; - auto retry_period = std::chrono::seconds{1}; bool last_pull_successful = false; size_t num_attempts_after_successful_pull = 0; - while (true) { + for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { upgrade_log.info("synchronize_schema: collecting schema versions from group 0 members..."); - auto remote_versions = co_await collect_schema_versions_from_group0_members(ms, group0_server, can_finish_early, as); + auto remote_versions = co_await collect_schema_versions_from_group0_members(ms, members0, can_finish_early, as); if (!remote_versions) { upgrade_log.info("synchronize_schema: finished early."); co_return false; @@ -1314,11 +1345,6 @@ static future synchronize_schema( } upgrade_log.info("synchronize_schema: sleeping for a while before collecting schema versions again..."); - co_await sleep_abortable(retry_period, as); - - if (retry_period < max_retry_period) { - retry_period *= 2; - } } } @@ -1409,7 +1435,12 @@ future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state) { // step, on the other hand, allows nodes to leave and will unblock as soon as all remaining peers are // ready to answer. upgrade_log.info("Waiting until everyone is ready to start upgrade..."); - co_await check_remote_group0_upgrade_state_dry_run(std::bind_front(&db::system_keyspace::load_peers, &_sys_ks), _ms, _abort_source); + auto get_inet_addrs = [this]() -> future> { + auto current_config = co_await _sys_ks.load_peers(); + current_config.push_back(_gossiper.get_broadcast_address()); + co_return current_config; + }; + co_await check_remote_group0_upgrade_state_dry_run(get_inet_addrs, _ms, _abort_source); if (!joined_group0()) { upgrade_log.info("Joining group 0..."); @@ -1419,6 +1450,7 @@ future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state) { "We're already a member of group 0." " Apparently we're restarting after a previous upgrade attempt failed."); } + group0_members members0{_raft_gr.group0(), _raft_gr.address_map()}; // After we joined, we shouldn't be removed from group 0 until the end of the procedure. // The implementation of `leave_group0` waits until upgrade finishes before leaving the group. @@ -1428,10 +1460,8 @@ future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state) { // and we won't do anything harmful to other nodes while in `synchronize`, worst case being // that we get stuck. - auto& group0_server = _raft_gr.group0(); - upgrade_log.info("Waiting until every peer has joined Raft group 0..."); - co_await wait_until_every_peer_joined_group0(_sys_ks, group0_server, _abort_source); + co_await wait_until_every_peer_joined_group0(_sys_ks, members0, _abort_source); upgrade_log.info("Every peer is a member of Raft group 0."); if (start_state == group0_upgrade_state::use_pre_raft_procedures) { @@ -1445,7 +1475,7 @@ future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state) { // to do any additional schema pulls (only verify quickly that the schema is still in sync). upgrade_log.info("Waiting for schema to synchronize across all nodes in group 0..."); auto can_finish_early = [] { return make_ready_future(false); }; - co_await synchronize_schema(_qp.db().real_database(), _ms, group0_server, _mm, can_finish_early, _abort_source); + co_await synchronize_schema(_qp.db().real_database(), _ms, members0, _mm, can_finish_early, _abort_source); // Before entering `synchronize`, perform a round-trip of `get_group0_upgrade_state` RPC calls // to everyone as a dry run, just to check that nodes respond to this RPC. @@ -1455,9 +1485,8 @@ future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state) { // a very high chance that the following steps succeed as well (we would need to be very unlucky otherwise). upgrade_log.info("Performing a dry run of remote `get_group0_upgrade_state` calls..."); co_await check_remote_group0_upgrade_state_dry_run( - [&group0_server] { - auto current_config = get_raft_members_inet_addrs(group0_server.get_configuration().current); - return make_ready_future>(std::move(current_config)); + [members0] { + return make_ready_future>(members0.get_inet_addrs()); }, _ms, _abort_source); utils::get_local_injector().inject("group0_upgrade_before_synchronize", @@ -1470,14 +1499,14 @@ future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state) { } upgrade_log.info("Waiting for all peers to enter synchronize state..."); - if (!(co_await wait_for_peers_to_enter_synchronize_state(group0_server, _ms, _abort_source, _shutdown_gate.hold()))) { + if (!(co_await wait_for_peers_to_enter_synchronize_state(members0, _ms, _abort_source, _shutdown_gate.hold()))) { upgrade_log.info("Another node already finished upgrade. We can finish early."); co_return; } upgrade_log.info("All peers in synchronize state. Waiting for schema to synchronize..."); - auto can_finish_early = std::bind_front(anyone_finished_upgrade, std::cref(group0_server), std::ref(_ms), std::ref(_abort_source)); - if (!(co_await synchronize_schema(_qp.db().real_database(), _ms, group0_server, _mm, can_finish_early, _abort_source))) { + auto can_finish_early = std::bind_front(anyone_finished_upgrade, std::cref(members0), std::ref(_ms), std::ref(_abort_source)); + if (!(co_await synchronize_schema(_qp.db().real_database(), _ms, members0, _mm, can_finish_early, _abort_source))) { upgrade_log.info("Another node already finished upgrade. We can finish early."); co_return; }