diff --git a/database.cc b/database.cc index 1c14e4cb5e..452e3f7f35 100644 --- a/database.cc +++ b/database.cc @@ -850,11 +850,22 @@ future<> database::parse_system_tables(distributed& prox }); }).then([&proxy, &mm, this] { return do_parse_schema_tables(proxy, db::schema_tables::VIEWS, [this, &proxy, &mm] (schema_result_value_type &v) { - return create_views_from_schema_partition(proxy, v.second).then([this, &mm] (std::vector views) { - return parallel_for_each(views.begin(), views.end(), [this, &mm] (auto&& v) { - return this->add_column_family_and_make_directory(v).then([this, &mm, v] { - return maybe_update_legacy_secondary_index_mv_schema(mm.local(), *this, v); - }); + return create_views_from_schema_partition(proxy, v.second).then([this, &mm, &proxy] (std::vector views) { + return parallel_for_each(views.begin(), views.end(), [this, &mm, &proxy] (auto&& v) { + // TODO: Remove once computed columns are guaranteed to be featured in the whole cluster. + // we fix here the schema in place in oreder to avoid races (write commands comming from other coordinators). + view_ptr fixed_v = maybe_fix_legacy_secondary_index_mv_schema(*this, v, nullptr, preserve_version::yes); + view_ptr v_to_add = fixed_v ? fixed_v : v; + future<> f = this->add_column_family_and_make_directory(v_to_add); + if (bool(fixed_v)) { + v_to_add = fixed_v; + auto&& keyspace = find_keyspace(v->ks_name()).metadata(); + auto mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(v), fixed_v, api::new_timestamp(), true); + f = f.then([this, &proxy, mutations = std::move(mutations)] { + return db::schema_tables::merge_schema(proxy, _feat, std::move(mutations)); + }); + } + return f; }); }); }); diff --git a/db/schema_tables.cc b/db/schema_tables.cc index f895bef8b5..9dfae17944 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -1204,7 +1204,42 @@ static void merge_tables_and_views(distributed& proxy, return create_table_from_mutations(proxy, std::move(sm)); }); auto views_diff = diff_table_or_view(proxy, std::move(views_before), std::move(views_after), [&] (schema_mutations sm) { - return create_view_from_mutations(proxy, std::move(sm)); + // The view schema mutation should be created with reference to the base table schema because we definitely know it by now. + // If we don't do it we are leaving a window where write commands to this schema are illegal. + // There are 3 possibilities: + // 1. The table was altered - in this case we want the view to correspond to this new table schema. + // 2. The table was just created - the table is guarantied to be published with the view in that case. + // 3. The view itself was altered - in that case we already know the base table so we can take it from + // the database object. + view_ptr vp = create_view_from_mutations(proxy, std::move(sm)); + schema_ptr base_schema; + for (auto&& s : tables_diff.altered) { + if (s.new_schema.get()->ks_name() == vp->ks_name() && s.new_schema.get()->cf_name() == vp->view_info()->base_name() ) { + base_schema = s.new_schema; + break; + } + } + if (!base_schema) { + for (auto&& s : tables_diff.created) { + if (s.get()->ks_name() == vp->ks_name() && s.get()->cf_name() == vp->view_info()->base_name() ) { + base_schema = s; + break; + } + } + } + + if (!base_schema) { + base_schema = proxy.local().local_db().find_schema(vp->ks_name(), vp->view_info()->base_name()); + } + + // Now when we have a referenced base - just in case we are registering an old view (this can happen in a mixed cluster) + // lets make it write enabled by updating it's compute columns. + view_ptr fixed_vp = maybe_fix_legacy_secondary_index_mv_schema(proxy.local().get_db().local(), vp, base_schema, preserve_version::yes); + if(fixed_vp) { + vp = fixed_vp; + } + vp->view_info()->set_base_info(vp->view_info()->make_base_dependent_view_info(*base_schema)); + return vp; }); proxy.local().get_db().invoke_on_all([&] (database& db) { @@ -3032,8 +3067,7 @@ std::vector all_table_names(schema_features features) { boost::adaptors::transformed([] (auto schema) { return schema->cf_name(); })); } -future<> maybe_update_legacy_secondary_index_mv_schema(service::migration_manager& mm, database& db, view_ptr v) { - // TODO(sarna): Remove once computed columns are guaranteed to be featured in the whole cluster. +view_ptr maybe_fix_legacy_secondary_index_mv_schema(database& db, const view_ptr& v, schema_ptr base_schema, preserve_version preserve_version) { // Legacy format for a secondary index used a hardcoded "token" column, which ensured a proper // order for indexed queries. This "token" column is now implemented as a computed column, // but for the sake of compatibility we assume that there might be indexes created in the legacy @@ -3041,26 +3075,32 @@ future<> maybe_update_legacy_secondary_index_mv_schema(service::migration_manage // columns marked as computed (because they were either created on a node that supports computed // columns or were fixed by this utility function), it's safe to remove this function altogether. if (v->clustering_key_size() == 0) { - return make_ready_future<>(); + return view_ptr(nullptr); } const column_definition& first_view_ck = v->clustering_key_columns().front(); if (first_view_ck.is_computed()) { - return make_ready_future<>(); + return view_ptr(nullptr); + } + + if (!base_schema) { + base_schema = db.find_schema(v->view_info()->base_id()); } - table& base = db.find_column_family(v->view_info()->base_id()); - schema_ptr base_schema = base.schema(); // If the first clustering key part of a view is a column with name not found in base schema, // it implies it might be backing an index created before computed columns were introduced, // and as such it must be recreated properly. if (!base_schema->columns_by_name().contains(first_view_ck.name())) { schema_builder builder{schema_ptr(v)}; builder.mark_column_computed(first_view_ck.name(), std::make_unique()); - return mm.announce_view_update(view_ptr(builder.build())); + if (preserve_version) { + builder.with_version(v->version()); + } + return view_ptr(builder.build()); } - return make_ready_future<>(); + return view_ptr(nullptr); } + namespace legacy { table_schema_version schema_mutations::digest() const { diff --git a/db/schema_tables.hh b/db/schema_tables.hh index 74d10cc9a1..0d51a88869 100644 --- a/db/schema_tables.hh +++ b/db/schema_tables.hh @@ -239,7 +239,9 @@ std::vector make_update_view_mutations(lw_shared_ptr make_drop_view_mutations(lw_shared_ptr keyspace, view_ptr view, api::timestamp_type timestamp); -future<> maybe_update_legacy_secondary_index_mv_schema(service::migration_manager& mm, database& db, view_ptr v); +class preserve_version_tag {}; +using preserve_version = bool_class; +view_ptr maybe_fix_legacy_secondary_index_mv_schema(database& db, const view_ptr& v, schema_ptr base_schema, preserve_version preserve_version); sstring serialize_kind(column_kind kind); column_kind deserialize_kind(sstring kind); diff --git a/schema.cc b/schema.cc index 2c9e32886c..74fd1352fb 100644 --- a/schema.cc +++ b/schema.cc @@ -456,6 +456,9 @@ schema::schema(const schema& o) rebuild(); if (o.is_view()) { _view_info = std::make_unique<::view_info>(*this, o.view_info()->raw()); + if (o.view_info()->base_info()) { + _view_info->set_base_info(o.view_info()->base_info()); + } } } diff --git a/schema_registry.cc b/schema_registry.cc index ea2e69c7b2..1f2b230c2c 100644 --- a/schema_registry.cc +++ b/schema_registry.cc @@ -24,6 +24,7 @@ #include "schema_registry.hh" #include "log.hh" #include "db/schema_tables.hh" +#include "view_info.hh" static logging::logger slogger("schema_registry"); @@ -274,22 +275,43 @@ global_schema_ptr::global_schema_ptr(global_schema_ptr&& o) noexcept { assert(o._cpu_of_origin == current); _ptr = std::move(o._ptr); _cpu_of_origin = current; + _base_schema = std::move(o._base_schema); } schema_ptr global_schema_ptr::get() const { if (this_shard_id() == _cpu_of_origin) { return _ptr; } else { - // 'e' points to a foreign entry, but we know it won't be evicted - // because _ptr is preventing this. - const schema_registry_entry& e = *_ptr->registry_entry(); - schema_ptr s = local_schema_registry().get_or_null(e.version()); - if (!s) { - s = local_schema_registry().get_or_load(e.version(), [&e](table_schema_version) { - return e.frozen(); - }); + auto registered_schema = [](const schema_registry_entry& e) { + schema_ptr ret = local_schema_registry().get_or_null(e.version()); + if (!ret) { + ret = local_schema_registry().get_or_load(e.version(), [&e](table_schema_version) { + return e.frozen(); + }); + } + return ret; + }; + + schema_ptr registered_bs; + // the following code contains registry entry dereference of a foreign shard + // however, it is guarantied to succeed since we made sure in the constructor + // that _bs_schema and _ptr will have a registry on the foreign shard where this + // object originated so as long as this object lives the registry entries lives too + // and it is safe to reference them on foreign shards. + if (_base_schema) { + registered_bs = registered_schema(*_base_schema->registry_entry()); + if (_base_schema->registry_entry()->is_synced()) { + registered_bs->registry_entry()->mark_synced(); + } } - if (e.is_synced()) { + schema_ptr s = registered_schema(*_ptr->registry_entry()); + if (s->is_view()) { + if (!s->view_info()->base_info()) { + // we know that registered_bs is valid here because we make sure of it in the constructors. + s->view_info()->set_base_info(s->view_info()->make_base_dependent_view_info(*registered_bs)); + } + } + if (_ptr->registry_entry()->is_synced()) { s->registry_entry()->mark_synced(); } return s; @@ -297,16 +319,33 @@ schema_ptr global_schema_ptr::get() const { } global_schema_ptr::global_schema_ptr(const schema_ptr& ptr) - : _ptr([&ptr]() { - // _ptr must always have an associated registry entry, - // if ptr doesn't, we need to load it into the registry. - schema_registry_entry* e = ptr->registry_entry(); + : _cpu_of_origin(this_shard_id()) { + // _ptr must always have an associated registry entry, + // if ptr doesn't, we need to load it into the registry. + auto ensure_registry_entry = [] (const schema_ptr& s) { + schema_registry_entry* e = s->registry_entry(); if (e) { - return ptr; - } - return local_schema_registry().get_or_load(ptr->version(), [&ptr] (table_schema_version) { - return frozen_schema(ptr); + return s; + } else { + return local_schema_registry().get_or_load(s->version(), [&s] (table_schema_version) { + return frozen_schema(s); }); - }()) - , _cpu_of_origin(this_shard_id()) -{ } + } + }; + + schema_ptr s = ensure_registry_entry(ptr); + if (s->is_view()) { + if (s->view_info()->base_info()) { + _base_schema = ensure_registry_entry(s->view_info()->base_info()->base_schema()); + } else if (ptr->view_info()->base_info()) { + _base_schema = ensure_registry_entry(ptr->view_info()->base_info()->base_schema()); + } else { + on_internal_error(slogger, format("Tried to build a global schema for view {}.{} with an uninitialized base info", s->ks_name(), s->cf_name())); + } + + if (!s->view_info()->base_info() || !s->view_info()->base_info()->base_schema()->registry_entry()) { + s->view_info()->set_base_info(s->view_info()->make_base_dependent_view_info(*_base_schema)); + } + } + _ptr = s; +} diff --git a/schema_registry.hh b/schema_registry.hh index 78aa02c50e..9b58f2f793 100644 --- a/schema_registry.hh +++ b/schema_registry.hh @@ -165,6 +165,7 @@ schema_registry& local_schema_registry(); // chain will last. class global_schema_ptr { schema_ptr _ptr; + schema_ptr _base_schema; unsigned _cpu_of_origin; public: // Note: the schema_ptr must come from the current shard and can't be nullptr.