mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
migration_manager, schema_tables: Implement migration_manager::reload_schema()
Will recreate schema_ptr's from schema tables like during table alter. Will be needed when digest calculation changes in reaction to cluster feature at run time.
This commit is contained in:
@@ -151,7 +151,8 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
|
||||
std::map<table_id, schema_mutations>&& tables_before,
|
||||
std::map<table_id, schema_mutations>&& tables_after,
|
||||
std::map<table_id, schema_mutations>&& views_before,
|
||||
std::map<table_id, schema_mutations>&& views_after);
|
||||
std::map<table_id, schema_mutations>&& views_after,
|
||||
bool reload);
|
||||
|
||||
struct [[nodiscard]] user_types_to_drop final {
|
||||
seastar::noncopyable_function<future<> ()> drop;
|
||||
@@ -164,7 +165,7 @@ static future<user_types_to_drop> merge_types(distributed<service::storage_proxy
|
||||
static future<> merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after);
|
||||
static future<> merge_aggregates(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after, schema_result scylla_before, schema_result scylla_after);
|
||||
|
||||
static future<> do_merge_schema(distributed<service::storage_proxy>&, std::vector<mutation>, bool do_flush);
|
||||
static future<> do_merge_schema(distributed<service::storage_proxy>&, std::vector<mutation>, bool do_flush, bool reload);
|
||||
|
||||
using computed_columns_map = std::unordered_map<bytes, column_computation_ptr>;
|
||||
static computed_columns_map get_computed_columns(const schema_mutations& sm);
|
||||
@@ -969,7 +970,7 @@ future<> update_schema_version_and_announce(sharded<db::system_keyspace>& sys_ks
|
||||
* @throws ConfigurationException If one of metadata attributes has invalid value
|
||||
* @throws IOException If data was corrupted during transportation or failed to apply fs operations
|
||||
*/
|
||||
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations)
|
||||
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations, bool reload)
|
||||
{
|
||||
if (this_shard_id() != 0) {
|
||||
// mutations must be applied on the owning shard (0).
|
||||
@@ -980,7 +981,7 @@ future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service:
|
||||
}
|
||||
co_await with_merge_lock([&] () mutable -> future<> {
|
||||
bool flush_schema = proxy.local().get_db().local().get_config().flush_schema_tables_after_modification();
|
||||
co_await do_merge_schema(proxy, std::move(mutations), flush_schema);
|
||||
co_await do_merge_schema(proxy, std::move(mutations), flush_schema, reload);
|
||||
co_await update_schema_version_and_announce(sys_ks, proxy, feat.cluster_schema_features());
|
||||
});
|
||||
}
|
||||
@@ -1222,7 +1223,7 @@ table_selector get_affected_tables(const sstring& keyspace_name, const mutation&
|
||||
return result;
|
||||
}
|
||||
|
||||
static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush)
|
||||
static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush, bool reload)
|
||||
{
|
||||
slogger.trace("do_merge_schema: {}", mutations);
|
||||
schema_ptr s = keyspaces();
|
||||
@@ -1249,6 +1250,15 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
|
||||
delete_schema_version(mutation);
|
||||
}
|
||||
|
||||
if (reload) {
|
||||
for (auto&& ks : proxy.local().get_db().local().get_non_system_keyspaces()) {
|
||||
keyspaces.emplace(ks);
|
||||
table_selector sel;
|
||||
sel.all_in_keyspace = true;
|
||||
affected_tables[ks] = sel;
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve sel.all_in_keyspace == true to the actual list of tables and views.
|
||||
for (auto&& [keyspace_name, sel] : affected_tables) {
|
||||
if (sel.all_in_keyspace) {
|
||||
@@ -1305,7 +1315,7 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
|
||||
auto types_to_drop = co_await merge_types(proxy, std::move(old_types), std::move(new_types));
|
||||
co_await merge_tables_and_views(proxy,
|
||||
std::move(old_column_families), std::move(new_column_families),
|
||||
std::move(old_views), std::move(new_views));
|
||||
std::move(old_views), std::move(new_views), reload);
|
||||
co_await merge_functions(proxy, std::move(old_functions), std::move(new_functions));
|
||||
co_await merge_aggregates(proxy, std::move(old_aggregates), std::move(new_aggregates), std::move(old_scylla_aggregates), std::move(new_scylla_aggregates));
|
||||
co_await types_to_drop.drop();
|
||||
@@ -1409,6 +1419,7 @@ enum class schema_diff_side {
|
||||
static schema_diff diff_table_or_view(distributed<service::storage_proxy>& proxy,
|
||||
std::map<table_id, schema_mutations>&& before,
|
||||
std::map<table_id, schema_mutations>&& after,
|
||||
bool reload,
|
||||
noncopyable_function<schema_ptr (schema_mutations sm, schema_diff_side)> create_schema)
|
||||
{
|
||||
schema_diff d;
|
||||
@@ -1429,6 +1440,13 @@ static schema_diff diff_table_or_view(distributed<service::storage_proxy>& proxy
|
||||
slogger.info("Altering {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
|
||||
d.altered.emplace_back(schema_diff::altered_schema{s_before, s});
|
||||
}
|
||||
if (reload) {
|
||||
for (auto&& key: diff.entries_in_common) {
|
||||
auto s = create_schema(std::move(after.at(key)), schema_diff_side::right);
|
||||
slogger.info("Reloading {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
|
||||
d.altered.emplace_back(schema_diff::altered_schema {s, s});
|
||||
}
|
||||
}
|
||||
return d;
|
||||
}
|
||||
|
||||
@@ -1441,12 +1459,13 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
|
||||
std::map<table_id, schema_mutations>&& tables_before,
|
||||
std::map<table_id, schema_mutations>&& tables_after,
|
||||
std::map<table_id, schema_mutations>&& views_before,
|
||||
std::map<table_id, schema_mutations>&& views_after)
|
||||
std::map<table_id, schema_mutations>&& views_after,
|
||||
bool reload)
|
||||
{
|
||||
auto tables_diff = diff_table_or_view(proxy, std::move(tables_before), std::move(tables_after), [&] (schema_mutations sm, schema_diff_side) {
|
||||
auto tables_diff = diff_table_or_view(proxy, std::move(tables_before), std::move(tables_after), reload, [&] (schema_mutations sm, schema_diff_side) {
|
||||
return create_table_from_mutations(proxy, std::move(sm));
|
||||
});
|
||||
auto views_diff = diff_table_or_view(proxy, std::move(views_before), std::move(views_after), [&] (schema_mutations sm, schema_diff_side side) {
|
||||
auto views_diff = diff_table_or_view(proxy, std::move(views_before), std::move(views_after), reload, [&] (schema_mutations sm, schema_diff_side side) {
|
||||
// The view schema mutation should be created with reference to the base table schema because we definitely know it by now.
|
||||
// If we don't do it we are leaving a window where write commands to this schema are illegal.
|
||||
// There are 3 possibilities:
|
||||
|
||||
@@ -189,7 +189,7 @@ future<mutation> read_keyspace_mutation(distributed<service::storage_proxy>&, co
|
||||
// Must be called on shard 0.
|
||||
future<semaphore_units<>> hold_merge_lock() noexcept;
|
||||
|
||||
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations);
|
||||
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations, bool reload = false);
|
||||
|
||||
// Recalculates the local schema version.
|
||||
//
|
||||
|
||||
@@ -368,6 +368,12 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr
|
||||
return db::schema_tables::merge_schema(_sys_ks, proxy.container(), _feat, std::move(mutations));
|
||||
}
|
||||
|
||||
future<> migration_manager::reload_schema() {
|
||||
mlogger.info("Reloading schema");
|
||||
std::vector<mutation> mutations;
|
||||
return db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), _feat, std::move(mutations), true);
|
||||
}
|
||||
|
||||
future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector<frozen_mutation>& mutations)
|
||||
{
|
||||
if (_as.abort_requested()) {
|
||||
|
||||
@@ -94,6 +94,7 @@ public:
|
||||
// Coalesces requests.
|
||||
future<> merge_schema_from(netw::msg_addr);
|
||||
future<> do_merge_schema_from(netw::msg_addr);
|
||||
future<> reload_schema();
|
||||
|
||||
// Merge mutations received from src.
|
||||
// Keep mutations alive around whole async operation.
|
||||
|
||||
Reference in New Issue
Block a user