From 72c472a3aef896c383932e612aa86154699e3080 Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Wed, 13 May 2026 12:53:34 +0200 Subject: [PATCH 01/16] replica: introduce large_data_exception Add large_data_exception to the replica exception hierarchy so that write-path guardrails can reject mutations that target partitions already known to exceed configured size limits. Wire it through exception_variant / IDL so it propagates from replica to coordinator, where storage_proxy re-throws it as a mutation_write_failure. --- idl/replica_exception.idl.hh | 9 ++++++++- replica/exceptions.cc | 2 ++ replica/exceptions.hh | 22 +++++++++++++++++++++- service/paxos/paxos_state.cc | 9 ++++++++- service/storage_proxy.cc | 18 ++++++++++++------ service/storage_proxy.hh | 6 +++--- 6 files changed, 54 insertions(+), 12 deletions(-) diff --git a/idl/replica_exception.idl.hh b/idl/replica_exception.idl.hh index c22687ba6a..411cf40734 100644 --- a/idl/replica_exception.idl.hh +++ b/idl/replica_exception.idl.hh @@ -27,13 +27,20 @@ class critical_disk_utilization_exception { sstring failed_action(); }; +class large_data_exception { + sstring ks_name(); + sstring cf_name(); + sstring message(); +}; + struct exception_variant { std::variant reason; }; diff --git a/replica/exceptions.cc b/replica/exceptions.cc index 1a8a180d56..0567839321 100644 --- a/replica/exceptions.cc +++ b/replica/exceptions.cc @@ -25,6 +25,8 @@ exception_variant try_encode_replica_exception(std::exception_ptr eptr) { return abort_requested_exception(); } catch (const critical_disk_utilization_exception& e) { return e; + } catch (const large_data_exception& e) { + return e; } catch (...) 
{ return no_exception{}; } diff --git a/replica/exceptions.hh b/replica/exceptions.hh index e60bea392e..f0deeb2df8 100644 --- a/replica/exceptions.hh +++ b/replica/exceptions.hh @@ -80,6 +80,25 @@ public: virtual const char* what() const noexcept override { return _message.c_str(); } }; +class large_data_exception final : public replica_exception { + seastar::sstring _ks_name; + seastar::sstring _cf_name; + seastar::sstring _message; +public: + large_data_exception(std::string_view ks_name, std::string_view cf_name, std::string_view detail) noexcept + : replica_exception() + , _ks_name(ks_name) + , _cf_name(cf_name) + , _message(seastar::format("Write rejected for {}.{}: {}", ks_name, cf_name, detail)) + { } + + const seastar::sstring& ks_name() const { return _ks_name; } + const seastar::sstring& cf_name() const { return _cf_name; } + const seastar::sstring& message() const { return _message; } + + virtual const char* what() const noexcept override { return _message.c_str(); } +}; + using abort_requested_exception = seastar::abort_requested_exception; struct exception_variant { @@ -88,7 +107,8 @@ struct exception_variant { rate_limit_exception, stale_topology_exception, abort_requested_exception, - critical_disk_utilization_exception + critical_disk_utilization_exception, + large_data_exception > reason; exception_variant() diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index 2da4136e56..e5880c4015 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -260,7 +260,14 @@ future<> paxos_state::learn(storage_proxy& sp, paxos_store& paxos_store, schema_ on_internal_error(logger, format("schema version in learn does not match current schema")); } - co_await sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout); + // Skip the large-partition write guardrail for the Paxos learn write. 
+ // Paxos cannot recover from a partial-replica rejection here: rejecting + // on a subset of replicas would leave the cluster in an inconsistent + // state where some replicas have applied the decision and others + // haven't. The decision was already accepted; it must be durably + // committed by every reachable replica. + co_await sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout, + std::monostate{}, /* skip_large_data_guardrails */ true); } else { logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision); tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index ffb73f72a3..488515abf2 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -649,9 +649,10 @@ private: seastar::log_level l = seastar::log_level::warn; if (is_timeout_exception(eptr) || std::holds_alternative(errors.local.reason) + || std::holds_alternative(errors.local.reason) || std::holds_alternative(errors.local.reason)) { - // ignore timeouts, abort requests and rate limit exceptions so that logs are not flooded. - // database's total_writes_timedout or total_writes_rate_limited counter was incremented. + // ignore timeouts, abort requests, rate limit and large-data rejections so that logs are not flooded. + // database's total_writes_timedout, total_writes_rate_limited or total_writes_rejected_due_to_large_partition counter was incremented. 
l = seastar::log_level::debug; } slogger.log(l, "Failed to apply mutation from {}#{}: {}", reply_to_host_id, shard, eptr); @@ -793,6 +794,9 @@ private: } else if constexpr (std::is_same_v) { msg = e.what(); return error::FAILURE; + } else if constexpr (std::is_same_v) { + msg = e.message(); + return error::FAILURE; } }, exception->reason); } @@ -3460,17 +3464,17 @@ storage_proxy::mutate_locally(const mutation& m, tracing::trace_state_ptr tr_sta future<> storage_proxy::mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout, - smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info) { + smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info, bool skip_large_data_guardrails) { auto erm = _db.local().find_column_family(s).get_effective_replication_map(); - auto apply = [this, erm, s, &m, tr_state, sync, timeout, smp_grp, rate_limit_info] (shard_id shard) { + auto apply = [this, erm, s, &m, tr_state, sync, timeout, smp_grp, rate_limit_info, skip_large_data_guardrails] (shard_id shard) { get_stats().replica_cross_shard_ops += shard != this_shard_id(); auto shard_rate_limit = rate_limit_info; if (shard == this_shard_id()) { shard_rate_limit = adjust_rate_limit_for_local_operation(shard_rate_limit); } return _db.invoke_on(shard, {smp_grp, timeout}, - [&m, erm, gs = global_schema_ptr(s), gtr = tracing::global_trace_state_ptr(std::move(tr_state)), timeout, sync, shard_rate_limit] (replica::database& db) mutable -> future<> { - return db.apply(gs, m, gtr.get(), sync, timeout, shard_rate_limit); + [&m, erm, gs = global_schema_ptr(s), gtr = tracing::global_trace_state_ptr(std::move(tr_state)), timeout, sync, shard_rate_limit, skip_large_data_guardrails] (replica::database& db) mutable -> future<> { + return db.apply(gs, m, gtr.get(), sync, timeout, shard_rate_limit, skip_large_data_guardrails); }); }; return apply_on_shards(erm, 
*s, m.token(*s), std::move(apply)); @@ -4777,6 +4781,8 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo msg = e->grab_cause(); } else if (auto* e = try_catch(eptr)) { msg = e->what(); + } else if (auto* e = try_catch(eptr)) { + msg = e->message(); } else { slogger.error("exception during mutation write to {}.{} on {}: {}", schema->ks_name(), schema->cf_name(), coordinator, eptr); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index a2723f2f3d..d8dd880fac 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -592,7 +592,7 @@ private: // Applies mutation on this node. // Resolves with timed_out_error when timeout is reached. future<> mutate_locally(const schema_ptr&, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout, - smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info); + smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info, bool skip_large_data_guardrails = false); // Applies mutations on this node. // Resolves with timed_out_error when timeout is reached. future<> mutate_locally(utils::chunked_vector mutation, tracing::trace_state_ptr tr_state, clock_type::time_point timeout, smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info); @@ -701,8 +701,8 @@ public: } // Applies mutation on this node. // Resolves with timed_out_error when timeout is reached. 
- future<> mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max(), db::per_partition_rate_limit::info rate_limit_info = std::monostate()) { - return mutate_locally(s, m, tr_state, sync, timeout, _write_smp_service_group, rate_limit_info); + future<> mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max(), db::per_partition_rate_limit::info rate_limit_info = std::monostate(), bool skip_large_data_guardrails = false) { + return mutate_locally(s, m, tr_state, sync, timeout, _write_smp_service_group, rate_limit_info, skip_large_data_guardrails); } // Applies materialized view mutation on this node. // Resolves with timed_out_error when timeout is reached. From c12b16603d3532135b29727305a3ea0e82de559b Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Wed, 13 May 2026 13:47:02 +0200 Subject: [PATCH 02/16] db: add large_partition_fail_threshold_mb config option Reject writes targeting a partition whose on-disk size already exceeds this threshold (MB). Code default is 0 (disabled) for existing clusters; scylla.yaml ships 2000 for new deployments. --- conf/scylla.yaml | 4 ++++ db/config.cc | 3 +++ db/config.hh | 1 + 3 files changed, 8 insertions(+) diff --git a/conf/scylla.yaml b/conf/scylla.yaml index a5f1709849..5b0bcdd4ad 100644 --- a/conf/scylla.yaml +++ b/conf/scylla.yaml @@ -425,6 +425,10 @@ column_index_size_in_kb: 1 # Log a warning when writing partitions larger than this value # compaction_large_partition_warning_threshold_mb: 1000 +# Reject writes to partitions whose on-disk size exceeds this threshold (MB). +# Set to 0 to disable. 
+large_partition_fail_threshold_mb: 2000 + # Log a warning when writing rows larger than this value # compaction_large_row_warning_threshold_mb: 10 diff --git a/db/config.cc b/db/config.cc index 9049c954eb..f8e2577477 100644 --- a/db/config.cc +++ b/db/config.cc @@ -786,6 +786,9 @@ db::config::config(std::shared_ptr exts) "Log a warning when writing rows larger than this value.") , compaction_large_cell_warning_threshold_mb(this, "compaction_large_cell_warning_threshold_mb", liveness::LiveUpdate, value_status::Used, 1, "Log a warning when writing cells larger than this value.") + , large_partition_fail_threshold_mb(this, "large_partition_fail_threshold_mb", liveness::LiveUpdate, value_status::Used, 0, + "Reject writes targeting a partition whose on-disk size already exceeds this threshold in MB, as recorded in any SSTable's large data records. " + "Set to 0 to disable.") , compaction_rows_count_warning_threshold(this, "compaction_rows_count_warning_threshold", liveness::LiveUpdate, value_status::Used, 100000, "Log a warning when writing a number of rows larger than this value.") , compaction_collection_elements_count_warning_threshold(this, "compaction_collection_elements_count_warning_threshold", liveness::LiveUpdate, value_status::Used, 10000, diff --git a/db/config.hh b/db/config.hh index cfa33362fb..8241b1ea38 100644 --- a/db/config.hh +++ b/db/config.hh @@ -225,6 +225,7 @@ public: named_value compaction_large_partition_warning_threshold_mb; named_value compaction_large_row_warning_threshold_mb; named_value compaction_large_cell_warning_threshold_mb; + named_value large_partition_fail_threshold_mb; named_value compaction_rows_count_warning_threshold; named_value compaction_collection_elements_count_warning_threshold; named_value compaction_large_data_records_per_sstable; From 64fc53cb7c750e7b776ace7fbc2cc3e35ff763fa Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Wed, 13 May 2026 13:47:17 +0200 Subject: [PATCH 03/16] db: add rows_count_fail_threshold 
config option Reject writes targeting a partition whose on-disk row count already exceeds this threshold. Code default is 0 (disabled) for existing clusters; scylla.yaml ships 200000 for new deployments. --- conf/scylla.yaml | 4 ++++ db/config.cc | 3 +++ db/config.hh | 1 + 3 files changed, 8 insertions(+) diff --git a/conf/scylla.yaml b/conf/scylla.yaml index 5b0bcdd4ad..6fa9a2c50e 100644 --- a/conf/scylla.yaml +++ b/conf/scylla.yaml @@ -438,6 +438,10 @@ large_partition_fail_threshold_mb: 2000 # Log a warning when row number is larger than this value # compaction_rows_count_warning_threshold: 100000 +# Reject writes to partitions whose on-disk row count exceeds this threshold. +# Set to 0 to disable. +rows_count_fail_threshold: 200000 + # Log a warning when writing a collection containing more elements than this value # compaction_collection_elements_count_warning_threshold: 10000 diff --git a/db/config.cc b/db/config.cc index f8e2577477..19edebc554 100644 --- a/db/config.cc +++ b/db/config.cc @@ -789,6 +789,9 @@ db::config::config(std::shared_ptr exts) , large_partition_fail_threshold_mb(this, "large_partition_fail_threshold_mb", liveness::LiveUpdate, value_status::Used, 0, "Reject writes targeting a partition whose on-disk size already exceeds this threshold in MB, as recorded in any SSTable's large data records. " "Set to 0 to disable.") + , rows_count_fail_threshold(this, "rows_count_fail_threshold", liveness::LiveUpdate, value_status::Used, 0, + "Reject writes targeting a partition whose on-disk row count already exceeds this threshold, as recorded in any SSTable's large data records. 
" + "Set to 0 to disable.") , compaction_rows_count_warning_threshold(this, "compaction_rows_count_warning_threshold", liveness::LiveUpdate, value_status::Used, 100000, "Log a warning when writing a number of rows larger than this value.") , compaction_collection_elements_count_warning_threshold(this, "compaction_collection_elements_count_warning_threshold", liveness::LiveUpdate, value_status::Used, 10000, diff --git a/db/config.hh b/db/config.hh index 8241b1ea38..c02a905eaf 100644 --- a/db/config.hh +++ b/db/config.hh @@ -226,6 +226,7 @@ public: named_value compaction_large_row_warning_threshold_mb; named_value compaction_large_cell_warning_threshold_mb; named_value large_partition_fail_threshold_mb; + named_value rows_count_fail_threshold; named_value compaction_rows_count_warning_threshold; named_value compaction_collection_elements_count_warning_threshold; named_value compaction_large_data_records_per_sstable; From ca2b8352ac1db168db87f5b61bacbae766fc220f Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Wed, 13 May 2026 13:47:32 +0200 Subject: [PATCH 04/16] db: add large_row_fail_threshold_mb config option Reject writes targeting a row whose on-disk size already exceeds this threshold (MB). Code default is 0 (disabled) for existing clusters; scylla.yaml ships 20 for new deployments. --- conf/scylla.yaml | 4 ++++ db/config.cc | 3 +++ db/config.hh | 1 + 3 files changed, 8 insertions(+) diff --git a/conf/scylla.yaml b/conf/scylla.yaml index 6fa9a2c50e..e2f9820f8f 100644 --- a/conf/scylla.yaml +++ b/conf/scylla.yaml @@ -432,6 +432,10 @@ large_partition_fail_threshold_mb: 2000 # Log a warning when writing rows larger than this value # compaction_large_row_warning_threshold_mb: 10 +# Reject writes to rows whose on-disk size exceeds this threshold (MB). +# Set to 0 to disable. 
+large_row_fail_threshold_mb: 20 + # Log a warning when writing cells larger than this value # compaction_large_cell_warning_threshold_mb: 1 diff --git a/db/config.cc b/db/config.cc index 19edebc554..8abdf31672 100644 --- a/db/config.cc +++ b/db/config.cc @@ -792,6 +792,9 @@ db::config::config(std::shared_ptr exts) , rows_count_fail_threshold(this, "rows_count_fail_threshold", liveness::LiveUpdate, value_status::Used, 0, "Reject writes targeting a partition whose on-disk row count already exceeds this threshold, as recorded in any SSTable's large data records. " "Set to 0 to disable.") + , large_row_fail_threshold_mb(this, "large_row_fail_threshold_mb", liveness::LiveUpdate, value_status::Used, 0, + "Reject writes targeting a partition that contains any row whose on-disk size already exceeds this threshold in MB, as recorded in any SSTable's large data records. " + "Set to 0 to disable.") , compaction_rows_count_warning_threshold(this, "compaction_rows_count_warning_threshold", liveness::LiveUpdate, value_status::Used, 100000, "Log a warning when writing a number of rows larger than this value.") , compaction_collection_elements_count_warning_threshold(this, "compaction_collection_elements_count_warning_threshold", liveness::LiveUpdate, value_status::Used, 10000, diff --git a/db/config.hh b/db/config.hh index c02a905eaf..be60a3f257 100644 --- a/db/config.hh +++ b/db/config.hh @@ -227,6 +227,7 @@ public: named_value compaction_large_cell_warning_threshold_mb; named_value large_partition_fail_threshold_mb; named_value rows_count_fail_threshold; + named_value large_row_fail_threshold_mb; named_value compaction_rows_count_warning_threshold; named_value compaction_collection_elements_count_warning_threshold; named_value compaction_large_data_records_per_sstable; From ed8817724ab8138ae5687b5a7f8e94fb2d910a7d Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Wed, 13 May 2026 13:47:45 +0200 Subject: [PATCH 05/16] db: add large_collection_elements_fail_threshold config 
option Reject writes targeting a collection whose element count already exceeds this threshold. Code default is 0 (disabled) for existing clusters; scylla.yaml ships 20000 for new deployments. --- conf/scylla.yaml | 4 ++++ db/config.cc | 3 +++ db/config.hh | 1 + 3 files changed, 8 insertions(+) diff --git a/conf/scylla.yaml b/conf/scylla.yaml index e2f9820f8f..94b4484d06 100644 --- a/conf/scylla.yaml +++ b/conf/scylla.yaml @@ -449,6 +449,10 @@ rows_count_fail_threshold: 200000 # Log a warning when writing a collection containing more elements than this value # compaction_collection_elements_count_warning_threshold: 10000 +# Reject writes to collections whose element count exceeds this threshold. +# Set to 0 to disable. +large_collection_elements_fail_threshold: 20000 + # How long the coordinator should wait for seq or index scans to complete # range_request_timeout_in_ms: 10000 # How long the coordinator should wait for writes to complete diff --git a/db/config.cc b/db/config.cc index 8abdf31672..310d701dff 100644 --- a/db/config.cc +++ b/db/config.cc @@ -795,6 +795,9 @@ db::config::config(std::shared_ptr exts) , large_row_fail_threshold_mb(this, "large_row_fail_threshold_mb", liveness::LiveUpdate, value_status::Used, 0, "Reject writes targeting a partition that contains any row whose on-disk size already exceeds this threshold in MB, as recorded in any SSTable's large data records. " "Set to 0 to disable.") + , large_collection_elements_fail_threshold(this, "large_collection_elements_fail_threshold", liveness::LiveUpdate, value_status::Used, 0, + "Reject writes targeting a partition that contains any collection whose element count already exceeds this threshold, as recorded in any SSTable's large data records. 
" + "Set to 0 to disable.") , compaction_rows_count_warning_threshold(this, "compaction_rows_count_warning_threshold", liveness::LiveUpdate, value_status::Used, 100000, "Log a warning when writing a number of rows larger than this value.") , compaction_collection_elements_count_warning_threshold(this, "compaction_collection_elements_count_warning_threshold", liveness::LiveUpdate, value_status::Used, 10000, diff --git a/db/config.hh b/db/config.hh index be60a3f257..4871509755 100644 --- a/db/config.hh +++ b/db/config.hh @@ -228,6 +228,7 @@ public: named_value large_partition_fail_threshold_mb; named_value rows_count_fail_threshold; named_value large_row_fail_threshold_mb; + named_value large_collection_elements_fail_threshold; named_value compaction_rows_count_warning_threshold; named_value compaction_collection_elements_count_warning_threshold; named_value compaction_large_data_records_per_sstable; From e0fee41d251dd79bd56bd5ec29457208192fdc2e Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Wed, 13 May 2026 13:47:55 +0200 Subject: [PATCH 06/16] sstables: add intrusive index hook to large_data_record Add a set_member_hook to large_data_record so it can participate directly in a boost::intrusive::multiset without wrapper nodes or side maps. The hook is not listed in describe_type() and is therefore not serialized. Also add a non-const get_large_data_records() overload to sstable so that register_sstable can obtain mutable references to records for hook manipulation. --- sstables/sstables.hh | 4 ++++ sstables/types.hh | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 55de37a404..0abd32b573 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -1101,6 +1101,10 @@ public: return _large_data_records; } + std::optional& get_large_data_records() noexcept { + return _large_data_records; + } + // Return the extended timestamp statistics map. 
// Some or all entries may be missing if not present in scylla_metadata scylla_metadata::ext_timestamp_stats::map_type get_ext_timestamp_stats() const noexcept; diff --git a/sstables/types.hh b/sstables/types.hh index fad5d32779..9f72be9162 100644 --- a/sstables/types.hh +++ b/sstables/types.hh @@ -17,6 +17,7 @@ #include "mutation/tombstone.hh" #include "utils/streaming_histogram.hh" #include "utils/estimated_histogram.hh" +#include #include "sstables/key.hh" #include "sstables/file_writer.hh" #include "db/commitlog/replay_position.hh" @@ -614,6 +615,11 @@ struct large_data_record { uint64_t range_tombstones; // number of range tombstones in the partition uint64_t dead_rows; // number of dead rows in the partition + // Runtime hook for large_data_record_index. Not serialized. + using index_hook_type = boost::intrusive::set_member_hook< + boost::intrusive::link_mode>; + index_hook_type _index_hook; + template auto describe_type(sstable_version_types v, Describer f) { return f(type, partition_key, clustering_key, column_name, value, From ae8879d2f6ffc0a8604086be7004bae541e131eb Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Wed, 13 May 2026 13:54:52 +0200 Subject: [PATCH 07/16] db: implement large_data_record_index Per-table index over large_data_records from all live SSTables. Uses three intrusive multisets (partitions, rows, collections) with member hooks directly on large_data_record. Auto-unlink handles cleanup when SSTables are destroyed. Aggregation (max across SSTables for the same key) happens at lookup time via equal_range. 
--- db/large_data_handler.cc | 76 +++++++++++++++++++++++++++++++++++ db/large_data_handler.hh | 86 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 162 insertions(+) diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index d3b3e4aaf2..3531a8f269 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -76,6 +76,82 @@ future<> large_data_handler::unplug_system_keyspace() noexcept { co_await _sys_ks.unplug(); } +void large_data_record_index::register_sstable(sstables::shared_sstable sst) { + auto& records_opt = sst->get_large_data_records(); + if (!records_opt) { + return; + } + for (auto& rec : records_opt->elements) { + switch (rec.type) { + case sstables::large_data_type::partition_size: + case sstables::large_data_type::rows_in_partition: + _partitions.insert(rec); + break; + case sstables::large_data_type::row_size: + _rows.insert(rec); + break; + case sstables::large_data_type::elements_in_collection: + _collections.insert(rec); + break; + default: + break; + } + } +} + +void large_data_record_index::rebuild( + const std::unordered_set& sstables) { + _partitions.clear(); + _rows.clear(); + _collections.clear(); + for (const auto& sst : sstables) { + register_sstable(sst); + } +} + +std::optional +large_data_record_index::lookup_partition(bytes_view pk_bytes) const { + lookup_key lk{pk_bytes, {}, {}}; + auto [begin, end] = _partitions.equal_range(lk, _partitions.key_comp()); + if (begin == end) { + return std::nullopt; + } + partition_entry result; + for (auto it = begin; it != end; ++it) { + result.partition_size = std::max(result.partition_size, it->value); + result.rows = std::max(result.rows, it->elements_count); + } + return result; +} + +std::optional large_data_record_index::lookup_row(bytes_view pk_bytes, + bytes_view ck_bytes) const { + lookup_key lk{pk_bytes, ck_bytes, {}}; + auto [begin, end] = _rows.equal_range(lk, _rows.key_comp()); + if (begin == end) { + return std::nullopt; + } + uint64_t result = 0; + for 
(auto it = begin; it != end; ++it) { + result = std::max(result, it->value); + } + return result; +} + +std::optional large_data_record_index::lookup_collection(bytes_view pk_bytes, + bytes_view ck_bytes, bytes_view column_name) const { + lookup_key lk{pk_bytes, ck_bytes, column_name}; + auto [begin, end] = _collections.equal_range(lk, _collections.key_comp()); + if (begin == end) { + return std::nullopt; + } + uint64_t result = 0; + for (auto it = begin; it != end; ++it) { + result = std::max(result, it->elements_count); + } + return result; +} + sstring large_data_handler::sst_filename(const sstables::sstable& sst) { return sst.component_basename(sstables::component_type::Data); } diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index 1e68ee7bd7..3d805e719b 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -8,11 +8,17 @@ #pragma once +#include #include +#include +#include +#include "bytes.hh" #include "schema/schema_fwd.hh" #include "system_keyspace.hh" #include "sstables/shared_sstable.hh" +#include "sstables/types.hh" #include "utils/assert.hh" +#include "utils/hash.hh" #include "utils/updateable_value.hh" #include "utils/pluggable.hh" @@ -21,10 +27,90 @@ class sstable; class key; } +class partition_key_view; + namespace db { class system_keyspace; +using sstables::large_data_record; + +struct lookup_key { + bytes_view pk; + bytes_view ck; + bytes_view column_name; +}; + +// Compile-time comparison depth, one per multiset. 
+// partition → pk only +// row → pk + ck +// collection → pk + ck + column_name +enum class record_type { partition, row, collection }; + +template +struct record_compare { +private: + static bytes_view get_pk(const large_data_record& r) noexcept { return bytes_view(r.partition_key.value); } + static bytes_view get_pk(const lookup_key& k) noexcept { return k.pk; } + static bytes_view get_ck(const large_data_record& r) noexcept { return bytes_view(r.clustering_key.value); } + static bytes_view get_ck(const lookup_key& k) noexcept { return k.ck; } + static bytes_view get_col(const large_data_record& r) noexcept { return bytes_view(r.column_name.value); } + static bytes_view get_col(const lookup_key& k) noexcept { return k.column_name; } +public: + template + requires (std::same_as || std::same_as) + bool operator()(const L& a, const R& b) const noexcept { + auto l_pk = get_pk(a), r_pk = get_pk(b); + if (l_pk != r_pk) { + return l_pk < r_pk; + } + if constexpr (Depth == record_type::partition) { + return false; + } + auto l_ck = get_ck(a), r_ck = get_ck(b); + if (l_ck != r_ck) { + return l_ck < r_ck; + } + if constexpr (Depth == record_type::row) { + return false; + } + return get_col(a) < get_col(b); + } +}; + +template +using record_set = boost::intrusive::multiset, + boost::intrusive::compare>, + boost::intrusive::constant_time_size>; + +// Per-table index over large_data_records from all live SSTables. +// Links directly into records stored in each SSTable's scylla_metadata +// via intrusive member hooks (auto_unlink). Aggregation (max across +// SSTables for the same key) happens at lookup time via equal_range. 
+class large_data_record_index { +public: + struct partition_entry { + uint64_t partition_size = 0; + uint64_t rows = 0; + }; + + void register_sstable(sstables::shared_sstable sst); + + void rebuild(const std::unordered_set& sstables); + + std::optional lookup_partition(bytes_view pk_bytes) const; + std::optional lookup_row(bytes_view pk_bytes, bytes_view ck_bytes) const; + std::optional lookup_collection(bytes_view pk_bytes, + bytes_view ck_bytes, bytes_view column_name) const; + +private: + record_set _partitions; + record_set _rows; + record_set _collections; +}; + class large_data_handler { public: struct stats { From f7ffc64703c7bfca2f323c715e9c1d93cf294089 Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Wed, 13 May 2026 13:55:10 +0200 Subject: [PATCH 08/16] db: implement large_data_guardrail Checks partition size, row count, row size, and collection element count against config thresholds using large_data_record_index lookups. Warns on soft limit, throws large_data_exception on hard limit. 
--- db/large_data_handler.cc | 128 +++++++++++++++++++++++++++++++++++++++ db/large_data_handler.hh | 60 ++++++++++++++++++ 2 files changed, 188 insertions(+) diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index 3531a8f269..c4ed36edb6 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -13,6 +13,8 @@ #include "db/system_keyspace.hh" #include "db/large_data_handler.hh" #include "keys/keys.hh" +#include "mutation/mutation_partition.hh" +#include "replica/exceptions.hh" #include "sstables/sstables.hh" #include "gms/feature_service.hh" #include "cql3/untyped_result_set.hh" @@ -21,6 +23,10 @@ static logging::logger large_data_logger("large_data"); namespace db { +namespace { +constexpr uint64_t MB = 1024 * 1024; +} + nop_large_data_handler::nop_large_data_handler() : large_data_handler(std::numeric_limits::max(), std::numeric_limits::max(), std::numeric_limits::max(), std::numeric_limits::max(), std::numeric_limits::max()) { @@ -152,6 +158,127 @@ std::optional large_data_record_index::lookup_collection(bytes_view pk return result; } +void large_data_guardrail::register_sstable(sstables::shared_sstable sst) { + _index.register_sstable(std::move(sst)); +} + +void large_data_guardrail::rebuild(const std::unordered_set& sstables) { + _index.rebuild(sstables); +} + +void large_data_guardrail::check(const schema& s, const mutation_partition& mp, + partition_key_view pk) const { + auto sst_key = sstables::key::from_partition_key(s, pk); + auto pk_bytes = bytes_view(sst_key.get_bytes()); + check_partition(s, pk_bytes, pk); + check_rows_and_collections(s, pk_bytes, mp, pk); +} + +void large_data_guardrail::check_partition(const schema& s, bytes_view pk_bytes, partition_key_view pk) const { + const uint64_t size_fail = uint64_t(_cfg.partition_size_fail_threshold_mb()) * MB; + const uint64_t size_warn = uint64_t(_cfg.partition_size_warn_threshold_mb()) * MB; + const uint64_t rows_fail = uint64_t(_cfg.rows_count_fail_threshold()); + const 
uint64_t rows_warn = uint64_t(_cfg.rows_count_warn_threshold()); + + auto entry = _index.lookup_partition(pk_bytes); + if (!entry) [[likely]] { + return; + } + if (size_fail > 0 && entry->partition_size >= size_fail) { + throw replica::large_data_exception(s.ks_name(), s.cf_name(), format( + "Large data guardrail: partition size {} exceeds hard limit {} " + "(keyspace={}, table={}, partition_key={})", + entry->partition_size, size_fail, s.ks_name(), s.cf_name(), pk)); + } + if (entry->partition_size >= size_warn) { + large_data_logger.warn("Large data guardrail: partition size {} exceeds soft limit {} " + "(keyspace={}, table={}, partition_key={})", + entry->partition_size, size_warn, s.ks_name(), s.cf_name(), pk); + } + if (rows_fail > 0 && entry->rows >= rows_fail) { + throw replica::large_data_exception(s.ks_name(), s.cf_name(), format( + "Large data guardrail: partition row count {} exceeds hard limit {} " + "(keyspace={}, table={}, partition_key={})", + entry->rows, rows_fail, s.ks_name(), s.cf_name(), pk)); + } + if (entry->rows >= rows_warn) { + large_data_logger.warn("Large data guardrail: partition row count {} exceeds soft limit {} " + "(keyspace={}, table={}, partition_key={})", + entry->rows, rows_warn, s.ks_name(), s.cf_name(), pk); + } +} + +void large_data_guardrail::check_rows_and_collections(const schema& s, bytes_view pk_bytes, + const mutation_partition& mp, partition_key_view pk) const { + + if (!mp.static_row().empty()) { + check_row_size(s, pk_bytes, pk, bytes_view(), nullptr); + mp.static_row().for_each_cell([&](column_id id, const atomic_cell_or_collection&) { + check_collection_element_count(s, pk_bytes, pk, s.static_column_at(id), bytes_view(), nullptr); + }); + } + + for (const auto& cr : mp.non_dummy_rows()) { + auto ck_bytes = cr.key().view().representation().linearize(); + auto ck_bv = bytes_view(ck_bytes); + check_row_size(s, pk_bytes, pk, ck_bv, &cr.key()); + cr.row().cells().for_each_cell([&](column_id id, const 
atomic_cell_or_collection&) { + check_collection_element_count(s, pk_bytes, pk, s.regular_column_at(id), ck_bv, &cr.key()); + }); + } +} + +void large_data_guardrail::check_row_size(const schema& s, bytes_view pk_bytes, partition_key_view pk, + bytes_view ck_bytes, const clustering_key_prefix* ck) const { + const uint64_t fail = uint64_t(_cfg.row_size_fail_threshold_mb()) * MB; + const uint64_t warn = uint64_t(_cfg.row_size_warn_threshold_mb()) * MB; + auto row_size = _index.lookup_row(pk_bytes, ck_bytes); + if (!row_size) [[likely]] { + return; + } + if (fail > 0 && *row_size >= fail) { + throw replica::large_data_exception(s.ks_name(), s.cf_name(), format( + "Large data guardrail: row size {} exceeds hard limit {} " + "(keyspace={}, table={}, clustering_key={})", + *row_size, fail, s.ks_name(), s.cf_name(), + ck ? format("{}", ck->with_schema(s)) : sstring())); + } + if (*row_size >= warn) { + large_data_logger.warn("Large data guardrail: row size {} exceeds soft limit {} " + "(keyspace={}, table={}, clustering_key={})", + *row_size, warn, s.ks_name(), s.cf_name(), + ck ? 
format("{}", ck->with_schema(s)) : sstring()); + } +} + +void large_data_guardrail::check_collection_element_count(const schema& s, bytes_view pk_bytes, partition_key_view pk, + const column_definition& cdef, bytes_view ck_bytes, + const clustering_key_prefix* ck) const { + if (cdef.is_atomic()) { + return; + } + const uint64_t fail = uint64_t(_cfg.collection_elements_fail_threshold()); + const uint64_t warn = uint64_t(_cfg.collection_elements_warn_threshold()); + auto col_bytes = to_bytes(cdef.name_as_text()); + auto count = _index.lookup_collection(pk_bytes, ck_bytes, bytes_view(col_bytes)); + if (!count) [[likely]] { + return; + } + if (fail > 0 && *count >= fail) { + throw replica::large_data_exception(s.ks_name(), s.cf_name(), format( + "Large data guardrail: collection element count {} exceeds hard limit {} " + "(keyspace={}, table={}, column={}, clustering_key={})", + *count, fail, s.ks_name(), s.cf_name(), cdef.name_as_text(), + ck ? format("{}", ck->with_schema(s)) : sstring())); + } + if (*count >= warn) { + large_data_logger.warn("Large data guardrail: collection element count {} exceeds soft limit {} " + "(keyspace={}, table={}, column={}, clustering_key={})", + *count, warn, s.ks_name(), s.cf_name(), cdef.name_as_text(), + ck ? 
 format("{}", ck->with_schema(s)) : sstring()); + } +} + sstring large_data_handler::sst_filename(const sstables::sstable& sst) { return sst.component_basename(sstables::component_type::Data); } @@ -442,4 +569,5 @@ future<> cql_table_large_data_handler::update_large_data_entries_sstable_name(co large_table_name, s.ks_name(), s.cf_name(), old_name, new_name, std::current_exception()); } } + } diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index 3d805e719b..b55de253b4 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -11,6 +11,7 @@ #include #include #include +#include #include #include "bytes.hh" #include "schema/schema_fwd.hh" @@ -28,6 +29,7 @@ class key; } class partition_key_view; +class mutation_partition; namespace db { @@ -111,6 +113,64 @@ private: record_set _collections; }; +struct guardrail_config { + utils::updateable_value partition_size_fail_threshold_mb; + utils::updateable_value partition_size_warn_threshold_mb; + utils::updateable_value rows_count_fail_threshold; + utils::updateable_value rows_count_warn_threshold; + utils::updateable_value row_size_fail_threshold_mb; + utils::updateable_value row_size_warn_threshold_mb; + utils::updateable_value collection_elements_fail_threshold; + utils::updateable_value collection_elements_warn_threshold; +}; + +// Each replica::table holds a shared_ptr to either a real guardrail or a +// noop. The guardrail owns the per-table large_data_record_index, so +// noop tables pay no index-maintenance cost. 
