db/system_keyspace: replace make_remove_view_building_task_mutation() with mutation builder

Again, get rid of system keyspace method in favor of mutation builder,
because `system.view_building_tasks` is a single parition table.
This commit is contained in:
Michał Jadwiszczak
2025-10-14 20:16:13 +02:00
parent 2561cc1546
commit 1a32ccd8f6
4 changed files with 16 additions and 23 deletions

View File

@@ -9,6 +9,7 @@
#include "db/schema_tables.hh"
#include "db/view/view_building_task_mutation_builder.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "gms/feature_service.hh"
@@ -1799,7 +1800,8 @@ static void make_update_indices_mutations(
utils::chunked_vector<mutation>& mutations)
{
mutation indices_mutation(indexes(), partition_key::from_singular(*indexes(), old_table->ks_name()));
std::vector<mutation> view_building_muts;
view::view_building_task_mutation_builder vb_mut_builder(timestamp);
std::vector<mutation> view_status_muts;
auto diff = difference(old_table->all_indices(), new_table->all_indices());
auto& db = sp.local_db();
@@ -1834,8 +1836,7 @@ static void make_update_indices_mutations(
}
for (auto& [id, _]: replica_tasks.view_tasks.at(view->id())) {
auto mut = sys_ks.make_remove_view_building_task_mutation(timestamp, id).get();
view_building_muts.push_back(std::move(mut));
vb_mut_builder.del_task(id);
slogger.trace("Aborting view building task with ID: {} because the index is being dropped", id);
}
}
@@ -1843,7 +1844,7 @@ static void make_update_indices_mutations(
// Remove entries from `system.view_build_status_v2`
auto build_status_mut = sys_ks.make_remove_view_build_status_mutation(timestamp, {view->ks_name(), view->cf_name()}).get();
view_building_muts.push_back(std::move(build_status_mut));
view_status_muts.push_back(std::move(build_status_mut));
}
}
@@ -1862,7 +1863,8 @@ static void make_update_indices_mutations(
}
mutations.emplace_back(std::move(indices_mutation));
mutations.insert(mutations.end(), std::make_move_iterator(view_building_muts.begin()), std::make_move_iterator(view_building_muts.end()));
mutations.emplace_back(vb_mut_builder.build());
mutations.insert(mutations.end(), std::make_move_iterator(view_status_muts.begin()), std::make_move_iterator(view_status_muts.end()));
}
static void add_drop_column_to_mutations(schema_ptr table, const sstring& name, const schema::dropped_column& dc, api::timestamp_type timestamp, utils::chunked_vector<mutation>& mutations) {

View File

@@ -2819,16 +2819,6 @@ future<std::pair<db::view::building_tasks, std::optional<utils::UUID>>> system_k
co_return std::pair{std::move(tasks), std::move(min_task_id)};
}
future<mutation> system_keyspace::make_remove_view_building_task_mutation(api::timestamp_type ts, utils::UUID id) {
static const sstring stmt = format("DELETE FROM {}.{} WHERE key = '{}' AND id = ?", NAME, VIEW_BUILDING_TASKS, VIEW_BUILDING_KEY);
auto muts = co_await _qp.get_mutations_internal(stmt, internal_system_query_state(), ts, {id});
if (muts.size() != 1) {
on_internal_error(slogger, fmt::format("expected 1 mutation got {}", muts.size()));
}
co_return std::move(muts[0]);
}
static constexpr auto VIEW_BUILDING_PROCESSING_BASE_ID_KEY = "view_building_processing_base_id";
future<std::optional<table_id>> system_keyspace::get_view_building_processing_base_id() {

View File

@@ -573,7 +573,6 @@ public:
// system.view_building_tasks
future<std::pair<db::view::building_tasks, std::optional<utils::UUID>>> get_view_building_tasks();
future<mutation> make_remove_view_building_task_mutation(api::timestamp_type ts, utils::UUID id);
// system.scylla_local, view_building_processing_base key
future<std::optional<table_id>> get_view_building_processing_base_id();

View File

@@ -674,13 +674,13 @@ static future<> add_cleanup_view_building_state_drop_keyspace_mutations(storage_
using namespace db::view;
mlogger.info("Cleaning view building state for all views in keyspace {} ", ks_meta->name());
db::view::view_building_task_mutation_builder builder(ts);
auto& sys_ks = sp.system_keyspace();
auto& vb_state_machine = sp.view_building_state_machine();
auto drop_all_tasks_in_task_map = [&] (const task_map& task_map) -> future<> {
auto drop_all_tasks_in_task_map = [&] (const task_map& task_map) {
for (auto& [id, _]: task_map) {
auto mut = co_await sys_ks.make_remove_view_building_task_mutation(ts, id);
out.push_back(std::move(mut));
builder.del_task(id);
mlogger.trace("Aborting view building task with ID: {} because the keyspace is being dropped", id);
}
};
@@ -694,9 +694,9 @@ static future<> add_cleanup_view_building_state_drop_keyspace_mutations(storage_
for (auto [_, replica_tasks]: vb_state_machine.building_state.tasks_state.at(tid)) {
for (auto& [_, views_tasks]: replica_tasks.view_tasks) {
co_await drop_all_tasks_in_task_map(views_tasks);
drop_all_tasks_in_task_map(views_tasks);
}
co_await drop_all_tasks_in_task_map(replica_tasks.staging_tasks);
drop_all_tasks_in_task_map(replica_tasks.staging_tasks);
}
}
@@ -705,6 +705,7 @@ static future<> add_cleanup_view_building_state_drop_keyspace_mutations(storage_
auto build_status_mut = co_await sys_ks.make_remove_view_build_status_mutation(ts, {view->ks_name(), view->cf_name()});
out.push_back(std::move(build_status_mut));
}
out.emplace_back(builder.build());
}
future<utils::chunked_vector<mutation>> prepare_keyspace_drop_announcement(storage_proxy& sp, const sstring& ks_name, api::timestamp_type ts) {
@@ -871,6 +872,7 @@ static future<> add_cleanup_view_building_state_drop_view_mutations(storage_prox
using namespace db::view;
mlogger.info("Cleaning view building state for view {} ({}.{})", view->id(), view->ks_name(), view->cf_name());
db::view::view_building_task_mutation_builder builder(ts);
auto& sys_ks = sp.system_keyspace();
auto& vb_state_machine = sp.view_building_state_machine();
@@ -884,8 +886,7 @@ static future<> add_cleanup_view_building_state_drop_view_mutations(storage_prox
// Abort all view building tasks for this view
for (auto& [id, _]: replica_tasks.view_tasks.at(view->id())) {
auto mut = co_await sys_ks.make_remove_view_building_task_mutation(ts, id);
out.push_back(std::move(mut));
builder.del_task(id);
mlogger.trace("Aborting view building task with ID: {} because the view is being dropped", id);
}
}
@@ -894,6 +895,7 @@ static future<> add_cleanup_view_building_state_drop_view_mutations(storage_prox
// Remove entries from `system.view_build_status_v2`
auto build_status_mut = co_await sys_ks.make_remove_view_build_status_mutation(ts, {view->ks_name(), view->cf_name()});
out.push_back(std::move(build_status_mut));
out.emplace_back(builder.build());
}
future<utils::chunked_vector<mutation>> prepare_view_drop_announcement(storage_proxy& sp, const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts) {