storage_service: node ops: standardize sync_nodes selection
Use token_metadata get_endpoint_to_host_id_map_for_reading to get all normal token owners for all node operations, rather than using gossip for some operations and token_metadata for others. Fixes #12862 Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -2672,6 +2672,7 @@ public:
|
||||
sstring desc;
|
||||
locator::host_id host_id; // Host ID of the node operand (i.e. added, replaced, or leaving node)
|
||||
inet_address endpoint; // IP address of the node operand (i.e. added, replaced, or leaving node)
|
||||
locator::token_metadata_ptr tmptr;
|
||||
std::unordered_set<gms::inet_address> sync_nodes;
|
||||
std::unordered_set<gms::inet_address> ignore_nodes;
|
||||
node_ops_cmd_request req;
|
||||
@@ -2683,6 +2684,7 @@ public:
|
||||
: ss(ss_)
|
||||
, host_id(id)
|
||||
, endpoint(ep)
|
||||
, tmptr(ss.get_token_metadata_ptr())
|
||||
, req(cmd, uuid)
|
||||
, heartbeat_interval(ss._db.local().get_config().nodeops_heartbeat_interval_seconds())
|
||||
{}
|
||||
@@ -2697,8 +2699,25 @@ public:
|
||||
return req.ops_uuid;
|
||||
}
|
||||
|
||||
void start(sstring desc_) {
|
||||
// may be called multiple times
|
||||
void start(sstring desc_, std::function<bool(gms::inet_address)> sync_to_node = [] (gms::inet_address) { return true; }) {
|
||||
desc = std::move(desc_);
|
||||
|
||||
slogger.info("{}[{}]: Started {} operation: node={}/{}", desc, uuid(), desc, host_id, endpoint);
|
||||
|
||||
refresh_sync_nodes(std::move(sync_to_node));
|
||||
}
|
||||
|
||||
void refresh_sync_nodes(std::function<bool(gms::inet_address)> sync_to_node = [] (gms::inet_address) { return true; }) {
|
||||
// sync data with all normal token owners
|
||||
sync_nodes.clear();
|
||||
for (const auto& [node, host_id] : tmptr->get_endpoint_to_host_id_map_for_reading()) {
|
||||
seastar::thread::maybe_yield();
|
||||
if (!ignore_nodes.contains(node) && sync_to_node(node)) {
|
||||
sync_nodes.insert(node);
|
||||
}
|
||||
}
|
||||
|
||||
for (auto& node : sync_nodes) {
|
||||
if (!ss.gossiper().is_alive(node)) {
|
||||
nodes_down.emplace(node);
|
||||
@@ -2710,7 +2729,7 @@ public:
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
|
||||
slogger.info("{}[{}]: Started {} operation: node={}/{}, sync_nodes={}, ignore_nodes={}", desc, uuid(), desc, host_id, endpoint, sync_nodes, ignore_nodes);
|
||||
// Log the resolved node sets. The argument list must match the four placeholders.
slogger.info("{}[{}]: sync_nodes={}, ignore_nodes={}", desc, uuid(), sync_nodes, ignore_nodes);
|
||||
}
|
||||
|
||||
future<> stop() noexcept {
|
||||
@@ -2898,9 +2917,14 @@ future<> storage_service::decommission() {
|
||||
auto& db = ss._db.local();
|
||||
node_ops_ctl ctl(ss, node_ops_cmd::decommission_prepare, db.get_config().host_id, ss.get_broadcast_address());
|
||||
auto stop_ctl = deferred_stop(ctl);
|
||||
auto tmptr = ss.get_token_metadata_ptr();
|
||||
|
||||
// Step 1: Decide who needs to sync data
|
||||
// TODO: wire ignore_nodes provided by user
|
||||
ctl.start("decommission");
|
||||
|
||||
uuid = ctl.uuid();
|
||||
auto endpoint = ctl.endpoint;
|
||||
const auto& tmptr = ctl.tmptr;
|
||||
if (!tmptr->is_normal_token_owner(endpoint)) {
|
||||
throw std::runtime_error("local node is not a member of the token ring yet");
|
||||
}
|
||||
@@ -2936,17 +2960,6 @@ future<> storage_service::decommission() {
|
||||
|
||||
slogger.info("DECOMMISSIONING: starts");
|
||||
ctl.req.leaving_nodes = std::list<gms::inet_address>{endpoint};
|
||||
// TODO: wire ignore_nodes provided by user
|
||||
|
||||
// Step 1: Decide who needs to sync data
|
||||
for (const auto& [node, host_id] : tmptr->get_endpoint_to_host_id_map_for_reading()) {
|
||||
seastar::thread::maybe_yield();
|
||||
if (!ctl.ignore_nodes.contains(node)) {
|
||||
ctl.sync_nodes.insert(node);
|
||||
}
|
||||
}
|
||||
|
||||
ctl.start("decommission");
|
||||
|
||||
assert(ss._group0);
|
||||
raft_available = ss._group0->wait_for_raft().get();
|
||||
@@ -3058,12 +3071,13 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
|
||||
node_ops_ctl ctl(*this, node_ops_cmd::bootstrap_prepare, db.get_config().host_id, get_broadcast_address());
|
||||
auto stop_ctl = deferred_stop(ctl);
|
||||
const auto& uuid = ctl.uuid();
|
||||
|
||||
// Step 1: Decide who needs to sync data for bootstrap operation
|
||||
// TODO: Specify ignore_nodes
|
||||
ctl.start("bootstrap");
|
||||
|
||||
auto start_time = std::chrono::steady_clock::now();
|
||||
for (;;) {
|
||||
// Step 1: Decide who needs to sync data for bootstrap operation
|
||||
ctl.sync_nodes = get_nodes_to_sync_with(ctl.ignore_nodes).get();
|
||||
ctl.sync_nodes.insert(get_broadcast_address());
|
||||
|
||||
// Step 2: Wait until no pending node operations
|
||||
@@ -3088,11 +3102,11 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
|
||||
}
|
||||
slogger.warn("bootstrap[{}]: Found pending node ops = {}, sleep 5 seconds and check again", uuid, pending_ops);
|
||||
sleep_abortable(std::chrono::seconds(5), _abort_source).get();
|
||||
ctl.refresh_sync_nodes();
|
||||
// the bootstrapping node will be added back when we loop
|
||||
}
|
||||
}
|
||||
|
||||
ctl.start("bootstrap");
|
||||
|
||||
auto tokens = std::list<dht::token>(bootstrap_tokens.begin(), bootstrap_tokens.end());
|
||||
ctl.req.bootstrap_nodes = {
|
||||
{get_broadcast_address(), tokens},
|
||||
@@ -3121,15 +3135,15 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
|
||||
auto stop_ctl = deferred_stop(ctl);
|
||||
const auto& uuid = ctl.uuid();
|
||||
gms::inet_address replace_address = replace_info.address;
|
||||
auto tmptr = get_token_metadata_ptr();
|
||||
ctl.ignore_nodes = parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace(), *tmptr);
|
||||
ctl.ignore_nodes = parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace(), *ctl.tmptr);
|
||||
// Step 1: Decide who needs to sync data for replace operation
|
||||
ctl.sync_nodes = get_nodes_to_sync_with(ctl.ignore_nodes).get();
|
||||
ctl.sync_nodes.erase(replace_address);
|
||||
// The replacing node is not a normal token owner yet
|
||||
// Add it back explicitly after checking all other nodes.
|
||||
ctl.start("replace", [&] (gms::inet_address node) {
|
||||
return node != replace_address;
|
||||
});
|
||||
ctl.sync_nodes.insert(get_broadcast_address());
|
||||
|
||||
ctl.start("replace");
|
||||
|
||||
auto sync_nodes_generations = _gossiper.get_generation_for_nodes(ctl.sync_nodes).get();
|
||||
// Map existing nodes to replacing nodes
|
||||
ctl.req.replace_nodes = {
|
||||
@@ -3242,8 +3256,10 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
|
||||
ss.raft_removenode(host_id).get();
|
||||
return;
|
||||
}
|
||||
auto uuid = node_ops_id::create_random_id();
|
||||
auto tmptr = ss.get_token_metadata_ptr();
|
||||
node_ops_ctl ctl(ss, node_ops_cmd::removenode_prepare, host_id, gms::inet_address());
|
||||
auto stop_ctl = deferred_stop(ctl);
|
||||
auto uuid = ctl.uuid();
|
||||
const auto& tmptr = ctl.tmptr;
|
||||
auto endpoint_opt = tmptr->get_endpoint_for_host_id(host_id);
|
||||
assert(ss._group0);
|
||||
auto raft_id = raft::server_id{host_id.uuid()};
|
||||
@@ -3270,17 +3286,15 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
|
||||
throw std::runtime_error(message);
|
||||
}
|
||||
|
||||
for (auto& hoep : ignore_nodes_params) {
|
||||
hoep.resolve(*tmptr);
|
||||
ctl.ignore_nodes.insert(hoep.endpoint);
|
||||
}
|
||||
|
||||
bool removed_from_token_ring = !endpoint_opt;
|
||||
if (endpoint_opt) {
|
||||
auto endpoint = *endpoint_opt;
|
||||
node_ops_ctl ctl(ss, node_ops_cmd::removenode_prepare, host_id, endpoint, uuid);
|
||||
auto stop_ctl = deferred_stop(ctl);
|
||||
auto tokens = tmptr->get_tokens(endpoint);
|
||||
|
||||
for (auto& hoep : ignore_nodes_params) {
|
||||
hoep.resolve(*tmptr);
|
||||
ctl.ignore_nodes.insert(hoep.endpoint);
|
||||
}
|
||||
ctl.endpoint = endpoint;
|
||||
|
||||
// Step 1: Make the node a group 0 non-voter before removing it from the token ring.
|
||||
//
|
||||
@@ -3301,14 +3315,11 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
|
||||
// If the user wants the removenode operation to succeed even if some of the nodes
|
||||
// are not available, the user has to explicitly pass a list of
|
||||
// nodes that can be skipped for the operation.
|
||||
for (const auto& [node, hostid] : tmptr->get_endpoint_to_host_id_map_for_reading()) {
|
||||
seastar::thread::maybe_yield();
|
||||
if (node != endpoint && !ctl.ignore_nodes.contains(node)) {
|
||||
ctl.sync_nodes.insert(node);
|
||||
}
|
||||
}
|
||||
ctl.start("removenode", [&] (gms::inet_address node) {
|
||||
return node != endpoint;
|
||||
});
|
||||
|
||||
ctl.start("removenode");
|
||||
auto tokens = tmptr->get_tokens(endpoint);
|
||||
|
||||
try {
|
||||
// Step 3: Start heartbeat updater
|
||||
|
||||
Reference in New Issue
Block a user