db: schema_tables: futurize and coroutinize read_tables_for_keyspaces()

Right now, read_tables_for_keyspaces() expects to be called in a thread.
Remove that requirement by converting it into a coroutine and returning
a future.

De-threading helps reduce errors where something expects to be called
in a thread, but isn't.
This commit is contained in:
Avi Kivity
2021-08-01 14:36:02 +03:00
parent cd1003daad
commit ee8b02f437

View File

@@ -951,21 +951,20 @@ static utils::UUID table_id_from_mutations(const schema_mutations& sm) {
return table_row.get_nonnull<utils::UUID>("id");
}
// Call inside a seastar thread
static
std::map<utils::UUID, schema_mutations>
future<std::map<utils::UUID, schema_mutations>>
read_tables_for_keyspaces(distributed<service::storage_proxy>& proxy, const std::set<sstring>& keyspace_names, schema_ptr s)
{
std::map<utils::UUID, schema_mutations> result;
for (auto&& keyspace_name : keyspace_names) {
for (auto&& table_name : read_table_names_of_keyspace(proxy, keyspace_name, s).get0()) {
for (auto&& table_name : co_await read_table_names_of_keyspace(proxy, keyspace_name, s)) {
auto qn = qualified_name(keyspace_name, table_name);
auto muts = read_table_mutations(proxy, qn, s).get0();
auto muts = co_await read_table_mutations(proxy, qn, s);
auto id = table_id_from_mutations(muts);
result.emplace(std::move(id), std::move(muts));
}
}
return result;
co_return result;
}
mutation compact_for_schema_digest(const mutation& m) {
@@ -1078,9 +1077,9 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
// current state of the schema
auto&& old_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces).get0();
auto&& old_column_families = read_tables_for_keyspaces(proxy, keyspaces, tables());
auto&& old_column_families = read_tables_for_keyspaces(proxy, keyspaces, tables()).get0();
auto&& old_types = read_schema_for_keyspaces(proxy, TYPES, keyspaces).get0();
auto&& old_views = read_tables_for_keyspaces(proxy, keyspaces, views());
auto&& old_views = read_tables_for_keyspaces(proxy, keyspaces, views()).get0();
auto old_functions = read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0();
#if 0 // not in 2.1.8
/*auto& old_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0();
@@ -1099,9 +1098,9 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
// with new data applied
auto&& new_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces).get0();
auto&& new_column_families = read_tables_for_keyspaces(proxy, keyspaces, tables());
auto&& new_column_families = read_tables_for_keyspaces(proxy, keyspaces, tables()).get0();
auto&& new_types = read_schema_for_keyspaces(proxy, TYPES, keyspaces).get0();
auto&& new_views = read_tables_for_keyspaces(proxy, keyspaces, views());
auto&& new_views = read_tables_for_keyspaces(proxy, keyspaces, views()).get0();
auto new_functions = read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0();
#if 0 // not in 2.1.8
/*auto& new_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0();