migration_manager: add notification for creating multiple tables

Add prepare_new_column_families_announcement for preparing multiple new
tables that are created in a single operation.

A listener can receive a notification when multiple tables are created.
This is useful if the listener needs to have all the new tables, and not
work on each new table independently. For example, if there are
dependencies between the new tables.
This commit is contained in:
Michael Litvak
2025-03-19 12:47:19 +02:00
parent 064ac25ff9
commit 05ffcefd50
3 changed files with 49 additions and 12 deletions

View File

@@ -81,6 +81,7 @@ public:
// and its column families together. Therefore, listeners can't load the keyspace from the
// database. Instead, they should use the `ksm` parameter if needed.
virtual void on_before_create_column_family(const keyspace_metadata& ksm, const schema&, std::vector<mutation>&, api::timestamp_type) {}
virtual void on_before_create_column_families(const keyspace_metadata& ksm, const std::vector<schema_ptr>& cfms, std::vector<mutation>& mutations, api::timestamp_type timestamp);
virtual void on_before_update_column_family(const schema& new_schema, const schema& old_schema, std::vector<mutation>&, api::timestamp_type) {}
virtual void on_before_drop_column_family(const schema&, std::vector<mutation>&, api::timestamp_type) {}
virtual void on_before_drop_keyspace(const sstring& keyspace_name, std::vector<mutation>&, api::timestamp_type) {}
@@ -147,6 +148,7 @@ public:
future<> drop_aggregate(const db::functions::function_name& fun_name, const std::vector<data_type>& arg_types);
void before_create_column_family(const keyspace_metadata& ksm, const schema&, std::vector<mutation>&, api::timestamp_type);
void before_create_column_families(const keyspace_metadata& ksm, const std::vector<schema_ptr>&, std::vector<mutation>&, api::timestamp_type);
void before_update_column_family(const schema& new_schema, const schema& old_schema, std::vector<mutation>&, api::timestamp_type);
void before_drop_column_family(const schema&, std::vector<mutation>&, api::timestamp_type);
void before_drop_keyspace(const sstring& keyspace_name, std::vector<mutation>&, api::timestamp_type);

View File

