After the previous commits, we can drop the task's entire state and replace it with a single boolean flag, which indicates whether the task was aborted. Once a task has been aborted, it cannot be resurrected to a normal state.
133 lines
4.5 KiB
C++
133 lines
4.5 KiB
C++
/*
|
|
* Copyright (C) 2025-present ScyllaDB
|
|
*
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include "db/view/view_building_state.hh"
|
|
|
|
namespace db {
|
|
|
|
namespace view {
|
|
|
|
// Memberwise constructor: each argument is copied into the member of the same name.
view_building_task::view_building_task(
        utils::UUID id,
        task_type type,
        bool aborted,
        table_id base_id,
        std::optional<table_id> view_id,
        locator::tablet_replica replica,
        dht::token last_token)
    : id(id)
    , type(type)
    , aborted(aborted)
    , base_id(base_id)
    , view_id(view_id)
    , replica(replica)
    , last_token(last_token)
{
}
|
|
|
|
// Takes ownership of both arguments: the per-base-table task collection and the
// (optional) id of the base table currently being processed are moved into the
// corresponding members.
view_building_state::view_building_state(
        building_tasks tasks_state,
        std::optional<table_id> processed_base_table)
    : tasks_state(std::move(tasks_state))
    , currently_processed_base_table(std::move(processed_base_table))
{
}
|
|
|
|
// Takes ownership of both arguments: the base-table -> views mapping and the
// view build status map are moved into the corresponding members.
views_state::views_state(
        std::map<table_id, std::vector<table_id>> views_per_base,
        view_build_status_map status_map)
    : views_per_base(std::move(views_per_base))
    , status_map(std::move(status_map))
{
}
|
|
|
|
// Converts the textual name of a view building task type into the enum value.
//
// Recognized names (exact match): "BUILD_RANGE" and "PROCESS_STAGING".
// Throws std::runtime_error for any other input.
view_building_task::task_type task_type_from_string(std::string_view str) {
    using task_type = view_building_task::task_type;

    struct named_type {
        std::string_view name;
        task_type value;
    };
    static constexpr named_type known_types[] = {
        {"BUILD_RANGE", task_type::build_range},
        {"PROCESS_STAGING", task_type::process_staging},
    };

    for (const auto& candidate : known_types) {
        if (str == candidate.name) {
            return candidate.value;
        }
    }
    throw std::runtime_error(fmt::format("Unknown view building task type: {}", str));
}
|
|
|
|
// Converts a view building task type into its textual name
// ("BUILD_RANGE" or "PROCESS_STAGING"); the inverse of task_type_from_string().
//
// Throws std::runtime_error if `type` holds a value that is not one of the
// enumerators handled below (e.g. a corrupted or wrongly cast value).
seastar::sstring task_type_to_sstring(view_building_task::task_type type) {
    // Deliberately no `default:` label, so the compiler can still warn when a
    // new enumerator is added without being handled here.
    switch (type) {
    case view_building_task::task_type::build_range:
        return "BUILD_RANGE";
    case view_building_task::task_type::process_staging:
        return "PROCESS_STAGING";
    }
    // Without this, an out-of-range value would flow off the end of a non-void
    // function, which is undefined behaviour.
    throw std::runtime_error(fmt::format("Unknown view building task type: {}", static_cast<int>(type)));
}
|
|
|
|
// Looks up the task with the given `id` among the tasks registered for
// (`base_id`, `replica`).
//
// The per-view task maps are searched first, then the staging tasks.
// Returns a reference to the stored task, or an empty optional when the base
// table, the replica or the task id is not present.
//
// Every level is resolved with a single find() instead of a contains() + at()
// pair, so each key is searched only once.
std::optional<std::reference_wrapper<const view_building_task>> view_building_state::get_task(table_id base_id, locator::tablet_replica replica, utils::UUID id) const {
    const auto base_it = tasks_state.find(base_id);
    if (base_it == tasks_state.end()) {
        return {};
    }

    const auto& tasks_of_base = base_it->second;
    const auto replica_it = tasks_of_base.find(replica);
    if (replica_it == tasks_of_base.end()) {
        return {};
    }

    const auto& tasks_of_replica = replica_it->second;
    for (const auto& [_, view_tasks] : tasks_of_replica.view_tasks) {
        if (const auto task_it = view_tasks.find(id); task_it != view_tasks.end()) {
            return task_it->second;
        }
    }

    const auto& staging_tasks = tasks_of_replica.staging_tasks;
    if (const auto task_it = staging_tasks.find(id); task_it != staging_tasks.end()) {
        return task_it->second;
    }
    return {};
}
|
|
|
|
// Collects references to every task of base table `base_id` whose replica is
// located on `host`.
//
// For each matching replica the per-view tasks are appended first, followed by
// the staging tasks. Returns an empty vector when the base table has no tasks.
std::vector<std::reference_wrapper<const view_building_task>> view_building_state::get_tasks_for_host(table_id base_id, locator::host_id host) const {
    std::vector<std::reference_wrapper<const view_building_task>> result;

    const auto base_it = tasks_state.find(base_id);
    if (base_it == tasks_state.end()) {
        return result;
    }

    // Appends a reference to every task stored in one id -> task map.
    const auto append_all = [&result] (const auto& tasks_by_id) {
        for (const auto& [_, task] : tasks_by_id) {
            result.emplace_back(task);
        }
    };

    for (const auto& [replica, per_replica] : base_it->second) {
        if (replica.host != host) {
            continue;
        }
        for (const auto& [_, view_tasks] : per_replica.view_tasks) {
            append_all(view_tasks);
        }
        append_all(per_replica.staging_tasks);
    }
    return result;
}
|
|
|
|
// Groups copies of all tasks of base table `base_table_id`, across all of its
// replicas, by the tasks' `last_token`.
//
// The per-replica overload does the grouping for one replica; its results are
// merged here by appending to the vector stored under each token.
// Returns an empty map when the base table has no tasks.
std::map<dht::token, std::vector<view_building_task>> view_building_state::collect_tasks_by_last_token(table_id base_table_id) const {
    std::map<dht::token, std::vector<view_building_task>> merged;

    const auto base_it = tasks_state.find(base_table_id);
    if (base_it == tasks_state.end()) {
        return merged;
    }

    for (const auto& [replica, _] : base_it->second) {
        auto per_replica = collect_tasks_by_last_token(base_table_id, replica);
        for (auto& [token, tasks] : per_replica) {
            auto& bucket = merged[token];
            // `per_replica` is a temporary result, so its tasks can be moved out.
            bucket.insert(bucket.end(),
                    std::make_move_iterator(tasks.begin()),
                    std::make_move_iterator(tasks.end()));
        }
    }
    return merged;
}
|
|
|
|
// Groups copies of the tasks registered for (`base_table_id`, `replica`) by the
// tasks' `last_token`.
//
// Per-view tasks are added first, then staging tasks, so within one token the
// view tasks precede the staging ones. Returns an empty map when either the
// base table or the replica has no tasks.
std::map<dht::token, std::vector<view_building_task>> view_building_state::collect_tasks_by_last_token(table_id base_table_id, const locator::tablet_replica& replica) const {
    std::map<dht::token, std::vector<view_building_task>> by_token;

    const auto base_it = tasks_state.find(base_table_id);
    if (base_it == tasks_state.end()) {
        return by_token;
    }
    const auto replica_it = base_it->second.find(replica);
    if (replica_it == base_it->second.end()) {
        return by_token;
    }

    // Copies every task of one id -> task map into the bucket of its last token.
    const auto group = [&by_token] (const auto& tasks_by_id) {
        for (const auto& [_, task] : tasks_by_id) {
            by_token[task.last_token].push_back(task);
        }
    };

    const auto& per_replica = replica_it->second;
    for (const auto& [_, view_tasks] : per_replica.view_tasks) {
        group(view_tasks);
    }
    group(per_replica.staging_tasks);
    return by_token;
}
|
|
|
|
}
|
|
|
|
}
|