db: implement large_data_guardrail

Checks partition size, row count, row size, and collection element count
against config thresholds using large_data_record_index lookups.
Warns on soft limit, throws large_data_exception on hard limit.
This commit is contained in:
Taras Veretilnyk
2026-05-13 13:55:10 +02:00
parent ae8879d2f6
commit f7ffc64703
2 changed files with 188 additions and 0 deletions

View File

@@ -13,6 +13,8 @@
#include "db/system_keyspace.hh"
#include "db/large_data_handler.hh"
#include "keys/keys.hh"
#include "mutation/mutation_partition.hh"
#include "replica/exceptions.hh"
#include "sstables/sstables.hh"
#include "gms/feature_service.hh"
#include "cql3/untyped_result_set.hh"
@@ -21,6 +23,10 @@ static logging::logger large_data_logger("large_data");
namespace db {
namespace {
// Bytes per mebibyte (2^20). The "*_mb" guardrail thresholds are multiplied by
// this constant to obtain a byte count before they are compared.
constexpr uint64_t MB = uint64_t(1) << 20;
}
nop_large_data_handler::nop_large_data_handler()
: large_data_handler(std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max()) {
@@ -152,6 +158,127 @@ std::optional<uint64_t> large_data_record_index::lookup_collection(bytes_view pk
return result;
}
// Hands the sstable over to the guardrail's large_data_record_index.
// The shared_sstable handle is taken by value and moved into the index call.
void large_data_guardrail::register_sstable(sstables::shared_sstable sst) {
_index.register_sstable(std::move(sst));
}
// Asks the guardrail's large_data_record_index to rebuild itself from the given
// set of sstables. All work is delegated to the index.
void large_data_guardrail::rebuild(const std::unordered_set<sstables::shared_sstable>& sstables) {
_index.rebuild(sstables);
}
// Entry point of the guardrail for one mutation partition.
//
// Serializes the partition key into its sstable key form, then runs the
// partition-level checks followed by the per-row and per-collection checks.
// Any of those checks may throw replica::large_data_exception when a hard
// limit is hit; the exception propagates to the caller.
void large_data_guardrail::check(const schema& s, const mutation_partition& mp,
        partition_key_view pk) const {
    // The serialized key owns the bytes; it must stay alive for as long as the
    // view below is in use.
    auto serialized_pk = sstables::key::from_partition_key(s, pk);
    auto pk_view = bytes_view(serialized_pk.get_bytes());

    check_partition(s, pk_view, pk);
    check_rows_and_collections(s, pk_view, mp, pk);
}
// Applies the partition-level guardrails (partition size and row count) to the
// partition whose serialized key is pk_bytes.
//
// s        - schema of the table; supplies keyspace/table names for messages.
// pk_bytes - serialized partition key used for the index lookup.
// pk       - partition key view, used only for diagnostics.
//
// Nothing happens when the index has no entry for the partition. Otherwise, for
// each metric, a value at or above the fail threshold throws
// replica::large_data_exception and a value at or above the warn threshold logs
// a warning. A threshold of 0 disables the corresponding limit; this applies to
// the warn thresholds as well as the fail thresholds, so that a zero warn
// threshold does not produce a warning for every indexed partition.
//
// Checks run in the order: size fail, size warn, rows fail, rows warn.
void large_data_guardrail::check_partition(const schema& s, bytes_view pk_bytes, partition_key_view pk) const {
    const auto entry = _index.lookup_partition(pk_bytes);
    if (!entry) [[likely]] {
        return;
    }

    // Thresholds are only needed once an index entry is known to exist.
    const uint64_t size_fail = uint64_t(_cfg.partition_size_fail_threshold_mb()) * MB;
    const uint64_t size_warn = uint64_t(_cfg.partition_size_warn_threshold_mb()) * MB;
    const uint64_t rows_fail = uint64_t(_cfg.rows_count_fail_threshold());
    const uint64_t rows_warn = uint64_t(_cfg.rows_count_warn_threshold());

    if (size_fail > 0 && entry->partition_size >= size_fail) {
        throw replica::large_data_exception(s.ks_name(), s.cf_name(), format(
                "Large data guardrail: partition size {} exceeds hard limit {} "
                "(keyspace={}, table={}, partition_key={})",
                entry->partition_size, size_fail, s.ks_name(), s.cf_name(), pk));
    }
    if (size_warn > 0 && entry->partition_size >= size_warn) {
        large_data_logger.warn("Large data guardrail: partition size {} exceeds soft limit {} "
                "(keyspace={}, table={}, partition_key={})",
                entry->partition_size, size_warn, s.ks_name(), s.cf_name(), pk);
    }

    if (rows_fail > 0 && entry->rows >= rows_fail) {
        throw replica::large_data_exception(s.ks_name(), s.cf_name(), format(
                "Large data guardrail: partition row count {} exceeds hard limit {} "
                "(keyspace={}, table={}, partition_key={})",
                entry->rows, rows_fail, s.ks_name(), s.cf_name(), pk));
    }
    if (rows_warn > 0 && entry->rows >= rows_warn) {
        large_data_logger.warn("Large data guardrail: partition row count {} exceeds soft limit {} "
                "(keyspace={}, table={}, partition_key={})",
                entry->rows, rows_warn, s.ks_name(), s.cf_name(), pk);
    }
}
// Runs the row-size check and the collection-element-count check for every row
// carried by the mutation partition.
//
// The static row (when not empty) is checked first, using an empty clustering
// key and a null clustering-key pointer. Then every non-dummy clustering row is
// checked using its linearized clustering key bytes. Within each row, every
// cell's column definition is passed to check_collection_element_count.
// Exceptions thrown by the individual checks propagate to the caller.
void large_data_guardrail::check_rows_and_collections(const schema& s, bytes_view pk_bytes,
        const mutation_partition& mp, partition_key_view pk) const {
    const auto& static_cells = mp.static_row();
    if (!static_cells.empty()) {
        check_row_size(s, pk_bytes, pk, bytes_view(), nullptr);
        static_cells.for_each_cell([&](column_id id, const atomic_cell_or_collection&) {
            const auto& column = s.static_column_at(id);
            check_collection_element_count(s, pk_bytes, pk, column, bytes_view(), nullptr);
        });
    }

    for (const auto& entry : mp.non_dummy_rows()) {
        const auto& ckey = entry.key();
        // Owns the linearized key bytes; the view below points into it.
        auto ckey_storage = ckey.view().representation().linearize();
        auto ckey_view = bytes_view(ckey_storage);

        check_row_size(s, pk_bytes, pk, ckey_view, &ckey);
        entry.row().cells().for_each_cell([&](column_id id, const atomic_cell_or_collection&) {
            const auto& column = s.regular_column_at(id);
            check_collection_element_count(s, pk_bytes, pk, column, ckey_view, &ckey);
        });
    }
}
void large_data_guardrail::check_row_size(const schema& s, bytes_view pk_bytes, partition_key_view pk,
bytes_view ck_bytes, const clustering_key_prefix* ck) const {
const uint64_t fail = uint64_t(_cfg.row_size_fail_threshold_mb()) * MB;
const uint64_t warn = uint64_t(_cfg.row_size_warn_threshold_mb()) * MB;
auto row_size = _index.lookup_row(pk_bytes, ck_bytes);
if (!row_size) [[likely]] {
return;
}
if (fail > 0 && *row_size >= fail) {
throw replica::large_data_exception(s.ks_name(), s.cf_name(), format(
"Large data guardrail: row size {} exceeds hard limit {} "
"(keyspace={}, table={}, clustering_key={})",
*row_size, fail, s.ks_name(), s.cf_name(),
ck ? format("{}", ck->with_schema(s)) : sstring()));
}
if (*row_size >= warn) {
large_data_logger.warn("Large data guardrail: row size {} exceeds soft limit {} "
"(keyspace={}, table={}, clustering_key={})",
*row_size, warn, s.ks_name(), s.cf_name(),
ck ? format("{}", ck->with_schema(s)) : sstring());
}
}
void large_data_guardrail::check_collection_element_count(const schema& s, bytes_view pk_bytes, partition_key_view pk,
const column_definition& cdef, bytes_view ck_bytes,
const clustering_key_prefix* ck) const {
if (cdef.is_atomic()) {
return;
}
const uint64_t fail = uint64_t(_cfg.collection_elements_fail_threshold());
const uint64_t warn = uint64_t(_cfg.collection_elements_warn_threshold());
auto col_bytes = to_bytes(cdef.name_as_text());
auto count = _index.lookup_collection(pk_bytes, ck_bytes, bytes_view(col_bytes));
if (!count) [[likely]] {
return;
}
if (fail > 0 && *count >= fail) {
throw replica::large_data_exception(s.ks_name(), s.cf_name(), format(
"Large data guardrail: collection element count {} exceeds hard limit {} "
"(keyspace={}, table={}, column={}, clustering_key={})",
*count, fail, s.ks_name(), s.cf_name(), cdef.name_as_text(),
ck ? format("{}", ck->with_schema(s)) : sstring()));
}
if (*count >= warn) {
large_data_logger.warn("Large data guardrail: collection element count {} exceeds soft limit {} "
"(keyspace={}, table={}, column={}, clustering_key={})",
*count, warn, s.ks_name(), s.cf_name(), cdef.name_as_text(),
ck ? format("{}", ck->with_schema(s)) : sstring());
}
}
// Returns the basename of the sstable's Data component, as produced by
// sstable::component_basename(component_type::Data).
sstring large_data_handler::sst_filename(const sstables::sstable& sst) {
return sst.component_basename(sstables::component_type::Data);
}
@@ -442,4 +569,5 @@ future<> cql_table_large_data_handler::update_large_data_entries_sstable_name(co
large_table_name, s.ks_name(), s.cf_name(), old_name, new_name, std::current_exception());
}
}
}

