From b6315d3af7fcdec2022f087b4e4bee1ac2ce4034 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 23 Jul 2021 15:36:40 +0300 Subject: [PATCH] storage_service: Fix indentation after previous patch And, while at it, s/ss/this/g and drop the ss variable. Signed-off-by: Pavel Emelyanov --- service/storage_service.cc | 420 ++++++++++++++++++------------------- 1 file changed, 209 insertions(+), 211 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 509fb3e4ba..1a343b7b62 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2435,223 +2435,221 @@ void storage_service::node_ops_cmd_check(gms::inet_address coordinator, const no } future storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) { - auto& ss = *this; + return seastar::async([this, coordinator, req = std::move(req)] () mutable { + auto ops_uuid = req.ops_uuid; + slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", uint32_t(req.cmd), ops_uuid); - return seastar::async([&ss, coordinator, req = std::move(req)] () mutable { - auto ops_uuid = req.ops_uuid; - slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", uint32_t(req.cmd), ops_uuid); + if (req.cmd == node_ops_cmd::query_pending_ops) { + bool ok = true; + auto ops_uuids = boost::copy_range>(_node_ops| boost::adaptors::map_keys); + node_ops_cmd_response resp(ok, ops_uuids); + slogger.debug("node_ops_cmd_handler: Got query_pending_ops request from {}, pending_ops={}", coordinator, ops_uuids); + return resp; + } else if (req.cmd == node_ops_cmd::repair_updater) { + slogger.debug("repair[{}]: Got repair_updater request from {}", ops_uuid, coordinator); + _db.invoke_on_all([coordinator, ops_uuid, tables = req.repair_tables] (database &db) { + for (const auto& table_id : tables) { + auto& table = db.find_column_family(table_id); + table.update_off_strategy_trigger(); + slogger.debug("repair[{}]: Updated off_strategy_trigger for table {}.{} by node {}", + ops_uuid, table.schema()->ks_name(), table.schema()->cf_name(), coordinator); + } + }).get(); + bool ok = true; + return node_ops_cmd_response(ok); + } - if (req.cmd == node_ops_cmd::query_pending_ops) { - bool ok = true; - auto ops_uuids = boost::copy_range>(ss._node_ops| boost::adaptors::map_keys); - node_ops_cmd_response resp(ok, ops_uuids); - slogger.debug("node_ops_cmd_handler: Got query_pending_ops request from {}, pending_ops={}", coordinator, ops_uuids); - return resp; - } else if (req.cmd == node_ops_cmd::repair_updater) { - slogger.debug("repair[{}]: Got repair_updater request from {}", ops_uuid, coordinator); - ss._db.invoke_on_all([coordinator, ops_uuid, tables = req.repair_tables] (database &db) { - for (const auto& table_id : tables) { - auto& table = db.find_column_family(table_id); - table.update_off_strategy_trigger(); - slogger.debug("repair[{}]: Updated off_strategy_trigger for table {}.{} by node {}", - ops_uuid, table.schema()->ks_name(), table.schema()->cf_name(), coordinator); - } - }).get(); - bool ok = true; - return node_ops_cmd_response(ok); - } + node_ops_cmd_check(coordinator, req); - ss.node_ops_cmd_check(coordinator, req); - - if (req.cmd == node_ops_cmd::removenode_prepare) { - if (req.leaving_nodes.size() > 1) { - auto msg = format("removenode[{}]: Could not removenode more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes); - slogger.warn("{}", msg); - throw std::runtime_error(msg); - } - ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable { - for (auto& node : req.leaving_nodes) { - slogger.info("removenode[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); - tmptr->add_leaving_endpoint(node); - } - return ss.update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes)); - }).get(); - auto ops = seastar::make_shared(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)}); - auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable { - return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { - for (auto& node : req.leaving_nodes) { - slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); - tmptr->del_leaving_endpoint(node); - } - return ss.update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes)); - }); - }, - [&ss, ops_uuid] () mutable { ss.node_ops_singal_abort(ops_uuid); }); - ss._node_ops.emplace(ops_uuid, std::move(meta)); - } else if (req.cmd == node_ops_cmd::removenode_heartbeat) { - slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); - ss.node_ops_update_heartbeat(ops_uuid); - } else if (req.cmd == node_ops_cmd::removenode_done) { - slogger.info("removenode[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator); - ss.node_ops_done(ops_uuid); - } else if (req.cmd == node_ops_cmd::removenode_sync_data) { - auto it = ss._node_ops.find(ops_uuid); - if (it == ss._node_ops.end()) { - throw std::runtime_error(format("removenode[{}]: Can not find ops_uuid={}", ops_uuid, ops_uuid)); - } - auto ops = it->second.get_ops_info(); - auto as = it->second.get_abort_source(); - for (auto& node : req.leaving_nodes) { - if (ss.is_repair_based_node_ops_enabled()) { - slogger.info("removenode[{}]: Started to sync data for removing node={} using repair, coordinator={}", req.ops_uuid, node, coordinator); - ss._repair.local().removenode_with_repair(ss.get_token_metadata_ptr(), node, ops).get(); - } else { - slogger.info("removenode[{}]: Started to sync data for removing node={} using stream, coordinator={}", req.ops_uuid, node, coordinator); - ss.removenode_with_stream(node, as).get(); - } - } - } else if (req.cmd == node_ops_cmd::removenode_abort) { - ss.node_ops_abort(ops_uuid); - } else if (req.cmd == node_ops_cmd::decommission_prepare) { - if (req.leaving_nodes.size() > 1) { - auto msg = format("decommission[{}]: Could not decommission more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes); - slogger.warn("{}", msg); - throw std::runtime_error(msg); - } - ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable { - for (auto& node : req.leaving_nodes) { - slogger.info("decommission[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); - tmptr->add_leaving_endpoint(node); - } - return ss.update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes)); - }).get(); - auto ops = seastar::make_shared(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)}); - auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable { - return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { - for (auto& node : req.leaving_nodes) { - slogger.info("decommission[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); - tmptr->del_leaving_endpoint(node); - } - return ss.update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes)); - }); - }, - [&ss, ops_uuid] () mutable { ss.node_ops_singal_abort(ops_uuid); }); - ss._node_ops.emplace(ops_uuid, std::move(meta)); - } else if (req.cmd == node_ops_cmd::decommission_heartbeat) { - slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); - ss.node_ops_update_heartbeat(ops_uuid); - } else if (req.cmd == node_ops_cmd::decommission_done) { - slogger.info("decommission[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator); - slogger.debug("Triggering off-strategy compaction for all non-system tables on decommission completion"); - ss._db.invoke_on_all([](database &db) { - for (auto& table : db.get_non_system_column_families()) { - table->trigger_offstrategy_compaction(); - } - }).get(); - ss.node_ops_done(ops_uuid); - } else if (req.cmd == node_ops_cmd::decommission_abort) { - ss.node_ops_abort(ops_uuid); - } else if (req.cmd == node_ops_cmd::replace_prepare) { - // Mark the replacing node as replacing - if (req.replace_nodes.size() > 1) { - auto msg = format("replace[{}]: Could not replace more than one node at a time: replace_nodes={}", req.ops_uuid, req.replace_nodes); - slogger.warn("{}", msg); - throw std::runtime_error(msg); - } - ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable { - for (auto& x: req.replace_nodes) { - auto existing_node = x.first; - auto replacing_node = x.second; - slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); - tmptr->add_replacing_endpoint(existing_node, replacing_node); - } - return make_ready_future<>(); - }).get(); - auto ops = seastar::make_shared(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)}); - auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable { - return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { - for (auto& x: req.replace_nodes) { - auto existing_node = x.first; - auto replacing_node = x.second; - slogger.info("replace[{}]: Removed replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); - tmptr->del_replacing_endpoint(existing_node); - } - return ss.update_pending_ranges(tmptr, format("replace {}", req.replace_nodes)); - }); - }, - [&ss, ops_uuid ] { ss.node_ops_singal_abort(ops_uuid); }); - ss._node_ops.emplace(ops_uuid, std::move(meta)); - } else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) { - // Wait for local node has marked replacing node as alive - auto nodes = boost::copy_range>(req.replace_nodes| boost::adaptors::map_values); - try { - ss._gossiper.wait_alive(nodes, std::chrono::milliseconds(120 * 1000)); - } catch (...) { - slogger.warn("replace[{}]: Failed to wait for marking replacing node as up, replace_nodes={}: {}", - req.ops_uuid, req.replace_nodes, std::current_exception()); - throw; - } - } else if (req.cmd == node_ops_cmd::replace_prepare_pending_ranges) { - // Update the pending_ranges for the replacing node - slogger.debug("replace[{}]: Updated pending_ranges from coordinator={}", req.ops_uuid, coordinator); - ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable { - return ss.update_pending_ranges(tmptr, format("replace {}", req.replace_nodes)); - }).get(); - } else if (req.cmd == node_ops_cmd::replace_heartbeat) { - slogger.debug("replace[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); - ss.node_ops_update_heartbeat(ops_uuid); - } else if (req.cmd == node_ops_cmd::replace_done) { - slogger.info("replace[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator); - ss.node_ops_done(ops_uuid); - } else if (req.cmd == node_ops_cmd::replace_abort) { - ss.node_ops_abort(ops_uuid); - } else if (req.cmd == node_ops_cmd::bootstrap_prepare) { - // Mark the bootstrap node as bootstrapping - if (req.bootstrap_nodes.size() > 1) { - auto msg = format("bootstrap[{}]: Could not bootstrap more than one node at a time: bootstrap_nodes={}", req.ops_uuid, req.bootstrap_nodes); - slogger.warn("{}", msg); - throw std::runtime_error(msg); - } - ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable { - for (auto& x: req.bootstrap_nodes) { - auto& endpoint = x.first; - auto tokens = std::unordered_set(x.second.begin(), x.second.end()); - slogger.info("bootstrap[{}]: Added node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator); - tmptr->add_bootstrap_tokens(tokens, endpoint); - } - return ss.update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes)); - }).get(); - auto ops = seastar::make_shared(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)}); - auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable { - return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { - for (auto& x: req.bootstrap_nodes) { - auto& endpoint = x.first; - auto tokens = std::unordered_set(x.second.begin(), x.second.end()); - slogger.info("bootstrap[{}]: Removed node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator); - tmptr->remove_bootstrap_tokens(tokens); - } - return ss.update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes)); - }); - }, - [&ss, ops_uuid ] { ss.node_ops_singal_abort(ops_uuid); }); - ss._node_ops.emplace(ops_uuid, std::move(meta)); - } else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) { - slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); - ss.node_ops_update_heartbeat(ops_uuid); - } else if (req.cmd == node_ops_cmd::bootstrap_done) { - slogger.info("bootstrap[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator); - ss.node_ops_done(ops_uuid); - } else if (req.cmd == node_ops_cmd::bootstrap_abort) { - ss.node_ops_abort(ops_uuid); - } else { - auto msg = format("node_ops_cmd_handler: ops_uuid={}, unknown cmd={}", req.ops_uuid, uint32_t(req.cmd)); + if (req.cmd == node_ops_cmd::removenode_prepare) { + if (req.leaving_nodes.size() > 1) { + auto msg = format("removenode[{}]: Could not removenode more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes); slogger.warn("{}", msg); throw std::runtime_error(msg); } - bool ok = true; - node_ops_cmd_response resp(ok); - return resp; - }); + mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& node : req.leaving_nodes) { + slogger.info("removenode[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); + tmptr->add_leaving_endpoint(node); + } + return update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes)); + }).get(); + auto ops = seastar::make_shared(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)}); + auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [this, coordinator, req = std::move(req)] () mutable { + return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& node : req.leaving_nodes) { + slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); + tmptr->del_leaving_endpoint(node); + } + return update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes)); + }); + }, + [this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); }); + _node_ops.emplace(ops_uuid, std::move(meta)); + } else if (req.cmd == node_ops_cmd::removenode_heartbeat) { + slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); + node_ops_update_heartbeat(ops_uuid); + } else if (req.cmd == node_ops_cmd::removenode_done) { + slogger.info("removenode[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator); + node_ops_done(ops_uuid); + } else if (req.cmd == node_ops_cmd::removenode_sync_data) { + auto it = _node_ops.find(ops_uuid); + if (it == _node_ops.end()) { + throw std::runtime_error(format("removenode[{}]: Can not find ops_uuid={}", ops_uuid, ops_uuid)); + } + auto ops = it->second.get_ops_info(); + auto as = it->second.get_abort_source(); + for (auto& node : req.leaving_nodes) { + if (is_repair_based_node_ops_enabled()) { + slogger.info("removenode[{}]: Started to sync data for removing node={} using repair, coordinator={}", req.ops_uuid, node, coordinator); + _repair.local().removenode_with_repair(get_token_metadata_ptr(), node, ops).get(); + } else { + slogger.info("removenode[{}]: Started to sync data for removing node={} using stream, coordinator={}", req.ops_uuid, node, coordinator); + removenode_with_stream(node, as).get(); + } + } + } else if (req.cmd == node_ops_cmd::removenode_abort) { + node_ops_abort(ops_uuid); + } else if (req.cmd == node_ops_cmd::decommission_prepare) { + if (req.leaving_nodes.size() > 1) { + auto msg = format("decommission[{}]: Could not decommission more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& node : req.leaving_nodes) { + slogger.info("decommission[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); + tmptr->add_leaving_endpoint(node); + } + return update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes)); + }).get(); + auto ops = seastar::make_shared(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)}); + auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [this, coordinator, req = std::move(req)] () mutable { + return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& node : req.leaving_nodes) { + slogger.info("decommission[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator); + tmptr->del_leaving_endpoint(node); + } + return update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes)); + }); + }, + [this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); }); + _node_ops.emplace(ops_uuid, std::move(meta)); + } else if (req.cmd == node_ops_cmd::decommission_heartbeat) { + slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); + node_ops_update_heartbeat(ops_uuid); + } else if (req.cmd == node_ops_cmd::decommission_done) { + slogger.info("decommission[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator); + slogger.debug("Triggering off-strategy compaction for all non-system tables on decommission completion"); + _db.invoke_on_all([](database &db) { + for (auto& table : db.get_non_system_column_families()) { + table->trigger_offstrategy_compaction(); + } + }).get(); + node_ops_done(ops_uuid); + } else if (req.cmd == node_ops_cmd::decommission_abort) { + node_ops_abort(ops_uuid); + } else if (req.cmd == node_ops_cmd::replace_prepare) { + // Mark the replacing node as replacing + if (req.replace_nodes.size() > 1) { + auto msg = format("replace[{}]: Could not replace more than one node at a time: replace_nodes={}", req.ops_uuid, req.replace_nodes); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& x: req.replace_nodes) { + auto existing_node = x.first; + auto replacing_node = x.second; + slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); + tmptr->add_replacing_endpoint(existing_node, replacing_node); + } + return make_ready_future<>(); + }).get(); + auto ops = seastar::make_shared(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)}); + auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [this, coordinator, req = std::move(req)] () mutable { + return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& x: req.replace_nodes) { + auto existing_node = x.first; + auto replacing_node = x.second; + slogger.info("replace[{}]: Removed replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator); + tmptr->del_replacing_endpoint(existing_node); + } + return update_pending_ranges(tmptr, format("replace {}", req.replace_nodes)); + }); + }, + [this, ops_uuid ] { node_ops_singal_abort(ops_uuid); }); + _node_ops.emplace(ops_uuid, std::move(meta)); + } else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) { + // Wait for local node has marked replacing node as alive + auto nodes = boost::copy_range>(req.replace_nodes| boost::adaptors::map_values); + try { + _gossiper.wait_alive(nodes, std::chrono::milliseconds(120 * 1000)); + } catch (...) { + slogger.warn("replace[{}]: Failed to wait for marking replacing node as up, replace_nodes={}: {}", + req.ops_uuid, req.replace_nodes, std::current_exception()); + throw; + } + } else if (req.cmd == node_ops_cmd::replace_prepare_pending_ranges) { + // Update the pending_ranges for the replacing node + slogger.debug("replace[{}]: Updated pending_ranges from coordinator={}", req.ops_uuid, coordinator); + mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata_ptr tmptr) mutable { + return update_pending_ranges(tmptr, format("replace {}", req.replace_nodes)); + }).get(); + } else if (req.cmd == node_ops_cmd::replace_heartbeat) { + slogger.debug("replace[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); + node_ops_update_heartbeat(ops_uuid); + } else if (req.cmd == node_ops_cmd::replace_done) { + slogger.info("replace[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator); + node_ops_done(ops_uuid); + } else if (req.cmd == node_ops_cmd::replace_abort) { + node_ops_abort(ops_uuid); + } else if (req.cmd == node_ops_cmd::bootstrap_prepare) { + // Mark the bootstrap node as bootstrapping + if (req.bootstrap_nodes.size() > 1) { + auto msg = format("bootstrap[{}]: Could not bootstrap more than one node at a time: bootstrap_nodes={}", req.ops_uuid, req.bootstrap_nodes); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& x: req.bootstrap_nodes) { + auto& endpoint = x.first; + auto tokens = std::unordered_set(x.second.begin(), x.second.end()); + slogger.info("bootstrap[{}]: Added node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator); + tmptr->add_bootstrap_tokens(tokens, endpoint); + } + return update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes)); + }).get(); + auto ops = seastar::make_shared(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)}); + auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [this, coordinator, req = std::move(req)] () mutable { + return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable { + for (auto& x: req.bootstrap_nodes) { + auto& endpoint = x.first; + auto tokens = std::unordered_set(x.second.begin(), x.second.end()); + slogger.info("bootstrap[{}]: Removed node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator); + tmptr->remove_bootstrap_tokens(tokens); + } + return update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes)); + }); + }, + [this, ops_uuid ] { node_ops_singal_abort(ops_uuid); }); + _node_ops.emplace(ops_uuid, std::move(meta)); + } else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) { + slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator); + node_ops_update_heartbeat(ops_uuid); + } else if (req.cmd == node_ops_cmd::bootstrap_done) { + slogger.info("bootstrap[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator); + node_ops_done(ops_uuid); + } else if (req.cmd == node_ops_cmd::bootstrap_abort) { + node_ops_abort(ops_uuid); + } else { + auto msg = format("node_ops_cmd_handler: ops_uuid={}, unknown cmd={}", req.ops_uuid, uint32_t(req.cmd)); + slogger.warn("{}", msg); + throw std::runtime_error(msg); + } + bool ok = true; + node_ops_cmd_response resp(ok); + return resp; + }); } // Runs inside seastar::async context