From 4bfa605c383878ceea77014f7ea80b54c220ea1f Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 9 Mar 2021 16:27:34 +0100 Subject: [PATCH] Merge 'Fix inconsistencies in MV and SI (reworked)' from Eliran Sinvani This is a reworked submission of #7686 which has been reverted. This series fixes some race conditions in MV/SI schema creation and load; we spotted some places where a schema without a base table reference can sneak into the registry. This can lead to an unrecoverable error since write commands with those schemas can't be issued from other nodes. Most of those cases can occur in two main, uncommon situations: in a mixed cluster (during an upgrade) and in a small window after a view or base table is altered. Fixes #7709 Closes #8091 * github.com:scylladb/scylla: database: Fix view schemas in place when loading global_schema_ptr: add support for view's base table materialized views: create view schemas with proper base table reference. materialized views: Extract fix legacy schema into its own logic (cherry picked from commit d473bc9b0669e9d3879ff0f329a9ed16378dd569) --- database.cc | 21 +++++++++--- db/schema_tables.cc | 58 +++++++++++++++++++++++++++------ db/schema_tables.hh | 4 ++- schema.cc | 3 ++ schema_registry.cc | 79 +++++++++++++++++++++++++++++++++------------ schema_registry.hh | 1 + 6 files changed, 131 insertions(+), 35 deletions(-) diff --git a/database.cc b/database.cc index 1c14e4cb5e..452e3f7f35 100644 --- a/database.cc +++ b/database.cc @@ -850,11 +850,22 @@ future<> database::parse_system_tables(distributed<service::storage_proxy>& prox }); }).then([&proxy, &mm, this] { return do_parse_schema_tables(proxy, db::schema_tables::VIEWS, [this, &proxy, &mm] (schema_result_value_type &v) { - return create_views_from_schema_partition(proxy, v.second).then([this, &mm] (std::vector<view_ptr> views) { - return parallel_for_each(views.begin(), views.end(), [this, &mm] (auto&& v) { - return this->add_column_family_and_make_directory(v).then([this, &mm, v] { - return
maybe_update_legacy_secondary_index_mv_schema(mm.local(), *this, v); - }); + return create_views_from_schema_partition(proxy, v.second).then([this, &mm, &proxy] (std::vector<view_ptr> views) { + return parallel_for_each(views.begin(), views.end(), [this, &mm, &proxy] (auto&& v) { + // TODO: Remove once computed columns are guaranteed to be featured in the whole cluster. + // We fix the schema in place here in order to avoid races (write commands coming from other coordinators). + view_ptr fixed_v = maybe_fix_legacy_secondary_index_mv_schema(*this, v, nullptr, preserve_version::yes); + view_ptr v_to_add = fixed_v ? fixed_v : v; + future<> f = this->add_column_family_and_make_directory(v_to_add); + if (bool(fixed_v)) { + v_to_add = fixed_v; + auto&& keyspace = find_keyspace(v->ks_name()).metadata(); + auto mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(v), fixed_v, api::new_timestamp(), true); + f = f.then([this, &proxy, mutations = std::move(mutations)] { + return db::schema_tables::merge_schema(proxy, _feat, std::move(mutations)); + }); + } + return f; }); }); }); diff --git a/db/schema_tables.cc b/db/schema_tables.cc index f895bef8b5..9dfae17944 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -1204,7 +1204,42 @@ static void merge_tables_and_views(distributed<service::storage_proxy>& proxy, return create_table_from_mutations(proxy, std::move(sm)); }); auto views_diff = diff_table_or_view(proxy, std::move(views_before), std::move(views_after), [&] (schema_mutations sm) { - return create_view_from_mutations(proxy, std::move(sm)); + // The view schema mutation should be created with reference to the base table schema because we definitely know it by now. + // If we don't do it we are leaving a window where write commands to this schema are illegal. + // There are 3 possibilities: + // 1. The table was altered - in this case we want the view to correspond to this new table schema. + // 2.
The table was just created - the table is guaranteed to be published with the view in that case. + // 3. The view itself was altered - in that case we already know the base table so we can take it from + // the database object. + view_ptr vp = create_view_from_mutations(proxy, std::move(sm)); + schema_ptr base_schema; + for (auto&& s : tables_diff.altered) { + if (s.new_schema.get()->ks_name() == vp->ks_name() && s.new_schema.get()->cf_name() == vp->view_info()->base_name() ) { + base_schema = s.new_schema; + break; + } + } + if (!base_schema) { + for (auto&& s : tables_diff.created) { + if (s.get()->ks_name() == vp->ks_name() && s.get()->cf_name() == vp->view_info()->base_name() ) { + base_schema = s; + break; + } + } + } + + if (!base_schema) { + base_schema = proxy.local().local_db().find_schema(vp->ks_name(), vp->view_info()->base_name()); + } + + // Now that we have a referenced base - just in case we are registering an old view (this can happen in a mixed cluster) + // let's make it write-enabled by updating its computed columns. + view_ptr fixed_vp = maybe_fix_legacy_secondary_index_mv_schema(proxy.local().get_db().local(), vp, base_schema, preserve_version::yes); + if(fixed_vp) { + vp = fixed_vp; + } + vp->view_info()->set_base_info(vp->view_info()->make_base_dependent_view_info(*base_schema)); + return vp; }); proxy.local().get_db().invoke_on_all([&] (database& db) { @@ -3032,8 +3067,7 @@ std::vector<sstring> all_table_names(schema_features features) { boost::adaptors::transformed([] (auto schema) { return schema->cf_name(); })); } -future<> maybe_update_legacy_secondary_index_mv_schema(service::migration_manager& mm, database& db, view_ptr v) { - // TODO(sarna): Remove once computed columns are guaranteed to be featured in the whole cluster.
+view_ptr maybe_fix_legacy_secondary_index_mv_schema(database& db, const view_ptr& v, schema_ptr base_schema, preserve_version preserve_version) { // Legacy format for a secondary index used a hardcoded "token" column, which ensured a proper // order for indexed queries. This "token" column is now implemented as a computed column, // but for the sake of compatibility we assume that there might be indexes created in the legacy @@ -3041,26 +3075,32 @@ future<> maybe_update_legacy_secondary_index_mv_schema(service::migration_manage // columns marked as computed (because they were either created on a node that supports computed // columns or were fixed by this utility function), it's safe to remove this function altogether. if (v->clustering_key_size() == 0) { - return make_ready_future<>(); + return view_ptr(nullptr); } const column_definition& first_view_ck = v->clustering_key_columns().front(); if (first_view_ck.is_computed()) { - return make_ready_future<>(); + return view_ptr(nullptr); + } + + if (!base_schema) { + base_schema = db.find_schema(v->view_info()->base_id()); } - table& base = db.find_column_family(v->view_info()->base_id()); - schema_ptr base_schema = base.schema(); // If the first clustering key part of a view is a column with name not found in base schema, // it implies it might be backing an index created before computed columns were introduced, // and as such it must be recreated properly. 
if (!base_schema->columns_by_name().contains(first_view_ck.name())) { schema_builder builder{schema_ptr(v)}; builder.mark_column_computed(first_view_ck.name(), std::make_unique<token_column_computation>()); - return mm.announce_view_update(view_ptr(builder.build())); + if (preserve_version) { + builder.with_version(v->version()); + } + return view_ptr(builder.build()); } - return make_ready_future<>(); + return view_ptr(nullptr); } + namespace legacy { table_schema_version schema_mutations::digest() const { diff --git a/db/schema_tables.hh b/db/schema_tables.hh index 74d10cc9a1..0d51a88869 100644 --- a/db/schema_tables.hh +++ b/db/schema_tables.hh @@ -239,7 +239,9 @@ std::vector<mutation> make_update_view_mutations(lw_shared_ptr<keyspace_metadata std::vector<mutation> make_drop_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp); -future<> maybe_update_legacy_secondary_index_mv_schema(service::migration_manager& mm, database& db, view_ptr v); +class preserve_version_tag {}; +using preserve_version = bool_class<preserve_version_tag>; +view_ptr maybe_fix_legacy_secondary_index_mv_schema(database& db, const view_ptr& v, schema_ptr base_schema, preserve_version preserve_version); sstring serialize_kind(column_kind kind); column_kind deserialize_kind(sstring kind); diff --git a/schema.cc b/schema.cc index 2c9e32886c..74fd1352fb 100644 --- a/schema.cc +++ b/schema.cc @@ -456,6 +456,9 @@ schema::schema(const schema& o) rebuild(); if (o.is_view()) { _view_info = std::make_unique<::view_info>(*this, o.view_info()->raw()); + if (o.view_info()->base_info()) { + _view_info->set_base_info(o.view_info()->base_info()); + } } } diff --git a/schema_registry.cc b/schema_registry.cc index ea2e69c7b2..1f2b230c2c 100644 --- a/schema_registry.cc +++ b/schema_registry.cc @@ -24,6 +24,7 @@ #include "schema_registry.hh" #include "log.hh" #include "db/schema_tables.hh" +#include "view_info.hh" static logging::logger slogger("schema_registry"); @@ -274,22 +275,43 @@ global_schema_ptr::global_schema_ptr(global_schema_ptr&& o) noexcept {
assert(o._cpu_of_origin == current); _ptr = std::move(o._ptr); _cpu_of_origin = current; + _base_schema = std::move(o._base_schema); } schema_ptr global_schema_ptr::get() const { if (this_shard_id() == _cpu_of_origin) { return _ptr; } else { - // 'e' points to a foreign entry, but we know it won't be evicted - // because _ptr is preventing this. - const schema_registry_entry& e = *_ptr->registry_entry(); - schema_ptr s = local_schema_registry().get_or_null(e.version()); - if (!s) { - s = local_schema_registry().get_or_load(e.version(), [&e](table_schema_version) { - return e.frozen(); - }); + auto registered_schema = [](const schema_registry_entry& e) { + schema_ptr ret = local_schema_registry().get_or_null(e.version()); + if (!ret) { + ret = local_schema_registry().get_or_load(e.version(), [&e](table_schema_version) { + return e.frozen(); + }); + } + return ret; + }; + + schema_ptr registered_bs; + // The following code dereferences registry entries that belong to a foreign shard. + // However, it is guaranteed to succeed since we made sure in the constructor + // that _base_schema and _ptr have registry entries on the shard where this + // object originated, so as long as this object lives the registry entries live too + // and it is safe to reference them on foreign shards. + if (_base_schema) { + registered_bs = registered_schema(*_base_schema->registry_entry()); + if (_base_schema->registry_entry()->is_synced()) { + registered_bs->registry_entry()->mark_synced(); + } } - if (e.is_synced()) { + schema_ptr s = registered_schema(*_ptr->registry_entry()); + if (s->is_view()) { + if (!s->view_info()->base_info()) { + // we know that registered_bs is valid here because we make sure of it in the constructors.
+ s->view_info()->set_base_info(s->view_info()->make_base_dependent_view_info(*registered_bs)); + } + } + if (_ptr->registry_entry()->is_synced()) { s->registry_entry()->mark_synced(); } return s; @@ -297,16 +319,33 @@ schema_ptr global_schema_ptr::get() const { } global_schema_ptr::global_schema_ptr(const schema_ptr& ptr) - : _ptr([&ptr]() { - // _ptr must always have an associated registry entry, - // if ptr doesn't, we need to load it into the registry. - schema_registry_entry* e = ptr->registry_entry(); + : _cpu_of_origin(this_shard_id()) { + // _ptr must always have an associated registry entry, + // if ptr doesn't, we need to load it into the registry. + auto ensure_registry_entry = [] (const schema_ptr& s) { + schema_registry_entry* e = s->registry_entry(); if (e) { - return ptr; - } - return local_schema_registry().get_or_load(ptr->version(), [&ptr] (table_schema_version) { - return frozen_schema(ptr); + return s; + } else { + return local_schema_registry().get_or_load(s->version(), [&s] (table_schema_version) { + return frozen_schema(s); }); - }()) - , _cpu_of_origin(this_shard_id()) -{ } + } + }; + + schema_ptr s = ensure_registry_entry(ptr); + if (s->is_view()) { + if (s->view_info()->base_info()) { + _base_schema = ensure_registry_entry(s->view_info()->base_info()->base_schema()); + } else if (ptr->view_info()->base_info()) { + _base_schema = ensure_registry_entry(ptr->view_info()->base_info()->base_schema()); + } else { + on_internal_error(slogger, format("Tried to build a global schema for view {}.{} with an uninitialized base info", s->ks_name(), s->cf_name())); + } + + if (!s->view_info()->base_info() || !s->view_info()->base_info()->base_schema()->registry_entry()) { + s->view_info()->set_base_info(s->view_info()->make_base_dependent_view_info(*_base_schema)); + } + } + _ptr = s; +} diff --git a/schema_registry.hh b/schema_registry.hh index 78aa02c50e..9b58f2f793 100644 --- a/schema_registry.hh +++ b/schema_registry.hh @@ -165,6 +165,7 @@ 
schema_registry& local_schema_registry(); // chain will last. class global_schema_ptr { schema_ptr _ptr; + schema_ptr _base_schema; unsigned _cpu_of_origin; public: // Note: the schema_ptr must come from the current shard and can't be nullptr.