|
|
|
|
@@ -151,7 +151,8 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
|
|
|
|
|
std::map<table_id, schema_mutations>&& tables_before,
|
|
|
|
|
std::map<table_id, schema_mutations>&& tables_after,
|
|
|
|
|
std::map<table_id, schema_mutations>&& views_before,
|
|
|
|
|
std::map<table_id, schema_mutations>&& views_after);
|
|
|
|
|
std::map<table_id, schema_mutations>&& views_after,
|
|
|
|
|
bool reload);
|
|
|
|
|
|
|
|
|
|
struct [[nodiscard]] user_types_to_drop final {
|
|
|
|
|
seastar::noncopyable_function<future<> ()> drop;
|
|
|
|
|
@@ -164,7 +165,7 @@ static future<user_types_to_drop> merge_types(distributed<service::storage_proxy
|
|
|
|
|
static future<> merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after);
|
|
|
|
|
static future<> merge_aggregates(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after, schema_result scylla_before, schema_result scylla_after);
|
|
|
|
|
|
|
|
|
|
static future<> do_merge_schema(distributed<service::storage_proxy>&, std::vector<mutation>, bool do_flush);
|
|
|
|
|
static future<> do_merge_schema(distributed<service::storage_proxy>&, std::vector<mutation>, bool do_flush, bool reload);
|
|
|
|
|
|
|
|
|
|
using computed_columns_map = std::unordered_map<bytes, column_computation_ptr>;
|
|
|
|
|
static computed_columns_map get_computed_columns(const schema_mutations& sm);
|
|
|
|
|
@@ -969,7 +970,7 @@ future<> update_schema_version_and_announce(sharded<db::system_keyspace>& sys_ks
|
|
|
|
|
* @throws ConfigurationException If one of metadata attributes has invalid value
|
|
|
|
|
* @throws IOException If data was corrupted during transportation or failed to apply fs operations
|
|
|
|
|
*/
|
|
|
|
|
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations)
|
|
|
|
|
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations, bool reload)
|
|
|
|
|
{
|
|
|
|
|
if (this_shard_id() != 0) {
|
|
|
|
|
// mutations must be applied on the owning shard (0).
|
|
|
|
|
@@ -980,7 +981,7 @@ future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service:
|
|
|
|
|
}
|
|
|
|
|
co_await with_merge_lock([&] () mutable -> future<> {
|
|
|
|
|
bool flush_schema = proxy.local().get_db().local().get_config().flush_schema_tables_after_modification();
|
|
|
|
|
co_await do_merge_schema(proxy, std::move(mutations), flush_schema);
|
|
|
|
|
co_await do_merge_schema(proxy, std::move(mutations), flush_schema, reload);
|
|
|
|
|
co_await update_schema_version_and_announce(sys_ks, proxy, feat.cluster_schema_features());
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
@@ -1222,7 +1223,7 @@ table_selector get_affected_tables(const sstring& keyspace_name, const mutation&
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush)
|
|
|
|
|
static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush, bool reload)
|
|
|
|
|
{
|
|
|
|
|
slogger.trace("do_merge_schema: {}", mutations);
|
|
|
|
|
schema_ptr s = keyspaces();
|
|
|
|
|
@@ -1249,6 +1250,15 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
|
|
|
|
|
delete_schema_version(mutation);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (reload) {
|
|
|
|
|
for (auto&& ks : proxy.local().get_db().local().get_non_system_keyspaces()) {
|
|
|
|
|
keyspaces.emplace(ks);
|
|
|
|
|
table_selector sel;
|
|
|
|
|
sel.all_in_keyspace = true;
|
|
|
|
|
affected_tables[ks] = sel;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Resolve sel.all_in_keyspace == true to the actual list of tables and views.
|
|
|
|
|
for (auto&& [keyspace_name, sel] : affected_tables) {
|
|
|
|
|
if (sel.all_in_keyspace) {
|
|
|
|
|
@@ -1305,7 +1315,7 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
|
|
|
|
|
auto types_to_drop = co_await merge_types(proxy, std::move(old_types), std::move(new_types));
|
|
|
|
|
co_await merge_tables_and_views(proxy,
|
|
|
|
|
std::move(old_column_families), std::move(new_column_families),
|
|
|
|
|
std::move(old_views), std::move(new_views));
|
|
|
|
|
std::move(old_views), std::move(new_views), reload);
|
|
|
|
|
co_await merge_functions(proxy, std::move(old_functions), std::move(new_functions));
|
|
|
|
|
co_await merge_aggregates(proxy, std::move(old_aggregates), std::move(new_aggregates), std::move(old_scylla_aggregates), std::move(new_scylla_aggregates));
|
|
|
|
|
co_await types_to_drop.drop();
|
|
|
|
|
@@ -1409,6 +1419,7 @@ enum class schema_diff_side {
|
|
|
|
|
static schema_diff diff_table_or_view(distributed<service::storage_proxy>& proxy,
|
|
|
|
|
std::map<table_id, schema_mutations>&& before,
|
|
|
|
|
std::map<table_id, schema_mutations>&& after,
|
|
|
|
|
bool reload,
|
|
|
|
|
noncopyable_function<schema_ptr (schema_mutations sm, schema_diff_side)> create_schema)
|
|
|
|
|
{
|
|
|
|
|
schema_diff d;
|
|
|
|
|
@@ -1429,6 +1440,13 @@ static schema_diff diff_table_or_view(distributed<service::storage_proxy>& proxy
|
|
|
|
|
slogger.info("Altering {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
|
|
|
|
|
d.altered.emplace_back(schema_diff::altered_schema{s_before, s});
|
|
|
|
|
}
|
|
|
|
|
if (reload) {
|
|
|
|
|
for (auto&& key: diff.entries_in_common) {
|
|
|
|
|
auto s = create_schema(std::move(after.at(key)), schema_diff_side::right);
|
|
|
|
|
slogger.info("Reloading {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
|
|
|
|
|
d.altered.emplace_back(schema_diff::altered_schema {s, s});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return d;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1441,12 +1459,13 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
|
|
|
|
|
std::map<table_id, schema_mutations>&& tables_before,
|
|
|
|
|
std::map<table_id, schema_mutations>&& tables_after,
|
|
|
|
|
std::map<table_id, schema_mutations>&& views_before,
|
|
|
|
|
std::map<table_id, schema_mutations>&& views_after)
|
|
|
|
|
std::map<table_id, schema_mutations>&& views_after,
|
|
|
|
|
bool reload)
|
|
|
|
|
{
|
|
|
|
|
auto tables_diff = diff_table_or_view(proxy, std::move(tables_before), std::move(tables_after), [&] (schema_mutations sm, schema_diff_side) {
|
|
|
|
|
auto tables_diff = diff_table_or_view(proxy, std::move(tables_before), std::move(tables_after), reload, [&] (schema_mutations sm, schema_diff_side) {
|
|
|
|
|
return create_table_from_mutations(proxy, std::move(sm));
|
|
|
|
|
});
|
|
|
|
|
auto views_diff = diff_table_or_view(proxy, std::move(views_before), std::move(views_after), [&] (schema_mutations sm, schema_diff_side side) {
|
|
|
|
|
auto views_diff = diff_table_or_view(proxy, std::move(views_before), std::move(views_after), reload, [&] (schema_mutations sm, schema_diff_side side) {
|
|
|
|
|
// The view schema mutation should be created with reference to the base table schema because we definitely know it by now.
|
|
|
|
|
// If we don't do it we are leaving a window where write commands to this schema are illegal.
|
|
|
|
|
// There are 3 possibilities:
|
|
|
|
|
|