View File

@@ -11,6 +11,7 @@
#include <concepts>
#include <cstdint>
#include <optional>
#include <seastar/core/shared_ptr.hh>
#include <boost/intrusive/set.hpp>
#include "bytes.hh"
#include "schema/schema_fwd.hh"
@@ -28,6 +29,7 @@ class key;
}
class partition_key_view;
class mutation_partition;
namespace db {
@@ -111,6 +113,64 @@ private:
record_set<record_type::collection> _collections;
};
// Thresholds consumed by large_data_guardrail.
//
// Every field is a utils::updateable_value that the guardrail reads each time a
// check runs. Fields with the "_mb" suffix are multiplied by 1024 * 1024 before
// being compared; the others are compared as plain counts. A fail threshold of
// 0 disables the corresponding hard limit.
struct guardrail_config {
// Partition size limits, in MiB.
utils::updateable_value<uint32_t> partition_size_fail_threshold_mb;
utils::updateable_value<uint32_t> partition_size_warn_threshold_mb;
// Limits on the number of rows in a partition.
utils::updateable_value<uint32_t> rows_count_fail_threshold;
utils::updateable_value<uint32_t> rows_count_warn_threshold;
// Row size limits, in MiB.
utils::updateable_value<uint32_t> row_size_fail_threshold_mb;
utils::updateable_value<uint32_t> row_size_warn_threshold_mb;
// Limits on the number of elements in a collection.
utils::updateable_value<uint32_t> collection_elements_fail_threshold;
utils::updateable_value<uint32_t> collection_elements_warn_threshold;
};
// Interface of the per-table large-data guardrail. A table uses either a real
// large_data_guardrail or the noop_large_data_guardrail.
// The real guardrail owns the per-table large_data_record_index, so
// noop tables pay no index-maintenance cost.
//
// NOTE(review): this comment used to say each replica::table holds a
// unique_ptr, but noop_large_data_guardrail::instance() returns a shared_ptr to
// a per-thread singleton - confirm the holder type used by replica::table.
class large_data_guardrail_base {
public:
virtual ~large_data_guardrail_base() = default;
// Checks the mutation partition `mp` (with partition key `pk`) of the table
// described by `s` against the guardrail.
virtual void check(const schema& s, const mutation_partition& mp,
partition_key_view pk) const = 0;
// Makes the guardrail aware of a newly available sstable.
virtual void register_sstable(sstables::shared_sstable sst) = 0;
// Rebuilds the guardrail's state from the given set of sstables.
virtual void rebuild(const std::unordered_set<sstables::shared_sstable>& sstables) = 0;
};
// Guardrail implementation that does nothing: every operation is an empty
// override, and the class holds no index.
class noop_large_data_guardrail final : public large_data_guardrail_base {
public:
    // Never rejects or reports anything.
    void check(const schema&, const mutation_partition&,
            partition_key_view) const override {}

