db: schema_tables: futurize and coroutinize merge_functions()

Right now, merge_functions() expects to be called in a thread.
Remove that requirement by converting it into a coroutine and returning
a future.

De-threading helps reduce errors where something expects to be called
in a thread, but isn't.
This commit is contained in:
Avi Kivity
2021-08-01 16:02:05 +03:00
parent 9cbae212bf
commit 9680d9e76c

View File

@@ -181,7 +181,7 @@ static future<user_types_to_drop> merge_types(distributed<service::storage_proxy
schema_result before,
schema_result after);
static void merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after);
static future<> merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after);
static future<> do_merge_schema(distributed<service::storage_proxy>&, std::vector<mutation>, bool do_flush);
@@ -1111,7 +1111,7 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
merge_tables_and_views(proxy,
std::move(old_column_families), std::move(new_column_families),
std::move(old_views), std::move(new_views)).get0();
merge_functions(proxy, std::move(old_functions), std::move(new_functions));
merge_functions(proxy, std::move(old_functions), std::move(new_functions)).get0();
#if 0
mergeAggregates(oldAggregates, newAggregates);
#endif
@@ -1598,11 +1598,11 @@ static shared_ptr<cql3::functions::user_function> create_func(database& db, cons
row.get_nonnull<bool>("called_on_null_input"), std::move(bitcode), std::move(cfg));
}
static void merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after,
static future<> merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after,
std::function<shared_ptr<cql3::functions::function>(database& db, const query::result_set_row& row)> create) {
auto diff = diff_rows(before, after);
proxy.local().get_db().invoke_on_all([&diff, create] (database& db) {
co_await proxy.local().get_db().invoke_on_all([&] (database& db) {
for (const auto& val : diff.created) {
cql3::functions::functions::add_function(create(db, *val));
}
@@ -1613,11 +1613,11 @@ static void merge_functions(distributed<service::storage_proxy>& proxy, schema_r
for (const auto& val : diff.altered) {
cql3::functions::functions::replace_function(create(db, *val));
}
}).get();
});
}
static void merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after) {
return merge_functions(proxy, before, after, create_func);
static future<> merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after) {
co_await merge_functions(proxy, before, after, create_func);
}
template<typename... Args>