diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index 0cb11f22d8..8b4e59ccbb 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -5,6 +5,8 @@ /* * SPDX-License-Identifier: AGPL-3.0-or-later */ +#include + #include "service/raft/raft_group0.hh" #include "service/raft/raft_rpc.hh" #include "service/raft/raft_sys_table_storage.hh" @@ -432,13 +434,45 @@ future<> raft_group0::join_group0(std::vector seeds, bool as_ group0_log.info("server {} joined group 0 with group id {}", my_id, group0_id); } +struct group0_members { + const raft::server& _group0_server; + const raft_address_map<>& _address_map; + + + std::vector get_inet_addrs(std::experimental::source_location l = + std::experimental::source_location::current()) const { + const raft::config_member_set& members = _group0_server.get_configuration().current; + std::vector ret; + std::vector missing; + ret.reserve(members.size()); + for (const auto& srv: members) { + auto addr = _address_map.find(srv.addr.id); + if (!addr.has_value()) { + missing.push_back(srv.addr.id); + } else { + ret.push_back(addr.value()); + } + } + if (!missing.empty()) { + upgrade_log.info("{}: failed to resolve IP addresses of some of the cluster members ({})", + l.function_name(), missing); + ret.clear(); + } + return ret; + } + + bool is_joint() const { + return _group0_server.get_configuration().is_joint(); + } +}; + static future wait_for_peers_to_enter_synchronize_state( - const raft::server& group0_server, netw::messaging_service&, abort_source&, gate::holder pause_shutdown); + const group0_members& members0, netw::messaging_service&, abort_source&, gate::holder pause_shutdown); static future anyone_finished_upgrade( - const raft::server& group0_server, netw::messaging_service&, abort_source&); + const group0_members& members0, netw::messaging_service&, abort_source&); static future synchronize_schema( replica::database&, netw::messaging_service&, - const raft::server& group0_server, service::migration_manager&, + const group0_members& members0, service::migration_manager&, const noncopyable_function()>& can_finish_early, abort_source&); @@ -496,6 +530,7 @@ future<> raft_group0::setup_group0(db::system_keyspace& sys_ks, const std::unord group0_log.info("setup_group0: ensuring that the cluster has fully upgraded to use Raft..."); auto& group0_server = _raft_gr.group0(); + group0_members members0{group0_server, _raft_gr.address_map()}; // Perform a Raft read barrier so we know the set of group 0 members and our group 0 state is up-to-date. co_await group0_server.read_barrier(&_abort_source); @@ -511,12 +546,12 @@ future<> raft_group0::setup_group0(db::system_keyspace& sys_ks, const std::unord // // In a fully upgraded cluster this should finish immediately (if the network works well) - everyone is in `use_post_raft_procedures`. // In a cluster that is currently in the middle of `upgrade_to_group0`, this will cause us to wait until the procedure finishes. - group0_log.info("setup_group0: ensuring that the cluster has fully upgraded to use Raft..."); - if (co_await wait_for_peers_to_enter_synchronize_state(group0_server, _ms, _abort_source, _shutdown_gate.hold())) { + group0_log.info("setup_group0: waiting for peers to synchronize state..."); + if (co_await wait_for_peers_to_enter_synchronize_state(members0, _ms, _abort_source, _shutdown_gate.hold())) { // Everyone entered `synchronize` state. That means we're bootstrapping in the middle of `upgrade_to_group0`. // We need to finish upgrade as others do. - auto can_finish_early = std::bind_front(anyone_finished_upgrade, std::cref(group0_server), std::ref(_ms), std::ref(_abort_source)); - co_await synchronize_schema(_qp.db().real_database(), _ms, group0_server, _mm, can_finish_early, _abort_source); + auto can_finish_early = std::bind_front(anyone_finished_upgrade, std::cref(members0), std::ref(_ms), std::ref(_abort_source)); + co_await synchronize_schema(_qp.db().real_database(), _ms, members0, _mm, can_finish_early, _abort_source); } } @@ -865,13 +900,6 @@ persistent_discovery::persistent_discovery(discovery_peer my_addr, const peer_li } } -static std::vector get_raft_members_inet_addrs(const raft::config_member_set& members) { - std::vector ret; - for (auto& srv: members) { - ret.push_back(raft_addr_to_inet_addr(srv.addr)); - } - return ret; -} // Given a function `fun` that takes an `abort_source&` as parameter, // call `fun` with an internally constructed abort source which is aborted after the given time duration. @@ -949,17 +977,31 @@ with_timeout(abort_source& as, db::timeout_clock::duration d, F&& fun) { return impl(as, d, std::forward(fun)); } +// A helper class to sleep in a loop with an exponentially +// increasing retry period. +struct sleep_with_exponential_backoff { + std::chrono::seconds _retry_period{1}; + static constexpr std::chrono::seconds _max_retry_period{16}; + future<> operator()(abort_source& as) { + co_await sleep_abortable(_retry_period, as); + _retry_period = std::max(_retry_period * 2, _max_retry_period); + } +}; + + // Precondition: we joined group 0 and the server is running. // Assumes we don't leave group 0 while running. -static future<> wait_until_every_peer_joined_group0(db::system_keyspace& sys_ks, const raft::server& group0_server, abort_source& as) { - static constexpr auto retry_period = std::chrono::seconds{1}; +static future<> wait_until_every_peer_joined_group0(db::system_keyspace& sys_ks, const group0_members& members0, abort_source& as) { - while (true) { + for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { // We fetch both config and peers on each iteration; we don't assume that they don't change. // No new node should join while the procedure is running, but nodes may leave. - auto group0_config = group0_server.get_configuration(); - auto current_config = get_raft_members_inet_addrs(group0_config.current); + auto current_config = members0.get_inet_addrs(); + if (current_config.empty()) { + // Not all addresses are known + continue; + } std::sort(current_config.begin(), current_config.end()); auto peers = co_await sys_ks.load_peers(); @@ -969,32 +1011,29 @@ static future<> wait_until_every_peer_joined_group0(db::system_keyspace& sys_ks, std::set_difference(peers.begin(), peers.end(), current_config.begin(), current_config.end(), std::back_inserter(missing_peers)); if (missing_peers.empty()) { - if (!group0_config.is_joint()) { + if (!members0.is_joint()) { co_return; } - upgrade_log.info("group 0 configuration is joint: {}. Sleeping for a while before retrying...", group0_config); - co_await sleep_abortable(retry_period, as); + upgrade_log.info("group 0 configuration is joint: {}. Sleeping for a while before retrying...", current_config); continue; } upgrade_log.info( "group 0 configuration does not contain all peers yet." - " Missing peers: {}. Current group 0 config: {}. Current group 0 config addresses: {}. Sleeping for a while before retrying...", - missing_peers, group0_config, current_config); - - co_await sleep_abortable(retry_period, as); + " Missing peers: {}. Current group 0 config: {}. Sleeping for a while before retrying...", + missing_peers, current_config); } } // Check if anyone entered `use_post_raft_procedures`. // This is a best-effort single round-trip check; we don't retry if some nodes fail to answer. static future anyone_finished_upgrade( - const raft::server& group0_server, netw::messaging_service& ms, abort_source& as) { + const group0_members& members0, netw::messaging_service& ms, abort_source& as) { static constexpr auto max_concurrency = 10; static constexpr auto rpc_timeout = std::chrono::seconds{5}; - auto current_config = get_raft_members_inet_addrs(group0_server.get_configuration().current); + auto current_config = members0.get_inet_addrs(); bool finished = false; co_await max_concurrent_for_each(current_config, max_concurrency, [&] (const gms::inet_address& node) -> future<> { try { @@ -1015,22 +1054,23 @@ static future anyone_finished_upgrade( // Check if it's possible to reach everyone through `get_group0_upgrade_state` RPC. static future<> check_remote_group0_upgrade_state_dry_run( - const noncopyable_function>()>& get_peers, + const noncopyable_function>()>& get_inet_addrs, netw::messaging_service& ms, abort_source& as) { - static constexpr auto max_retry_period = std::chrono::seconds{16}; static constexpr auto rpc_timeout = std::chrono::seconds{5}; static constexpr auto max_concurrency = 10; - auto retry_period = std::chrono::seconds{1}; - while (true) { + for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { // Note: we strive to get a response from everyone in a 'single round-trip', // so we don't skip nodes which responded in earlier iterations. // We contact everyone in each iteration even if some of these guys already answered. // We fetch peers again on every attempt to handle the possibility of leaving nodes. - auto peers = co_await get_peers(); + auto cluster_config = co_await get_inet_addrs(); + if (cluster_config.empty()) { + continue; + } bool retry = false; - co_await max_concurrent_for_each(peers, max_concurrency, [&] (const gms::inet_address& node) -> future<> { + co_await max_concurrent_for_each(cluster_config, max_concurrency, [&] (const gms::inet_address& node) -> future<> { try { upgrade_log.info("check_remote_group0_upgrade_state_dry_run: `send_get_group0_upgrade_state({})`", node); co_await with_timeout(as, rpc_timeout, std::bind_front(send_get_group0_upgrade_state, std::ref(ms), node)); @@ -1051,11 +1091,6 @@ static future<> check_remote_group0_upgrade_state_dry_run( } upgrade_log.warn("check_remote_group0_upgrade_state_dry_run: retrying in a while..."); - - co_await sleep_abortable(retry_period, as); - if (retry_period < max_retry_period) { - retry_period *= 2; - } } } @@ -1067,8 +1102,7 @@ static future<> check_remote_group0_upgrade_state_dry_run( // Returns `true` if we finished because everybody entered `synchronize`. // Returns `false` if we finished because somebody entered `use_post_raft_procedures`. static future wait_for_peers_to_enter_synchronize_state( - const raft::server& group0_server, netw::messaging_service& ms, abort_source& as, gate::holder pause_shutdown) { - static constexpr auto retry_period = std::chrono::seconds{1}; + const group0_members& members0, netw::messaging_service& ms, abort_source& as, gate::holder pause_shutdown) { static constexpr auto rpc_timeout = std::chrono::seconds{5}; static constexpr auto max_concurrency = 10; @@ -1079,9 +1113,12 @@ static future wait_for_peers_to_enter_synchronize_state( // For production code this line is unnecessary. entered_synchronize->insert(utils::fb_utilities::get_broadcast_address()); - while (true) { + for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { // We fetch the config again on every attempt to handle the possibility of removing failed nodes. - auto current_config = get_raft_members_inet_addrs(group0_server.get_configuration().current); + auto current_config = members0.get_inet_addrs(); + if (current_config.empty()) { + continue; + } ::tracker tracker; auto retry = make_lw_shared(false); @@ -1147,27 +1184,26 @@ static future wait_for_peers_to_enter_synchronize_state( } upgrade_log.warn("wait_for_peers_to_enter_synchronize_state: retrying in a while..."); - - co_await sleep_abortable(retry_period, as); } } // Returning nullopt means we finished early (`can_finish_early` returned true). static future>> collect_schema_versions_from_group0_members( - netw::messaging_service& ms, const raft::server& group0_server, + netw::messaging_service& ms, const group0_members& members0, const noncopyable_function()>& can_finish_early, abort_source& as) { - static constexpr auto max_retry_period = std::chrono::seconds{16}; static constexpr auto rpc_timeout = std::chrono::seconds{5}; static constexpr auto max_concurrency = 10; - auto retry_period = std::chrono::seconds{1}; std::unordered_map versions; - while (true) { + for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { + // We fetch the config on each iteration; some nodes may leave. - auto group0_config = group0_server.get_configuration(); - auto current_config = get_raft_members_inet_addrs(group0_config.current); + auto current_config = members0.get_inet_addrs(); + if (current_config.empty()) { + continue; + } bool failed = false; co_await max_concurrent_for_each(current_config, max_concurrency, [&] (const gms::inet_address& node) -> future<> { @@ -1194,8 +1230,8 @@ collect_schema_versions_from_group0_members( if (failed) { upgrade_log.warn("synchronize_schema: there were some failures when collecting remote schema versions."); - } else if (group0_config.is_joint()) { - upgrade_log.warn("synchronize_schema: group 0 configuration is joint: {}.", group0_config); + } else if (members0.is_joint()) { + upgrade_log.warn("synchronize_schema: group 0 configuration is joint: {}.", current_config); } else { co_return versions; } @@ -1209,10 +1245,6 @@ collect_schema_versions_from_group0_members( upgrade_log.info( "synchronize_schema: could not finish early." " Sleeping for a while before retrying remote schema version collection..."); - co_await sleep_abortable(retry_period, as); - if (retry_period < max_retry_period) { - retry_period *= 2; - } } } @@ -1231,20 +1263,19 @@ collect_schema_versions_from_group0_members( // should cause everybody to arrive at the same result. static future synchronize_schema( replica::database& db, netw::messaging_service& ms, - const raft::server& group0_server, service::migration_manager& mm, + const group0_members& members0, + service::migration_manager& mm, const noncopyable_function()>& can_finish_early, abort_source& as) { - static constexpr auto max_retry_period = std::chrono::seconds{32}; static constexpr auto rpc_timeout = std::chrono::seconds{5}; static constexpr auto max_concurrency = 10; - auto retry_period = std::chrono::seconds{1}; bool last_pull_successful = false; size_t num_attempts_after_successful_pull = 0; - while (true) { + for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { upgrade_log.info("synchronize_schema: collecting schema versions from group 0 members..."); - auto remote_versions = co_await collect_schema_versions_from_group0_members(ms, group0_server, can_finish_early, as); + auto remote_versions = co_await collect_schema_versions_from_group0_members(ms, members0, can_finish_early, as); if (!remote_versions) { upgrade_log.info("synchronize_schema: finished early."); co_return false; @@ -1314,11 +1345,6 @@ static future synchronize_schema( } upgrade_log.info("synchronize_schema: sleeping for a while before collecting schema versions again..."); - co_await sleep_abortable(retry_period, as); - - if (retry_period < max_retry_period) { - retry_period *= 2; - } } } @@ -1409,7 +1435,12 @@ future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state) { // step, on the other hand, allows nodes to leave and will unblock as soon as all remaining peers are // ready to answer. upgrade_log.info("Waiting until everyone is ready to start upgrade..."); - co_await check_remote_group0_upgrade_state_dry_run(std::bind_front(&db::system_keyspace::load_peers, &_sys_ks), _ms, _abort_source); + auto get_inet_addrs = [this]() -> future> { + auto current_config = co_await _sys_ks.load_peers(); + current_config.push_back(_gossiper.get_broadcast_address()); + co_return current_config; + }; + co_await check_remote_group0_upgrade_state_dry_run(get_inet_addrs, _ms, _abort_source); if (!joined_group0()) { upgrade_log.info("Joining group 0..."); @@ -1419,6 +1450,7 @@ future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state) { "We're already a member of group 0." " Apparently we're restarting after a previous upgrade attempt failed."); } + group0_members members0{_raft_gr.group0(), _raft_gr.address_map()}; // After we joined, we shouldn't be removed from group 0 until the end of the procedure. // The implementation of `leave_group0` waits until upgrade finishes before leaving the group. @@ -1428,10 +1460,8 @@ future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state) { // and we won't do anything harmful to other nodes while in `synchronize`, worst case being // that we get stuck. - auto& group0_server = _raft_gr.group0(); - upgrade_log.info("Waiting until every peer has joined Raft group 0..."); - co_await wait_until_every_peer_joined_group0(_sys_ks, group0_server, _abort_source); + co_await wait_until_every_peer_joined_group0(_sys_ks, members0, _abort_source); upgrade_log.info("Every peer is a member of Raft group 0."); if (start_state == group0_upgrade_state::use_pre_raft_procedures) { @@ -1445,7 +1475,7 @@ future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state) { // to do any additional schema pulls (only verify quickly that the schema is still in sync). upgrade_log.info("Waiting for schema to synchronize across all nodes in group 0..."); auto can_finish_early = [] { return make_ready_future(false); }; - co_await synchronize_schema(_qp.db().real_database(), _ms, group0_server, _mm, can_finish_early, _abort_source); + co_await synchronize_schema(_qp.db().real_database(), _ms, members0, _mm, can_finish_early, _abort_source); // Before entering `synchronize`, perform a round-trip of `get_group0_upgrade_state` RPC calls // to everyone as a dry run, just to check that nodes respond to this RPC. @@ -1455,9 +1485,8 @@ future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state) { // a very high chance that the following steps succeed as well (we would need to be very unlucky otherwise). upgrade_log.info("Performing a dry run of remote `get_group0_upgrade_state` calls..."); co_await check_remote_group0_upgrade_state_dry_run( - [&group0_server] { - auto current_config = get_raft_members_inet_addrs(group0_server.get_configuration().current); - return make_ready_future>(std::move(current_config)); + [members0] { + return make_ready_future>(members0.get_inet_addrs()); }, _ms, _abort_source); utils::get_local_injector().inject("group0_upgrade_before_synchronize", @@ -1470,14 +1499,14 @@ future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state) { } upgrade_log.info("Waiting for all peers to enter synchronize state..."); - if (!(co_await wait_for_peers_to_enter_synchronize_state(group0_server, _ms, _abort_source, _shutdown_gate.hold()))) { + if (!(co_await wait_for_peers_to_enter_synchronize_state(members0, _ms, _abort_source, _shutdown_gate.hold()))) { upgrade_log.info("Another node already finished upgrade. We can finish early."); co_return; } upgrade_log.info("All peers in synchronize state. Waiting for schema to synchronize..."); - auto can_finish_early = std::bind_front(anyone_finished_upgrade, std::cref(group0_server), std::ref(_ms), std::ref(_abort_source)); - if (!(co_await synchronize_schema(_qp.db().real_database(), _ms, group0_server, _mm, can_finish_early, _abort_source))) { + auto can_finish_early = std::bind_front(anyone_finished_upgrade, std::cref(members0), std::ref(_ms), std::ref(_abort_source)); + if (!(co_await synchronize_schema(_qp.db().real_database(), _ms, members0, _mm, can_finish_early, _abort_source))) { upgrade_log.info("Another node already finished upgrade. We can finish early."); co_return; }