storage_service: node ops: Extract node_ops_insert() to reduce code duplication

This commit is contained in:
Tomasz Grabiec
2023-03-01 18:41:46 +01:00
parent d5021d5a1b
commit 2d935e255a
2 changed files with 21 additions and 29 deletions

View File

@@ -2630,11 +2630,6 @@ void storage_service::node_ops_cmd_check(gms::inet_address coordinator, const no
}
}
static
std::chrono::seconds watchdog_timeout(const db::config& cfg) {
return std::chrono::seconds(cfg.nodeops_watchdog_timeout_seconds());
}
void storage_service::on_node_ops_registered(node_ops_id ops_uuid) {
utils::get_local_injector().inject("storage_service_nodeops_prepare_handler_sleep3", std::chrono::seconds{3}).get();
utils::get_local_injector().inject("storage_service_nodeops_abort_after_1s", [this, ops_uuid] {
@@ -2646,6 +2641,17 @@ void storage_service::on_node_ops_registered(node_ops_id ops_uuid) {
});
}
void storage_service::node_ops_insert(node_ops_id ops_uuid,
gms::inet_address coordinator,
std::list<inet_address> ignore_nodes,
std::function<future<>()> abort_func) {
auto watchdog_interval = std::chrono::seconds(_db.local().get_config().nodeops_watchdog_timeout_seconds());
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(ignore_nodes), watchdog_interval, std::move(abort_func),
[this, ops_uuid]() mutable { node_ops_singal_abort(ops_uuid); });
_node_ops.emplace(ops_uuid, std::move(meta));
on_node_ops_registered(ops_uuid);
}
future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req) {
return seastar::async([this, coordinator, req = std::move(req)] () mutable {
auto ops_uuid = req.ops_uuid;
@@ -2692,8 +2698,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
}
return update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes));
}).get();
auto watchdog_interval = watchdog_timeout(_db.local().get_config());
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), watchdog_interval, [this, coordinator, req = std::move(req)] () mutable {
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
for (auto& node : req.leaving_nodes) {
slogger.info("removenode[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
@@ -2701,10 +2706,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
}
return update_pending_ranges(tmptr, format("removenode {}", req.leaving_nodes));
});
},
[this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
_node_ops.emplace(ops_uuid, std::move(meta));
on_node_ops_registered(ops_uuid);
});
} else if (req.cmd == node_ops_cmd::removenode_heartbeat) {
slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
node_ops_update_heartbeat(ops_uuid).get();
@@ -2744,8 +2746,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
}
return update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes));
}).get();
auto watchdog_interval = watchdog_timeout(_db.local().get_config());
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), watchdog_interval, [this, coordinator, req = std::move(req)] () mutable {
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
for (auto& node : req.leaving_nodes) {
slogger.info("decommission[{}]: Removed node={} as leaving node, coordinator={}", req.ops_uuid, node, coordinator);
@@ -2753,10 +2754,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
}
return update_pending_ranges(tmptr, format("decommission {}", req.leaving_nodes));
});
},
[this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
_node_ops.emplace(ops_uuid, std::move(meta));
on_node_ops_registered(ops_uuid);
});
} else if (req.cmd == node_ops_cmd::decommission_heartbeat) {
slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
node_ops_update_heartbeat(ops_uuid).get();
@@ -2808,8 +2806,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
}
return make_ready_future<>();
}).get();
auto watchdog_interval = watchdog_timeout(_db.local().get_config());
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), watchdog_interval, [this, coordinator, req = std::move(req)] () mutable {
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
for (auto& x: req.replace_nodes) {
auto existing_node = x.first;
@@ -2819,10 +2816,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
}
return update_pending_ranges(tmptr, format("replace {}", req.replace_nodes));
});
},
[this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
_node_ops.emplace(ops_uuid, std::move(meta));
on_node_ops_registered(ops_uuid);
});
} else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) {
// Wait for local node has marked replacing node as alive
auto nodes = boost::copy_range<std::vector<inet_address>>(req.replace_nodes| boost::adaptors::map_values);
@@ -2864,8 +2858,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
}
return update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes));
}).get();
auto watchdog_interval = watchdog_timeout(_db.local().get_config());
auto meta = node_ops_meta_data(ops_uuid, coordinator, std::move(req.ignore_nodes), watchdog_interval, [this, coordinator, req = std::move(req)] () mutable {
node_ops_insert(ops_uuid, coordinator, std::move(req.ignore_nodes), [this, coordinator, req = std::move(req)] () mutable {
return mutate_token_metadata([this, coordinator, req = std::move(req)] (mutable_token_metadata_ptr tmptr) mutable {
for (auto& x: req.bootstrap_nodes) {
auto& endpoint = x.first;
@@ -2875,10 +2868,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
}
return update_pending_ranges(tmptr, format("bootstrap {}", req.bootstrap_nodes));
});
},
[this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
_node_ops.emplace(ops_uuid, std::move(meta));
on_node_ops_registered(ops_uuid);
});
} else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) {
slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
node_ops_update_heartbeat(ops_uuid).get();

View File

@@ -162,6 +162,8 @@ private:
seastar::condition_variable _node_ops_abort_cond;
named_semaphore _node_ops_abort_sem{1, named_semaphore_exception_factory{"node_ops_abort_sem"}};
future<> _node_ops_abort_thread;
void node_ops_insert(node_ops_id, gms::inet_address coordinator, std::list<inet_address> ignore_nodes,
std::function<future<>()> abort_func);
future<> node_ops_update_heartbeat(node_ops_id ops_uuid);
future<> node_ops_done(node_ops_id ops_uuid);
future<> node_ops_abort(node_ops_id ops_uuid);