mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 11:10:40 +00:00
storage_service: Fix indentation after previous patch
And, while at it, s/ss/this/g and drop the ss variable. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -2435,223 +2435,221 @@ void storage_service::node_ops_cmd_check(gms::inet_address coordinator, const no
|
||||
}
|
||||
|
||||
future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) {
|
||||
auto& ss = *this;
|
||||
return seastar::async([this, coordinator, req = std::move(req)] () mutable {
|
||||
auto ops_uuid = req.ops_uuid;
|
||||
slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", uint32_t(req.cmd), ops_uuid);
|
||||
|
||||
return seastar::async([&ss, coordinator, req = std::move(req)] () mutable {
|
||||
auto ops_uuid = req.ops_uuid;
|
||||
slogger.debug("node_ops_cmd_handler cmd={}, ops_uuid={}", uint32_t(req.cmd), ops_uuid);
|
||||
if (req.cmd == node_ops_cmd::query_pending_ops) {
|
||||
bool ok = true;
|
||||
auto ops_uuids = boost::copy_range<std::list<utils::UUID>>(_node_ops| boost::adaptors::map_keys);
|
||||
node_ops_cmd_response resp(ok, ops_uuids);
|
||||
slogger.debug("node_ops_cmd_handler: Got query_pending_ops request from {}, pending_ops={}", coordinator, ops_uuids);
|
||||
return resp;
|
||||
} else if (req.cmd == node_ops_cmd::repair_updater) {
|
||||
slogger.debug("repair[{}]: Got repair_updater request from {}", ops_uuid, coordinator);
|
||||
_db.invoke_on_all([coordinator, ops_uuid, tables = req.repair_tables] (database &db) {
|
||||
for (const auto& table_id : tables) {
|
||||
auto& table = db.find_column_family(table_id);
|
||||
table.update_off_strategy_trigger();
|
||||
slogger.debug("repair[{}]: Updated off_strategy_trigger for table {}.{} by node {}",
|
||||
ops_uuid, table.schema()->ks_name(), table.schema()->cf_name(), coordinator);
|
||||
}
|
||||
}).get();
|
||||
bool ok = true;
|
||||
return node_ops_cmd_response(ok);
|
||||
}
|
||||
|
||||
if (req.cmd == node_ops_cmd::query_pending_ops) {
|
||||
bool ok = true;
|
||||
auto ops_uuids = boost::copy_range<std::list<utils::UUID>>(ss._node_ops| boost::adaptors::map_keys);
|
||||
node_ops_cmd_response resp(ok, ops_uuids);
|
||||
slogger.debug("node_ops_cmd_handler: Got query_pending_ops request from {}, pending_ops={}", coordinator, ops_uuids);
|
||||
return resp;
|
||||
} else if (req.cmd == node_ops_cmd::repair_updater) {
|
||||
slogger.debug("repair[{}]: Got repair_updater request from {}", ops_uuid, coordinator);
|
||||
ss._db.invoke_on_all([coordinator, ops_uuid, tables = req.repair_tables] (database &db) {
|
||||
for (const auto& table_id : tables) {
|
||||
auto& table = db.find_column_family(table_id);
|
||||
table.update_off_strategy_trigger();
|
||||
slogger.debug("repair[{}]: Updated off_strategy_trigger for table {}.{} by node {}",
|
||||
ops_uuid, table.schema()->ks_name(), table.schema()->cf_name(), coordinator);
|
||||
}
|
||||
}).get();
|
||||
bool ok = true;
|
||||
return node_ops_cmd_response(ok);
|
||||
}
|
||||
node_ops_cmd_check(coordinator, req);
|
||||
|
||||
ss.node_ops_cmd_check(coordinator, req);
|
||||
|
||||
if (req.cmd == node_ops_cmd::removenode_prepare) {
|
||||
if (req.leaving_nodes.size() > 1) {
|
||||
auto msg = format("removenode[{}]: Could not removenode more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes);
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->add_leaving_endpoint(node);
|
||||
}
|
||||
return ss.update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes));
|
||||
}).get();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable {
|
||||
return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->del_leaving_endpoint(node);
|
||||
}
|
||||
return ss.update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes));
|
||||
});
|
||||
},
|
||||
[&ss, ops_uuid] () mutable { ss.node_ops_singal_abort(ops_uuid); });
|
||||
ss._node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::removenode_heartbeat) {
|
||||
slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
ss.node_ops_update_heartbeat(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::removenode_done) {
|
||||
slogger.info("removenode[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
|
||||
ss.node_ops_done(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::removenode_sync_data) {
|
||||
auto it = ss._node_ops.find(ops_uuid);
|
||||
if (it == ss._node_ops.end()) {
|
||||
throw std::runtime_error(format("removenode[{}]: Can not find ops_uuid={}", ops_uuid, ops_uuid));
|
||||
}
|
||||
auto ops = it->second.get_ops_info();
|
||||
auto as = it->second.get_abort_source();
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
if (ss.is_repair_based_node_ops_enabled()) {
|
||||
slogger.info("removenode[{}]: Started to sync data for removing node={} using repair, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
ss._repair.local().removenode_with_repair(ss.get_token_metadata_ptr(), node, ops).get();
|
||||
} else {
|
||||
slogger.info("removenode[{}]: Started to sync data for removing node={} using stream, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
ss.removenode_with_stream(node, as).get();
|
||||
}
|
||||
}
|
||||
} else if (req.cmd == node_ops_cmd::removenode_abort) {
|
||||
ss.node_ops_abort(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::decommission_prepare) {
|
||||
if (req.leaving_nodes.size() > 1) {
|
||||
auto msg = format("decommission[{}]: Could not decommission more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes);
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("decommission[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->add_leaving_endpoint(node);
|
||||
}
|
||||
return ss.update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes));
|
||||
}).get();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable {
|
||||
return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("decommission[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->del_leaving_endpoint(node);
|
||||
}
|
||||
return ss.update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes));
|
||||
});
|
||||
},
|
||||
[&ss, ops_uuid] () mutable { ss.node_ops_singal_abort(ops_uuid); });
|
||||
ss._node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::decommission_heartbeat) {
|
||||
slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
ss.node_ops_update_heartbeat(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::decommission_done) {
|
||||
slogger.info("decommission[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
|
||||
slogger.debug("Triggering off-strategy compaction for all non-system tables on decommission completion");
|
||||
ss._db.invoke_on_all([](database &db) {
|
||||
for (auto& table : db.get_non_system_column_families()) {
|
||||
table->trigger_offstrategy_compaction();
|
||||
}
|
||||
}).get();
|
||||
ss.node_ops_done(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::decommission_abort) {
|
||||
ss.node_ops_abort(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::replace_prepare) {
|
||||
// Mark the replacing node as replacing
|
||||
if (req.replace_nodes.size() > 1) {
|
||||
auto msg = format("replace[{}]: Could not replace more than one node at a time: replace_nodes={}", req.ops_uuid, req.replace_nodes);
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.replace_nodes) {
|
||||
auto existing_node = x.first;
|
||||
auto replacing_node = x.second;
|
||||
slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator);
|
||||
tmptr->add_replacing_endpoint(existing_node, replacing_node);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable {
|
||||
return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.replace_nodes) {
|
||||
auto existing_node = x.first;
|
||||
auto replacing_node = x.second;
|
||||
slogger.info("replace[{}]: Removed replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator);
|
||||
tmptr->del_replacing_endpoint(existing_node);
|
||||
}
|
||||
return ss.update_pending_ranges(tmptr, format("replace {}", req.replace_nodes));
|
||||
});
|
||||
},
|
||||
[&ss, ops_uuid ] { ss.node_ops_singal_abort(ops_uuid); });
|
||||
ss._node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) {
|
||||
// Wait for local node has marked replacing node as alive
|
||||
auto nodes = boost::copy_range<std::vector<inet_address>>(req.replace_nodes| boost::adaptors::map_values);
|
||||
try {
|
||||
ss._gossiper.wait_alive(nodes, std::chrono::milliseconds(120 * 1000));
|
||||
} catch (...) {
|
||||
slogger.warn("replace[{}]: Failed to wait for marking replacing node as up, replace_nodes={}: {}",
|
||||
req.ops_uuid, req.replace_nodes, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
} else if (req.cmd == node_ops_cmd::replace_prepare_pending_ranges) {
|
||||
// Update the pending_ranges for the replacing node
|
||||
slogger.debug("replace[{}]: Updated pending_ranges from coordinator={}", req.ops_uuid, coordinator);
|
||||
ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
return ss.update_pending_ranges(tmptr, format("replace {}", req.replace_nodes));
|
||||
}).get();
|
||||
} else if (req.cmd == node_ops_cmd::replace_heartbeat) {
|
||||
slogger.debug("replace[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
ss.node_ops_update_heartbeat(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::replace_done) {
|
||||
slogger.info("replace[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
|
||||
ss.node_ops_done(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::replace_abort) {
|
||||
ss.node_ops_abort(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::bootstrap_prepare) {
|
||||
// Mark the bootstrap node as bootstrapping
|
||||
if (req.bootstrap_nodes.size() > 1) {
|
||||
auto msg = format("bootstrap[{}]: Could not bootstrap more than one node at a time: bootstrap_nodes={}", req.ops_uuid, req.bootstrap_nodes);
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
ss.mutate_token_metadata([coordinator, &req, &ss] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.bootstrap_nodes) {
|
||||
auto& endpoint = x.first;
|
||||
auto tokens = std::unordered_set<dht::token>(x.second.begin(), x.second.end());
|
||||
slogger.info("bootstrap[{}]: Added node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator);
|
||||
tmptr->add_bootstrap_tokens(tokens, endpoint);
|
||||
}
|
||||
return ss.update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes));
|
||||
}).get();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [&ss, coordinator, req = std::move(req)] () mutable {
|
||||
return ss.mutate_token_metadata([&ss, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.bootstrap_nodes) {
|
||||
auto& endpoint = x.first;
|
||||
auto tokens = std::unordered_set<dht::token>(x.second.begin(), x.second.end());
|
||||
slogger.info("bootstrap[{}]: Removed node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator);
|
||||
tmptr->remove_bootstrap_tokens(tokens);
|
||||
}
|
||||
return ss.update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes));
|
||||
});
|
||||
},
|
||||
[&ss, ops_uuid ] { ss.node_ops_singal_abort(ops_uuid); });
|
||||
ss._node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) {
|
||||
slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
ss.node_ops_update_heartbeat(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::bootstrap_done) {
|
||||
slogger.info("bootstrap[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
|
||||
ss.node_ops_done(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::bootstrap_abort) {
|
||||
ss.node_ops_abort(ops_uuid);
|
||||
} else {
|
||||
auto msg = format("node_ops_cmd_handler: ops_uuid={}, unknown cmd={}", req.ops_uuid, uint32_t(req.cmd));
|
||||
if (req.cmd == node_ops_cmd::removenode_prepare) {
|
||||
if (req.leaving_nodes.size() > 1) {
|
||||
auto msg = format("removenode[{}]: Could not removenode more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes);
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
bool ok = true;
|
||||
node_ops_cmd_response resp(ok);
|
||||
return resp;
|
||||
});
|
||||
mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->add_leaving_endpoint(node);
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes));
|
||||
}).get();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->del_leaving_endpoint(node);
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes));
|
||||
});
|
||||
},
|
||||
[this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::removenode_heartbeat) {
|
||||
slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_update_heartbeat(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::removenode_done) {
|
||||
slogger.info("removenode[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_done(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::removenode_sync_data) {
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it == _node_ops.end()) {
|
||||
throw std::runtime_error(format("removenode[{}]: Can not find ops_uuid={}", ops_uuid, ops_uuid));
|
||||
}
|
||||
auto ops = it->second.get_ops_info();
|
||||
auto as = it->second.get_abort_source();
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
if (is_repair_based_node_ops_enabled()) {
|
||||
slogger.info("removenode[{}]: Started to sync data for removing node={} using repair, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
_repair.local().removenode_with_repair(get_token_metadata_ptr(), node, ops).get();
|
||||
} else {
|
||||
slogger.info("removenode[{}]: Started to sync data for removing node={} using stream, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
removenode_with_stream(node, as).get();
|
||||
}
|
||||
}
|
||||
} else if (req.cmd == node_ops_cmd::removenode_abort) {
|
||||
node_ops_abort(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::decommission_prepare) {
|
||||
if (req.leaving_nodes.size() > 1) {
|
||||
auto msg = format("decommission[{}]: Could not decommission more than one node at a time: leaving_nodes={}", req.ops_uuid, req.leaving_nodes);
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("decommission[{}]: Added node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->add_leaving_endpoint(node);
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes));
|
||||
}).get();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& node : req.leaving_nodes) {
|
||||
slogger.info("decommission[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
|
||||
tmptr->del_leaving_endpoint(node);
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes));
|
||||
});
|
||||
},
|
||||
[this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::decommission_heartbeat) {
|
||||
slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_update_heartbeat(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::decommission_done) {
|
||||
slogger.info("decommission[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
|
||||
slogger.debug("Triggering off-strategy compaction for all non-system tables on decommission completion");
|
||||
_db.invoke_on_all([](database &db) {
|
||||
for (auto& table : db.get_non_system_column_families()) {
|
||||
table->trigger_offstrategy_compaction();
|
||||
}
|
||||
}).get();
|
||||
node_ops_done(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::decommission_abort) {
|
||||
node_ops_abort(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::replace_prepare) {
|
||||
// Mark the replacing node as replacing
|
||||
if (req.replace_nodes.size() > 1) {
|
||||
auto msg = format("replace[{}]: Could not replace more than one node at a time: replace_nodes={}", req.ops_uuid, req.replace_nodes);
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.replace_nodes) {
|
||||
auto existing_node = x.first;
|
||||
auto replacing_node = x.second;
|
||||
slogger.info("replace[{}]: Added replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator);
|
||||
tmptr->add_replacing_endpoint(existing_node, replacing_node);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.replace_nodes) {
|
||||
auto existing_node = x.first;
|
||||
auto replacing_node = x.second;
|
||||
slogger.info("replace[{}]: Removed replacing_node={} to replace existing_node={}, coordinator={}", req.ops_uuid, replacing_node, existing_node, coordinator);
|
||||
tmptr->del_replacing_endpoint(existing_node);
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("replace {}", req.replace_nodes));
|
||||
});
|
||||
},
|
||||
[this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) {
|
||||
// Wait for local node has marked replacing node as alive
|
||||
auto nodes = boost::copy_range<std::vector<inet_address>>(req.replace_nodes| boost::adaptors::map_values);
|
||||
try {
|
||||
_gossiper.wait_alive(nodes, std::chrono::milliseconds(120 * 1000));
|
||||
} catch (...) {
|
||||
slogger.warn("replace[{}]: Failed to wait for marking replacing node as up, replace_nodes={}: {}",
|
||||
req.ops_uuid, req.replace_nodes, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
} else if (req.cmd == node_ops_cmd::replace_prepare_pending_ranges) {
|
||||
// Update the pending_ranges for the replacing node
|
||||
slogger.debug("replace[{}]: Updated pending_ranges from coordinator={}", req.ops_uuid, coordinator);
|
||||
mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
return update_pending_ranges(tmptr, format("replace {}", req.replace_nodes));
|
||||
}).get();
|
||||
} else if (req.cmd == node_ops_cmd::replace_heartbeat) {
|
||||
slogger.debug("replace[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_update_heartbeat(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::replace_done) {
|
||||
slogger.info("replace[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_done(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::replace_abort) {
|
||||
node_ops_abort(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::bootstrap_prepare) {
|
||||
// Mark the bootstrap node as bootstrapping
|
||||
if (req.bootstrap_nodes.size() > 1) {
|
||||
auto msg = format("bootstrap[{}]: Could not bootstrap more than one node at a time: bootstrap_nodes={}", req.ops_uuid, req.bootstrap_nodes);
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
mutate_token_metadata([coordinator, &req, this] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.bootstrap_nodes) {
|
||||
auto& endpoint = x.first;
|
||||
auto tokens = std::unordered_set<dht::token>(x.second.begin(), x.second.end());
|
||||
slogger.info("bootstrap[{}]: Added node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator);
|
||||
tmptr->add_bootstrap_tokens(tokens, endpoint);
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes));
|
||||
}).get();
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, false, std::move(req.ignore_nodes)});
|
||||
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ops), [this, coordinator, req = std::move(req)] () mutable {
|
||||
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
|
||||
for (auto& x: req.bootstrap_nodes) {
|
||||
auto& endpoint = x.first;
|
||||
auto tokens = std::unordered_set<dht::token>(x.second.begin(), x.second.end());
|
||||
slogger.info("bootstrap[{}]: Removed node={} as bootstrap, coordinator={}", req.ops_uuid, endpoint, coordinator);
|
||||
tmptr->remove_bootstrap_tokens(tokens);
|
||||
}
|
||||
return update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes));
|
||||
});
|
||||
},
|
||||
[this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) {
|
||||
slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_update_heartbeat(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::bootstrap_done) {
|
||||
slogger.info("bootstrap[{}]: Marked ops done from coordinator={}", req.ops_uuid, coordinator);
|
||||
node_ops_done(ops_uuid);
|
||||
} else if (req.cmd == node_ops_cmd::bootstrap_abort) {
|
||||
node_ops_abort(ops_uuid);
|
||||
} else {
|
||||
auto msg = format("node_ops_cmd_handler: ops_uuid={}, unknown cmd={}", req.ops_uuid, uint32_t(req.cmd));
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
bool ok = true;
|
||||
node_ops_cmd_response resp(ok);
|
||||
return resp;
|
||||
});
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
|
||||
Reference in New Issue
Block a user