From 7222c2f9a1dbcfcfd4ba0748320387d3c9216c5f Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Wed, 23 Nov 2022 10:08:47 +0100 Subject: [PATCH 1/5] service: storage_service: sleep 2*ring_delay instead of BROADCAST_INTERVAL before replace Most of the sleeps related to gossiping are based on `ring_delay`, which is configurable and can be set to lower value e.g. during tests. But for some reason there was one case where we slept for a hardcoded value, `service::load_broadcaster::BROADCAST_INTERVAL` - 60 seconds. Use `2 * get_ring_delay()` instead. With the default value of `ring_delay` (30 seconds) this will give the same behavior. --- service/storage_service.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 6e7228561f..410d12c0ab 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -497,7 +497,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi // Sleep additionally to make sure that the server actually is not alive // and giving it more time to gossip if alive. slogger.info("Sleeping before replacing {}...", *replace_addr); - co_await sleep_abortable(service::load_broadcaster::BROADCAST_INTERVAL, _abort_source); + co_await sleep_abortable(2 * get_ring_delay(), _abort_source); // check for operator errors... const auto tmptr = get_token_metadata_ptr(); From 45bb5bfb52d970e931226ada92c90eb04ca81d91 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Tue, 29 Nov 2022 15:57:22 +0100 Subject: [PATCH 2/5] gms/gossiper: fetch RAFT_SERVER_ID during shadow round During the replace operation we need the Raft ID of the replaced node. The shadow round is used for fetching all necessary information before the replace operation starts. --- gms/gossiper.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 4938b2fa81..e7df039f06 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1885,7 +1885,8 @@ future<> gossiper::do_shadow_round(std::unordered_set nodes) gms::application_state::DC, gms::application_state::RACK, gms::application_state::SUPPORTED_FEATURES, - gms::application_state::SNITCH_NAME}}; + gms::application_state::SNITCH_NAME, + gms::application_state::RAFT_SERVER_ID}}; logger.info("Gossip shadow round started with nodes={}", nodes); std::unordered_set nodes_talked; size_t nodes_down = 0; From 4429885543282c48fdbc4041b036bce67fee7f81 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Mon, 28 Nov 2022 16:58:38 +0100 Subject: [PATCH 3/5] service: handle replace correctly with Raft enabled We must place the Raft ID obtained during the shadow round in the address map. It won't be placed by the regular gossiping route if we're replacing using the same IP, because we override the application state of the replaced node. Even if we replace a node with a different IP, it is not guaranteed that background gossiping manages to update the address map before we need it, especially in tests where we set ring_delay to 0 and disable wait_for_gossip_to_settle. The shadow round, on the other hand, performs a synchronous request (and if it fails during bootstrap, bootstrap will fail - because we also won't be able to obtain the tokens and Host ID of the replaced node). Fetch the Raft ID of the replaced node in `prepare_replacement_info`, which runs the shadow round. Return it in `replacement_info`. Then `join_token_ring` passes it to `setup_group0`, which stores it in the address map. It does that after `join_group0` so the entry is non-expiring (the replaced node is a member of group 0). Later in the replace procedure, we call `remove_from_group0` for the replaced node. `remove_from_group0` will be able to reverse-translate the IP of the replaced node to its Raft ID using the address map. --- service/raft/raft_group0.cc | 30 +++++++++++++++++++++++++++++- service/raft/raft_group0.hh | 21 ++++++++++++++++----- service/storage_service.cc | 22 +++++++++++++++++++++- service/storage_service.hh | 4 +++- 4 files changed, 69 insertions(+), 8 deletions(-) diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index f4036d2f69..57241e1322 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -488,7 +488,9 @@ static future synchronize_schema( const noncopyable_function()>& can_finish_early, abort_source&); -future<> raft_group0::setup_group0(db::system_keyspace& sys_ks, const std::unordered_set& initial_contact_nodes) { +future<> raft_group0::setup_group0( + db::system_keyspace& sys_ks, const std::unordered_set& initial_contact_nodes, + std::optional replace_info) { assert(this_shard_id() == 0); if (!_raft_gr.is_enabled()) { @@ -524,6 +526,13 @@ future<> raft_group0::setup_group0(db::system_keyspace& sys_ks, const std::unord co_return; } + if (replace_info && !replace_info->raft_id) { + auto msg = format("Cannot perform replace operation: Raft ID of the replaced server (IP: {}) is missing.", + replace_info->ip_addr); + group0_log.error("{}", msg); + throw std::runtime_error{std::move(msg)}; + } + std::vector seeds; for (auto& addr: initial_contact_nodes) { if (addr != _gossiper.get_broadcast_address()) { @@ -535,6 +544,25 @@ future<> raft_group0::setup_group0(db::system_keyspace& sys_ks, const std::unord co_await join_group0(std::move(seeds), false /* non-voter */); group0_log.info("setup_group0: successfully joined group 0."); + if (replace_info) { + // Insert the replaced node's (Raft ID, IP address) pair into `raft_address_map`. + // In general, the mapping won't be obtained through the regular gossiping route: + // if we (the replacing node) use the replaced node's IP address, the replaced node's + // application states are gone by this point (our application states overrode them + // - the application states map is using the IP address as the key). + // Even when we use a different IP, there's no guarantee the IPs were exchanged by now. + // Instead, we obtain `replace_info` during the shadow round (which guarantees to contact + // another node and fetch application states from it) and pass it to `setup_group0`. + + // Checked earlier. + assert(replace_info->raft_id); + group0_log.info("Replacing a node with Raft ID: {}, IP address: {}", + *replace_info->raft_id, replace_info->ip_addr); + + // `opt_add_entry` is shard-local, but that's fine - we only need this info on shard 0. + _raft_gr.address_map().opt_add_entry(*replace_info->raft_id, replace_info->ip_addr); + } + // Enter `synchronize` upgrade state in case the cluster we're joining has recently enabled Raft // and is currently in the middle of `upgrade_to_group0()`. For that procedure to finish // every member of group 0 (now including us) needs to enter `synchronize` state. diff --git a/service/raft/raft_group0.hh b/service/raft/raft_group0.hh index 3aeb773e2c..a7ee9ee019 100644 --- a/service/raft/raft_group0.hh +++ b/service/raft/raft_group0.hh @@ -95,6 +95,14 @@ class raft_group0 { } _status_for_monitoring; public: + // Passed to `setup_group0` when replacing a node. + struct replace_info { + gms::inet_address ip_addr; + // Optional because it might be missing when Raft is disabled or in RECOVERY mode. + // `setup_group0` will verify that it's present when it's required. + std::optional raft_id; + }; + // Assumes that the provided services are fully started. raft_group0(seastar::abort_source& abort_source, service::raft_group_registry& raft_gr, @@ -114,18 +122,21 @@ public: // Call before destroying the object. future<> abort(); - // Call during the startup procedure. + // Call during the startup procedure, after gossiping has started. + // + // If we're performing the replace operation, pass the IP and Raft ID of the replaced node + // obtained using the shadow round through the `replace_info` parameter. // // If the local RAFT feature is enabled, does one of the following: // - join group 0 (if we're bootstrapping), // - start existing group 0 server (if we bootstrapped before), - // - (TODO: not implemented yet) prepare us for the upgrade procedure, which will create group 0 later. + // - prepare us for the upgrade procedure, which will create group 0 later (if we're upgrading). // // Cannot be called twice. // // Also make sure to call `finish_setup_after_join` after the node has joined the cluster and entered NORMAL state. - // TODO: specify dependencies on other services: where during startup should we setup group 0? - future<> setup_group0(db::system_keyspace&, const std::unordered_set& initial_contact_nodes); + future<> setup_group0(db::system_keyspace&, const std::unordered_set& initial_contact_nodes, + std::optional); // Call at the end of the startup procedure, after the node entered NORMAL state. // `setup_group0()` must have finished earlier. @@ -157,7 +168,7 @@ public: // In both cases, `setup_group0()` must have finished earlier. // // The provided address may be our own - if we're replacing a node that had the same address as ours. - // We'll look for the other node's Raft ID in the group 0 configuration. + // We'll look for the other node's Raft ID in the Raft address map. future<> remove_from_group0(gms::inet_address host); // Assumes that this node's Raft server ID is already initialized and returns it. diff --git a/service/storage_service.cc b/service/storage_service.cc index 410d12c0ab..0673c41754 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -313,6 +313,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi bool replacing_a_node_with_same_ip = false; bool replacing_a_node_with_diff_ip = false; + std::optional raft_replace_info; auto tmlock = std::make_unique(co_await get_token_metadata_lock()); auto tmptr = co_await get_mutable_token_metadata_ptr(); if (is_replacing()) { @@ -330,6 +331,10 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi get_broadcast_address(), *replace_address); tmptr->update_topology(*replace_address, std::move(ri.dc_rack)); co_await tmptr->update_normal_tokens(bootstrap_tokens, *replace_address); + raft_replace_info = raft_group0::replace_info { + .ip_addr = *replace_address, + .raft_id = std::move(ri.raft_id) + }; } else if (should_bootstrap()) { co_await check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features); } else { @@ -441,7 +446,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi co_await _gossiper.start_gossiping(generation_number, app_states, advertise); assert(_group0); - co_await _group0->setup_group0(_sys_ks.local(), initial_contact_nodes); + co_await _group0->setup_group0(_sys_ks.local(), initial_contact_nodes, raft_replace_info); auto schema_change_announce = _db.local().observable_schema_version().observe([this] (table_schema_version schema_version) mutable { _migration_manager.local().passive_announce(std::move(schema_version)); @@ -1650,6 +1655,19 @@ future<> storage_service::remove_endpoint(inet_address endpoint) { } } +// If the Raft ID is present in gossiper and non-empty, return it. +// Otherwise return nullopt. +static std::optional get_raft_id_for(gms::gossiper& g, gms::inet_address ep) { + auto app_state = g.get_application_state_ptr(ep, gms::application_state::RAFT_SERVER_ID); + if (app_state) { + auto value = utils::UUID{app_state->value}; + if (value) { + return raft::server_id{value}; + } + } + return std::nullopt; +} + future storage_service::prepare_replacement_info(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features) { if (!get_replace_address()) { @@ -1689,6 +1707,7 @@ storage_service::prepare_replacement_info(std::unordered_set } auto dc_rack = get_dc_rack_for(replace_address); + auto raft_id = get_raft_id_for(_gossiper, replace_address); // use the replacee's host Id as our own so we receive hints, etc auto host_id = _gossiper.get_host_id(replace_address); @@ -1699,6 +1718,7 @@ storage_service::prepare_replacement_info(std::unordered_set co_return replacement_info { .tokens = std::move(tokens), .dc_rack = std::move(dc_rack), + .raft_id = std::move(raft_id), }; } diff --git a/service/storage_service.hh b/service/storage_service.hh index bdcdc02da7..a385ee051f 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -41,6 +41,7 @@ #include #include "locator/snitch_base.hh" #include "cdc/generation_id.hh" +#include "raft/raft.hh" class node_ops_cmd_request; class node_ops_cmd_response; @@ -289,10 +290,11 @@ private: future<> do_stop_ms(); future<> shutdown_protocol_servers(); - // Tokens and the CDC streams timestamp of the replaced node. struct replacement_info { std::unordered_set tokens; locator::endpoint_dc_rack dc_rack; + // Present only if Raft is enabled. + std::optional raft_id; }; future prepare_replacement_info(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features); From ee19411783337cfaa73127a4d4cdc6f43c656c2a Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 25 Nov 2022 20:56:33 +0100 Subject: [PATCH 4/5] service/raft: report an error when Raft ID can't be found in `raft_group0::remove_from_group0` Also simplify the code and improve logging in general. The previous code did this: search for the ID in the address map. If it couldn't be found, perform a read barrier and search again. If it again couldn't be found, return. This algorithm depended on the fact that IP addresses were stored in group 0 configuration. The read barrier was used to obtain the most recent configuration, and if the IP was not a part of address map after the read barrier, that meant it's simply not a member of group 0. This logic no longer applies so we can simplify the code. Furthermore, when I was fixing the replace operation with Raft enabled, at some point I had a "working" solution with all tests passing. But I was suspicious and checked if the replaced node got removed from group 0. It wasn't. So the replace finished "successfully", but we had an additional (voting!) member of group 0 which didn't correspond to a token ring member. The last version of my fixes ensure that the node gets removed by the replacing node. But the system is fragile and nothing prevents us from breaking this again. At least log an error for now. Regression tests will be added later. --- service/raft/raft_group0.cc | 28 ++++++++++++---------------- service/storage_service.cc | 2 -- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index 57241e1322..9e43729471 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -836,23 +836,15 @@ future<> raft_group0::remove_from_group0(gms::inet_address node) { // (if they are still a member of group 0); hence we provide `my_id` to skip us in the search. auto their_id = _raft_gr.address_map().find_replace_id(node, my_id); if (!their_id) { - // The address map is updated with the ID of every member of the configuration. - // We could not find them in the address map. This could mean two things: - // 1. they are not a member. - // 2. They are a member, but we don't know about it yet; e.g. we just upgraded - // and joined group 0 but the leader is still pushing entires to us (including config entries) - // and we don't yet have the entry which contains `their_id`. + // This may mean that the node is a member of group 0 but due to a bug we're missing + // an entry from the address map so we're not able to identify the member that must be removed. + // In this case group 0 will have a 'garbage' member corresponding to a node that has left, + // and that member may be a voter, which is bad because group 0's availability is reduced. // - // To handle the second case we perform a read barrier now and check the address again. - // Ignore the returned guard, we don't need it. - group0_log.info("remove_from_group0({}): did not find them in group 0 configuration, synchronizing Raft before retrying...", node); - co_await _raft_gr.group0().read_barrier(&_abort_source); - - their_id = _raft_gr.address_map().find_replace_id(node, my_id); - if (!their_id) { - group0_log.info("remove_from_group0({}): did not find them in group 0 configuration. Skipping.", node); - co_return; - } + // TODO: there is no API to remove members from group 0 manually. #12153 + group0_log.error("remove_from_group0({}): could not find the Raft ID of this node." + " Manual removal from group 0 may be required.", node); + co_return; } group0_log.info( @@ -860,6 +852,10 @@ future<> raft_group0::remove_from_group0(gms::inet_address node) { node, *their_id); co_await remove_from_raft_config(*their_id); + + group0_log.info( + "remove_from_group0({}): finished removing from group 0 configuration (Raft ID: {})", + node, *their_id); } future<> raft_group0::remove_from_raft_config(raft::server_id id) { diff --git a/service/storage_service.cc b/service/storage_service.cc index 0673c41754..fdb4174ac6 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -740,7 +740,6 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st slogger.info("Replace: removing {} from group 0...", *replace_addr); assert(_group0); _group0->remove_from_group0(*replace_addr).get(); - slogger.info("Replace: {} removed from group 0.", *replace_addr); slogger.info("Starting to bootstrap..."); run_replace_ops(bootstrap_tokens); @@ -2485,7 +2484,6 @@ future<> storage_service::removenode(locator::host_id host_id, std::listremove_from_group0(endpoint).get(); - slogger.info("removenode[{}]: node {} removed from group 0", uuid, endpoint); slogger.info("removenode[{}]: Finished removenode operation, removing node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes); } catch (...) { From 3f8aaeeab996f2f6857aa9954b2836d6f6b087a8 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Wed, 30 Nov 2022 17:33:11 +0100 Subject: [PATCH 5/5] test/topology: enable replace tests Also add some TODOs for enhancing existing tests. --- test/topology/test_topology.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/test/topology/test_topology.py b/test/topology/test_topology.py index 495de57a71..351a46e8d1 100644 --- a/test/topology/test_topology.py +++ b/test/topology/test_topology.py @@ -60,6 +60,7 @@ async def test_remove_node_add_column(manager, random_tables): await manager.remove_node(servers[0].server_id, servers[1].server_id) # Remove [1] await table.add_column() await random_tables.verify_schema() + # TODO: check that group 0 no longer contains the removed node (#12153) @pytest.mark.asyncio @@ -92,24 +93,23 @@ async def test_decommission_node_add_column(manager, random_tables): await manager.decommission_node(decommission_target.server_id) await table.add_column() await random_tables.verify_schema() + # TODO: check that group 0 no longer contains the decommissioned node (#12153) @pytest.mark.asyncio -@pytest.mark.skip(reason="Replace operation sleeps for 60 seconds") async def test_replace_different_ip(manager: ManagerClient, random_tables) -> None: servers = await manager.running_servers() await manager.server_stop(servers[0].server_id) replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False) await manager.server_add(replace_cfg) - # TODO: check that group 0 no longer contains the replaced node + # TODO: check that group 0 no longer contains the replaced node (#12153) @pytest.mark.asyncio -@pytest.mark.skip(reason="As above + the new node cannot join group 0") -async def replace_reuse_ip(manager: ManagerClient, random_tables) -> None: +async def test_replace_reuse_ip(manager: ManagerClient, random_tables) -> None: servers = await manager.running_servers() await manager.server_stop(servers[0].server_id) replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True) await manager.server_add(replace_cfg) - # TODO: check that group 0 no longer contains the replaced node + # TODO: check that group 0 no longer contains the replaced node (#12153) @pytest.mark.asyncio @@ -162,6 +162,7 @@ async def test_remove_node_with_concurrent_ddl(manager, random_tables): await manager.wait_for_host_down(initiator_ip, target_ip) logger.info(f'do_remove_node [{i}], invoking remove_node') await manager.remove_node(initiator_ip, target_ip, target_host_id) + # TODO: check that group 0 no longer contains the removed node (#12153) logger.info(f'do_remove_node [{i}], remove_node done') new_server_ip = await manager.server_add() logger.info(f'do_remove_node [{i}], server_add [{new_server_ip}] done')