Merge 'Standardize node ops sync_nodes selection' from Benny Halevy

Use token_metadata get_endpoint_to_host_id_map_for_reading
to get all normal token owners for all node operations,
rather than using gossip for some operation and
token_metadata for others.

Fixes #12862

Closes #13256

* github.com:scylladb/scylladb:
  storage_service: node ops: standardize sync_nodes selection
  storage_service: get_ignore_dead_nodes_for_replace: make static and rename to parse_node_list
This commit is contained in:
Tomasz Grabiec
2023-04-10 13:14:55 +02:00
2 changed files with 58 additions and 46 deletions

View File

@@ -1414,10 +1414,10 @@ future<> storage_service::mark_existing_views_as_built(sharded<db::system_distri
});
}
std::unordered_set<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace(const token_metadata& tm) {
std::unordered_set<gms::inet_address> storage_service::parse_node_list(sstring comma_separated_list, const token_metadata& tm) {
std::vector<sstring> ignore_nodes_strs;
std::unordered_set<gms::inet_address> ignore_nodes;
boost::split(ignore_nodes_strs, _db.local().get_config().ignore_dead_nodes_for_replace(), boost::is_any_of(","));
boost::split(ignore_nodes_strs, comma_separated_list, boost::is_any_of(","));
for (std::string n : ignore_nodes_strs) {
try {
std::replace(n.begin(), n.end(), '\"', ' ');
@@ -1428,7 +1428,7 @@ std::unordered_set<gms::inet_address> storage_service::get_ignore_dead_nodes_for
ignore_nodes.insert(ep_and_id.endpoint);
}
} catch (...) {
throw std::runtime_error(format("Failed to parse --ignore-dead-nodes-for-replace parameter: ignore_nodes={}, node={}: {}", ignore_nodes_strs, n, std::current_exception()));
throw std::runtime_error(format("Failed to parse node list: {}: invalid node={}: {}", ignore_nodes_strs, n, std::current_exception()));
}
}
return ignore_nodes;
@@ -1488,7 +1488,7 @@ future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, st
{
// Wait for normal state handler to finish for existing nodes in the cluster.
auto ignore_nodes = replacement_info ? get_ignore_dead_nodes_for_replace(get_token_metadata())
auto ignore_nodes = replacement_info ? parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace(), get_token_metadata())
// TODO: specify ignore_nodes for bootstrap
: std::unordered_set<gms::inet_address>{};
auto sync_nodes = get_nodes_to_sync_with(ignore_nodes).get();
@@ -2751,6 +2751,7 @@ public:
sstring desc;
locator::host_id host_id; // Host ID of the node operand (i.e. added, replaced, or leaving node)
inet_address endpoint; // IP address of the node operand (i.e. added, replaced, or leaving node)
locator::token_metadata_ptr tmptr;
std::unordered_set<gms::inet_address> sync_nodes;
std::unordered_set<gms::inet_address> ignore_nodes;
node_ops_cmd_request req;
@@ -2762,6 +2763,7 @@ public:
: ss(ss_)
, host_id(id)
, endpoint(ep)
, tmptr(ss.get_token_metadata_ptr())
, req(cmd, uuid)
, heartbeat_interval(ss._db.local().get_config().nodeops_heartbeat_interval_seconds())
{}
@@ -2776,8 +2778,25 @@ public:
return req.ops_uuid;
}
void start(sstring desc_) {
// may be called multiple times
void start(sstring desc_, std::function<bool(gms::inet_address)> sync_to_node = [] (gms::inet_address) { return true; }) {
desc = std::move(desc_);
slogger.info("{}[{}]: Started {} operation: node={}/{}", desc, uuid(), desc, host_id, endpoint);
refresh_sync_nodes(std::move(sync_to_node));
}
void refresh_sync_nodes(std::function<bool(gms::inet_address)> sync_to_node = [] (gms::inet_address) { return true; }) {
// sync data with all normal token owners
sync_nodes.clear();
for (const auto& [node, host_id] : tmptr->get_endpoint_to_host_id_map_for_reading()) {
seastar::thread::maybe_yield();
if (!ignore_nodes.contains(node) && sync_to_node(node)) {
sync_nodes.insert(node);
}
}
for (auto& node : sync_nodes) {
if (!ss.gossiper().is_alive(node)) {
nodes_down.emplace(node);
@@ -2789,7 +2808,7 @@ public:
throw std::runtime_error(msg);
}
slogger.info("{}[{}]: Started {} operation: node={}/{}, sync_nodes={}, ignore_nodes={}", desc, uuid(), desc, host_id, endpoint, sync_nodes, ignore_nodes);
slogger.info("{}[{}]: sync_nodes={}, ignore_nodes={}", desc, uuid(), desc, host_id, endpoint, sync_nodes, ignore_nodes);
}
future<> stop() noexcept {
@@ -2977,9 +2996,14 @@ future<> storage_service::decommission() {
auto& db = ss._db.local();
node_ops_ctl ctl(ss, node_ops_cmd::decommission_prepare, db.get_config().host_id, ss.get_broadcast_address());
auto stop_ctl = deferred_stop(ctl);
auto tmptr = ss.get_token_metadata_ptr();
// Step 1: Decide who needs to sync data
// TODO: wire ignore_nodes provided by user
ctl.start("decommission");
uuid = ctl.uuid();
auto endpoint = ctl.endpoint;
const auto& tmptr = ctl.tmptr;
if (!tmptr->is_normal_token_owner(endpoint)) {
throw std::runtime_error("local node is not a member of the token ring yet");
}
@@ -3015,17 +3039,6 @@ future<> storage_service::decommission() {
slogger.info("DECOMMISSIONING: starts");
ctl.req.leaving_nodes = std::list<gms::inet_address>{endpoint};
// TODO: wire ignore_nodes provided by user
// Step 1: Decide who needs to sync data
for (const auto& [node, host_id] : tmptr->get_endpoint_to_host_id_map_for_reading()) {
seastar::thread::maybe_yield();
if (!ctl.ignore_nodes.contains(node)) {
ctl.sync_nodes.insert(node);
}
}
ctl.start("decommission");
assert(ss._group0);
raft_available = ss._group0->wait_for_raft().get();
@@ -3137,12 +3150,13 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
node_ops_ctl ctl(*this, node_ops_cmd::bootstrap_prepare, db.get_config().host_id, get_broadcast_address());
auto stop_ctl = deferred_stop(ctl);
const auto& uuid = ctl.uuid();
// Step 1: Decide who needs to sync data for bootstrap operation
// TODO: Specify ignore_nodes
ctl.start("bootstrap");
auto start_time = std::chrono::steady_clock::now();
for (;;) {
// Step 1: Decide who needs to sync data for bootstrap operation
ctl.sync_nodes = get_nodes_to_sync_with(ctl.ignore_nodes).get();
ctl.sync_nodes.insert(get_broadcast_address());
// Step 2: Wait until no pending node operations
@@ -3167,11 +3181,11 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
}
slogger.warn("bootstrap[{}]: Found pending node ops = {}, sleep 5 seconds and check again", uuid, pending_ops);
sleep_abortable(std::chrono::seconds(5), _abort_source).get();
ctl.refresh_sync_nodes();
// the bootstrapping node will be added back when we loop
}
}
ctl.start("bootstrap");
auto tokens = std::list<dht::token>(bootstrap_tokens.begin(), bootstrap_tokens.end());
ctl.req.bootstrap_nodes = {
{get_broadcast_address(), tokens},
@@ -3200,15 +3214,15 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
auto stop_ctl = deferred_stop(ctl);
const auto& uuid = ctl.uuid();
gms::inet_address replace_address = replace_info.address;
auto tmptr = get_token_metadata_ptr();
ctl.ignore_nodes = get_ignore_dead_nodes_for_replace(*tmptr);
ctl.ignore_nodes = parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace(), *ctl.tmptr);
// Step 1: Decide who needs to sync data for replace operation
ctl.sync_nodes = get_nodes_to_sync_with(ctl.ignore_nodes).get();
ctl.sync_nodes.erase(replace_address);
// The replacing node is not a normal token owner yet
// Add it back explicitly after checking all other nodes.
ctl.start("replace", [&] (gms::inet_address node) {
return node != replace_address;
});
ctl.sync_nodes.insert(get_broadcast_address());
ctl.start("replace");
auto sync_nodes_generations = _gossiper.get_generation_for_nodes(ctl.sync_nodes).get();
// Map existing nodes to replacing nodes
ctl.req.replace_nodes = {
@@ -3321,8 +3335,10 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
ss.raft_removenode(host_id).get();
return;
}
auto uuid = node_ops_id::create_random_id();
auto tmptr = ss.get_token_metadata_ptr();
node_ops_ctl ctl(ss, node_ops_cmd::removenode_prepare, host_id, gms::inet_address());
auto stop_ctl = deferred_stop(ctl);
auto uuid = ctl.uuid();
const auto& tmptr = ctl.tmptr;
auto endpoint_opt = tmptr->get_endpoint_for_host_id(host_id);
assert(ss._group0);
auto raft_id = raft::server_id{host_id.uuid()};
@@ -3349,17 +3365,15 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
throw std::runtime_error(message);
}
for (auto& hoep : ignore_nodes_params) {
hoep.resolve(*tmptr);
ctl.ignore_nodes.insert(hoep.endpoint);
}
bool removed_from_token_ring = !endpoint_opt;
if (endpoint_opt) {
auto endpoint = *endpoint_opt;
node_ops_ctl ctl(ss, node_ops_cmd::removenode_prepare, host_id, endpoint, uuid);
auto stop_ctl = deferred_stop(ctl);
auto tokens = tmptr->get_tokens(endpoint);
for (auto& hoep : ignore_nodes_params) {
hoep.resolve(*tmptr);
ctl.ignore_nodes.insert(hoep.endpoint);
}
ctl.endpoint = endpoint;
// Step 1: Make the node a group 0 non-voter before removing it from the token ring.
//
@@ -3380,14 +3394,11 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
// If the user want the removenode opeartion to succeed even if some of the nodes
// are not available, the user has to explicitly pass a list of
// node that can be skipped for the operation.
for (const auto& [node, hostid] : tmptr->get_endpoint_to_host_id_map_for_reading()) {
seastar::thread::maybe_yield();
if (node != endpoint && !ctl.ignore_nodes.contains(node)) {
ctl.sync_nodes.insert(node);
}
}
ctl.start("removenode", [&] (gms::inet_address node) {
return node != endpoint;
});
ctl.start("removenode");
auto tokens = tmptr->get_tokens(endpoint);
try {
// Step 3: Start heartbeat updater

View File

@@ -294,13 +294,14 @@ private:
void run_replace_ops(std::unordered_set<token>& bootstrap_tokens, replacement_info replace_info);
void run_bootstrap_ops(std::unordered_set<token>& bootstrap_tokens);
std::unordered_set<gms::inet_address> get_ignore_dead_nodes_for_replace(const locator::token_metadata& tm);
future<std::unordered_set<gms::inet_address>> get_nodes_to_sync_with(
const std::unordered_set<gms::inet_address>& ignore_dead_nodes);
future<> wait_for_ring_to_settle(std::chrono::milliseconds delay);
public:
static std::unordered_set<gms::inet_address> parse_node_list(sstring comma_separated_list, const locator::token_metadata& tm);
future<> check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes,
const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features);