Merge 'db: schema_tables: coroutinize' from Avi Kivity

schema_tables is quite hairy, but can be easily simplified with coroutines.

In addition to switching future-returning functions to coroutines, we also
switch Seastar threads to coroutines. This is less of a clear-cut win; the
motivation is to reduce the chances of someone calling a function that
expects to run in a thread from a non-thread context. This sometimes works
by accident, but when it doesn't, it's pretty bad. So a uniform calling convention
has some benefit.

I left the extra indents in, since the indent-fixing patch is hard to rebase in case
a rebase is needed. I will follow up with an indent fix post merge.

Test: unit (dev, debug, release)

Closes #9118

* github.com:scylladb/scylla:
  db: schema_tables: drop now redundant #includes
  db: schema_tables: coroutinize drop_column_mapping()
  db: schema_tables: coroutinize column_mapping_exists()
  db: schema_tables: coroutinize get_column_mapping()
  db: schema_tables: coroutinize read_table_mutations()
  db: schema_tables: coroutinize create_views_from_schema_partition()
  db: schema_tables: coroutinize create_views_from_table_row()
  db: schema_tables: unpeel lw_shared_ptr in create_Tables_from_tables_partition()
  db: schema_tables: coroutinize create_tables_from_tables_partition()
  db: schema_tables: coroutinize create_table_from_name()
  db: schema_tables: coroutinize read_table_mutations()
  db: schema_tables: coroutinize merge_keyspaces()
  db: schema_tables: coroutinize do_merge_schema()
  db: schema_tables: futurize and coroutinize merge_functions()
  db: schema_tables: futurize and coroutinize user_types_to_drop::drop
  db: schema_tables: futurize and coroutinize merge_types()
  db: schema_tables: futurize and coroutinize merge_tables_and_views()
  db: schema_tables: coroutinize store_column_mapping()
  db: schema_tables: futurize and coroutinize read_tables_for_keyspaces()
  db: schema_tables: coroutinize read_table_names_of_keyspace()
  db: schema_tables: coroutinize recalculate_schema_version()
  db: schema_tables: coroutinize merge_schema()
  db: schema_tables: introduce and use with_merge_lock()
  db: schema_tables: coroutinize update_schema_version_and_announce()
  db: schema_tables: coroutinize read_keyspace_mutation()
  db: schema_tables: coroutinize read_schema_partition_for_table()
  db: schema_tables: coroutinize read_schema_partition_for_keyspace()
  db: schema_tables: coroutinize query_partition_mutation()
  db: schema_tables: coroutinize read_schema_for_keyspaces()
  db: schema_tables: coroutinize convert_schema_to_mutations()
  db: schema_tables: coroutinize calculate_schema_digest()
  db: schema_tables: coroutinize save_system_schema()
This commit is contained in:
Nadav Har'El
2021-08-02 13:43:53 +03:00

View File