+class large_data_guardrail_base { +public: + virtual ~large_data_guardrail_base() = default; + virtual void check(const schema& s, const mutation_partition& mp, + partition_key_view pk) const = 0; + virtual void register_sstable(sstables::shared_sstable sst) = 0; + virtual void rebuild(const std::unordered_set& sstables) = 0; +}; + +class noop_large_data_guardrail final : public large_data_guardrail_base { +public: + static shared_ptr instance() { + static thread_local auto inst = make_shared(); + return inst; + } + void check(const schema&, const mutation_partition&, + partition_key_view) const override {} + void register_sstable(sstables::shared_sstable) override {} + void rebuild(const std::unordered_set&) override {} +}; + +class large_data_guardrail final : public large_data_guardrail_base { +public: + explicit large_data_guardrail(guardrail_config cfg) noexcept + : _cfg(std::move(cfg)) {} + + void check(const schema& s, const mutation_partition& mp, + partition_key_view pk) const override; + void register_sstable(sstables::shared_sstable sst) override; + void rebuild(const std::unordered_set& sstables) override; + +private: + void check_partition(const schema& s, bytes_view pk_bytes, partition_key_view pk) const; + void check_rows_and_collections(const schema& s, bytes_view pk_bytes, const mutation_partition& mp, partition_key_view pk) const; + void check_row_size(const schema& s, bytes_view pk_bytes, partition_key_view pk, + bytes_view ck_bytes, const clustering_key_prefix* ck) const; + void check_collection_element_count(const schema& s, bytes_view pk_bytes, partition_key_view pk, + const column_definition& cdef, bytes_view ck_bytes, + const clustering_key_prefix* ck) const; + + guardrail_config _cfg; + large_data_record_index _index; +}; + class large_data_handler { public: struct stats { From 5a0974e781dad95ded2c261cdf62dda97747eda0 Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Thu, 14 May 2026 12:42:32 +0200 Subject: [PATCH 09/16] schema: add 
per-table large_data_guardrails_enabled flag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a per-table large_data_guardrails_enabled flag controlled via the CQL table property WITH large_data_guardrails_enabled = true|false. Store the flag as a boolean column in system_schema_ext.scylla_tables. Only write a live cell when enabled; when disabled (the default), omit the cell entirely so that old nodes that don't know this column can still read the SSTable during rolling upgrade or rollback. When the property transitions from true to false via ALTER TABLE, a tombstone is written in make_update_table_mutations to override the previous live cell — this is safe because the CQL feature gate ensures all nodes are upgraded before the property can be set to true. Gate the CQL property behind the LARGE_DATA_GUARDRAILS cluster feature: attempting to set large_data_guardrails_enabled = true before all nodes advertise the feature raises a ConfigurationException. 
--- cql3/statements/cf_prop_defs.cc | 11 +++++++++++ cql3/statements/cf_prop_defs.hh | 1 + db/schema_tables.cc | 31 +++++++++++++++++++++++++++++++ gms/feature_service.hh | 1 + schema/schema.cc | 1 + schema/schema.hh | 5 +++++ schema/schema_builder.hh | 4 ++++ test/boost/schema_change_test.cc | 2 +- 8 files changed, 55 insertions(+), 1 deletion(-) diff --git a/cql3/statements/cf_prop_defs.cc b/cql3/statements/cf_prop_defs.cc index fb055de7d9..5d336c51a9 100644 --- a/cql3/statements/cf_prop_defs.cc +++ b/cql3/statements/cf_prop_defs.cc @@ -60,6 +60,7 @@ const sstring cf_prop_defs::COMPACTION_ENABLED_KEY = "enabled"; const sstring cf_prop_defs::KW_TABLETS = "tablets"; const sstring cf_prop_defs::KW_STORAGE_ENGINE = "storage_engine"; +const sstring cf_prop_defs::KW_LARGE_DATA_GUARDRAILS_ENABLED = "large_data_guardrails_enabled"; schema::extensions_map cf_prop_defs::make_schema_extensions(const db::extensions& exts) const { schema::extensions_map er; @@ -109,6 +110,7 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name, KW_COMPRESSION, KW_CRC_CHECK_CHANCE, KW_ID, KW_PAXOSGRACESECONDS, KW_SYNCHRONOUS_UPDATES, KW_TABLETS, KW_STORAGE_ENGINE, + KW_LARGE_DATA_GUARDRAILS_ENABLED, }); static std::set obsolete_keywords({ sstring("index_interval"), @@ -210,6 +212,12 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name, throw exceptions::configuration_exception(format("Illegal value for '{}'", KW_STORAGE_ENGINE)); } } + + if (has_property(KW_LARGE_DATA_GUARDRAILS_ENABLED) && get_boolean(KW_LARGE_DATA_GUARDRAILS_ENABLED, false)) { + if (!db.features().large_data_guardrails) { + throw exceptions::configuration_exception("large_data_guardrails_enabled cannot be used until all nodes in the cluster enable this feature"); + } + } } std::map cf_prop_defs::get_compaction_type_options() const { @@ -417,6 +425,9 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_ builder.set_logstor(); } } + if 
(has_property(KW_LARGE_DATA_GUARDRAILS_ENABLED)) { + builder.set_large_data_guardrails_enabled(get_boolean(KW_LARGE_DATA_GUARDRAILS_ENABLED, false)); + } } void cf_prop_defs::validate_minimum_int(const sstring& field, int32_t minimum_value, int32_t default_value) const diff --git a/cql3/statements/cf_prop_defs.hh b/cql3/statements/cf_prop_defs.hh index b1d236b6d9..9157f755f0 100644 --- a/cql3/statements/cf_prop_defs.hh +++ b/cql3/statements/cf_prop_defs.hh @@ -65,6 +65,7 @@ public: static const sstring KW_TABLETS; static const sstring KW_STORAGE_ENGINE; + static const sstring KW_LARGE_DATA_GUARDRAILS_ENABLED; // FIXME: In origin the following consts are in CFMetaData. static constexpr int32_t DEFAULT_DEFAULT_TIME_TO_LIVE = 0; diff --git a/db/schema_tables.cc b/db/schema_tables.cc index e3d2dccc4b..acc310ed1a 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -338,6 +338,7 @@ schema_ptr scylla_tables(schema_features features) { sb.with_column("tablets", map_type_impl::get_instance(utf8_type, utf8_type, false)); sb.with_column("storage_engine", utf8_type); + sb.with_column("large_data_guardrails_enabled", boolean_type); sb.with_hash_version(); s = sb.build(); @@ -1702,6 +1703,17 @@ mutation make_scylla_tables_mutation(schema_ptr table, api::timestamp_type times if (table->logstor_enabled()) { m.set_clustered_cell(ckey, "storage_engine", "logstor", timestamp); } + // Write the large_data_guardrails_enabled column only when enabled. + // When disabled (the default), omit the cell entirely so that old nodes + // that don't know this column can still read the SSTable during rolling + // upgrade or rollback. The CQL validation gate ensures that 'true' can + // only be set once all nodes support the column, so it is safe to write + // the live cell at that point. 
+ if (table->large_data_guardrails_enabled()) { + auto& guardrails_cdef = *scylla_tables()->get_column_definition("large_data_guardrails_enabled"); + m.set_clustered_cell(ckey, guardrails_cdef, + atomic_cell::make_live(*boolean_type, timestamp, boolean_type->decompose(true))); + } // In-memory tables are deprecated since scylla-2024.1.0 // FIXME: delete the column when there's no live version supporting it anymore. // Writing it here breaks upgrade rollback to versions that do not support the in_memory schema_feature @@ -1935,6 +1947,23 @@ utils::chunked_vector make_update_table_mutations(service::storage_pro utils::chunked_vector mutations; add_table_or_view_to_schema_mutation(new_table, timestamp, false, mutations); make_update_indices_mutations(sp, old_table, new_table, timestamp, mutations); + + // When large_data_guardrails_enabled transitions from true to false, + // write a tombstone to override the previous live cell in scylla_tables. + // make_scylla_tables_mutation only writes a live cell when enabled, so + // the tombstone must be added here where we have the old schema. + // This is safe because the CQL feature gate ensures all nodes are + // upgraded before the property can be set to true. 
+ if (old_table->large_data_guardrails_enabled() && !new_table->large_data_guardrails_enabled()) { + schema_ptr s = tables(); + auto pkey = partition_key::from_singular(*s, new_table->ks_name()); + auto ckey = clustering_key::from_singular(*s, new_table->cf_name()); + mutation m(scylla_tables(), pkey); + auto& guardrails_cdef = *scylla_tables()->get_column_definition("large_data_guardrails_enabled"); + m.set_clustered_cell(ckey, guardrails_cdef, atomic_cell::make_dead(timestamp, gc_clock::now())); + mutations.emplace_back(std::move(m)); + } + make_update_columns_mutations(std::move(old_table), std::move(new_table), timestamp, mutations); warn(unimplemented::cause::TRIGGERS); @@ -2194,6 +2223,8 @@ static void prepare_builder_from_scylla_tables_row(const schema_ctxt& ctxt, sche throw std::invalid_argument(format("Invalid value for storage_engine: {}", *storage_engine)); } } + auto guardrails_enabled = table_row.get("large_data_guardrails_enabled"); + builder.set_large_data_guardrails_enabled(guardrails_enabled.value_or(false)); } schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations sm, const data_dictionary::user_types_storage& user_types, schema_ptr cdc_schema, std::optional version) diff --git a/gms/feature_service.hh b/gms/feature_service.hh index 0a7c1be7ad..8fb2ac024b 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -181,6 +181,7 @@ public: gms::feature writetime_ttl_individual_element { *this, "WRITETIME_TTL_INDIVIDUAL_ELEMENT"sv }; gms::feature arbitrary_tablet_boundaries { *this, "ARBITRARY_TABLET_BOUNDARIES"sv }; gms::feature large_data_virtual_tables { *this, "LARGE_DATA_VIRTUAL_TABLES"sv }; + gms::feature large_data_guardrails { *this, "LARGE_DATA_GUARDRAILS"sv }; gms::feature keyspace_multi_rf_change { *this, "KEYSPACE_MULTI_RF_CHANGE"sv }; public: diff --git a/schema/schema.cc b/schema/schema.cc index e21d318ee5..6bd426d258 100644 --- a/schema/schema.cc +++ b/schema/schema.cc @@ -711,6 +711,7 @@ 
table_schema_version schema::calculate_digest(const schema::raw_schema& r) { } feed_hash(h, r._props.tablet_options); + feed_hash(h, r._large_data_guardrails_enabled); return table_schema_version(utils::UUID_gen::get_name_UUID(h.finalize())); } diff --git a/schema/schema.hh b/schema/schema.hh index 1f127ba681..63d50f24ab 100644 --- a/schema/schema.hh +++ b/schema/schema.hh @@ -612,6 +612,7 @@ private: // schema digest. It is also not set locally on a schema tables. std::reference_wrapper _sharder; bool _in_memory = false; + bool _large_data_guardrails_enabled = false; std::optional _view_info; user_properties _props; @@ -814,6 +815,10 @@ public: return _raw._per_partition_rate_limit_options; } + bool large_data_guardrails_enabled() const { + return _raw._large_data_guardrails_enabled; + } + const ::speculative_retry& speculative_retry() const { return _raw._props.speculative_retry; } diff --git a/schema/schema_builder.hh b/schema/schema_builder.hh index 7ee1eb8c22..6464a23e2a 100644 --- a/schema/schema_builder.hh +++ b/schema/schema_builder.hh @@ -251,6 +251,10 @@ public: _raw._in_memory = in_memory; return *this; } + schema_builder& set_large_data_guardrails_enabled(bool enabled) { + _raw._large_data_guardrails_enabled = enabled; + return *this; + } schema_builder& set_tablet_options(std::map&& hints); diff --git a/test/boost/schema_change_test.cc b/test/boost/schema_change_test.cc index ca6d739076..4c963c1e50 100644 --- a/test/boost/schema_change_test.cc +++ b/test/boost/schema_change_test.cc @@ -676,7 +676,7 @@ SEASTAR_TEST_CASE(test_system_schema_version_is_stable) { // If you changed the schema of system.batchlog then this is expected to fail. // Just replace expected version with the new version. 
 - BOOST_REQUIRE_EQUAL(s->version(), table_schema_version(utils::UUID("c3f984e4-f886-3616-bb80-f8c68ed93595"))); + BOOST_REQUIRE_EQUAL(s->version(), table_schema_version(utils::UUID("95ec5e20-12f2-361a-90b3-662f8999400b"))); }); } From 23881db289e5bcb905816673c5aaa8c03a3a1add Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Thu, 14 May 2026 15:27:46 +0200 Subject: [PATCH 10/16] replica: wire large_data_guardrail into the write path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thread the per-table large_data_guardrail through the write path so that mutations targeting partitions, rows, or collections already recorded as exceeding the configured fail thresholds are rejected before being applied to the memtable. The guardrail is selected in database::do_apply — either the table's own guardrail or a static noop when skip_large_data_guardrails is set. It flows through apply_in_memory → table::apply → memtable::apply, where the check runs after partition_builder deserializes the frozen mutation. For large mutations (>128KB), the check runs after unfreeze_gently instead. 
