db: schema_tables: coroutinize do_merge_schema()

It is now using an internal thread, so unpeel is and replace
future::get() with co_await.
This commit is contained in:
Avi Kivity
2021-08-01 16:36:26 +03:00
parent 9680d9e76c
commit 78fc05922b

View File

@@ -1061,7 +1061,7 @@ future<> store_column_mapping(distributed<service::storage_proxy>& proxy, schema
static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush)
{
return seastar::async([&proxy, mutations = std::move(mutations), do_flush] () mutable {
{
slogger.trace("do_merge_schema: {}", mutations);
schema_ptr s = keyspaces();
// compare before/after schemas of the affected keyspaces only
@@ -1076,55 +1076,56 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
}
// current state of the schema
auto&& old_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces).get0();
auto&& old_column_families = read_tables_for_keyspaces(proxy, keyspaces, tables()).get0();
auto&& old_types = read_schema_for_keyspaces(proxy, TYPES, keyspaces).get0();
auto&& old_views = read_tables_for_keyspaces(proxy, keyspaces, views()).get0();
auto old_functions = read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0();
auto&& old_keyspaces = co_await read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces);
auto&& old_column_families = co_await read_tables_for_keyspaces(proxy, keyspaces, tables());
auto&& old_types = co_await read_schema_for_keyspaces(proxy, TYPES, keyspaces);
auto&& old_views = co_await read_tables_for_keyspaces(proxy, keyspaces, views());
auto old_functions = co_await read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces);
#if 0 // not in 2.1.8
/*auto& old_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0();
#endif
proxy.local().mutate_locally(std::move(mutations), tracing::trace_state_ptr()).get0();
co_await proxy.local().mutate_locally(std::move(mutations), tracing::trace_state_ptr());
if (do_flush) {
proxy.local().get_db().invoke_on_all([s, cfs = std::move(column_families)] (database& db) {
return parallel_for_each(cfs.begin(), cfs.end(), [&db] (const utils::UUID& id) {
co_await proxy.local().get_db().invoke_on_all([&] (database& db) -> future<> {
auto& cfs = column_families;
co_await parallel_for_each(cfs.begin(), cfs.end(), [&] (const utils::UUID& id) -> future<> {
auto& cf = db.find_column_family(id);
return cf.flush();
co_await cf.flush();
});
}).get();
});
}
// with new data applied
auto&& new_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces).get0();
auto&& new_column_families = read_tables_for_keyspaces(proxy, keyspaces, tables()).get0();
auto&& new_types = read_schema_for_keyspaces(proxy, TYPES, keyspaces).get0();
auto&& new_views = read_tables_for_keyspaces(proxy, keyspaces, views()).get0();
auto new_functions = read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0();
auto&& new_keyspaces = co_await read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces);
auto&& new_column_families = co_await read_tables_for_keyspaces(proxy, keyspaces, tables());
auto&& new_types = co_await read_schema_for_keyspaces(proxy, TYPES, keyspaces);
auto&& new_views = co_await read_tables_for_keyspaces(proxy, keyspaces, views());
auto new_functions = co_await read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces);
#if 0 // not in 2.1.8
/*auto& new_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0();
#endif
std::set<sstring> keyspaces_to_drop = merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces)).get0();
auto types_to_drop = merge_types(proxy, std::move(old_types), std::move(new_types)).get0();
merge_tables_and_views(proxy,
std::set<sstring> keyspaces_to_drop = co_await merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces));
auto types_to_drop = co_await merge_types(proxy, std::move(old_types), std::move(new_types));
co_await merge_tables_and_views(proxy,
std::move(old_column_families), std::move(new_column_families),
std::move(old_views), std::move(new_views)).get0();
merge_functions(proxy, std::move(old_functions), std::move(new_functions)).get0();
std::move(old_views), std::move(new_views));
co_await merge_functions(proxy, std::move(old_functions), std::move(new_functions));
#if 0
mergeAggregates(oldAggregates, newAggregates);
#endif
types_to_drop.drop().get0();
co_await types_to_drop.drop();
proxy.local().get_db().invoke_on_all([keyspaces_to_drop = std::move(keyspaces_to_drop)] (database& db) {
co_await proxy.local().get_db().invoke_on_all([&] (database& db) -> future<> {
// it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
return do_for_each(keyspaces_to_drop, [&db] (sstring keyspace_to_drop) {
for (auto keyspace_to_drop : keyspaces_to_drop) {
db.drop_keyspace(keyspace_to_drop);
return db.get_notifier().drop_keyspace(keyspace_to_drop);
});
}).get0();
});
co_await db.get_notifier().drop_keyspace(keyspace_to_drop);
}
});
}
}
future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& proxy, schema_result&& before, schema_result&& after)