schema_tables, storage_service: Make schema digest insensitive to expired tombstones in empty partition
Schema digest is calculated by querying for mutations of all schema tables, then compacting them so that all tombstones in them are dropped. However, even if the mutation becomes empty after compaction, we still feed its partition key. If the same mutations were compacted prior to the query, because the tombstones expire, we won't get any mutation at all and won't feed the partition key. So schema digest will change once an empty partition of some schema table is compacted away. That's not a problem during normal cluster operation because the tombstones will expire at all nodes at the same time, and schema digest, although changes, will change to the same value on all nodes at about the same time. This fix changes digest calculation to not feed any digest for partitions which are empty after compaction. The digest returned by schema_mutations::digest() is left unchanged by this patch. It affects the table schema version calculation. It's not changed because the version is calculated on boot, where we don't yet know all the cluster features. It's possible to fix this but it's more complicated, so this patch defers that. Refs #4485. Asd
This commit is contained in:
@@ -26,11 +26,17 @@
|
||||
namespace db {
|
||||
|
||||
enum class schema_feature {
|
||||
VIEW_VIRTUAL_COLUMNS
|
||||
VIEW_VIRTUAL_COLUMNS,
|
||||
|
||||
// When set, the schema digest is calcualted in a way such that it doesn't change after all
|
||||
// tombstones in an empty partition expire.
|
||||
// See https://github.com/scylladb/scylla/issues/4485
|
||||
DIGEST_INSENSITIVE_TO_EXPIRY,
|
||||
};
|
||||
|
||||
using schema_features = enum_set<super_enum<schema_feature,
|
||||
schema_feature::VIEW_VIRTUAL_COLUMNS
|
||||
schema_feature::VIEW_VIRTUAL_COLUMNS,
|
||||
schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY
|
||||
>>;
|
||||
|
||||
}
|
||||
|
||||
@@ -587,9 +587,9 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
|
||||
return mutations;
|
||||
});
|
||||
};
|
||||
auto reduce = [] (auto& hash, auto&& mutations) {
|
||||
auto reduce = [features] (auto& hash, auto&& mutations) {
|
||||
for (const mutation& m : mutations) {
|
||||
feed_hash_for_schema_digest(hash, m);
|
||||
feed_hash_for_schema_digest(hash, m, features);
|
||||
}
|
||||
};
|
||||
return do_with(md5_hasher(), all_table_names(features), [features, map, reduce] (auto& hash, auto& tables) {
|
||||
@@ -778,8 +778,11 @@ mutation compact_for_schema_digest(const mutation& m) {
|
||||
return m_compacted;
|
||||
}
|
||||
|
||||
void feed_hash_for_schema_digest(hasher& h, const mutation& m) {
|
||||
feed_hash(h, compact_for_schema_digest(m));
|
||||
void feed_hash_for_schema_digest(hasher& h, const mutation& m, schema_features features) {
|
||||
auto compacted = compact_for_schema_digest(m);
|
||||
if (!features.contains<schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY>() || !compacted.partition().empty()) {
|
||||
feed_hash(h, compact_for_schema_digest(m));
|
||||
}
|
||||
}
|
||||
|
||||
// Applies deletion of the "version" column to a system_schema.scylla_tables mutation.
|
||||
@@ -2731,8 +2734,9 @@ namespace legacy {
|
||||
|
||||
table_schema_version schema_mutations::digest() const {
|
||||
md5_hasher h;
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, _columnfamilies);
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, _columns);
|
||||
const db::schema_features no_features;
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, _columnfamilies, no_features);
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, _columns, no_features);
|
||||
return utils::UUID_gen::get_name_UUID(h.finalize());
|
||||
}
|
||||
|
||||
|
||||
@@ -215,7 +215,7 @@ index_metadata_kind deserialize_index_kind(sstring kind);
|
||||
|
||||
mutation compact_for_schema_digest(const mutation& m);
|
||||
|
||||
void feed_hash_for_schema_digest(hasher&, const mutation&);
|
||||
void feed_hash_for_schema_digest(hasher&, const mutation&, schema_features);
|
||||
|
||||
} // namespace schema_tables
|
||||
} // namespace db
|
||||
|
||||
@@ -69,19 +69,30 @@ table_schema_version schema_mutations::digest() const {
|
||||
}
|
||||
|
||||
md5_hasher h;
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, _columnfamilies);
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, _columns);
|
||||
db::schema_features sf = db::schema_features::full();
|
||||
|
||||
// Disable this feature so that the digest remains compactible with Scylla
|
||||
// versions prior to this feature.
|
||||
// This digest affects the table schema version calculation and it's important
|
||||
// that all nodes arrive at the same table schema version to avoid needless schema version
|
||||
// pulls. Table schema versions are calculated on boot when we don't yet
|
||||
// know all the cluster features, so we could get different table versions after reboot
|
||||
// in an already upgraded cluster.
|
||||
sf.remove<db::schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY>();
|
||||
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, _columnfamilies, sf);
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, _columns, sf);
|
||||
if (_view_virtual_columns && !_view_virtual_columns->partition().empty()) {
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, *_view_virtual_columns);
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, *_view_virtual_columns, sf);
|
||||
}
|
||||
if (_indices && !_indices->partition().empty()) {
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, *_indices);
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, *_indices, sf);
|
||||
}
|
||||
if (_dropped_columns && !_dropped_columns->partition().empty()) {
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, *_dropped_columns);
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, *_dropped_columns, sf);
|
||||
}
|
||||
if (_scylla_tables) {
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, *_scylla_tables);
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, *_scylla_tables, sf);
|
||||
}
|
||||
return utils::UUID_gen::get_name_UUID(h.finalize());
|
||||
}
|
||||
|
||||
@@ -82,12 +82,16 @@ future<> migration_manager::stop()
|
||||
void migration_manager::init_messaging_service()
|
||||
{
|
||||
auto& ss = service::get_local_storage_service();
|
||||
_feature_listeners.push_back(ss.cluster_supports_view_virtual_columns().when_enabled([this, &ss] {
|
||||
|
||||
auto update_schema = [this, &ss] {
|
||||
with_gate(_background_tasks, [this, &ss] {
|
||||
mlogger.debug("view_virtual_columns feature enabled, recalculating schema version");
|
||||
mlogger.debug("features changed, recalculating schema version");
|
||||
return update_schema_version(get_storage_proxy(), ss.cluster_schema_features());
|
||||
});
|
||||
}));
|
||||
};
|
||||
|
||||
_feature_listeners.push_back(ss.cluster_supports_view_virtual_columns().when_enabled(update_schema));
|
||||
_feature_listeners.push_back(ss.cluster_supports_digest_insensitive_to_expiry().when_enabled(update_schema));
|
||||
|
||||
auto& ms = netw::get_local_messaging_service();
|
||||
ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> m) {
|
||||
|
||||
@@ -110,6 +110,7 @@ static const sstring TRUNCATION_TABLE = "TRUNCATION_TABLE";
|
||||
static const sstring CORRECT_STATIC_COMPACT_IN_MC = "CORRECT_STATIC_COMPACT_IN_MC";
|
||||
static const sstring UNBOUNDED_RANGE_TOMBSTONES_FEATURE = "UNBOUNDED_RANGE_TOMBSTONES";
|
||||
static const sstring VIEW_VIRTUAL_COLUMNS = "VIEW_VIRTUAL_COLUMNS";
|
||||
static const sstring DIGEST_INSENSITIVE_TO_EXPIRY = "DIGEST_INSENSITIVE_TO_EXPIRY";
|
||||
|
||||
static const sstring SSTABLE_FORMAT_PARAM_NAME = "sstable_format";
|
||||
|
||||
@@ -162,6 +163,7 @@ storage_service::storage_service(distributed<database>& db, gms::gossiper& gossi
|
||||
, _correct_static_compact_in_mc(_feature_service, CORRECT_STATIC_COMPACT_IN_MC)
|
||||
, _unbounded_range_tombstones_feature(_feature_service, UNBOUNDED_RANGE_TOMBSTONES_FEATURE)
|
||||
, _view_virtual_columns(_feature_service, VIEW_VIRTUAL_COLUMNS)
|
||||
, _digest_insensitive_to_expiry(_feature_service, DIGEST_INSENSITIVE_TO_EXPIRY)
|
||||
, _la_feature_listener(*this, _feature_listeners_sem, sstables::sstable_version_types::la)
|
||||
, _mc_feature_listener(*this, _feature_listeners_sem, sstables::sstable_version_types::mc)
|
||||
, _replicate_action([this] { return do_replicate_to_all_cores(); })
|
||||
@@ -208,6 +210,7 @@ void storage_service::enable_all_features() {
|
||||
std::ref(_correct_static_compact_in_mc),
|
||||
std::ref(_unbounded_range_tombstones_feature),
|
||||
std::ref(_view_virtual_columns),
|
||||
std::ref(_digest_insensitive_to_expiry),
|
||||
})
|
||||
{
|
||||
if (features.count(f.name())) {
|
||||
@@ -311,6 +314,7 @@ std::set<sstring> storage_service::get_config_supported_features_set() {
|
||||
TRUNCATION_TABLE,
|
||||
CORRECT_STATIC_COMPACT_IN_MC,
|
||||
VIEW_VIRTUAL_COLUMNS,
|
||||
DIGEST_INSENSITIVE_TO_EXPIRY,
|
||||
};
|
||||
|
||||
// Do not respect config in the case database is not started
|
||||
@@ -3479,6 +3483,7 @@ void storage_service::notify_cql_change(inet_address endpoint, bool ready)
|
||||
db::schema_features storage_service::cluster_schema_features() const {
|
||||
db::schema_features f;
|
||||
f.set_if<db::schema_feature::VIEW_VIRTUAL_COLUMNS>(bool(_view_virtual_columns));
|
||||
f.set_if<db::schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY>(bool(_digest_insensitive_to_expiry));
|
||||
return f;
|
||||
}
|
||||
|
||||
|
||||
@@ -323,6 +323,7 @@ private:
|
||||
gms::feature _correct_static_compact_in_mc;
|
||||
gms::feature _unbounded_range_tombstones_feature;
|
||||
gms::feature _view_virtual_columns;
|
||||
gms::feature _digest_insensitive_to_expiry;
|
||||
|
||||
sstables::sstable_version_types _sstables_format = sstables::sstable_version_types::ka;
|
||||
seastar::semaphore _feature_listeners_sem = {1};
|
||||
@@ -2338,6 +2339,9 @@ public:
|
||||
const gms::feature& cluster_supports_view_virtual_columns() const {
|
||||
return _view_virtual_columns;
|
||||
}
|
||||
const gms::feature& cluster_supports_digest_insensitive_to_expiry() const {
|
||||
return _digest_insensitive_to_expiry;
|
||||
}
|
||||
// Returns schema features which all nodes in the cluster advertise as supported.
|
||||
db::schema_features cluster_schema_features() const;
|
||||
private:
|
||||
|
||||
Reference in New Issue
Block a user