--- db/commitlog/commitlog_replayer.cc | 2 +- replica/database.cc | 43 +++++++++++++++++++++++------- replica/database.hh | 29 +++++++++++++++----- replica/distributed_loader.cc | 6 ++++- replica/memtable.cc | 11 +++++--- replica/memtable.hh | 8 ++++-- replica/table.cc | 39 ++++++++++++++++++++++++--- 7 files changed, 112 insertions(+), 26 deletions(-) diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc index f6445dde47..a47fc9117a 100644 --- a/db/commitlog/commitlog_replayer.cc +++ b/db/commitlog/commitlog_replayer.cc @@ -282,7 +282,7 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r return db.apply_in_memory(m, cf, db::rp_handle(), db::no_timeout); }); } else { - return db.apply_in_memory(fm, cf.schema(), db::rp_handle(), db::no_timeout); + return db.apply_in_memory(fm, cf.schema(), db::rp_handle(), db::no_timeout, db::noop_large_data_guardrail::instance()); } }).then_wrapped([s] (future<> f) { try { diff --git a/replica/database.cc b/replica/database.cc index 0166002464..755150578d 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1612,6 +1612,16 @@ keyspace::make_column_family_config(const schema& s, const database& db) const { cfg.data_listeners = &db.data_listeners(); cfg.enable_compacting_data_for_streaming_and_repair = db_config.enable_compacting_data_for_streaming_and_repair; cfg.enable_tombstone_gc_for_streaming_and_repair = db_config.enable_tombstone_gc_for_streaming_and_repair; + cfg.guardrail_config = db::guardrail_config{ + .partition_size_fail_threshold_mb = db_config.large_partition_fail_threshold_mb, + .partition_size_warn_threshold_mb = db_config.compaction_large_partition_warning_threshold_mb, + .rows_count_fail_threshold = db_config.rows_count_fail_threshold, + .rows_count_warn_threshold = db_config.compaction_rows_count_warning_threshold, + .row_size_fail_threshold_mb = db_config.large_row_fail_threshold_mb, + .row_size_warn_threshold_mb = 
db_config.compaction_large_row_warning_threshold_mb, + .collection_elements_fail_threshold = db_config.large_collection_elements_fail_threshold, + .collection_elements_warn_threshold = db_config.compaction_collection_elements_count_warning_threshold, + }; return cfg; } @@ -2150,20 +2160,29 @@ std::vector memtable_list::clear_and_add() { return std::exchange(_memtables, std::move(new_memtables)); } -future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, db::timeout_clock::time_point timeout) { +future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, + db::timeout_clock::time_point timeout, + shared_ptr guardrails) { auto& cf = find_column_family(m.column_family_id()); data_listeners().on_write(m_schema, m); if (m.representation().size() > 128*1024) { - return unfreeze_gently(m, std::move(m_schema)).then([&cf, h = std::move(h), timeout] (auto m) mutable { + // Big mutation: unfreeze_gently (yields), then check guardrails on + // the already-deserialized mutation before applying. + auto pk = m.key(); + return unfreeze_gently(m, std::move(m_schema)).then( + [&cf, h = std::move(h), timeout, guardrails, pk] (auto m) mutable { + guardrails->check(*m.schema(), m.partition(), pk); return do_with(std::move(m), [&cf, h = std::move(h), timeout] (auto& m) mutable { return cf.apply(m, std::move(h), timeout); }); }); } - return cf.apply(m, std::move(m_schema), std::move(h), timeout); + // Small mutation: forward guardrails to memtable::apply which will check + // after partition_builder deserializes — no redundant unfreeze. 
+ return cf.apply(m, std::move(m_schema), std::move(h), timeout, std::move(guardrails)); } future<> database::apply_in_memory(const mutation& m, column_family& cf, db::rp_handle&& h, db::timeout_clock::time_point timeout) { @@ -2344,13 +2363,14 @@ future<> database::do_apply_many(const utils::chunked_vector& m auto handles = co_await cl->add_entries(std::move(writers), timeout); // FIXME: Memtable application is not atomic so reads may observe mutations partially applied until restart. + auto noop = db::noop_large_data_guardrail::instance(); for (size_t i = 0; i < muts.size(); ++i) { auto s = local_schema_registry().get(muts[i].schema_version()); - co_await apply_in_memory(muts[i], s, std::move(handles[i]), timeout); + co_await apply_in_memory(muts[i], s, std::move(handles[i]), timeout, noop); } } -future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync, db::per_partition_rate_limit::info rate_limit_info) { +future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync, db::per_partition_rate_limit::info rate_limit_info, bool skip_large_data_guardrails) { ++_stats->total_writes; // assume failure until proven otherwise auto update_writes_failed = defer([&] { ++_stats->total_writes_failed; }); @@ -2379,6 +2399,10 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra co_await coroutine::return_exception(replica::critical_disk_utilization_exception{"rejected write mutation"}); } + auto guardrails = skip_large_data_guardrails + ? 
db::noop_large_data_guardrail::instance() + : cf.get_large_data_guardrail(); + if (!std::holds_alternative(rate_limit_info) && can_apply_per_partition_rate_limit(*s, db::operation_type::write)) { auto table_limit = *s->per_partition_rate_limit_options().get_max_writes_per_second(); auto& write_label = cf.get_rate_limiter_label_for_writes(); @@ -2437,7 +2461,8 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra co_await coroutine::exception(std::move(ex)); } } - auto f = co_await coroutine::as_future(this->apply_in_memory(m, s, std::move(h), timeout)); + auto f = co_await coroutine::as_future( + this->apply_in_memory(m, s, std::move(h), timeout, std::move(guardrails))); if (f.failed()) { auto ex = f.get_exception(); if (try_catch(ex)) { @@ -2501,7 +2526,7 @@ void database::update_write_metrics_for_rejected_writes() { ++_stats->total_writes_rejected_due_to_out_of_space_prevention; } -future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) { +future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info, bool skip_large_data_guardrails) { if (dblog.is_enabled(logging::log_level::trace)) { dblog.trace("apply {}", m.pretty_printer(s)); } @@ -2512,7 +2537,7 @@ future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_ if (!s->is_synced()) { on_internal_error(dblog, format("attempted to apply mutation using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version())); } - return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, sync, rate_limit_info); + return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, sync, 
rate_limit_info, skip_large_data_guardrails); } future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout) { @@ -2522,7 +2547,7 @@ future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::t if (!s->is_synced()) { on_internal_error(dblog, format("attempted to apply hint using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version())); } - return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no, std::monostate{}); + return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no, std::monostate{}, false); } keyspace::config diff --git a/replica/database.hh b/replica/database.hh index d1a0d7fb28..8acce6912f 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -77,6 +77,7 @@ #include "replica/tables_metadata_lock.hh" #include "service/topology_guard.hh" #include "utils/disk_space_monitor.hh" +#include "db/large_data_handler.hh" class cell_locker; class cell_locker_stats; @@ -480,6 +481,7 @@ public: unsigned x_log2_compaction_groups{0}; utils::updateable_value enable_compacting_data_for_streaming_and_repair; utils::updateable_value enable_tombstone_gc_for_streaming_and_repair; + db::guardrail_config guardrail_config; }; using snapshot_details = db::snapshot_ctl::table_snapshot_details; @@ -553,6 +555,8 @@ private: std::unique_ptr _counter_cell_locks; // Memory-intensive; allocate only when needed. + shared_ptr _large_data_guardrail; + // Labels used to identify writes and reads for this table in the rate_limiter structure. 
db::rate_limiter::label _rate_limiter_label_for_writes; db::rate_limiter::label _rate_limiter_label_for_reads; @@ -1011,6 +1015,7 @@ public: [[gnu::always_inline]] bool uses_tablets() const; int64_t calculate_tablet_count() const; private: + shared_ptr make_large_data_guardrail() const; void update_tombstone_gc_rf_one(); future<> clear_inactive_reads_for_tablet(database& db, storage_group& sg); @@ -1028,13 +1033,14 @@ public: // Applies given mutation to this column family // The mutation is always upgraded to current schema. void apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& h = {}) { - do_apply(compaction_group_for_key(m.key(), m_schema), std::move(h), m, m_schema); + do_apply(compaction_group_for_key(m.key(), m_schema), std::move(h), m, m_schema, *db::noop_large_data_guardrail::instance()); } void apply(const mutation& m, db::rp_handle&& h = {}) { do_apply(compaction_group_for_token(m.token()), std::move(h), m); } - future<> apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, db::timeout_clock::time_point tmo); + future<> apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, + db::timeout_clock::time_point tmo, shared_ptr guardrails); future<> apply(const mutation& m, db::rp_handle&& h, db::timeout_clock::time_point tmo); // Returns at most "cmd.limit" rows @@ -1306,6 +1312,14 @@ public: return _index_manager; } + shared_ptr get_large_data_guardrail() const noexcept { + return _large_data_guardrail; + } + + // Rebuild from current SSTable set. Used at startup; after that + // the index is maintained incrementally via register_sstable. 
+ void rebuild_large_data_index(); + sstables::sstables_manager& get_sstables_manager() noexcept { return _sstables_manager; } @@ -1704,7 +1718,8 @@ private: tracing::trace_state_ptr, db::timeout_clock::time_point, db::commitlog_force_sync, - db::per_partition_rate_limit::info> _apply_stage; + db::per_partition_rate_limit::info, + bool /* skip_large_data_guardrails */> _apply_stage; flat_hash_map _keyspaces; tables_metadata _tables_metadata; @@ -1767,7 +1782,9 @@ public: future<> init_logstor(); future<> recover_logstor(); const gms::feature_service& features() const { return _feat; } - future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&&, db::timeout_clock::time_point timeout); + future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&&, + db::timeout_clock::time_point timeout, + shared_ptr guardrails); future<> apply_in_memory(const mutation& m, column_family& cf, db::rp_handle&&, db::timeout_clock::time_point timeout); drain_progress get_drain_progress() const noexcept { @@ -1795,7 +1812,7 @@ private: auto sum_read_concurrency_sem_var(std::invocable auto member); auto sum_read_concurrency_sem_stat(std::invocable auto stats_member); - future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync, db::per_partition_rate_limit::info rate_limit_info); + future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync, db::per_partition_rate_limit::info rate_limit_info, bool skip_large_data_guardrails); future<> do_apply_many(const utils::chunked_vector&, db::timeout_clock::time_point timeout); future<> apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout); @@ -1973,7 +1990,7 @@ public: tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout, bool 
tombstone_gc_enabled = true); // Apply the mutation atomically. // Throws timed_out_error when timeout is reached. - future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog_force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info = std::monostate{}); + future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog_force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info = std::monostate{}, bool skip_large_data_guardrails = false); // Apply mutations atomically. // On restart, either all mutations will be replayed or none of them. // All mutations must belong to the same commitlog domain. diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 33147bd1fe..c9f14bd44f 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -518,7 +518,11 @@ future<> distributed_loader::populate_keyspace(sharded& db, if (!is_system_keyspace(ks_name)) { co_await smp::invoke_on_all([&] { auto s = gtable->schema(); - db.local().find_column_family(s).mark_ready_for_writes(db.local().commitlog_for(s)); + auto& cf = db.local().find_column_family(s); + // Populate the large data caches from SSTable metadata + // before the table becomes writable. 
+ cf.rebuild_large_data_index(); + cf.mark_ready_for_writes(db.local().commitlog_for(s)); }); } }); diff --git a/replica/memtable.cc b/replica/memtable.cc index 4910797dfa..60ad0b40cc 100644 --- a/replica/memtable.cc +++ b/replica/memtable.cc @@ -9,6 +9,7 @@ #include "utils/assert.hh" #include "memtable.hh" #include "replica/database.hh" +#include "replica/exceptions.hh" #include "mutation/frozen_mutation.hh" #include "replica/partition_snapshot_reader.hh" #include "partition_builder.hh" @@ -16,6 +17,8 @@ #include "readers/empty.hh" #include "readers/forwardable.hh" #include "sstables/types.hh" +#include "keys/keys.hh" +#include "db/large_data_handler.hh" namespace replica { @@ -797,13 +800,15 @@ memtable::apply(const mutation& m, db::rp_handle&& h) { } void -memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& h) { - with_allocator(allocator(), [this, &m, &m_schema] { +memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, + const db::large_data_guardrail_base& guardrails, db::rp_handle&& h) { + with_allocator(allocator(), [this, &m, &m_schema, &guardrails] { _table_shared_data.allocating_section(*this, [&, this] { - auto& p = find_or_create_partition_slow(m.key()); mutation_partition mp(*m_schema); partition_builder pb(*m_schema, mp); m.partition().accept(*m_schema, pb); + guardrails.check(*m_schema, mp, m.key()); + auto& p = find_or_create_partition_slow(m.key()); _stats_collector.update(*m_schema, mp); p.apply(region(), cleaner(), *_schema, std::move(mp), *m_schema, _table_stats.memtable_app_stats); }); diff --git a/replica/memtable.hh b/replica/memtable.hh index 36d799ac40..b0107f3221 100644 --- a/replica/memtable.hh +++ b/replica/memtable.hh @@ -22,6 +22,7 @@ #include "utils/double-decker.hh" #include "readers/empty.hh" #include "readers/mutation_source.hh" +#include "db/large_data_handler.hh" class frozen_mutation; class row_cache; @@ -236,8 +237,11 @@ public: // Applies mutation to this memtable. 
// The mutation is upgraded to current schema. void apply(const mutation& m, db::rp_handle&& = {}); - // The mutation is upgraded to current schema. - void apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& = {}); + void apply(const frozen_mutation& m, const schema_ptr& m_schema, + const db::large_data_guardrail_base& guardrails, db::rp_handle&& = {}); + void apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& h = {}) { + apply(m, m_schema, *db::noop_large_data_guardrail::instance(), std::move(h)); + } void evict_entry(memtable_entry& e, mutation_cleaner& cleaner) noexcept; static memtable& from_region(logalloc::region& r) noexcept { diff --git a/replica/table.cc b/replica/table.cc index f5102b452b..493b8867fc 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -53,6 +53,7 @@ #include "mutation/mutation_source_metadata.hh" #include "gms/gossiper.hh" #include "gms/feature_service.hh" +#include "cdc/log.hh" #include "db/config.hh" #include "db/commitlog/commitlog.hh" #include "utils/lister.hh" @@ -1607,6 +1608,7 @@ table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_ss add_maintenance_sstable(cg, sst); } update_stats_for_new_sstable(sst); + _large_data_guardrail->register_sstable(sst); if (trigger_compaction) { try_trigger_compaction(cg); } @@ -2055,6 +2057,10 @@ table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptrregister_sstable(sst); + } + co_await utils::get_local_injector().inject("replica_post_flush_after_update_cache", [this] (auto& handler) -> future<> { const auto this_table_name = format("{}.{}", _schema->ks_name(), _schema->cf_name()); if (this_table_name == handler.get("table_name")) { @@ -2270,6 +2276,11 @@ void table::rebuild_statistics() { }); } +void table::rebuild_large_data_index() { + auto& sstables = *get_sstables(); + _large_data_guardrail->rebuild(sstables); +} + void table::subtract_compaction_group_from_stats(const compaction_group& cg) noexcept 
{ _stats.live_disk_space_used -= cg.live_disk_space_used_full_stats(); _stats.total_disk_space_used -= cg.total_disk_space_used_full_stats(); @@ -2442,6 +2453,8 @@ compaction_group::merge_sstables_from(compaction_group& group) { backlog_tracker_adjust_charges({}, sstables_to_merge); }); _t.rebuild_statistics(); + // No large data index update needed: SSTables just moved between + // compaction groups within the same table, and the index is table-wide. } future<> @@ -2569,6 +2582,9 @@ compaction_group::update_sstable_sets_on_compaction_completion(compaction::compa cache.refresh_snapshot(); _t.rebuild_statistics(); + for (auto& sst : desc.new_sstables) { + _t._large_data_guardrail->register_sstable(sst); + } co_await builder.delete_sstables_atomically(unused_sstables_for_deletion(std::move(desc))); } @@ -3269,6 +3285,7 @@ table::table(schema_ptr schema, config config, lw_shared_ptras_data_dictionary()) , _flush_barrier(format("[table {}.{}] flush_barrier", _schema->ks_name(), _schema->cf_name())) , _counter_cell_locks(_schema->is_counter() ? 
std::make_unique(_schema, cl_stats) : nullptr) + , _large_data_guardrail(make_large_data_guardrail()) , _async_gate(format("[table {}.{}] async_gate", _schema->ks_name(), _schema->cf_name())) , _pending_writes_phaser(format("[table {}.{}] pending_writes", _schema->ks_name(), _schema->cf_name())) , _pending_reads_phaser(format("[table {}.{}] pending_reads", _schema->ks_name(), _schema->cf_name())) @@ -4723,6 +4740,17 @@ db::commitlog* table::commitlog() const { return _commitlog; } +shared_ptr table::make_large_data_guardrail() const { + bool enabled = _schema->large_data_guardrails_enabled() + && !is_internal_keyspace(_schema->ks_name()) + && !_schema->is_view() + && !cdc::is_log_name(_schema->cf_name()); + if (enabled) { + return make_shared(_config.guardrail_config); + } + return db::noop_large_data_guardrail::instance(); +} + void table::set_schema(schema_ptr s) { SCYLLA_ASSERT(s->is_counter() == _schema->is_counter()); tlogger.debug("Changing schema version of {}.{} ({}) from {} to {}", @@ -4744,6 +4772,8 @@ void table::set_schema(schema_ptr s) { _logstor_index->set_schema(s); } _schema = std::move(s); + _large_data_guardrail = make_large_data_guardrail(); + _large_data_guardrail->rebuild(*get_sstables()); for (auto&& v : _views) { v->view_info()->reset_view_info(); @@ -4983,7 +5013,8 @@ future<> table::apply(const mutation& m, db::rp_handle&& h, db::timeout_clock::t template void table::do_apply(compaction_group& cg, db::rp_handle&&, const mutation&); -future<> table::apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, db::timeout_clock::time_point timeout) { +future<> table::apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, + db::timeout_clock::time_point timeout, shared_ptr guardrails) { if (_virtual_writer) [[unlikely]] { return (*_virtual_writer)(m); } @@ -4996,12 +5027,12 @@ future<> table::apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_hand return _logstor->write(m.unfreeze(m_schema), cg, 
std::move(ss_holder)); } - return dirty_memory_region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), h = std::move(h), &cg, holder = std::move(holder)]() mutable { - do_apply(cg, std::move(h), m, m_schema); + return dirty_memory_region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), h = std::move(h), &cg, holder = std::move(holder), guardrails = std::move(guardrails)]() mutable { + do_apply(cg, std::move(h), m, m_schema, *guardrails); }, timeout); } -template void table::do_apply(compaction_group& cg, db::rp_handle&&, const frozen_mutation&, const schema_ptr&); +template void table::do_apply(compaction_group& cg, db::rp_handle&&, const frozen_mutation&, const schema_ptr&, const db::large_data_guardrail_base&); future<> write_memtable_to_sstable(mutation_reader reader, From 0201c1530ee4a6bf2f19bda1418f2effaaf22b43 Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Wed, 13 May 2026 15:24:41 +0200 Subject: [PATCH 11/16] test/cluster: add large data guardrails rolling upgrade test Simulated rolling upgrade: start a 2-node cluster where one node suppresses the LARGE_DATA_GUARDRAILS feature, verify that enabling guardrails is rejected, then upgrade the old node and verify that enabling guardrails succeeds. 
--- .../cluster/test_large_partition_guardrail.py | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 test/cluster/test_large_partition_guardrail.py diff --git a/test/cluster/test_large_partition_guardrail.py b/test/cluster/test_large_partition_guardrail.py new file mode 100644 index 0000000000..f0edcf9e27 --- /dev/null +++ b/test/cluster/test_large_partition_guardrail.py @@ -0,0 +1,102 @@ +# +# Copyright (C) 2026-present ScyllaDB +# +# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 +# + +import logging +import pytest +import time +import asyncio + +from cassandra import WriteFailure +from cassandra.protocol import ConfigurationException + +from test.pylib.manager_client import ManagerClient + +logger = logging.getLogger(__name__) + +REJECT_MSG_RE = "(?i)Write rejected.*(partition size|partition row count)" + + +async def _make_oversized_partition(cql, tbl, pk, num_rows, value_size_bytes): + """Insert `num_rows` rows of `value_size_bytes` blobs under partition key `pk`.""" + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + for ck in range(num_rows): + await cql.run_async(insert, [pk, ck, bytes(value_size_bytes)]) + + +@pytest.mark.asyncio +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_large_data_guardrails_rolling_upgrade(manager: ManagerClient): + """Verify that large_data_guardrails_enabled is properly gated by the + LARGE_DATA_GUARDRAILS cluster feature during a simulated rolling upgrade: + + 1. Start a 2-node cluster where one node suppresses the feature. + 2. Verify that CREATE TABLE WITH large_data_guardrails_enabled = true + is rejected (even on the upgraded node). + 3. "Upgrade" the old node (remove suppress_features, restart). + 4. Verify that CREATE TABLE WITH large_data_guardrails_enabled = true + now succeeds. 
+ """ + cfg_base = { + "compaction_large_partition_warning_threshold_mb": 1, + "large_partition_fail_threshold_mb": 1, + } + cfg_old = { + **cfg_base, + "error_injections_at_startup": [ + {"name": "suppress_features", "value": "LARGE_DATA_GUARDRAILS"}, + ], + } + + servers = [] + servers.append(await manager.server_add(config=cfg_old)) # "old" node + servers.append(await manager.server_add(config=cfg_base)) # "new" node + + # Connect to the "new" node — it supports the feature but the cluster + # as a whole does not (because the old node doesn't advertise it). + cql = await manager.get_cql_exclusive(servers[1]) + + await cql.run_async( + "CREATE KEYSPACE ks_upgrade_test WITH REPLICATION = " + "{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}" + ) + + # Feature not yet cluster-wide — enabling guardrails must be rejected. + with pytest.raises(ConfigurationException, match="cannot be used until all nodes"): + await cql.run_async( + "CREATE TABLE ks_upgrade_test.tbl1 " + "(pk int, ck int, v blob, PRIMARY KEY (pk, ck)) " + "WITH large_data_guardrails_enabled = true" + ) + + # "Upgrade" the old node: remove the injection, restart. + await manager.server_stop_gracefully(servers[0].server_id) + await manager.server_remove_config_option(servers[0].server_id, "error_injections_at_startup") + await manager.server_start(servers[0].server_id) + + # Wait for the upgraded node to realize the feature is now cluster-wide. + timeout = time.time() + 60 + success = False + while not success and time.time() < timeout: + try: + await cql.run_async( + "CREATE TABLE ks_upgrade_test.tbl1 " + "(pk int, ck int, v blob, PRIMARY KEY (pk, ck)) " + "WITH large_data_guardrails_enabled = true" + ) + success = True + except ConfigurationException as e: + assert "cannot be used until all nodes" in str(e) + await asyncio.sleep(0.5) + assert success, "Feature was not enabled cluster-wide within timeout" + + # Verify the guardrail actually works on the newly-created table. 
+ await _make_oversized_partition(cql, "ks_upgrade_test.tbl1", pk=1, num_rows=2, value_size_bytes=600 * 1024) + # Flush on both servers — with RF=1 we don't know which owns pk=1. + for srv in servers: + await manager.api.keyspace_flush(srv.ip_addr, "ks_upgrade_test", "tbl1") + insert = cql.prepare("INSERT INTO ks_upgrade_test.tbl1 (pk, ck, v) VALUES (?, ?, ?)") + with pytest.raises(WriteFailure, match=REJECT_MSG_RE): + await cql.run_async(insert, [1, 99, b"\x00"]) From ff84b1dbc45fa7ae9b3c723dceeb3ffc3858e40a Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Wed, 13 May 2026 20:36:47 +0200 Subject: [PATCH 12/16] test/boost: add large_data_guardrail unit tests 8 tests covering the record_compare template comparator, intrusive multiset equal_range grouping with heterogeneous lookup_key, and auto_unlink on record destruction. --- configure.py | 1 + test/boost/CMakeLists.txt | 1 + test/boost/large_data_guardrail_test.cc | 233 ++++++++++++++++++++++++ 3 files changed, 235 insertions(+) create mode 100644 test/boost/large_data_guardrail_test.cc diff --git a/configure.py b/configure.py index 80841f95bf..6e094042aa 100755 --- a/configure.py +++ b/configure.py @@ -1694,6 +1694,7 @@ deps['test/boost/combined_tests'] += [ 'test/boost/group0_voter_calculator_test.cc', 'test/boost/index_with_paging_test.cc', 'test/boost/json_cql_query_test.cc', + 'test/boost/large_data_guardrail_test.cc', 'test/boost/large_paging_state_test.cc', 'test/boost/loading_cache_test.cc', 'test/boost/memtable_test.cc', diff --git a/test/boost/CMakeLists.txt b/test/boost/CMakeLists.txt index 8c42a09293..8df5178524 100644 --- a/test/boost/CMakeLists.txt +++ b/test/boost/CMakeLists.txt @@ -349,6 +349,7 @@ add_scylla_test(combined_tests group0_voter_calculator_test.cc index_with_paging_test.cc json_cql_query_test.cc + large_data_guardrail_test.cc large_paging_state_test.cc loading_cache_test.cc memtable_test.cc diff --git a/test/boost/large_data_guardrail_test.cc 
b/test/boost/large_data_guardrail_test.cc new file mode 100644 index 0000000000..d41d448deb --- /dev/null +++ b/test/boost/large_data_guardrail_test.cc @@ -0,0 +1,233 @@ +/* + * Copyright (C) 2026-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 + */ + +#include +#include +#include "db/large_data_handler.hh" + +using sstables::large_data_record; +using sstables::large_data_type; +using db::lookup_key; +using db::record_type; +using db::record_compare; +using db::record_set; + +namespace { + +large_data_record& make_record(std::deque& store, + large_data_type type, bytes pk, bytes ck = {}, bytes col = {}, + uint64_t value = 0, uint64_t elements_count = 0) { + store.emplace_back(); + auto& rec = store.back(); + rec.type = type; + rec.partition_key.value = std::move(pk); + rec.clustering_key.value = std::move(ck); + rec.column_name.value = std::move(col); + rec.value = value; + rec.elements_count = elements_count; + rec.range_tombstones = 0; + rec.dead_rows = 0; + return rec; +} + +} // anonymous namespace + +BOOST_AUTO_TEST_SUITE(large_data_guardrail_test) + +BOOST_AUTO_TEST_CASE(test_partition_comparator_orders_by_pk_only) { + record_compare cmp; + std::deque store; + + auto& a = make_record(store, large_data_type::partition_size, to_bytes("aaa")); + auto& b = make_record(store, large_data_type::partition_size, to_bytes("bbb")); + + BOOST_REQUIRE(cmp(a, b)); + BOOST_REQUIRE(!cmp(b, a)); + BOOST_REQUIRE(!cmp(a, a)); + + // Clustering key differences are ignored at partition level + auto& c = make_record(store, large_data_type::partition_size, + to_bytes("aaa"), to_bytes("zzz")); + BOOST_REQUIRE(!cmp(a, c)); + BOOST_REQUIRE(!cmp(c, a)); +} + +BOOST_AUTO_TEST_CASE(test_row_comparator_orders_by_pk_then_ck) { + record_compare cmp; + std::deque store; + + auto& a = make_record(store, large_data_type::row_size, + to_bytes("pk1"), to_bytes("ck_a")); + auto& b = make_record(store, large_data_type::row_size, + to_bytes("pk1"), 
to_bytes("ck_b")); + auto& c = make_record(store, large_data_type::row_size, + to_bytes("pk2"), to_bytes("ck_a")); + + BOOST_REQUIRE(cmp(a, b)); // same pk, ck_a < ck_b + BOOST_REQUIRE(!cmp(b, a)); + BOOST_REQUIRE(cmp(a, c)); // pk1 < pk2 + BOOST_REQUIRE(cmp(b, c)); // pk1 < pk2 regardless of ck + + // Column name differences are ignored at row level + auto& d = make_record(store, large_data_type::row_size, + to_bytes("pk1"), to_bytes("ck_a"), to_bytes("col_z")); + BOOST_REQUIRE(!cmp(a, d)); + BOOST_REQUIRE(!cmp(d, a)); +} + +BOOST_AUTO_TEST_CASE(test_collection_comparator_orders_by_pk_ck_col) { + record_compare cmp; + std::deque store; + + auto& a = make_record(store, large_data_type::elements_in_collection, + to_bytes("pk1"), to_bytes("ck1"), to_bytes("col_a")); + auto& b = make_record(store, large_data_type::elements_in_collection, + to_bytes("pk1"), to_bytes("ck1"), to_bytes("col_b")); + + BOOST_REQUIRE(cmp(a, b)); + BOOST_REQUIRE(!cmp(b, a)); + BOOST_REQUIRE(!cmp(a, a)); + + // Same pk+ck+col → equivalent + auto& c = make_record(store, large_data_type::elements_in_collection, + to_bytes("pk1"), to_bytes("ck1"), to_bytes("col_a")); + BOOST_REQUIRE(!cmp(a, c)); + BOOST_REQUIRE(!cmp(c, a)); +} + +BOOST_AUTO_TEST_CASE(test_heterogeneous_lookup_with_lookup_key) { + record_compare cmp; + std::deque store; + + auto& rec = make_record(store, large_data_type::partition_size, to_bytes("pk1")); + auto pk = to_bytes("pk1"); + lookup_key lk{bytes_view(pk), {}, {}}; + + // (record, key) and (key, record) must be consistent + BOOST_REQUIRE(!cmp(rec, lk)); + BOOST_REQUIRE(!cmp(lk, rec)); + + auto pk2 = to_bytes("pk2"); + lookup_key lk2{bytes_view(pk2), {}, {}}; + BOOST_REQUIRE(cmp(rec, lk2)); // pk1 < pk2 + BOOST_REQUIRE(!cmp(lk2, rec)); // pk2 > pk1 +} + +BOOST_AUTO_TEST_CASE(test_equal_range_groups_partition_records) { + record_set set; + std::deque store; + + auto& r1 = make_record(store, large_data_type::partition_size, + to_bytes("pk_a"), {}, {}, 100); + auto& r2 = 
make_record(store, large_data_type::partition_size, + to_bytes("pk_b"), {}, {}, 200); + auto& r3 = make_record(store, large_data_type::rows_in_partition, + to_bytes("pk_a"), {}, {}, 0, 50); + set.insert(r1); + set.insert(r2); + set.insert(r3); + + // pk_a should match r1 (partition_size) and r3 (rows_in_partition) + auto pk_a = to_bytes("pk_a"); + lookup_key lk{bytes_view(pk_a), {}, {}}; + auto [begin, end] = set.equal_range(lk, set.key_comp()); + + uint64_t max_size = 0, max_rows = 0; + int count = 0; + for (auto it = begin; it != end; ++it) { + max_size = std::max(max_size, it->value); + max_rows = std::max(max_rows, it->elements_count); + ++count; + } + BOOST_REQUIRE_EQUAL(count, 2); + BOOST_REQUIRE_EQUAL(max_size, 100); + BOOST_REQUIRE_EQUAL(max_rows, 50); + + // pk_c: no match + auto pk_c = to_bytes("pk_c"); + lookup_key lk_miss{bytes_view(pk_c), {}, {}}; + auto [b2, e2] = set.equal_range(lk_miss, set.key_comp()); + BOOST_REQUIRE(b2 == e2); +} + +BOOST_AUTO_TEST_CASE(test_equal_range_groups_row_records) { + record_set set; + std::deque store; + + auto& r1 = make_record(store, large_data_type::row_size, + to_bytes("pk"), to_bytes("ck_a"), {}, 100); + auto& r2 = make_record(store, large_data_type::row_size, + to_bytes("pk"), to_bytes("ck_a"), {}, 300); + auto& r3 = make_record(store, large_data_type::row_size, + to_bytes("pk"), to_bytes("ck_b"), {}, 200); + set.insert(r1); + set.insert(r2); + set.insert(r3); + + auto pk = to_bytes("pk"); + auto ck_a = to_bytes("ck_a"); + lookup_key lk{bytes_view(pk), bytes_view(ck_a), {}}; + auto [begin, end] = set.equal_range(lk, set.key_comp()); + + uint64_t max_size = 0; + int count = 0; + for (auto it = begin; it != end; ++it) { + max_size = std::max(max_size, it->value); + ++count; + } + BOOST_REQUIRE_EQUAL(count, 2); // r1 and r2 + BOOST_REQUIRE_EQUAL(max_size, 300); +} + +BOOST_AUTO_TEST_CASE(test_equal_range_groups_collection_records) { + record_set set; + std::deque store; + + auto& r1 = make_record(store, 
large_data_type::elements_in_collection, + to_bytes("pk"), to_bytes("ck"), to_bytes("col"), 0, 100); + auto& r2 = make_record(store, large_data_type::elements_in_collection, + to_bytes("pk"), to_bytes("ck"), to_bytes("col"), 0, 500); + auto& r3 = make_record(store, large_data_type::elements_in_collection, + to_bytes("pk"), to_bytes("ck"), to_bytes("other"), 0, 200); + set.insert(r1); + set.insert(r2); + set.insert(r3); + + auto pk = to_bytes("pk"); + auto ck = to_bytes("ck"); + auto col = to_bytes("col"); + lookup_key lk{bytes_view(pk), bytes_view(ck), bytes_view(col)}; + auto [begin, end] = set.equal_range(lk, set.key_comp()); + + uint64_t max_count = 0; + int n = 0; + for (auto it = begin; it != end; ++it) { + max_count = std::max(max_count, it->elements_count); + ++n; + } + BOOST_REQUIRE_EQUAL(n, 2); // r1 and r2 + BOOST_REQUIRE_EQUAL(max_count, 500); +} + +BOOST_AUTO_TEST_CASE(test_auto_unlink_on_record_destruction) { + record_set set; + { + std::deque store; + auto& r1 = make_record(store, large_data_type::partition_size, + to_bytes("pk_a"), {}, {}, 100); + auto& r2 = make_record(store, large_data_type::partition_size, + to_bytes("pk_b"), {}, {}, 200); + set.insert(r1); + set.insert(r2); + BOOST_REQUIRE(!set.empty()); + } + // store destroyed → records destroyed → auto_unlink fires + BOOST_REQUIRE(set.empty()); +} + +BOOST_AUTO_TEST_SUITE_END() From 67b659e2bf2faf86821c6432a8e6e66cf5a66e72 Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Fri, 29 May 2026 12:34:36 +0200 Subject: [PATCH 13/16] test/cqlpy: add large partition guardrail tests Tests for partition size and row-count guardrails: hard-limit rejection, disabled-when-zero, soft-limit log warnings, and no-warning below threshold. Includes shared helpers and log assertion utilities used by subsequent commits. 
--- test/cqlpy/test_large_data_guardrail.py | 228 ++++++++++++++++++++++++ 1 file changed, 228 insertions(+) create mode 100644 test/cqlpy/test_large_data_guardrail.py diff --git a/test/cqlpy/test_large_data_guardrail.py b/test/cqlpy/test_large_data_guardrail.py new file mode 100644 index 0000000000..84a0743507 --- /dev/null +++ b/test/cqlpy/test_large_data_guardrail.py @@ -0,0 +1,228 @@ +# +# Copyright (C) 2026-present ScyllaDB +# +# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 +# + +# Tests for the large-data guardrail: write-path rejection and warnings for +# partitions, rows, and collections that exceed configured thresholds. + +import pytest +import re +import time +import io +from contextlib import ExitStack +from cassandra import WriteFailure + +from .util import new_test_table, config_value_context +from . import nodetool +from .test_logs import logfile, logfile_path, wait_for_log + +# All tests are Scylla-only (guardrails don't exist in Cassandra). +@pytest.fixture(scope="function", autouse=True) +def all_tests_are_scylla_only(scylla_only): + pass + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +REJECT_PARTITION_RE = "(?i)Write rejected.*(partition size|partition row count)" +REJECT_ROW_RE = "(?i)Write rejected.*row size" +REJECT_COLLECTION_RE = "(?i)Write rejected.*collection" +WARN_RE = "Large data guardrail" + + +def make_oversized_partition(cql, tbl, pk, num_rows, value_size_bytes): + """Insert `num_rows` rows of `value_size_bytes` blobs under partition key `pk`.""" + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + for ck in range(num_rows): + cql.execute(insert, [pk, ck, bytes(value_size_bytes)]) + + +def make_large_row(cql, tbl, pk, ck, value_size_bytes): + """Insert a single row with a blob of `value_size_bytes` under (pk, ck).""" + insert = cql.prepare(f"INSERT INTO {tbl} (pk, 
ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [pk, ck, bytes(value_size_bytes)]) + + +def build_large_collection(cql, tbl, pk, ck, num_elements): + """Build a set with `num_elements` entries under (pk, ck).""" + update = cql.prepare(f"UPDATE {tbl} SET s = s + ? WHERE pk = ? AND ck = ?") + for i in range(num_elements): + cql.execute(update, [{i}, pk, ck]) + + +def assert_no_log(logfile, pattern, after_action, timeout=0.5): + """Run `after_action`, then verify `pattern` does NOT appear in the log.""" + logfile.seek(0, io.SEEK_END) + after_action() + time.sleep(timeout) + new_content = logfile.read() + assert not re.search(pattern, new_content), \ + f"Expected no match for '{pattern}', got: {new_content}" + + +def wait_for_all_logs(logfile, patterns, timeout=5): + """Wait until every regex in `patterns` has appeared in the log.""" + contents = logfile.read() + remaining = set(patterns) + remaining = {p for p in remaining if not re.search(p, contents)} + if not remaining: + return + end = time.time() + timeout + while remaining and time.time() < end: + s = logfile.read() + if s: + contents += s + remaining = {p for p in remaining if not re.search(p, contents)} + if not remaining: + return + time.sleep(0.1) + pytest.fail(f'Timed out ({timeout}s) waiting for patterns: {remaining}') + + +# --------------------------------------------------------------------------- +# Partition size threshold +# --------------------------------------------------------------------------- + + +def test_partition_size_rejects_after_flush(cql, test_keyspace, logfile): + """A partition whose on-disk size exceeds the hard limit must be rejected + after flush.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '1')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + 
extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=2, + value_size_bytes=600 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + with pytest.raises(WriteFailure, match=REJECT_PARTITION_RE): + cql.execute(insert, [1, 99, b"\x00"]) + + # Different partition still succeeds. + cql.execute(insert, [2, 0, b"\x00"]) + + +def test_partition_size_disabled_when_zero(cql, test_keyspace): + """When fail threshold is 0, no rejection even above the soft limit.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '0')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=2, + value_size_bytes=600 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 99, b"\x00"]) + + +# --------------------------------------------------------------------------- +# Partition row-count threshold +# --------------------------------------------------------------------------- + + +def test_partition_rows_rejects_after_flush(cql, test_keyspace): + """A partition exceeding rows_count_fail_threshold must be rejected.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_rows_count_warning_threshold', '50')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'rows_count_fail_threshold', '50')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH 
large_data_guardrails_enabled = true") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=60, + value_size_bytes=8) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + with pytest.raises(WriteFailure, match=REJECT_PARTITION_RE): + cql.execute(insert, [1, 999, b"\x00"]) + + +# --------------------------------------------------------------------------- +# Soft-limit warnings — partition +# --------------------------------------------------------------------------- + + +def test_partition_size_soft_limit_logs_warning(cql, test_keyspace, logfile): + """A partition above the detection threshold but below the hard limit must + produce a warning log entry.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '10')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=2, + value_size_bytes=600 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 99, b"\x00"]) + + wait_for_log(logfile, WARN_RE, timeout=5) + + +def test_partition_rows_soft_limit_logs_warning(cql, test_keyspace, logfile): + """A partition above the row-count detection threshold but below the hard + limit must produce a warning log entry.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_rows_count_warning_threshold', '50')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'rows_count_fail_threshold', '1000')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with 
new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=60, + value_size_bytes=8) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 999, b"\x00"]) + + wait_for_log(logfile, WARN_RE, timeout=5) + + +def test_partition_no_warning_below_soft_limit(cql, test_keyspace, logfile): + """A small partition must not produce any warning.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '10')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 0, b"\x00"]) + nodetool.flush(cql, tbl) + + assert_no_log(logfile, WARN_RE, + lambda: cql.execute(insert, [1, 1, b"\x00"])) From 19a9e45da804388e3d2303c45b3961d1cb061772 Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Fri, 29 May 2026 12:35:13 +0200 Subject: [PATCH 14/16] test/cqlpy: add large row guardrail tests Tests for row-size guardrail: hard-limit rejection, disabled-when-zero, soft-limit log warning, and no-warning below threshold. 
--- test/cqlpy/test_large_data_guardrail.py | 95 +++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/test/cqlpy/test_large_data_guardrail.py b/test/cqlpy/test_large_data_guardrail.py index 84a0743507..28c2d1b05f 100644 --- a/test/cqlpy/test_large_data_guardrail.py +++ b/test/cqlpy/test_large_data_guardrail.py @@ -226,3 +226,98 @@ def test_partition_no_warning_below_soft_limit(cql, test_keyspace, logfile): assert_no_log(logfile, WARN_RE, lambda: cql.execute(insert, [1, 1, b"\x00"])) + + +# --------------------------------------------------------------------------- +# Row size threshold +# --------------------------------------------------------------------------- + + +def test_row_size_rejects_after_flush(cql, test_keyspace): + """A row whose on-disk size exceeds the hard limit must be rejected.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_row_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_row_fail_threshold_mb', '1')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_large_row(cql, tbl, pk=1, ck=0, + value_size_bytes=1200 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + with pytest.raises(WriteFailure, match=REJECT_ROW_RE): + cql.execute(insert, [1, 0, b"\x00"]) + + # Different CK succeeds. + cql.execute(insert, [1, 99, b"\x00"]) + # Different partition succeeds. 
+ cql.execute(insert, [2, 0, b"\x00"]) + + +def test_row_size_disabled_when_zero(cql, test_keyspace): + """When fail threshold is 0, no rejection even above the soft limit.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_row_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_row_fail_threshold_mb', '0')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_large_row(cql, tbl, pk=1, ck=0, + value_size_bytes=1200 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 0, b"\x00"]) + + +# --------------------------------------------------------------------------- +# Soft-limit warnings — row +# --------------------------------------------------------------------------- + + +def test_row_size_soft_limit_logs_warning(cql, test_keyspace, logfile): + """A row above the detection threshold but below the hard limit must + produce a warning log entry.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_row_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_row_fail_threshold_mb', '10')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_large_row(cql, tbl, pk=1, ck=0, + value_size_bytes=1200 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 0, b"\x00"]) + + wait_for_log(logfile, WARN_RE, timeout=5) + + +def test_row_no_warning_below_soft_limit(cql, test_keyspace, logfile): + """A small row must not produce any warning.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 
'compaction_large_row_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_row_fail_threshold_mb', '10')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 0, b"\x00"]) + nodetool.flush(cql, tbl) + + assert_no_log(logfile, WARN_RE, + lambda: cql.execute(insert, [1, 1, b"\x00"])) From 7d365844a3e46011eca92d8ce26fb01a40b454da Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Fri, 29 May 2026 12:36:12 +0200 Subject: [PATCH 15/16] test/cqlpy: add large collection guardrail tests Tests for collection element-count guardrail: hard-limit rejection, disabled-when-zero, soft-limit log warning, and no-warning below threshold. --- test/cqlpy/test_large_data_guardrail.py | 100 ++++++++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/test/cqlpy/test_large_data_guardrail.py b/test/cqlpy/test_large_data_guardrail.py index 28c2d1b05f..8ace66ca68 100644 --- a/test/cqlpy/test_large_data_guardrail.py +++ b/test/cqlpy/test_large_data_guardrail.py @@ -321,3 +321,103 @@ def test_row_no_warning_below_soft_limit(cql, test_keyspace, logfile): assert_no_log(logfile, WARN_RE, lambda: cql.execute(insert, [1, 1, b"\x00"])) + + +# --------------------------------------------------------------------------- +# Collection element count threshold +# --------------------------------------------------------------------------- + + +def test_collection_rejects_after_flush(cql, test_keyspace): + """A collection exceeding the element-count hard limit must be rejected.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_collection_elements_count_warning_threshold', '50')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_cell_warning_threshold_mb', '1')) + 
cfg.enter_context(config_value_context(cql, + 'large_collection_elements_fail_threshold', '50')) + + schema = "pk int, ck int, s set<int>, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + build_large_collection(cql, tbl, pk=1, ck=0, num_elements=60) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, s) VALUES (?, ?, ?)") + with pytest.raises(WriteFailure, match=REJECT_COLLECTION_RE): + cql.execute(insert, [1, 0, {0}]) + + # Different CK succeeds. + cql.execute(insert, [1, 99, {0}]) + # Different partition succeeds. + cql.execute(insert, [2, 0, {0}]) + + +def test_collection_disabled_when_zero(cql, test_keyspace): + """When fail threshold is 0, no rejection even above the soft limit.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_collection_elements_count_warning_threshold', '50')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_cell_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_collection_elements_fail_threshold', '0')) + + schema = "pk int, ck int, s set<int>, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + build_large_collection(cql, tbl, pk=1, ck=0, num_elements=60) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, s) VALUES (?, ?, ?)") + cql.execute(insert, [1, 0, {0}]) + + +# --------------------------------------------------------------------------- +# Soft-limit warnings — collection +# --------------------------------------------------------------------------- + + +def test_collection_soft_limit_logs_warning(cql, test_keyspace, logfile): + """A collection above the detection threshold but below the hard limit + must produce a warning log entry.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, +
'compaction_collection_elements_count_warning_threshold', '50')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_cell_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_collection_elements_fail_threshold', '10000')) + + schema = "pk int, ck int, s set<int>, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + build_large_collection(cql, tbl, pk=1, ck=0, num_elements=60) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, s) VALUES (?, ?, ?)") + cql.execute(insert, [1, 0, {0}]) + + wait_for_log(logfile, WARN_RE, timeout=5) + + +def test_collection_no_warning_below_soft_limit(cql, test_keyspace, logfile): + """A small collection must not produce any warning.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_collection_elements_count_warning_threshold', '50')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_cell_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_collection_elements_fail_threshold', '10000')) + + schema = "pk int, ck int, s set<int>, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, s) VALUES (?, ?, ?)") + cql.execute(insert, [1, 0, {1, 2, 3}]) + nodetool.flush(cql, tbl) + + assert_no_log(logfile, WARN_RE, + lambda: cql.execute(insert, [1, 1, {4, 5}])) From 9abf5943974e930b40b783040c76985b67a075a5 Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Fri, 29 May 2026 12:37:31 +0200 Subject: [PATCH 16/16] test/cqlpy: add per-table toggle, LWT exemption, and multi-category tests Per-table toggle: disabled-at-create, alter-disable, alter-reenable. LWT exemption: Paxos learn must bypass the guardrail.
Multi-category independence: all three guardrails warn/reject independently when SSTable records span partition, row, and collection categories. --- test/cqlpy/test_large_data_guardrail.py | 215 ++++++++++++++++++++++++ 1 file changed, 215 insertions(+) diff --git a/test/cqlpy/test_large_data_guardrail.py b/test/cqlpy/test_large_data_guardrail.py index 8ace66ca68..d7ac382ba2 100644 --- a/test/cqlpy/test_large_data_guardrail.py +++ b/test/cqlpy/test_large_data_guardrail.py @@ -421,3 +421,218 @@ def test_collection_no_warning_below_soft_limit(cql, test_keyspace, logfile): assert_no_log(logfile, WARN_RE, lambda: cql.execute(insert, [1, 1, {4, 5}])) + + +# --------------------------------------------------------------------------- +# Per-table enable/disable toggle +# --------------------------------------------------------------------------- + + +def test_per_table_disabled_at_create(cql, test_keyspace): + """A table created with guardrails disabled must never reject writes.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '1')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = false") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=2, + value_size_bytes=600 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 99, b"\x00"]) + + +def test_per_table_alter_disable(cql, test_keyspace): + """ALTER TABLE ... 
WITH large_data_guardrails_enabled = false must stop + rejection.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '1')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=2, + value_size_bytes=600 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + with pytest.raises(WriteFailure, match=REJECT_PARTITION_RE): + cql.execute(insert, [1, 99, b"\x00"]) + + cql.execute(f"ALTER TABLE {tbl} WITH large_data_guardrails_enabled = false") + cql.execute(insert, [1, 99, b"\x00"]) + + +def test_per_table_alter_reenable(cql, test_keyspace): + """After disabling then re-enabling guardrails, writes to a large + partition must be rejected again.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '1')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = false") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=2, + value_size_bytes=600 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 99, b"\x00"]) + + cql.execute(f"ALTER TABLE {tbl} WITH large_data_guardrails_enabled = true") + with pytest.raises(WriteFailure, match=REJECT_PARTITION_RE): + cql.execute(insert, [1, 100, b"\x00"]) + + +# --------------------------------------------------------------------------- +# Exemptions +# 
--------------------------------------------------------------------------- + + +def test_lwt_paxos_learn_is_exempt(cql, test_keyspace): + """LWT (Paxos) commit writes must bypass the guardrail.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '1')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=2, + value_size_bytes=600 * 1024) + nodetool.flush(cql, tbl) + + # Non-LWT write is rejected. + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + with pytest.raises(WriteFailure, match=REJECT_PARTITION_RE): + cql.execute(insert, [1, 99, b"\x00"]) + + # LWT write succeeds (Paxos learn bypasses guardrail). + lwt = cql.prepare( + f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?) 
IF NOT EXISTS") + cql.execute(lwt, [1, 100, b"\x00"]) + + +# --------------------------------------------------------------------------- +# Multi-category: all guardrails fire independently +# --------------------------------------------------------------------------- + + +def test_all_guardrails_warn_independently(cql, test_keyspace, logfile): + """When an SSTable has records in all three categories, each guardrail + must warn independently (fail=0 means warn-only).""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '0')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_row_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_row_fail_threshold_mb', '0')) + cfg.enter_context(config_value_context(cql, + 'compaction_collection_elements_count_warning_threshold', '50')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_cell_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_collection_elements_fail_threshold', '0')) + + schema = "pk int, ck int, v blob, s set<int>, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + # Build data large in all three dimensions under pk=1, ck=0. + insert_v = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert_v, [1, 0, bytes(1200 * 1024)]) + update_s = cql.prepare( + f"UPDATE {tbl} SET s = s + ? WHERE pk = ?
AND ck = ?") + for i in range(60): + cql.execute(update_s, [{i}, 1, 0]) + + nodetool.flush(cql, tbl) + + insert_both = cql.prepare( + f"INSERT INTO {tbl} (pk, ck, v, s) VALUES (?, ?, ?, ?)") + cql.execute(insert_both, [1, 0, b"\x00", {999}]) + + wait_for_all_logs(logfile, [ + r"partition size.*exceeds", + r"row size.*exceeds", + r"collection element count.*exceeds", + ], timeout=5) + + +def test_each_guardrail_rejects_independently(cql, test_keyspace): + """Each hard limit must reject independently, with separate PKs isolating + each category.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_row_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'compaction_collection_elements_count_warning_threshold', '50')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_cell_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '2')) + cfg.enter_context(config_value_context(cql, + 'rows_count_fail_threshold', '0')) + cfg.enter_context(config_value_context(cql, + 'large_row_fail_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_collection_elements_fail_threshold', '50')) + + schema = "pk int, ck int, v blob, s set<int>, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + insert_v = cql.prepare( + f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + + # pk=1: 20 rows x 150 KB ~ 3 MB partition, each row < 1 MB. + for ck in range(20): + cql.execute(insert_v, [1, ck, bytes(150 * 1024)]) + + # pk=2: single 1.2 MB row; partition total < 2 MB. + cql.execute(insert_v, [2, 0, bytes(1200 * 1024)]) + + # pk=3: 60-element collection; row and partition tiny. + update_s = cql.prepare( + f"UPDATE {tbl} SET s = s + ? WHERE pk = ?
AND ck = ?") + for i in range(60): + cql.execute(update_s, [{i}, 3, 0]) + + nodetool.flush(cql, tbl) + + # Partition rejection. + with pytest.raises(WriteFailure, match="(?i)partition size.*exceeds"): + cql.execute(insert_v, [1, 99, b"\x00"]) + + # Row rejection. + with pytest.raises(WriteFailure, match="(?i)row size.*exceeds"): + cql.execute(insert_v, [2, 0, b"\x00"]) + # Different CK in pk=2 succeeds. + cql.execute(insert_v, [2, 99, b"\x00"]) + + # Collection rejection. + insert_s = cql.prepare( + f"INSERT INTO {tbl} (pk, ck, s) VALUES (?, ?, ?)") + with pytest.raises(WriteFailure, + match="(?i)collection element count.*exceeds"): + cql.execute(insert_s, [3, 0, {999}]) + # Writing without the collection column succeeds. + cql.execute(insert_v, [3, 0, b"\x00"]) + # Different CK in pk=3 succeeds. + cql.execute(insert_s, [3, 99, {999}])