tasks, system_keyspace: Introduce get_topology_request_entry_opt()

This is a cleanup: it is better to return std::nullopt than to fake
an entry with a null id when require_entry == false.
This commit is contained in:
Tomasz Grabiec
2025-11-25 16:08:53 +01:00
committed by Aleksandra Martyniuk
parent 902803babd
commit 71e6ef90f4
6 changed files with 22 additions and 19 deletions

View File

@@ -3609,18 +3609,20 @@ system_keyspace::topology_requests_entry system_keyspace::topology_request_row_t
return entry;
}
future<system_keyspace::topology_requests_entry> system_keyspace::get_topology_request_entry(utils::UUID id, bool require_entry) {
// Fetches the topology request entry for `id`, requiring it to exist.
// Looks the row up via get_topology_request_entry_opt(); an absent entry
// indicates a broken invariant, so it is reported with on_internal_error().
future<system_keyspace::topology_requests_entry> system_keyspace::get_topology_request_entry(utils::UUID id) {
    auto entry_opt = co_await get_topology_request_entry_opt(id);
    if (!entry_opt) {
        on_internal_error(slogger, format("no entry for request id {}", id));
    }
    co_return std::move(*entry_opt);
}
future<std::optional<system_keyspace::topology_requests_entry>> system_keyspace::get_topology_request_entry_opt(utils::UUID id) {
auto rs = co_await execute_cql(
format("SELECT * FROM system.{} WHERE id = {}", TOPOLOGY_REQUESTS, id));
if (!rs || rs->empty()) {
if (require_entry) {
on_internal_error(slogger, format("no entry for request id {}", id));
} else {
co_return topology_requests_entry{
.id = utils::null_uuid()
};
}
co_return std::nullopt;
}
const auto& row = rs->one();

View File

@@ -689,7 +689,8 @@ public:
future<service::topology_request_state> get_topology_request_state(utils::UUID id, bool require_entry);
topology_requests_entry topology_request_row_to_entry(utils::UUID id, const cql3::untyped_result_set_row& row);
future<topology_requests_entry> get_topology_request_entry(utils::UUID id, bool require_entry);
future<topology_requests_entry> get_topology_request_entry(utils::UUID id);
future<std::optional<topology_requests_entry>> get_topology_request_entry_opt(utils::UUID id);
future<topology_requests_entries> get_node_ops_request_entries(db_clock::time_point end_time_limit);
public:

View File

@@ -61,11 +61,11 @@ static future<db::system_keyspace::topology_requests_entries> get_entries(db::sy
}
future<std::optional<tasks::task_status>> node_ops_virtual_task::get_status(tasks::task_id id, tasks::virtual_task_hint hint) {
auto entry = co_await _ss._sys_ks.local().get_topology_request_entry(id.uuid(), false);
auto started = entry.id;
if (!started) {
auto entry_opt = co_await _ss._sys_ks.local().get_topology_request_entry_opt(id.uuid());
if (!entry_opt) {
co_return std::nullopt;
}
auto& entry = *entry_opt;
co_return tasks::task_status{
.task_id = id,
.type = request_type_to_task_type(entry.request_type),
@@ -84,7 +84,7 @@ future<std::optional<tasks::task_status>> node_ops_virtual_task::get_status(task
.entity = "",
.progress_units = "",
.progress = tasks::task_manager::task::progress{},
.children = started ? co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper())) : utils::chunked_vector<tasks::task_identity>{}
.children = co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper()))
};
}
@@ -106,8 +106,8 @@ future<std::optional<tasks::virtual_task_hint>> node_ops_virtual_task::contains(
}
}
auto entry = co_await _ss._sys_ks.local().get_topology_request_entry(task_id.uuid(), false);
co_return bool(entry.id) && std::holds_alternative<service::topology_request>(entry.request_type) ? empty_hint : std::nullopt;
auto entry = co_await _ss._sys_ks.local().get_topology_request_entry_opt(task_id.uuid());
co_return entry && std::holds_alternative<service::topology_request>(entry->request_type) ? empty_hint : std::nullopt;
}
future<tasks::is_abortable> node_ops_virtual_task::is_abortable(tasks::virtual_task_hint) const {

View File

@@ -1113,7 +1113,7 @@ private:
// only for a truncate which is still waiting.
if (_topology_state_machine._topology.global_request) {
utils::UUID ongoing_global_request_id = _topology_state_machine._topology.global_request_id.value();
const auto topology_requests_entry = co_await _sys_ks.local().get_topology_request_entry(ongoing_global_request_id, true);
const auto topology_requests_entry = co_await _sys_ks.local().get_topology_request_entry(ongoing_global_request_id);
auto global_request = std::get<service::global_topology_request>(topology_requests_entry.request_type);
if (global_request == global_topology_request::truncate_table) {
std::optional<topology::transition_state>& tstate = _topology_state_machine._topology.tstate;

View File

@@ -5228,7 +5228,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
request_id = _topology_state_machine._topology.global_request_id.value();
} else if (!_topology_state_machine._topology.global_requests_queue.empty()) {
request_id = _topology_state_machine._topology.global_requests_queue[0];
auto req_entry = co_await _sys_ks.local().get_topology_request_entry(request_id, true);
auto req_entry = co_await _sys_ks.local().get_topology_request_entry(request_id);
curr_req = std::get<global_topology_request>(req_entry.request_type);
} else {
request_id = utils::UUID{};

View File

@@ -953,7 +953,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
} else {
assert(_feature_service.topology_global_request_queue);
req_id = _topo_sm._topology.global_requests_queue[0];
req_entry = co_await _sys_ks.get_topology_request_entry(req_id, true);
req_entry = co_await _sys_ks.get_topology_request_entry(req_id);
req = std::get<global_topology_request>(req_entry.request_type);
}
switch (req) {
@@ -2051,7 +2051,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// We should perform TRUNCATE only if the session is still valid. It could be cleared if a previous truncate
// handler performed the truncate and cleared the session, but crashed before finalizing the request
if (_topo_sm._topology.session) {
const auto topology_requests_entry = co_await _sys_ks.get_topology_request_entry(global_request_id, true);
const auto topology_requests_entry = co_await _sys_ks.get_topology_request_entry(global_request_id);
const table_id& table_id = topology_requests_entry.truncate_table_id;
lw_shared_ptr<replica::table> table = _db.get_tables_metadata().get_table_if_exists(table_id);