diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 2280719481..e9b68891f4 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -1061,7 +1061,7 @@ future<> store_column_mapping(distributed& proxy, schema static future<> do_merge_schema(distributed& proxy, std::vector mutations, bool do_flush) { - return seastar::async([&proxy, mutations = std::move(mutations), do_flush] () mutable { + { slogger.trace("do_merge_schema: {}", mutations); schema_ptr s = keyspaces(); // compare before/after schemas of the affected keyspaces only @@ -1076,55 +1076,56 @@ static future<> do_merge_schema(distributed& proxy, std: } // current state of the schema - auto&& old_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces).get0(); - auto&& old_column_families = read_tables_for_keyspaces(proxy, keyspaces, tables()).get0(); - auto&& old_types = read_schema_for_keyspaces(proxy, TYPES, keyspaces).get0(); - auto&& old_views = read_tables_for_keyspaces(proxy, keyspaces, views()).get0(); - auto old_functions = read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0(); + auto&& old_keyspaces = co_await read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces); + auto&& old_column_families = co_await read_tables_for_keyspaces(proxy, keyspaces, tables()); + auto&& old_types = co_await read_schema_for_keyspaces(proxy, TYPES, keyspaces); + auto&& old_views = co_await read_tables_for_keyspaces(proxy, keyspaces, views()); + auto old_functions = co_await read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces); #if 0 // not in 2.1.8 /*auto& old_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0(); #endif - proxy.local().mutate_locally(std::move(mutations), tracing::trace_state_ptr()).get0(); + co_await proxy.local().mutate_locally(std::move(mutations), tracing::trace_state_ptr()); if (do_flush) { - proxy.local().get_db().invoke_on_all([s, cfs = std::move(column_families)] (database& db) { - return parallel_for_each(cfs.begin(), cfs.end(), [&db] (const utils::UUID& id) { + co_await proxy.local().get_db().invoke_on_all([&] (database& db) -> future<> { + auto& cfs = column_families; + co_await parallel_for_each(cfs.begin(), cfs.end(), [&] (const utils::UUID& id) -> future<> { auto& cf = db.find_column_family(id); - return cf.flush(); + co_await cf.flush(); }); - }).get(); + }); } // with new data applied - auto&& new_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces).get0(); - auto&& new_column_families = read_tables_for_keyspaces(proxy, keyspaces, tables()).get0(); - auto&& new_types = read_schema_for_keyspaces(proxy, TYPES, keyspaces).get0(); - auto&& new_views = read_tables_for_keyspaces(proxy, keyspaces, views()).get0(); - auto new_functions = read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0(); + auto&& new_keyspaces = co_await read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces); + auto&& new_column_families = co_await read_tables_for_keyspaces(proxy, keyspaces, tables()); + auto&& new_types = co_await read_schema_for_keyspaces(proxy, TYPES, keyspaces); + auto&& new_views = co_await read_tables_for_keyspaces(proxy, keyspaces, views()); + auto new_functions = co_await read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces); #if 0 // not in 2.1.8 /*auto& new_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0(); #endif - std::set keyspaces_to_drop = merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces)).get0(); - auto types_to_drop = merge_types(proxy, std::move(old_types), std::move(new_types)).get0(); - merge_tables_and_views(proxy, + std::set keyspaces_to_drop = co_await merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces)); + auto types_to_drop = co_await merge_types(proxy, std::move(old_types), std::move(new_types)); + co_await merge_tables_and_views(proxy, std::move(old_column_families), std::move(new_column_families), - std::move(old_views), std::move(new_views)).get0(); - merge_functions(proxy, std::move(old_functions), std::move(new_functions)).get0(); + std::move(old_views), std::move(new_views)); + co_await merge_functions(proxy, std::move(old_functions), std::move(new_functions)); #if 0 mergeAggregates(oldAggregates, newAggregates); #endif - types_to_drop.drop().get0(); + co_await types_to_drop.drop(); - proxy.local().get_db().invoke_on_all([keyspaces_to_drop = std::move(keyspaces_to_drop)] (database& db) { + co_await proxy.local().get_db().invoke_on_all([&] (database& db) -> future<> { // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted - return do_for_each(keyspaces_to_drop, [&db] (sstring keyspace_to_drop) { + for (auto keyspace_to_drop : keyspaces_to_drop) { db.drop_keyspace(keyspace_to_drop); - return db.get_notifier().drop_keyspace(keyspace_to_drop); - }); - }).get0(); - }); + co_await db.get_notifier().drop_keyspace(keyspace_to_drop); + } + }); + } } future> merge_keyspaces(distributed& proxy, schema_result&& before, schema_result&& after)