From 05ffcefd503d084ffcb704e487a8312a0b4e2f5d Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Wed, 19 Mar 2025 12:47:19 +0200 Subject: [PATCH] migration_manager: add notification for creating multiple tables Add prepare_new_column_families_announcement for preparing multiple new tables that are created in a single operation. A listener can receive a notification when multiple tables are created. This is useful if the listener needs to see all the new tables together rather than work on each new table independently, for example when there are dependencies between the new tables. --- service/migration_listener.hh | 2 ++ service/migration_manager.cc | 56 +++++++++++++++++++++++++++-------- service/migration_manager.hh | 3 ++ 3 files changed, 49 insertions(+), 12 deletions(-) diff --git a/service/migration_listener.hh b/service/migration_listener.hh index e381e8f17a..d13ca03cd5 100644 --- a/service/migration_listener.hh +++ b/service/migration_listener.hh @@ -81,6 +81,7 @@ public: // and its column families together. Therefore, listeners can't load the keyspace from the // database. Instead, they should use the `ksm` parameter if needed. 
virtual void on_before_create_column_family(const keyspace_metadata& ksm, const schema&, std::vector&, api::timestamp_type) {} + virtual void on_before_create_column_families(const keyspace_metadata& ksm, const std::vector& cfms, std::vector& mutations, api::timestamp_type timestamp); virtual void on_before_update_column_family(const schema& new_schema, const schema& old_schema, std::vector&, api::timestamp_type) {} virtual void on_before_drop_column_family(const schema&, std::vector&, api::timestamp_type) {} virtual void on_before_drop_keyspace(const sstring& keyspace_name, std::vector&, api::timestamp_type) {} @@ -147,6 +148,7 @@ public: future<> drop_aggregate(const db::functions::function_name& fun_name, const std::vector& arg_types); void before_create_column_family(const keyspace_metadata& ksm, const schema&, std::vector&, api::timestamp_type); + void before_create_column_families(const keyspace_metadata& ksm, const std::vector&, std::vector&, api::timestamp_type); void before_update_column_family(const schema& new_schema, const schema& old_schema, std::vector&, api::timestamp_type); void before_drop_column_family(const schema&, std::vector&, api::timestamp_type); void before_drop_keyspace(const sstring& keyspace_name, std::vector&, api::timestamp_type); diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 65228ff58d..1c0cec77c7 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -588,6 +588,14 @@ void migration_notifier::before_create_column_family(const keyspace_metadata& ks }); } +void migration_notifier::before_create_column_families(const keyspace_metadata& ksm, + const std::vector& schemas, std::vector& mutations, api::timestamp_type timestamp) { + _listeners.thread_for_each([&ksm, &schemas, &mutations, timestamp] (migration_listener* listener) { + // allow exceptions. 
so a listener can effectively kill a create-table + listener->on_before_create_column_families(ksm, schemas, mutations, timestamp); + }); +} + void migration_notifier::before_update_column_family(const schema& new_schema, const schema& old_schema, std::vector& mutations, api::timestamp_type ts) { _listeners.thread_for_each([&mutations, &new_schema, &old_schema, ts] (migration_listener* listener) { @@ -638,27 +646,40 @@ static future> include_keyspace( co_return std::move(mutations); } -static future> do_prepare_new_column_family_announcement(storage_proxy& sp, - const keyspace_metadata& ksm, schema_ptr cfm, api::timestamp_type timestamp) { +static future> do_prepare_new_column_families_announcement(storage_proxy& sp, + const keyspace_metadata& ksm, std::vector cfms, api::timestamp_type timestamp) { auto& db = sp.local_db(); - if (db.has_schema(cfm->ks_name(), cfm->cf_name())) { - throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name()); - } - if (db.column_family_exists(cfm->id())) { - throw exceptions::invalid_request_exception(format("Table with ID {} already exists: {}", cfm->id(), db.find_schema(cfm->id()))); + for (auto cfm : cfms) { + if (db.has_schema(cfm->ks_name(), cfm->cf_name())) { + throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name()); + } + if (db.column_family_exists(cfm->id())) { + throw exceptions::invalid_request_exception(format("Table with ID {} already exists: {}", cfm->id(), db.find_schema(cfm->id()))); + } } - mlogger.info("Create new ColumnFamily: {}", cfm); + for (auto cfm : cfms) { + mlogger.info("Create new ColumnFamily: {}", cfm); + } - return seastar::async([&db, &ksm, cfm, timestamp] { - auto mutations = db::schema_tables::make_create_table_mutations(cfm, timestamp); - db.get_notifier().before_create_column_family(ksm, *cfm, mutations, timestamp); + return seastar::async([&db, &ksm, timestamp, cfms = std::move(cfms)] { + std::vector mutations; + for (schema_ptr cfm : cfms) { + auto table_muts = 
db::schema_tables::make_create_table_mutations(cfm, timestamp); + mutations.insert(mutations.end(), std::make_move_iterator(table_muts.begin()), std::make_move_iterator(table_muts.end())); + } + db.get_notifier().before_create_column_families(ksm, cfms, mutations, timestamp); return mutations; }).then([&sp, &ksm](std::vector mutations) { return include_keyspace(sp, ksm, std::move(mutations)); }); } +static future> do_prepare_new_column_family_announcement(storage_proxy& sp, + const keyspace_metadata& ksm, schema_ptr cfm, api::timestamp_type timestamp) { + return do_prepare_new_column_families_announcement(sp, ksm, std::vector{std::move(cfm)}, timestamp); +} + future> prepare_new_column_family_announcement(storage_proxy& sp, schema_ptr cfm, api::timestamp_type timestamp) { return validate(cfm).then([&sp, cfm, timestamp] { try { @@ -673,10 +694,15 @@ future> prepare_new_column_family_announcement(storage_pro future<> prepare_new_column_family_announcement(std::vector& mutations, storage_proxy& sp, const keyspace_metadata& ksm, schema_ptr cfm, api::timestamp_type timestamp) { + return prepare_new_column_families_announcement(mutations, sp, ksm, std::vector{std::move(cfm)}, timestamp); +} + +future<> prepare_new_column_families_announcement(std::vector& mutations, + storage_proxy& sp, const keyspace_metadata& ksm, std::vector cfms, api::timestamp_type timestamp) { auto& db = sp.local_db(); // If the keyspace exists, ensure that we use the current metadata. const auto& current_ksm = db.has_keyspace(ksm.name()) ? 
*db.find_keyspace(ksm.name()).metadata() : ksm; - auto new_mutations = co_await do_prepare_new_column_family_announcement(sp, current_ksm, cfm, timestamp); + auto new_mutations = co_await do_prepare_new_column_families_announcement(sp, current_ksm, cfms, timestamp); std::move(new_mutations.begin(), new_mutations.end(), std::back_inserter(mutations)); } @@ -1182,4 +1208,10 @@ void migration_manager::set_concurrent_ddl_retries(size_t n) { _concurrent_ddl_retries = n; } +void migration_listener::on_before_create_column_families(const keyspace_metadata& ksm, const std::vector& cfms, std::vector& mutations, api::timestamp_type timestamp) { + for (auto cfm : cfms) { + on_before_create_column_family(ksm, *cfm, mutations, timestamp); + } +} + } diff --git a/service/migration_manager.hh b/service/migration_manager.hh index 1eb961fd3e..6983d761bf 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -219,6 +219,9 @@ future> prepare_new_column_family_announcement(storage_pro // This function allows announcing a new keyspace together with its tables at once. future<> prepare_new_column_family_announcement(std::vector& mutations, storage_proxy& sp, const keyspace_metadata& ksm, schema_ptr cfm, api::timestamp_type timestamp); +// Announce multiple tables in one operation +future<> prepare_new_column_families_announcement(std::vector& mutations, + storage_proxy& sp, const keyspace_metadata& ksm, std::vector cfms, api::timestamp_type timestamp); future> prepare_new_type_announcement(storage_proxy& sp, user_type new_type, api::timestamp_type ts);