redis: keyspace_utils: create_keyspace_if_not_exists_impl: call announce twice only

The code would previously `announce` schema mutations once per each keyspace and
once per each table. This can be reduced to two calls of `announce`:
once to create all keyspaces, and once to create all tables.

This should be further reduced to a single `announce` in the future.
Left a FIXME.

Motivation: after migrating to Raft, each `announce` will require a
`read_barrier` to achieve linearizability of schema operations. This
introduces latency, as it requires contacting a leader which then must
contact a quorum. The fewer announce calls, the better. Also, if all
sub-operations are reduced to a single `announce`, we get atomicity -
either all of these sub-operations succeed or none do.
This commit is contained in:
Kamil Braun
2022-01-03 16:43:29 +01:00
parent 188cedd533
commit 63d3449bc3

View File

@@ -146,32 +146,6 @@ future<> create_keyspace_if_not_exists_impl(seastar::sharded<service::storage_pr
keyspace_replication_strategy_options["class"] = "SimpleStrategy";
keyspace_replication_strategy_options["replication_factor"] = fmt::format("{}", default_replication_factor);
}
auto keyspace_gen = [&proxy, db, &mm, &config, keyspace_replication_strategy_options = std::move(keyspace_replication_strategy_options)] (sstring name) -> future<> {
auto& mml = mm.local();
auto& proxyl = proxy.local();
if (db.has_keyspace(name)) {
co_return;
}
auto attrs = make_shared<cql3::statements::ks_prop_defs>();
attrs->add_property(cql3::statements::ks_prop_defs::KW_DURABLE_WRITES, "true");
std::map<sstring, sstring> replication_properties;
for (auto&& option : keyspace_replication_strategy_options) {
replication_properties.emplace(option.first, option.second);
}
attrs->add_property(cql3::statements::ks_prop_defs::KW_REPLICATION, replication_properties);
attrs->validate();
const auto& tm = *proxyl.get_token_metadata_ptr();
co_return co_await mml.announce(mml.prepare_new_keyspace_announcement(attrs->as_ks_metadata(name, tm)));
};
auto table_gen = [&proxy, db, &mm] (sstring ks_name, sstring cf_name, schema_ptr schema) -> future<> {
auto& mml= mm.local();
auto& proxyl = proxy.local();
if (db.has_schema(ks_name, cf_name)) {
co_return;
}
logger.info("Create keyspace: {}, table: {} for redis.", ks_name, cf_name);
co_return co_await mml.announce(co_await mml.prepare_new_column_family_announcement(schema));
};
struct table {
const char* name;
@@ -184,9 +158,11 @@ future<> create_keyspace_if_not_exists_impl(seastar::sharded<service::storage_pr
table{redis::HASHes, hashes_schema},
table{redis::ZSETs, zsets_schema}};
bool schema_ok = boost::algorithm::all_of(boost::irange<unsigned>(0, config.redis_database_count()) |
boost::adaptors::transformed([] (unsigned i) { return fmt::format("REDIS_{}", i); }),
[&] (auto ks_name) {
auto ks_names = boost::copy_range<std::vector<sstring>>(
boost::irange<unsigned>(0, config.redis_database_count()) |
boost::adaptors::transformed([] (unsigned i) { return fmt::format("REDIS_{}", i); }));
bool schema_ok = boost::algorithm::all_of(ks_names, [&] (auto& ks_name) {
auto check = [&] (table t) {
return db.has_schema(ks_name, t.name);
};
@@ -198,17 +174,58 @@ future<> create_keyspace_if_not_exists_impl(seastar::sharded<service::storage_pr
co_return; // if schema is created already do nothing
}
co_await mm.local().schema_read_barrier();
// FIXME: fix this code to `announce` once
auto& mml = mm.local();
auto tm = proxy.local().get_token_metadata_ptr();
co_await mml.schema_read_barrier();
std::vector<mutation> ks_mutations;
for (auto& ks_name: ks_names) {
if (db.has_keyspace(ks_name)) {
continue;
}
cql3::statements::ks_prop_defs attrs;
attrs.add_property(cql3::statements::ks_prop_defs::KW_DURABLE_WRITES, "true");
std::map<sstring, sstring> replication_properties;
for (auto&& option : keyspace_replication_strategy_options) {
replication_properties.emplace(option.first, option.second);
}
attrs.add_property(cql3::statements::ks_prop_defs::KW_REPLICATION, replication_properties);
attrs.validate();
auto muts = mml.prepare_new_keyspace_announcement(attrs.as_ks_metadata(ks_name, *tm));
std::move(muts.begin(), muts.end(), std::back_inserter(ks_mutations));
}
if (!ks_mutations.empty()) {
co_await mml.announce(std::move(ks_mutations));
}
std::vector<mutation> table_mutations;
auto table_gen = std::bind_front(
[] (data_dictionary::database db, service::migration_manager& mml, std::vector<mutation>& table_mutations,
sstring ks_name, sstring cf_name, schema_ptr schema) -> future<> {
if (db.has_schema(ks_name, cf_name)) {
co_return;
}
logger.info("Create keyspace: {}, table: {} for redis.", ks_name, cf_name);
auto muts = co_await mml.prepare_new_column_family_announcement(schema);
std::move(muts.begin(), muts.end(), std::back_inserter(table_mutations));
}, db, std::ref(mml), std::ref(table_mutations));
co_await parallel_for_each(ks_names, [table_gen = std::move(table_gen)] (const sstring& ks_name) mutable {
return parallel_for_each(tables, [ks_name, table_gen = std::move(table_gen)] (table t) {
return table_gen(ks_name, t.name, t.schema(ks_name));
}).discard_result();
});
// create default databases for redis.
co_return co_await parallel_for_each(boost::irange<unsigned>(0, config.redis_database_count()), [keyspace_gen = std::move(keyspace_gen), table_gen = std::move(table_gen)] (auto c) {
auto ks_name = fmt::format("REDIS_{}", c);
return keyspace_gen(ks_name).then([ks_name, table_gen] {
return parallel_for_each(tables, [ks_name, table_gen] (table t) {
return table_gen(ks_name, t.name, t.schema(ks_name));
}).discard_result();
});
});
if (!table_mutations.empty()) {
co_await mml.announce(std::move(table_mutations));
}
}
future<> maybe_create_keyspace(seastar::sharded<service::storage_proxy>& proxy, data_dictionary::database db, seastar::sharded<service::migration_manager>& mm, db::config& config, sharded<gms::gossiper>& gossiper) {