Merge 'db: schema_tables: coroutinize' from Avi Kivity
schema_tables is quite hairy, but can be easily simplified with coroutines. In addition to switching future-returning functions to coroutines, we also switch Seastar threads to coroutines. This is less of a clear-cut win; the motivation is to reduce the chances of someone calling a function that expects to run in a thread from a non-thread context. This sometimes works by accident, but when it doesn't, it's pretty bad. So a uniform calling convention has some benefit. I left the extra indents in, since the indent-fixing patch is hard to rebase in case a rebase is needed. I will follow up with an indent fix post merge. Test: unit (dev, debug, release) Closes #9118 * github.com:scylladb/scylla: db: schema_tables: drop now redundant #includes db: schema_tables: coroutinize drop_column_mapping() db: schema_tables: coroutinize column_mapping_exists() db: schema_tables: coroutinize get_column_mapping() db: schema_tables: coroutinize read_table_mutations() db: schema_tables: coroutinize create_views_from_schema_partition() db: schema_tables: coroutinize create_views_from_table_row() db: schema_tables: unpeel lw_shared_ptr in create_Tables_from_tables_partition() db: schema_tables: coroutinize create_tables_from_tables_partition() db: schema_tables: coroutinize create_table_from_name() db: schema_tables: coroutinize read_table_mutations() db: schema_tables: coroutinize merge_keyspaces() db: schema_tables: coroutinize do_merge_schema() db: schema_tables: futurize and coroutinize merge_functions() db: schema_tables: futurize and coroutinize user_types_to_drop::drop db: schema_tables: futurize and coroutinize merge_types() db: schema_tables: futurize and coroutinize merge_tables_and_views() db: schema_tables: coroutinize store_column_mapping() db: schema_tables: futurize and coroutinize read_tables_for_keyspaces() db: schema_tables: coroutinize read_table_names_of_keyspace() db: schema_tables: coroutinize recalculate_schema_version() db: schema_tables: coroutinize merge_schema() db: schema_tables: introduce and use with_merge_lock() db: schema_tables: coroutinize update_schema_version_and_announce() db: schema_tables: coroutinize read_keyspace_mutation() db: schema_tables: coroutinize read_schema_partition_for_table() db: schema_tables: coroutinize read_schema_partition_for_keyspace() db: schema_tables: coroutinize query_partition_mutation() db: schema_tables: coroutinize read_schema_for_keyspaces() db: schema_tables: coroutinize convert_schema_to_mutations() db: schema_tables: coroutinize calculate_schema_digest() db: schema_tables: coroutinize save_system_schema()
This commit is contained in:
@@ -52,8 +52,7 @@
|
||||
#include "schema_builder.hh"
|
||||
#include "map_difference.hh"
|
||||
#include "utils/UUID_gen.hh"
|
||||
#include <seastar/core/do_with.hh>
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <seastar/coroutine/all.hh>
|
||||
#include "log.hh"
|
||||
#include "frozen_schema.hh"
|
||||
#include "schema_registry.hh"
|
||||
@@ -73,6 +72,7 @@
|
||||
|
||||
#include <seastar/util/noncopyable_function.hh>
|
||||
#include <seastar/rpc/rpc_types.hh>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
#include <boost/range/algorithm/copy.hpp>
|
||||
@@ -166,21 +166,21 @@ struct qualified_name {
|
||||
|
||||
static future<schema_mutations> read_table_mutations(distributed<service::storage_proxy>& proxy, const qualified_name& table, schema_ptr s);
|
||||
|
||||
static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
|
||||
static future<> merge_tables_and_views(distributed<service::storage_proxy>& proxy,
|
||||
std::map<utils::UUID, schema_mutations>&& tables_before,
|
||||
std::map<utils::UUID, schema_mutations>&& tables_after,
|
||||
std::map<utils::UUID, schema_mutations>&& views_before,
|
||||
std::map<utils::UUID, schema_mutations>&& views_after);
|
||||
|
||||
struct user_types_to_drop final {
|
||||
seastar::noncopyable_function<void()> drop;
|
||||
struct [[nodiscard]] user_types_to_drop final {
|
||||
seastar::noncopyable_function<future<> ()> drop;
|
||||
};
|
||||
|
||||
[[nodiscard]] static user_types_to_drop merge_types(distributed<service::storage_proxy>& proxy,
|
||||
static future<user_types_to_drop> merge_types(distributed<service::storage_proxy>& proxy,
|
||||
schema_result before,
|
||||
schema_result after);
|
||||
|
||||
static void merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after);
|
||||
static future<> merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after);
|
||||
|
||||
static future<> do_merge_schema(distributed<service::storage_proxy>&, std::vector<mutation>, bool do_flush);
|
||||
|
||||
@@ -233,14 +233,15 @@ future<> save_system_schema(cql3::query_processor& qp, const sstring & ksname) {
|
||||
auto ksm = ks.metadata();
|
||||
|
||||
// delete old, possibly obsolete entries in schema tables
|
||||
return parallel_for_each(all_table_names(schema_features::full()), [ksm] (sstring cf) {
|
||||
co_await parallel_for_each(all_table_names(schema_features::full()), [ksm] (sstring cf) -> future<> {
|
||||
auto deletion_timestamp = schema_creation_timestamp() - 1;
|
||||
return qctx->execute_cql(format("DELETE FROM {}.{} USING TIMESTAMP {} WHERE keyspace_name = ?", NAME, cf,
|
||||
co_await qctx->execute_cql(format("DELETE FROM {}.{} USING TIMESTAMP {} WHERE keyspace_name = ?", NAME, cf,
|
||||
deletion_timestamp), ksm->name()).discard_result();
|
||||
}).then([ksm, &qp] {
|
||||
auto mvec = make_create_keyspace_mutations(ksm, schema_creation_timestamp(), true);
|
||||
return qp.proxy().mutate_locally(std::move(mvec), tracing::trace_state_ptr());
|
||||
});
|
||||
{
|
||||
auto mvec = make_create_keyspace_mutations(ksm, schema_creation_timestamp(), true);
|
||||
co_await qp.proxy().mutate_locally(std::move(mvec), tracing::trace_state_ptr());
|
||||
}
|
||||
}
|
||||
|
||||
/** add entries to system_schema.* for the hardcoded system definitions */
|
||||
@@ -701,8 +702,9 @@ redact_columns_for_missing_features(mutation m, schema_features features) {
|
||||
*/
|
||||
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features features, noncopyable_function<bool(std::string_view)> accept_keyspace)
|
||||
{
|
||||
auto map = [&proxy, features, accept_keyspace = std::move(accept_keyspace)] (sstring table) mutable {
|
||||
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table, features, &accept_keyspace] (auto rs) {
|
||||
auto map = [&proxy, features, accept_keyspace = std::move(accept_keyspace)] (sstring table) mutable -> future<std::vector<mutation>> {
|
||||
auto rs = co_await db::system_keyspace::query_mutations(proxy, NAME, table);
|
||||
{
|
||||
auto s = proxy.local().get_db().local().find_schema(NAME, table);
|
||||
std::vector<mutation> mutations;
|
||||
for (auto&& p : rs->partitions()) {
|
||||
@@ -714,17 +716,20 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
|
||||
mut = redact_columns_for_missing_features(std::move(mut), features);
|
||||
mutations.emplace_back(std::move(mut));
|
||||
}
|
||||
return mutations;
|
||||
});
|
||||
co_return mutations;
|
||||
}
|
||||
};
|
||||
auto reduce = [features] (auto& hash, auto&& mutations) {
|
||||
for (const mutation& m : mutations) {
|
||||
feed_hash_for_schema_digest(hash, m, features);
|
||||
}
|
||||
};
|
||||
return do_with(md5_hasher(), all_table_names(features), std::move(map), [features, reduce] (auto& hash, auto& tables, auto& map) mutable {
|
||||
return do_for_each(tables, [&hash, &map, reduce, features] (auto& table) mutable {
|
||||
return map(table).then([&hash, reduce, features] (auto&& mutations) {
|
||||
auto hash = md5_hasher();
|
||||
auto tables = all_table_names(features);
|
||||
{
|
||||
for (auto& table: tables) {
|
||||
auto mutations = co_await map(table);
|
||||
{
|
||||
if (diff_logger.is_enabled(logging::log_level::trace)) {
|
||||
for (const mutation& m : mutations) {
|
||||
md5_hasher h;
|
||||
@@ -733,11 +738,12 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
|
||||
}
|
||||
}
|
||||
reduce(hash, mutations);
|
||||
});
|
||||
}).then([&hash] {
|
||||
return make_ready_future<utils::UUID>(utils::UUID_gen::get_name_UUID(hash.finalize()));
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
{
|
||||
co_return utils::UUID_gen::get_name_UUID(hash.finalize());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features features)
|
||||
@@ -747,8 +753,9 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
|
||||
|
||||
future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features features)
|
||||
{
|
||||
auto map = [&proxy, features] (sstring table) {
|
||||
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table, features] (auto rs) {
|
||||
auto map = [&proxy, features] (sstring table) -> future<std::vector<canonical_mutation>> {
|
||||
auto rs = co_await db::system_keyspace::query_mutations(proxy, NAME, table);
|
||||
{
|
||||
auto s = proxy.local().get_db().local().find_schema(NAME, table);
|
||||
std::vector<canonical_mutation> results;
|
||||
for (auto&& p : rs->partitions()) {
|
||||
@@ -760,14 +767,14 @@ future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<
|
||||
mut = redact_columns_for_missing_features(std::move(mut), features);
|
||||
results.emplace_back(mut);
|
||||
}
|
||||
return results;
|
||||
});
|
||||
co_return results;
|
||||
}
|
||||
};
|
||||
auto reduce = [] (auto&& result, auto&& mutations) {
|
||||
std::move(mutations.begin(), mutations.end(), std::back_inserter(result));
|
||||
return std::move(result);
|
||||
};
|
||||
return map_reduce(all_table_names(features), map, std::vector<canonical_mutation>{}, reduce);
|
||||
co_return co_await map_reduce(all_table_names(features), map, std::vector<canonical_mutation>{}, reduce);
|
||||
}
|
||||
|
||||
std::vector<mutation>
|
||||
@@ -795,7 +802,7 @@ read_schema_for_keyspaces(distributed<service::storage_proxy>& proxy, const sstr
|
||||
}
|
||||
return std::move(result);
|
||||
};
|
||||
return map_reduce(keyspace_names.begin(), keyspace_names.end(), map, schema_result{}, insert);
|
||||
co_return co_await map_reduce(keyspace_names.begin(), keyspace_names.end(), map, schema_result{}, insert);
|
||||
}
|
||||
|
||||
static
|
||||
@@ -805,31 +812,35 @@ future<mutation> query_partition_mutation(service::storage_proxy& proxy,
|
||||
partition_key pkey)
|
||||
{
|
||||
auto dk = dht::decorate_key(*s, pkey);
|
||||
return do_with(dht::partition_range::make_singular(dk), [&proxy, dk, s = std::move(s), cmd = std::move(cmd)] (auto& range) {
|
||||
return proxy.query_mutations_locally(s, std::move(cmd), range, db::no_timeout, tracing::trace_state_ptr{})
|
||||
.then([dk = std::move(dk), s](rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature> res_hit_rate) {
|
||||
auto range = dht::partition_range::make_singular(dk);
|
||||
{
|
||||
auto res_hit_rate = co_await proxy.query_mutations_locally(s, std::move(cmd), range, db::no_timeout, tracing::trace_state_ptr{});
|
||||
{
|
||||
auto&& [res, hit_rate] = res_hit_rate;
|
||||
auto&& partitions = res->partitions();
|
||||
if (partitions.size() == 0) {
|
||||
return mutation(s, std::move(dk));
|
||||
co_return mutation(s, std::move(dk));
|
||||
} else if (partitions.size() == 1) {
|
||||
return partitions[0].mut().unfreeze(s);
|
||||
co_return partitions[0].mut().unfreeze(s);
|
||||
} else {
|
||||
throw std::invalid_argument("Results must have at most one partition");
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<schema_result_value_type>
|
||||
read_schema_partition_for_keyspace(distributed<service::storage_proxy>& proxy, const sstring& schema_table_name, const sstring& keyspace_name)
|
||||
read_schema_partition_for_keyspace(distributed<service::storage_proxy>& proxy, const sstring& schema_table_name_, const sstring& keyspace_name_)
|
||||
{
|
||||
auto schema_table_name = schema_table_name_; // capture for co_await
|
||||
auto keyspace_name = keyspace_name_; // capture for co_await
|
||||
auto schema = proxy.local().get_db().local().find_schema(NAME, schema_table_name);
|
||||
auto keyspace_key = dht::decorate_key(*schema,
|
||||
partition_key::from_singular(*schema, keyspace_name));
|
||||
return db::system_keyspace::query(proxy, NAME, schema_table_name, keyspace_key).then([keyspace_name] (auto&& rs) {
|
||||
return schema_result_value_type{keyspace_name, std::move(rs)};
|
||||
});
|
||||
auto rs = co_await db::system_keyspace::query(proxy, NAME, schema_table_name, keyspace_key);
|
||||
{
|
||||
co_return schema_result_value_type{keyspace_name, std::move(rs)};
|
||||
}
|
||||
}
|
||||
|
||||
future<mutation>
|
||||
@@ -843,7 +854,7 @@ read_schema_partition_for_table(distributed<service::storage_proxy>& proxy, sche
|
||||
.build();
|
||||
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.local().get_max_result_size(slice),
|
||||
query::row_limit(query::max_rows));
|
||||
return query_partition_mutation(proxy.local(), std::move(schema), std::move(cmd), std::move(keyspace_key));
|
||||
co_return co_await query_partition_mutation(proxy.local(), std::move(schema), std::move(cmd), std::move(keyspace_key));
|
||||
}
|
||||
|
||||
future<mutation>
|
||||
@@ -852,7 +863,7 @@ read_keyspace_mutation(distributed<service::storage_proxy>& proxy, const sstring
|
||||
auto key = partition_key::from_singular(*s, keyspace_name);
|
||||
auto slice = s->full_slice();
|
||||
auto cmd = make_lw_shared<query::read_command>(s->id(), s->version(), std::move(slice), proxy.local().get_max_result_size(slice));
|
||||
return query_partition_mutation(proxy.local(), std::move(s), std::move(cmd), std::move(key));
|
||||
co_return co_await query_partition_mutation(proxy.local(), std::move(s), std::move(cmd), std::move(key));
|
||||
}
|
||||
|
||||
static thread_local semaphore the_merge_lock {1};
|
||||
@@ -865,17 +876,34 @@ future<> merge_unlock() {
|
||||
return smp::submit_to(0, [] { the_merge_lock.signal(); });
|
||||
}
|
||||
|
||||
static future<> with_merge_lock(noncopyable_function<future<> ()> func) {
|
||||
co_await merge_lock();
|
||||
std::exception_ptr ep;
|
||||
try {
|
||||
co_await func();
|
||||
} catch (...) {
|
||||
ep = std::current_exception();
|
||||
}
|
||||
co_await merge_unlock();
|
||||
if (ep) {
|
||||
std::rethrow_exception(std::move(ep));
|
||||
}
|
||||
}
|
||||
|
||||
static
|
||||
future<> update_schema_version_and_announce(distributed<service::storage_proxy>& proxy, schema_features features) {
|
||||
return calculate_schema_digest(proxy, features).then([&proxy] (utils::UUID uuid) {
|
||||
return db::system_keyspace::update_schema_version(uuid).then([&proxy, uuid] {
|
||||
return proxy.local().get_db().invoke_on_all([uuid] (database& db) {
|
||||
auto uuid = co_await calculate_schema_digest(proxy, features);
|
||||
{
|
||||
co_await db::system_keyspace::update_schema_version(uuid);
|
||||
{
|
||||
co_await proxy.local().get_db().invoke_on_all([uuid] (database& db) {
|
||||
db.update_version(uuid);
|
||||
});
|
||||
}).then([uuid] {
|
||||
}
|
||||
{
|
||||
slogger.info("Schema version changed to {}", uuid);
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -889,20 +917,17 @@ future<> update_schema_version_and_announce(distributed<service::storage_proxy>&
|
||||
*/
|
||||
future<> merge_schema(distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations)
|
||||
{
|
||||
return merge_lock().then([&proxy, &feat, mutations = std::move(mutations)] () mutable {
|
||||
return do_merge_schema(proxy, std::move(mutations), true).then([&proxy, &feat] {
|
||||
return update_schema_version_and_announce(proxy, feat.cluster_schema_features());
|
||||
});
|
||||
}).finally([] {
|
||||
return merge_unlock();
|
||||
co_await with_merge_lock([&] () mutable -> future<> {
|
||||
co_await do_merge_schema(proxy, std::move(mutations), true);
|
||||
{
|
||||
co_await update_schema_version_and_announce(proxy, feat.cluster_schema_features());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> recalculate_schema_version(distributed<service::storage_proxy>& proxy, gms::feature_service& feat) {
|
||||
return merge_lock().then([&proxy, &feat] {
|
||||
return update_schema_version_and_announce(proxy, feat.cluster_schema_features());
|
||||
}).finally([] {
|
||||
return merge_unlock();
|
||||
co_await with_merge_lock([&] () -> future<> {
|
||||
co_await update_schema_version_and_announce(proxy, feat.cluster_schema_features());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -910,12 +935,13 @@ future<> recalculate_schema_version(distributed<service::storage_proxy>& proxy,
|
||||
future<std::vector<sstring>>
|
||||
static read_table_names_of_keyspace(distributed<service::storage_proxy>& proxy, const sstring& keyspace_name, schema_ptr schema_table) {
|
||||
auto pkey = dht::decorate_key(*schema_table, partition_key::from_singular(*schema_table, keyspace_name));
|
||||
return db::system_keyspace::query(proxy, schema_table->ks_name(), schema_table->cf_name(), pkey).then([schema_table] (auto&& rs) {
|
||||
return boost::copy_range<std::vector<sstring>>(rs->rows() | boost::adaptors::transformed([schema_table] (const query::result_set_row& row) {
|
||||
auto&& rs = co_await db::system_keyspace::query(proxy, schema_table->ks_name(), schema_table->cf_name(), pkey);
|
||||
{
|
||||
co_return boost::copy_range<std::vector<sstring>>(rs->rows() | boost::adaptors::transformed([schema_table] (const query::result_set_row& row) {
|
||||
const sstring name = schema_table->clustering_key_columns().begin()->name_as_text();
|
||||
return row.get_nonnull<sstring>(name);
|
||||
}));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
static utils::UUID table_id_from_mutations(const schema_mutations& sm) {
|
||||
@@ -924,21 +950,20 @@ static utils::UUID table_id_from_mutations(const schema_mutations& sm) {
|
||||
return table_row.get_nonnull<utils::UUID>("id");
|
||||
}
|
||||
|
||||
// Call inside a seastar thread
|
||||
static
|
||||
std::map<utils::UUID, schema_mutations>
|
||||
future<std::map<utils::UUID, schema_mutations>>
|
||||
read_tables_for_keyspaces(distributed<service::storage_proxy>& proxy, const std::set<sstring>& keyspace_names, schema_ptr s)
|
||||
{
|
||||
std::map<utils::UUID, schema_mutations> result;
|
||||
for (auto&& keyspace_name : keyspace_names) {
|
||||
for (auto&& table_name : read_table_names_of_keyspace(proxy, keyspace_name, s).get0()) {
|
||||
for (auto&& table_name : co_await read_table_names_of_keyspace(proxy, keyspace_name, s)) {
|
||||
auto qn = qualified_name(keyspace_name, table_name);
|
||||
auto muts = read_table_mutations(proxy, qn, s).get0();
|
||||
auto muts = co_await read_table_mutations(proxy, qn, s);
|
||||
auto id = table_id_from_mutations(muts);
|
||||
result.emplace(std::move(id), std::move(muts));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
co_return result;
|
||||
}
|
||||
|
||||
mutation compact_for_schema_digest(const mutation& m) {
|
||||
@@ -1009,7 +1034,7 @@ static void fill_column_info(const schema& table,
|
||||
future<> store_column_mapping(distributed<service::storage_proxy>& proxy, schema_ptr s, bool with_ttl) {
|
||||
// Skip "system*" tables -- only user-related tables are relevant
|
||||
if (static_cast<std::string_view>(s->ks_name()).starts_with(db::system_keyspace::NAME)) {
|
||||
return make_ready_future<>();
|
||||
co_return;
|
||||
}
|
||||
schema_ptr history_tbl = scylla_table_schema_history();
|
||||
|
||||
@@ -1030,12 +1055,12 @@ future<> store_column_mapping(distributed<service::storage_proxy>& proxy, schema
|
||||
fill_column_info(*s, ckey, cdef, ts, ttl, m);
|
||||
muts.emplace_back(std::move(m));
|
||||
}
|
||||
return proxy.local().mutate_locally(std::move(muts), tracing::trace_state_ptr());
|
||||
co_await proxy.local().mutate_locally(std::move(muts), tracing::trace_state_ptr());
|
||||
}
|
||||
|
||||
static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush)
|
||||
{
|
||||
return seastar::async([&proxy, mutations = std::move(mutations), do_flush] () mutable {
|
||||
{
|
||||
slogger.trace("do_merge_schema: {}", mutations);
|
||||
schema_ptr s = keyspaces();
|
||||
// compare before/after schemas of the affected keyspaces only
|
||||
@@ -1050,55 +1075,56 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
|
||||
}
|
||||
|
||||
// current state of the schema
|
||||
auto&& old_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces).get0();
|
||||
auto&& old_column_families = read_tables_for_keyspaces(proxy, keyspaces, tables());
|
||||
auto&& old_types = read_schema_for_keyspaces(proxy, TYPES, keyspaces).get0();
|
||||
auto&& old_views = read_tables_for_keyspaces(proxy, keyspaces, views());
|
||||
auto old_functions = read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0();
|
||||
auto&& old_keyspaces = co_await read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces);
|
||||
auto&& old_column_families = co_await read_tables_for_keyspaces(proxy, keyspaces, tables());
|
||||
auto&& old_types = co_await read_schema_for_keyspaces(proxy, TYPES, keyspaces);
|
||||
auto&& old_views = co_await read_tables_for_keyspaces(proxy, keyspaces, views());
|
||||
auto old_functions = co_await read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces);
|
||||
#if 0 // not in 2.1.8
|
||||
/*auto& old_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0();
|
||||
#endif
|
||||
|
||||
proxy.local().mutate_locally(std::move(mutations), tracing::trace_state_ptr()).get0();
|
||||
co_await proxy.local().mutate_locally(std::move(mutations), tracing::trace_state_ptr());
|
||||
|
||||
if (do_flush) {
|
||||
proxy.local().get_db().invoke_on_all([s, cfs = std::move(column_families)] (database& db) {
|
||||
return parallel_for_each(cfs.begin(), cfs.end(), [&db] (const utils::UUID& id) {
|
||||
co_await proxy.local().get_db().invoke_on_all([&] (database& db) -> future<> {
|
||||
auto& cfs = column_families;
|
||||
co_await parallel_for_each(cfs.begin(), cfs.end(), [&] (const utils::UUID& id) -> future<> {
|
||||
auto& cf = db.find_column_family(id);
|
||||
return cf.flush();
|
||||
co_await cf.flush();
|
||||
});
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
|
||||
// with new data applied
|
||||
auto&& new_keyspaces = read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces).get0();
|
||||
auto&& new_column_families = read_tables_for_keyspaces(proxy, keyspaces, tables());
|
||||
auto&& new_types = read_schema_for_keyspaces(proxy, TYPES, keyspaces).get0();
|
||||
auto&& new_views = read_tables_for_keyspaces(proxy, keyspaces, views());
|
||||
auto new_functions = read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces).get0();
|
||||
auto&& new_keyspaces = co_await read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces);
|
||||
auto&& new_column_families = co_await read_tables_for_keyspaces(proxy, keyspaces, tables());
|
||||
auto&& new_types = co_await read_schema_for_keyspaces(proxy, TYPES, keyspaces);
|
||||
auto&& new_views = co_await read_tables_for_keyspaces(proxy, keyspaces, views());
|
||||
auto new_functions = co_await read_schema_for_keyspaces(proxy, FUNCTIONS, keyspaces);
|
||||
#if 0 // not in 2.1.8
|
||||
/*auto& new_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0();
|
||||
#endif
|
||||
|
||||
std::set<sstring> keyspaces_to_drop = merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces)).get0();
|
||||
auto types_to_drop = merge_types(proxy, std::move(old_types), std::move(new_types));
|
||||
merge_tables_and_views(proxy,
|
||||
std::set<sstring> keyspaces_to_drop = co_await merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces));
|
||||
auto types_to_drop = co_await merge_types(proxy, std::move(old_types), std::move(new_types));
|
||||
co_await merge_tables_and_views(proxy,
|
||||
std::move(old_column_families), std::move(new_column_families),
|
||||
std::move(old_views), std::move(new_views));
|
||||
merge_functions(proxy, std::move(old_functions), std::move(new_functions));
|
||||
co_await merge_functions(proxy, std::move(old_functions), std::move(new_functions));
|
||||
#if 0
|
||||
mergeAggregates(oldAggregates, newAggregates);
|
||||
#endif
|
||||
types_to_drop.drop();
|
||||
co_await types_to_drop.drop();
|
||||
|
||||
proxy.local().get_db().invoke_on_all([keyspaces_to_drop = std::move(keyspaces_to_drop)] (database& db) {
|
||||
co_await proxy.local().get_db().invoke_on_all([&] (database& db) -> future<> {
|
||||
// it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
|
||||
return do_for_each(keyspaces_to_drop, [&db] (sstring keyspace_to_drop) {
|
||||
for (auto keyspace_to_drop : keyspaces_to_drop) {
|
||||
db.drop_keyspace(keyspace_to_drop);
|
||||
return db.get_notifier().drop_keyspace(keyspace_to_drop);
|
||||
});
|
||||
}).get0();
|
||||
});
|
||||
co_await db.get_notifier().drop_keyspace(keyspace_to_drop);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& proxy, schema_result&& before, schema_result&& after)
|
||||
@@ -1132,24 +1158,25 @@ future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& p
|
||||
slogger.info("Altering keyspace {}", key);
|
||||
altered.emplace_back(key);
|
||||
}
|
||||
return do_with(std::move(created), [&proxy, altered = std::move(altered)] (auto& created) mutable {
|
||||
return do_with(std::move(altered), [&proxy, &created](auto& altered) {
|
||||
return proxy.local().get_db().invoke_on_all([&created, &altered, &proxy] (database& db) {
|
||||
return do_for_each(created, [&db](auto&& val) {
|
||||
{
|
||||
{
|
||||
co_await proxy.local().get_db().invoke_on_all([&] (database& db) -> future<> {
|
||||
for (auto&& val : created) {
|
||||
auto ksm = create_keyspace_from_schema_partition(val);
|
||||
return db.create_keyspace(ksm).then([&db, ksm] {
|
||||
return db.get_notifier().create_keyspace(ksm);
|
||||
});
|
||||
}).then([&altered, &db, &proxy]() {
|
||||
return do_for_each(altered, [&db, &proxy](auto& name) {
|
||||
return db.update_keyspace(proxy, name);
|
||||
});
|
||||
});
|
||||
co_await db.create_keyspace(ksm);
|
||||
co_await db.get_notifier().create_keyspace(ksm);
|
||||
}
|
||||
{
|
||||
for (auto& name : altered) {
|
||||
co_await db.update_keyspace(proxy, name);
|
||||
};
|
||||
}
|
||||
});
|
||||
});
|
||||
}).then([dropped = std::move(dropped)] () {
|
||||
return make_ready_future<std::set<sstring>>(dropped);
|
||||
});
|
||||
}
|
||||
}
|
||||
{
|
||||
co_return dropped;
|
||||
}
|
||||
}
|
||||
|
||||
struct schema_diff {
|
||||
@@ -1205,7 +1232,7 @@ static schema_diff diff_table_or_view(distributed<service::storage_proxy>& proxy
|
||||
// that when a base schema and a subset of its views are modified together (i.e.,
|
||||
// upon an alter table or alter type statement), then they are published together
|
||||
// as well, without any deferring in-between.
|
||||
static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
|
||||
static future<> merge_tables_and_views(distributed<service::storage_proxy>& proxy,
|
||||
std::map<utils::UUID, schema_mutations>&& tables_before,
|
||||
std::map<utils::UUID, schema_mutations>&& tables_after,
|
||||
std::map<utils::UUID, schema_mutations>&& views_before,
|
||||
@@ -1253,28 +1280,28 @@ static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
|
||||
return vp;
|
||||
});
|
||||
|
||||
proxy.local().get_db().invoke_on_all([&] (database& db) {
|
||||
return seastar::async([&] {
|
||||
co_await proxy.local().get_db().invoke_on_all([&] (database& db) -> future<> {
|
||||
{
|
||||
// First drop views and *only then* the tables, if interleaved it can lead
|
||||
// to a mv not finding its schema when snapshoting since the main table
|
||||
// was already dropped (see https://github.com/scylladb/scylla/issues/5614)
|
||||
parallel_for_each(views_diff.dropped, [&] (schema_diff::dropped_schema& dt) {
|
||||
co_await parallel_for_each(views_diff.dropped, [&] (schema_diff::dropped_schema& dt) -> future<> {
|
||||
auto& s = *dt.schema.get();
|
||||
return db.drop_column_family(s.ks_name(), s.cf_name(), [&] { return dt.jp.value(); });
|
||||
}).get();
|
||||
parallel_for_each(tables_diff.dropped, [&] (schema_diff::dropped_schema& dt) {
|
||||
co_await db.drop_column_family(s.ks_name(), s.cf_name(), [&] { return dt.jp.value(); });
|
||||
});
|
||||
co_await parallel_for_each(tables_diff.dropped, [&] (schema_diff::dropped_schema& dt) -> future<> {
|
||||
auto& s = *dt.schema.get();
|
||||
return db.drop_column_family(s.ks_name(), s.cf_name(), [&] { return dt.jp.value(); });
|
||||
}).get();
|
||||
co_await db.drop_column_family(s.ks_name(), s.cf_name(), [&] { return dt.jp.value(); });
|
||||
});
|
||||
|
||||
// In order to avoid possible races we first create the tables and only then the views.
|
||||
// That way if a view seeks information about its base table it's guarantied to find it.
|
||||
parallel_for_each(tables_diff.created, [&] (global_schema_ptr& gs) {
|
||||
return db.add_column_family_and_make_directory(gs);
|
||||
}).get();
|
||||
parallel_for_each(views_diff.created, [&] (global_schema_ptr& gs) {
|
||||
return db.add_column_family_and_make_directory(gs);
|
||||
}).get();
|
||||
co_await parallel_for_each(tables_diff.created, [&] (global_schema_ptr& gs) -> future<> {
|
||||
co_await db.add_column_family_and_make_directory(gs);
|
||||
});
|
||||
co_await parallel_for_each(views_diff.created, [&] (global_schema_ptr& gs) -> future<> {
|
||||
co_await db.add_column_family_and_make_directory(gs);
|
||||
});
|
||||
for (auto&& gs : boost::range::join(tables_diff.created, views_diff.created)) {
|
||||
db.find_column_family(gs).mark_ready_for_writes();
|
||||
}
|
||||
@@ -1285,21 +1312,21 @@ static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
|
||||
}
|
||||
|
||||
auto it = columns_changed.begin();
|
||||
auto notify = [&] (auto& r, auto&& f) {
|
||||
auto notify = [&] (auto& r, auto&& f) -> future<> {
|
||||
auto notifications = r | boost::adaptors::transformed(f);
|
||||
when_all(notifications.begin(), notifications.end()).get();
|
||||
co_await when_all(notifications.begin(), notifications.end());
|
||||
};
|
||||
// View drops are notified first, because a table can only be dropped if its views are already deleted
|
||||
notify(views_diff.dropped, [&] (auto&& dt) { return db.get_notifier().drop_view(view_ptr(dt.schema)); });
|
||||
notify(tables_diff.dropped, [&] (auto&& dt) { return db.get_notifier().drop_column_family(dt.schema); });
|
||||
co_await notify(views_diff.dropped, [&] (auto&& dt) { return db.get_notifier().drop_view(view_ptr(dt.schema)); });
|
||||
co_await notify(tables_diff.dropped, [&] (auto&& dt) { return db.get_notifier().drop_column_family(dt.schema); });
|
||||
// Table creations are notified first, in case a view is created right after the table
|
||||
notify(tables_diff.created, [&] (auto&& gs) { return db.get_notifier().create_column_family(gs); });
|
||||
notify(views_diff.created, [&] (auto&& gs) { return db.get_notifier().create_view(view_ptr(gs)); });
|
||||
co_await notify(tables_diff.created, [&] (auto&& gs) { return db.get_notifier().create_column_family(gs); });
|
||||
co_await notify(views_diff.created, [&] (auto&& gs) { return db.get_notifier().create_view(view_ptr(gs)); });
|
||||
// Table altering is notified first, in case new base columns appear
|
||||
notify(tables_diff.altered, [&] (auto&& altered) { return db.get_notifier().update_column_family(altered.new_schema, *it++); });
|
||||
notify(views_diff.altered, [&] (auto&& altered) { return db.get_notifier().update_view(view_ptr(altered.new_schema), *it++); });
|
||||
});
|
||||
}).get();
|
||||
co_await notify(tables_diff.altered, [&] (auto&& altered) { return db.get_notifier().update_column_family(altered.new_schema, *it++); });
|
||||
co_await notify(views_diff.altered, [&] (auto&& altered) { return db.get_notifier().update_view(view_ptr(altered.new_schema), *it++); });
|
||||
}
|
||||
});
|
||||
|
||||
// Insert column_mapping into history table for altered and created tables.
|
||||
//
|
||||
@@ -1311,20 +1338,20 @@ static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
|
||||
//
|
||||
// Drop column mapping entries for dropped tables since these will not be TTLed automatically
|
||||
// and will stay there forever if we don't clean them up manually
|
||||
when_all_succeed(
|
||||
parallel_for_each(tables_diff.created, [&proxy] (global_schema_ptr& gs) {
|
||||
return store_column_mapping(proxy, gs.get(), false);
|
||||
co_await when_all_succeed(
|
||||
parallel_for_each(tables_diff.created, [&proxy] (global_schema_ptr& gs) -> future<> {
|
||||
co_await store_column_mapping(proxy, gs.get(), false);
|
||||
}),
|
||||
parallel_for_each(tables_diff.altered, [&proxy] (schema_diff::altered_schema& altered) {
|
||||
return when_all_succeed(
|
||||
parallel_for_each(tables_diff.altered, [&proxy] (schema_diff::altered_schema& altered) -> future<> {
|
||||
co_await when_all_succeed(
|
||||
store_column_mapping(proxy, altered.old_schema.get(), true),
|
||||
store_column_mapping(proxy, altered.new_schema.get(), false)).discard_result();
|
||||
store_column_mapping(proxy, altered.new_schema.get(), false));
|
||||
}),
|
||||
parallel_for_each(tables_diff.dropped, [&proxy] (schema_diff::dropped_schema& dropped) {
|
||||
parallel_for_each(tables_diff.dropped, [&proxy] (schema_diff::dropped_schema& dropped) -> future<> {
|
||||
schema_ptr s = dropped.schema.get();
|
||||
return drop_column_mapping(s->id(), s->version());
|
||||
co_await drop_column_mapping(s->id(), s->version());
|
||||
})
|
||||
).get();
|
||||
);
|
||||
}
|
||||
|
||||
static std::vector<const query::result_set_row*> collect_rows(const std::set<sstring>& keys, const schema_result& result) {
|
||||
@@ -1451,7 +1478,7 @@ static std::vector<user_type> create_types(database& db, const std::vector<const
|
||||
}
|
||||
|
||||
// see the comments for merge_keyspaces()
|
||||
[[nodiscard]] static user_types_to_drop merge_types(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after)
|
||||
static future<user_types_to_drop> merge_types(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after)
|
||||
{
|
||||
auto diff = diff_rows(before, after);
|
||||
|
||||
@@ -1459,28 +1486,29 @@ static std::vector<user_type> create_types(database& db, const std::vector<const
|
||||
// use those types. Similarly, defer dropping until after tables/views that may use
|
||||
// some of these user types are dropped.
|
||||
|
||||
proxy.local().get_db().invoke_on_all([&diff] (database& db) {
|
||||
return seastar::async([&] {
|
||||
co_await proxy.local().get_db().invoke_on_all([&] (database& db) -> future<> {
|
||||
{
|
||||
for (auto&& user_type : create_types(db, diff.created)) {
|
||||
db.find_keyspace(user_type->_keyspace).add_user_type(user_type);
|
||||
db.get_notifier().create_user_type(user_type).get();
|
||||
co_await db.get_notifier().create_user_type(user_type);
|
||||
}
|
||||
for (auto&& user_type : create_types(db, diff.altered)) {
|
||||
db.find_keyspace(user_type->_keyspace).add_user_type(user_type);
|
||||
db.get_notifier().update_user_type(user_type).get();
|
||||
co_await db.get_notifier().update_user_type(user_type);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
co_return user_types_to_drop{[&proxy, before = std::move(before), rows = std::move(diff.dropped)] () mutable -> future<> {
|
||||
co_await proxy.local().get_db().invoke_on_all([&] (database& db) -> future<> {
|
||||
auto dropped = create_types(db, rows);
|
||||
{
|
||||
for (auto& user_type : dropped) {
|
||||
db.find_keyspace(user_type->_keyspace).remove_user_type(user_type);
|
||||
co_await db.get_notifier().drop_user_type(user_type);
|
||||
}
|
||||
}
|
||||
});
|
||||
}).get();
|
||||
|
||||
return user_types_to_drop{[&proxy, before = std::move(before), rows = std::move(diff.dropped)] () mutable {
|
||||
proxy.local().get_db().invoke_on_all([&rows](database& db) {
|
||||
return do_with(create_types(db, rows), [&db] (auto &dropped) {
|
||||
return do_for_each(dropped, [&db](auto& user_type) {
|
||||
db.find_keyspace(user_type->_keyspace).remove_user_type(user_type);
|
||||
return db.get_notifier().drop_user_type(user_type);
|
||||
});
|
||||
});
|
||||
}).get();
|
||||
}};
|
||||
}
|
||||
|
||||
@@ -1571,11 +1599,11 @@ static shared_ptr<cql3::functions::user_function> create_func(database& db, cons
|
||||
row.get_nonnull<bool>("called_on_null_input"), std::move(bitcode), std::move(cfg));
|
||||
}
|
||||
|
||||
static void merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after,
|
||||
static future<> merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after,
|
||||
std::function<shared_ptr<cql3::functions::function>(database& db, const query::result_set_row& row)> create) {
|
||||
auto diff = diff_rows(before, after);
|
||||
|
||||
proxy.local().get_db().invoke_on_all([&diff, create] (database& db) {
|
||||
co_await proxy.local().get_db().invoke_on_all([&] (database& db) {
|
||||
for (const auto& val : diff.created) {
|
||||
cql3::functions::functions::add_function(create(db, *val));
|
||||
}
|
||||
@@ -1586,11 +1614,11 @@ static void merge_functions(distributed<service::storage_proxy>& proxy, schema_r
|
||||
for (const auto& val : diff.altered) {
|
||||
cql3::functions::functions::replace_function(create(db, *val));
|
||||
}
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
|
||||
static void merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after) {
|
||||
return merge_functions(proxy, before, after, create_func);
|
||||
static future<> merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after) {
|
||||
co_await merge_functions(proxy, before, after, create_func);
|
||||
}
|
||||
|
||||
template<typename... Args>
|
||||
@@ -2269,17 +2297,18 @@ std::vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata>
|
||||
|
||||
static future<schema_mutations> read_table_mutations(distributed<service::storage_proxy>& proxy, const qualified_name& table, schema_ptr s)
|
||||
{
|
||||
return when_all_succeed(
|
||||
read_schema_partition_for_table(proxy, s, table.keyspace_name, table.table_name),
|
||||
read_schema_partition_for_table(proxy, columns(), table.keyspace_name, table.table_name),
|
||||
read_schema_partition_for_table(proxy, view_virtual_columns(), table.keyspace_name, table.table_name),
|
||||
read_schema_partition_for_table(proxy, computed_columns(), table.keyspace_name, table.table_name),
|
||||
read_schema_partition_for_table(proxy, dropped_columns(), table.keyspace_name, table.table_name),
|
||||
read_schema_partition_for_table(proxy, indexes(), table.keyspace_name, table.table_name),
|
||||
read_schema_partition_for_table(proxy, scylla_tables(), table.keyspace_name, table.table_name)).then_unpack(
|
||||
[] (mutation cf_m, mutation col_m, mutation vv_col_m, mutation c_col_m, mutation dropped_m, mutation idx_m, mutation st_m) {
|
||||
return schema_mutations{std::move(cf_m), std::move(col_m), std::move(vv_col_m), std::move(c_col_m), std::move(idx_m), std::move(dropped_m), std::move(st_m)};
|
||||
});
|
||||
auto&& [cf_m, col_m, vv_col_m, c_col_m, dropped_m, idx_m, st_m] = co_await coroutine::all(
|
||||
[&] { return read_schema_partition_for_table(proxy, s, table.keyspace_name, table.table_name); },
|
||||
[&] { return read_schema_partition_for_table(proxy, columns(), table.keyspace_name, table.table_name); },
|
||||
[&] { return read_schema_partition_for_table(proxy, view_virtual_columns(), table.keyspace_name, table.table_name); },
|
||||
[&] { return read_schema_partition_for_table(proxy, computed_columns(), table.keyspace_name, table.table_name); },
|
||||
[&] { return read_schema_partition_for_table(proxy, dropped_columns(), table.keyspace_name, table.table_name); },
|
||||
[&] { return read_schema_partition_for_table(proxy, indexes(), table.keyspace_name, table.table_name); },
|
||||
[&] { return read_schema_partition_for_table(proxy, scylla_tables(), table.keyspace_name, table.table_name); }
|
||||
);
|
||||
{
|
||||
co_return schema_mutations{std::move(cf_m), std::move(col_m), std::move(vv_col_m), std::move(c_col_m), std::move(idx_m), std::move(dropped_m), std::move(st_m)};
|
||||
}
|
||||
#if 0
|
||||
// FIXME:
|
||||
Row serializedTriggers = readSchemaPartitionForTable(TRIGGERS, ksName, cfName);
|
||||
@@ -2297,14 +2326,16 @@ static future<schema_mutations> read_table_mutations(distributed<service::storag
|
||||
|
||||
future<schema_ptr> create_table_from_name(distributed<service::storage_proxy>& proxy, const sstring& keyspace, const sstring& table)
|
||||
{
|
||||
return do_with(qualified_name(keyspace, table), [&proxy] (qualified_name& qn) {
|
||||
return read_table_mutations(proxy, qn, tables()).then([qn, &proxy] (schema_mutations sm) {
|
||||
auto qn = qualified_name(keyspace, table);
|
||||
{
|
||||
auto sm = co_await read_table_mutations(proxy, qn, tables());
|
||||
{
|
||||
if (!sm.live()) {
|
||||
throw std::runtime_error(format("{}:{} not found in the schema definitions keyspace.", qn.keyspace_name, qn.table_name));
|
||||
}
|
||||
return create_table_from_mutations(proxy, std::move(sm));
|
||||
});
|
||||
});
|
||||
co_return create_table_from_mutations(proxy, std::move(sm));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -2314,14 +2345,16 @@ future<schema_ptr> create_table_from_name(distributed<service::storage_proxy>& p
|
||||
*/
|
||||
future<std::map<sstring, schema_ptr>> create_tables_from_tables_partition(distributed<service::storage_proxy>& proxy, const schema_result::mapped_type& result)
|
||||
{
|
||||
auto tables = make_lw_shared<std::map<sstring, schema_ptr>>();
|
||||
return parallel_for_each(result->rows().begin(), result->rows().end(), [&proxy, tables] (const query::result_set_row& row) {
|
||||
return create_table_from_table_row(proxy, row).then([tables] (schema_ptr&& cfm) {
|
||||
tables->emplace(cfm->cf_name(), std::move(cfm));
|
||||
});
|
||||
}).then([tables] {
|
||||
return std::move(*tables);
|
||||
auto tables = std::map<sstring, schema_ptr>();
|
||||
co_await parallel_for_each(result->rows().begin(), result->rows().end(), [&] (const query::result_set_row& row) -> future<> {
|
||||
schema_ptr cfm = co_await create_table_from_table_row(proxy, row);
|
||||
{
|
||||
tables.emplace(cfm->cf_name(), std::move(cfm));
|
||||
}
|
||||
});
|
||||
{
|
||||
co_return std::move(tables);
|
||||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
@@ -2795,14 +2828,15 @@ view_ptr create_view_from_mutations(const schema_ctxt& ctxt, schema_mutations sm
|
||||
|
||||
static future<view_ptr> create_view_from_table_row(distributed<service::storage_proxy>& proxy, const query::result_set_row& row) {
|
||||
qualified_name qn(row.get_nonnull<sstring>("keyspace_name"), row.get_nonnull<sstring>("view_name"));
|
||||
return do_with(std::move(qn), [&proxy] (auto&& qn) {
|
||||
return read_table_mutations(proxy, qn, views()).then([&] (schema_mutations sm) {
|
||||
{
|
||||
schema_mutations sm = co_await read_table_mutations(proxy, qn, views());
|
||||
{
|
||||
if (!sm.live()) {
|
||||
throw std::runtime_error(format("{}:{} not found in the view definitions keyspace.", qn.keyspace_name, qn.table_name));
|
||||
}
|
||||
return create_view_from_mutations(proxy, std::move(sm));
|
||||
});
|
||||
});
|
||||
co_return create_view_from_mutations(proxy, std::move(sm));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -2812,15 +2846,18 @@ static future<view_ptr> create_view_from_table_row(distributed<service::storage_
|
||||
*/
|
||||
future<std::vector<view_ptr>> create_views_from_schema_partition(distributed<service::storage_proxy>& proxy, const schema_result::mapped_type& result)
|
||||
{
|
||||
return do_with(std::vector<view_ptr>(), [&] (auto& views) {
|
||||
return parallel_for_each(result->rows().begin(), result->rows().end(), [&proxy, &views] (auto&& row) {
|
||||
return create_view_from_table_row(proxy, row).then([&views] (auto&& v) {
|
||||
std::vector<view_ptr> views;
|
||||
{
|
||||
co_await parallel_for_each(result->rows().begin(), result->rows().end(), [&] (auto&& row) -> future<> {
|
||||
auto v = co_await create_view_from_table_row(proxy, row);
|
||||
{
|
||||
views.push_back(std::move(v));
|
||||
});
|
||||
}).then([&views] {
|
||||
return std::move(views);
|
||||
}
|
||||
});
|
||||
});
|
||||
{
|
||||
co_return std::move(views);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static schema_mutations make_view_mutations(view_ptr view, api::timestamp_type timestamp, bool with_columns)
|
||||
@@ -3141,13 +3178,13 @@ table_schema_version schema_mutations::digest() const {
|
||||
future<schema_mutations> read_table_mutations(distributed<service::storage_proxy>& proxy,
|
||||
sstring keyspace_name, sstring table_name, schema_ptr s)
|
||||
{
|
||||
return read_schema_partition_for_table(proxy, s, keyspace_name, table_name)
|
||||
.then([&proxy, keyspace_name, table_name] (mutation cf_m) {
|
||||
return read_schema_partition_for_table(proxy, db::system_keyspace::legacy::columns(), keyspace_name, table_name)
|
||||
.then([cf_m = std::move(cf_m)] (mutation col_m) {
|
||||
return schema_mutations{std::move(cf_m), std::move(col_m)};
|
||||
});
|
||||
});
|
||||
mutation cf_m = co_await read_schema_partition_for_table(proxy, s, keyspace_name, table_name);
|
||||
{
|
||||
mutation col_m = co_await read_schema_partition_for_table(proxy, db::system_keyspace::legacy::columns(), keyspace_name, table_name);
|
||||
{
|
||||
co_return schema_mutations{std::move(cf_m), std::move(col_m)};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace legacy
|
||||
@@ -3156,17 +3193,17 @@ static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_or
|
||||
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);
|
||||
|
||||
future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_version version) {
|
||||
auto cm_fut = qctx->qp().execute_internal(
|
||||
shared_ptr<cql3::untyped_result_set> results = co_await qctx->qp().execute_internal(
|
||||
GET_COLUMN_MAPPING_QUERY,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
{table_id, version}
|
||||
);
|
||||
return cm_fut.then([version] (shared_ptr<cql3::untyped_result_set> results) {
|
||||
{
|
||||
if (results->empty()) {
|
||||
// If we don't have a stored column_mapping for an obsolete schema version
|
||||
// then it means it's way too old and been cleaned up already.
|
||||
// Fail the whole learn stage in this case.
|
||||
return make_exception_future<column_mapping>(std::runtime_error(
|
||||
co_return coroutine::make_exception(std::runtime_error(
|
||||
format("Failed to look up column mapping for schema version {}",
|
||||
version)));
|
||||
}
|
||||
@@ -3193,28 +3230,29 @@ future<column_mapping> get_column_mapping(utils::UUID table_id, table_schema_ver
|
||||
cm_columns.emplace_back(column_mapping_entry{def.name(), def.type});
|
||||
}
|
||||
column_mapping cm(std::move(cm_columns), static_columns.size());
|
||||
return make_ready_future<column_mapping>(std::move(cm));
|
||||
});
|
||||
co_return std::move(cm);
|
||||
}
|
||||
}
|
||||
|
||||
future<bool> column_mapping_exists(utils::UUID table_id, table_schema_version version) {
|
||||
return qctx->qp().execute_internal(
|
||||
shared_ptr<cql3::untyped_result_set> results = co_await qctx->qp().execute_internal(
|
||||
GET_COLUMN_MAPPING_QUERY,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
{table_id, version}
|
||||
).then([] (shared_ptr<cql3::untyped_result_set> results) {
|
||||
return !results->empty();
|
||||
});
|
||||
);
|
||||
{
|
||||
co_return !results->empty();
|
||||
}
|
||||
}
|
||||
|
||||
future<> drop_column_mapping(utils::UUID table_id, table_schema_version version) {
|
||||
const static sstring DEL_COLUMN_MAPPING_QUERY =
|
||||
format("DELETE FROM system.{} WHERE cf_id = ? and schema_version = ?",
|
||||
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);
|
||||
return qctx->qp().execute_internal(
|
||||
co_await qctx->qp().execute_internal(
|
||||
DEL_COLUMN_MAPPING_QUERY,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
{table_id, version}).discard_result();
|
||||
{table_id, version});
|
||||
}
|
||||
|
||||
} // namespace schema_tables
|
||||
|
||||
Reference in New Issue
Block a user