mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-22 07:42:16 +00:00
db/system_keyspace: replace make_remove_view_building_task_mutation() with mutation builder
Again, get rid of system keyspace method in favor of mutation builder, because `system.view_building_tasks` is a single parition table.
This commit is contained in:
@@ -9,6 +9,7 @@
|
|||||||
|
|
||||||
#include "db/schema_tables.hh"
|
#include "db/schema_tables.hh"
|
||||||
|
|
||||||
|
#include "db/view/view_building_task_mutation_builder.hh"
|
||||||
#include "service/migration_manager.hh"
|
#include "service/migration_manager.hh"
|
||||||
#include "service/storage_proxy.hh"
|
#include "service/storage_proxy.hh"
|
||||||
#include "gms/feature_service.hh"
|
#include "gms/feature_service.hh"
|
||||||
@@ -1799,7 +1800,8 @@ static void make_update_indices_mutations(
|
|||||||
utils::chunked_vector<mutation>& mutations)
|
utils::chunked_vector<mutation>& mutations)
|
||||||
{
|
{
|
||||||
mutation indices_mutation(indexes(), partition_key::from_singular(*indexes(), old_table->ks_name()));
|
mutation indices_mutation(indexes(), partition_key::from_singular(*indexes(), old_table->ks_name()));
|
||||||
std::vector<mutation> view_building_muts;
|
view::view_building_task_mutation_builder vb_mut_builder(timestamp);
|
||||||
|
std::vector<mutation> view_status_muts;
|
||||||
|
|
||||||
auto diff = difference(old_table->all_indices(), new_table->all_indices());
|
auto diff = difference(old_table->all_indices(), new_table->all_indices());
|
||||||
auto& db = sp.local_db();
|
auto& db = sp.local_db();
|
||||||
@@ -1834,8 +1836,7 @@ static void make_update_indices_mutations(
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (auto& [id, _]: replica_tasks.view_tasks.at(view->id())) {
|
for (auto& [id, _]: replica_tasks.view_tasks.at(view->id())) {
|
||||||
auto mut = sys_ks.make_remove_view_building_task_mutation(timestamp, id).get();
|
vb_mut_builder.del_task(id);
|
||||||
view_building_muts.push_back(std::move(mut));
|
|
||||||
slogger.trace("Aborting view building task with ID: {} because the index is being dropped", id);
|
slogger.trace("Aborting view building task with ID: {} because the index is being dropped", id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1843,7 +1844,7 @@ static void make_update_indices_mutations(
|
|||||||
|
|
||||||
// Remove entries from `system.view_build_status_v2`
|
// Remove entries from `system.view_build_status_v2`
|
||||||
auto build_status_mut = sys_ks.make_remove_view_build_status_mutation(timestamp, {view->ks_name(), view->cf_name()}).get();
|
auto build_status_mut = sys_ks.make_remove_view_build_status_mutation(timestamp, {view->ks_name(), view->cf_name()}).get();
|
||||||
view_building_muts.push_back(std::move(build_status_mut));
|
view_status_muts.push_back(std::move(build_status_mut));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1862,7 +1863,8 @@ static void make_update_indices_mutations(
|
|||||||
}
|
}
|
||||||
|
|
||||||
mutations.emplace_back(std::move(indices_mutation));
|
mutations.emplace_back(std::move(indices_mutation));
|
||||||
mutations.insert(mutations.end(), std::make_move_iterator(view_building_muts.begin()), std::make_move_iterator(view_building_muts.end()));
|
mutations.emplace_back(vb_mut_builder.build());
|
||||||
|
mutations.insert(mutations.end(), std::make_move_iterator(view_status_muts.begin()), std::make_move_iterator(view_status_muts.end()));
|
||||||
}
|
}
|
||||||
|
|
||||||
static void add_drop_column_to_mutations(schema_ptr table, const sstring& name, const schema::dropped_column& dc, api::timestamp_type timestamp, utils::chunked_vector<mutation>& mutations) {
|
static void add_drop_column_to_mutations(schema_ptr table, const sstring& name, const schema::dropped_column& dc, api::timestamp_type timestamp, utils::chunked_vector<mutation>& mutations) {
|
||||||
|
|||||||
@@ -2819,16 +2819,6 @@ future<std::pair<db::view::building_tasks, std::optional<utils::UUID>>> system_k
|
|||||||
co_return std::pair{std::move(tasks), std::move(min_task_id)};
|
co_return std::pair{std::move(tasks), std::move(min_task_id)};
|
||||||
}
|
}
|
||||||
|
|
||||||
future<mutation> system_keyspace::make_remove_view_building_task_mutation(api::timestamp_type ts, utils::UUID id) {
|
|
||||||
static const sstring stmt = format("DELETE FROM {}.{} WHERE key = '{}' AND id = ?", NAME, VIEW_BUILDING_TASKS, VIEW_BUILDING_KEY);
|
|
||||||
|
|
||||||
auto muts = co_await _qp.get_mutations_internal(stmt, internal_system_query_state(), ts, {id});
|
|
||||||
if (muts.size() != 1) {
|
|
||||||
on_internal_error(slogger, fmt::format("expected 1 mutation got {}", muts.size()));
|
|
||||||
}
|
|
||||||
co_return std::move(muts[0]);
|
|
||||||
}
|
|
||||||
|
|
||||||
static constexpr auto VIEW_BUILDING_PROCESSING_BASE_ID_KEY = "view_building_processing_base_id";
|
static constexpr auto VIEW_BUILDING_PROCESSING_BASE_ID_KEY = "view_building_processing_base_id";
|
||||||
|
|
||||||
future<std::optional<table_id>> system_keyspace::get_view_building_processing_base_id() {
|
future<std::optional<table_id>> system_keyspace::get_view_building_processing_base_id() {
|
||||||
|
|||||||
@@ -573,7 +573,6 @@ public:
|
|||||||
|
|
||||||
// system.view_building_tasks
|
// system.view_building_tasks
|
||||||
future<std::pair<db::view::building_tasks, std::optional<utils::UUID>>> get_view_building_tasks();
|
future<std::pair<db::view::building_tasks, std::optional<utils::UUID>>> get_view_building_tasks();
|
||||||
future<mutation> make_remove_view_building_task_mutation(api::timestamp_type ts, utils::UUID id);
|
|
||||||
|
|
||||||
// system.scylla_local, view_building_processing_base key
|
// system.scylla_local, view_building_processing_base key
|
||||||
future<std::optional<table_id>> get_view_building_processing_base_id();
|
future<std::optional<table_id>> get_view_building_processing_base_id();
|
||||||
|
|||||||
@@ -674,13 +674,13 @@ static future<> add_cleanup_view_building_state_drop_keyspace_mutations(storage_
|
|||||||
using namespace db::view;
|
using namespace db::view;
|
||||||
mlogger.info("Cleaning view building state for all views in keyspace {} ", ks_meta->name());
|
mlogger.info("Cleaning view building state for all views in keyspace {} ", ks_meta->name());
|
||||||
|
|
||||||
|
db::view::view_building_task_mutation_builder builder(ts);
|
||||||
auto& sys_ks = sp.system_keyspace();
|
auto& sys_ks = sp.system_keyspace();
|
||||||
auto& vb_state_machine = sp.view_building_state_machine();
|
auto& vb_state_machine = sp.view_building_state_machine();
|
||||||
|
|
||||||
auto drop_all_tasks_in_task_map = [&] (const task_map& task_map) -> future<> {
|
auto drop_all_tasks_in_task_map = [&] (const task_map& task_map) {
|
||||||
for (auto& [id, _]: task_map) {
|
for (auto& [id, _]: task_map) {
|
||||||
auto mut = co_await sys_ks.make_remove_view_building_task_mutation(ts, id);
|
builder.del_task(id);
|
||||||
out.push_back(std::move(mut));
|
|
||||||
mlogger.trace("Aborting view building task with ID: {} because the keyspace is being dropped", id);
|
mlogger.trace("Aborting view building task with ID: {} because the keyspace is being dropped", id);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -694,9 +694,9 @@ static future<> add_cleanup_view_building_state_drop_keyspace_mutations(storage_
|
|||||||
|
|
||||||
for (auto [_, replica_tasks]: vb_state_machine.building_state.tasks_state.at(tid)) {
|
for (auto [_, replica_tasks]: vb_state_machine.building_state.tasks_state.at(tid)) {
|
||||||
for (auto& [_, views_tasks]: replica_tasks.view_tasks) {
|
for (auto& [_, views_tasks]: replica_tasks.view_tasks) {
|
||||||
co_await drop_all_tasks_in_task_map(views_tasks);
|
drop_all_tasks_in_task_map(views_tasks);
|
||||||
}
|
}
|
||||||
co_await drop_all_tasks_in_task_map(replica_tasks.staging_tasks);
|
drop_all_tasks_in_task_map(replica_tasks.staging_tasks);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -705,6 +705,7 @@ static future<> add_cleanup_view_building_state_drop_keyspace_mutations(storage_
|
|||||||
auto build_status_mut = co_await sys_ks.make_remove_view_build_status_mutation(ts, {view->ks_name(), view->cf_name()});
|
auto build_status_mut = co_await sys_ks.make_remove_view_build_status_mutation(ts, {view->ks_name(), view->cf_name()});
|
||||||
out.push_back(std::move(build_status_mut));
|
out.push_back(std::move(build_status_mut));
|
||||||
}
|
}
|
||||||
|
out.emplace_back(builder.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
future<utils::chunked_vector<mutation>> prepare_keyspace_drop_announcement(storage_proxy& sp, const sstring& ks_name, api::timestamp_type ts) {
|
future<utils::chunked_vector<mutation>> prepare_keyspace_drop_announcement(storage_proxy& sp, const sstring& ks_name, api::timestamp_type ts) {
|
||||||
@@ -871,6 +872,7 @@ static future<> add_cleanup_view_building_state_drop_view_mutations(storage_prox
|
|||||||
using namespace db::view;
|
using namespace db::view;
|
||||||
mlogger.info("Cleaning view building state for view {} ({}.{})", view->id(), view->ks_name(), view->cf_name());
|
mlogger.info("Cleaning view building state for view {} ({}.{})", view->id(), view->ks_name(), view->cf_name());
|
||||||
|
|
||||||
|
db::view::view_building_task_mutation_builder builder(ts);
|
||||||
auto& sys_ks = sp.system_keyspace();
|
auto& sys_ks = sp.system_keyspace();
|
||||||
auto& vb_state_machine = sp.view_building_state_machine();
|
auto& vb_state_machine = sp.view_building_state_machine();
|
||||||
|
|
||||||
@@ -884,8 +886,7 @@ static future<> add_cleanup_view_building_state_drop_view_mutations(storage_prox
|
|||||||
|
|
||||||
// Abort all view building tasks for this view
|
// Abort all view building tasks for this view
|
||||||
for (auto& [id, _]: replica_tasks.view_tasks.at(view->id())) {
|
for (auto& [id, _]: replica_tasks.view_tasks.at(view->id())) {
|
||||||
auto mut = co_await sys_ks.make_remove_view_building_task_mutation(ts, id);
|
builder.del_task(id);
|
||||||
out.push_back(std::move(mut));
|
|
||||||
mlogger.trace("Aborting view building task with ID: {} because the view is being dropped", id);
|
mlogger.trace("Aborting view building task with ID: {} because the view is being dropped", id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -894,6 +895,7 @@ static future<> add_cleanup_view_building_state_drop_view_mutations(storage_prox
|
|||||||
// Remove entries from `system.view_build_status_v2`
|
// Remove entries from `system.view_build_status_v2`
|
||||||
auto build_status_mut = co_await sys_ks.make_remove_view_build_status_mutation(ts, {view->ks_name(), view->cf_name()});
|
auto build_status_mut = co_await sys_ks.make_remove_view_build_status_mutation(ts, {view->ks_name(), view->cf_name()});
|
||||||
out.push_back(std::move(build_status_mut));
|
out.push_back(std::move(build_status_mut));
|
||||||
|
out.emplace_back(builder.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
future<utils::chunked_vector<mutation>> prepare_view_drop_announcement(storage_proxy& sp, const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts) {
|
future<utils::chunked_vector<mutation>> prepare_view_drop_announcement(storage_proxy& sp, const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts) {
|
||||||
|
|||||||
Reference in New Issue
Block a user