Merge 'Remove legacy MD5-based schema version computation' from Gleb Natapov

Now that all tables have their version stored in scylla_tables (ensured by ensure_committed_by_group0), the MD5 hash fallback in schema_mutations::digest() is dead code. Remove it along with feed_hash_for_schema_digest() and maybe_delete_schema_version(). Also include cleanups that became possible because of this code path removal.

No need to backport, code removal.

Closes scylladb/scylladb#30110

* https://github.com/scylladb/scylladb:
  schema: remove register_feature_listeners
  schema: remove reload_schema and schema recalculation support
  schema: make in_memory_tables feature unconditional
  schema: rename compact_for_schema_digest() to compact_for_comparison()
  schema: remove legacy MD5-based schema version computation
This commit is contained in:
Patryk Jędrzejczak
2026-06-02 13:32:06 +02:00
14 changed files with 26 additions and 186 deletions

View File

@@ -1106,10 +1106,7 @@ rest_sample_key_range(std::unique_ptr<http::request> req) {
static
future<json::json_return_type>
rest_reset_local_schema(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
// FIXME: We should truncate schema tables if more than one node in the cluster.
apilog.info("reset_local_schema");
co_await ss.local().reload_schema();
co_return json_void();
throw std::runtime_error("Schema version recalculation is no longer supported");
}
static

View File

@@ -213,36 +213,6 @@ static read_table_names_of_keyspace(sharded<service::storage_proxy>& proxy, cons
}) | std::ranges::to<std::vector>();
}
// Applies deletion of the "version" column to system_schema.scylla_tables mutation rows
// which weren't committed by group 0.
static void maybe_delete_schema_version(mutation& m) {
if (m.column_family_id() != scylla_tables()->id()) {
return;
}
const column_definition& origin_col = *m.schema()->get_column_definition(to_bytes("committed_by_group0"));
const column_definition& version_col = *m.schema()->get_column_definition(to_bytes("version"));
for (auto&& row : m.partition().clustered_rows()) {
auto&& cells = row.row().cells();
if (auto&& origin_cell = cells.find_cell(origin_col.id); origin_cell) {
auto&& ac = origin_cell->as_atomic_cell(origin_col);
if (ac.is_live()) {
auto dv = origin_col.type->deserialize(managed_bytes_view(ac.value()));
auto committed_by_group0 = value_cast<bool>(dv);
if (committed_by_group0) {
// Don't delete "version" for this entry.
continue;
}
}
}
auto&& cell = cells.find_cell(version_col.id);
api::timestamp_type t = api::new_timestamp();
if (cell) {
t = std::max(t, cell->as_atomic_cell(version_col).timestamp());
}
cells.apply(version_col, atomic_cell::make_dead(t, gc_clock::now()));
}
}
future<> schema_applier::merge_keyspaces()
{
/*
@@ -486,7 +456,6 @@ enum class schema_diff_side {
static schema_diff_per_shard diff_table_or_view(sharded<service::storage_proxy>& proxy,
const std::map<table_id, schema_mutations>& before,
const std::map<table_id, schema_mutations>& after,
bool reload,
noncopyable_function<schema_ptr (schema_mutations sm, schema_diff_side)> create_schema)
{
schema_diff_per_shard d;
@@ -507,13 +476,6 @@ static schema_diff_per_shard diff_table_or_view(sharded<service::storage_proxy>&
slogger.info("Altering {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
d.altered.emplace_back(schema_diff_per_shard::altered_schema{s_before, s});
}
if (reload) {
for (auto&& key: diff.entries_in_common) {
auto s = create_schema(std::move(after.at(key)), schema_diff_side::right);
slogger.info("Reloading {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
d.altered.emplace_back(schema_diff_per_shard::altered_schema {s, s});
}
}
return d;
}
@@ -598,10 +560,10 @@ future<> schema_applier::merge_tables_and_views()
// Create CDC tables before non-CDC base tables, because we want the base tables with CDC enabled
// to point to their CDC tables.
local_cdc = diff_table_or_view(_proxy, _before.cdc, _after.cdc, _reload, [&] (schema_mutations sm, schema_diff_side) {
local_cdc = diff_table_or_view(_proxy, _before.cdc, _after.cdc, [&] (schema_mutations sm, schema_diff_side) {
return create_table_from_mutations(_proxy, std::move(sm), user_types, nullptr);
});
local_tables = diff_table_or_view(_proxy, _before.tables, _after.tables, _reload, [&] (schema_mutations sm, schema_diff_side side) {
local_tables = diff_table_or_view(_proxy, _before.tables, _after.tables, [&] (schema_mutations sm, schema_diff_side side) {
// If the table has CDC enabled, find the CDC schema version and set it in the table schema.
// If the table is created or altered with CDC enabled, then the CDC
// table is also created or altered in the same operation, so we can
@@ -637,7 +599,7 @@ future<> schema_applier::merge_tables_and_views()
return create_table_from_mutations(_proxy, std::move(sm), user_types, cdc_schema);
});
local_views = diff_table_or_view(_proxy, _before.views, _after.views, _reload, [&] (schema_mutations sm, schema_diff_side side) {
local_views = diff_table_or_view(_proxy, _before.views, _after.views, [&] (schema_mutations sm, schema_diff_side side) {
// The view schema mutation should be created with reference to the base table schema because we definitely know it by now.
// If we don't do it we are leaving a window where write commands to this schema are illegal.
// There are 3 possibilities:
@@ -888,15 +850,6 @@ future<> schema_applier::prepare(utils::chunked_vector<mutation>& muts) {
_keyspaces.emplace(std::move(keyspace_name));
}
if (_reload) {
for (auto&& ks : _proxy.local().get_db().local().get_non_system_keyspaces()) {
_keyspaces.emplace(ks);
table_selector sel;
sel.all_in_keyspace = true;
_affected_tables[ks] = sel;
}
}
// Resolve sel.all_in_keyspace == true to the actual list of tables and views.
for (auto&& [keyspace_name, sel] : _affected_tables) {
if (sel.all_in_keyspace) {
@@ -913,12 +866,6 @@ future<> schema_applier::prepare(utils::chunked_vector<mutation>& muts) {
_before = co_await get_schema_persisted_state();
for (auto& mut : muts) {
// We must force recalculation of schema version after the merge, since the resulting
// schema may be a mix of the old and new schemas, with the exception of entries
// that originate from group 0.
maybe_delete_schema_version(mut);
}
}
class pending_schema_getter : public service::schema_getter {
@@ -1261,10 +1208,10 @@ static future<> execute_do_merge_schema(sharded<service::storage_proxy>& proxy,
co_await ap.post_commit();
}
static future<> do_merge_schema(sharded<service::storage_proxy>& proxy, sharded<service::storage_service>& ss, sharded<db::system_keyspace>& sys_ks, utils::chunked_vector<mutation> mutations, bool reload)
static future<> do_merge_schema(sharded<service::storage_proxy>& proxy, sharded<service::storage_service>& ss, sharded<db::system_keyspace>& sys_ks, utils::chunked_vector<mutation> mutations)
{
slogger.trace("do_merge_schema: {}", mutations);
schema_applier ap(proxy, ss, sys_ks, reload);
schema_applier ap(proxy, ss, sys_ks);
co_await execute_do_merge_schema(proxy, ap, std::move(mutations)).finally([&ap]() {
return ap.destroy();
});
@@ -1279,17 +1226,17 @@ static future<> do_merge_schema(sharded<service::storage_proxy>& proxy, sharded
* @throws ConfigurationException If one of metadata attributes has invalid value
* @throws IOException If data was corrupted during transportation or failed to apply fs operations
*/
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, sharded<service::storage_proxy>& proxy, sharded<service::storage_service>& ss, utils::chunked_vector<mutation> mutations, bool reload)
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, sharded<service::storage_proxy>& proxy, sharded<service::storage_service>& ss, utils::chunked_vector<mutation> mutations)
{
if (this_shard_id() != 0) {
// mutations must be applied on the owning shard (0).
co_await smp::submit_to(0, coroutine::lambda([&, fmuts = freeze(mutations)] () mutable -> future<> {
co_await merge_schema(sys_ks, proxy, ss, co_await unfreeze_gently(fmuts), reload);
co_await merge_schema(sys_ks, proxy, ss, co_await unfreeze_gently(fmuts));
}));
co_return;
}
co_await with_merge_lock([&] () mutable -> future<> {
co_await do_merge_schema(proxy, ss, sys_ks, std::move(mutations), reload);
co_await do_merge_schema(proxy, ss, sys_ks, std::move(mutations));
auto version = co_await get_group0_schema_version(sys_ks.local());
co_await update_schema_version_and_announce(sys_ks, proxy, version);
});

View File

@@ -29,7 +29,7 @@ namespace db {
namespace schema_tables {
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, sharded<service::storage_proxy>& proxy, sharded<service::storage_service>& ss, utils::chunked_vector<mutation> mutations, bool reload = false);
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, sharded<service::storage_proxy>& proxy, sharded<service::storage_service>& ss, utils::chunked_vector<mutation> mutations);
enum class table_kind { table, view };
@@ -166,7 +166,6 @@ class schema_applier {
sharded<service::storage_proxy>& _proxy;
sharded<service::storage_service>& _ss;
sharded<db::system_keyspace>& _sys_ks;
const bool _reload;
std::set<sstring> _keyspaces;
std::unordered_map<keyspace_name, table_selector> _affected_tables;
@@ -199,9 +198,8 @@ public:
schema_applier(
sharded<service::storage_proxy>& proxy,
sharded<service::storage_service>& ss,
sharded<db::system_keyspace>& sys_ks,
bool reload = false)
: _proxy(proxy), _ss(ss), _sys_ks(sys_ks), _reload(reload) {};
sharded<db::system_keyspace>& sys_ks)
: _proxy(proxy), _ss(ss), _sys_ks(sys_ks) {};
// Gets called before mutations are applied,
// preferably no work should be done here but subsystem

View File

@@ -28,7 +28,7 @@ enum class schema_feature {
// When enabled we'll add a new column to the `system_schema.scylla_tables` table.
GROUP0_SCHEMA_VERSIONING,
// Unused. Defined for backward compatibility only
// Always enabled. Defined for backward compatibility.
IN_MEMORY_TABLES,
// Per-table tablet options

View File

@@ -302,11 +302,8 @@ schema_ptr tables() {
// Holds Scylla-specific table metadata.
schema_ptr scylla_tables(schema_features features) {
static thread_local schema_ptr schemas[2]{};
static thread_local schema_ptr s;
bool has_in_memory = features.contains(schema_feature::IN_MEMORY_TABLES);
schema_ptr& s = schemas[has_in_memory];
if (!s) {
auto id = generate_legacy_id(NAME, SCYLLA_TABLES);
auto sb = schema_builder(this_smp_shard_count(), NAME, SCYLLA_TABLES, std::make_optional(id))
@@ -319,9 +316,7 @@ schema_ptr scylla_tables(schema_features features) {
// PER_TABLE_PARTITIONERS
sb.with_column("partitioner", utf8_type);
if (has_in_memory) {
sb.with_column("in_memory", boolean_type);
}
sb.with_column("in_memory", boolean_type);
// If true, this table's latest schema was committed by group 0.
// In this case `version` column is non-null and will be used for `schema::version()` instead of calculating a hash.
@@ -850,7 +845,7 @@ read_keyspace_mutation(sharded<service::storage_proxy>& proxy, const sstring& ke
co_return co_await query_partition_mutation(proxy.local(), std::move(s), std::move(cmd), std::move(key));
}
mutation compact_for_schema_digest(const mutation& m) {
mutation compact_for_comparison(const mutation& m) {
// Cassandra is skipping tombstones from digest calculation
// to avoid disagreements due to tombstone GC.
// See https://issues.apache.org/jira/browse/CASSANDRA-6862.
@@ -860,13 +855,6 @@ mutation compact_for_schema_digest(const mutation& m) {
return m_compacted;
}
void feed_hash_for_schema_digest(hasher& h, const mutation& m, schema_features features) {
auto compacted = compact_for_schema_digest(m);
if (!features.contains<schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY>() || !compacted.partition().empty()) {
feed_hash(h, compacted);
}
}
/// Helper function which fills a given mutation with column information
/// provided the corresponding column_definition object.
static void fill_column_info(const schema& table,
@@ -2311,7 +2299,7 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
if (version) {
builder.with_version(*version);
} else {
builder.with_version(sm.digest(ctxt.features().cluster_schema_features()));
builder.with_version(sm.digest());
}
if (cdc_schema) {
@@ -2545,8 +2533,9 @@ static schema_builder prepare_view_schema_builder_from_mutations(const schema_ct
if (version) {
builder.with_version(*version);
} else {
builder.with_version(sm.digest(ctxt.features().cluster_schema_features()));
builder.with_version(sm.digest());
}
return builder;
}

View File

@@ -281,9 +281,7 @@ data_type parse_type(sstring str);
sstring serialize_index_kind(index_metadata_kind kind);
index_metadata_kind deserialize_index_kind(sstring kind);
mutation compact_for_schema_digest(const mutation& m);
void feed_hash_for_schema_digest(hasher&, const mutation&, schema_features);
mutation compact_for_comparison(const mutation& m);
template<typename K, typename V>
std::optional<std::map<K, V>> get_map(const query::result_set_row& row, const sstring& name) {

View File

@@ -180,7 +180,7 @@ db::schema_features feature_service::cluster_schema_features() const {
f.set_if<db::schema_feature::SCYLLA_AGGREGATES>(aggregate_storage_options);
f.set_if<db::schema_feature::TABLE_DIGEST_INSENSITIVE_TO_EXPIRY>(table_digest_insensitive_to_expiry);
f.set<db::schema_feature::GROUP0_SCHEMA_VERSIONING>();
f.set_if<db::schema_feature::IN_MEMORY_TABLES>(bool(in_memory_tables));
f.set<db::schema_feature::IN_MEMORY_TABLES>();
f.set_if<db::schema_feature::TABLET_OPTIONS>(bool(tablet_options));
f.set_if<db::schema_feature::KEYSPACE_MULTI_RF_CHANGE>(bool(keyspace_multi_rf_change));
return f;

View File

@@ -2096,10 +2096,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
return db.recover_logstor();
}).get();
// Depends on all keyspaces being initialized because after this call
// we can be reloading schema.
mm.local().register_feature_listeners();
checkpoint(stop_signal, "starting commit log");
auto cl = db.local().commitlog();

View File

@@ -9,8 +9,6 @@
#include "schema_mutations.hh"
#include "mutation/canonical_mutation.hh"
#include "db/schema_tables.hh"
#include "utils/hashers.hh"
#include "utils/UUID_gen.hh"
schema_mutations::schema_mutations(canonical_mutation columnfamilies,
canonical_mutation columns,
@@ -49,7 +47,7 @@ void schema_mutations::copy_to(utils::chunked_vector<mutation>& dst) const {
}
}
table_schema_version schema_mutations::digest(db::schema_features sf) const {
table_schema_version schema_mutations::digest() const {
if (_scylla_tables) {
auto rs = query::result_set(*_scylla_tables);
if (!rs.empty()) {
@@ -61,39 +59,7 @@ table_schema_version schema_mutations::digest(db::schema_features sf) const {
}
}
md5_hasher h;
if (!sf.contains<db::schema_feature::TABLE_DIGEST_INSENSITIVE_TO_EXPIRY>()) {
// Disable this feature so that the digest remains compactible with Scylla
// versions prior to this feature.
// This digest affects the table schema version calculation and it's important
// that all nodes arrive at the same table schema version to avoid needless schema version
// pulls. It used to be the case that when table schema versions were calculated on boot we
// didn't yet know all the cluster features, so we could get different table versions after reboot
// in an already upgraded cluster. However, they are now available, and if
// TABLE_DIGEST_INSENSITIVE_TO_EXPIRY is enabled, we can compute with DIGEST_INSENSITIVE_TO_EXPIRY
// enabled.
sf.remove<db::schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY>();
}
db::schema_tables::feed_hash_for_schema_digest(h, _columnfamilies, sf);
db::schema_tables::feed_hash_for_schema_digest(h, _columns, sf);
if (_view_virtual_columns && !_view_virtual_columns->partition().empty()) {
db::schema_tables::feed_hash_for_schema_digest(h, *_view_virtual_columns, sf);
}
if (_computed_columns && !_computed_columns->partition().empty()) {
db::schema_tables::feed_hash_for_schema_digest(h, *_computed_columns, sf);
}
if (_indices && !_indices->partition().empty()) {
db::schema_tables::feed_hash_for_schema_digest(h, *_indices, sf);
}
if (_dropped_columns && !_dropped_columns->partition().empty()) {
db::schema_tables::feed_hash_for_schema_digest(h, *_dropped_columns, sf);
}
if (_scylla_tables) {
db::schema_tables::feed_hash_for_schema_digest(h, *_scylla_tables, sf);
}
return table_schema_version(utils::UUID_gen::get_name_UUID(h.finalize()));
throw std::runtime_error("schema version not found in scylla_tables");
}
std::optional<sstring> schema_mutations::partitioner() const {
@@ -110,11 +76,11 @@ static mutation_opt compact(const mutation_opt& m) {
if (!m) {
return m;
}
return db::schema_tables::compact_for_schema_digest(*m);
return db::schema_tables::compact_for_comparison(*m);
}
static mutation_opt compact(const mutation& m) {
return db::schema_tables::compact_for_schema_digest(m);
return db::schema_tables::compact_for_comparison(m);
}
bool schema_mutations::operator==(const schema_mutations& other) const {

View File

@@ -12,7 +12,6 @@
#include "mutation/mutation.hh"
#include "schema/schema_fwd.hh"
#include "mutation/canonical_mutation.hh"
#include "db/schema_features.hh"
// Commutative representation of table schema
// Equality ignores tombstones.
@@ -125,7 +124,7 @@ public:
bool is_view() const;
table_schema_version digest(db::schema_features) const;
table_schema_version digest() const;
std::optional<sstring> partitioner() const;
bool operator==(const schema_mutations&) const;

View File

@@ -167,28 +167,6 @@ future<> migration_manager::uninit_messaging_service()
co_await ser::migration_manager_rpc_verbs::unregister(&_messaging);
}
void migration_manager::register_feature_listeners() {
auto reload_schema_in_bg = [this] {
(void) with_gate(_background_tasks, [this] {
return reload_schema().handle_exception([] (std::exception_ptr ep) {
// Due to features being unordered, reload might fail because
// some tables still have the wrong version and looking up e.g.
// the base-table of a view will fail.
mlogger.debug("Failed to reload schema: {}", ep);
});
});
};
if (this_shard_id() == 0) {
for (const gms::feature& feature : {
std::cref(_feat.table_digest_insensitive_to_expiry)}) {
if (!feature) {
_feature_listeners.push_back(feature.when_enabled(reload_schema_in_bg));
}
}
_feature_listeners.push_back(_feat.in_memory_tables.when_enabled(reload_schema_in_bg));
}
}
void migration_notifier::register_listener(migration_listener* listener)
{
_listeners.add(listener);
@@ -277,16 +255,6 @@ future<> migration_manager::merge_schema_from(locator::host_id src, const utils:
co_await db::schema_tables::merge_schema(_sys_ks, proxy.container(), ss.get()->container(), std::move(mutations));
}
future<> migration_manager::reload_schema() {
mlogger.info("Reloading schema");
auto ss = _ss.get_permit();
if (!ss) {
co_return;
}
utils::chunked_vector<mutation> mutations;
co_await db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), ss.get()->container(), std::move(mutations), true);
}
future<> migration_notifier::on_schema_change(std::function<void(migration_listener*)> notify, std::function<std::string(std::exception_ptr)> describe_error) {
return seastar::async([this, notify = std::move(notify), describe_error = std::move(describe_error)] {
std::exception_ptr ex;

View File

@@ -54,7 +54,6 @@ class migration_manager : public seastar::async_sharded_service<migration_manage
private:
migration_notifier& _notifier;
std::vector<gms::feature::listener_registration> _feature_listeners;
seastar::named_gate _background_tasks;
gms::feature_service& _feat;
netw::messaging_service& _messaging;
@@ -88,8 +87,6 @@ public:
// Makes sure that this node knows about all schema changes known by "nodes" that were made prior to this call.
future<> sync_schema(const replica::database& db, const std::vector<locator::host_id>& nodes);
future<> reload_schema();
// Merge mutations received from src.
// Keep mutations alive around whole async operation.
future<> merge_schema_from(locator::host_id src, const utils::chunked_vector<canonical_mutation>& mutations);
@@ -170,8 +167,6 @@ private:
future<> announce_with_raft(utils::chunked_vector<mutation> schema, group0_guard, std::string_view description, std::optional<raft_timeout> timeout);
public:
void register_feature_listeners();
// Returns schema of given version, either from cache or from remote node identified by 'from'.
// The returned schema may not be synchronized. See schema::is_synced().
// Intended to be used in the read path.

View File

@@ -2936,16 +2936,6 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
});
}
future<> storage_service::reload_schema() {
// Flush memtables and clear cache so that we use the same state we would after node restart
// to rule out potential discrepancies which could stem from merging with memtable/cache readers.
co_await replica::database::flush_keyspace_on_all_shards(_db, db::schema_tables::v3::NAME);
co_await replica::database::drop_cache_for_keyspace_on_all_shards(_db, db::schema_tables::v3::NAME);
co_await _migration_manager.invoke_on(0, [] (auto& mm) {
return mm.reload_schema();
});
}
future<> storage_service::drain() {
return run_with_api_lock(sstring("drain"), [] (storage_service& ss) {
if (ss._operation_mode == mode::DRAINED) {

View File

@@ -764,9 +764,6 @@ public:
*/
future<> drain();
// Recalculates schema digests on this node from contents of tables on disk.
future<> reload_schema();
future<std::map<gms::inet_address, float>> get_ownership();
future<std::map<gms::inet_address, float>> effective_ownership(sstring keyspace_name, sstring table_name);