@@ -588,6 +588,14 @@ void migration_notifier::before_create_column_family(const keyspace_metadata& ks
});
}
void migration_notifier::before_create_column_families(const keyspace_metadata& ksm,
const std::vector<schema_ptr>& schemas, std::vector<mutation>& mutations, api::timestamp_type timestamp) {
_listeners.thread_for_each([&ksm, &schemas, &mutations, timestamp] (migration_listener* listener) {
// allow exceptions. so a listener can effectively kill a create-table
listener->on_before_create_column_families(ksm, schemas, mutations, timestamp);
});
}
void migration_notifier::before_update_column_family(const schema& new_schema,
const schema& old_schema, std::vector<mutation>& mutations, api::timestamp_type ts) {
_listeners.thread_for_each([&mutations, &new_schema, &old_schema, ts] (migration_listener* listener) {
@@ -638,27 +646,40 @@ static future<std::vector<mutation>> include_keyspace(
co_return std::move(mutations);
}
static future<std::vector<mutation>> do_prepare_new_column_family_announcement(storage_proxy& sp,
const keyspace_metadata& ksm, schema_ptr cfm, api::timestamp_type timestamp) {
static future<std::vector<mutation>> do_prepare_new_column_families_announcement(storage_proxy& sp,
const keyspace_metadata& ksm, std::vector<schema_ptr> cfms, api::timestamp_type timestamp) {
auto& db = sp.local_db();
if (db.has_schema(cfm->ks_name(), cfm->cf_name())) {
throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name());
}
if (db.column_family_exists(cfm->id())) {
throw exceptions::invalid_request_exception(format("Table with ID {} already exists: {}", cfm->id(), db.find_schema(cfm->id())));
for (auto cfm : cfms) {
if (db.has_schema(cfm->ks_name(), cfm->cf_name())) {
throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name());
}
if (db.column_family_exists(cfm->id())) {
throw exceptions::invalid_request_exception(format("Table with ID {} already exists: {}", cfm->id(), db.find_schema(cfm->id())));
}
}
mlogger.info("Create new ColumnFamily: {}", cfm);
for (auto cfm : cfms) {
mlogger.info("Create new ColumnFamily: {}", cfm);
}
return seastar::async([&db, &ksm, cfm, timestamp] {
auto mutations = db::schema_tables::make_create_table_mutations(cfm, timestamp);
db.get_notifier().before_create_column_family(ksm, *cfm, mutations, timestamp);
return seastar::async([&db, &ksm, timestamp, cfms = std::move(cfms)] {
std::vector<mutation> mutations;
for (schema_ptr cfm : cfms) {
auto table_muts = db::schema_tables::make_create_table_mutations(cfm, timestamp);
mutations.insert(mutations.end(), std::make_move_iterator(table_muts.begin()), std::make_move_iterator(table_muts.end()));
}
db.get_notifier().before_create_column_families(ksm, cfms, mutations, timestamp);
return mutations;
}).then([&sp, &ksm](std::vector<mutation> mutations) {
return include_keyspace(sp, ksm, std::move(mutations));
});
}
static future<std::vector<mutation>> do_prepare_new_column_family_announcement(storage_proxy& sp,
const keyspace_metadata& ksm, schema_ptr cfm, api::timestamp_type timestamp) {
return do_prepare_new_column_families_announcement(sp, ksm, std::vector<schema_ptr>{std::move(cfm)}, timestamp);
}
future<std::vector<mutation>> prepare_new_column_family_announcement(storage_proxy& sp, schema_ptr cfm, api::timestamp_type timestamp) {
return validate(cfm).then([&sp, cfm, timestamp] {
try {
@@ -673,10 +694,15 @@ future<std::vector<mutation>> prepare_new_column_family_announcement(storage_pro
future<> prepare_new_column_family_announcement(std::vector<mutation>& mutations,
storage_proxy& sp, const keyspace_metadata& ksm, schema_ptr cfm, api::timestamp_type timestamp) {
return prepare_new_column_families_announcement(mutations, sp, ksm, std::vector<schema_ptr>{std::move(cfm)}, timestamp);
}
future<> prepare_new_column_families_announcement(std::vector<mutation>& mutations,
storage_proxy& sp, const keyspace_metadata& ksm, std::vector<schema_ptr> cfms, api::timestamp_type timestamp) {
auto& db = sp.local_db();
// If the keyspace exists, ensure that we use the current metadata.
const auto& current_ksm = db.has_keyspace(ksm.name()) ? *db.find_keyspace(ksm.name()).metadata() : ksm;
auto new_mutations = co_await do_prepare_new_column_family_announcement(sp, current_ksm, cfm, timestamp);
auto new_mutations = co_await do_prepare_new_column_families_announcement(sp, current_ksm, cfms, timestamp);
std::move(new_mutations.begin(), new_mutations.end(), std::back_inserter(mutations));
}
@@ -1182,4 +1208,10 @@ void migration_manager::set_concurrent_ddl_retries(size_t n) {
_concurrent_ddl_retries = n;
}
void migration_listener::on_before_create_column_families(const keyspace_metadata& ksm, const std::vector<schema_ptr>& cfms, std::vector<mutation>& mutations, api::timestamp_type timestamp) {
for (auto cfm : cfms) {
on_before_create_column_family(ksm, *cfm, mutations, timestamp);
}
}
}

View File

@@ -219,6 +219,9 @@ future<std::vector<mutation>> prepare_new_column_family_announcement(storage_pro
// This function allows announcing a new keyspace together with its tables at once.
future<> prepare_new_column_family_announcement(std::vector<mutation>& mutations,
storage_proxy& sp, const keyspace_metadata& ksm, schema_ptr cfm, api::timestamp_type timestamp);
// Announce multiple tables in one operation
future<> prepare_new_column_families_announcement(std::vector<mutation>& mutations,
storage_proxy& sp, const keyspace_metadata& ksm, std::vector<schema_ptr> cfms, api::timestamp_type timestamp);
future<std::vector<mutation>> prepare_new_type_announcement(storage_proxy& sp, user_type new_type, api::timestamp_type ts);