diff --git a/redis/keyspace_utils.cc b/redis/keyspace_utils.cc index 57103b3eef..b371d7cdb9 100644 --- a/redis/keyspace_utils.cc +++ b/redis/keyspace_utils.cc @@ -146,32 +146,6 @@ future<> create_keyspace_if_not_exists_impl(seastar::sharded future<> { - auto& mml = mm.local(); - auto& proxyl = proxy.local(); - if (db.has_keyspace(name)) { - co_return; - } - auto attrs = make_shared(); - attrs->add_property(cql3::statements::ks_prop_defs::KW_DURABLE_WRITES, "true"); - std::map replication_properties; - for (auto&& option : keyspace_replication_strategy_options) { - replication_properties.emplace(option.first, option.second); - } - attrs->add_property(cql3::statements::ks_prop_defs::KW_REPLICATION, replication_properties); - attrs->validate(); - const auto& tm = *proxyl.get_token_metadata_ptr(); - co_return co_await mml.announce(mml.prepare_new_keyspace_announcement(attrs->as_ks_metadata(name, tm))); - }; - auto table_gen = [&proxy, db, &mm] (sstring ks_name, sstring cf_name, schema_ptr schema) -> future<> { - auto& mml= mm.local(); - auto& proxyl = proxy.local(); - if (db.has_schema(ks_name, cf_name)) { - co_return; - } - logger.info("Create keyspace: {}, table: {} for redis.", ks_name, cf_name); - co_return co_await mml.announce(co_await mml.prepare_new_column_family_announcement(schema)); - }; struct table { const char* name; @@ -184,9 +158,11 @@ future<> create_keyspace_if_not_exists_impl(seastar::sharded(0, config.redis_database_count()) | - boost::adaptors::transformed([] (unsigned i) { return fmt::format("REDIS_{}", i); }), - [&] (auto ks_name) { + auto ks_names = boost::copy_range>( + boost::irange(0, config.redis_database_count()) | + boost::adaptors::transformed([] (unsigned i) { return fmt::format("REDIS_{}", i); })); + + bool schema_ok = boost::algorithm::all_of(ks_names, [&] (auto& ks_name) { auto check = [&] (table t) { return db.has_schema(ks_name, t.name); }; @@ -198,17 +174,58 @@ future<> create_keyspace_if_not_exists_impl(seastar::sharded ks_mutations; + for (auto& ks_name: ks_names) { + if (db.has_keyspace(ks_name)) { + continue; + } + + cql3::statements::ks_prop_defs attrs; + attrs.add_property(cql3::statements::ks_prop_defs::KW_DURABLE_WRITES, "true"); + std::map replication_properties; + for (auto&& option : keyspace_replication_strategy_options) { + replication_properties.emplace(option.first, option.second); + } + attrs.add_property(cql3::statements::ks_prop_defs::KW_REPLICATION, replication_properties); + attrs.validate(); + + auto muts = mml.prepare_new_keyspace_announcement(attrs.as_ks_metadata(ks_name, *tm)); + std::move(muts.begin(), muts.end(), std::back_inserter(ks_mutations)); + } + + if (!ks_mutations.empty()) { + co_await mml.announce(std::move(ks_mutations)); + } + + std::vector table_mutations; + auto table_gen = std::bind_front( + [] (data_dictionary::database db, service::migration_manager& mml, std::vector& table_mutations, + sstring ks_name, sstring cf_name, schema_ptr schema) -> future<> { + if (db.has_schema(ks_name, cf_name)) { + co_return; + } + logger.info("Create keyspace: {}, table: {} for redis.", ks_name, cf_name); + auto muts = co_await mml.prepare_new_column_family_announcement(schema); + std::move(muts.begin(), muts.end(), std::back_inserter(table_mutations)); + }, db, std::ref(mml), std::ref(table_mutations)); + + co_await parallel_for_each(ks_names, [table_gen = std::move(table_gen)] (const sstring& ks_name) mutable { + return parallel_for_each(tables, [ks_name, table_gen = std::move(table_gen)] (table t) { + return table_gen(ks_name, t.name, t.schema(ks_name)); + }).discard_result(); + }); // create default databases for redis. - co_return co_await parallel_for_each(boost::irange(0, config.redis_database_count()), [keyspace_gen = std::move(keyspace_gen), table_gen = std::move(table_gen)] (auto c) { - auto ks_name = fmt::format("REDIS_{}", c); - return keyspace_gen(ks_name).then([ks_name, table_gen] { - return parallel_for_each(tables, [ks_name, table_gen] (table t) { - return table_gen(ks_name, t.name, t.schema(ks_name)); - }).discard_result(); - }); - }); + if (!table_mutations.empty()) { + co_await mml.announce(std::move(table_mutations)); + } } future<> maybe_create_keyspace(seastar::sharded& proxy, data_dictionary::database db, seastar::sharded& mm, db::config& config, sharded& gossiper) {