Merge 'guardrails: Add replica-side large data guardrails' from Taras Veretilnyk

Adds write-path guardrails that reject or warn on mutations targeting partitions, rows, or collections that already exceed configured size thresholds, based on SSTable `large_data_record` metadata.
ScyllaDB already detects and records large partitions/rows/cells in `system.large_data_records` after compaction, but takes no preventive action on the write path. Once a partition grows past operational limits it causes latency spikes, OOM, and repair failures. These guardrails let operators set hard and soft thresholds so that writes to already-oversized data are rejected (hard) or logged as warnings (soft) before they make the problem worse.
- **Intrusive index over SSTable metadata**: A per-table `large_data_record_index` maintains three `boost::intrusive::multiset`s (partitions, rows, collections) using `auto_unlink` hooks directly on `large_data_record`. SSTable destruction automatically removes records from the index — no explicit deregistration needed.
- **Virtual dispatch for zero-cost disabled path**: `large_data_guardrail_base` → `noop_large_data_guardrail` / `large_data_guardrail`. Tables without guardrails enabled pay only a virtual call to a no-op. No index is built or maintained for disabled tables.
- **Schema storage**: The per-table flag is stored as a scylla_tables column, following the tablets pattern: only write a live cell when enabled, omit entirely when disabled. The CQL feature gate prevents enabling until all nodes are upgraded.
- **Write-path integration**: The guardrail check runs in `do_apply` after the frozen mutation is deserialized but before it is applied to the memtable. Hint replay and Paxos learn skip the check via `skip_large_data_guardrails`.
Uses the existing `compaction_*_warning_threshold*` config options as soft limits and the new `*_fail_threshold*` options as hard limits. Checked dimensions:
- Partition size (bytes)
- Partition row count
- Row size (bytes)
- Collection element count

Backport is not required

Fixes https://scylladb.atlassian.net/browse/SCYLLADB-180

Closes scylladb/scylladb#29733

* github.com:scylladb/scylladb:
  test/cqlpy: add per-table toggle, LWT exemption, and multi-category tests
  test/cqlpy: add large collection guardrail tests
  test/cqlpy: add large row guardrail tests
  test/cqlpy: add large partition guardrail tests
  test/boost: add large_data_guardrail unit tests
  test/cluster: add large data guardrails rolling upgrade test
  replica: wire large_data_guardrail into the write path
  schema: add per-table large_data_guardrails_enabled flag
  db: implement large_data_guardrail
  db: implement large_data_record_index
  sstables: add intrusive index hook to large_data_record
  db: add large_collection_elements_fail_threshold config option
  db: add large_row_fail_threshold_mb config option
  db: add rows_count_fail_threshold config option
  db: add large_partition_fail_threshold_mb config option
  replica: introduce large_data_exception
This commit is contained in:
Botond Dénes
2026-06-01 13:26:00 +03:00
33 changed files with 1588 additions and 39 deletions

View File

@@ -425,18 +425,34 @@ column_index_size_in_kb: 1
# Log a warning when writing partitions larger than this value
# compaction_large_partition_warning_threshold_mb: 1000
# Reject writes to partitions whose on-disk size exceeds this threshold (MB).
# Set to 0 to disable.
large_partition_fail_threshold_mb: 2000
# Log a warning when writing rows larger than this value
# compaction_large_row_warning_threshold_mb: 10
# Reject writes to rows whose on-disk size exceeds this threshold (MB).
# Set to 0 to disable.
large_row_fail_threshold_mb: 20
# Log a warning when writing cells larger than this value
# compaction_large_cell_warning_threshold_mb: 1
# Log a warning when row number is larger than this value
# compaction_rows_count_warning_threshold: 100000
# Reject writes to partitions whose on-disk row count exceeds this threshold.
# Set to 0 to disable.
rows_count_fail_threshold: 200000
# Log a warning when writing a collection containing more elements than this value
# compaction_collection_elements_count_warning_threshold: 10000
# Reject writes to collections whose element count exceeds this threshold.
# Set to 0 to disable.
large_collection_elements_fail_threshold: 20000
# How long the coordinator should wait for seq or index scans to complete
# range_request_timeout_in_ms: 10000
# How long the coordinator should wait for writes to complete

View File

@@ -1701,6 +1701,7 @@ deps['test/boost/combined_tests'] += [
'test/boost/group0_voter_calculator_test.cc',
'test/boost/index_with_paging_test.cc',
'test/boost/json_cql_query_test.cc',
'test/boost/large_data_guardrail_test.cc',
'test/boost/large_paging_state_test.cc',
'test/boost/loading_cache_test.cc',
'test/boost/memtable_test.cc',

View File

@@ -60,6 +60,7 @@ const sstring cf_prop_defs::COMPACTION_ENABLED_KEY = "enabled";
const sstring cf_prop_defs::KW_TABLETS = "tablets";
const sstring cf_prop_defs::KW_STORAGE_ENGINE = "storage_engine";
const sstring cf_prop_defs::KW_LARGE_DATA_GUARDRAILS_ENABLED = "large_data_guardrails_enabled";
schema::extensions_map cf_prop_defs::make_schema_extensions(const db::extensions& exts) const {
schema::extensions_map er;
@@ -109,6 +110,7 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
KW_COMPRESSION, KW_CRC_CHECK_CHANCE, KW_ID, KW_PAXOSGRACESECONDS,
KW_SYNCHRONOUS_UPDATES, KW_TABLETS,
KW_STORAGE_ENGINE,
KW_LARGE_DATA_GUARDRAILS_ENABLED,
});
static std::set<sstring> obsolete_keywords({
sstring("index_interval"),
@@ -210,6 +212,12 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
throw exceptions::configuration_exception(format("Illegal value for '{}'", KW_STORAGE_ENGINE));
}
}
if (has_property(KW_LARGE_DATA_GUARDRAILS_ENABLED) && get_boolean(KW_LARGE_DATA_GUARDRAILS_ENABLED, false)) {
if (!db.features().large_data_guardrails) {
throw exceptions::configuration_exception("large_data_guardrails_enabled cannot be used until all nodes in the cluster enable this feature");
}
}
}
std::map<sstring, sstring> cf_prop_defs::get_compaction_type_options() const {
@@ -417,6 +425,9 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_
builder.set_logstor();
}
}
if (has_property(KW_LARGE_DATA_GUARDRAILS_ENABLED)) {
builder.set_large_data_guardrails_enabled(get_boolean(KW_LARGE_DATA_GUARDRAILS_ENABLED, false));
}
}
void cf_prop_defs::validate_minimum_int(const sstring& field, int32_t minimum_value, int32_t default_value) const

View File

@@ -65,6 +65,7 @@ public:
static const sstring KW_TABLETS;
static const sstring KW_STORAGE_ENGINE;
static const sstring KW_LARGE_DATA_GUARDRAILS_ENABLED;
// FIXME: In origin the following consts are in CFMetaData.
static constexpr int32_t DEFAULT_DEFAULT_TIME_TO_LIVE = 0;

View File

