diff --git a/api/storage_service.cc b/api/storage_service.cc index 7b38c8134c..6fbd58a528 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -1106,10 +1106,7 @@ rest_sample_key_range(std::unique_ptr req) { static future rest_reset_local_schema(sharded& ss, std::unique_ptr req) { - // FIXME: We should truncate schema tables if more than one node in the cluster. - apilog.info("reset_local_schema"); - co_await ss.local().reload_schema(); - co_return json_void(); + throw std::runtime_error("Schema version recalculation is no longer supported"); } static diff --git a/db/schema_applier.cc b/db/schema_applier.cc index deb9849106..8a5d4e6770 100644 --- a/db/schema_applier.cc +++ b/db/schema_applier.cc @@ -213,36 +213,6 @@ static read_table_names_of_keyspace(sharded& proxy, cons }) | std::ranges::to(); } -// Applies deletion of the "version" column to system_schema.scylla_tables mutation rows -// which weren't committed by group 0. -static void maybe_delete_schema_version(mutation& m) { - if (m.column_family_id() != scylla_tables()->id()) { - return; - } - const column_definition& origin_col = *m.schema()->get_column_definition(to_bytes("committed_by_group0")); - const column_definition& version_col = *m.schema()->get_column_definition(to_bytes("version")); - for (auto&& row : m.partition().clustered_rows()) { - auto&& cells = row.row().cells(); - if (auto&& origin_cell = cells.find_cell(origin_col.id); origin_cell) { - auto&& ac = origin_cell->as_atomic_cell(origin_col); - if (ac.is_live()) { - auto dv = origin_col.type->deserialize(managed_bytes_view(ac.value())); - auto committed_by_group0 = value_cast(dv); - if (committed_by_group0) { - // Don't delete "version" for this entry. - continue; - } - } - } - auto&& cell = cells.find_cell(version_col.id); - api::timestamp_type t = api::new_timestamp(); - if (cell) { - t = std::max(t, cell->as_atomic_cell(version_col).timestamp()); - } - cells.apply(version_col, atomic_cell::make_dead(t, gc_clock::now())); - } -} - future<> schema_applier::merge_keyspaces() { /* @@ -486,7 +456,6 @@ enum class schema_diff_side { static schema_diff_per_shard diff_table_or_view(sharded& proxy, const std::map& before, const std::map& after, - bool reload, noncopyable_function create_schema) { schema_diff_per_shard d; @@ -507,13 +476,6 @@ static schema_diff_per_shard diff_table_or_view(sharded& slogger.info("Altering {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version()); d.altered.emplace_back(schema_diff_per_shard::altered_schema{s_before, s}); } - if (reload) { - for (auto&& key: diff.entries_in_common) { - auto s = create_schema(std::move(after.at(key)), schema_diff_side::right); - slogger.info("Reloading {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version()); - d.altered.emplace_back(schema_diff_per_shard::altered_schema {s, s}); - } - } return d; } @@ -598,10 +560,10 @@ future<> schema_applier::merge_tables_and_views() // Create CDC tables before non-CDC base tables, because we want the base tables with CDC enabled // to point to their CDC tables. - local_cdc = diff_table_or_view(_proxy, _before.cdc, _after.cdc, _reload, [&] (schema_mutations sm, schema_diff_side) { + local_cdc = diff_table_or_view(_proxy, _before.cdc, _after.cdc, [&] (schema_mutations sm, schema_diff_side) { return create_table_from_mutations(_proxy, std::move(sm), user_types, nullptr); }); - local_tables = diff_table_or_view(_proxy, _before.tables, _after.tables, _reload, [&] (schema_mutations sm, schema_diff_side side) { + local_tables = diff_table_or_view(_proxy, _before.tables, _after.tables, [&] (schema_mutations sm, schema_diff_side side) { // If the table has CDC enabled, find the CDC schema version and set it in the table schema. // If the table is created or altered with CDC enabled, then the CDC // table is also created or altered in the same operation, so we can @@ -637,7 +599,7 @@ future<> schema_applier::merge_tables_and_views() return create_table_from_mutations(_proxy, std::move(sm), user_types, cdc_schema); }); - local_views = diff_table_or_view(_proxy, _before.views, _after.views, _reload, [&] (schema_mutations sm, schema_diff_side side) { + local_views = diff_table_or_view(_proxy, _before.views, _after.views, [&] (schema_mutations sm, schema_diff_side side) { // The view schema mutation should be created with reference to the base table schema because we definitely know it by now. // If we don't do it we are leaving a window where write commands to this schema are illegal. // There are 3 possibilities: @@ -888,15 +850,6 @@ future<> schema_applier::prepare(utils::chunked_vector& muts) { _keyspaces.emplace(std::move(keyspace_name)); } - if (_reload) { - for (auto&& ks : _proxy.local().get_db().local().get_non_system_keyspaces()) { - _keyspaces.emplace(ks); - table_selector sel; - sel.all_in_keyspace = true; - _affected_tables[ks] = sel; - } - } - // Resolve sel.all_in_keyspace == true to the actual list of tables and views. for (auto&& [keyspace_name, sel] : _affected_tables) { if (sel.all_in_keyspace) { @@ -913,12 +866,6 @@ future<> schema_applier::prepare(utils::chunked_vector& muts) { _before = co_await get_schema_persisted_state(); - for (auto& mut : muts) { - // We must force recalculation of schema version after the merge, since the resulting - // schema may be a mix of the old and new schemas, with the exception of entries - // that originate from group 0. - maybe_delete_schema_version(mut); - } } class pending_schema_getter : public service::schema_getter { @@ -1261,10 +1208,10 @@ static future<> execute_do_merge_schema(sharded& proxy, co_await ap.post_commit(); } -static future<> do_merge_schema(sharded& proxy, sharded& ss, sharded& sys_ks, utils::chunked_vector mutations, bool reload) +static future<> do_merge_schema(sharded& proxy, sharded& ss, sharded& sys_ks, utils::chunked_vector mutations) { slogger.trace("do_merge_schema: {}", mutations); - schema_applier ap(proxy, ss, sys_ks, reload); + schema_applier ap(proxy, ss, sys_ks); co_await execute_do_merge_schema(proxy, ap, std::move(mutations)).finally([&ap]() { return ap.destroy(); }); @@ -1279,17 +1226,17 @@ static future<> do_merge_schema(sharded& proxy, sharded * @throws ConfigurationException If one of metadata attributes has invalid value * @throws IOException If data was corrupted during transportation or failed to apply fs operations */ -future<> merge_schema(sharded& sys_ks, sharded& proxy, sharded& ss, utils::chunked_vector mutations, bool reload) +future<> merge_schema(sharded& sys_ks, sharded& proxy, sharded& ss, utils::chunked_vector mutations) { if (this_shard_id() != 0) { // mutations must be applied on the owning shard (0). co_await smp::submit_to(0, coroutine::lambda([&, fmuts = freeze(mutations)] () mutable -> future<> { - co_await merge_schema(sys_ks, proxy, ss, co_await unfreeze_gently(fmuts), reload); + co_await merge_schema(sys_ks, proxy, ss, co_await unfreeze_gently(fmuts)); })); co_return; } co_await with_merge_lock([&] () mutable -> future<> { - co_await do_merge_schema(proxy, ss, sys_ks, std::move(mutations), reload); + co_await do_merge_schema(proxy, ss, sys_ks, std::move(mutations)); auto version = co_await get_group0_schema_version(sys_ks.local()); co_await update_schema_version_and_announce(sys_ks, proxy, version); }); diff --git a/db/schema_applier.hh b/db/schema_applier.hh index 8753759b8d..6cc7715ace 100644 --- a/db/schema_applier.hh +++ b/db/schema_applier.hh @@ -29,7 +29,7 @@ namespace db { namespace schema_tables { -future<> merge_schema(sharded& sys_ks, sharded& proxy, sharded& ss, utils::chunked_vector mutations, bool reload = false); +future<> merge_schema(sharded& sys_ks, sharded& proxy, sharded& ss, utils::chunked_vector mutations); enum class table_kind { table, view }; @@ -166,7 +166,6 @@ class schema_applier { sharded& _proxy; sharded& _ss; sharded& _sys_ks; - const bool _reload; std::set _keyspaces; std::unordered_map _affected_tables; @@ -199,9 +198,8 @@ public: schema_applier( sharded& proxy, sharded& ss, - sharded& sys_ks, - bool reload = false) - : _proxy(proxy), _ss(ss), _sys_ks(sys_ks), _reload(reload) {}; + sharded& sys_ks) + : _proxy(proxy), _ss(ss), _sys_ks(sys_ks) {}; // Gets called before mutations are applied, // preferably no work should be done here but subsystem diff --git a/db/schema_features.hh b/db/schema_features.hh index 9b17d1ba3b..03a4fe9442 100644 --- a/db/schema_features.hh +++ b/db/schema_features.hh @@ -28,7 +28,7 @@ enum class schema_feature { // When enabled we'll add a new column to the `system_schema.scylla_tables` table. GROUP0_SCHEMA_VERSIONING, - // Unused. Defined for backward compatibility only + // Always enabled. Defined for backward compatibility. IN_MEMORY_TABLES, // Per-table tablet options diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 944a3b57ec..2804592e84 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -302,11 +302,8 @@ schema_ptr tables() { // Holds Scylla-specific table metadata. schema_ptr scylla_tables(schema_features features) { - static thread_local schema_ptr schemas[2]{}; + static thread_local schema_ptr s; - bool has_in_memory = features.contains(schema_feature::IN_MEMORY_TABLES); - - schema_ptr& s = schemas[has_in_memory]; if (!s) { auto id = generate_legacy_id(NAME, SCYLLA_TABLES); auto sb = schema_builder(this_smp_shard_count(), NAME, SCYLLA_TABLES, std::make_optional(id)) @@ -319,9 +316,7 @@ schema_ptr scylla_tables(schema_features features) { // PER_TABLE_PARTITIONERS sb.with_column("partitioner", utf8_type); - if (has_in_memory) { - sb.with_column("in_memory", boolean_type); - } + sb.with_column("in_memory", boolean_type); // If true, this table's latest schema was committed by group 0. // In this case `version` column is non-null and will be used for `schema::version()` instead of calculating a hash. @@ -850,7 +845,7 @@ read_keyspace_mutation(sharded& proxy, const sstring& ke co_return co_await query_partition_mutation(proxy.local(), std::move(s), std::move(cmd), std::move(key)); } -mutation compact_for_schema_digest(const mutation& m) { +mutation compact_for_comparison(const mutation& m) { // Cassandra is skipping tombstones from digest calculation // to avoid disagreements due to tombstone GC. // See https://issues.apache.org/jira/browse/CASSANDRA-6862. @@ -860,13 +855,6 @@ mutation compact_for_schema_digest(const mutation& m) { return m_compacted; } -void feed_hash_for_schema_digest(hasher& h, const mutation& m, schema_features features) { - auto compacted = compact_for_schema_digest(m); - if (!features.contains() || !compacted.partition().empty()) { - feed_hash(h, compacted); - } -} - /// Helper function which fills a given mutation with column information /// provided the corresponding column_definition object. static void fill_column_info(const schema& table, @@ -2311,7 +2299,7 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations if (version) { builder.with_version(*version); } else { - builder.with_version(sm.digest(ctxt.features().cluster_schema_features())); + builder.with_version(sm.digest()); } if (cdc_schema) { @@ -2545,8 +2533,9 @@ static schema_builder prepare_view_schema_builder_from_mutations(const schema_ct if (version) { builder.with_version(*version); } else { - builder.with_version(sm.digest(ctxt.features().cluster_schema_features())); + builder.with_version(sm.digest()); } + return builder; } diff --git a/db/schema_tables.hh b/db/schema_tables.hh index 17d161cfe2..ae07305db7 100644 --- a/db/schema_tables.hh +++ b/db/schema_tables.hh @@ -281,9 +281,7 @@ data_type parse_type(sstring str); sstring serialize_index_kind(index_metadata_kind kind); index_metadata_kind deserialize_index_kind(sstring kind); -mutation compact_for_schema_digest(const mutation& m); - -void feed_hash_for_schema_digest(hasher&, const mutation&, schema_features); +mutation compact_for_comparison(const mutation& m); template std::optional> get_map(const query::result_set_row& row, const sstring& name) { diff --git a/gms/feature_service.cc b/gms/feature_service.cc index 844e854fe5..9c90a7051e 100644 --- a/gms/feature_service.cc +++ b/gms/feature_service.cc @@ -180,7 +180,7 @@ db::schema_features feature_service::cluster_schema_features() const { f.set_if(aggregate_storage_options); f.set_if(table_digest_insensitive_to_expiry); f.set(); - f.set_if(bool(in_memory_tables)); + f.set(); f.set_if(bool(tablet_options)); f.set_if(bool(keyspace_multi_rf_change)); return f; diff --git a/main.cc b/main.cc index d56c154f5c..2b4717ee62 100644 --- a/main.cc +++ b/main.cc @@ -2096,10 +2096,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl return db.recover_logstor(); }).get(); - // Depends on all keyspaces being initialized because after this call - // we can be reloading schema. - mm.local().register_feature_listeners(); - checkpoint(stop_signal, "starting commit log"); auto cl = db.local().commitlog(); diff --git a/schema/schema_mutations.cc b/schema/schema_mutations.cc index aa05d34bae..e606225226 100644 --- a/schema/schema_mutations.cc +++ b/schema/schema_mutations.cc @@ -9,8 +9,6 @@ #include "schema_mutations.hh" #include "mutation/canonical_mutation.hh" #include "db/schema_tables.hh" -#include "utils/hashers.hh" -#include "utils/UUID_gen.hh" schema_mutations::schema_mutations(canonical_mutation columnfamilies, canonical_mutation columns, @@ -49,7 +47,7 @@ void schema_mutations::copy_to(utils::chunked_vector& dst) const { } } -table_schema_version schema_mutations::digest(db::schema_features sf) const { +table_schema_version schema_mutations::digest() const { if (_scylla_tables) { auto rs = query::result_set(*_scylla_tables); if (!rs.empty()) { @@ -61,39 +59,7 @@ table_schema_version schema_mutations::digest(db::schema_features sf) const { } } - md5_hasher h; - - if (!sf.contains()) { - // Disable this feature so that the digest remains compactible with Scylla - // versions prior to this feature. - // This digest affects the table schema version calculation and it's important - // that all nodes arrive at the same table schema version to avoid needless schema version - // pulls. It used to be the case that when table schema versions were calculated on boot we - // didn't yet know all the cluster features, so we could get different table versions after reboot - // in an already upgraded cluster. However, they are now available, and if - // TABLE_DIGEST_INSENSITIVE_TO_EXPIRY is enabled, we can compute with DIGEST_INSENSITIVE_TO_EXPIRY - // enabled. - sf.remove(); - } - - db::schema_tables::feed_hash_for_schema_digest(h, _columnfamilies, sf); - db::schema_tables::feed_hash_for_schema_digest(h, _columns, sf); - if (_view_virtual_columns && !_view_virtual_columns->partition().empty()) { - db::schema_tables::feed_hash_for_schema_digest(h, *_view_virtual_columns, sf); - } - if (_computed_columns && !_computed_columns->partition().empty()) { - db::schema_tables::feed_hash_for_schema_digest(h, *_computed_columns, sf); - } - if (_indices && !_indices->partition().empty()) { - db::schema_tables::feed_hash_for_schema_digest(h, *_indices, sf); - } - if (_dropped_columns && !_dropped_columns->partition().empty()) { - db::schema_tables::feed_hash_for_schema_digest(h, *_dropped_columns, sf); - } - if (_scylla_tables) { - db::schema_tables::feed_hash_for_schema_digest(h, *_scylla_tables, sf); - } - return table_schema_version(utils::UUID_gen::get_name_UUID(h.finalize())); + throw std::runtime_error("schema version not found in scylla_tables"); } std::optional schema_mutations::partitioner() const { @@ -110,11 +76,11 @@ static mutation_opt compact(const mutation_opt& m) { if (!m) { return m; } - return db::schema_tables::compact_for_schema_digest(*m); + return db::schema_tables::compact_for_comparison(*m); } static mutation_opt compact(const mutation& m) { - return db::schema_tables::compact_for_schema_digest(m); + return db::schema_tables::compact_for_comparison(m); } bool schema_mutations::operator==(const schema_mutations& other) const { diff --git a/schema/schema_mutations.hh b/schema/schema_mutations.hh index 72ef7d0def..51c8221ce5 100644 --- a/schema/schema_mutations.hh +++ b/schema/schema_mutations.hh @@ -12,7 +12,6 @@ #include "mutation/mutation.hh" #include "schema/schema_fwd.hh" #include "mutation/canonical_mutation.hh" -#include "db/schema_features.hh" // Commutative representation of table schema // Equality ignores tombstones. @@ -125,7 +124,7 @@ public: bool is_view() const; - table_schema_version digest(db::schema_features) const; + table_schema_version digest() const; std::optional partitioner() const; bool operator==(const schema_mutations&) const; diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 79a91adbb5..208d7c26c9 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -167,28 +167,6 @@ future<> migration_manager::uninit_messaging_service() co_await ser::migration_manager_rpc_verbs::unregister(&_messaging); } -void migration_manager::register_feature_listeners() { - auto reload_schema_in_bg = [this] { - (void) with_gate(_background_tasks, [this] { - return reload_schema().handle_exception([] (std::exception_ptr ep) { - // Due to features being unordered, reload might fail because - // some tables still have the wrong version and looking up e.g. - // the base-table of a view will fail. - mlogger.debug("Failed to reload schema: {}", ep); - }); - }); - }; - if (this_shard_id() == 0) { - for (const gms::feature& feature : { - std::cref(_feat.table_digest_insensitive_to_expiry)}) { - if (!feature) { - _feature_listeners.push_back(feature.when_enabled(reload_schema_in_bg)); - } - } - _feature_listeners.push_back(_feat.in_memory_tables.when_enabled(reload_schema_in_bg)); - } -} - void migration_notifier::register_listener(migration_listener* listener) { _listeners.add(listener); @@ -277,16 +255,6 @@ future<> migration_manager::merge_schema_from(locator::host_id src, const utils: co_await db::schema_tables::merge_schema(_sys_ks, proxy.container(), ss.get()->container(), std::move(mutations)); } -future<> migration_manager::reload_schema() { - mlogger.info("Reloading schema"); - auto ss = _ss.get_permit(); - if (!ss) { - co_return; - } - utils::chunked_vector mutations; - co_await db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), ss.get()->container(), std::move(mutations), true); -} - future<> migration_notifier::on_schema_change(std::function notify, std::function describe_error) { return seastar::async([this, notify = std::move(notify), describe_error = std::move(describe_error)] { std::exception_ptr ex; diff --git a/service/migration_manager.hh b/service/migration_manager.hh index adf2e7f01d..fb376f7619 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -54,7 +54,6 @@ class migration_manager : public seastar::async_sharded_service _feature_listeners; seastar::named_gate _background_tasks; gms::feature_service& _feat; netw::messaging_service& _messaging; @@ -88,8 +87,6 @@ public: // Makes sure that this node knows about all schema changes known by "nodes" that were made prior to this call. future<> sync_schema(const replica::database& db, const std::vector& nodes); - future<> reload_schema(); - // Merge mutations received from src. // Keep mutations alive around whole async operation. future<> merge_schema_from(locator::host_id src, const utils::chunked_vector& mutations); @@ -170,8 +167,6 @@ private: future<> announce_with_raft(utils::chunked_vector schema, group0_guard, std::string_view description, std::optional timeout); public: - void register_feature_listeners(); - // Returns schema of given version, either from cache or from remote node identified by 'from'. // The returned schema may not be synchronized. See schema::is_synced(). // Intended to be used in the read path. diff --git a/service/storage_service.cc b/service/storage_service.cc index bffef7266f..6928a4572a 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2936,16 +2936,6 @@ future storage_service::node_ops_cmd_handler(gms::inet_ad }); } -future<> storage_service::reload_schema() { - // Flush memtables and clear cache so that we use the same state we would after node restart - // to rule out potential discrepancies which could stem from merging with memtable/cache readers. - co_await replica::database::flush_keyspace_on_all_shards(_db, db::schema_tables::v3::NAME); - co_await replica::database::drop_cache_for_keyspace_on_all_shards(_db, db::schema_tables::v3::NAME); - co_await _migration_manager.invoke_on(0, [] (auto& mm) { - return mm.reload_schema(); - }); -} - future<> storage_service::drain() { return run_with_api_lock(sstring("drain"), [] (storage_service& ss) { if (ss._operation_mode == mode::DRAINED) { diff --git a/service/storage_service.hh b/service/storage_service.hh index 7b6bac8284..d1a28bef11 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -764,9 +764,6 @@ public: */ future<> drain(); - // Recalculates schema digests on this node from contents of tables on disk. - future<> reload_schema(); - future> get_ownership(); future> effective_ownership(sstring keyspace_name, sstring table_name);