    // Sstables are ignored; there is no index to maintain.
    void register_sstable(sstables::shared_sstable) override {}
    void rebuild(const std::unordered_set<sstables::shared_sstable>&) override {}

    // Returns a handle to a thread_local noop guardrail that is created on
    // first use; all callers on the same thread share that one object.
    static shared_ptr<large_data_guardrail_base> instance() {
        static thread_local auto per_thread_noop = make_shared<noop_large_data_guardrail>();
        return per_thread_noop;
    }
};
// Real guardrail: compares the sizes and counts recorded in its
// large_data_record_index with the thresholds in guardrail_config.
// A soft (warn) limit logs a warning; a hard (fail) limit throws
// replica::large_data_exception.
class large_data_guardrail final : public large_data_guardrail_base {
public:
// Takes the configuration by value and moves it into the guardrail.
explicit large_data_guardrail(guardrail_config cfg) noexcept
: _cfg(std::move(cfg)) {}
// Runs the partition-level checks, then the per-row and per-collection
// checks, for the given mutation partition.
void check(const schema& s, const mutation_partition& mp,
partition_key_view pk) const override;
// Both forward to the owned large_data_record_index.
void register_sstable(sstables::shared_sstable sst) override;
void rebuild(const std::unordered_set<sstables::shared_sstable>& sstables) override;
private:
// Partition size and row count checks for the partition keyed by pk_bytes.
void check_partition(const schema& s, bytes_view pk_bytes, partition_key_view pk) const;
// Iterates the static row and all non-dummy clustering rows, applying the
// row-size and collection-element-count checks to each.
void check_rows_and_collections(const schema& s, bytes_view pk_bytes, const mutation_partition& mp, partition_key_view pk) const;
// Row size check; `ck` may be nullptr (used for the static row).
void check_row_size(const schema& s, bytes_view pk_bytes, partition_key_view pk,
bytes_view ck_bytes, const clustering_key_prefix* ck) const;
// Element count check for one column; atomic columns are skipped.
// `ck` may be nullptr (used for the static row).
void check_collection_element_count(const schema& s, bytes_view pk_bytes, partition_key_view pk,
const column_definition& cdef, bytes_view ck_bytes,
const clustering_key_prefix* ck) const;
// Thresholds, read on every check.
guardrail_config _cfg;
// Per-table index of large-data records that the checks look up.
large_data_record_index _index;
};
class large_data_handler {
public:
struct stats {