@@ -282,7 +282,7 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r
return db.apply_in_memory(m, cf, db::rp_handle(), db::no_timeout);
});
} else {
return db.apply_in_memory(fm, cf.schema(), db::rp_handle(), db::no_timeout);
return db.apply_in_memory(fm, cf.schema(), db::rp_handle(), db::no_timeout, db::noop_large_data_guardrail::instance());
}
}).then_wrapped([s] (future<> f) {
try {

View File

@@ -842,6 +842,18 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Log a warning when writing rows larger than this value.")
, compaction_large_cell_warning_threshold_mb(this, "compaction_large_cell_warning_threshold_mb", liveness::LiveUpdate, value_status::Used, 1,
"Log a warning when writing cells larger than this value.")
, large_partition_fail_threshold_mb(this, "large_partition_fail_threshold_mb", liveness::LiveUpdate, value_status::Used, 0,
"Reject writes targeting a partition whose on-disk size already exceeds this threshold in MB, as recorded in any SSTable's large data records. "
"Set to 0 to disable.")
, rows_count_fail_threshold(this, "rows_count_fail_threshold", liveness::LiveUpdate, value_status::Used, 0,
"Reject writes targeting a partition whose on-disk row count already exceeds this threshold, as recorded in any SSTable's large data records. "
"Set to 0 to disable.")
, large_row_fail_threshold_mb(this, "large_row_fail_threshold_mb", liveness::LiveUpdate, value_status::Used, 0,
"Reject writes targeting a partition that contains any row whose on-disk size already exceeds this threshold in MB, as recorded in any SSTable's large data records. "
"Set to 0 to disable.")
, large_collection_elements_fail_threshold(this, "large_collection_elements_fail_threshold", liveness::LiveUpdate, value_status::Used, 0,
"Reject writes targeting a partition that contains any collection whose element count already exceeds this threshold, as recorded in any SSTable's large data records. "
"Set to 0 to disable.")
, compaction_rows_count_warning_threshold(this, "compaction_rows_count_warning_threshold", liveness::LiveUpdate, value_status::Used, 100000,
"Log a warning when writing a number of rows larger than this value.")
, compaction_collection_elements_count_warning_threshold(this, "compaction_collection_elements_count_warning_threshold", liveness::LiveUpdate, value_status::Used, 10000,

View File

@@ -233,6 +233,10 @@ public:
named_value<uint32_t> compaction_large_partition_warning_threshold_mb;
named_value<uint32_t> compaction_large_row_warning_threshold_mb;
named_value<uint32_t> compaction_large_cell_warning_threshold_mb;
named_value<uint32_t> large_partition_fail_threshold_mb;
named_value<uint32_t> rows_count_fail_threshold;
named_value<uint32_t> large_row_fail_threshold_mb;
named_value<uint32_t> large_collection_elements_fail_threshold;
named_value<uint32_t> compaction_rows_count_warning_threshold;
named_value<uint32_t> compaction_collection_elements_count_warning_threshold;
named_value<uint32_t> compaction_large_data_records_per_sstable;

View File

@@ -13,6 +13,8 @@
#include "db/system_keyspace.hh"
#include "db/large_data_handler.hh"
#include "keys/keys.hh"
#include "mutation/mutation_partition.hh"
#include "replica/exceptions.hh"
#include "sstables/sstables.hh"
#include "gms/feature_service.hh"
#include "cql3/untyped_result_set.hh"
@@ -21,6 +23,10 @@ static logging::logger large_data_logger("large_data");
namespace db {
namespace {
// Number of bytes in one MB; the guardrail multiplies the *_mb thresholds by
// this constant to compare them against byte sizes.
constexpr uint64_t MB = 1024 * 1024;
}
nop_large_data_handler::nop_large_data_handler()
: large_data_handler(std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max()) {
@@ -76,6 +82,203 @@ future<> large_data_handler::unplug_system_keyspace() noexcept {
co_await _sys_ks.unplug();
}
// Links the large-data records carried by `sst` into the index.
//
// Records are routed by type: partition-size and rows-in-partition records go
// to the partition set, row-size records to the row set, and
// elements-in-collection records to the collection set. Records of any other
// type are ignored, and an SSTable without large-data records is a no-op.
void large_data_record_index::register_sstable(sstables::shared_sstable sst) {
    auto& maybe_records = sst->get_large_data_records();
    if (maybe_records) {
        for (auto& record : maybe_records->elements) {
            const auto kind = record.type;
            if (kind == sstables::large_data_type::partition_size
                    || kind == sstables::large_data_type::rows_in_partition) {
                _partitions.insert(record);
            } else if (kind == sstables::large_data_type::row_size) {
                _rows.insert(record);
            } else if (kind == sstables::large_data_type::elements_in_collection) {
                _collections.insert(record);
            }
            // Any other record type is not tracked by the guardrail index.
        }
    }
}
// Resets the index so that it holds exactly the records of `sstables`:
// all three sets are emptied, then every SSTable is registered again.
void large_data_record_index::rebuild(
        const std::unordered_set<sstables::shared_sstable>& sstables) {
    _collections.clear();
    _rows.clear();
    _partitions.clear();

    for (const auto& live_sst : sstables) {
        register_sstable(live_sst);
    }
}
// Returns the aggregated partition-level figures recorded for `pk_bytes`, or
// std::nullopt when no record matches.
//
// Several records may match the same key; the result holds the maximum `value`
// (as partition_size) and the maximum `elements_count` (as rows) across them.
// NOTE(review): both fields are folded from every matching record regardless
// of whether it is a partition_size or rows_in_partition record — confirm that
// this is the intended interpretation of `value` / `elements_count`.
std::optional<large_data_record_index::partition_entry>
large_data_record_index::lookup_partition(bytes_view pk_bytes) const {
    const lookup_key probe{pk_bytes, {}, {}};
    const auto matches = _partitions.equal_range(probe, _partitions.key_comp());
    if (matches.first == matches.second) {
        return std::nullopt;
    }

    partition_entry worst;
    for (auto rec = matches.first; rec != matches.second; ++rec) {
        if (rec->value > worst.partition_size) {
            worst.partition_size = rec->value;
        }
        if (rec->elements_count > worst.rows) {
            worst.rows = rec->elements_count;
        }
    }
    return worst;
}
// Returns the largest `value` recorded for the row identified by
// (`pk_bytes`, `ck_bytes`), or std::nullopt when the row has no record.
std::optional<uint64_t> large_data_record_index::lookup_row(bytes_view pk_bytes,
        bytes_view ck_bytes) const {
    const lookup_key probe{pk_bytes, ck_bytes, {}};
    const auto matches = _rows.equal_range(probe, _rows.key_comp());
    if (matches.first == matches.second) {
        return std::nullopt;
    }

    uint64_t largest = 0;
    for (auto rec = matches.first; rec != matches.second; ++rec) {
        if (rec->value > largest) {
            largest = rec->value;
        }
    }
    return largest;
}
// Returns the largest `elements_count` recorded for the collection identified
// by (`pk_bytes`, `ck_bytes`, `column_name`), or std::nullopt when there is no
// matching record.
std::optional<uint64_t> large_data_record_index::lookup_collection(bytes_view pk_bytes,
        bytes_view ck_bytes, bytes_view column_name) const {
    const lookup_key probe{pk_bytes, ck_bytes, column_name};
    const auto matches = _collections.equal_range(probe, _collections.key_comp());

    // Stays disengaged when the range is empty; otherwise accumulates the max.
    std::optional<uint64_t> largest;
    for (auto rec = matches.first; rec != matches.second; ++rec) {
        largest = std::max(largest.value_or(0), rec->elements_count);
    }
    return largest;
}
// Adds the large-data records of `sst` to this guardrail's index.
void large_data_guardrail::register_sstable(sstables::shared_sstable sst) {
_index.register_sstable(std::move(sst));
}
// Rebuilds this guardrail's index from scratch using the given SSTable set.
void large_data_guardrail::rebuild(const std::unordered_set<sstables::shared_sstable>& sstables) {
_index.rebuild(sstables);
}
// Entry point of the guardrail: validates a mutation of partition `pk` against
// the indexed large-data records.
//
// Runs the partition-level checks first, then the per-row and per-collection
// checks for the rows touched by `mp`. Any of them may throw
// replica::large_data_exception to reject the write.
void large_data_guardrail::check(const schema& s, const mutation_partition& mp,
        partition_key_view pk) const {
    // Index lookups use the bytes of the sstables::key built from `pk`.
    auto sstable_key = sstables::key::from_partition_key(s, pk);
    auto key_bytes = bytes_view(sstable_key.get_bytes());

    check_partition(s, key_bytes, pk);
    check_rows_and_collections(s, key_bytes, mp, pk);
}
// Partition-level checks: on-disk partition size and partition row count.
//
// Does nothing when the partition has no record in the index. Otherwise, for
// each dimension: throws replica::large_data_exception if the hard limit is
// non-zero and the recorded figure is >= the limit, else logs a warning if the
// recorded figure is >= the soft limit. Size thresholds are configured in MB
// and converted to bytes here.
void large_data_guardrail::check_partition(const schema& s, bytes_view pk_bytes, partition_key_view pk) const {
    const auto recorded = _index.lookup_partition(pk_bytes);
    if (!recorded) [[likely]] {
        return;
    }
    const uint64_t recorded_size = recorded->partition_size;
    const uint64_t recorded_rows = recorded->rows;

    // --- partition size ---
    const uint64_t size_hard_limit = uint64_t(_cfg.partition_size_fail_threshold_mb()) * MB;
    if (size_hard_limit > 0 && recorded_size >= size_hard_limit) {
        throw replica::large_data_exception(s.ks_name(), s.cf_name(), format(
                "Large data guardrail: partition size {} exceeds hard limit {} "
                "(keyspace={}, table={}, partition_key={})",
                recorded_size, size_hard_limit, s.ks_name(), s.cf_name(), pk));
    }
    const uint64_t size_soft_limit = uint64_t(_cfg.partition_size_warn_threshold_mb()) * MB;
    if (recorded_size >= size_soft_limit) {
        large_data_logger.warn("Large data guardrail: partition size {} exceeds soft limit {} "
                "(keyspace={}, table={}, partition_key={})",
                recorded_size, size_soft_limit, s.ks_name(), s.cf_name(), pk);
    }

    // --- partition row count ---
    const uint64_t rows_hard_limit = uint64_t(_cfg.rows_count_fail_threshold());
    if (rows_hard_limit > 0 && recorded_rows >= rows_hard_limit) {
        throw replica::large_data_exception(s.ks_name(), s.cf_name(), format(
                "Large data guardrail: partition row count {} exceeds hard limit {} "
                "(keyspace={}, table={}, partition_key={})",
                recorded_rows, rows_hard_limit, s.ks_name(), s.cf_name(), pk));
    }
    const uint64_t rows_soft_limit = uint64_t(_cfg.rows_count_warn_threshold());
    if (recorded_rows >= rows_soft_limit) {
        large_data_logger.warn("Large data guardrail: partition row count {} exceeds soft limit {} "
                "(keyspace={}, table={}, partition_key={})",
                recorded_rows, rows_soft_limit, s.ks_name(), s.cf_name(), pk);
    }
}
void large_data_guardrail::check_rows_and_collections(const schema& s, bytes_view pk_bytes,
const mutation_partition& mp, partition_key_view pk) const {
if (!mp.static_row().empty()) {
check_row_size(s, pk_bytes, pk, bytes_view(), nullptr);
mp.static_row().for_each_cell([&](column_id id, const atomic_cell_or_collection&) {
check_collection_element_count(s, pk_bytes, pk, s.static_column_at(id), bytes_view(), nullptr);
});
}
for (const auto& cr : mp.non_dummy_rows()) {
auto ck_bytes = cr.key().view().representation().linearize();
auto ck_bv = bytes_view(ck_bytes);
check_row_size(s, pk_bytes, pk, ck_bv, &cr.key());
cr.row().cells().for_each_cell([&](column_id id, const atomic_cell_or_collection&) {
check_collection_element_count(s, pk_bytes, pk, s.regular_column_at(id), ck_bv, &cr.key());
});
}
}
// Checks the recorded on-disk size of one row against the row-size limits.
//
// @param s         schema of the table being written to
// @param pk_bytes  partition key bytes used for the index lookup
// @param pk        partition key, reported in diagnostics
// @param ck_bytes  clustering key bytes used for the index lookup (empty for
//                  the static row)
// @param ck        clustering key for diagnostics, or nullptr for the static
//                  row (reported as an empty string)
// @throws replica::large_data_exception when the hard limit is non-zero and
//         the recorded row size is >= the limit.
//
// When only the soft limit is reached, a warning is logged instead. Both
// thresholds are configured in MB and converted to bytes here. The partition
// key is part of every diagnostic so that the offending row can be located
// even when the clustering key is empty (static row).
void large_data_guardrail::check_row_size(const schema& s, bytes_view pk_bytes, partition_key_view pk,
        bytes_view ck_bytes, const clustering_key_prefix* ck) const {
    // Most rows have no record: look up first and bail out before touching
    // the configuration.
    const auto row_size = _index.lookup_row(pk_bytes, ck_bytes);
    if (!row_size) [[likely]] {
        return;
    }

    // Formatted lazily: only needed once a limit has actually been reached.
    const auto ck_text = [&] {
        return ck ? format("{}", ck->with_schema(s)) : sstring();
    };

    const uint64_t fail = uint64_t(_cfg.row_size_fail_threshold_mb()) * MB;
    if (fail > 0 && *row_size >= fail) {
        throw replica::large_data_exception(s.ks_name(), s.cf_name(), format(
                "Large data guardrail: row size {} exceeds hard limit {} "
                "(keyspace={}, table={}, partition_key={}, clustering_key={})",
                *row_size, fail, s.ks_name(), s.cf_name(), pk, ck_text()));
    }

    const uint64_t warn = uint64_t(_cfg.row_size_warn_threshold_mb()) * MB;
    if (*row_size >= warn) {
        large_data_logger.warn("Large data guardrail: row size {} exceeds soft limit {} "
                "(keyspace={}, table={}, partition_key={}, clustering_key={})",
                *row_size, warn, s.ks_name(), s.cf_name(), pk, ck_text());
    }
}
// Checks the recorded element count of one collection column against the
// collection-element limits.
//
// @param s         schema of the table being written to
// @param pk_bytes  partition key bytes used for the index lookup
// @param pk        partition key, reported in diagnostics
// @param cdef      column being written; atomic columns are skipped
// @param ck_bytes  clustering key bytes used for the index lookup (empty for
//                  the static row)
// @param ck        clustering key for diagnostics, or nullptr for the static
//                  row (reported as an empty string)
// @throws replica::large_data_exception when the hard limit is non-zero and
//         the recorded element count is >= the limit.
//
// When only the soft limit is reached, a warning is logged instead. The
// partition key is part of every diagnostic so that the offending collection
// can be located even when the clustering key is empty (static row).
void large_data_guardrail::check_collection_element_count(const schema& s, bytes_view pk_bytes, partition_key_view pk,
        const column_definition& cdef, bytes_view ck_bytes,
        const clustering_key_prefix* ck) const {
    if (cdef.is_atomic()) {
        return;
    }

    // Most collections have no record: look up first and bail out before
    // touching the configuration.
    const auto col_bytes = to_bytes(cdef.name_as_text());
    const auto count = _index.lookup_collection(pk_bytes, ck_bytes, bytes_view(col_bytes));
    if (!count) [[likely]] {
        return;
    }

    // Formatted lazily: only needed once a limit has actually been reached.
    const auto ck_text = [&] {
        return ck ? format("{}", ck->with_schema(s)) : sstring();
    };

    const uint64_t fail = uint64_t(_cfg.collection_elements_fail_threshold());
    if (fail > 0 && *count >= fail) {
        throw replica::large_data_exception(s.ks_name(), s.cf_name(), format(
                "Large data guardrail: collection element count {} exceeds hard limit {} "
                "(keyspace={}, table={}, partition_key={}, column={}, clustering_key={})",
                *count, fail, s.ks_name(), s.cf_name(), pk, cdef.name_as_text(), ck_text()));
    }

    const uint64_t warn = uint64_t(_cfg.collection_elements_warn_threshold());
    if (*count >= warn) {
        large_data_logger.warn("Large data guardrail: collection element count {} exceeds soft limit {} "
                "(keyspace={}, table={}, partition_key={}, column={}, clustering_key={})",
                *count, warn, s.ks_name(), s.cf_name(), pk, cdef.name_as_text(), ck_text());
    }
}
// Returns the basename of the Data component of `sst`.
sstring large_data_handler::sst_filename(const sstables::sstable& sst) {
return sst.component_basename(sstables::component_type::Data);
}
@@ -366,4 +569,5 @@ future<> cql_table_large_data_handler::update_large_data_entries_sstable_name(co
large_table_name, s.ks_name(), s.cf_name(), old_name, new_name, std::current_exception());
}
}
}

View File

@@ -8,11 +8,18 @@
#pragma once
#include <concepts>
#include <cstdint>
#include <optional>
#include <seastar/core/shared_ptr.hh>
#include <boost/intrusive/set.hpp>
#include "bytes.hh"
#include "schema/schema_fwd.hh"
#include "system_keyspace.hh"
#include "sstables/shared_sstable.hh"
#include "sstables/types.hh"
#include "utils/assert.hh"
#include "utils/hash.hh"
#include "utils/updateable_value.hh"
#include "utils/pluggable.hh"
@@ -21,10 +28,149 @@ class sstable;
class key;
}
class partition_key_view;
class mutation_partition;
namespace db {
class system_keyspace;
using sstables::large_data_record;
// Non-owning probe key for searching the record sets without constructing a
// large_data_record. Field order is relied upon by brace initialization;
// fields that a given set does not compare are passed as empty views.
struct lookup_key {
// Partition key bytes.
bytes_view pk;
// Clustering key bytes.
bytes_view ck;
// Column name bytes.
bytes_view column_name;
};
// Compile-time comparison depth, one per multiset.
// partition → pk only
// row → pk + ck
// collection → pk + ck + column_name
enum class record_type { partition, row, collection };
// Less-than comparator for the record sets.
//
// Compares lexicographically over (partition key, clustering key, column
// name), stopping after the component selected by `Depth`. Either operand may
// be a large_data_record or a lookup_key, as long as at least one of them is a
// large_data_record, which allows the sets to be probed with a lookup_key.
template<record_type Depth>
struct record_compare {
private:
    // Uniform field accessors over the two supported operand types.
    static bytes_view pk_of(const large_data_record& rec) noexcept { return bytes_view(rec.partition_key.value); }
    static bytes_view pk_of(const lookup_key& key) noexcept { return key.pk; }
    static bytes_view ck_of(const large_data_record& rec) noexcept { return bytes_view(rec.clustering_key.value); }
    static bytes_view ck_of(const lookup_key& key) noexcept { return key.ck; }
    static bytes_view col_of(const large_data_record& rec) noexcept { return bytes_view(rec.column_name.value); }
    static bytes_view col_of(const lookup_key& key) noexcept { return key.column_name; }

public:
    template<typename L, typename R>
    requires (std::same_as<L, large_data_record> || std::same_as<R, large_data_record>)
    bool operator()(const L& lhs, const R& rhs) const noexcept {
        const bytes_view lhs_pk = pk_of(lhs);
        const bytes_view rhs_pk = pk_of(rhs);
        if (lhs_pk != rhs_pk) {
            return lhs_pk < rhs_pk;
        }
        if constexpr (Depth == record_type::partition) {
            // Same partition key: equivalent at partition depth.
            return false;
        } else {
            const bytes_view lhs_ck = ck_of(lhs);
            const bytes_view rhs_ck = ck_of(rhs);
            if (lhs_ck != rhs_ck) {
                return lhs_ck < rhs_ck;
            }
            if constexpr (Depth == record_type::row) {
                // Same partition and clustering key: equivalent at row depth.
                return false;
            } else {
                return col_of(lhs) < col_of(rhs);
            }
        }
    }
};
// Intrusive multiset of large_data_record, linked through the record's
// `_index_hook` member and ordered by record_compare<Depth>. Being a multiset,
// it can hold several records that compare equal at the chosen depth.
// constant_time_size is disabled — presumably because the hook is auto_unlink,
// which Boost.Intrusive does not allow with constant-time size; confirm
// against large_data_record::index_hook_type.
template<record_type Depth>
using record_set = boost::intrusive::multiset<large_data_record,
boost::intrusive::member_hook<large_data_record,
large_data_record::index_hook_type, &large_data_record::_index_hook>,
boost::intrusive::compare<record_compare<Depth>>,
boost::intrusive::constant_time_size<false>>;
// Per-table index over large_data_records from all live SSTables.
// Links directly into records stored in each SSTable's scylla_metadata
// via intrusive member hooks (auto_unlink). Aggregation (max across
// SSTables for the same key) happens at lookup time via equal_range.
class large_data_record_index {
public:
// Aggregated partition-level figures returned by lookup_partition().
struct partition_entry {
// Maximum recorded `value` among the matching records.
uint64_t partition_size = 0;
// Maximum recorded `elements_count` among the matching records.
uint64_t rows = 0;
};
// Links the guardrail-relevant records of `sst` into the matching sets.
void register_sstable(sstables::shared_sstable sst);
// Clears all sets and registers every SSTable in `sstables`.
void rebuild(const std::unordered_set<sstables::shared_sstable>& sstables);
// Each lookup returns std::nullopt when no record matches the key, and the
// maximum over all matching records otherwise.
std::optional<partition_entry> lookup_partition(bytes_view pk_bytes) const;
std::optional<uint64_t> lookup_row(bytes_view pk_bytes, bytes_view ck_bytes) const;
std::optional<uint64_t> lookup_collection(bytes_view pk_bytes,
bytes_view ck_bytes, bytes_view column_name) const;
private:
// partition_size and rows_in_partition records, keyed by partition key.
record_set<record_type::partition> _partitions;
// row_size records, keyed by partition key + clustering key.
record_set<record_type::row> _rows;
// elements_in_collection records, keyed by partition key + clustering key + column name.
record_set<record_type::collection> _collections;
};
// Thresholds consulted by large_data_guardrail on every check. Each value is
// read through utils::updateable_value at check time.
//
// For each dimension there is a fail (hard) and a warn (soft) threshold. A
// fail threshold of 0 disables the hard limit. *_mb thresholds are expressed
// in MB and converted to bytes by the guardrail.
//
// Member order is part of the interface: the struct is built with designated
// initializers, which must follow declaration order.
struct guardrail_config {
utils::updateable_value<uint32_t> partition_size_fail_threshold_mb;
utils::updateable_value<uint32_t> partition_size_warn_threshold_mb;
utils::updateable_value<uint32_t> rows_count_fail_threshold;
utils::updateable_value<uint32_t> rows_count_warn_threshold;
utils::updateable_value<uint32_t> row_size_fail_threshold_mb;
utils::updateable_value<uint32_t> row_size_warn_threshold_mb;
utils::updateable_value<uint32_t> collection_elements_fail_threshold;
utils::updateable_value<uint32_t> collection_elements_warn_threshold;
};
// Each replica::table holds a pointer to either a real guardrail or a
// noop (the write path hands guardrails around as
// shared_ptr<large_data_guardrail_base>). The guardrail owns the per-table
// large_data_record_index, so noop tables pay no index-maintenance cost.
class large_data_guardrail_base {
public:
virtual ~large_data_guardrail_base() = default;
// Validates a mutation of partition `pk` with contents `mp`. The real
// implementation throws replica::large_data_exception to reject the write.
virtual void check(const schema& s, const mutation_partition& mp,
partition_key_view pk) const = 0;
// Makes the large-data records of `sst` visible to subsequent checks.
virtual void register_sstable(sstables::shared_sstable sst) = 0;
// Replaces the known records with those of the given SSTable set.
virtual void rebuild(const std::unordered_set<sstables::shared_sstable>& sstables) = 0;
};
// Guardrail implementation whose operations all do nothing; used where the
// guardrail checks are disabled or must be skipped.
class noop_large_data_guardrail final : public large_data_guardrail_base {
public:
// Returns a shared pointer to a lazily created, per-thread instance
// (function-local `static thread_local`).
static shared_ptr<large_data_guardrail_base> instance() {
static thread_local auto inst = make_shared<noop_large_data_guardrail>();
return inst;
}
// Accepts every mutation.
void check(const schema&, const mutation_partition&,
partition_key_view) const override {}
// No index is kept, so there is nothing to register or rebuild.
void register_sstable(sstables::shared_sstable) override {}
void rebuild(const std::unordered_set<sstables::shared_sstable>&) override {}
};
// Real guardrail: keeps a large_data_record_index and compares the recorded
// figures against the thresholds in guardrail_config on every check.
class large_data_guardrail final : public large_data_guardrail_base {
public:
explicit large_data_guardrail(guardrail_config cfg) noexcept
: _cfg(std::move(cfg)) {}
// Runs the partition checks, then the row and collection checks; throws
// replica::large_data_exception when a hard limit is reached.
void check(const schema& s, const mutation_partition& mp,
partition_key_view pk) const override;
void register_sstable(sstables::shared_sstable sst) override;
void rebuild(const std::unordered_set<sstables::shared_sstable>& sstables) override;
private:
// Partition size and partition row count checks.
void check_partition(const schema& s, bytes_view pk_bytes, partition_key_view pk) const;
// Iterates the static row and the non-dummy clustering rows of `mp`,
// invoking the row-size and collection checks for each.
void check_rows_and_collections(const schema& s, bytes_view pk_bytes, const mutation_partition& mp, partition_key_view pk) const;
// `ck` is nullptr (and `ck_bytes` empty) for the static row.
void check_row_size(const schema& s, bytes_view pk_bytes, partition_key_view pk,
bytes_view ck_bytes, const clustering_key_prefix* ck) const;
// Skips atomic columns; `ck` is nullptr (and `ck_bytes` empty) for the static row.
void check_collection_element_count(const schema& s, bytes_view pk_bytes, partition_key_view pk,
const column_definition& cdef, bytes_view ck_bytes,
const clustering_key_prefix* ck) const;
guardrail_config _cfg;
large_data_record_index _index;
};
class large_data_handler {
public:
struct stats {

View File

@@ -339,6 +339,7 @@ schema_ptr scylla_tables(schema_features features) {
sb.with_column("tablets", map_type_impl::get_instance(utf8_type, utf8_type, false));
sb.with_column("storage_engine", utf8_type);
sb.with_column("large_data_guardrails_enabled", boolean_type);
sb.with_hash_version();
s = sb.build();
@@ -1700,6 +1701,17 @@ mutation make_scylla_tables_mutation(schema_ptr table, api::timestamp_type times
if (table->logstor_enabled()) {
m.set_clustered_cell(ckey, "storage_engine", "logstor", timestamp);
}
// Write the large_data_guardrails_enabled column only when enabled.
// When disabled (the default), omit the cell entirely so that old nodes
// that don't know this column can still read the SSTable during rolling
// upgrade or rollback. The CQL validation gate ensures that 'true' can
// only be set once all nodes support the column, so it is safe to write
// the live cell at that point.
if (table->large_data_guardrails_enabled()) {
auto& guardrails_cdef = *scylla_tables()->get_column_definition("large_data_guardrails_enabled");
m.set_clustered_cell(ckey, guardrails_cdef,
atomic_cell::make_live(*boolean_type, timestamp, boolean_type->decompose(true)));
}
// In-memory tables are deprecated since scylla-2024.1.0
// FIXME: delete the column when there's no live version supporting it anymore.
// Writing it here breaks upgrade rollback to versions that do not support the in_memory schema_feature
@@ -1938,6 +1950,23 @@ utils::chunked_vector<mutation> make_update_table_mutations(service::storage_pro
utils::chunked_vector<mutation> mutations;
add_table_or_view_to_schema_mutation(new_table, timestamp, false, mutations);
make_update_indices_mutations(sp, old_table, new_table, timestamp, mutations);
// When large_data_guardrails_enabled transitions from true to false,
// write a tombstone to override the previous live cell in scylla_tables.
// make_scylla_tables_mutation only writes a live cell when enabled, so
// the tombstone must be added here where we have the old schema.
// This is safe because the CQL feature gate ensures all nodes are
// upgraded before the property can be set to true.
if (old_table->large_data_guardrails_enabled() && !new_table->large_data_guardrails_enabled()) {
schema_ptr s = tables();
auto pkey = partition_key::from_singular(*s, new_table->ks_name());
auto ckey = clustering_key::from_singular(*s, new_table->cf_name());
mutation m(scylla_tables(), pkey);
auto& guardrails_cdef = *scylla_tables()->get_column_definition("large_data_guardrails_enabled");
m.set_clustered_cell(ckey, guardrails_cdef, atomic_cell::make_dead(timestamp, gc_clock::now()));
mutations.emplace_back(std::move(m));
}
make_update_columns_mutations(std::move(old_table), std::move(new_table), timestamp, mutations);
warn(unimplemented::cause::TRIGGERS);
@@ -2197,6 +2226,8 @@ static void prepare_builder_from_scylla_tables_row(const schema_ctxt& ctxt, sche
throw std::invalid_argument(format("Invalid value for storage_engine: {}", *storage_engine));
}
}
auto guardrails_enabled = table_row.get<bool>("large_data_guardrails_enabled");
builder.set_large_data_guardrails_enabled(guardrails_enabled.value_or(false));
}
schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations sm, const data_dictionary::user_types_storage& user_types, schema_ptr cdc_schema, std::optional<table_schema_version> version)

View File

@@ -180,6 +180,7 @@ public:
gms::feature writetime_ttl_individual_element { *this, "WRITETIME_TTL_INDIVIDUAL_ELEMENT"sv };
gms::feature arbitrary_tablet_boundaries { *this, "ARBITRARY_TABLET_BOUNDARIES"sv };
gms::feature large_data_virtual_tables { *this, "LARGE_DATA_VIRTUAL_TABLES"sv };
gms::feature large_data_guardrails { *this, "LARGE_DATA_GUARDRAILS"sv };
gms::feature keyspace_multi_rf_change { *this, "KEYSPACE_MULTI_RF_CHANGE"sv };
gms::feature view_building_tasks_min_task_id { *this, "VIEW_BUILDING_TASKS_MIN_TASK_ID"sv };
public:

View File

@@ -27,13 +27,20 @@ class critical_disk_utilization_exception {
sstring failed_action();
};
// Wire description of replica::large_data_exception: the keyspace and table
// the rejected write targeted, plus the human-readable rejection message.
class large_data_exception {
sstring ks_name();
sstring cf_name();
sstring message();
};
struct exception_variant {
std::variant<replica::unknown_exception,
replica::no_exception,
replica::rate_limit_exception,
replica::stale_topology_exception,
replica::abort_requested_exception,
replica::critical_disk_utilization_exception
replica::critical_disk_utilization_exception,
replica::large_data_exception
> reason;
};

View File

@@ -1626,6 +1626,16 @@ keyspace::make_column_family_config(const schema& s, const database& db) const {
cfg.data_listeners = &db.data_listeners();
cfg.enable_compacting_data_for_streaming_and_repair = db_config.enable_compacting_data_for_streaming_and_repair;
cfg.enable_tombstone_gc_for_streaming_and_repair = db_config.enable_tombstone_gc_for_streaming_and_repair;
cfg.guardrail_config = db::guardrail_config{
.partition_size_fail_threshold_mb = db_config.large_partition_fail_threshold_mb,
.partition_size_warn_threshold_mb = db_config.compaction_large_partition_warning_threshold_mb,
.rows_count_fail_threshold = db_config.rows_count_fail_threshold,
.rows_count_warn_threshold = db_config.compaction_rows_count_warning_threshold,
.row_size_fail_threshold_mb = db_config.large_row_fail_threshold_mb,
.row_size_warn_threshold_mb = db_config.compaction_large_row_warning_threshold_mb,
.collection_elements_fail_threshold = db_config.large_collection_elements_fail_threshold,
.collection_elements_warn_threshold = db_config.compaction_collection_elements_count_warning_threshold,
};
return cfg;
}
@@ -2164,20 +2174,29 @@ std::vector<replica::shared_memtable> memtable_list::clear_and_add() {
return std::exchange(_memtables, std::move(new_memtables));
}
future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, db::timeout_clock::time_point timeout) {
future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h,
db::timeout_clock::time_point timeout,
shared_ptr<db::large_data_guardrail_base> guardrails) {
auto& cf = find_column_family(m.column_family_id());
data_listeners().on_write(m_schema, m);
if (m.representation().size() > 128*1024) {
return unfreeze_gently(m, std::move(m_schema)).then([&cf, h = std::move(h), timeout] (auto m) mutable {
// Big mutation: unfreeze_gently (yields), then check guardrails on
// the already-deserialized mutation before applying.
auto pk = m.key();
return unfreeze_gently(m, std::move(m_schema)).then(
[&cf, h = std::move(h), timeout, guardrails, pk] (auto m) mutable {
guardrails->check(*m.schema(), m.partition(), pk);
return do_with(std::move(m), [&cf, h = std::move(h), timeout] (auto& m) mutable {
return cf.apply(m, std::move(h), timeout);
});
});
}
return cf.apply(m, std::move(m_schema), std::move(h), timeout);
// Small mutation: forward guardrails to memtable::apply which will check
// after partition_builder deserializes — no redundant unfreeze.
return cf.apply(m, std::move(m_schema), std::move(h), timeout, std::move(guardrails));
}
future<> database::apply_in_memory(const mutation& m, column_family& cf, db::rp_handle&& h, db::timeout_clock::time_point timeout) {
@@ -2358,13 +2377,14 @@ future<> database::do_apply_many(const utils::chunked_vector<frozen_mutation>& m
auto handles = co_await cl->add_entries(std::move(writers), timeout);
// FIXME: Memtable application is not atomic so reads may observe mutations partially applied until restart.
auto noop = db::noop_large_data_guardrail::instance();
for (size_t i = 0; i < muts.size(); ++i) {
auto s = local_schema_registry().get(muts[i].schema_version());
co_await apply_in_memory(muts[i], s, std::move(handles[i]), timeout);
co_await apply_in_memory(muts[i], s, std::move(handles[i]), timeout, noop);
}
}
future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync, db::per_partition_rate_limit::info rate_limit_info) {
future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync, db::per_partition_rate_limit::info rate_limit_info, bool skip_large_data_guardrails) {
++_stats->total_writes;
// assume failure until proven otherwise
auto update_writes_failed = defer([&] { ++_stats->total_writes_failed; });
@@ -2393,6 +2413,10 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra
co_await coroutine::return_exception(replica::critical_disk_utilization_exception{"rejected write mutation"});
}
auto guardrails = skip_large_data_guardrails
? db::noop_large_data_guardrail::instance()
: cf.get_large_data_guardrail();
if (!std::holds_alternative<std::monostate>(rate_limit_info) && can_apply_per_partition_rate_limit(*s, db::operation_type::write)) {
auto table_limit = *s->per_partition_rate_limit_options().get_max_writes_per_second();
auto& write_label = cf.get_rate_limiter_label_for_writes();
@@ -2451,7 +2475,8 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra
co_await coroutine::exception(std::move(ex));
}
}
auto f = co_await coroutine::as_future(this->apply_in_memory(m, s, std::move(h), timeout));
auto f = co_await coroutine::as_future(
this->apply_in_memory(m, s, std::move(h), timeout, std::move(guardrails)));
if (f.failed()) {
auto ex = f.get_exception();
if (try_catch<mutation_reordered_with_truncate_exception>(ex)) {
@@ -2515,7 +2540,7 @@ void database::update_write_metrics_for_rejected_writes() {
++_stats->total_writes_rejected_due_to_out_of_space_prevention;
}
future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) {
future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info, bool skip_large_data_guardrails) {
if (dblog.is_enabled(logging::log_level::trace)) {
dblog.trace("apply {}", m.pretty_printer(s));
}
@@ -2526,7 +2551,7 @@ future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_
if (!s->is_synced()) {
on_internal_error(dblog, format("attempted to apply mutation using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version()));
}
return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, sync, rate_limit_info);
return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, sync, rate_limit_info, skip_large_data_guardrails);
}
future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout) {
@@ -2536,7 +2561,7 @@ future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::t
if (!s->is_synced()) {
on_internal_error(dblog, format("attempted to apply hint using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version()));
}
return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no, std::monostate{});
// Hint replay must bypass the large data guardrails: the last argument is
// do_apply()'s skip_large_data_guardrails flag, and passing false here would
// run the guardrail check on every replayed hint.
return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no, std::monostate{}, /* skip_large_data_guardrails */ true);
}
keyspace::config

View File

@@ -77,6 +77,7 @@
#include "replica/tables_metadata_lock.hh"
#include "service/topology_guard.hh"
#include "utils/disk_space_monitor.hh"
#include "db/large_data_handler.hh"
class cell_locker;
class cell_locker_stats;
@@ -480,6 +481,7 @@ public:
unsigned x_log2_compaction_groups{0};
utils::updateable_value<bool> enable_compacting_data_for_streaming_and_repair;
utils::updateable_value<bool> enable_tombstone_gc_for_streaming_and_repair;
db::guardrail_config guardrail_config;
};
using snapshot_details = db::snapshot_ctl::table_snapshot_details;
@@ -553,6 +555,8 @@ private:
std::unique_ptr<cell_locker> _counter_cell_locks; // Memory-intensive; allocate only when needed.
shared_ptr<db::large_data_guardrail_base> _large_data_guardrail;
// Labels used to identify writes and reads for this table in the rate_limiter structure.
db::rate_limiter::label _rate_limiter_label_for_writes;
db::rate_limiter::label _rate_limiter_label_for_reads;
@@ -1011,6 +1015,7 @@ public:
[[gnu::always_inline]] bool uses_tablets() const;
int64_t calculate_tablet_count() const;
private:
shared_ptr<db::large_data_guardrail_base> make_large_data_guardrail() const;
void update_tombstone_gc_rf_one();
future<> clear_inactive_reads_for_tablet(database& db, storage_group& sg);
@@ -1028,13 +1033,14 @@ public:
// Applies given mutation to this column family
// The mutation is always upgraded to current schema.
void apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& h = {}) {
do_apply(compaction_group_for_key(m.key(), m_schema), std::move(h), m, m_schema);
do_apply(compaction_group_for_key(m.key(), m_schema), std::move(h), m, m_schema, *db::noop_large_data_guardrail::instance());
}
void apply(const mutation& m, db::rp_handle&& h = {}) {
do_apply(compaction_group_for_token(m.token()), std::move(h), m);
}
future<> apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, db::timeout_clock::time_point tmo);
future<> apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h,
db::timeout_clock::time_point tmo, shared_ptr<db::large_data_guardrail_base> guardrails);
future<> apply(const mutation& m, db::rp_handle&& h, db::timeout_clock::time_point tmo);
// Returns at most "cmd.limit" rows
@@ -1306,6 +1312,14 @@ public:
return _index_manager;
}
// Returns the guardrail object currently installed on this table.
// The pointer is returned by value, so the caller receives its own
// shared_ptr copy of _large_data_guardrail.
shared_ptr<db::large_data_guardrail_base> get_large_data_guardrail() const noexcept {
return _large_data_guardrail;
}
// Rebuild from current SSTable set. Used at startup; after that
// the index is maintained incrementally via register_sstable.
void rebuild_large_data_index();
sstables::sstables_manager& get_sstables_manager() noexcept {
return _sstables_manager;
}
@@ -1705,7 +1719,8 @@ private:
tracing::trace_state_ptr,
db::timeout_clock::time_point,
db::commitlog_force_sync,
db::per_partition_rate_limit::info> _apply_stage;
db::per_partition_rate_limit::info,
bool /* skip_large_data_guardrails */> _apply_stage;
flat_hash_map<sstring, keyspace> _keyspaces;
tables_metadata _tables_metadata;
@@ -1768,7 +1783,9 @@ public:
future<> init_logstor();
future<> recover_logstor();
const gms::feature_service& features() const { return _feat; }
future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&&, db::timeout_clock::time_point timeout);
future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&&,
db::timeout_clock::time_point timeout,
shared_ptr<db::large_data_guardrail_base> guardrails);
future<> apply_in_memory(const mutation& m, column_family& cf, db::rp_handle&&, db::timeout_clock::time_point timeout);
drain_progress get_drain_progress() const noexcept {
@@ -1796,7 +1813,7 @@ private:
auto sum_read_concurrency_sem_var(std::invocable<reader_concurrency_semaphore&> auto member);
auto sum_read_concurrency_sem_stat(std::invocable<reader_concurrency_semaphore::stats&> auto stats_member);
future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync, db::per_partition_rate_limit::info rate_limit_info);
future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync, db::per_partition_rate_limit::info rate_limit_info, bool skip_large_data_guardrails);
future<> do_apply_many(const utils::chunked_vector<frozen_mutation>&, db::timeout_clock::time_point timeout);
future<> apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout);
@@ -1974,7 +1991,7 @@ public:
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout, bool tombstone_gc_enabled = true);
// Apply the mutation atomically.
// Throws timed_out_error when timeout is reached.
future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog_force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info = std::monostate{});
future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog_force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info = std::monostate{}, bool skip_large_data_guardrails = false);
// Apply mutations atomically.
// On restart, either all mutations will be replayed or none of them.
// All mutations must belong to the same commitlog domain.

