table_helper: retry in setup_keyspace on concurrent operation

Currently, table_helper::setup_keyspace is used only for starting
the system_traces keyspace. We need to handle concurrent group 0
operations possible during concurrent bootstrap in the Raft-based
topology.
This commit is contained in:
Patryk Jędrzejczak
2023-10-02 11:42:16 +02:00
parent e2894a081a
commit e10036babe

View File

@@ -147,22 +147,23 @@ future<> table_helper::setup_keyspace(cql3::query_processor& qp, service::migrat
opts["replication_factor"] = replication_factor;
auto ksm = keyspace_metadata::new_keyspace(keyspace_name, "org.apache.cassandra.locator.SimpleStrategy", std::move(opts), true);
if (!db.has_keyspace(keyspace_name)) {
while (!db.has_keyspace(keyspace_name)) {
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
if (!db.has_keyspace(keyspace_name)) {
co_await mm.announce(service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts),
std::move(group0_guard), format("table_helper: create {} keyspace", keyspace_name));
try {
co_await mm.announce(service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts),
std::move(group0_guard), format("table_helper: create {} keyspace", keyspace_name));
} catch (service::group0_concurrent_modification&) {
tlogger.info("Concurrent operation is detected while creating {} keyspace, retrying.", keyspace_name);
}
}
}
qs.get_client_state().set_keyspace(db.real_database(), keyspace_name);
if (std::all_of(tables.begin(), tables.end(), [db] (table_helper* t) { return db.has_schema(t->_keyspace, t->_name); })) {
co_return;
}
while (std::any_of(tables.begin(), tables.end(), [db] (table_helper* t) { return !db.has_schema(t->_keyspace, t->_name); })) {
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
std::vector<mutation> table_mutations;
@@ -174,8 +175,15 @@ future<> table_helper::setup_keyspace(cql3::query_processor& qp, service::migrat
}
});
if (!table_mutations.empty()) {
co_await mm.announce(std::move(table_mutations), std::move(group0_guard),
format("table_helper: create tables for {} keyspace", keyspace_name));
if (table_mutations.empty()) {
co_return;
}
try {
co_return co_await mm.announce(std::move(table_mutations), std::move(group0_guard),
format("table_helper: create tables for {} keyspace", keyspace_name));
} catch (service::group0_concurrent_modification&) {
tlogger.info("Concurrent operation is detected while creating tables for {} keyspace, retrying.", keyspace_name);
}
}
}