diff --git a/conf/scylla.yaml b/conf/scylla.yaml index be7554d4c2..1a6b071887 100644 --- a/conf/scylla.yaml +++ b/conf/scylla.yaml @@ -425,18 +425,34 @@ column_index_size_in_kb: 1 # Log a warning when writing partitions larger than this value # compaction_large_partition_warning_threshold_mb: 1000 +# Reject writes to partitions whose on-disk size exceeds this threshold (MB). +# Set to 0 to disable. +large_partition_fail_threshold_mb: 2000 + # Log a warning when writing rows larger than this value # compaction_large_row_warning_threshold_mb: 10 +# Reject writes to rows whose on-disk size exceeds this threshold (MB). +# Set to 0 to disable. +large_row_fail_threshold_mb: 20 + # Log a warning when writing cells larger than this value # compaction_large_cell_warning_threshold_mb: 1 # Log a warning when row number is larger than this value # compaction_rows_count_warning_threshold: 100000 +# Reject writes to partitions whose on-disk row count exceeds this threshold. +# Set to 0 to disable. +rows_count_fail_threshold: 200000 + # Log a warning when writing a collection containing more elements than this value # compaction_collection_elements_count_warning_threshold: 10000 +# Reject writes to collections whose element count exceeds this threshold. +# Set to 0 to disable. +large_collection_elements_fail_threshold: 20000 + # How long the coordinator should wait for seq or index scans to complete # range_request_timeout_in_ms: 10000 # How long the coordinator should wait for writes to complete diff --git a/configure.py b/configure.py index 2ff2628506..27c01e4717 100755 --- a/configure.py +++ b/configure.py @@ -1701,6 +1701,7 @@ deps['test/boost/combined_tests'] += [ 'test/boost/group0_voter_calculator_test.cc', 'test/boost/index_with_paging_test.cc', 'test/boost/json_cql_query_test.cc', + 'test/boost/large_data_guardrail_test.cc', 'test/boost/large_paging_state_test.cc', 'test/boost/loading_cache_test.cc', 'test/boost/memtable_test.cc', diff --git a/cql3/statements/cf_prop_defs.cc b/cql3/statements/cf_prop_defs.cc index fb055de7d9..5d336c51a9 100644 --- a/cql3/statements/cf_prop_defs.cc +++ b/cql3/statements/cf_prop_defs.cc @@ -60,6 +60,7 @@ const sstring cf_prop_defs::COMPACTION_ENABLED_KEY = "enabled"; const sstring cf_prop_defs::KW_TABLETS = "tablets"; const sstring cf_prop_defs::KW_STORAGE_ENGINE = "storage_engine"; +const sstring cf_prop_defs::KW_LARGE_DATA_GUARDRAILS_ENABLED = "large_data_guardrails_enabled"; schema::extensions_map cf_prop_defs::make_schema_extensions(const db::extensions& exts) const { schema::extensions_map er; @@ -109,6 +110,7 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name, KW_COMPRESSION, KW_CRC_CHECK_CHANCE, KW_ID, KW_PAXOSGRACESECONDS, KW_SYNCHRONOUS_UPDATES, KW_TABLETS, KW_STORAGE_ENGINE, + KW_LARGE_DATA_GUARDRAILS_ENABLED, }); static std::set obsolete_keywords({ sstring("index_interval"), @@ -210,6 +212,12 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name, throw exceptions::configuration_exception(format("Illegal value for '{}'", KW_STORAGE_ENGINE)); } } + + if (has_property(KW_LARGE_DATA_GUARDRAILS_ENABLED) && get_boolean(KW_LARGE_DATA_GUARDRAILS_ENABLED, false)) { + if (!db.features().large_data_guardrails) { + throw exceptions::configuration_exception("large_data_guardrails_enabled cannot be used until all nodes in the cluster enable this feature"); + } + } } std::map cf_prop_defs::get_compaction_type_options() const { @@ -417,6 +425,9 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_ builder.set_logstor(); } } + if (has_property(KW_LARGE_DATA_GUARDRAILS_ENABLED)) { + builder.set_large_data_guardrails_enabled(get_boolean(KW_LARGE_DATA_GUARDRAILS_ENABLED, false)); + } } void cf_prop_defs::validate_minimum_int(const sstring& field, int32_t minimum_value, int32_t default_value) const diff --git a/cql3/statements/cf_prop_defs.hh b/cql3/statements/cf_prop_defs.hh index b1d236b6d9..9157f755f0 100644 --- a/cql3/statements/cf_prop_defs.hh +++ b/cql3/statements/cf_prop_defs.hh @@ -65,6 +65,7 @@ public: static const sstring KW_TABLETS; static const sstring KW_STORAGE_ENGINE; + static const sstring KW_LARGE_DATA_GUARDRAILS_ENABLED; // FIXME: In origin the following consts are in CFMetaData. static constexpr int32_t DEFAULT_DEFAULT_TIME_TO_LIVE = 0; diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc index 3be1969d06..d4e4b41cb4 100644 --- a/db/commitlog/commitlog_replayer.cc +++ b/db/commitlog/commitlog_replayer.cc @@ -282,7 +282,7 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r return db.apply_in_memory(m, cf, db::rp_handle(), db::no_timeout); }); } else { - return db.apply_in_memory(fm, cf.schema(), db::rp_handle(), db::no_timeout); + return db.apply_in_memory(fm, cf.schema(), db::rp_handle(), db::no_timeout, db::noop_large_data_guardrail::instance()); } }).then_wrapped([s] (future<> f) { try { diff --git a/db/config.cc b/db/config.cc index e1cfd4644f..9f2e9f95bb 100644 --- a/db/config.cc +++ b/db/config.cc @@ -842,6 +842,18 @@ db::config::config(std::shared_ptr exts) "Log a warning when writing rows larger than this value.") , compaction_large_cell_warning_threshold_mb(this, "compaction_large_cell_warning_threshold_mb", liveness::LiveUpdate, value_status::Used, 1, "Log a warning when writing cells larger than this value.") + , large_partition_fail_threshold_mb(this, "large_partition_fail_threshold_mb", liveness::LiveUpdate, value_status::Used, 0, + "Reject writes targeting a partition whose on-disk size already exceeds this threshold in MB, as recorded in any SSTable's large data records. " + "Set to 0 to disable.") + , rows_count_fail_threshold(this, "rows_count_fail_threshold", liveness::LiveUpdate, value_status::Used, 0, + "Reject writes targeting a partition whose on-disk row count already exceeds this threshold, as recorded in any SSTable's large data records. " + "Set to 0 to disable.") + , large_row_fail_threshold_mb(this, "large_row_fail_threshold_mb", liveness::LiveUpdate, value_status::Used, 0, + "Reject writes targeting a partition that contains any row whose on-disk size already exceeds this threshold in MB, as recorded in any SSTable's large data records. " + "Set to 0 to disable.") + , large_collection_elements_fail_threshold(this, "large_collection_elements_fail_threshold", liveness::LiveUpdate, value_status::Used, 0, + "Reject writes targeting a partition that contains any collection whose element count already exceeds this threshold, as recorded in any SSTable's large data records. " + "Set to 0 to disable.") , compaction_rows_count_warning_threshold(this, "compaction_rows_count_warning_threshold", liveness::LiveUpdate, value_status::Used, 100000, "Log a warning when writing a number of rows larger than this value.") , compaction_collection_elements_count_warning_threshold(this, "compaction_collection_elements_count_warning_threshold", liveness::LiveUpdate, value_status::Used, 10000, diff --git a/db/config.hh b/db/config.hh index d62eb0586f..07dbda7e13 100644 --- a/db/config.hh +++ b/db/config.hh @@ -233,6 +233,10 @@ public: named_value compaction_large_partition_warning_threshold_mb; named_value compaction_large_row_warning_threshold_mb; named_value compaction_large_cell_warning_threshold_mb; + named_value large_partition_fail_threshold_mb; + named_value rows_count_fail_threshold; + named_value large_row_fail_threshold_mb; + named_value large_collection_elements_fail_threshold; named_value compaction_rows_count_warning_threshold; named_value compaction_collection_elements_count_warning_threshold; named_value compaction_large_data_records_per_sstable; diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index d3b3e4aaf2..c4ed36edb6 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -13,6 +13,8 @@ #include "db/system_keyspace.hh" #include "db/large_data_handler.hh" #include "keys/keys.hh" +#include "mutation/mutation_partition.hh" +#include "replica/exceptions.hh" #include "sstables/sstables.hh" #include "gms/feature_service.hh" #include "cql3/untyped_result_set.hh" @@ -21,6 +23,10 @@ static logging::logger large_data_logger("large_data"); namespace db { +namespace { +constexpr uint64_t MB = 1024 * 1024; +} + nop_large_data_handler::nop_large_data_handler() : large_data_handler(std::numeric_limits::max(), std::numeric_limits::max(), std::numeric_limits::max(), std::numeric_limits::max(), std::numeric_limits::max()) { @@ -76,6 +82,203 @@ future<> large_data_handler::unplug_system_keyspace() noexcept { co_await _sys_ks.unplug(); } +void large_data_record_index::register_sstable(sstables::shared_sstable sst) { + auto& records_opt = sst->get_large_data_records(); + if (!records_opt) { + return; + } + for (auto& rec : records_opt->elements) { + switch (rec.type) { + case sstables::large_data_type::partition_size: + case sstables::large_data_type::rows_in_partition: + _partitions.insert(rec); + break; + case sstables::large_data_type::row_size: + _rows.insert(rec); + break; + case sstables::large_data_type::elements_in_collection: + _collections.insert(rec); + break; + default: + break; + } + } +} + +void large_data_record_index::rebuild( + const std::unordered_set& sstables) { + _partitions.clear(); + _rows.clear(); + _collections.clear(); + for (const auto& sst : sstables) { + register_sstable(sst); + } +} + +std::optional +large_data_record_index::lookup_partition(bytes_view pk_bytes) const { + lookup_key lk{pk_bytes, {}, {}}; + auto [begin, end] = _partitions.equal_range(lk, _partitions.key_comp()); + if (begin == end) { + return std::nullopt; + } + partition_entry result; + for (auto it = begin; it != end; ++it) { + result.partition_size = std::max(result.partition_size, it->value); + result.rows = std::max(result.rows, it->elements_count); + } + return result; +} + +std::optional large_data_record_index::lookup_row(bytes_view pk_bytes, + bytes_view ck_bytes) const { + lookup_key lk{pk_bytes, ck_bytes, {}}; + auto [begin, end] = _rows.equal_range(lk, _rows.key_comp()); + if (begin == end) { + return std::nullopt; + } + uint64_t result = 0; + for (auto it = begin; it != end; ++it) { + result = std::max(result, it->value); + } + return result; +} + +std::optional large_data_record_index::lookup_collection(bytes_view pk_bytes, + bytes_view ck_bytes, bytes_view column_name) const { + lookup_key lk{pk_bytes, ck_bytes, column_name}; + auto [begin, end] = _collections.equal_range(lk, _collections.key_comp()); + if (begin == end) { + return std::nullopt; + } + uint64_t result = 0; + for (auto it = begin; it != end; ++it) { + result = std::max(result, it->elements_count); + } + return result; +} + +void large_data_guardrail::register_sstable(sstables::shared_sstable sst) { + _index.register_sstable(std::move(sst)); +} + +void large_data_guardrail::rebuild(const std::unordered_set& sstables) { + _index.rebuild(sstables); +} + +void large_data_guardrail::check(const schema& s, const mutation_partition& mp, + partition_key_view pk) const { + auto sst_key = sstables::key::from_partition_key(s, pk); + auto pk_bytes = bytes_view(sst_key.get_bytes()); + check_partition(s, pk_bytes, pk); + check_rows_and_collections(s, pk_bytes, mp, pk); +} + +void large_data_guardrail::check_partition(const schema& s, bytes_view pk_bytes, partition_key_view pk) const { + const uint64_t size_fail = uint64_t(_cfg.partition_size_fail_threshold_mb()) * MB; + const uint64_t size_warn = uint64_t(_cfg.partition_size_warn_threshold_mb()) * MB; + const uint64_t rows_fail = uint64_t(_cfg.rows_count_fail_threshold()); + const uint64_t rows_warn = uint64_t(_cfg.rows_count_warn_threshold()); + + auto entry = _index.lookup_partition(pk_bytes); + if (!entry) [[likely]] { + return; + } + if (size_fail > 0 && entry->partition_size >= size_fail) { + throw replica::large_data_exception(s.ks_name(), s.cf_name(), format( + "Large data guardrail: partition size {} exceeds hard limit {} " + "(keyspace={}, table={}, partition_key={})", + entry->partition_size, size_fail, s.ks_name(), s.cf_name(), pk)); + } + if (entry->partition_size >= size_warn) { + large_data_logger.warn("Large data guardrail: partition size {} exceeds soft limit {} " + "(keyspace={}, table={}, partition_key={})", + entry->partition_size, size_warn, s.ks_name(), s.cf_name(), pk); + } + if (rows_fail > 0 && entry->rows >= rows_fail) { + throw replica::large_data_exception(s.ks_name(), s.cf_name(), format( + "Large data guardrail: partition row count {} exceeds hard limit {} " + "(keyspace={}, table={}, partition_key={})", + entry->rows, rows_fail, s.ks_name(), s.cf_name(), pk)); + } + if (entry->rows >= rows_warn) { + large_data_logger.warn("Large data guardrail: partition row count {} exceeds soft limit {} " + "(keyspace={}, table={}, partition_key={})", + entry->rows, rows_warn, s.ks_name(), s.cf_name(), pk); + } +} + +void large_data_guardrail::check_rows_and_collections(const schema& s, bytes_view pk_bytes, + const mutation_partition& mp, partition_key_view pk) const { + + if (!mp.static_row().empty()) { + check_row_size(s, pk_bytes, pk, bytes_view(), nullptr); + mp.static_row().for_each_cell([&](column_id id, const atomic_cell_or_collection&) { + check_collection_element_count(s, pk_bytes, pk, s.static_column_at(id), bytes_view(), nullptr); + }); + } + + for (const auto& cr : mp.non_dummy_rows()) { + auto ck_bytes = cr.key().view().representation().linearize(); + auto ck_bv = bytes_view(ck_bytes); + check_row_size(s, pk_bytes, pk, ck_bv, &cr.key()); + cr.row().cells().for_each_cell([&](column_id id, const atomic_cell_or_collection&) { + check_collection_element_count(s, pk_bytes, pk, s.regular_column_at(id), ck_bv, &cr.key()); + }); + } +} + +void large_data_guardrail::check_row_size(const schema& s, bytes_view pk_bytes, partition_key_view pk, + bytes_view ck_bytes, const clustering_key_prefix* ck) const { + const uint64_t fail = uint64_t(_cfg.row_size_fail_threshold_mb()) * MB; + const uint64_t warn = uint64_t(_cfg.row_size_warn_threshold_mb()) * MB; + auto row_size = _index.lookup_row(pk_bytes, ck_bytes); + if (!row_size) [[likely]] { + return; + } + if (fail > 0 && *row_size >= fail) { + throw replica::large_data_exception(s.ks_name(), s.cf_name(), format( + "Large data guardrail: row size {} exceeds hard limit {} " + "(keyspace={}, table={}, clustering_key={})", + *row_size, fail, s.ks_name(), s.cf_name(), + ck ? format("{}", ck->with_schema(s)) : sstring())); + } + if (*row_size >= warn) { + large_data_logger.warn("Large data guardrail: row size {} exceeds soft limit {} " + "(keyspace={}, table={}, clustering_key={})", + *row_size, warn, s.ks_name(), s.cf_name(), + ck ? format("{}", ck->with_schema(s)) : sstring()); + } +} + +void large_data_guardrail::check_collection_element_count(const schema& s, bytes_view pk_bytes, partition_key_view pk, + const column_definition& cdef, bytes_view ck_bytes, + const clustering_key_prefix* ck) const { + if (cdef.is_atomic()) { + return; + } + const uint64_t fail = uint64_t(_cfg.collection_elements_fail_threshold()); + const uint64_t warn = uint64_t(_cfg.collection_elements_warn_threshold()); + auto col_bytes = to_bytes(cdef.name_as_text()); + auto count = _index.lookup_collection(pk_bytes, ck_bytes, bytes_view(col_bytes)); + if (!count) [[likely]] { + return; + } + if (fail > 0 && *count >= fail) { + throw replica::large_data_exception(s.ks_name(), s.cf_name(), format( + "Large data guardrail: collection element count {} exceeds hard limit {} " + "(keyspace={}, table={}, column={}, clustering_key={})", + *count, fail, s.ks_name(), s.cf_name(), cdef.name_as_text(), + ck ? format("{}", ck->with_schema(s)) : sstring())); + } + if (*count >= warn) { + large_data_logger.warn("Large data guardrail: collection element count {} exceeds soft limit {} " + "(keyspace={}, table={}, column={}, clustering_key={})", + *count, warn, s.ks_name(), s.cf_name(), cdef.name_as_text(), + ck ? format("{}", ck->with_schema(s)) : sstring()); + } +} + sstring large_data_handler::sst_filename(const sstables::sstable& sst) { return sst.component_basename(sstables::component_type::Data); } @@ -366,4 +569,5 @@ future<> cql_table_large_data_handler::update_large_data_entries_sstable_name(co large_table_name, s.ks_name(), s.cf_name(), old_name, new_name, std::current_exception()); } } + } diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index e0cc4baa6b..1530341523 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -8,11 +8,18 @@ #pragma once +#include #include +#include +#include +#include +#include "bytes.hh" #include "schema/schema_fwd.hh" #include "system_keyspace.hh" #include "sstables/shared_sstable.hh" +#include "sstables/types.hh" #include "utils/assert.hh" +#include "utils/hash.hh" #include "utils/updateable_value.hh" #include "utils/pluggable.hh" @@ -21,10 +28,149 @@ class sstable; class key; } +class partition_key_view; +class mutation_partition; + namespace db { class system_keyspace; +using sstables::large_data_record; + +struct lookup_key { + bytes_view pk; + bytes_view ck; + bytes_view column_name; +}; + +// Compile-time comparison depth, one per multiset. +// partition → pk only +// row → pk + ck +// cell → pk + ck + column_name +enum class record_type { partition, row, collection }; + +template +struct record_compare { +private: + static bytes_view get_pk(const large_data_record& r) noexcept { return bytes_view(r.partition_key.value); } + static bytes_view get_pk(const lookup_key& k) noexcept { return k.pk; } + static bytes_view get_ck(const large_data_record& r) noexcept { return bytes_view(r.clustering_key.value); } + static bytes_view get_ck(const lookup_key& k) noexcept { return k.ck; } + static bytes_view get_col(const large_data_record& r) noexcept { return bytes_view(r.column_name.value); } + static bytes_view get_col(const lookup_key& k) noexcept { return k.column_name; } +public: + template + requires (std::same_as || std::same_as) + bool operator()(const L& a, const R& b) const noexcept { + auto l_pk = get_pk(a), r_pk = get_pk(b); + if (l_pk != r_pk) { + return l_pk < r_pk; + } + if constexpr (Depth == record_type::partition) { + return false; + } + auto l_ck = get_ck(a), r_ck = get_ck(b); + if (l_ck != r_ck) { + return l_ck < r_ck; + } + if constexpr (Depth == record_type::row) { + return false; + } + return get_col(a) < get_col(b); + } +}; + +template +using record_set = boost::intrusive::multiset, + boost::intrusive::compare>, + boost::intrusive::constant_time_size>; + +// Per-table index over large_data_records from all live SSTables. +// Links directly into records stored in each SSTable's scylla_metadata +// via intrusive member hooks (auto_unlink). Aggregation (max across +// SSTables for the same key) happens at lookup time via equal_range. +class large_data_record_index { +public: + struct partition_entry { + uint64_t partition_size = 0; + uint64_t rows = 0; + }; + + void register_sstable(sstables::shared_sstable sst); + + void rebuild(const std::unordered_set& sstables); + + std::optional lookup_partition(bytes_view pk_bytes) const; + std::optional lookup_row(bytes_view pk_bytes, bytes_view ck_bytes) const; + std::optional lookup_collection(bytes_view pk_bytes, + bytes_view ck_bytes, bytes_view column_name) const; + +private: + record_set _partitions; + record_set _rows; + record_set _collections; +}; + +struct guardrail_config { + utils::updateable_value partition_size_fail_threshold_mb; + utils::updateable_value partition_size_warn_threshold_mb; + utils::updateable_value rows_count_fail_threshold; + utils::updateable_value rows_count_warn_threshold; + utils::updateable_value row_size_fail_threshold_mb; + utils::updateable_value row_size_warn_threshold_mb; + utils::updateable_value collection_elements_fail_threshold; + utils::updateable_value collection_elements_warn_threshold; +}; + +// Each replica::table holds a unique_ptr to either a real guardrail or a +// noop. The guardrail owns the per-table large_data_record_index, so +// noop tables pay no index-maintenance cost. +class large_data_guardrail_base { +public: + virtual ~large_data_guardrail_base() = default; + virtual void check(const schema& s, const mutation_partition& mp, + partition_key_view pk) const = 0; + virtual void register_sstable(sstables::shared_sstable sst) = 0; + virtual void rebuild(const std::unordered_set& sstables) = 0; +}; + +class noop_large_data_guardrail final : public large_data_guardrail_base { +public: + static shared_ptr instance() { + static thread_local auto inst = make_shared(); + return inst; + } + void check(const schema&, const mutation_partition&, + partition_key_view) const override {} + void register_sstable(sstables::shared_sstable) override {} + void rebuild(const std::unordered_set&) override {} +}; + +class large_data_guardrail final : public large_data_guardrail_base { +public: + explicit large_data_guardrail(guardrail_config cfg) noexcept + : _cfg(std::move(cfg)) {} + + void check(const schema& s, const mutation_partition& mp, + partition_key_view pk) const override; + void register_sstable(sstables::shared_sstable sst) override; + void rebuild(const std::unordered_set& sstables) override; + +private: + void check_partition(const schema& s, bytes_view pk_bytes, partition_key_view pk) const; + void check_rows_and_collections(const schema& s, bytes_view pk_bytes, const mutation_partition& mp, partition_key_view pk) const; + void check_row_size(const schema& s, bytes_view pk_bytes, partition_key_view pk, + bytes_view ck_bytes, const clustering_key_prefix* ck) const; + void check_collection_element_count(const schema& s, bytes_view pk_bytes, partition_key_view pk, + const column_definition& cdef, bytes_view ck_bytes, + const clustering_key_prefix* ck) const; + + guardrail_config _cfg; + large_data_record_index _index; +}; + class large_data_handler { public: struct stats { diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 7384c30aba..944a3b57ec 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -339,6 +339,7 @@ schema_ptr scylla_tables(schema_features features) { sb.with_column("tablets", map_type_impl::get_instance(utf8_type, utf8_type, false)); sb.with_column("storage_engine", utf8_type); + sb.with_column("large_data_guardrails_enabled", boolean_type); sb.with_hash_version(); s = sb.build(); @@ -1700,6 +1701,17 @@ mutation make_scylla_tables_mutation(schema_ptr table, api::timestamp_type times if (table->logstor_enabled()) { m.set_clustered_cell(ckey, "storage_engine", "logstor", timestamp); } + // Write the large_data_guardrails_enabled column only when enabled. + // When disabled (the default), omit the cell entirely so that old nodes + // that don't know this column can still read the SSTable during rolling + // upgrade or rollback. The CQL validation gate ensures that 'true' can + // only be set once all nodes support the column, so it is safe to write + // the live cell at that point. + if (table->large_data_guardrails_enabled()) { + auto& guardrails_cdef = *scylla_tables()->get_column_definition("large_data_guardrails_enabled"); + m.set_clustered_cell(ckey, guardrails_cdef, + atomic_cell::make_live(*boolean_type, timestamp, boolean_type->decompose(true))); + } // In-memory tables are deprecated since scylla-2024.1.0 // FIXME: delete the column when there's no live version supporting it anymore. // Writing it here breaks upgrade rollback to versions that do not support the in_memory schema_feature @@ -1938,6 +1950,23 @@ utils::chunked_vector make_update_table_mutations(service::storage_pro utils::chunked_vector mutations; add_table_or_view_to_schema_mutation(new_table, timestamp, false, mutations); make_update_indices_mutations(sp, old_table, new_table, timestamp, mutations); + + // When large_data_guardrails_enabled transitions from true to false, + // write a tombstone to override the previous live cell in scylla_tables. + // make_scylla_tables_mutation only writes a live cell when enabled, so + // the tombstone must be added here where we have the old schema. + // This is safe because the CQL feature gate ensures all nodes are + // upgraded before the property can be set to true. + if (old_table->large_data_guardrails_enabled() && !new_table->large_data_guardrails_enabled()) { + schema_ptr s = tables(); + auto pkey = partition_key::from_singular(*s, new_table->ks_name()); + auto ckey = clustering_key::from_singular(*s, new_table->cf_name()); + mutation m(scylla_tables(), pkey); + auto& guardrails_cdef = *scylla_tables()->get_column_definition("large_data_guardrails_enabled"); + m.set_clustered_cell(ckey, guardrails_cdef, atomic_cell::make_dead(timestamp, gc_clock::now())); + mutations.emplace_back(std::move(m)); + } + make_update_columns_mutations(std::move(old_table), std::move(new_table), timestamp, mutations); warn(unimplemented::cause::TRIGGERS); @@ -2197,6 +2226,8 @@ static void prepare_builder_from_scylla_tables_row(const schema_ctxt& ctxt, sche throw std::invalid_argument(format("Invalid value for storage_engine: {}", *storage_engine)); } } + auto guardrails_enabled = table_row.get("large_data_guardrails_enabled"); + builder.set_large_data_guardrails_enabled(guardrails_enabled.value_or(false)); } schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations sm, const data_dictionary::user_types_storage& user_types, schema_ptr cdc_schema, std::optional version) diff --git a/gms/feature_service.hh b/gms/feature_service.hh index 3f2a2188c5..e0e777259e 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -180,6 +180,7 @@ public: gms::feature writetime_ttl_individual_element { *this, "WRITETIME_TTL_INDIVIDUAL_ELEMENT"sv }; gms::feature arbitrary_tablet_boundaries { *this, "ARBITRARY_TABLET_BOUNDARIES"sv }; gms::feature large_data_virtual_tables { *this, "LARGE_DATA_VIRTUAL_TABLES"sv }; + gms::feature large_data_guardrails { *this, "LARGE_DATA_GUARDRAILS"sv }; gms::feature keyspace_multi_rf_change { *this, "KEYSPACE_MULTI_RF_CHANGE"sv }; gms::feature view_building_tasks_min_task_id { *this, "VIEW_BUILDING_TASKS_MIN_TASK_ID"sv }; public: diff --git a/idl/replica_exception.idl.hh b/idl/replica_exception.idl.hh index c22687ba6a..411cf40734 100644 --- a/idl/replica_exception.idl.hh +++ b/idl/replica_exception.idl.hh @@ -27,13 +27,20 @@ class critical_disk_utilization_exception { sstring failed_action(); }; +class large_data_exception { + sstring ks_name(); + sstring cf_name(); + sstring message(); +}; + struct exception_variant { std::variant reason; }; diff --git a/replica/database.cc b/replica/database.cc index 6be9a2298a..ec359e9685 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1626,6 +1626,16 @@ keyspace::make_column_family_config(const schema& s, const database& db) const { cfg.data_listeners = &db.data_listeners(); cfg.enable_compacting_data_for_streaming_and_repair = db_config.enable_compacting_data_for_streaming_and_repair; cfg.enable_tombstone_gc_for_streaming_and_repair = db_config.enable_tombstone_gc_for_streaming_and_repair; + cfg.guardrail_config = db::guardrail_config{ + .partition_size_fail_threshold_mb = db_config.large_partition_fail_threshold_mb, + .partition_size_warn_threshold_mb = db_config.compaction_large_partition_warning_threshold_mb, + .rows_count_fail_threshold = db_config.rows_count_fail_threshold, + .rows_count_warn_threshold = db_config.compaction_rows_count_warning_threshold, + .row_size_fail_threshold_mb = db_config.large_row_fail_threshold_mb, + .row_size_warn_threshold_mb = db_config.compaction_large_row_warning_threshold_mb, + .collection_elements_fail_threshold = db_config.large_collection_elements_fail_threshold, + .collection_elements_warn_threshold = db_config.compaction_collection_elements_count_warning_threshold, + }; return cfg; } @@ -2164,20 +2174,29 @@ std::vector memtable_list::clear_and_add() { return std::exchange(_memtables, std::move(new_memtables)); } -future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, db::timeout_clock::time_point timeout) { +future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, + db::timeout_clock::time_point timeout, + shared_ptr guardrails) { auto& cf = find_column_family(m.column_family_id()); data_listeners().on_write(m_schema, m); if (m.representation().size() > 128*1024) { - return unfreeze_gently(m, std::move(m_schema)).then([&cf, h = std::move(h), timeout] (auto m) mutable { + // Big mutation: unfreeze_gently (yields), then check guardrails on + // the already-deserialized mutation before applying. + auto pk = m.key(); + return unfreeze_gently(m, std::move(m_schema)).then( + [&cf, h = std::move(h), timeout, guardrails, pk] (auto m) mutable { + guardrails->check(*m.schema(), m.partition(), pk); return do_with(std::move(m), [&cf, h = std::move(h), timeout] (auto& m) mutable { return cf.apply(m, std::move(h), timeout); }); }); } - return cf.apply(m, std::move(m_schema), std::move(h), timeout); + // Small mutation: forward guardrails to memtable::apply which will check + // after partition_builder deserializes — no redundant unfreeze. + return cf.apply(m, std::move(m_schema), std::move(h), timeout, std::move(guardrails)); } future<> database::apply_in_memory(const mutation& m, column_family& cf, db::rp_handle&& h, db::timeout_clock::time_point timeout) { @@ -2358,13 +2377,14 @@ future<> database::do_apply_many(const utils::chunked_vector& m auto handles = co_await cl->add_entries(std::move(writers), timeout); // FIXME: Memtable application is not atomic so reads may observe mutations partially applied until restart. + auto noop = db::noop_large_data_guardrail::instance(); for (size_t i = 0; i < muts.size(); ++i) { auto s = local_schema_registry().get(muts[i].schema_version()); - co_await apply_in_memory(muts[i], s, std::move(handles[i]), timeout); + co_await apply_in_memory(muts[i], s, std::move(handles[i]), timeout, noop); } } -future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync, db::per_partition_rate_limit::info rate_limit_info) { +future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync, db::per_partition_rate_limit::info rate_limit_info, bool skip_large_data_guardrails) { ++_stats->total_writes; // assume failure until proven otherwise auto update_writes_failed = defer([&] { ++_stats->total_writes_failed; }); @@ -2393,6 +2413,10 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra co_await coroutine::return_exception(replica::critical_disk_utilization_exception{"rejected write mutation"}); } + auto guardrails = skip_large_data_guardrails + ? db::noop_large_data_guardrail::instance() + : cf.get_large_data_guardrail(); + if (!std::holds_alternative(rate_limit_info) && can_apply_per_partition_rate_limit(*s, db::operation_type::write)) { auto table_limit = *s->per_partition_rate_limit_options().get_max_writes_per_second(); auto& write_label = cf.get_rate_limiter_label_for_writes(); @@ -2451,7 +2475,8 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra co_await coroutine::exception(std::move(ex)); } } - auto f = co_await coroutine::as_future(this->apply_in_memory(m, s, std::move(h), timeout)); + auto f = co_await coroutine::as_future( + this->apply_in_memory(m, s, std::move(h), timeout, std::move(guardrails))); if (f.failed()) { auto ex = f.get_exception(); if (try_catch(ex)) { @@ -2515,7 +2540,7 @@ void database::update_write_metrics_for_rejected_writes() { ++_stats->total_writes_rejected_due_to_out_of_space_prevention; } -future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) { +future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info, bool skip_large_data_guardrails) { if (dblog.is_enabled(logging::log_level::trace)) { dblog.trace("apply {}", m.pretty_printer(s)); } @@ -2526,7 +2551,7 @@ future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_ if (!s->is_synced()) { on_internal_error(dblog, format("attempted to apply mutation using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version())); } - return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, sync, rate_limit_info); + return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, sync, rate_limit_info, skip_large_data_guardrails); } future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout) { @@ -2536,7 +2561,7 @@ future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::t if (!s->is_synced()) { on_internal_error(dblog, format("attempted to apply hint using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version())); } - return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no, std::monostate{}); + return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no, std::monostate{}, false); } keyspace::config diff --git a/replica/database.hh b/replica/database.hh index 5e8c999208..c76e88ab19 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -77,6 +77,7 @@ #include "replica/tables_metadata_lock.hh" #include "service/topology_guard.hh" #include "utils/disk_space_monitor.hh" +#include "db/large_data_handler.hh" class cell_locker; class cell_locker_stats; @@ -480,6 +481,7 @@ public: unsigned x_log2_compaction_groups{0}; utils::updateable_value enable_compacting_data_for_streaming_and_repair; utils::updateable_value enable_tombstone_gc_for_streaming_and_repair; + db::guardrail_config guardrail_config; }; using snapshot_details = db::snapshot_ctl::table_snapshot_details; @@ -553,6 +555,8 @@ private: std::unique_ptr _counter_cell_locks; // Memory-intensive; allocate only when needed. + shared_ptr _large_data_guardrail; + // Labels used to identify writes and reads for this table in the rate_limiter structure. db::rate_limiter::label _rate_limiter_label_for_writes; db::rate_limiter::label _rate_limiter_label_for_reads; @@ -1011,6 +1015,7 @@ public: [[gnu::always_inline]] bool uses_tablets() const; int64_t calculate_tablet_count() const; private: + shared_ptr make_large_data_guardrail() const; void update_tombstone_gc_rf_one(); future<> clear_inactive_reads_for_tablet(database& db, storage_group& sg); @@ -1028,13 +1033,14 @@ public: // Applies given mutation to this column family // The mutation is always upgraded to current schema. void apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& h = {}) { - do_apply(compaction_group_for_key(m.key(), m_schema), std::move(h), m, m_schema); + do_apply(compaction_group_for_key(m.key(), m_schema), std::move(h), m, m_schema, *db::noop_large_data_guardrail::instance()); } void apply(const mutation& m, db::rp_handle&& h = {}) { do_apply(compaction_group_for_token(m.token()), std::move(h), m); } - future<> apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, db::timeout_clock::time_point tmo); + future<> apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, + db::timeout_clock::time_point tmo, shared_ptr guardrails); future<> apply(const mutation& m, db::rp_handle&& h, db::timeout_clock::time_point tmo); // Returns at most "cmd.limit" rows @@ -1306,6 +1312,14 @@ public: return _index_manager; } + shared_ptr get_large_data_guardrail() const noexcept { + return _large_data_guardrail; + } + + // Rebuild from current SSTable set. Used at startup; after that + // the index is maintained incrementally via register_sstable. + void rebuild_large_data_index(); + sstables::sstables_manager& get_sstables_manager() noexcept { return _sstables_manager; } @@ -1705,7 +1719,8 @@ private: tracing::trace_state_ptr, db::timeout_clock::time_point, db::commitlog_force_sync, - db::per_partition_rate_limit::info> _apply_stage; + db::per_partition_rate_limit::info, + bool /* skip_large_data_guardrails */> _apply_stage; flat_hash_map _keyspaces; tables_metadata _tables_metadata; @@ -1768,7 +1783,9 @@ public: future<> init_logstor(); future<> recover_logstor(); const gms::feature_service& features() const { return _feat; } - future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&&, db::timeout_clock::time_point timeout); + future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&&, + db::timeout_clock::time_point timeout, + shared_ptr guardrails); future<> apply_in_memory(const mutation& m, column_family& cf, db::rp_handle&&, db::timeout_clock::time_point timeout); drain_progress get_drain_progress() const noexcept { @@ -1796,7 +1813,7 @@ private: auto sum_read_concurrency_sem_var(std::invocable auto member); auto sum_read_concurrency_sem_stat(std::invocable auto stats_member); - future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync, db::per_partition_rate_limit::info rate_limit_info); + future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync, db::per_partition_rate_limit::info rate_limit_info, bool skip_large_data_guardrails); future<> do_apply_many(const utils::chunked_vector&, db::timeout_clock::time_point timeout); future<> apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout); @@ -1974,7 +1991,7 @@ public: tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout, bool tombstone_gc_enabled = true); // Apply the mutation atomically. // Throws timed_out_error when timeout is reached. - future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog_force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info = std::monostate{}); + future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog_force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info = std::monostate{}, bool skip_large_data_guardrails = false); // Apply mutations atomically. // On restart, either all mutations will be replayed or none of them. // All mutations must belong to the same commitlog domain. diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 77b37f631d..edb9595f94 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -518,7 +518,11 @@ future<> distributed_loader::populate_keyspace(sharded& db, if (!is_system_keyspace(ks_name)) { co_await smp::invoke_on_all([&] { auto s = gtable->schema(); - db.local().find_column_family(s).mark_ready_for_writes(db.local().commitlog_for(s)); + auto& cf = db.local().find_column_family(s); + // Populate the large data caches from SSTable metadata + // before the table becomes writable. + cf.rebuild_large_data_index(); + cf.mark_ready_for_writes(db.local().commitlog_for(s)); }); } }); diff --git a/replica/exceptions.cc b/replica/exceptions.cc index 1a8a180d56..0567839321 100644 --- a/replica/exceptions.cc +++ b/replica/exceptions.cc @@ -25,6 +25,8 @@ exception_variant try_encode_replica_exception(std::exception_ptr eptr) { return abort_requested_exception(); } catch (const critical_disk_utilization_exception& e) { return e; + } catch (const large_data_exception& e) { + return e; } catch (...) { return no_exception{}; } diff --git a/replica/exceptions.hh b/replica/exceptions.hh index e60bea392e..f0deeb2df8 100644 --- a/replica/exceptions.hh +++ b/replica/exceptions.hh @@ -80,6 +80,25 @@ public: virtual const char* what() const noexcept override { return _message.c_str(); } }; +class large_data_exception final : public replica_exception { + seastar::sstring _ks_name; + seastar::sstring _cf_name; + seastar::sstring _message; +public: + large_data_exception(std::string_view ks_name, std::string_view cf_name, std::string_view detail) noexcept + : replica_exception() + , _ks_name(ks_name) + , _cf_name(cf_name) + , _message(seastar::format("Write rejected for {}.{}: {}", ks_name, cf_name, detail)) + { } + + const seastar::sstring& ks_name() const { return _ks_name; } + const seastar::sstring& cf_name() const { return _cf_name; } + const seastar::sstring& message() const { return _message; } + + virtual const char* what() const noexcept override { return _message.c_str(); } +}; + using abort_requested_exception = seastar::abort_requested_exception; struct exception_variant { @@ -88,7 +107,8 @@ struct exception_variant { rate_limit_exception, stale_topology_exception, abort_requested_exception, - critical_disk_utilization_exception + critical_disk_utilization_exception, + large_data_exception > reason; exception_variant() diff --git a/replica/memtable.cc b/replica/memtable.cc index 82dbe8c8a5..32fee5802e 100644 --- a/replica/memtable.cc +++ b/replica/memtable.cc @@ -9,6 +9,7 @@ #include "utils/assert.hh" #include "memtable.hh" #include "replica/database.hh" +#include "replica/exceptions.hh" #include "mutation/frozen_mutation.hh" #include "replica/partition_snapshot_reader.hh" #include "partition_builder.hh" @@ -16,6 +17,8 @@ #include "readers/empty.hh" #include "readers/forwardable.hh" #include "sstables/types.hh" +#include "keys/keys.hh" +#include "db/large_data_handler.hh" namespace replica { @@ -796,13 +799,15 @@ memtable::apply(const mutation& m, db::rp_handle&& h) { } void -memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& h) { - with_allocator(allocator(), [this, &m, &m_schema] { +memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, + const db::large_data_guardrail_base& guardrails, db::rp_handle&& h) { + with_allocator(allocator(), [this, &m, &m_schema, &guardrails] { _table_shared_data.allocating_section(*this, [&, this] { - auto& p = find_or_create_partition_slow(m.key()); mutation_partition mp(*m_schema); partition_builder pb(*m_schema, mp); m.partition().accept(*m_schema, pb); + guardrails.check(*m_schema, mp, m.key()); + auto& p = find_or_create_partition_slow(m.key()); _stats_collector.update(*m_schema, mp); p.apply(region(), cleaner(), *_schema, std::move(mp), *m_schema, _table_stats.memtable_app_stats); }); diff --git a/replica/memtable.hh b/replica/memtable.hh index 36d799ac40..b0107f3221 100644 --- a/replica/memtable.hh +++ b/replica/memtable.hh @@ -22,6 +22,7 @@ #include "utils/double-decker.hh" #include "readers/empty.hh" #include "readers/mutation_source.hh" +#include "db/large_data_handler.hh" class frozen_mutation; class row_cache; @@ -236,8 +237,11 @@ public: // Applies mutation to this memtable. // The mutation is upgraded to current schema. void apply(const mutation& m, db::rp_handle&& = {}); - // The mutation is upgraded to current schema. - void apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& = {}); + void apply(const frozen_mutation& m, const schema_ptr& m_schema, + const db::large_data_guardrail_base& guardrails, db::rp_handle&& = {}); + void apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& h = {}) { + apply(m, m_schema, *db::noop_large_data_guardrail::instance(), std::move(h)); + } void evict_entry(memtable_entry& e, mutation_cleaner& cleaner) noexcept; static memtable& from_region(logalloc::region& r) noexcept { diff --git a/replica/table.cc b/replica/table.cc index 77c312ecff..3b14f34650 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -53,6 +53,7 @@ #include "mutation/mutation_source_metadata.hh" #include "gms/gossiper.hh" #include "gms/feature_service.hh" +#include "cdc/log.hh" #include "db/config.hh" #include "db/commitlog/commitlog.hh" #include "utils/lister.hh" @@ -1603,6 +1604,7 @@ table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_ss add_maintenance_sstable(cg, sst); } update_stats_for_new_sstable(sst); + _large_data_guardrail->register_sstable(sst); if (trigger_compaction) { try_trigger_compaction(cg); } @@ -2051,6 +2053,10 @@ table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptrregister_sstable(sst); + } + co_await utils::get_local_injector().inject("replica_post_flush_after_update_cache", [this] (auto& handler) -> future<> { const auto this_table_name = format("{}.{}", _schema->ks_name(), _schema->cf_name()); if (this_table_name == handler.get("table_name")) { @@ -2266,6 +2272,11 @@ void table::rebuild_statistics() { }); } +void table::rebuild_large_data_index() { + auto& sstables = *get_sstables(); + _large_data_guardrail->rebuild(sstables); +} + void table::subtract_compaction_group_from_stats(const compaction_group& cg) noexcept { _stats.live_disk_space_used -= cg.live_disk_space_used_full_stats(); _stats.total_disk_space_used -= cg.total_disk_space_used_full_stats(); @@ -2438,6 +2449,8 @@ compaction_group::merge_sstables_from(compaction_group& group) { backlog_tracker_adjust_charges({}, sstables_to_merge); }); _t.rebuild_statistics(); + // No large data index update needed: SSTables just moved between + // compaction groups within the same table, and the index is table-wide. } future<> @@ -2565,6 +2578,9 @@ compaction_group::update_sstable_sets_on_compaction_completion(compaction::compa cache.refresh_snapshot(); _t.rebuild_statistics(); + for (auto& sst : desc.new_sstables) { + _t._large_data_guardrail->register_sstable(sst); + } co_await builder.delete_sstables_atomically(unused_sstables_for_deletion(std::move(desc))); } @@ -3268,6 +3284,7 @@ table::table(schema_ptr schema, config config, lw_shared_ptras_data_dictionary()) , _flush_barrier(format("[table {}.{}] flush_barrier", _schema->ks_name(), _schema->cf_name())) , _counter_cell_locks(_schema->is_counter() ? std::make_unique(_schema, cl_stats) : nullptr) + , _large_data_guardrail(make_large_data_guardrail()) , _async_gate(format("[table {}.{}] async_gate", _schema->ks_name(), _schema->cf_name())) , _pending_writes_phaser(format("[table {}.{}] pending_writes", _schema->ks_name(), _schema->cf_name())) , _pending_reads_phaser(format("[table {}.{}] pending_reads", _schema->ks_name(), _schema->cf_name())) @@ -4728,6 +4745,17 @@ db::commitlog* table::commitlog() const { return _commitlog; } +shared_ptr table::make_large_data_guardrail() const { + bool enabled = _schema->large_data_guardrails_enabled() + && !is_internal_keyspace(_schema->ks_name()) + && !_schema->is_view() + && !cdc::is_log_name(_schema->cf_name()); + if (enabled) { + return make_shared(_config.guardrail_config); + } + return db::noop_large_data_guardrail::instance(); +} + void table::set_schema(schema_ptr s) { SCYLLA_ASSERT(s->is_counter() == _schema->is_counter()); tlogger.debug("Changing schema version of {}.{} ({}) from {} to {}", @@ -4749,6 +4777,8 @@ void table::set_schema(schema_ptr s) { _logstor_index->set_schema(s); } _schema = std::move(s); + _large_data_guardrail = make_large_data_guardrail(); + _large_data_guardrail->rebuild(*get_sstables()); for (auto&& v : _views) { v->view_info()->reset_view_info(); @@ -4988,7 +5018,8 @@ future<> table::apply(const mutation& m, db::rp_handle&& h, db::timeout_clock::t template void table::do_apply(compaction_group& cg, db::rp_handle&&, const mutation&); -future<> table::apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, db::timeout_clock::time_point timeout) { +future<> table::apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, + db::timeout_clock::time_point timeout, shared_ptr guardrails) { if (_virtual_writer) [[unlikely]] { return (*_virtual_writer)(m); } @@ -5001,12 +5032,12 @@ future<> table::apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_hand return _logstor->write(m.unfreeze(m_schema), cg, std::move(ss_holder)); } - return dirty_memory_region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), h = std::move(h), &cg, holder = std::move(holder)]() mutable { - do_apply(cg, std::move(h), m, m_schema); + return dirty_memory_region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), h = std::move(h), &cg, holder = std::move(holder), guardrails = std::move(guardrails)]() mutable { + do_apply(cg, std::move(h), m, m_schema, *guardrails); }, timeout); } -template void table::do_apply(compaction_group& cg, db::rp_handle&&, const frozen_mutation&, const schema_ptr&); +template void table::do_apply(compaction_group& cg, db::rp_handle&&, const frozen_mutation&, const schema_ptr&, const db::large_data_guardrail_base&); future<> write_memtable_to_sstable(mutation_reader reader, diff --git a/schema/schema.cc b/schema/schema.cc index 54aa87673e..7afd4cbb19 100644 --- a/schema/schema.cc +++ b/schema/schema.cc @@ -712,6 +712,7 @@ table_schema_version schema::calculate_digest(const schema::raw_schema& r) { } feed_hash(h, r._props.tablet_options); + feed_hash(h, r._large_data_guardrails_enabled); return table_schema_version(utils::UUID_gen::get_name_UUID(h.finalize())); } diff --git a/schema/schema.hh b/schema/schema.hh index 45bb2525cc..a96f42d4a0 100644 --- a/schema/schema.hh +++ b/schema/schema.hh @@ -612,6 +612,7 @@ private: // schema digest. It is also not set locally on a schema tables. std::reference_wrapper _sharder; bool _in_memory = false; + bool _large_data_guardrails_enabled = false; std::optional _view_info; user_properties _props; @@ -814,6 +815,10 @@ public: return _raw._per_partition_rate_limit_options; } + bool large_data_guardrails_enabled() const { + return _raw._large_data_guardrails_enabled; + } + const ::speculative_retry& speculative_retry() const { return _raw._props.speculative_retry; } diff --git a/schema/schema_builder.hh b/schema/schema_builder.hh index c4f0eafe51..5a94ab88bd 100644 --- a/schema/schema_builder.hh +++ b/schema/schema_builder.hh @@ -252,6 +252,10 @@ public: _raw._in_memory = in_memory; return *this; } + schema_builder& set_large_data_guardrails_enabled(bool enabled) { + _raw._large_data_guardrails_enabled = enabled; + return *this; + } schema_builder& set_tablet_options(std::map&& hints); diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index 86da441e15..9901cf68e9 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -260,7 +260,14 @@ future<> paxos_state::learn(storage_proxy& sp, paxos_store& paxos_store, schema_ on_internal_error(logger, format("schema version in learn does not match current schema")); } - co_await sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout); + // Skip the large-partition write guardrail for the Paxos learn write. + // Paxos cannot recover from a partial-replica rejection here: rejecting + // on a subset of replicas would leave the cluster in an inconsistent + // state where some replicas have applied the decision and others + // haven't. The decision was already accepted; it must be durably + // committed by every reachable replica. + co_await sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout, + std::monostate{}, /* skip_large_data_guardrails */ true); } else { logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision); tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index f1c62a5324..153ea5d6a6 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -650,9 +650,10 @@ private: seastar::log_level l = seastar::log_level::warn; if (is_timeout_exception(eptr) || std::holds_alternative(errors.local.reason) + || std::holds_alternative(errors.local.reason) || std::holds_alternative(errors.local.reason)) { - // ignore timeouts, abort requests and rate limit exceptions so that logs are not flooded. - // database's total_writes_timedout or total_writes_rate_limited counter was incremented. + // ignore timeouts, abort requests, rate limit and large-data rejections so that logs are not flooded. + // database's total_writes_timedout, total_writes_rate_limited or total_writes_rejected_due_to_large_partition counter was incremented. l = seastar::log_level::debug; } slogger.log(l, "Failed to apply mutation from {}#{}: {}", reply_to_host_id, shard, eptr); @@ -794,6 +795,9 @@ private: } else if constexpr (std::is_same_v) { msg = e.what(); return error::FAILURE; + } else if constexpr (std::is_same_v) { + msg = e.message(); + return error::FAILURE; } }, exception->reason); } @@ -3461,17 +3465,17 @@ storage_proxy::mutate_locally(const mutation& m, tracing::trace_state_ptr tr_sta future<> storage_proxy::mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout, - smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info) { + smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info, bool skip_large_data_guardrails) { auto erm = _db.local().find_column_family(s).get_effective_replication_map(); - auto apply = [this, erm, s, &m, tr_state, sync, timeout, smp_grp, rate_limit_info] (shard_id shard) { + auto apply = [this, erm, s, &m, tr_state, sync, timeout, smp_grp, rate_limit_info, skip_large_data_guardrails] (shard_id shard) { get_stats().replica_cross_shard_ops += shard != this_shard_id(); auto shard_rate_limit = rate_limit_info; if (shard == this_shard_id()) { shard_rate_limit = adjust_rate_limit_for_local_operation(shard_rate_limit); } return _db.invoke_on(shard, {smp_grp, timeout}, - [&m, erm, gs = global_schema_ptr(s), gtr = tracing::global_trace_state_ptr(std::move(tr_state)), timeout, sync, shard_rate_limit] (replica::database& db) mutable -> future<> { - return db.apply(gs, m, gtr.get(), sync, timeout, shard_rate_limit); + [&m, erm, gs = global_schema_ptr(s), gtr = tracing::global_trace_state_ptr(std::move(tr_state)), timeout, sync, shard_rate_limit, skip_large_data_guardrails] (replica::database& db) mutable -> future<> { + return db.apply(gs, m, gtr.get(), sync, timeout, shard_rate_limit, skip_large_data_guardrails); }); }; return apply_on_shards(erm, *s, m.token(*s), std::move(apply)); @@ -4778,6 +4782,8 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo msg = e->grab_cause(); } else if (auto* e = try_catch(eptr)) { msg = e->what(); + } else if (auto* e = try_catch(eptr)) { + msg = e->message(); } else { slogger.error("exception during mutation write to {}.{} on {}: {}", schema->ks_name(), schema->cf_name(), coordinator, eptr); diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 1580bf129c..71f717cab0 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -593,7 +593,7 @@ private: // Applies mutation on this node. // Resolves with timed_out_error when timeout is reached. future<> mutate_locally(const schema_ptr&, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout, - smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info); + smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info, bool skip_large_data_guardrails = false); // Applies mutations on this node. // Resolves with timed_out_error when timeout is reached. future<> mutate_locally(utils::chunked_vector mutation, tracing::trace_state_ptr tr_state, clock_type::time_point timeout, smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info); @@ -702,8 +702,8 @@ public: } // Applies mutation on this node. // Resolves with timed_out_error when timeout is reached. - future<> mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max(), db::per_partition_rate_limit::info rate_limit_info = std::monostate()) { - return mutate_locally(s, m, tr_state, sync, timeout, _write_smp_service_group, rate_limit_info); + future<> mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max(), db::per_partition_rate_limit::info rate_limit_info = std::monostate(), bool skip_large_data_guardrails = false) { + return mutate_locally(s, m, tr_state, sync, timeout, _write_smp_service_group, rate_limit_info, skip_large_data_guardrails); } // Applies materialized view mutation on this node. // Resolves with timed_out_error when timeout is reached. diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 7439f62296..0cedaf15e1 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -1102,6 +1102,10 @@ public: return _large_data_records; } + std::optional& get_large_data_records() noexcept { + return _large_data_records; + } + // Return the extended timestamp statistics map. // Some or all entries may be missing if not present in scylla_metadata scylla_metadata::ext_timestamp_stats::map_type get_ext_timestamp_stats() const noexcept; diff --git a/sstables/types.hh b/sstables/types.hh index fad5d32779..9f72be9162 100644 --- a/sstables/types.hh +++ b/sstables/types.hh @@ -17,6 +17,7 @@ #include "mutation/tombstone.hh" #include "utils/streaming_histogram.hh" #include "utils/estimated_histogram.hh" +#include #include "sstables/key.hh" #include "sstables/file_writer.hh" #include "db/commitlog/replay_position.hh" @@ -614,6 +615,11 @@ struct large_data_record { uint64_t range_tombstones; // number of range tombstones in the partition uint64_t dead_rows; // number of dead rows in the partition + // Runtime hook for large_data_record_index. Not serialized. + using index_hook_type = boost::intrusive::set_member_hook< + boost::intrusive::link_mode>; + index_hook_type _index_hook; + template auto describe_type(sstable_version_types v, Describer f) { return f(type, partition_key, clustering_key, column_name, value, diff --git a/test/boost/CMakeLists.txt b/test/boost/CMakeLists.txt index 1d83f9634a..790c1141b7 100644 --- a/test/boost/CMakeLists.txt +++ b/test/boost/CMakeLists.txt @@ -355,6 +355,7 @@ add_scylla_test(combined_tests group0_voter_calculator_test.cc index_with_paging_test.cc json_cql_query_test.cc + large_data_guardrail_test.cc large_paging_state_test.cc loading_cache_test.cc memtable_test.cc diff --git a/test/boost/large_data_guardrail_test.cc b/test/boost/large_data_guardrail_test.cc new file mode 100644 index 0000000000..d41d448deb --- /dev/null +++ b/test/boost/large_data_guardrail_test.cc @@ -0,0 +1,233 @@ +/* + * Copyright (C) 2026-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 + */ + +#include +#include +#include "db/large_data_handler.hh" + +using sstables::large_data_record; +using sstables::large_data_type; +using db::lookup_key; +using db::record_type; +using db::record_compare; +using db::record_set; + +namespace { + +large_data_record& make_record(std::deque& store, + large_data_type type, bytes pk, bytes ck = {}, bytes col = {}, + uint64_t value = 0, uint64_t elements_count = 0) { + store.emplace_back(); + auto& rec = store.back(); + rec.type = type; + rec.partition_key.value = std::move(pk); + rec.clustering_key.value = std::move(ck); + rec.column_name.value = std::move(col); + rec.value = value; + rec.elements_count = elements_count; + rec.range_tombstones = 0; + rec.dead_rows = 0; + return rec; +} + +} // anonymous namespace + +BOOST_AUTO_TEST_SUITE(large_data_guardrail_test) + +BOOST_AUTO_TEST_CASE(test_partition_comparator_orders_by_pk_only) { + record_compare cmp; + std::deque store; + + auto& a = make_record(store, large_data_type::partition_size, to_bytes("aaa")); + auto& b = make_record(store, large_data_type::partition_size, to_bytes("bbb")); + + BOOST_REQUIRE(cmp(a, b)); + BOOST_REQUIRE(!cmp(b, a)); + BOOST_REQUIRE(!cmp(a, a)); + + // Clustering key differences are ignored at partition level + auto& c = make_record(store, large_data_type::partition_size, + to_bytes("aaa"), to_bytes("zzz")); + BOOST_REQUIRE(!cmp(a, c)); + BOOST_REQUIRE(!cmp(c, a)); +} + +BOOST_AUTO_TEST_CASE(test_row_comparator_orders_by_pk_then_ck) { + record_compare cmp; + std::deque store; + + auto& a = make_record(store, large_data_type::row_size, + to_bytes("pk1"), to_bytes("ck_a")); + auto& b = make_record(store, large_data_type::row_size, + to_bytes("pk1"), to_bytes("ck_b")); + auto& c = make_record(store, large_data_type::row_size, + to_bytes("pk2"), to_bytes("ck_a")); + + BOOST_REQUIRE(cmp(a, b)); // same pk, ck_a < ck_b + BOOST_REQUIRE(!cmp(b, a)); + BOOST_REQUIRE(cmp(a, c)); // pk1 < pk2 + BOOST_REQUIRE(cmp(b, c)); // pk1 < pk2 regardless of ck + + // Column name differences are ignored at row level + auto& d = make_record(store, large_data_type::row_size, + to_bytes("pk1"), to_bytes("ck_a"), to_bytes("col_z")); + BOOST_REQUIRE(!cmp(a, d)); + BOOST_REQUIRE(!cmp(d, a)); +} + +BOOST_AUTO_TEST_CASE(test_collection_comparator_orders_by_pk_ck_col) { + record_compare cmp; + std::deque store; + + auto& a = make_record(store, large_data_type::elements_in_collection, + to_bytes("pk1"), to_bytes("ck1"), to_bytes("col_a")); + auto& b = make_record(store, large_data_type::elements_in_collection, + to_bytes("pk1"), to_bytes("ck1"), to_bytes("col_b")); + + BOOST_REQUIRE(cmp(a, b)); + BOOST_REQUIRE(!cmp(b, a)); + BOOST_REQUIRE(!cmp(a, a)); + + // Same pk+ck+col → equivalent + auto& c = make_record(store, large_data_type::elements_in_collection, + to_bytes("pk1"), to_bytes("ck1"), to_bytes("col_a")); + BOOST_REQUIRE(!cmp(a, c)); + BOOST_REQUIRE(!cmp(c, a)); +} + +BOOST_AUTO_TEST_CASE(test_heterogeneous_lookup_with_lookup_key) { + record_compare cmp; + std::deque store; + + auto& rec = make_record(store, large_data_type::partition_size, to_bytes("pk1")); + auto pk = to_bytes("pk1"); + lookup_key lk{bytes_view(pk), {}, {}}; + + // (record, key) and (key, record) must be consistent + BOOST_REQUIRE(!cmp(rec, lk)); + BOOST_REQUIRE(!cmp(lk, rec)); + + auto pk2 = to_bytes("pk2"); + lookup_key lk2{bytes_view(pk2), {}, {}}; + BOOST_REQUIRE(cmp(rec, lk2)); // pk1 < pk2 + BOOST_REQUIRE(!cmp(lk2, rec)); // pk2 > pk1 +} + +BOOST_AUTO_TEST_CASE(test_equal_range_groups_partition_records) { + record_set set; + std::deque store; + + auto& r1 = make_record(store, large_data_type::partition_size, + to_bytes("pk_a"), {}, {}, 100); + auto& r2 = make_record(store, large_data_type::partition_size, + to_bytes("pk_b"), {}, {}, 200); + auto& r3 = make_record(store, large_data_type::rows_in_partition, + to_bytes("pk_a"), {}, {}, 0, 50); + set.insert(r1); + set.insert(r2); + set.insert(r3); + + // pk_a should match r1 (partition_size) and r3 (rows_in_partition) + auto pk_a = to_bytes("pk_a"); + lookup_key lk{bytes_view(pk_a), {}, {}}; + auto [begin, end] = set.equal_range(lk, set.key_comp()); + + uint64_t max_size = 0, max_rows = 0; + int count = 0; + for (auto it = begin; it != end; ++it) { + max_size = std::max(max_size, it->value); + max_rows = std::max(max_rows, it->elements_count); + ++count; + } + BOOST_REQUIRE_EQUAL(count, 2); + BOOST_REQUIRE_EQUAL(max_size, 100); + BOOST_REQUIRE_EQUAL(max_rows, 50); + + // pk_c: no match + auto pk_c = to_bytes("pk_c"); + lookup_key lk_miss{bytes_view(pk_c), {}, {}}; + auto [b2, e2] = set.equal_range(lk_miss, set.key_comp()); + BOOST_REQUIRE(b2 == e2); +} + +BOOST_AUTO_TEST_CASE(test_equal_range_groups_row_records) { + record_set set; + std::deque store; + + auto& r1 = make_record(store, large_data_type::row_size, + to_bytes("pk"), to_bytes("ck_a"), {}, 100); + auto& r2 = make_record(store, large_data_type::row_size, + to_bytes("pk"), to_bytes("ck_a"), {}, 300); + auto& r3 = make_record(store, large_data_type::row_size, + to_bytes("pk"), to_bytes("ck_b"), {}, 200); + set.insert(r1); + set.insert(r2); + set.insert(r3); + + auto pk = to_bytes("pk"); + auto ck_a = to_bytes("ck_a"); + lookup_key lk{bytes_view(pk), bytes_view(ck_a), {}}; + auto [begin, end] = set.equal_range(lk, set.key_comp()); + + uint64_t max_size = 0; + int count = 0; + for (auto it = begin; it != end; ++it) { + max_size = std::max(max_size, it->value); + ++count; + } + BOOST_REQUIRE_EQUAL(count, 2); // r1 and r2 + BOOST_REQUIRE_EQUAL(max_size, 300); +} + +BOOST_AUTO_TEST_CASE(test_equal_range_groups_collection_records) { + record_set set; + std::deque store; + + auto& r1 = make_record(store, large_data_type::elements_in_collection, + to_bytes("pk"), to_bytes("ck"), to_bytes("col"), 0, 100); + auto& r2 = make_record(store, large_data_type::elements_in_collection, + to_bytes("pk"), to_bytes("ck"), to_bytes("col"), 0, 500); + auto& r3 = make_record(store, large_data_type::elements_in_collection, + to_bytes("pk"), to_bytes("ck"), to_bytes("other"), 0, 200); + set.insert(r1); + set.insert(r2); + set.insert(r3); + + auto pk = to_bytes("pk"); + auto ck = to_bytes("ck"); + auto col = to_bytes("col"); + lookup_key lk{bytes_view(pk), bytes_view(ck), bytes_view(col)}; + auto [begin, end] = set.equal_range(lk, set.key_comp()); + + uint64_t max_count = 0; + int n = 0; + for (auto it = begin; it != end; ++it) { + max_count = std::max(max_count, it->elements_count); + ++n; + } + BOOST_REQUIRE_EQUAL(n, 2); // r1 and r2 + BOOST_REQUIRE_EQUAL(max_count, 500); +} + +BOOST_AUTO_TEST_CASE(test_auto_unlink_on_record_destruction) { + record_set set; + { + std::deque store; + auto& r1 = make_record(store, large_data_type::partition_size, + to_bytes("pk_a"), {}, {}, 100); + auto& r2 = make_record(store, large_data_type::partition_size, + to_bytes("pk_b"), {}, {}, 200); + set.insert(r1); + set.insert(r2); + BOOST_REQUIRE(!set.empty()); + } + // store destroyed → records destroyed → auto_unlink fires + BOOST_REQUIRE(set.empty()); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/boost/schema_change_test.cc b/test/boost/schema_change_test.cc index 6cfce04e49..2050e1cd9d 100644 --- a/test/boost/schema_change_test.cc +++ b/test/boost/schema_change_test.cc @@ -676,7 +676,7 @@ SEASTAR_TEST_CASE(test_system_schema_version_is_stable) { // If you changed the schema of system.batchlog then this is expected to fail. // Just replace expected version with the new version. - BOOST_REQUIRE_EQUAL(s->version(), table_schema_version(utils::UUID("c3f984e4-f886-3616-bb80-f8c68ed93595"))); + BOOST_REQUIRE_EQUAL(s->version(), table_schema_version(utils::UUID("95ec5e20-12f2-361a-90b3-662f8999400b"))); }); } diff --git a/test/cluster/test_large_partition_guardrail.py b/test/cluster/test_large_partition_guardrail.py new file mode 100644 index 0000000000..f0edcf9e27 --- /dev/null +++ b/test/cluster/test_large_partition_guardrail.py @@ -0,0 +1,102 @@ +# +# Copyright (C) 2026-present ScyllaDB +# +# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 +# + +import logging +import pytest +import time +import asyncio + +from cassandra import WriteFailure +from cassandra.protocol import ConfigurationException + +from test.pylib.manager_client import ManagerClient + +logger = logging.getLogger(__name__) + +REJECT_MSG_RE = "(?i)Write rejected.*(partition size|partition row count)" + + +async def _make_oversized_partition(cql, tbl, pk, num_rows, value_size_bytes): + """Insert `num_rows` rows of `value_size_bytes` blobs under partition key `pk`.""" + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + for ck in range(num_rows): + await cql.run_async(insert, [pk, ck, bytes(value_size_bytes)]) + + +@pytest.mark.asyncio +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_large_data_guardrails_rolling_upgrade(manager: ManagerClient): + """Verify that large_data_guardrails_enabled is properly gated by the + LARGE_DATA_GUARDRAILS cluster feature during a simulated rolling upgrade: + + 1. Start a 2-node cluster where one node suppresses the feature. + 2. Verify that CREATE TABLE WITH large_data_guardrails_enabled = true + is rejected (even on the upgraded node). + 3. "Upgrade" the old node (remove suppress_features, restart). + 4. Verify that CREATE TABLE WITH large_data_guardrails_enabled = true + now succeeds. + """ + cfg_base = { + "compaction_large_partition_warning_threshold_mb": 1, + "large_partition_fail_threshold_mb": 1, + } + cfg_old = { + **cfg_base, + "error_injections_at_startup": [ + {"name": "suppress_features", "value": "LARGE_DATA_GUARDRAILS"}, + ], + } + + servers = [] + servers.append(await manager.server_add(config=cfg_old)) # "old" node + servers.append(await manager.server_add(config=cfg_base)) # "new" node + + # Connect to the "new" node — it supports the feature but the cluster + # as a whole does not (because the old node doesn't advertise it). + cql = await manager.get_cql_exclusive(servers[1]) + + await cql.run_async( + "CREATE KEYSPACE ks_upgrade_test WITH REPLICATION = " + "{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}" + ) + + # Feature not yet cluster-wide — enabling guardrails must be rejected. + with pytest.raises(ConfigurationException, match="cannot be used until all nodes"): + await cql.run_async( + "CREATE TABLE ks_upgrade_test.tbl1 " + "(pk int, ck int, v blob, PRIMARY KEY (pk, ck)) " + "WITH large_data_guardrails_enabled = true" + ) + + # "Upgrade" the old node: remove the injection, restart. + await manager.server_stop_gracefully(servers[0].server_id) + await manager.server_remove_config_option(servers[0].server_id, "error_injections_at_startup") + await manager.server_start(servers[0].server_id) + + # Wait for the upgraded node to realize the feature is now cluster-wide. + timeout = time.time() + 60 + success = False + while not success and time.time() < timeout: + try: + await cql.run_async( + "CREATE TABLE ks_upgrade_test.tbl1 " + "(pk int, ck int, v blob, PRIMARY KEY (pk, ck)) " + "WITH large_data_guardrails_enabled = true" + ) + success = True + except ConfigurationException as e: + assert "cannot be used until all nodes" in str(e) + await asyncio.sleep(0.5) + assert success, "Feature was not enabled cluster-wide within timeout" + + # Verify the guardrail actually works on the newly-created table. + await _make_oversized_partition(cql, "ks_upgrade_test.tbl1", pk=1, num_rows=2, value_size_bytes=600 * 1024) + # Flush on both servers — with RF=1 we don't know which owns pk=1. + for srv in servers: + await manager.api.keyspace_flush(srv.ip_addr, "ks_upgrade_test", "tbl1") + insert = cql.prepare("INSERT INTO ks_upgrade_test.tbl1 (pk, ck, v) VALUES (?, ?, ?)") + with pytest.raises(WriteFailure, match=REJECT_MSG_RE): + await cql.run_async(insert, [1, 99, b"\x00"]) diff --git a/test/cqlpy/test_large_data_guardrail.py b/test/cqlpy/test_large_data_guardrail.py new file mode 100644 index 0000000000..d7ac382ba2 --- /dev/null +++ b/test/cqlpy/test_large_data_guardrail.py @@ -0,0 +1,638 @@ +# +# Copyright (C) 2026-present ScyllaDB +# +# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 +# + +# Tests for the large-data guardrail: write-path rejection and warnings for +# partitions, rows, and collections that exceed configured thresholds. + +import pytest +import re +import time +import io +from contextlib import ExitStack +from cassandra import WriteFailure + +from .util import new_test_table, config_value_context +from . import nodetool +from .test_logs import logfile, logfile_path, wait_for_log + +# All tests are Scylla-only (guardrails don't exist in Cassandra). +@pytest.fixture(scope="function", autouse=True) +def all_tests_are_scylla_only(scylla_only): + pass + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +REJECT_PARTITION_RE = "(?i)Write rejected.*(partition size|partition row count)" +REJECT_ROW_RE = "(?i)Write rejected.*row size" +REJECT_COLLECTION_RE = "(?i)Write rejected.*collection" +WARN_RE = "Large data guardrail" + + +def make_oversized_partition(cql, tbl, pk, num_rows, value_size_bytes): + """Insert `num_rows` rows of `value_size_bytes` blobs under partition key `pk`.""" + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + for ck in range(num_rows): + cql.execute(insert, [pk, ck, bytes(value_size_bytes)]) + + +def make_large_row(cql, tbl, pk, ck, value_size_bytes): + """Insert a single row with a blob of `value_size_bytes` under (pk, ck).""" + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [pk, ck, bytes(value_size_bytes)]) + + +def build_large_collection(cql, tbl, pk, ck, num_elements): + """Build a set with `num_elements` entries under (pk, ck).""" + update = cql.prepare(f"UPDATE {tbl} SET s = s + ? WHERE pk = ? AND ck = ?") + for i in range(num_elements): + cql.execute(update, [{i}, pk, ck]) + + +def assert_no_log(logfile, pattern, after_action, timeout=0.5): + """Run `after_action`, then verify `pattern` does NOT appear in the log.""" + logfile.seek(0, io.SEEK_END) + after_action() + time.sleep(timeout) + new_content = logfile.read() + assert not re.search(pattern, new_content), \ + f"Expected no match for '{pattern}', got: {new_content}" + + +def wait_for_all_logs(logfile, patterns, timeout=5): + """Wait until every regex in `patterns` has appeared in the log.""" + contents = logfile.read() + remaining = set(patterns) + remaining = {p for p in remaining if not re.search(p, contents)} + if not remaining: + return + end = time.time() + timeout + while remaining and time.time() < end: + s = logfile.read() + if s: + contents += s + remaining = {p for p in remaining if not re.search(p, contents)} + if not remaining: + return + time.sleep(0.1) + pytest.fail(f'Timed out ({timeout}s) waiting for patterns: {remaining}') + + +# --------------------------------------------------------------------------- +# Partition size threshold +# --------------------------------------------------------------------------- + + +def test_partition_size_rejects_after_flush(cql, test_keyspace, logfile): + """A partition whose on-disk size exceeds the hard limit must be rejected + after flush.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '1')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=2, + value_size_bytes=600 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + with pytest.raises(WriteFailure, match=REJECT_PARTITION_RE): + cql.execute(insert, [1, 99, b"\x00"]) + + # Different partition still succeeds. + cql.execute(insert, [2, 0, b"\x00"]) + + +def test_partition_size_disabled_when_zero(cql, test_keyspace): + """When fail threshold is 0, no rejection even above the soft limit.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '0')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=2, + value_size_bytes=600 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 99, b"\x00"]) + + +# --------------------------------------------------------------------------- +# Partition row-count threshold +# --------------------------------------------------------------------------- + + +def test_partition_rows_rejects_after_flush(cql, test_keyspace): + """A partition exceeding rows_count_fail_threshold must be rejected.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_rows_count_warning_threshold', '50')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'rows_count_fail_threshold', '50')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=60, + value_size_bytes=8) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + with pytest.raises(WriteFailure, match=REJECT_PARTITION_RE): + cql.execute(insert, [1, 999, b"\x00"]) + + +# --------------------------------------------------------------------------- +# Soft-limit warnings — partition +# --------------------------------------------------------------------------- + + +def test_partition_size_soft_limit_logs_warning(cql, test_keyspace, logfile): + """A partition above the detection threshold but below the hard limit must + produce a warning log entry.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '10')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=2, + value_size_bytes=600 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 99, b"\x00"]) + + wait_for_log(logfile, WARN_RE, timeout=5) + + +def test_partition_rows_soft_limit_logs_warning(cql, test_keyspace, logfile): + """A partition above the row-count detection threshold but below the hard + limit must produce a warning log entry.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_rows_count_warning_threshold', '50')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'rows_count_fail_threshold', '1000')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=60, + value_size_bytes=8) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 999, b"\x00"]) + + wait_for_log(logfile, WARN_RE, timeout=5) + + +def test_partition_no_warning_below_soft_limit(cql, test_keyspace, logfile): + """A small partition must not produce any warning.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '10')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 0, b"\x00"]) + nodetool.flush(cql, tbl) + + assert_no_log(logfile, WARN_RE, + lambda: cql.execute(insert, [1, 1, b"\x00"])) + + +# --------------------------------------------------------------------------- +# Row size threshold +# --------------------------------------------------------------------------- + + +def test_row_size_rejects_after_flush(cql, test_keyspace): + """A row whose on-disk size exceeds the hard limit must be rejected.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_row_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_row_fail_threshold_mb', '1')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_large_row(cql, tbl, pk=1, ck=0, + value_size_bytes=1200 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + with pytest.raises(WriteFailure, match=REJECT_ROW_RE): + cql.execute(insert, [1, 0, b"\x00"]) + + # Different CK succeeds. + cql.execute(insert, [1, 99, b"\x00"]) + # Different partition succeeds. + cql.execute(insert, [2, 0, b"\x00"]) + + +def test_row_size_disabled_when_zero(cql, test_keyspace): + """When fail threshold is 0, no rejection even above the soft limit.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_row_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_row_fail_threshold_mb', '0')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_large_row(cql, tbl, pk=1, ck=0, + value_size_bytes=1200 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 0, b"\x00"]) + + +# --------------------------------------------------------------------------- +# Soft-limit warnings — row +# --------------------------------------------------------------------------- + + +def test_row_size_soft_limit_logs_warning(cql, test_keyspace, logfile): + """A row above the detection threshold but below the hard limit must + produce a warning log entry.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_row_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_row_fail_threshold_mb', '10')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_large_row(cql, tbl, pk=1, ck=0, + value_size_bytes=1200 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 0, b"\x00"]) + + wait_for_log(logfile, WARN_RE, timeout=5) + + +def test_row_no_warning_below_soft_limit(cql, test_keyspace, logfile): + """A small row must not produce any warning.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_row_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_row_fail_threshold_mb', '10')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 0, b"\x00"]) + nodetool.flush(cql, tbl) + + assert_no_log(logfile, WARN_RE, + lambda: cql.execute(insert, [1, 1, b"\x00"])) + + +# --------------------------------------------------------------------------- +# Collection element count threshold +# --------------------------------------------------------------------------- + + +def test_collection_rejects_after_flush(cql, test_keyspace): + """A collection exceeding the element-count hard limit must be rejected.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_collection_elements_count_warning_threshold', '50')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_cell_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_collection_elements_fail_threshold', '50')) + + schema = "pk int, ck int, s set, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + build_large_collection(cql, tbl, pk=1, ck=0, num_elements=60) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, s) VALUES (?, ?, ?)") + with pytest.raises(WriteFailure, match=REJECT_COLLECTION_RE): + cql.execute(insert, [1, 0, {0}]) + + # Different CK succeeds. + cql.execute(insert, [1, 99, {0}]) + # Different partition succeeds. + cql.execute(insert, [2, 0, {0}]) + + +def test_collection_disabled_when_zero(cql, test_keyspace): + """When fail threshold is 0, no rejection even above the soft limit.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_collection_elements_count_warning_threshold', '50')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_cell_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_collection_elements_fail_threshold', '0')) + + schema = "pk int, ck int, s set, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + build_large_collection(cql, tbl, pk=1, ck=0, num_elements=60) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, s) VALUES (?, ?, ?)") + cql.execute(insert, [1, 0, {0}]) + + +# --------------------------------------------------------------------------- +# Soft-limit warnings — collection +# --------------------------------------------------------------------------- + + +def test_collection_soft_limit_logs_warning(cql, test_keyspace, logfile): + """A collection above the detection threshold but below the hard limit + must produce a warning log entry.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_collection_elements_count_warning_threshold', '50')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_cell_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_collection_elements_fail_threshold', '10000')) + + schema = "pk int, ck int, s set, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + build_large_collection(cql, tbl, pk=1, ck=0, num_elements=60) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, s) VALUES (?, ?, ?)") + cql.execute(insert, [1, 0, {0}]) + + wait_for_log(logfile, WARN_RE, timeout=5) + + +def test_collection_no_warning_below_soft_limit(cql, test_keyspace, logfile): + """A small collection must not produce any warning.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_collection_elements_count_warning_threshold', '50')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_cell_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_collection_elements_fail_threshold', '10000')) + + schema = "pk int, ck int, s set, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, s) VALUES (?, ?, ?)") + cql.execute(insert, [1, 0, {1, 2, 3}]) + nodetool.flush(cql, tbl) + + assert_no_log(logfile, WARN_RE, + lambda: cql.execute(insert, [1, 1, {4, 5}])) + + +# --------------------------------------------------------------------------- +# Per-table enable/disable toggle +# --------------------------------------------------------------------------- + + +def test_per_table_disabled_at_create(cql, test_keyspace): + """A table created with guardrails disabled must never reject writes.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '1')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = false") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=2, + value_size_bytes=600 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 99, b"\x00"]) + + +def test_per_table_alter_disable(cql, test_keyspace): + """ALTER TABLE ... WITH large_data_guardrails_enabled = false must stop + rejection.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '1')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=2, + value_size_bytes=600 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + with pytest.raises(WriteFailure, match=REJECT_PARTITION_RE): + cql.execute(insert, [1, 99, b"\x00"]) + + cql.execute(f"ALTER TABLE {tbl} WITH large_data_guardrails_enabled = false") + cql.execute(insert, [1, 99, b"\x00"]) + + +def test_per_table_alter_reenable(cql, test_keyspace): + """After disabling then re-enabling guardrails, writes to a large + partition must be rejected again.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '1')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = false") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=2, + value_size_bytes=600 * 1024) + nodetool.flush(cql, tbl) + + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert, [1, 99, b"\x00"]) + + cql.execute(f"ALTER TABLE {tbl} WITH large_data_guardrails_enabled = true") + with pytest.raises(WriteFailure, match=REJECT_PARTITION_RE): + cql.execute(insert, [1, 100, b"\x00"]) + + +# --------------------------------------------------------------------------- +# Exemptions +# --------------------------------------------------------------------------- + + +def test_lwt_paxos_learn_is_exempt(cql, test_keyspace): + """LWT (Paxos) commit writes must bypass the guardrail.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '1')) + + schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + make_oversized_partition(cql, tbl, pk=1, num_rows=2, + value_size_bytes=600 * 1024) + nodetool.flush(cql, tbl) + + # Non-LWT write is rejected. + insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + with pytest.raises(WriteFailure, match=REJECT_PARTITION_RE): + cql.execute(insert, [1, 99, b"\x00"]) + + # LWT write succeeds (Paxos learn bypasses guardrail). + lwt = cql.prepare( + f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?) IF NOT EXISTS") + cql.execute(lwt, [1, 100, b"\x00"]) + + +# --------------------------------------------------------------------------- +# Multi-category: all guardrails fire independently +# --------------------------------------------------------------------------- + + +def test_all_guardrails_warn_independently(cql, test_keyspace, logfile): + """When an SSTable has records in all three categories, each guardrail + must warn independently (fail=0 means warn-only).""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '0')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_row_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_row_fail_threshold_mb', '0')) + cfg.enter_context(config_value_context(cql, + 'compaction_collection_elements_count_warning_threshold', '50')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_cell_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_collection_elements_fail_threshold', '0')) + + schema = "pk int, ck int, v blob, s set, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + # Build data large in all three dimensions under pk=1, ck=0. + insert_v = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + cql.execute(insert_v, [1, 0, bytes(1200 * 1024)]) + update_s = cql.prepare( + f"UPDATE {tbl} SET s = s + ? WHERE pk = ? AND ck = ?") + for i in range(60): + cql.execute(update_s, [{i}, 1, 0]) + + nodetool.flush(cql, tbl) + + insert_both = cql.prepare( + f"INSERT INTO {tbl} (pk, ck, v, s) VALUES (?, ?, ?, ?)") + cql.execute(insert_both, [1, 0, b"\x00", {999}]) + + wait_for_all_logs(logfile, [ + r"partition size.*exceeds", + r"row size.*exceeds", + r"collection element count.*exceeds", + ], timeout=5) + + +def test_each_guardrail_rejects_independently(cql, test_keyspace): + """Each hard limit must reject independently, with separate PKs isolating + each category.""" + with ExitStack() as cfg: + cfg.enter_context(config_value_context(cql, + 'compaction_large_partition_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_row_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'compaction_collection_elements_count_warning_threshold', '50')) + cfg.enter_context(config_value_context(cql, + 'compaction_large_cell_warning_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_partition_fail_threshold_mb', '2')) + cfg.enter_context(config_value_context(cql, + 'rows_count_fail_threshold', '0')) + cfg.enter_context(config_value_context(cql, + 'large_row_fail_threshold_mb', '1')) + cfg.enter_context(config_value_context(cql, + 'large_collection_elements_fail_threshold', '50')) + + schema = "pk int, ck int, v blob, s set, PRIMARY KEY (pk, ck)" + with new_test_table(cql, test_keyspace, schema, + extra=" WITH large_data_guardrails_enabled = true") as tbl: + insert_v = cql.prepare( + f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)") + + # pk=1: 20 rows x 150 KB ~ 3 MB partition, each row < 1 MB. + for ck in range(20): + cql.execute(insert_v, [1, ck, bytes(150 * 1024)]) + + # pk=2: single 1.2 MB row; partition total < 2 MB. + cql.execute(insert_v, [2, 0, bytes(1200 * 1024)]) + + # pk=3: 60-element collection; row and partition tiny. + update_s = cql.prepare( + f"UPDATE {tbl} SET s = s + ? WHERE pk = ? AND ck = ?") + for i in range(60): + cql.execute(update_s, [{i}, 3, 0]) + + nodetool.flush(cql, tbl) + + # Partition rejection. + with pytest.raises(WriteFailure, match="(?i)partition size.*exceeds"): + cql.execute(insert_v, [1, 99, b"\x00"]) + + # Row rejection. + with pytest.raises(WriteFailure, match="(?i)row size.*exceeds"): + cql.execute(insert_v, [2, 0, b"\x00"]) + # Different CK in pk=2 succeeds. + cql.execute(insert_v, [2, 99, b"\x00"]) + + # Collection rejection. + insert_s = cql.prepare( + f"INSERT INTO {tbl} (pk, ck, s) VALUES (?, ?, ?)") + with pytest.raises(WriteFailure, + match="(?i)collection element count.*exceeds"): + cql.execute(insert_s, [3, 0, {999}]) + # Writing without the collection column succeeds. + cql.execute(insert_v, [3, 0, b"\x00"]) + # Different CK in pk=3 succeeds. + cql.execute(insert_s, [3, 99, {999}])