View File

@@ -518,7 +518,11 @@ future<> distributed_loader::populate_keyspace(sharded<replica::database>& db,
if (!is_system_keyspace(ks_name)) {
co_await smp::invoke_on_all([&] {
auto s = gtable->schema();
db.local().find_column_family(s).mark_ready_for_writes(db.local().commitlog_for(s));
auto& cf = db.local().find_column_family(s);
// Populate the large data caches from SSTable metadata
// before the table becomes writable.
cf.rebuild_large_data_index();
cf.mark_ready_for_writes(db.local().commitlog_for(s));
});
}
});

View File

@@ -25,6 +25,8 @@ exception_variant try_encode_replica_exception(std::exception_ptr eptr) {
return abort_requested_exception();
} catch (const critical_disk_utilization_exception& e) {
return e;
} catch (const large_data_exception& e) {
return e;
} catch (...) {
return no_exception{};
}

View File

@@ -80,6 +80,25 @@ public:
virtual const char* what() const noexcept override { return _message.c_str(); }
};
class large_data_exception final : public replica_exception {
seastar::sstring _ks_name;
seastar::sstring _cf_name;
seastar::sstring _message;
public:
large_data_exception(std::string_view ks_name, std::string_view cf_name, std::string_view detail) noexcept
: replica_exception()
, _ks_name(ks_name)
, _cf_name(cf_name)
, _message(seastar::format("Write rejected for {}.{}: {}", ks_name, cf_name, detail))
{ }
const seastar::sstring& ks_name() const { return _ks_name; }
const seastar::sstring& cf_name() const { return _cf_name; }
const seastar::sstring& message() const { return _message; }
virtual const char* what() const noexcept override { return _message.c_str(); }
};
using abort_requested_exception = seastar::abort_requested_exception;
struct exception_variant {
@@ -88,7 +107,8 @@ struct exception_variant {
rate_limit_exception,
stale_topology_exception,
abort_requested_exception,
critical_disk_utilization_exception
critical_disk_utilization_exception,
large_data_exception
> reason;
exception_variant()

View File

@@ -9,6 +9,7 @@
#include "utils/assert.hh"
#include "memtable.hh"
#include "replica/database.hh"
#include "replica/exceptions.hh"
#include "mutation/frozen_mutation.hh"
#include "replica/partition_snapshot_reader.hh"
#include "partition_builder.hh"
@@ -16,6 +17,8 @@
#include "readers/empty.hh"
#include "readers/forwardable.hh"
#include "sstables/types.hh"
#include "keys/keys.hh"
#include "db/large_data_handler.hh"
namespace replica {
@@ -796,13 +799,15 @@ memtable::apply(const mutation& m, db::rp_handle&& h) {
}
void
memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& h) {
with_allocator(allocator(), [this, &m, &m_schema] {
memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema,
const db::large_data_guardrail_base& guardrails, db::rp_handle&& h) {
with_allocator(allocator(), [this, &m, &m_schema, &guardrails] {
_table_shared_data.allocating_section(*this, [&, this] {
auto& p = find_or_create_partition_slow(m.key());
mutation_partition mp(*m_schema);
partition_builder pb(*m_schema, mp);
m.partition().accept(*m_schema, pb);
guardrails.check(*m_schema, mp, m.key());
auto& p = find_or_create_partition_slow(m.key());
_stats_collector.update(*m_schema, mp);
p.apply(region(), cleaner(), *_schema, std::move(mp), *m_schema, _table_stats.memtable_app_stats);
});

View File

@@ -22,6 +22,7 @@
#include "utils/double-decker.hh"
#include "readers/empty.hh"
#include "readers/mutation_source.hh"
#include "db/large_data_handler.hh"
class frozen_mutation;
class row_cache;
@@ -236,8 +237,11 @@ public:
// Applies mutation to this memtable.
// The mutation is upgraded to current schema.
void apply(const mutation& m, db::rp_handle&& = {});
// The mutation is upgraded to current schema.
void apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& = {});
void apply(const frozen_mutation& m, const schema_ptr& m_schema,
const db::large_data_guardrail_base& guardrails, db::rp_handle&& = {});
// Convenience overload without a guardrail argument: forwards to the
// guardrail-taking overload with the no-op guardrail instance.
void apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& h = {}) {
apply(m, m_schema, *db::noop_large_data_guardrail::instance(), std::move(h));
}
void evict_entry(memtable_entry& e, mutation_cleaner& cleaner) noexcept;
static memtable& from_region(logalloc::region& r) noexcept {

View File

@@ -53,6 +53,7 @@
#include "mutation/mutation_source_metadata.hh"
#include "gms/gossiper.hh"
#include "gms/feature_service.hh"
#include "cdc/log.hh"
#include "db/config.hh"
#include "db/commitlog/commitlog.hh"
#include "utils/lister.hh"
@@ -1603,6 +1604,7 @@ table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_ss
add_maintenance_sstable(cg, sst);
}
update_stats_for_new_sstable(sst);
_large_data_guardrail->register_sstable(sst);
if (trigger_compaction) {
try_trigger_compaction(cg);
}
@@ -2051,6 +2053,10 @@ table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptr<memtabl
return update_cache(cg, old, newtabs);
});
for (const auto& sst : newtabs) {
_large_data_guardrail->register_sstable(sst);
}
co_await utils::get_local_injector().inject("replica_post_flush_after_update_cache", [this] (auto& handler) -> future<> {
const auto this_table_name = format("{}.{}", _schema->ks_name(), _schema->cf_name());
if (this_table_name == handler.get("table_name")) {
@@ -2266,6 +2272,11 @@ void table::rebuild_statistics() {
});
}
// Rebuilds this table's large data guardrail state from the SSTables
// returned by get_sstables().
void table::rebuild_large_data_index() {
    // Keep the handle returned by get_sstables() alive across rebuild().
    // Binding a reference to *get_sstables() would leave it dangling if the
    // call hands back the only owner of the list (e.g. a freshly built one),
    // because that temporary dies at the end of the declaration.
    auto sstables = get_sstables();
    _large_data_guardrail->rebuild(*sstables);
}
void table::subtract_compaction_group_from_stats(const compaction_group& cg) noexcept {
_stats.live_disk_space_used -= cg.live_disk_space_used_full_stats();
_stats.total_disk_space_used -= cg.total_disk_space_used_full_stats();
@@ -2438,6 +2449,8 @@ compaction_group::merge_sstables_from(compaction_group& group) {
backlog_tracker_adjust_charges({}, sstables_to_merge);
});
_t.rebuild_statistics();
// No large data index update needed: SSTables just moved between
// compaction groups within the same table, and the index is table-wide.
}
future<>
@@ -2565,6 +2578,9 @@ compaction_group::update_sstable_sets_on_compaction_completion(compaction::compa
cache.refresh_snapshot();
_t.rebuild_statistics();
for (auto& sst : desc.new_sstables) {
_t._large_data_guardrail->register_sstable(sst);
}
co_await builder.delete_sstables_atomically(unused_sstables_for_deletion(std::move(desc)));
}
@@ -3268,6 +3284,7 @@ table::table(schema_ptr schema, config config, lw_shared_ptr<const storage_optio
, _index_manager(this->as_data_dictionary())
, _flush_barrier(format("[table {}.{}] flush_barrier", _schema->ks_name(), _schema->cf_name()))
, _counter_cell_locks(_schema->is_counter() ? std::make_unique<cell_locker>(_schema, cl_stats) : nullptr)
, _large_data_guardrail(make_large_data_guardrail())
, _async_gate(format("[table {}.{}] async_gate", _schema->ks_name(), _schema->cf_name()))
, _pending_writes_phaser(format("[table {}.{}] pending_writes", _schema->ks_name(), _schema->cf_name()))
, _pending_reads_phaser(format("[table {}.{}] pending_reads", _schema->ks_name(), _schema->cf_name()))
@@ -4728,6 +4745,17 @@ db::commitlog* table::commitlog() const {
return _commitlog;
}
// Picks the guardrail implementation for this table based on its schema.
// A real guardrail is created only when the schema has the guardrails flag
// set and the table is not in an internal keyspace, is not a view and is not
// a CDC log table; every other table gets the shared no-op instance.
shared_ptr<db::large_data_guardrail_base> table::make_large_data_guardrail() const {
    const bool excluded = !_schema->large_data_guardrails_enabled()
            || is_internal_keyspace(_schema->ks_name())
            || _schema->is_view()
            || cdc::is_log_name(_schema->cf_name());
    if (excluded) {
        return db::noop_large_data_guardrail::instance();
    }
    return make_shared<db::large_data_guardrail>(_config.guardrail_config);
}
void table::set_schema(schema_ptr s) {
SCYLLA_ASSERT(s->is_counter() == _schema->is_counter());
tlogger.debug("Changing schema version of {}.{} ({}) from {} to {}",
@@ -4749,6 +4777,8 @@ void table::set_schema(schema_ptr s) {
_logstor_index->set_schema(s);
}
_schema = std::move(s);
_large_data_guardrail = make_large_data_guardrail();
_large_data_guardrail->rebuild(*get_sstables());
for (auto&& v : _views) {
v->view_info()->reset_view_info();
@@ -4988,7 +5018,8 @@ future<> table::apply(const mutation& m, db::rp_handle&& h, db::timeout_clock::t
template void table::do_apply(compaction_group& cg, db::rp_handle&&, const mutation&);
future<> table::apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h, db::timeout_clock::time_point timeout) {
future<> table::apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&& h,
db::timeout_clock::time_point timeout, shared_ptr<db::large_data_guardrail_base> guardrails) {
if (_virtual_writer) [[unlikely]] {
return (*_virtual_writer)(m);
}
@@ -5001,12 +5032,12 @@ future<> table::apply(const frozen_mutation& m, schema_ptr m_schema, db::rp_hand
return _logstor->write(m.unfreeze(m_schema), cg, std::move(ss_holder));
}
return dirty_memory_region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), h = std::move(h), &cg, holder = std::move(holder)]() mutable {
do_apply(cg, std::move(h), m, m_schema);
return dirty_memory_region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), h = std::move(h), &cg, holder = std::move(holder), guardrails = std::move(guardrails)]() mutable {
do_apply(cg, std::move(h), m, m_schema, *guardrails);
}, timeout);
}
template void table::do_apply(compaction_group& cg, db::rp_handle&&, const frozen_mutation&, const schema_ptr&);
template void table::do_apply(compaction_group& cg, db::rp_handle&&, const frozen_mutation&, const schema_ptr&, const db::large_data_guardrail_base&);
future<>
write_memtable_to_sstable(mutation_reader reader,

View File

@@ -712,6 +712,7 @@ table_schema_version schema::calculate_digest(const schema::raw_schema& r) {
}
feed_hash(h, r._props.tablet_options);
feed_hash(h, r._large_data_guardrails_enabled);
return table_schema_version(utils::UUID_gen::get_name_UUID(h.finalize()));
}

View File

@@ -612,6 +612,7 @@ private:
// schema digest. It is also not set locally on a schema tables.
std::reference_wrapper<const dht::static_sharder> _sharder;
bool _in_memory = false;
bool _large_data_guardrails_enabled = false;
std::optional<raw_view_info> _view_info;
user_properties _props;
@@ -814,6 +815,10 @@ public:
return _raw._per_partition_rate_limit_options;
}
// Returns the value of the large data guardrails flag stored in this
// schema's raw data (_raw._large_data_guardrails_enabled).
bool large_data_guardrails_enabled() const {
return _raw._large_data_guardrails_enabled;
}
const ::speculative_retry& speculative_retry() const {
return _raw._props.speculative_retry;
}

View File

@@ -252,6 +252,10 @@ public:
_raw._in_memory = in_memory;
return *this;
}
// Sets the large data guardrails flag on the schema being built.
// Returns *this so calls can be chained.
schema_builder& set_large_data_guardrails_enabled(bool enabled) {
_raw._large_data_guardrails_enabled = enabled;
return *this;
}
schema_builder& set_tablet_options(std::map<sstring, sstring>&& hints);

View File

@@ -260,7 +260,14 @@ future<> paxos_state::learn(storage_proxy& sp, paxos_store& paxos_store, schema_
on_internal_error(logger, format("schema version in learn does not match current schema"));
}
co_await sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout);
// Skip the large-partition write guardrail for the Paxos learn write.
// Paxos cannot recover from a partial-replica rejection here: rejecting
// on a subset of replicas would leave the cluster in an inconsistent
// state where some replicas have applied the decision and others
// haven't. The decision was already accepted; it must be durably
// committed by every reachable replica.
co_await sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout,
std::monostate{}, /* skip_large_data_guardrails */ true);
} else {
logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision);
tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision);

