From 56be654edc63bfc29a148d71c22327ba89e7e435 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 20 Mar 2023 15:41:56 +0200 Subject: [PATCH 1/2] storage_service: get_ignore_dead_nodes_for_replace: make static and rename to parse_node_list Let the caller pass the string to parse to the function rather than the function itself get to it via _db.local().get_config() so it could be used as a general purpose function. Make it static now that it doesn't require an instance. Rename to `parse_node_list` as that's what the function does. It doesn't care if the nodes are to be ignored or something else (e.g. removed), they only need to be in token_metadata. Signed-off-by: Benny Halevy --- service/storage_service.cc | 10 +++++----- service/storage_service.hh | 3 ++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index cd58e7a633..ba39407e38 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1332,10 +1332,10 @@ future<> storage_service::mark_existing_views_as_built(sharded storage_service::get_ignore_dead_nodes_for_replace(const token_metadata& tm) { +std::unordered_set storage_service::parse_node_list(sstring comma_separated_list, const token_metadata& tm) { std::vector ignore_nodes_strs; std::unordered_set ignore_nodes; - boost::split(ignore_nodes_strs, _db.local().get_config().ignore_dead_nodes_for_replace(), boost::is_any_of(",")); + boost::split(ignore_nodes_strs, comma_separated_list, boost::is_any_of(",")); for (std::string n : ignore_nodes_strs) { try { std::replace(n.begin(), n.end(), '\"', ' '); @@ -1346,7 +1346,7 @@ std::unordered_set storage_service::get_ignore_dead_nodes_for ignore_nodes.insert(ep_and_id.endpoint); } } catch (...) { - throw std::runtime_error(format("Failed to parse --ignore-dead-nodes-for-replace parameter: ignore_nodes={}, node={}: {}", ignore_nodes_strs, n, std::current_exception())); + throw std::runtime_error(format("Failed to parse node list: {}: invalid node={}: {}", ignore_nodes_strs, n, std::current_exception())); } } return ignore_nodes; @@ -1406,7 +1406,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st { // Wait for normal state handler to finish for existing nodes in the cluster. - auto ignore_nodes = replacement_info ? get_ignore_dead_nodes_for_replace(get_token_metadata()) + auto ignore_nodes = replacement_info ? parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace(), get_token_metadata()) // TODO: specify ignore_nodes for bootstrap : std::unordered_set{}; auto sync_nodes = get_nodes_to_sync_with(ignore_nodes).get(); @@ -3122,7 +3122,7 @@ void storage_service::run_replace_ops(std::unordered_set& bootstrap_token const auto& uuid = ctl.uuid(); gms::inet_address replace_address = replace_info.address; auto tmptr = get_token_metadata_ptr(); - ctl.ignore_nodes = get_ignore_dead_nodes_for_replace(*tmptr); + ctl.ignore_nodes = parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace(), *tmptr); // Step 1: Decide who needs to sync data for replace operation ctl.sync_nodes = get_nodes_to_sync_with(ctl.ignore_nodes).get(); ctl.sync_nodes.erase(replace_address); diff --git a/service/storage_service.hh b/service/storage_service.hh index 1b14463451..002eba1f6c 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -294,13 +294,14 @@ private: void run_replace_ops(std::unordered_set& bootstrap_tokens, replacement_info replace_info); void run_bootstrap_ops(std::unordered_set& bootstrap_tokens); - std::unordered_set get_ignore_dead_nodes_for_replace(const locator::token_metadata& tm); future> get_nodes_to_sync_with( const std::unordered_set& ignore_dead_nodes); future<> wait_for_ring_to_settle(std::chrono::milliseconds delay); public: + static std::unordered_set parse_node_list(sstring comma_separated_list, const locator::token_metadata& tm); + future<> check_for_endpoint_collision(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features); From ca61d88764f234851c95a26123dbe72abd28ed4b Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 20 Mar 2023 15:33:39 +0200 Subject: [PATCH 2/2] storage_service: node ops: standardize sync_nodes selection Use token_metadata get_endpoint_to_host_id_map_for_reading to get all normal token owners for all node operations, rather than using gossip for some operation and token_metadata for others. Fixes #12862 Signed-off-by: Benny Halevy --- service/storage_service.cc | 93 +++++++++++++++++++++----------------- 1 file changed, 52 insertions(+), 41 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index ba39407e38..5c2c256f8d 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2672,6 +2672,7 @@ public: sstring desc; locator::host_id host_id; // Host ID of the node operand (i.e. added, replaced, or leaving node) inet_address endpoint; // IP address of the node operand (i.e. added, replaced, or leaving node) + locator::token_metadata_ptr tmptr; std::unordered_set sync_nodes; std::unordered_set ignore_nodes; node_ops_cmd_request req; @@ -2683,6 +2684,7 @@ public: : ss(ss_) , host_id(id) , endpoint(ep) + , tmptr(ss.get_token_metadata_ptr()) , req(cmd, uuid) , heartbeat_interval(ss._db.local().get_config().nodeops_heartbeat_interval_seconds()) {} @@ -2697,8 +2699,25 @@ public: return req.ops_uuid; } - void start(sstring desc_) { + // may be called multiple times + void start(sstring desc_, std::function sync_to_node = [] (gms::inet_address) { return true; }) { desc = std::move(desc_); + + slogger.info("{}[{}]: Started {} operation: node={}/{}", desc, uuid(), desc, host_id, endpoint); + + refresh_sync_nodes(std::move(sync_to_node)); + } + + void refresh_sync_nodes(std::function sync_to_node = [] (gms::inet_address) { return true; }) { + // sync data with all normal token owners + sync_nodes.clear(); + for (const auto& [node, host_id] : tmptr->get_endpoint_to_host_id_map_for_reading()) { + seastar::thread::maybe_yield(); + if (!ignore_nodes.contains(node) && sync_to_node(node)) { + sync_nodes.insert(node); + } + } + for (auto& node : sync_nodes) { if (!ss.gossiper().is_alive(node)) { nodes_down.emplace(node); @@ -2710,7 +2729,7 @@ public: throw std::runtime_error(msg); } - slogger.info("{}[{}]: Started {} operation: node={}/{}, sync_nodes={}, ignore_nodes={}", desc, uuid(), desc, host_id, endpoint, sync_nodes, ignore_nodes); + slogger.info("{}[{}]: sync_nodes={}, ignore_nodes={}", desc, uuid(), desc, host_id, endpoint, sync_nodes, ignore_nodes); } future<> stop() noexcept { @@ -2898,9 +2917,14 @@ future<> storage_service::decommission() { auto& db = ss._db.local(); node_ops_ctl ctl(ss, node_ops_cmd::decommission_prepare, db.get_config().host_id, ss.get_broadcast_address()); auto stop_ctl = deferred_stop(ctl); - auto tmptr = ss.get_token_metadata_ptr(); + + // Step 1: Decide who needs to sync data + // TODO: wire ignore_nodes provided by user + ctl.start("decommission"); + uuid = ctl.uuid(); auto endpoint = ctl.endpoint; + const auto& tmptr = ctl.tmptr; if (!tmptr->is_normal_token_owner(endpoint)) { throw std::runtime_error("local node is not a member of the token ring yet"); } @@ -2936,17 +2960,6 @@ future<> storage_service::decommission() { slogger.info("DECOMMISSIONING: starts"); ctl.req.leaving_nodes = std::list{endpoint}; - // TODO: wire ignore_nodes provided by user - - // Step 1: Decide who needs to sync data - for (const auto& [node, host_id] : tmptr->get_endpoint_to_host_id_map_for_reading()) { - seastar::thread::maybe_yield(); - if (!ctl.ignore_nodes.contains(node)) { - ctl.sync_nodes.insert(node); - } - } - - ctl.start("decommission"); assert(ss._group0); raft_available = ss._group0->wait_for_raft().get(); @@ -3058,12 +3071,13 @@ void storage_service::run_bootstrap_ops(std::unordered_set& bootstrap_tok node_ops_ctl ctl(*this, node_ops_cmd::bootstrap_prepare, db.get_config().host_id, get_broadcast_address()); auto stop_ctl = deferred_stop(ctl); const auto& uuid = ctl.uuid(); + + // Step 1: Decide who needs to sync data for bootstrap operation // TODO: Specify ignore_nodes + ctl.start("bootstrap"); auto start_time = std::chrono::steady_clock::now(); for (;;) { - // Step 1: Decide who needs to sync data for bootstrap operation - ctl.sync_nodes = get_nodes_to_sync_with(ctl.ignore_nodes).get(); ctl.sync_nodes.insert(get_broadcast_address()); // Step 2: Wait until no pending node operations @@ -3088,11 +3102,11 @@ void storage_service::run_bootstrap_ops(std::unordered_set& bootstrap_tok } slogger.warn("bootstrap[{}]: Found pending node ops = {}, sleep 5 seconds and check again", uuid, pending_ops); sleep_abortable(std::chrono::seconds(5), _abort_source).get(); + ctl.refresh_sync_nodes(); + // the bootstrapping node will be added back when we loop } } - ctl.start("bootstrap"); - auto tokens = std::list(bootstrap_tokens.begin(), bootstrap_tokens.end()); ctl.req.bootstrap_nodes = { {get_broadcast_address(), tokens}, @@ -3121,15 +3135,15 @@ void storage_service::run_replace_ops(std::unordered_set& bootstrap_token auto stop_ctl = deferred_stop(ctl); const auto& uuid = ctl.uuid(); gms::inet_address replace_address = replace_info.address; - auto tmptr = get_token_metadata_ptr(); - ctl.ignore_nodes = parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace(), *tmptr); + ctl.ignore_nodes = parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace(), *ctl.tmptr); // Step 1: Decide who needs to sync data for replace operation - ctl.sync_nodes = get_nodes_to_sync_with(ctl.ignore_nodes).get(); - ctl.sync_nodes.erase(replace_address); + // The replacing node is not a normal token owner yet + // Add it back explicitly after checking all other nodes. + ctl.start("replace", [&] (gms::inet_address node) { + return node != replace_address; + }); ctl.sync_nodes.insert(get_broadcast_address()); - ctl.start("replace"); - auto sync_nodes_generations = _gossiper.get_generation_for_nodes(ctl.sync_nodes).get(); // Map existing nodes to replacing nodes ctl.req.replace_nodes = { @@ -3242,8 +3256,10 @@ future<> storage_service::removenode(locator::host_id host_id, std::listget_endpoint_for_host_id(host_id); assert(ss._group0); auto raft_id = raft::server_id{host_id.uuid()}; @@ -3270,17 +3286,15 @@ future<> storage_service::removenode(locator::host_id host_id, std::listget_tokens(endpoint); - - for (auto& hoep : ignore_nodes_params) { - hoep.resolve(*tmptr); - ctl.ignore_nodes.insert(hoep.endpoint); - } + ctl.endpoint = endpoint; // Step 1: Make the node a group 0 non-voter before removing it from the token ring. // @@ -3301,14 +3315,11 @@ future<> storage_service::removenode(locator::host_id host_id, std::listget_endpoint_to_host_id_map_for_reading()) { - seastar::thread::maybe_yield(); - if (node != endpoint && !ctl.ignore_nodes.contains(node)) { - ctl.sync_nodes.insert(node); - } - } + ctl.start("removenode", [&] (gms::inet_address node) { + return node != endpoint; + }); - ctl.start("removenode"); + auto tokens = tmptr->get_tokens(endpoint); try { // Step 3: Start heartbeat updater