migration_manager, schema_tables: Implement migration_manager::reload_schema()

Will recreate schema_ptr's from schema tables like during table
alter. Will be needed when digest calculation changes in reaction to
cluster feature at run time.
This commit is contained in:
Tomasz Grabiec
2023-06-29 01:08:51 +02:00
parent 9bfe9f0b2f
commit 0c86abab4d
4 changed files with 36 additions and 10 deletions

View File

@@ -151,7 +151,8 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
std::map<table_id, schema_mutations>&& tables_before,
std::map<table_id, schema_mutations>&& tables_after,
std::map<table_id, schema_mutations>&& views_before,
std::map<table_id, schema_mutations>&& views_after);
std::map<table_id, schema_mutations>&& views_after,
bool reload);
struct [[nodiscard]] user_types_to_drop final {
seastar::noncopyable_function<future<> ()> drop;
@@ -164,7 +165,7 @@ static future<user_types_to_drop> merge_types(distributed<service::storage_proxy
static future<> merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after);
static future<> merge_aggregates(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after, schema_result scylla_before, schema_result scylla_after);
static future<> do_merge_schema(distributed<service::storage_proxy>&, std::vector<mutation>, bool do_flush);
static future<> do_merge_schema(distributed<service::storage_proxy>&, std::vector<mutation>, bool do_flush, bool reload);
using computed_columns_map = std::unordered_map<bytes, column_computation_ptr>;
static computed_columns_map get_computed_columns(const schema_mutations& sm);
@@ -969,7 +970,7 @@ future<> update_schema_version_and_announce(sharded<db::system_keyspace>& sys_ks
* @throws ConfigurationException If one of metadata attributes has invalid value
* @throws IOException If data was corrupted during transportation or failed to apply fs operations
*/
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations)
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations, bool reload)
{
if (this_shard_id() != 0) {
// mutations must be applied on the owning shard (0).
@@ -980,7 +981,7 @@ future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service:
}
co_await with_merge_lock([&] () mutable -> future<> {
bool flush_schema = proxy.local().get_db().local().get_config().flush_schema_tables_after_modification();
co_await do_merge_schema(proxy, std::move(mutations), flush_schema);
co_await do_merge_schema(proxy, std::move(mutations), flush_schema, reload);
co_await update_schema_version_and_announce(sys_ks, proxy, feat.cluster_schema_features());
});
}
@@ -1222,7 +1223,7 @@ table_selector get_affected_tables(const sstring& keyspace_name, const mutation&
return result;
}
static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush)
static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush, bool reload)
{
slogger.trace("do_merge_schema: {}", mutations);
schema_ptr s = keyspaces();
@@ -1249,6 +1250,15 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
delete_schema_version(mutation);
}
if (reload) {
for (auto&& ks : proxy.local().get_db().local().get_non_system_keyspaces()) {
keyspaces.emplace(ks);
table_selector sel;
sel.all_in_keyspace = true;
affected_tables[ks] = sel;
}
}
// Resolve sel.all_in_keyspace == true to the actual list of tables and views.
for (auto&& [keyspace_name, sel] : affected_tables) {
if (sel.all_in_keyspace) {
@@ -1305,7 +1315,7 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
auto types_to_drop = co_await merge_types(proxy, std::move(old_types), std::move(new_types));
co_await merge_tables_and_views(proxy,
std::move(old_column_families), std::move(new_column_families),
std::move(old_views), std::move(new_views));
std::move(old_views), std::move(new_views), reload);
co_await merge_functions(proxy, std::move(old_functions), std::move(new_functions));
co_await merge_aggregates(proxy, std::move(old_aggregates), std::move(new_aggregates), std::move(old_scylla_aggregates), std::move(new_scylla_aggregates));
co_await types_to_drop.drop();
@@ -1409,6 +1419,7 @@ enum class schema_diff_side {
static schema_diff diff_table_or_view(distributed<service::storage_proxy>& proxy,
std::map<table_id, schema_mutations>&& before,
std::map<table_id, schema_mutations>&& after,
bool reload,
noncopyable_function<schema_ptr (schema_mutations sm, schema_diff_side)> create_schema)
{
schema_diff d;
@@ -1429,6 +1440,13 @@ static schema_diff diff_table_or_view(distributed<service::storage_proxy>& proxy
slogger.info("Altering {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
d.altered.emplace_back(schema_diff::altered_schema{s_before, s});
}
if (reload) {
for (auto&& key: diff.entries_in_common) {
auto s = create_schema(std::move(after.at(key)), schema_diff_side::right);
slogger.info("Reloading {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
d.altered.emplace_back(schema_diff::altered_schema {s, s});
}
}
return d;
}
@@ -1441,12 +1459,13 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
std::map<table_id, schema_mutations>&& tables_before,
std::map<table_id, schema_mutations>&& tables_after,
std::map<table_id, schema_mutations>&& views_before,
std::map<table_id, schema_mutations>&& views_after)
std::map<table_id, schema_mutations>&& views_after,
bool reload)
{
auto tables_diff = diff_table_or_view(proxy, std::move(tables_before), std::move(tables_after), [&] (schema_mutations sm, schema_diff_side) {
auto tables_diff = diff_table_or_view(proxy, std::move(tables_before), std::move(tables_after), reload, [&] (schema_mutations sm, schema_diff_side) {
return create_table_from_mutations(proxy, std::move(sm));
});
auto views_diff = diff_table_or_view(proxy, std::move(views_before), std::move(views_after), [&] (schema_mutations sm, schema_diff_side side) {
auto views_diff = diff_table_or_view(proxy, std::move(views_before), std::move(views_after), reload, [&] (schema_mutations sm, schema_diff_side side) {
// The view schema mutation should be created with reference to the base table schema because we definitely know it by now.
// If we don't do it we are leaving a window where write commands to this schema are illegal.
// There are 3 possibilities:

View File

@@ -189,7 +189,7 @@ future<mutation> read_keyspace_mutation(distributed<service::storage_proxy>&, co
// Must be called on shard 0.
future<semaphore_units<>> hold_merge_lock() noexcept;
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations);
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations, bool reload = false);
// Recalculates the local schema version.
//

View File

@@ -368,6 +368,12 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr
return db::schema_tables::merge_schema(_sys_ks, proxy.container(), _feat, std::move(mutations));
}
future<> migration_manager::reload_schema() {
mlogger.info("Reloading schema");
std::vector<mutation> mutations;
return db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), _feat, std::move(mutations), true);
}
future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector<frozen_mutation>& mutations)
{
if (_as.abort_requested()) {

View File

@@ -94,6 +94,7 @@ public:
// Coalesces requests.
future<> merge_schema_from(netw::msg_addr);
future<> do_merge_schema_from(netw::msg_addr);
future<> reload_schema();
// Merge mutations received from src.
// Keep mutations alive around whole async operation.