View File

@@ -650,9 +650,10 @@ private:
seastar::log_level l = seastar::log_level::warn;
if (is_timeout_exception(eptr)
|| std::holds_alternative<replica::rate_limit_exception>(errors.local.reason)
|| std::holds_alternative<replica::large_data_exception>(errors.local.reason)
|| std::holds_alternative<abort_requested_exception>(errors.local.reason)) {
// ignore timeouts, abort requests and rate limit exceptions so that logs are not flooded.
// database's total_writes_timedout or total_writes_rate_limited counter was incremented.
// ignore timeouts, abort requests, rate limit and large-data rejections so that logs are not flooded.
// database's total_writes_timedout, total_writes_rate_limited or total_writes_rejected_due_to_large_partition counter was incremented.
l = seastar::log_level::debug;
}
slogger.log(l, "Failed to apply mutation from {}#{}: {}", reply_to_host_id, shard, eptr);
@@ -794,6 +795,9 @@ private:
} else if constexpr (std::is_same_v<Ex, replica::critical_disk_utilization_exception>) {
msg = e.what();
return error::FAILURE;
} else if constexpr (std::is_same_v<Ex, replica::large_data_exception>) {
msg = e.message();
return error::FAILURE;
}
}, exception->reason);
}
@@ -3461,17 +3465,17 @@ storage_proxy::mutate_locally(const mutation& m, tracing::trace_state_ptr tr_sta
future<>
storage_proxy::mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout,
smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info) {
smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info, bool skip_large_data_guardrails) {
auto erm = _db.local().find_column_family(s).get_effective_replication_map();
auto apply = [this, erm, s, &m, tr_state, sync, timeout, smp_grp, rate_limit_info] (shard_id shard) {
auto apply = [this, erm, s, &m, tr_state, sync, timeout, smp_grp, rate_limit_info, skip_large_data_guardrails] (shard_id shard) {
get_stats().replica_cross_shard_ops += shard != this_shard_id();
auto shard_rate_limit = rate_limit_info;
if (shard == this_shard_id()) {
shard_rate_limit = adjust_rate_limit_for_local_operation(shard_rate_limit);
}
return _db.invoke_on(shard, {smp_grp, timeout},
[&m, erm, gs = global_schema_ptr(s), gtr = tracing::global_trace_state_ptr(std::move(tr_state)), timeout, sync, shard_rate_limit] (replica::database& db) mutable -> future<> {
return db.apply(gs, m, gtr.get(), sync, timeout, shard_rate_limit);
[&m, erm, gs = global_schema_ptr(s), gtr = tracing::global_trace_state_ptr(std::move(tr_state)), timeout, sync, shard_rate_limit, skip_large_data_guardrails] (replica::database& db) mutable -> future<> {
return db.apply(gs, m, gtr.get(), sync, timeout, shard_rate_limit, skip_large_data_guardrails);
});
};
return apply_on_shards(erm, *s, m.token(*s), std::move(apply));
@@ -4778,6 +4782,8 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
msg = e->grab_cause();
} else if (auto* e = try_catch<replica::critical_disk_utilization_exception>(eptr)) {
msg = e->what();
} else if (auto* e = try_catch<replica::large_data_exception>(eptr)) {
msg = e->message();
} else {
slogger.error("exception during mutation write to {}.{} on {}: {}",
schema->ks_name(), schema->cf_name(), coordinator, eptr);

View File

@@ -593,7 +593,7 @@ private:
// Applies mutation on this node.
// Resolves with timed_out_error when timeout is reached.
future<> mutate_locally(const schema_ptr&, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout,
smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info);
smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info, bool skip_large_data_guardrails = false);
// Applies mutations on this node.
// Resolves with timed_out_error when timeout is reached.
future<> mutate_locally(utils::chunked_vector<mutation> mutation, tracing::trace_state_ptr tr_state, clock_type::time_point timeout, smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info);
@@ -702,8 +702,8 @@ public:
}
// Applies mutation on this node.
// Resolves with timed_out_error when timeout is reached.
future<> mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max(), db::per_partition_rate_limit::info rate_limit_info = std::monostate()) {
return mutate_locally(s, m, tr_state, sync, timeout, _write_smp_service_group, rate_limit_info);
future<> mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max(), db::per_partition_rate_limit::info rate_limit_info = std::monostate(), bool skip_large_data_guardrails = false) {
return mutate_locally(s, m, tr_state, sync, timeout, _write_smp_service_group, rate_limit_info, skip_large_data_guardrails);
}
// Applies materialized view mutation on this node.
// Resolves with timed_out_error when timeout is reached.