@@ -52,8 +52,7 @@
#include "schema_builder.hh"
#include "map_difference.hh"
#include "utils/UUID_gen.hh"
#include <seastar/core/do_with.hh>
#include <seastar/core/thread.hh>
#include <seastar/coroutine/all.hh>
#include "log.hh"
#include "frozen_schema.hh"
#include "schema_registry.hh"
@@ -73,6 +72,7 @@
#include <seastar/util/noncopyable_function.hh>
#include <seastar/rpc/rpc_types.hh>
#include <seastar/core/coroutine.hh>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/range/algorithm/copy.hpp>
@@ -166,21 +166,21 @@ struct qualified_name {
static future<schema_mutations> read_table_mutations(distributed<service::storage_proxy>& proxy, const qualified_name& table, schema_ptr s);
static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
static future<> merge_tables_and_views(distributed<service::storage_proxy>& proxy,
std::map<utils::UUID, schema_mutations>&& tables_before,
std::map<utils::UUID, schema_mutations>&& tables_after,
std::map<utils::UUID, schema_mutations>&& views_before,
std::map<utils::UUID, schema_mutations>&& views_after);
struct user_types_to_drop final {
seastar::noncopyable_function<void()> drop;
struct [[nodiscard]] user_types_to_drop final {
seastar::noncopyable_function<future<> ()> drop;
};
[[nodiscard]] static user_types_to_drop merge_types(distributed<service::storage_proxy>& proxy,
static future<user_types_to_drop> merge_types(distributed<service::storage_proxy>& proxy,
schema_result before,
schema_result after);
static void merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after);
static future<> merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after);
static future<> do_merge_schema(distributed<service::storage_proxy>&, std::vector<mutation>, bool do_flush);
@@ -233,14 +233,15 @@ future<> save_system_schema(cql3::query_processor& qp, const sstring & ksname) {
auto ksm = ks.metadata();
// delete old, possibly obsolete entries in schema tables
return parallel_for_each(all_table_names(schema_features::full()), [ksm] (sstring cf) {
co_await parallel_for_each(all_table_names(schema_features::full()), [ksm] (sstring cf) -> future<> {
auto deletion_timestamp = schema_creation_timestamp() - 1;
return qctx->execute_cql(format("DELETE FROM {}.{} USING TIMESTAMP {} WHERE keyspace_name = ?", NAME, cf,
co_await qctx->execute_cql(format("DELETE FROM {}.{} USING TIMESTAMP {} WHERE keyspace_name = ?", NAME, cf,
deletion_timestamp), ksm->name()).discard_result();
}).then([ksm, &qp] {
auto mvec = make_create_keyspace_mutations(ksm, schema_creation_timestamp(), true);
return qp.proxy().mutate_locally(std::move(mvec), tracing::trace_state_ptr());
});
{
auto mvec = make_create_keyspace_mutations(ksm, schema_creation_timestamp(), true);
co_await qp.proxy().mutate_locally(std::move(mvec), tracing::trace_state_ptr());
}
}
/** add entries to system_schema.* for the hardcoded system definitions */
@@ -701,8 +702,9 @@ redact_columns_for_missing_features(mutation m, schema_features features) {
*/
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features features, noncopyable_function<bool(std::string_view)> accept_keyspace)
{
auto map = [&proxy, features, accept_keyspace = std::move(accept_keyspace)] (sstring table) mutable {
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table, features, &accept_keyspace] (auto rs) {
auto map = [&proxy, features, accept_keyspace = std::move(accept_keyspace)] (sstring table) mutable -> future<std::vector<mutation>> {
auto rs = co_await db::system_keyspace::query_mutations(proxy, NAME, table);
{
auto s = proxy.local().get_db().local().find_schema(NAME, table);
std::vector<mutation> mutations;
for (auto&& p : rs->partitions()) {
@@ -714,17 +716,20 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
mut = redact_columns_for_missing_features(std::move(mut), features);
mutations.emplace_back(std::move(mut));
}
return mutations;
});
co_return mutations;
}
};
auto reduce = [features] (auto& hash, auto&& mutations) {
for (const mutation& m : mutations) {
feed_hash_for_schema_digest(hash, m, features);
}
};
return do_with(md5_hasher(), all_table_names(features), std::move(map), [features, reduce] (auto& hash, auto& tables, auto& map) mutable {
return do_for_each(tables, [&hash, &map, reduce, features] (auto& table) mutable {
return map(table).then([&hash, reduce, features] (auto&& mutations) {
auto hash = md5_hasher();
auto tables = all_table_names(features);
{
for (auto& table: tables) {
auto mutations = co_await map(table);
{
if (diff_logger.is_enabled(logging::log_level::trace)) {
for (const mutation& m : mutations) {
md5_hasher h;
@@ -733,11 +738,12 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
}
}
reduce(hash, mutations);
});
}).then([&hash] {
return make_ready_future<utils::UUID>(utils::UUID_gen::get_name_UUID(hash.finalize()));
});
});
}
}
{
co_return utils::UUID_gen::get_name_UUID(hash.finalize());
}
}
}
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features features)
@@ -747,8 +753,9 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features features)
{
auto map = [&proxy, features] (sstring table) {
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table, features] (auto rs) {
auto map = [&proxy, features] (sstring table) -> future<std::vector<canonical_mutation>> {
auto rs = co_await db::system_keyspace::query_mutations(proxy, NAME, table);
{
auto s = proxy.local().get_db().local().find_schema(NAME, table);
std::vector<canonical_mutation> results;
for (auto&& p : rs->partitions()) {
@@ -760,14 +767,14 @@ future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<
mut = redact_columns_for_missing_features(std::move(mut), features);
results.emplace_back(mut);
}
return results;
});
co_return results;
}
};
auto reduce = [] (auto&& result, auto&& mutations) {
std::move(mutations.begin(), mutations.end(), std::back_inserter(result));
return std::move(result);
};
return map_reduce(all_table_names(features), map, std::vector<canonical_mutation>{}, reduce);
co_return co_await map_reduce(all_table_names(features), map, std::vector<canonical_mutation>{}, reduce);
}
std::vector<mutation>
@@ -795,7 +802,7 @@ read_schema_for_keyspaces(distributed<service::storage_proxy>& proxy, const sstr
}
return std::move(result);
};
return map_reduce(keyspace_names.begin(), keyspace_names.end(), map, schema_result{}, insert);
co_return co_await map_reduce(keyspace_names.begin(), keyspace_names.end(), map, schema_result{}, insert);
}
static
@@ -805,31 +812,35 @@ future<mutation> query_partition_mutation(service::storage_proxy& proxy,
partition_key pkey)
{
auto dk = dht::decorate_key(*s, pkey);
return do_with(dht::partition_range::make_singular(dk), [&proxy, dk, s = std::move(s), cmd = std::move(cmd)] (auto& range) {
return proxy.query_mutations_locally(s, std::move(cmd), range, db::no_timeout, tracing::trace_state_ptr{})
.then([dk = std::move(dk), s](rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature> res_hit_rate) {
auto range = dht::partition_range::make_singular(dk);
{
auto res_hit_rate = co_await proxy.query_mutations_locally(s, std::move(cmd), range, db::no_timeout, tracing::trace_state_ptr{});
{
auto&& [res, hit_rate] = res_hit_rate;
auto&& partitions = res->partitions();
if (partitions.size() == 0) {
return mutation(s, std::move(dk));
co_return mutation(s, std::move(dk));
} else if (partitions.size() == 1) {
return partitions[0].mut().unfreeze(s);
co_return partitions[0].mut().unfreeze(s);
} else {
throw std::invalid_argument("Results must have at most one partition");
}
});
});
}
}
}
future<schema_result_value_type>
read_schema_partition_for_keyspace(distributed<service::storage_proxy>& proxy, const sstring& schema_table_name, const sstring& keyspace_name)
read_schema_partition_for_keyspace(distributed<service::storage_proxy>& proxy, const sstring& schema_table_name_, const sstring& keyspace_name_)
{
auto schema_table_name = schema_table_name_; // capture for co_await
auto keyspace_name = keyspace_name_; // capture for co_await
auto schema = proxy.local().get_db().local().find_schema(NAME, schema_table_name);
auto keyspace_key = dht::decorate_key(*schema,
partition_key::from_singular(*schema, keyspace_name));
return db::system_keyspace::query(proxy, NAME, schema_table_name, keyspace_key).then([keyspace_name] (auto&& rs) {
return schema_result_value_type{keyspace_name, std::move(rs)};
});
auto rs = co_await db::system_keyspace::query(proxy, NAME, schema_table_name, keyspace_key);
{
co_return schema_result_value_type{keyspace_name, std::move(rs)};
}
}
future<mutation>
@@ -843,7 +854,7 @@ read_schema_partition_for_table(distributed<service::storage_proxy>& proxy, sche
.build();
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice),
query::row_limit(query::max_rows));
return query_partition_mutation(proxy.local(), std::move(schema), std::move(cmd), std::move(keyspace_key));
co_return co_await query_partition_mutation(proxy.local(), std::move(schema), std::move(cmd), std::move(keyspace_key));
}
future<mutation>
@@ -852,7 +863,7 @@ read_keyspace_mutation(distributed<service::storage_proxy>& proxy, const sstring
auto key = partition_key::from_singular(*s, keyspace_name);
auto slice = s->full_slice();
auto cmd = make_lw_shared<query::read_command>(s->id(), s->version(), std::move(slice), proxy.local().get_max_result_size(slice));
return query_partition_mutation(proxy.local(), std::move(s), std::move(cmd), std::move(key));
co_return co_await query_partition_mutation(proxy.local(), std::move(s), std::move(cmd), std::move(key));
}
static thread_local semaphore the_merge_lock {1};
@@ -865,17 +876,34 @@ future<> merge_unlock() {
return smp::submit_to(0, [] { the_merge_lock.signal(); });
}
static future<> with_merge_lock(noncopyable_function<future<> ()> func) {
co_await merge_lock();
std::exception_ptr ep;
try {
co_await func();
} catch (...) {
ep = std::current_exception();
}
co_await merge_unlock();
if (ep) {
std::rethrow_exception(std::move(ep));
}
}
static
future<> update_schema_version_and_announce(distributed<service::storage_proxy>& proxy, schema_features features) {
return calculate_schema_digest(proxy, features).then([&proxy] (utils::UUID uuid) {
return db::system_keyspace::update_schema_version(uuid).then([&proxy, uuid] {
return proxy.local().get_db().invoke_on_all([uuid] (database& db) {
auto uuid = co_await calculate_schema_digest(proxy, features);
{
co_await db::system_keyspace::update_schema_version(uuid);
{
co_await proxy.local().get_db().invoke_on_all([uuid] (database& db) {
db.update_version(uuid);
});
}).then([uuid] {
}
{
slogger.info("Schema version changed to {}", uuid);
});
});
}
}
}
/**
@@ -889,20 +917,17 @@ future<> update_schema_version_and_announce(distributed<service::storage_proxy>&
*/
future<> merge_schema(distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations)
{
return merge_lock().then([&proxy, &feat, mutations = std::move(mutations)] () mutable {
return do_merge_schema(proxy, std::move(mutations), true).then([&proxy, &feat] {
return update_schema_version_and_announce(proxy, feat.cluster_schema_features());
});
}).finally([] {
return merge_unlock();
co_await with_merge_lock([&] () mutable -> future<> {
co_await do_merge_schema(proxy, std::move(mutations), true);
{
co_await update_schema_version_and_announce(proxy, feat.cluster_schema_features());
}
});
}
future<> recalculate_schema_version(distributed<service::storage_proxy>& proxy, gms::feature_service& feat) {
return merge_lock().then([&proxy, &feat] {
return update_schema_version_and_announce(proxy, feat.cluster_schema_features());
}).finally([] {
return merge_unlock();
co_await with_merge_lock([&] () -> future<> {
co_await update_schema_version_and_announce(proxy, feat.cluster_schema_features());
});
}
@@ -910,12 +935,13 @@ future<> recalculate_schema_version(distributed<service::storage_proxy>& proxy,
future<std::vector<sstring>>
static read_table_names_of_keyspace(distributed<service::storage_proxy>& proxy, const sstring& keyspace_name, schema_ptr schema_table) {
auto pkey = dht::decorate_key(*schema_table, partition_key::from_singular(*schema_table, keyspace_name));
return db::system_keyspace::query(proxy, schema_table->ks_name(), schema_table->cf_name(), pkey).then([schema_table] (auto&& rs) {
return boost::copy_range<std::vector<sstring>>(rs->rows() | boost::adaptors::transformed([schema_table] (const query::result_set_row& row) {
auto&& rs = co_await db::system_keyspace::query(proxy, schema_table->ks_name(), schema_table->cf_name(), pkey);
{
co_return boost::copy_range<std::vector<sstring>>(rs->rows() | boost::adaptors::transformed([schema_table] (const query::result_set_row& row) {
const sstring name = schema_table->clustering_key_columns().begin()->name_as_text();
return row.get_nonnull<sstring>(name);
}));
});
}
}
static utils::UUID table_id_from_mutations(const schema_mutations& sm) {
@@ -924,21 +950,20 @@ static utils::UUID table_id_from_mutations(const schema_mutations& sm) {
return table_row.get_nonnull<utils::UUID>("id");
}
// Call inside a seastar thread
static
std::map<utils::UUID, schema_mutations>
future<std::map<utils::UUID, schema_mutations>>
read_tables_for_keyspaces(distributed<service::storage_proxy>& proxy, const std::set<sstring>& keyspace_names, schema_ptr s)
{
std::map<utils::UUID, schema_mutations> result;
for (auto&& keyspace_name : keyspace_names) {
for (auto&& table_name : read_table_names_of_keyspace(proxy, keyspace_name, s).get0()) {
for (auto&& table_name : co_await read_table_names_of_keyspace(proxy, keyspace_name, s)) {
auto qn = qualified_name(keyspace_name, table_name);
auto muts = read_table_mutations(proxy, qn, s).get0();
auto muts = co_await read_table_mutations(proxy, qn, s);
auto id = table_id_from_mutations(muts);
result.emplace(std::move(id), std::move(muts));
}
}
return result;
co_return result;
}
mutation compact_for_schema_digest(const mutation& m) {
@@ -1009,7 +1034,7 @@ static void fill_column_info(const schema& table,
future<> store_column_mapping(distributed<service::storage_proxy>& proxy, schema_ptr s, bool with_ttl) {
// Skip "system*" tables -- only user-related tables are relevant
if (static_cast<std::string_view>(s->ks_name()).starts_with(db::system_keyspace::NAME)) {
return make_ready_future<>();
co_return;
}
schema_ptr history_tbl = scylla_table_schema_history();
@@ -1030,12 +1055,12 @@ future<> store_column_mapping(distributed<service::storage_proxy>& proxy, schema
fill_column_info(*s, ckey, cdef, ts, ttl, m);
muts.emplace_back(std::move(m));
}
return proxy.local().mutate_locally(std::move(muts), tracing::trace_state_ptr());
co_await proxy.local().mutate_locally(std::move(muts), tracing::trace_state_ptr());
}
static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush)
{
return seastar::async([&proxy, mutations = std::move(mutations), do_flush] () mutable {
{
slogger.trace("do_merge_schema: {}", mutations);
schema_ptr s = keyspaces();
// compare before/after schemas of the affected keyspaces only
@@ -1050,55 +1075,56 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
}
// current state of the schema
auto&& old_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces).get0();
auto&& old_column_families = read_tables_for_keyspaces(proxy, keyspaces, tables());
auto&& old_types = read_schema_for_keyspaces(proxy, TYPES, keyspaces).get0();
auto&& old_views = read_tables_for_keyspaces(proxy, keyspaces, views());
auto old_functions = read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0();
auto&& old_keyspaces = co_await read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces);
auto&& old_column_families = co_await read_tables_for_keyspaces(proxy, keyspaces, tables());
auto&& old_types = co_await read_schema_for_keyspaces(proxy, TYPES, keyspaces);
auto&& old_views = co_await read_tables_for_keyspaces(proxy, keyspaces, views());
auto old_functions = co_await read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces);
#if 0 // not in 2.1.8
/*auto& old_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0();
#endif
proxy.local().mutate_locally(std::move(mutations), tracing::trace_state_ptr()).get0();
co_await proxy.local().mutate_locally(std::move(mutations), tracing::trace_state_ptr());
if (do_flush) {
proxy.local().get_db().invoke_on_all([s, cfs = std::move(column_families)] (database& db) {
return parallel_for_each(cfs.begin(), cfs.end(), [&db] (const utils::UUID& id) {
co_await proxy.local().get_db().invoke_on_all([&] (database& db) -> future<> {
auto& cfs = column_families;
co_await parallel_for_each(cfs.begin(), cfs.end(), [&] (const utils::UUID& id) -> future<> {
auto& cf = db.find_column_family(id);
return cf.flush();
co_await cf.flush();
});
}).get();
});
}
// with new data applied
auto&& new_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces).get0();
auto&& new_column_families = read_tables_for_keyspaces(proxy, keyspaces, tables());
auto&& new_types = read_schema_for_keyspaces(proxy, TYPES, keyspaces).get0();
auto&& new_views = read_tables_for_keyspaces(proxy, keyspaces, views());
auto new_functions = read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0();
auto&& new_keyspaces = co_await read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces);
auto&& new_column_families = co_await read_tables_for_keyspaces(proxy, keyspaces, tables());
auto&& new_types = co_await read_schema_for_keyspaces(proxy, TYPES, keyspaces);
auto&& new_views = co_await read_tables_for_keyspaces(proxy, keyspaces, views());
auto new_functions = co_await read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces);
#if 0 // not in 2.1.8
/*auto& new_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0();
#endif
std::set<sstring> keyspaces_to_drop = merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces)).get0();
auto types_to_drop = merge_types(proxy, std::move(old_types), std::move(new_types));
merge_tables_and_views(proxy,
std::set<sstring> keyspaces_to_drop = co_await merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces));
auto types_to_drop = co_await merge_types(proxy, std::move(old_types), std::move(new_types));
co_await merge_tables_and_views(proxy,
std::move(old_column_families), std::move(new_column_families),
std::move(old_views), std::move(new_views));
merge_functions(proxy, std::move(old_functions), std::move(new_functions));
co_await merge_functions(proxy, std::move(old_functions), std::move(new_functions));
#if 0
mergeAggregates(oldAggregates, newAggregates);
#endif
types_to_drop.drop();
co_await types_to_drop.drop();
proxy.local().get_db().invoke_on_all([keyspaces_to_drop = std::move(keyspaces_to_drop)] (database& db) {
co_await proxy.local().get_db().invoke_on_all([&] (database& db) -> future<> {
// it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
return do_for_each(keyspaces_to_drop, [&db] (sstring keyspace_to_drop) {
for (auto keyspace_to_drop : keyspaces_to_drop) {
db.drop_keyspace(keyspace_to_drop);
return db.get_notifier().drop_keyspace(keyspace_to_drop);
});
}).get0();
});
co_await db.get_notifier().drop_keyspace(keyspace_to_drop);
}
});
}
}
future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& proxy, schema_result&& before, schema_result&& after)
@@ -1132,24 +1158,25 @@ future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& p
slogger.info("Altering keyspace {}", key);
altered.emplace_back(key);
}
return do_with(std::move(created), [&proxy, altered = std::move(altered)] (auto& created) mutable {
return do_with(std::move(altered), [&proxy, &created](auto& altered) {
return proxy.local().get_db().invoke_on_all([&created, &altered, &proxy] (database& db) {
return do_for_each(created, [&db](auto&& val) {
{
{
co_await proxy.local().get_db().invoke_on_all([&] (database& db) -> future<> {
for (auto&& val : created) {
auto ksm = create_keyspace_from_schema_partition(val);
return db.create_keyspace(ksm).then([&db, ksm] {
return db.get_notifier().create_keyspace(ksm);
});
}).then([&altered, &db, &proxy]() {
return do_for_each(altered, [&db, &proxy](auto& name) {
return db.update_keyspace(proxy, name);
});
});
co_await db.create_keyspace(ksm);
co_await db.get_notifier().create_keyspace(ksm);
}
{
for (auto& name : altered) {
co_await db.update_keyspace(proxy, name);
};
}
});
});
}).then([dropped = std::move(dropped)] () {
return make_ready_future<std::set<sstring>>(dropped);
});
}
}
{
co_return dropped;
}
}
struct schema_diff {
@@ -1205,7 +1232,7 @@ static schema_diff diff_table_or_view(distributed<service::storage_proxy>& proxy
// that when a base schema and a subset of its views are modified together (i.e.,
// upon an alter table or alter type statement), then they are published together
// as well, without any deferring in-between.
static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
static future<> merge_tables_and_views(distributed<service::storage_proxy>& proxy,
std::map<utils::UUID, schema_mutations>&& tables_before,
std::map<utils::UUID, schema_mutations>&& tables_after,
std::map<utils::UUID, schema_mutations>&& views_before,
@@ -1253,28 +1280,28 @@ static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
return vp;
});
proxy.local().get_db().invoke_on_all([&] (database& db) {
return seastar::async([&] {
co_await proxy.local().get_db().invoke_on_all([&] (database& db) -> future<> {
{
// First drop views and *only then* the tables, if interleaved it can lead
// to a mv not finding its schema when snapshoting since the main table
// was already dropped (see https://github.com/scylladb/scylla/issues/5614)
parallel_for_each(views_diff.dropped, [&] (schema_diff::dropped_schema& dt) {
co_await parallel_for_each(views_diff.dropped, [&] (schema_diff::dropped_schema& dt) -> future<> {
auto& s = *dt.schema.get();
return db.drop_column_family(s.ks_name(), s.cf_name(), [&] { return dt.jp.value(); });
}).get();
parallel_for_each(tables_diff.dropped, [&] (schema_diff::dropped_schema& dt) {
co_await db.drop_column_family(s.ks_name(), s.cf_name(), [&] { return dt.jp.value(); });
});
co_await parallel_for_each(tables_diff.dropped, [&] (schema_diff::dropped_schema& dt) -> future<> {
auto& s = *dt.schema.get();
return db.drop_column_family(s.ks_name(), s.cf_name(), [&] { return dt.jp.value(); });
}).get();
co_await db.drop_column_family(s.ks_name(), s.cf_name(), [&] { return dt.jp.value(); });
});
// In order to avoid possible races we first create the tables and only then the views.
// That way if a view seeks information about its base table it's guarantied to find it.
parallel_for_each(tables_diff.created, [&] (global_schema_ptr& gs) {
return db.add_column_family_and_make_directory(gs);
}).get();
parallel_for_each(views_diff.created, [&] (global_schema_ptr& gs) {
return db.add_column_family_and_make_directory(gs);
}).get();
co_await parallel_for_each(tables_diff.created, [&] (global_schema_ptr& gs) -> future<> {
co_await db.add_column_family_and_make_directory(gs);
});
co_await parallel_for_each(views_diff.created, [&] (global_schema_ptr& gs) -> future<> {
co_await db.add_column_family_and_make_directory(gs);
});
for (auto&& gs : boost::range::join(tables_diff.created, views_diff.created)) {
db.find_column_family(gs).mark_ready_for_writes();
}
@@ -1285,21 +1312,21 @@ static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
}
auto it = columns_changed.begin();
auto notify = [&] (auto& r, auto&& f) {
auto notify = [&] (auto& r, auto&& f) -> future<> {
auto notifications = r | boost::adaptors::transformed(f);
when_all(notifications.begin(), notifications.end()).get();
co_await when_all(notifications.begin(), notifications.end());
};
// View drops are notified first, because a table can only be dropped if its views are already deleted
notify(views_diff.dropped, [&] (auto&& dt) { return db.get_notifier().drop_view(view_ptr(dt.schema)); });
notify(tables_diff.dropped, [&] (auto&& dt) { return db.get_notifier().drop_column_family(dt.schema); });
co_await notify(views_diff.dropped, [&] (auto&& dt) { return db.get_notifier().drop_view(view_ptr(dt.schema)); });
co_await notify(tables_diff.dropped, [&] (auto&& dt) { return db.get_notifier().drop_column_family(dt.schema); });
// Table creations are notified first, in case a view is created right after the table
notify(tables_diff.created, [&] (auto&& gs) { return db.get_notifier().create_column_family(gs); });
notify(views_diff.created, [&] (auto&& gs) { return db.get_notifier().create_view(view_ptr(gs)); });
co_await notify(tables_diff.created, [&] (auto&& gs) { return db.get_notifier().create_column_family(gs); });
co_await notify(views_diff.created, [&] (auto&& gs) { return db.get_notifier().create_view(view_ptr(gs)); });
// Table altering is notified first, in case new base columns appear
notify(tables_diff.altered, [&] (auto&& altered) { return db.get_notifier().update_column_family(altered.new_schema, *it++); });
notify(views_diff.altered, [&] (auto&& altered) { return db.get_notifier().update_view(view_ptr(altered.new_schema), *it++); });
});
}).get();
co_await notify(tables_diff.altered, [&] (auto&& altered) { return db.get_notifier().update_column_family(altered.new_schema, *it++); });
co_await notify(views_diff.altered, [&] (auto&& altered) { return db.get_notifier().update_view(view_ptr(altered.new_schema), *it++); });
}
});
// Insert column_mapping into history table for altered and created tables.
//
@@ -1311,20 +1338,20 @@ static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
//
// Drop column mapping entries for dropped tables since these will not be TTLed automatically
// and will stay there forever if we don't clean them up manually
when_all_succeed(
parallel_for_each(tables_diff.created, [&proxy] (global_schema_ptr& gs) {
return store_column_mapping(proxy, gs.get(), false);
co_await when_all_succeed(
parallel_for_each(tables_diff.created, [&proxy] (global_schema_ptr& gs) -> future<> {
co_await store_column_mapping(proxy, gs.get(), false);
}),
parallel_for_each(tables_diff.altered, [&proxy] (schema_diff::altered_schema& altered) {
return when_all_succeed(
parallel_for_each(tables_diff.altered, [&proxy] (schema_diff::altered_schema& altered) -> future<> {
co_await when_all_succeed(
store_column_mapping(proxy, altered.old_schema.get(), true),
store_column_mapping(proxy, altered.new_schema.get(), false)).discard_result();
store_column_mapping(proxy, altered.new_schema.get(), false));
}),
parallel_for_each(tables_diff.dropped, [&proxy] (schema_diff::dropped_schema& dropped) {
parallel_for_each(tables_diff.dropped, [&proxy] (schema_diff::dropped_schema& dropped) -> future<> {
schema_ptr s = dropped.schema.get();
return drop_column_mapping(s->id(), s->version());
co_await drop_column_mapping(s->id(), s->version());
})
).get();
);
}
static std::vector<const query::result_set_row*> collect_rows(const std::set<sstring>& keys, const schema_result& result) {
@@ -1451,7 +1478,7 @@ static std::vector<user_type> create_types(database& db, const std::vector<const
}
// see the comments for merge_keyspaces()
[[nodiscard]] static user_types_to_drop merge_types(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after)
static future<user_types_to_drop> merge_types(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after)
{
auto diff = diff_rows(before, after);
@@ -1459,28 +1486,29 @@ static std::vector<user_type> create_types(database& db, const std::vector<const
// use those types. Similarly, defer dropping until after tables/views that may use
// some of these user types are dropped.
proxy.local().get_db().invoke_on_all([&diff] (database& db) {
return seastar::async([&] {
co_await proxy.local().get_db().invoke_on_all([&] (database& db) -> future<> {
{
for (auto&& user_type : create_types(db, diff.created)) {
db.find_keyspace(user_type->_keyspace).add_user_type(user_type);
db.get_notifier().create_user_type(user_type).get();
co_await db.get_notifier().create_user_type(user_type);
}
for (auto&& user_type : create_types(db, diff.altered)) {
db.find_keyspace(user_type->_keyspace).add_user_type(user_type);
db.get_notifier().update_user_type(user_type).get();
co_await db.get_notifier().update_user_type(user_type);
}
}
});
co_return user_types_to_drop{[&proxy, before = std::move(before), rows = std::move(diff.dropped)] () mutable -> future<> {
co_await proxy.local().get_db().invoke_on_all([&] (database& db) -> future<> {
auto dropped = create_types(db, rows);
{
for (auto& user_type : dropped) {
db.find_keyspace(user_type->_keyspace).remove_user_type(user_type);
co_await db.get_notifier().drop_user_type(user_type);
}
}
});
}).get();
return user_types_to_drop{[&proxy, before = std::move(before), rows = std::move(diff.dropped)] () mutable {
proxy.local().get_db().invoke_on_all([&rows](database& db) {
return do_with(create_types(db, rows), [&db] (auto &dropped) {
return do_for_each(dropped, [&db](auto& user_type) {
db.find_keyspace(user_type->_keyspace).remove_user_type(user_type);
return db.get_notifier().drop_user_type(user_type);
});
});
}).get();
}};
}
@@ -1571,11 +1599,11 @@ static shared_ptr<cql3::functions::user_function> create_func(database& db, cons
row.get_nonnull<bool>("called_on_null_input"), std::move(bitcode), std::move(cfg));
}
static void merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after,
static future<> merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after,
std::function<shared_ptr<cql3::functions::function>(database& db, const query::result_set_row& row)> create) {
auto diff = diff_rows(before, after);
proxy.local().get_db().invoke_on_all([&diff, create] (database& db) {
co_await proxy.local().get_db().invoke_on_all([&] (database& db) {
for (const auto& val : diff.created) {
cql3::functions::functions::add_function(create(db, *val));
}
@@ -1586,11 +1614,11 @@ static void merge_functions(distributed<service::storage_proxy>& proxy, schema_r
for (const auto& val : diff.altered) {
cql3::functions::functions::replace_function(create(db, *val));
}
}).get();
});
}
static void merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after) {
return merge_functions(proxy, before, after, create_func);
static future<> merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after) {
co_await merge_functions(proxy, before, after, create_func);
}
template<typename... Args>
@@ -2269,17 +2297,18 @@ std::vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata>
static future<schema_mutations> read_table_mutations(distributed<service::storage_proxy>& proxy, const qualified_name& table, schema_ptr s)
{
return when_all_succeed(
read_schema_partition_for_table(proxy, s, table.keyspace_name, table.table_name),
read_schema_partition_for_table(proxy, columns(), table.keyspace_name, table.table_name),
read_schema_partition_for_table(proxy, view_virtual_columns(), table.keyspace_name, table.table_name),
read_schema_partition_for_table(proxy, computed_columns(), table.keyspace_name, table.table_name),
read_schema_partition_for_table(proxy, dropped_columns(), table.keyspace_name, table.table_name),
read_schema_partition_for_table(proxy, indexes(), table.keyspace_name, table.table_name),
read_schema_partition_for_table(proxy, scylla_tables(), table.keyspace_name, table.table_name)).then_unpack(
[] (mutation cf_m, mutation col_m, mutation vv_col_m, mutation c_col_m, mutation dropped_m, mutation idx_m, mutation st_m) {
return schema_mutations{std::move(cf_m), std::move(col_m), std::move(vv_col_m), std::move(c_col_m), std::move(idx_m), std::move(dropped_m), std::move(st_m)};
});
auto&& [cf_m, col_m, vv_col_m, c_col_m, dropped_m, idx_m, st_m] = co_await coroutine::all(
[&] { return read_schema_partition_for_table(proxy, s, table.keyspace_name, table.table_name); },
[&] { return read_schema_partition_for_table(proxy, columns(), table.keyspace_name, table.table_name); },
[&] { return read_schema_partition_for_table(proxy, view_virtual_columns(), table.keyspace_name, table.table_name); },
[&] { return read_schema_partition_for_table(proxy, computed_columns(), table.keyspace_name, table.table_name); },
[&] { return read_schema_partition_for_table(proxy, dropped_columns(), table.keyspace_name, table.table_name); },
[&] { return read_schema_partition_for_table(proxy, indexes(), table.keyspace_name, table.table_name); },
[&] { return read_schema_partition_for_table(proxy, scylla_tables(), table.keyspace_name, table.table_name); }
);
{
co_return schema_mutations{std::move(cf_m), std::move(col_m), std::move(vv_col_m), std::move(c_col_m), std::move(idx_m), std::move(dropped_m), std::move(st_m)};
}
#if 0
// FIXME:
Row serializedTriggers = readSchemaPartitionForTable(TRIGGERS, ksName, cfName);
@@ -2297,14 +2326,16 @@ static future<schema_mutations> read_table_mutations(distributed<service::storag
future<schema_ptr> create_table_from_name(distributed<service::storage_proxy>& proxy, const sstring& keyspace, const sstring& table)
{
return do_with(qualified_name(keyspace, table), [&proxy] (qualified_name& qn) {
return read_table_mutations(proxy, qn, tables()).then([qn, &proxy] (schema_mutations sm) {
auto qn = qualified_name(keyspace, table);
{
auto sm = co_await read_table_mutations(proxy, qn, tables());
{
if (!sm.live()) {
throw std::runtime_error(format("{}:{} not found in the schema definitions keyspace.", qn.keyspace_name, qn.table_name));
}
return create_table_from_mutations(proxy, std::move(sm));
});
});
co_return create_table_from_mutations(proxy, std::move(sm));
}
}
}
/**
@@ -2314,14 +2345,16 @@ future<schema_ptr> create_table_from_name(distributed<service::storage_proxy>& p
*/
future<std::map<sstring, schema_ptr>> create_tables_from_tables_partition(distributed<service::storage_proxy>& proxy, const schema_result::mapped_type& result)
{
auto tables = make_lw_shared<std::map<sstring, schema_ptr>>();
return parallel_for_each(result->rows().begin(), result->rows().end(), [&proxy, tables] (const query::result_set_row& row) {
return create_table_from_table_row(proxy, row).then([tables] (schema_ptr&& cfm) {
tables->emplace(cfm->cf_name(), std::move(cfm));
});
}).then([tables] {
return std::move(*tables);
auto tables = std::map<sstring, schema_ptr>();
co_await parallel_for_each(result->rows().begin(), result->rows().end(), [&] (const query::result_set_row& row) -> future<> {
schema_ptr cfm = co_await create_table_from_table_row(proxy, row);
{
tables.emplace(cfm->cf_name(), std::move(cfm));
}
});
{
co_return std::move(tables);
}
}
#if 0
@@ -2795,14 +2828,15 @@ view_ptr create_view_from_mutations(const schema_ctxt& ctxt, schema_mutations sm
static future<view_ptr> create_view_from_table_row(distributed<service::storage_proxy>& proxy, const query::result_set_row& row) {
qualified_name qn(row.get_nonnull<sstring>("keyspace_name"), row.get_nonnull<sstring>("view_name"));
return do_with(std::move(qn), [&proxy] (auto&& qn) {
return read_table_mutations(proxy, qn, views()).then([&] (schema_mutations sm) {
{
schema_mutations sm = co_await read_table_mutations(proxy, qn, views());
{
if (!sm.live()) {
throw std::runtime_error(format("{}:{} not found in the view definitions keyspace.", qn.keyspace_name, qn.table_name));
}
return create_view_from_mutations(proxy, std::move(sm));
});
});
co_return create_view_from_mutations(proxy, std::move(sm));
}
}
}
/**
@@ -2812,15 +2846,18 @@ static future<view_ptr> create_view_from_table_row(distributed<service::storage_
*/
future<std::vector<view_ptr>> create_views_from_schema_partition(distributed<service::storage_proxy>& proxy, const schema_result::mapped_type& result)
{
return do_with(std::vector<view_ptr>(), [&] (auto& views) {
return parallel_for_each(result->rows().begin(), result->rows().end(), [&proxy, &views] (auto&& row) {
return create_view_from_table_row(proxy, row).then([&views] (auto&& v) {
std::vector<view_ptr> views;
{
co_await parallel_for_each(result->rows().begin(), result->rows().end(), [&] (auto&& row) -> future<> {
auto v = co_await create_view_from_table_row(proxy, row);
{
views.push_back(std::move(v));
});
}).then([&views] {
return std::move(views);
}
});
});
{
co_return std::move(views);
}
}
}
static schema_mutations make_view_mutations(view_ptr view, api::timestamp_type timestamp, bool with_columns)
@@ -3141,13 +3178,13 @@ table_schema_version schema_mutations::digest() const {
future<schema_mutations> read_table_mutations(distributed<service::storage_proxy>& proxy,
sstring keyspace_name, sstring table_name, schema_ptr s)
{
return read_schema_partition_for_table(proxy, s, keyspace_name, table_name)
.then([&proxy, keyspace_name, table_name] (mutation cf_m) {
return read_schema_partition_for_table(proxy, db::system_keyspace::legacy::columns(), keyspace_name, table_name)
.then([cf_m = std::move(cf_m)] (mutation col_m) {
return schema_mutations{std::move(cf_m), std::move(col_m)};
});
});
mutation cf_m = co_await read_schema_partition_for_table(proxy, s, keyspace_name, table_name);
{
mutation col_m = co_await read_schema_partition_for_table(proxy, db::system_keyspace::legacy::columns(), keyspace_name, table_name);
{
co_return schema_mutations{std::move(cf_m), std::move(col_m)};
}
}
}
} // namespace legacy
@@ -3156,17 +3193,17 @@ static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_or
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);
future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_version version) {
auto cm_fut = qctx->qp().execute_internal(
shared_ptr<cql3::untyped_result_set> results = co_await qctx->qp().execute_internal(
GET_COLUMN_MAPPING_QUERY,
db::consistency_level::LOCAL_ONE,
{table_id, version}
);
return cm_fut.then([version] (shared_ptr<cql3::untyped_result_set> results) {
{
if (results->empty()) {
// If we don't have a stored column_mapping for an obsolete schema version
// then it means it's way too old and been cleaned up already.
// Fail the whole learn stage in this case.
return make_exception_future<column_mapping>(std::runtime_error(
co_return coroutine::make_exception(std::runtime_error(
format("Failed to look up column mapping for schema version {}",
version)));
}
@@ -3193,28 +3230,29 @@ future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_ver
cm_columns.emplace_back(column_mapping_entry{def.name(), def.type});
}
column_mapping cm(std::move(cm_columns), static_columns.size());
return make_ready_future<column_mapping>(std::move(cm));
});
co_return std::move(cm);
}
}
future<bool> column_mapping_exists(utils::UUID table_id, table_schema_version version) {
return qctx->qp().execute_internal(
shared_ptr<cql3::untyped_result_set> results = co_await qctx->qp().execute_internal(
GET_COLUMN_MAPPING_QUERY,
db::consistency_level::LOCAL_ONE,
{table_id, version}
).then([] (shared_ptr<cql3::untyped_result_set> results) {
return !results->empty();
});
);
{
co_return !results->empty();
}
}
future<> drop_column_mapping(utils::UUID table_id, table_schema_version version) {
const static sstring DEL_COLUMN_MAPPING_QUERY =
format("DELETE FROM system.{} WHERE cf_id = ? and schema_version = ?",
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);
return qctx->qp().execute_internal(
co_await qctx->qp().execute_internal(
DEL_COLUMN_MAPPING_QUERY,
db::consistency_level::LOCAL_ONE,
{table_id, version}).discard_result();
{table_id, version});
}
} // namespace schema_tables