diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 4938b2fa81..e7df039f06 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1885,7 +1885,8 @@ future<> gossiper::do_shadow_round(std::unordered_set nodes) gms::application_state::DC, gms::application_state::RACK, gms::application_state::SUPPORTED_FEATURES, - gms::application_state::SNITCH_NAME}}; + gms::application_state::SNITCH_NAME, + gms::application_state::RAFT_SERVER_ID}}; logger.info("Gossip shadow round started with nodes={}", nodes); std::unordered_set nodes_talked; size_t nodes_down = 0; diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index f4036d2f69..9e43729471 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -488,7 +488,9 @@ static future synchronize_schema( const noncopyable_function()>& can_finish_early, abort_source&); -future<> raft_group0::setup_group0(db::system_keyspace& sys_ks, const std::unordered_set& initial_contact_nodes) { +future<> raft_group0::setup_group0( + db::system_keyspace& sys_ks, const std::unordered_set& initial_contact_nodes, + std::optional replace_info) { assert(this_shard_id() == 0); if (!_raft_gr.is_enabled()) { @@ -524,6 +526,13 @@ future<> raft_group0::setup_group0(db::system_keyspace& sys_ks, const std::unord co_return; } + if (replace_info && !replace_info->raft_id) { + auto msg = format("Cannot perform replace operation: Raft ID of the replaced server (IP: {}) is missing.", + replace_info->ip_addr); + group0_log.error("{}", msg); + throw std::runtime_error{std::move(msg)}; + } + std::vector seeds; for (auto& addr: initial_contact_nodes) { if (addr != _gossiper.get_broadcast_address()) { @@ -535,6 +544,25 @@ future<> raft_group0::setup_group0(db::system_keyspace& sys_ks, const std::unord co_await join_group0(std::move(seeds), false /* non-voter */); group0_log.info("setup_group0: successfully joined group 0."); + if (replace_info) { + // Insert the replaced node's (Raft ID, IP address) pair into `raft_address_map`. + // In general, the mapping won't be obtained through the regular gossiping route: + // if we (the replacing node) use the replaced node's IP address, the replaced node's + // application states are gone by this point (our application states overrode them + // - the application states map is using the IP address as the key). + // Even when we use a different IP, there's no guarantee the IPs were exchanged by now. + // Instead, we obtain `replace_info` during the shadow round (which guarantees to contact + // another node and fetch application states from it) and pass it to `setup_group0`. + + // Checked earlier. + assert(replace_info->raft_id); + group0_log.info("Replacing a node with Raft ID: {}, IP address: {}", + *replace_info->raft_id, replace_info->ip_addr); + + // `opt_add_entry` is shard-local, but that's fine - we only need this info on shard 0. + _raft_gr.address_map().opt_add_entry(*replace_info->raft_id, replace_info->ip_addr); + } + // Enter `synchronize` upgrade state in case the cluster we're joining has recently enabled Raft // and is currently in the middle of `upgrade_to_group0()`. For that procedure to finish // every member of group 0 (now including us) needs to enter `synchronize` state. @@ -808,23 +836,15 @@ future<> raft_group0::remove_from_group0(gms::inet_address node) { // (if they are still a member of group 0); hence we provide `my_id` to skip us in the search. auto their_id = _raft_gr.address_map().find_replace_id(node, my_id); if (!their_id) { - // The address map is updated with the ID of every member of the configuration. - // We could not find them in the address map. This could mean two things: - // 1. they are not a member. - // 2. They are a member, but we don't know about it yet; e.g. we just upgraded - // and joined group 0 but the leader is still pushing entires to us (including config entries) - // and we don't yet have the entry which contains `their_id`. + // This may mean that the node is a member of group 0 but due to a bug we're missing + // an entry from the address map so we're not able to identify the member that must be removed. + // In this case group 0 will have a 'garbage' member corresponding to a node that has left, + // and that member may be a voter, which is bad because group 0's availability is reduced. // - // To handle the second case we perform a read barrier now and check the address again. - // Ignore the returned guard, we don't need it. - group0_log.info("remove_from_group0({}): did not find them in group 0 configuration, synchronizing Raft before retrying...", node); - co_await _raft_gr.group0().read_barrier(&_abort_source); - - their_id = _raft_gr.address_map().find_replace_id(node, my_id); - if (!their_id) { - group0_log.info("remove_from_group0({}): did not find them in group 0 configuration. Skipping.", node); - co_return; - } + // TODO: there is no API to remove members from group 0 manually. #12153 + group0_log.error("remove_from_group0({}): could not find the Raft ID of this node." + " Manual removal from group 0 may be required.", node); + co_return; } group0_log.info( @@ -832,6 +852,10 @@ future<> raft_group0::remove_from_group0(gms::inet_address node) { node, *their_id); co_await remove_from_raft_config(*their_id); + + group0_log.info( + "remove_from_group0({}): finished removing from group 0 configuration (Raft ID: {})", + node, *their_id); } future<> raft_group0::remove_from_raft_config(raft::server_id id) { diff --git a/service/raft/raft_group0.hh b/service/raft/raft_group0.hh index 3aeb773e2c..a7ee9ee019 100644 --- a/service/raft/raft_group0.hh +++ b/service/raft/raft_group0.hh @@ -95,6 +95,14 @@ class raft_group0 { } _status_for_monitoring; public: + // Passed to `setup_group0` when replacing a node. + struct replace_info { + gms::inet_address ip_addr; + // Optional because it might be missing when Raft is disabled or in RECOVERY mode. + // `setup_group0` will verify that it's present when it's required. + std::optional raft_id; + }; + // Assumes that the provided services are fully started. raft_group0(seastar::abort_source& abort_source, service::raft_group_registry& raft_gr, @@ -114,18 +122,21 @@ public: // Call before destroying the object. future<> abort(); - // Call during the startup procedure. + // Call during the startup procedure, after gossiping has started. + // + // If we're performing the replace operation, pass the IP and Raft ID of the replaced node + // obtained using the shadow round through the `replace_info` parameter. // // If the local RAFT feature is enabled, does one of the following: // - join group 0 (if we're bootstrapping), // - start existing group 0 server (if we bootstrapped before), - // - (TODO: not implemented yet) prepare us for the upgrade procedure, which will create group 0 later. + // - prepare us for the upgrade procedure, which will create group 0 later (if we're upgrading). // // Cannot be called twice. // // Also make sure to call `finish_setup_after_join` after the node has joined the cluster and entered NORMAL state. - // TODO: specify dependencies on other services: where during startup should we setup group 0? - future<> setup_group0(db::system_keyspace&, const std::unordered_set& initial_contact_nodes); + future<> setup_group0(db::system_keyspace&, const std::unordered_set& initial_contact_nodes, + std::optional); // Call at the end of the startup procedure, after the node entered NORMAL state. // `setup_group0()` must have finished earlier. @@ -157,7 +168,7 @@ public: // In both cases, `setup_group0()` must have finished earlier. // // The provided address may be our own - if we're replacing a node that had the same address as ours. - // We'll look for the other node's Raft ID in the group 0 configuration. + // We'll look for the other node's Raft ID in the Raft address map. future<> remove_from_group0(gms::inet_address host); // Assumes that this node's Raft server ID is already initialized and returns it. diff --git a/service/storage_service.cc b/service/storage_service.cc index 2843f60bd5..d2749baa73 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -313,6 +313,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi bool replacing_a_node_with_same_ip = false; bool replacing_a_node_with_diff_ip = false; + std::optional raft_replace_info; auto tmlock = std::make_unique(co_await get_token_metadata_lock()); auto tmptr = co_await get_mutable_token_metadata_ptr(); if (is_replacing()) { @@ -330,6 +331,10 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi get_broadcast_address(), *replace_address); tmptr->update_topology(*replace_address, std::move(ri.dc_rack)); co_await tmptr->update_normal_tokens(bootstrap_tokens, *replace_address); + raft_replace_info = raft_group0::replace_info { + .ip_addr = *replace_address, + .raft_id = std::move(ri.raft_id) + }; } else if (should_bootstrap()) { co_await check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features); } else { @@ -441,7 +446,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi co_await _gossiper.start_gossiping(generation_number, app_states, advertise); assert(_group0); - co_await _group0->setup_group0(_sys_ks.local(), initial_contact_nodes); + co_await _group0->setup_group0(_sys_ks.local(), initial_contact_nodes, raft_replace_info); auto schema_change_announce = _db.local().observable_schema_version().observe([this] (table_schema_version schema_version) mutable { _migration_manager.local().passive_announce(std::move(schema_version)); @@ -497,7 +502,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi // Sleep additionally to make sure that the server actually is not alive // and giving it more time to gossip if alive. slogger.info("Sleeping before replacing {}...", *replace_addr); - co_await sleep_abortable(service::load_broadcaster::BROADCAST_INTERVAL, _abort_source); + co_await sleep_abortable(2 * get_ring_delay(), _abort_source); // check for operator errors... const auto tmptr = get_token_metadata_ptr(); @@ -735,7 +740,6 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st slogger.info("Replace: removing {} from group 0...", *replace_addr); assert(_group0); _group0->remove_from_group0(*replace_addr).get(); - slogger.info("Replace: {} removed from group 0.", *replace_addr); slogger.info("Starting to bootstrap..."); run_replace_ops(bootstrap_tokens); @@ -1674,6 +1678,19 @@ future<> storage_service::remove_endpoint(inet_address endpoint) { } } +// If the Raft ID is present in gossiper and non-empty, return it. +// Otherwise return nullopt. +static std::optional get_raft_id_for(gms::gossiper& g, gms::inet_address ep) { + auto app_state = g.get_application_state_ptr(ep, gms::application_state::RAFT_SERVER_ID); + if (app_state) { + auto value = utils::UUID{app_state->value}; + if (value) { + return raft::server_id{value}; + } + } + return std::nullopt; +} + future storage_service::prepare_replacement_info(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features) { if (!get_replace_address()) { @@ -1713,6 +1730,7 @@ storage_service::prepare_replacement_info(std::unordered_set } auto dc_rack = get_dc_rack_for(replace_address); + auto raft_id = get_raft_id_for(_gossiper, replace_address); // use the replacee's host Id as our own so we receive hints, etc auto host_id = _gossiper.get_host_id(replace_address); @@ -1723,6 +1741,7 @@ storage_service::prepare_replacement_info(std::unordered_set co_return replacement_info { .tokens = std::move(tokens), .dc_rack = std::move(dc_rack), + .raft_id = std::move(raft_id), }; } @@ -2489,7 +2508,6 @@ future<> storage_service::removenode(locator::host_id host_id, std::listremove_from_group0(endpoint).get(); - slogger.info("removenode[{}]: node {} removed from group 0", uuid, endpoint); slogger.info("removenode[{}]: Finished removenode operation, removing node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes); } catch (...) { diff --git a/service/storage_service.hh b/service/storage_service.hh index bdcdc02da7..a385ee051f 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -41,6 +41,7 @@ #include #include "locator/snitch_base.hh" #include "cdc/generation_id.hh" +#include "raft/raft.hh" class node_ops_cmd_request; class node_ops_cmd_response; @@ -289,10 +290,11 @@ private: future<> do_stop_ms(); future<> shutdown_protocol_servers(); - // Tokens and the CDC streams timestamp of the replaced node. struct replacement_info { std::unordered_set tokens; locator::endpoint_dc_rack dc_rack; + // Present only if Raft is enabled. + std::optional raft_id; }; future prepare_replacement_info(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features); diff --git a/test/topology/test_topology.py b/test/topology/test_topology.py index 495de57a71..351a46e8d1 100644 --- a/test/topology/test_topology.py +++ b/test/topology/test_topology.py @@ -60,6 +60,7 @@ async def test_remove_node_add_column(manager, random_tables): await manager.remove_node(servers[0].server_id, servers[1].server_id) # Remove [1] await table.add_column() await random_tables.verify_schema() + # TODO: check that group 0 no longer contains the removed node (#12153) @pytest.mark.asyncio @@ -92,24 +93,23 @@ async def test_decommission_node_add_column(manager, random_tables): await manager.decommission_node(decommission_target.server_id) await table.add_column() await random_tables.verify_schema() + # TODO: check that group 0 no longer contains the decommissioned node (#12153) @pytest.mark.asyncio -@pytest.mark.skip(reason="Replace operation sleeps for 60 seconds") async def test_replace_different_ip(manager: ManagerClient, random_tables) -> None: servers = await manager.running_servers() await manager.server_stop(servers[0].server_id) replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False) await manager.server_add(replace_cfg) - # TODO: check that group 0 no longer contains the replaced node + # TODO: check that group 0 no longer contains the replaced node (#12153) @pytest.mark.asyncio -@pytest.mark.skip(reason="As above + the new node cannot join group 0") -async def replace_reuse_ip(manager: ManagerClient, random_tables) -> None: +async def test_replace_reuse_ip(manager: ManagerClient, random_tables) -> None: servers = await manager.running_servers() await manager.server_stop(servers[0].server_id) replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True) await manager.server_add(replace_cfg) - # TODO: check that group 0 no longer contains the replaced node + # TODO: check that group 0 no longer contains the replaced node (#12153) @pytest.mark.asyncio @@ -162,6 +162,7 @@ async def test_remove_node_with_concurrent_ddl(manager, random_tables): await manager.wait_for_host_down(initiator_ip, target_ip) logger.info(f'do_remove_node [{i}], invoking remove_node') await manager.remove_node(initiator_ip, target_ip, target_host_id) + # TODO: check that group 0 no longer contains the removed node (#12153) logger.info(f'do_remove_node [{i}], remove_node done') new_server_ip = await manager.server_add() logger.info(f'do_remove_node [{i}], server_add [{new_server_ip}] done')