View File

@@ -1102,6 +1102,10 @@ public:
return _large_data_records;
}
// Non-const overload: returns a mutable reference to _large_data_records,
// which may be an empty optional.
std::optional<scylla_metadata::large_data_records>& get_large_data_records() noexcept {
return _large_data_records;
}
// Return the extended timestamp statistics map.
// Some or all entries may be missing if not present in scylla_metadata
scylla_metadata::ext_timestamp_stats::map_type get_ext_timestamp_stats() const noexcept;

View File

@@ -17,6 +17,7 @@
#include "mutation/tombstone.hh"
#include "utils/streaming_histogram.hh"
#include "utils/estimated_histogram.hh"
#include <boost/intrusive/set.hpp>
#include "sstables/key.hh"
#include "sstables/file_writer.hh"
#include "db/commitlog/replay_position.hh"
@@ -614,6 +615,11 @@ struct large_data_record {
uint64_t range_tombstones; // number of range tombstones in the partition
uint64_t dead_rows; // number of dead rows in the partition
// Runtime hook for large_data_record_index. Not serialized.
using index_hook_type = boost::intrusive::set_member_hook<
boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
index_hook_type _index_hook;
template <typename Describer>
auto describe_type(sstable_version_types v, Describer f) {
return f(type, partition_key, clustering_key, column_name, value,

View File

@@ -355,6 +355,7 @@ add_scylla_test(combined_tests
group0_voter_calculator_test.cc
index_with_paging_test.cc
json_cql_query_test.cc
large_data_guardrail_test.cc
large_paging_state_test.cc
loading_cache_test.cc
memtable_test.cc

View File

@@ -0,0 +1,233 @@
/*
* Copyright (C) 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#include <boost/test/unit_test.hpp>
#include <deque>
#include "db/large_data_handler.hh"
using sstables::large_data_record;
using sstables::large_data_type;
using db::lookup_key;
using db::record_type;
using db::record_compare;
using db::record_set;
namespace {

// Appends a new record to `store`, fills its fields from the arguments and
// returns a reference to the stored element. The tombstone and dead-row
// counters are always zeroed.
large_data_record& make_record(std::deque<large_data_record>& store,
        large_data_type type, bytes pk, bytes ck = {}, bytes col = {},
        uint64_t value = 0, uint64_t elements_count = 0) {
    auto& r = store.emplace_back();
    r.type = type;
    r.partition_key.value = std::move(pk);
    r.clustering_key.value = std::move(ck);
    r.column_name.value = std::move(col);
    r.value = value;
    r.elements_count = elements_count;
    r.range_tombstones = 0;
    r.dead_rows = 0;
    return r;
}

} // anonymous namespace
BOOST_AUTO_TEST_SUITE(large_data_guardrail_test)
// The partition-level comparator orders records by partition key alone.
BOOST_AUTO_TEST_CASE(test_partition_comparator_orders_by_pk_only) {
    record_compare<record_type::partition> less;
    std::deque<large_data_record> records;

    auto& first = make_record(records, large_data_type::partition_size, to_bytes("aaa"));
    auto& second = make_record(records, large_data_type::partition_size, to_bytes("bbb"));

    // Strict ordering between distinct keys, irreflexive on the same record.
    BOOST_REQUIRE(less(first, second));
    BOOST_REQUIRE(!less(second, first));
    BOOST_REQUIRE(!less(first, first));

    // Same partition key plus a clustering key: equivalent at this level.
    auto& with_ck = make_record(records, large_data_type::partition_size,
            to_bytes("aaa"), to_bytes("zzz"));
    BOOST_REQUIRE(!less(first, with_ck));
    BOOST_REQUIRE(!less(with_ck, first));
}
// The row-level comparator orders by partition key first, then clustering key.
BOOST_AUTO_TEST_CASE(test_row_comparator_orders_by_pk_then_ck) {
    record_compare<record_type::row> less;
    std::deque<large_data_record> records;

    auto& pk1_cka = make_record(records, large_data_type::row_size,
            to_bytes("pk1"), to_bytes("ck_a"));
    auto& pk1_ckb = make_record(records, large_data_type::row_size,
            to_bytes("pk1"), to_bytes("ck_b"));
    auto& pk2_cka = make_record(records, large_data_type::row_size,
            to_bytes("pk2"), to_bytes("ck_a"));

    // Same partition key: the clustering key decides.
    BOOST_REQUIRE(less(pk1_cka, pk1_ckb));
    BOOST_REQUIRE(!less(pk1_ckb, pk1_cka));
    // Different partition keys: pk1 sorts before pk2 whatever the clustering key.
    BOOST_REQUIRE(less(pk1_cka, pk2_cka));
    BOOST_REQUIRE(less(pk1_ckb, pk2_cka));

    // A column name does not take part in row-level ordering.
    auto& with_col = make_record(records, large_data_type::row_size,
            to_bytes("pk1"), to_bytes("ck_a"), to_bytes("col_z"));
    BOOST_REQUIRE(!less(pk1_cka, with_col));
    BOOST_REQUIRE(!less(with_col, pk1_cka));
}
// The collection-level comparator takes the column name into account after
// the partition and clustering keys.
BOOST_AUTO_TEST_CASE(test_collection_comparator_orders_by_pk_ck_col) {
    record_compare<record_type::collection> less;
    std::deque<large_data_record> records;

    auto& col_a = make_record(records, large_data_type::elements_in_collection,
            to_bytes("pk1"), to_bytes("ck1"), to_bytes("col_a"));
    auto& col_b = make_record(records, large_data_type::elements_in_collection,
            to_bytes("pk1"), to_bytes("ck1"), to_bytes("col_b"));
    BOOST_REQUIRE(less(col_a, col_b));
    BOOST_REQUIRE(!less(col_b, col_a));
    BOOST_REQUIRE(!less(col_a, col_a));

    // Identical pk + ck + column: neither record sorts before the other.
    auto& col_a_again = make_record(records, large_data_type::elements_in_collection,
            to_bytes("pk1"), to_bytes("ck1"), to_bytes("col_a"));
    BOOST_REQUIRE(!less(col_a, col_a_again));
    BOOST_REQUIRE(!less(col_a_again, col_a));
}
// The comparator accepts a lookup_key on either side and gives answers that
// agree with each other in both argument orders.
BOOST_AUTO_TEST_CASE(test_heterogeneous_lookup_with_lookup_key) {
    record_compare<record_type::partition> less;
    std::deque<large_data_record> records;
    auto& stored = make_record(records, large_data_type::partition_size, to_bytes("pk1"));

    // Key equal to the stored record's partition key: equivalent both ways.
    auto same_pk = to_bytes("pk1");
    lookup_key same_key{bytes_view(same_pk), {}, {}};
    BOOST_REQUIRE(!less(stored, same_key));
    BOOST_REQUIRE(!less(same_key, stored));

    // Greater key: the record sorts before it, and not the other way round.
    auto greater_pk = to_bytes("pk2");
    lookup_key greater_key{bytes_view(greater_pk), {}, {}};
    BOOST_REQUIRE(less(stored, greater_key));
    BOOST_REQUIRE(!less(greater_key, stored));
}
// equal_range() on the partition set returns every record of a partition,
// whatever its large_data_type, and an empty range for an unknown key.
BOOST_AUTO_TEST_CASE(test_equal_range_groups_partition_records) {
    record_set<record_type::partition> set;
    std::deque<large_data_record> records;

    set.insert(make_record(records, large_data_type::partition_size,
            to_bytes("pk_a"), {}, {}, 100));
    set.insert(make_record(records, large_data_type::partition_size,
            to_bytes("pk_b"), {}, {}, 200));
    set.insert(make_record(records, large_data_type::rows_in_partition,
            to_bytes("pk_a"), {}, {}, 0, 50));

    // pk_a owns two records: one partition_size and one rows_in_partition.
    auto pk_a = to_bytes("pk_a");
    lookup_key hit_key{bytes_view(pk_a), {}, {}};
    auto hit = set.equal_range(hit_key, set.key_comp());

    uint64_t largest_size = 0;
    uint64_t largest_rows = 0;
    int matched = 0;
    for (auto it = hit.first; it != hit.second; ++it) {
        largest_size = std::max(largest_size, it->value);
        largest_rows = std::max(largest_rows, it->elements_count);
        ++matched;
    }
    BOOST_REQUIRE_EQUAL(matched, 2);
    BOOST_REQUIRE_EQUAL(largest_size, 100);
    BOOST_REQUIRE_EQUAL(largest_rows, 50);

    // pk_c was never inserted.
    auto pk_c = to_bytes("pk_c");
    lookup_key miss_key{bytes_view(pk_c), {}, {}};
    auto miss = set.equal_range(miss_key, set.key_comp());
    BOOST_REQUIRE(miss.first == miss.second);
}
// equal_range() on the row set groups records sharing partition and
// clustering key and leaves out rows with a different clustering key.
BOOST_AUTO_TEST_CASE(test_equal_range_groups_row_records) {
    record_set<record_type::row> set;
    std::deque<large_data_record> records;

    set.insert(make_record(records, large_data_type::row_size,
            to_bytes("pk"), to_bytes("ck_a"), {}, 100));
    set.insert(make_record(records, large_data_type::row_size,
            to_bytes("pk"), to_bytes("ck_a"), {}, 300));
    set.insert(make_record(records, large_data_type::row_size,
            to_bytes("pk"), to_bytes("ck_b"), {}, 200));

    auto pk = to_bytes("pk");
    auto ck_a = to_bytes("ck_a");
    lookup_key key{bytes_view(pk), bytes_view(ck_a), {}};
    auto range = set.equal_range(key, set.key_comp());

    uint64_t largest_size = 0;
    int matched = 0;
    for (auto it = range.first; it != range.second; ++it) {
        largest_size = std::max(largest_size, it->value);
        ++matched;
    }
    // Only the two ck_a records match; the ck_b one is outside the range.
    BOOST_REQUIRE_EQUAL(matched, 2);
    BOOST_REQUIRE_EQUAL(largest_size, 300);
}
// A lookup by (partition key, clustering key, column) on the collection index
// must return all records for that column and exclude records for a different
// column of the same row.
BOOST_AUTO_TEST_CASE(test_equal_range_groups_collection_records) {
    record_set<record_type::collection> index;
    std::deque<large_data_record> storage;

    // Two records for column "col", one for column "other".
    index.insert(make_record(storage, large_data_type::elements_in_collection,
                             to_bytes("pk"), to_bytes("ck"), to_bytes("col"), 0, 100));
    index.insert(make_record(storage, large_data_type::elements_in_collection,
                             to_bytes("pk"), to_bytes("ck"), to_bytes("col"), 0, 500));
    index.insert(make_record(storage, large_data_type::elements_in_collection,
                             to_bytes("pk"), to_bytes("ck"), to_bytes("other"), 0, 200));

    const auto part_key = to_bytes("pk");
    const auto clust_key = to_bytes("ck");
    const auto column = to_bytes("col");
    lookup_key probe{bytes_view(part_key), bytes_view(clust_key), bytes_view(column)};
    const auto hit = index.equal_range(probe, index.key_comp());

    uint64_t largest_elements = 0;
    int matched = 0;
    for (auto cursor = hit.first; cursor != hit.second; ++cursor, ++matched) {
        largest_elements = std::max(largest_elements, cursor->elements_count);
    }
    // Only the two "col" records match; the larger element count is 500.
    BOOST_REQUIRE_EQUAL(matched, 2);
    BOOST_REQUIRE_EQUAL(largest_elements, 500);
}
// Destroying the records must remove them from the set without any explicit
// erase: the set outlives the storage and is expected to be empty afterwards.
BOOST_AUTO_TEST_CASE(test_auto_unlink_on_record_destruction) {
    record_set<record_type::partition> index;
    {
        // The records live only for the duration of this scope.
        std::deque<large_data_record> storage;
        index.insert(make_record(storage, large_data_type::partition_size,
                                 to_bytes("pk_a"), {}, {}, 100));
        index.insert(make_record(storage, large_data_type::partition_size,
                                 to_bytes("pk_b"), {}, {}, 200));
        BOOST_REQUIRE(!index.empty());
    }
    // Storage is gone, so every record must have unlinked itself.
    BOOST_REQUIRE(index.empty());
}
BOOST_AUTO_TEST_SUITE_END()

View File

@@ -676,7 +676,7 @@ SEASTAR_TEST_CASE(test_system_schema_version_is_stable) {
// If you changed the schema of system.batchlog then this is expected to fail.
// Just replace expected version with the new version.
BOOST_REQUIRE_EQUAL(s->version(), table_schema_version(utils::UUID("c3f984e4-f886-3616-bb80-f8c68ed93595")));
BOOST_REQUIRE_EQUAL(s->version(), table_schema_version(utils::UUID("95ec5e20-12f2-361a-90b3-662f8999400b")));
});
}

View File

@@ -0,0 +1,102 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#
import logging
import pytest
import time
import asyncio
from cassandra import WriteFailure
from cassandra.protocol import ConfigurationException
from test.pylib.manager_client import ManagerClient
# Module-level logger named after this test module.
logger = logging.getLogger(__name__)
# Case-insensitive regex matched against the WriteFailure raised when a write
# is rejected because of partition size or partition row count.
REJECT_MSG_RE = "(?i)Write rejected.*(partition size|partition row count)"
async def _make_oversized_partition(cql, tbl, pk, num_rows, value_size_bytes):
"""Insert `num_rows` rows of `value_size_bytes` blobs under partition key `pk`."""
insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
for ck in range(num_rows):
await cql.run_async(insert, [pk, ck, bytes(value_size_bytes)])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_large_data_guardrails_rolling_upgrade(manager: ManagerClient):
    """Check that `large_data_guardrails_enabled` is gated by the
    LARGE_DATA_GUARDRAILS cluster feature across a simulated rolling upgrade.

    Steps:
      1. Start two nodes; the first one suppresses the feature via an error
         injection, playing the role of a not-yet-upgraded node.
      2. CREATE TABLE ... WITH large_data_guardrails_enabled = true must be
         rejected, even when sent to the node that supports the feature.
      3. "Upgrade" the first node by dropping the injection and restarting it.
      4. The same CREATE TABLE must eventually succeed, and the guardrail must
         then reject a write to an oversized partition.
    """
    new_node_cfg = {
        "compaction_large_partition_warning_threshold_mb": 1,
        "large_partition_fail_threshold_mb": 1,
    }
    # The "old" node has the same thresholds plus the feature suppression.
    old_node_cfg = dict(new_node_cfg)
    old_node_cfg["error_injections_at_startup"] = [
        {"name": "suppress_features", "value": "LARGE_DATA_GUARDRAILS"},
    ]

    old_node = await manager.server_add(config=old_node_cfg)
    new_node = await manager.server_add(config=new_node_cfg)

    # Talk only to the node that supports the feature; the cluster as a whole
    # does not, because the old node does not advertise it.
    cql = await manager.get_cql_exclusive(new_node)

    await cql.run_async(
        "CREATE KEYSPACE ks_upgrade_test WITH REPLICATION = "
        "{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}"
    )

    create_guarded_table = (
        "CREATE TABLE ks_upgrade_test.tbl1 "
        "(pk int, ck int, v blob, PRIMARY KEY (pk, ck)) "
        "WITH large_data_guardrails_enabled = true"
    )

    # Before the upgrade the option must be refused.
    with pytest.raises(ConfigurationException, match="cannot be used until all nodes"):
        await cql.run_async(create_guarded_table)

    # Simulate the upgrade of the old node: drop the injection and restart.
    await manager.server_stop_gracefully(old_node.server_id)
    await manager.server_remove_config_option(old_node.server_id, "error_injections_at_startup")
    await manager.server_start(old_node.server_id)

    # Retry for up to 60 seconds until the feature is seen as cluster-wide.
    deadline = time.time() + 60
    created = False
    while time.time() < deadline:
        try:
            await cql.run_async(create_guarded_table)
        except ConfigurationException as e:
            # Only the feature-gate rejection is an acceptable failure here.
            assert "cannot be used until all nodes" in str(e)
            await asyncio.sleep(0.5)
        else:
            created = True
            break
    assert created, "Feature was not enabled cluster-wide within timeout"

    # The guardrail must be functional on the freshly created table.
    await _make_oversized_partition(cql, "ks_upgrade_test.tbl1", pk=1, num_rows=2, value_size_bytes=600 * 1024)

    # With RF=1 the owner of pk=1 is unknown, so flush on every node.
    for node in (old_node, new_node):
        await manager.api.keyspace_flush(node.ip_addr, "ks_upgrade_test", "tbl1")

    stmt = cql.prepare("INSERT INTO ks_upgrade_test.tbl1 (pk, ck, v) VALUES (?, ?, ?)")
    with pytest.raises(WriteFailure, match=REJECT_MSG_RE):
        await cql.run_async(stmt, [1, 99, b"\x00"])

View File

@@ -0,0 +1,638 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#
# Tests for the large-data guardrail: write-path rejection and warnings for
# partitions, rows, and collections that exceed configured thresholds.
import pytest
import re
import time
import io
from contextlib import ExitStack
from cassandra import WriteFailure
from .util import new_test_table, config_value_context
from . import nodetool
from .test_logs import logfile, logfile_path, wait_for_log
# All tests are Scylla-only (guardrails don't exist in Cassandra).
@pytest.fixture(scope="function", autouse=True)
def all_tests_are_scylla_only(scylla_only):
    """Autouse fixture that makes every test in this module request the
    `scylla_only` fixture; it has no behavior of its own."""
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
# Case-insensitive regexes matched against the WriteFailure raised when a
# write is rejected, one per guardrail category.
REJECT_PARTITION_RE = "(?i)Write rejected.*(partition size|partition row count)"
REJECT_ROW_RE = "(?i)Write rejected.*row size"
REJECT_COLLECTION_RE = "(?i)Write rejected.*collection"
# Pattern searched for in the server log by the soft-limit (warning) tests.
WARN_RE = "Large data guardrail"
def make_oversized_partition(cql, tbl, pk, num_rows, value_size_bytes):
    """Write `num_rows` rows into partition `pk` of `tbl`.

    Clustering keys run from 0 to `num_rows - 1`; every row carries a
    zero-filled blob of `value_size_bytes` bytes.
    """
    stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
    for clustering_key in range(num_rows):
        payload = bytes(value_size_bytes)
        cql.execute(stmt, [pk, clustering_key, payload])
def make_large_row(cql, tbl, pk, ck, value_size_bytes):
    """Write one row at (pk, ck) in `tbl` whose `v` column is a zero-filled
    blob of `value_size_bytes` bytes."""
    stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
    payload = bytes(value_size_bytes)
    cql.execute(stmt, [pk, ck, payload])
def build_large_collection(cql, tbl, pk, ck, num_elements):
    """Grow the set column `s` of row (pk, ck) in `tbl` to hold the integers
    0 .. `num_elements - 1`, adding one element per UPDATE."""
    stmt = cql.prepare(f"UPDATE {tbl} SET s = s + ? WHERE pk = ? AND ck = ?")
    for element in range(num_elements):
        cql.execute(stmt, [{element}, pk, ck])
def assert_no_log(logfile, pattern, after_action, timeout=0.5):
    """Assert that running `after_action` does not make `pattern` show up in
    the log.

    The log is positioned at its end first, so only text written after that
    point is inspected. After the action runs, the helper sleeps `timeout`
    seconds before reading.
    """
    logfile.seek(0, io.SEEK_END)
    after_action()
    time.sleep(timeout)
    new_content = logfile.read()
    unexpected = re.search(pattern, new_content)
    assert unexpected is None, \
        f"Expected no match for '{pattern}', got: {new_content}"
def wait_for_all_logs(logfile, patterns, timeout=5):
    """Block until each regex in `patterns` has matched the log text read
    since the file's current position.

    Returns as soon as all patterns have matched. If `timeout` seconds pass
    first, the test is failed via `pytest.fail`, listing the patterns that
    were never seen.
    """
    seen = logfile.read()
    remaining = {rx for rx in set(patterns) if not re.search(rx, seen)}
    if not remaining:
        return

    deadline = time.time() + timeout
    while remaining and time.time() < deadline:
        chunk = logfile.read()
        if chunk:
            # Re-scan the accumulated text so matches spanning reads are found.
            seen += chunk
            remaining = {rx for rx in remaining if not re.search(rx, seen)}
            if not remaining:
                return
        time.sleep(0.1)
    pytest.fail(f'Timed out ({timeout}s) waiting for patterns: {remaining}')
# ---------------------------------------------------------------------------
# Partition size threshold
# ---------------------------------------------------------------------------
def test_partition_size_rejects_after_flush(cql, test_keyspace):
    """A partition whose on-disk size exceeds the hard limit must be rejected
    after flush, while writes to a different partition still succeed.

    The test never inspects the server log, so it does not request the
    `logfile` fixture; this matches the other rejection tests in this module.
    """
    with ExitStack() as cfg:
        cfg.enter_context(config_value_context(cql,
            'compaction_large_partition_warning_threshold_mb', '1'))
        cfg.enter_context(config_value_context(cql,
            'large_partition_fail_threshold_mb', '1'))
        schema = "pk int, ck int, v blob, PRIMARY KEY (pk, ck)"
        with new_test_table(cql, test_keyspace, schema,
                            extra=" WITH large_data_guardrails_enabled = true") as tbl:
            # Two 600 KB rows push pk=1 above the 1 MB thresholds.
            make_oversized_partition(cql, tbl, pk=1, num_rows=2,
                                     value_size_bytes=600 * 1024)
            # The test expects rejection only once the data has been flushed.
            nodetool.flush(cql, tbl)
            insert = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
            with pytest.raises(WriteFailure, match=REJECT_PARTITION_RE):
                cql.execute(insert, [1, 99, b"\x00"])
            # Different partition still succeeds.
            cql.execute(insert, [2, 0, b"\x00"])
def test_partition_size_disabled_when_zero(cql, test_keyspace):
    """A partition-size fail threshold of 0 means no rejection, even for a
    partition above the warning threshold."""
    settings = (
        ('compaction_large_partition_warning_threshold_mb', '1'),
        ('large_partition_fail_threshold_mb', '0'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, v blob, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = true"))
        make_oversized_partition(cql, tbl, pk=1, num_rows=2,
                                 value_size_bytes=600 * 1024)
        nodetool.flush(cql, tbl)
        stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
        # Must not raise.
        cql.execute(stmt, [1, 99, b"\x00"])
# ---------------------------------------------------------------------------
# Partition row-count threshold
# ---------------------------------------------------------------------------
def test_partition_rows_rejects_after_flush(cql, test_keyspace):
    """Writes to a partition with more rows than `rows_count_fail_threshold`
    must be rejected once the partition has been flushed."""
    settings = (
        ('compaction_rows_count_warning_threshold', '50'),
        ('compaction_large_partition_warning_threshold_mb', '1'),
        ('rows_count_fail_threshold', '50'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, v blob, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = true"))
        # 60 tiny rows: above the 50-row limits, far below the size limit.
        make_oversized_partition(cql, tbl, pk=1, num_rows=60,
                                 value_size_bytes=8)
        nodetool.flush(cql, tbl)
        stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
        with pytest.raises(WriteFailure, match=REJECT_PARTITION_RE):
            cql.execute(stmt, [1, 999, b"\x00"])
# ---------------------------------------------------------------------------
# Soft-limit warnings — partition
# ---------------------------------------------------------------------------
def test_partition_size_soft_limit_logs_warning(cql, test_keyspace, logfile):
    """A write to a partition above the warning threshold but below the fail
    threshold must succeed and leave a warning in the log."""
    settings = (
        ('compaction_large_partition_warning_threshold_mb', '1'),
        ('large_partition_fail_threshold_mb', '10'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, v blob, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = true"))
        make_oversized_partition(cql, tbl, pk=1, num_rows=2,
                                 value_size_bytes=600 * 1024)
        nodetool.flush(cql, tbl)
        stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
        cql.execute(stmt, [1, 99, b"\x00"])
        wait_for_log(logfile, WARN_RE, timeout=5)
def test_partition_rows_soft_limit_logs_warning(cql, test_keyspace, logfile):
    """A write to a partition above the row-count warning threshold but below
    the row-count fail threshold must succeed and leave a warning in the log."""
    settings = (
        ('compaction_rows_count_warning_threshold', '50'),
        ('compaction_large_partition_warning_threshold_mb', '1'),
        ('rows_count_fail_threshold', '1000'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, v blob, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = true"))
        make_oversized_partition(cql, tbl, pk=1, num_rows=60,
                                 value_size_bytes=8)
        nodetool.flush(cql, tbl)
        stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
        cql.execute(stmt, [1, 999, b"\x00"])
        wait_for_log(logfile, WARN_RE, timeout=5)
def test_partition_no_warning_below_soft_limit(cql, test_keyspace, logfile):
    """Writing to a small, flushed partition must not log a guardrail
    warning."""
    settings = (
        ('compaction_large_partition_warning_threshold_mb', '1'),
        ('large_partition_fail_threshold_mb', '10'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, v blob, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = true"))
        stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
        cql.execute(stmt, [1, 0, b"\x00"])
        nodetool.flush(cql, tbl)

        def write_second_row():
            cql.execute(stmt, [1, 1, b"\x00"])

        assert_no_log(logfile, WARN_RE, write_second_row)
# ---------------------------------------------------------------------------
# Row size threshold
# ---------------------------------------------------------------------------
def test_row_size_rejects_after_flush(cql, test_keyspace):
    """Writes to a flushed row larger than the row-size fail threshold must be
    rejected; other rows and other partitions stay writable."""
    settings = (
        ('compaction_large_row_warning_threshold_mb', '1'),
        ('large_row_fail_threshold_mb', '1'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, v blob, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = true"))
        make_large_row(cql, tbl, pk=1, ck=0,
                       value_size_bytes=1200 * 1024)
        nodetool.flush(cql, tbl)
        stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
        with pytest.raises(WriteFailure, match=REJECT_ROW_RE):
            cql.execute(stmt, [1, 0, b"\x00"])
        # Another clustering key in the same partition is accepted.
        cql.execute(stmt, [1, 99, b"\x00"])
        # Another partition is accepted.
        cql.execute(stmt, [2, 0, b"\x00"])
def test_row_size_disabled_when_zero(cql, test_keyspace):
    """A row-size fail threshold of 0 means no rejection, even for a row above
    the warning threshold."""
    settings = (
        ('compaction_large_row_warning_threshold_mb', '1'),
        ('large_row_fail_threshold_mb', '0'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, v blob, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = true"))
        make_large_row(cql, tbl, pk=1, ck=0,
                       value_size_bytes=1200 * 1024)
        nodetool.flush(cql, tbl)
        stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
        # Must not raise.
        cql.execute(stmt, [1, 0, b"\x00"])
# ---------------------------------------------------------------------------
# Soft-limit warnings — row
# ---------------------------------------------------------------------------
def test_row_size_soft_limit_logs_warning(cql, test_keyspace, logfile):
    """A write to a row above the warning threshold but below the fail
    threshold must succeed and leave a warning in the log."""
    settings = (
        ('compaction_large_row_warning_threshold_mb', '1'),
        ('large_row_fail_threshold_mb', '10'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, v blob, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = true"))
        make_large_row(cql, tbl, pk=1, ck=0,
                       value_size_bytes=1200 * 1024)
        nodetool.flush(cql, tbl)
        stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
        cql.execute(stmt, [1, 0, b"\x00"])
        wait_for_log(logfile, WARN_RE, timeout=5)
def test_row_no_warning_below_soft_limit(cql, test_keyspace, logfile):
    """Writing a small row next to a small, flushed row must not log a
    guardrail warning."""
    settings = (
        ('compaction_large_row_warning_threshold_mb', '1'),
        ('large_row_fail_threshold_mb', '10'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, v blob, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = true"))
        stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
        cql.execute(stmt, [1, 0, b"\x00"])
        nodetool.flush(cql, tbl)

        def write_second_row():
            cql.execute(stmt, [1, 1, b"\x00"])

        assert_no_log(logfile, WARN_RE, write_second_row)
# ---------------------------------------------------------------------------
# Collection element count threshold
# ---------------------------------------------------------------------------
def test_collection_rejects_after_flush(cql, test_keyspace):
    """Writes to a flushed collection with more elements than the fail
    threshold must be rejected; other rows and partitions stay writable."""
    settings = (
        ('compaction_collection_elements_count_warning_threshold', '50'),
        ('compaction_large_cell_warning_threshold_mb', '1'),
        ('large_collection_elements_fail_threshold', '50'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, s set<int>, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = true"))
        build_large_collection(cql, tbl, pk=1, ck=0, num_elements=60)
        nodetool.flush(cql, tbl)
        stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, s) VALUES (?, ?, ?)")
        with pytest.raises(WriteFailure, match=REJECT_COLLECTION_RE):
            cql.execute(stmt, [1, 0, {0}])
        # Another clustering key in the same partition is accepted.
        cql.execute(stmt, [1, 99, {0}])
        # Another partition is accepted.
        cql.execute(stmt, [2, 0, {0}])
def test_collection_disabled_when_zero(cql, test_keyspace):
    """A collection-elements fail threshold of 0 means no rejection, even for
    a collection above the warning threshold."""
    settings = (
        ('compaction_collection_elements_count_warning_threshold', '50'),
        ('compaction_large_cell_warning_threshold_mb', '1'),
        ('large_collection_elements_fail_threshold', '0'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, s set<int>, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = true"))
        build_large_collection(cql, tbl, pk=1, ck=0, num_elements=60)
        nodetool.flush(cql, tbl)
        stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, s) VALUES (?, ?, ?)")
        # Must not raise.
        cql.execute(stmt, [1, 0, {0}])
# ---------------------------------------------------------------------------
# Soft-limit warnings — collection
# ---------------------------------------------------------------------------
def test_collection_soft_limit_logs_warning(cql, test_keyspace, logfile):
    """A write to a collection above the warning threshold but below the fail
    threshold must succeed and leave a warning in the log."""
    settings = (
        ('compaction_collection_elements_count_warning_threshold', '50'),
        ('compaction_large_cell_warning_threshold_mb', '1'),
        ('large_collection_elements_fail_threshold', '10000'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, s set<int>, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = true"))
        build_large_collection(cql, tbl, pk=1, ck=0, num_elements=60)
        nodetool.flush(cql, tbl)
        stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, s) VALUES (?, ?, ?)")
        cql.execute(stmt, [1, 0, {0}])
        wait_for_log(logfile, WARN_RE, timeout=5)
def test_collection_no_warning_below_soft_limit(cql, test_keyspace, logfile):
    """Writing a small collection after flushing another small collection
    must not log a guardrail warning."""
    settings = (
        ('compaction_collection_elements_count_warning_threshold', '50'),
        ('compaction_large_cell_warning_threshold_mb', '1'),
        ('large_collection_elements_fail_threshold', '10000'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, s set<int>, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = true"))
        stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, s) VALUES (?, ?, ?)")
        cql.execute(stmt, [1, 0, {1, 2, 3}])
        nodetool.flush(cql, tbl)

        def write_second_row():
            cql.execute(stmt, [1, 1, {4, 5}])

        assert_no_log(logfile, WARN_RE, write_second_row)
# ---------------------------------------------------------------------------
# Per-table enable/disable toggle
# ---------------------------------------------------------------------------
def test_per_table_disabled_at_create(cql, test_keyspace):
    """On a table created with `large_data_guardrails_enabled = false`, a
    write to an oversized, flushed partition must be accepted."""
    settings = (
        ('compaction_large_partition_warning_threshold_mb', '1'),
        ('large_partition_fail_threshold_mb', '1'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, v blob, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = false"))
        make_oversized_partition(cql, tbl, pk=1, num_rows=2,
                                 value_size_bytes=600 * 1024)
        nodetool.flush(cql, tbl)
        stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
        # Must not raise.
        cql.execute(stmt, [1, 99, b"\x00"])
def test_per_table_alter_disable(cql, test_keyspace):
    """A write rejected while guardrails are enabled must be accepted after
    ALTER TABLE ... WITH large_data_guardrails_enabled = false."""
    settings = (
        ('compaction_large_partition_warning_threshold_mb', '1'),
        ('large_partition_fail_threshold_mb', '1'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, v blob, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = true"))
        make_oversized_partition(cql, tbl, pk=1, num_rows=2,
                                 value_size_bytes=600 * 1024)
        nodetool.flush(cql, tbl)
        stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
        # Enabled: the write is refused.
        with pytest.raises(WriteFailure, match=REJECT_PARTITION_RE):
            cql.execute(stmt, [1, 99, b"\x00"])
        # Disabled: the very same write goes through.
        cql.execute(f"ALTER TABLE {tbl} WITH large_data_guardrails_enabled = false")
        cql.execute(stmt, [1, 99, b"\x00"])
def test_per_table_alter_reenable(cql, test_keyspace):
    """On a table created with guardrails disabled, enabling them through
    ALTER TABLE must make writes to an oversized partition fail."""
    settings = (
        ('compaction_large_partition_warning_threshold_mb', '1'),
        ('large_partition_fail_threshold_mb', '1'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, v blob, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = false"))
        make_oversized_partition(cql, tbl, pk=1, num_rows=2,
                                 value_size_bytes=600 * 1024)
        nodetool.flush(cql, tbl)
        stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
        # Disabled: the write is accepted.
        cql.execute(stmt, [1, 99, b"\x00"])
        # Enabled: a further write to the same partition is refused.
        cql.execute(f"ALTER TABLE {tbl} WITH large_data_guardrails_enabled = true")
        with pytest.raises(WriteFailure, match=REJECT_PARTITION_RE):
            cql.execute(stmt, [1, 100, b"\x00"])
# ---------------------------------------------------------------------------
# Exemptions
# ---------------------------------------------------------------------------
def test_lwt_paxos_learn_is_exempt(cql, test_keyspace):
    """An LWT (IF NOT EXISTS) insert into an oversized partition must succeed
    even though a plain insert into the same partition is rejected."""
    settings = (
        ('compaction_large_partition_warning_threshold_mb', '1'),
        ('large_partition_fail_threshold_mb', '1'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace, "pk int, ck int, v blob, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = true"))
        make_oversized_partition(cql, tbl, pk=1, num_rows=2,
                                 value_size_bytes=600 * 1024)
        nodetool.flush(cql, tbl)
        # A regular write is refused.
        plain_stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
        with pytest.raises(WriteFailure, match=REJECT_PARTITION_RE):
            cql.execute(plain_stmt, [1, 99, b"\x00"])
        # The conditional write is expected to go through.
        lwt_stmt = cql.prepare(
            f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?) IF NOT EXISTS")
        cql.execute(lwt_stmt, [1, 100, b"\x00"])
# ---------------------------------------------------------------------------
# Multi-category: all guardrails fire independently
# ---------------------------------------------------------------------------
def test_all_guardrails_warn_independently(cql, test_keyspace, logfile):
    """With all fail thresholds at 0 (warn-only), one write touching data
    that is large in the partition, row and collection dimensions must
    produce a separate log message for each of the three categories."""
    settings = (
        ('compaction_large_partition_warning_threshold_mb', '1'),
        ('large_partition_fail_threshold_mb', '0'),
        ('compaction_large_row_warning_threshold_mb', '1'),
        ('large_row_fail_threshold_mb', '0'),
        ('compaction_collection_elements_count_warning_threshold', '50'),
        ('compaction_large_cell_warning_threshold_mb', '1'),
        ('large_collection_elements_fail_threshold', '0'),
    )
    with ExitStack() as stack:
        for option, value in settings:
            stack.enter_context(config_value_context(cql, option, value))
        tbl = stack.enter_context(new_test_table(
            cql, test_keyspace,
            "pk int, ck int, v blob, s set<int>, PRIMARY KEY (pk, ck)",
            extra=" WITH large_data_guardrails_enabled = true"))

        # Make (pk=1, ck=0) large in every dimension: a 1200 KB blob plus a
        # 60-element set.
        blob_stmt = cql.prepare(f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
        cql.execute(blob_stmt, [1, 0, bytes(1200 * 1024)])
        set_stmt = cql.prepare(
            f"UPDATE {tbl} SET s = s + ? WHERE pk = ? AND ck = ?")
        for element in range(60):
            cql.execute(set_stmt, [{element}, 1, 0])
        nodetool.flush(cql, tbl)

        # One write that touches both the blob and the set.
        combined_stmt = cql.prepare(
            f"INSERT INTO {tbl} (pk, ck, v, s) VALUES (?, ?, ?, ?)")
        cql.execute(combined_stmt, [1, 0, b"\x00", {999}])

        expected_messages = [
            r"partition size.*exceeds",
            r"row size.*exceeds",
            r"collection element count.*exceeds",
        ]
        wait_for_all_logs(logfile, expected_messages, timeout=5)
def test_each_guardrail_rejects_independently(cql, test_keyspace):
    """Each hard limit must reject independently, with separate PKs isolating
    each category.

    pk=1 is large only as a partition, pk=2 only as a single row, and pk=3
    only as a collection; each is expected to fail with the message of its
    own category while neighbouring writes still succeed.
    """
    with ExitStack() as cfg:
        # Warning (detection) thresholds for all categories.
        cfg.enter_context(config_value_context(cql,
            'compaction_large_partition_warning_threshold_mb', '1'))
        cfg.enter_context(config_value_context(cql,
            'compaction_large_row_warning_threshold_mb', '1'))
        cfg.enter_context(config_value_context(cql,
            'compaction_collection_elements_count_warning_threshold', '50'))
        cfg.enter_context(config_value_context(cql,
            'compaction_large_cell_warning_threshold_mb', '1'))
        # Fail (hard) thresholds; the row-count one is set to 0.
        cfg.enter_context(config_value_context(cql,
            'large_partition_fail_threshold_mb', '2'))
        cfg.enter_context(config_value_context(cql,
            'rows_count_fail_threshold', '0'))
        cfg.enter_context(config_value_context(cql,
            'large_row_fail_threshold_mb', '1'))
        cfg.enter_context(config_value_context(cql,
            'large_collection_elements_fail_threshold', '50'))
        schema = "pk int, ck int, v blob, s set<int>, PRIMARY KEY (pk, ck)"
        with new_test_table(cql, test_keyspace, schema,
                            extra=" WITH large_data_guardrails_enabled = true") as tbl:
            insert_v = cql.prepare(
                f"INSERT INTO {tbl} (pk, ck, v) VALUES (?, ?, ?)")
            # pk=1: 20 rows x 150 KB ~ 3 MB partition, each row < 1 MB.
            for ck in range(20):
                cql.execute(insert_v, [1, ck, bytes(150 * 1024)])
            # pk=2: single 1.2 MB row; partition total < 2 MB.
            cql.execute(insert_v, [2, 0, bytes(1200 * 1024)])
            # pk=3: 60-element collection; row and partition tiny.
            update_s = cql.prepare(
                f"UPDATE {tbl} SET s = s + ? WHERE pk = ? AND ck = ?")
            for i in range(60):
                cql.execute(update_s, [{i}, 3, 0])
            nodetool.flush(cql, tbl)
            # Partition rejection.
            with pytest.raises(WriteFailure, match="(?i)partition size.*exceeds"):
                cql.execute(insert_v, [1, 99, b"\x00"])
            # Row rejection.
            with pytest.raises(WriteFailure, match="(?i)row size.*exceeds"):
                cql.execute(insert_v, [2, 0, b"\x00"])
            # Different CK in pk=2 succeeds.
            cql.execute(insert_v, [2, 99, b"\x00"])
            # Collection rejection.
            insert_s = cql.prepare(
                f"INSERT INTO {tbl} (pk, ck, s) VALUES (?, ?, ?)")
            with pytest.raises(WriteFailure,
                               match="(?i)collection element count.*exceeds"):
                cql.execute(insert_s, [3, 0, {999}])
            # Writing without the collection column succeeds.
            cql.execute(insert_v, [3, 0, b"\x00"])
            # Different CK in pk=3 succeeds.
            cql.execute(insert_s, [3, 99, {999}])