diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index 3531a8f269..c4ed36edb6 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -13,6 +13,8 @@ #include "db/system_keyspace.hh" #include "db/large_data_handler.hh" #include "keys/keys.hh" +#include "mutation/mutation_partition.hh" +#include "replica/exceptions.hh" #include "sstables/sstables.hh" #include "gms/feature_service.hh" #include "cql3/untyped_result_set.hh" @@ -21,6 +23,10 @@ static logging::logger large_data_logger("large_data"); namespace db { +namespace { +constexpr uint64_t MB = 1024 * 1024; +} + nop_large_data_handler::nop_large_data_handler() : large_data_handler(std::numeric_limits::max(), std::numeric_limits::max(), std::numeric_limits::max(), std::numeric_limits::max(), std::numeric_limits::max()) { @@ -152,6 +158,127 @@ std::optional large_data_record_index::lookup_collection(bytes_view pk return result; } +void large_data_guardrail::register_sstable(sstables::shared_sstable sst) { + _index.register_sstable(std::move(sst)); +} + +void large_data_guardrail::rebuild(const std::unordered_set& sstables) { + _index.rebuild(sstables); +} + +void large_data_guardrail::check(const schema& s, const mutation_partition& mp, + partition_key_view pk) const { + auto sst_key = sstables::key::from_partition_key(s, pk); + auto pk_bytes = bytes_view(sst_key.get_bytes()); + check_partition(s, pk_bytes, pk); + check_rows_and_collections(s, pk_bytes, mp, pk); +} + +void large_data_guardrail::check_partition(const schema& s, bytes_view pk_bytes, partition_key_view pk) const { + const uint64_t size_fail = uint64_t(_cfg.partition_size_fail_threshold_mb()) * MB; + const uint64_t size_warn = uint64_t(_cfg.partition_size_warn_threshold_mb()) * MB; + const uint64_t rows_fail = uint64_t(_cfg.rows_count_fail_threshold()); + const uint64_t rows_warn = uint64_t(_cfg.rows_count_warn_threshold()); + + auto entry = _index.lookup_partition(pk_bytes); + if (!entry) [[likely]] { + return; + } + if (size_fail > 0 && entry->partition_size >= size_fail) { + throw replica::large_data_exception(s.ks_name(), s.cf_name(), format( + "Large data guardrail: partition size {} exceeds hard limit {} " + "(keyspace={}, table={}, partition_key={})", + entry->partition_size, size_fail, s.ks_name(), s.cf_name(), pk)); + } + if (entry->partition_size >= size_warn) { + large_data_logger.warn("Large data guardrail: partition size {} exceeds soft limit {} " + "(keyspace={}, table={}, partition_key={})", + entry->partition_size, size_warn, s.ks_name(), s.cf_name(), pk); + } + if (rows_fail > 0 && entry->rows >= rows_fail) { + throw replica::large_data_exception(s.ks_name(), s.cf_name(), format( + "Large data guardrail: partition row count {} exceeds hard limit {} " + "(keyspace={}, table={}, partition_key={})", + entry->rows, rows_fail, s.ks_name(), s.cf_name(), pk)); + } + if (entry->rows >= rows_warn) { + large_data_logger.warn("Large data guardrail: partition row count {} exceeds soft limit {} " + "(keyspace={}, table={}, partition_key={})", + entry->rows, rows_warn, s.ks_name(), s.cf_name(), pk); + } +} + +void large_data_guardrail::check_rows_and_collections(const schema& s, bytes_view pk_bytes, + const mutation_partition& mp, partition_key_view pk) const { + + if (!mp.static_row().empty()) { + check_row_size(s, pk_bytes, pk, bytes_view(), nullptr); + mp.static_row().for_each_cell([&](column_id id, const atomic_cell_or_collection&) { + check_collection_element_count(s, pk_bytes, pk, s.static_column_at(id), bytes_view(), nullptr); + }); + } + + for (const auto& cr : mp.non_dummy_rows()) { + auto ck_bytes = cr.key().view().representation().linearize(); + auto ck_bv = bytes_view(ck_bytes); + check_row_size(s, pk_bytes, pk, ck_bv, &cr.key()); + cr.row().cells().for_each_cell([&](column_id id, const atomic_cell_or_collection&) { + check_collection_element_count(s, pk_bytes, pk, s.regular_column_at(id), ck_bv, &cr.key()); + }); + } +} + +void large_data_guardrail::check_row_size(const schema& s, bytes_view pk_bytes, partition_key_view pk, + bytes_view ck_bytes, const clustering_key_prefix* ck) const { + const uint64_t fail = uint64_t(_cfg.row_size_fail_threshold_mb()) * MB; + const uint64_t warn = uint64_t(_cfg.row_size_warn_threshold_mb()) * MB; + auto row_size = _index.lookup_row(pk_bytes, ck_bytes); + if (!row_size) [[likely]] { + return; + } + if (fail > 0 && *row_size >= fail) { + throw replica::large_data_exception(s.ks_name(), s.cf_name(), format( + "Large data guardrail: row size {} exceeds hard limit {} " + "(keyspace={}, table={}, clustering_key={})", + *row_size, fail, s.ks_name(), s.cf_name(), + ck ? format("{}", ck->with_schema(s)) : sstring())); + } + if (*row_size >= warn) { + large_data_logger.warn("Large data guardrail: row size {} exceeds soft limit {} " + "(keyspace={}, table={}, clustering_key={})", + *row_size, warn, s.ks_name(), s.cf_name(), + ck ? format("{}", ck->with_schema(s)) : sstring()); + } +} + +void large_data_guardrail::check_collection_element_count(const schema& s, bytes_view pk_bytes, partition_key_view pk, + const column_definition& cdef, bytes_view ck_bytes, + const clustering_key_prefix* ck) const { + if (cdef.is_atomic()) { + return; + } + const uint64_t fail = uint64_t(_cfg.collection_elements_fail_threshold()); + const uint64_t warn = uint64_t(_cfg.collection_elements_warn_threshold()); + auto col_bytes = to_bytes(cdef.name_as_text()); + auto count = _index.lookup_collection(pk_bytes, ck_bytes, bytes_view(col_bytes)); + if (!count) [[likely]] { + return; + } + if (fail > 0 && *count >= fail) { + throw replica::large_data_exception(s.ks_name(), s.cf_name(), format( + "Large data guardrail: collection element count {} exceeds hard limit {} " + "(keyspace={}, table={}, column={}, clustering_key={})", + *count, fail, s.ks_name(), s.cf_name(), cdef.name_as_text(), + ck ? format("{}", ck->with_schema(s)) : sstring())); + } + if (*count >= warn) { + large_data_logger.warn("Large data guardrail: collection element count {} exceeds soft limit {} " + "(keyspace={}, table={}, column={}, clustering_key={})", + *count, warn, s.ks_name(), s.cf_name(), cdef.name_as_text(), + ck ? format("{}", ck->with_schema(s)) : sstring()); + } +} + sstring large_data_handler::sst_filename(const sstables::sstable& sst) { return sst.component_basename(sstables::component_type::Data); } @@ -442,4 +569,5 @@ future<> cql_table_large_data_handler::update_large_data_entries_sstable_name(co large_table_name, s.ks_name(), s.cf_name(), old_name, new_name, std::current_exception()); } } + } diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index 3d805e719b..b55de253b4 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -11,6 +11,7 @@ #include #include #include +#include #include #include "bytes.hh" #include "schema/schema_fwd.hh" @@ -28,6 +29,7 @@ class key; } class partition_key_view; +class mutation_partition; namespace db { @@ -111,6 +113,64 @@ private: record_set _collections; }; +struct guardrail_config { + utils::updateable_value partition_size_fail_threshold_mb; + utils::updateable_value partition_size_warn_threshold_mb; + utils::updateable_value rows_count_fail_threshold; + utils::updateable_value rows_count_warn_threshold; + utils::updateable_value row_size_fail_threshold_mb; + utils::updateable_value row_size_warn_threshold_mb; + utils::updateable_value collection_elements_fail_threshold; + utils::updateable_value collection_elements_warn_threshold; +}; + +// Each replica::table holds a unique_ptr to either a real guardrail or a +// noop. The guardrail owns the per-table large_data_record_index, so +// noop tables pay no index-maintenance cost. +class large_data_guardrail_base { +public: + virtual ~large_data_guardrail_base() = default; + virtual void check(const schema& s, const mutation_partition& mp, + partition_key_view pk) const = 0; + virtual void register_sstable(sstables::shared_sstable sst) = 0; + virtual void rebuild(const std::unordered_set& sstables) = 0; +}; + +class noop_large_data_guardrail final : public large_data_guardrail_base { +public: + static shared_ptr instance() { + static thread_local auto inst = make_shared(); + return inst; + } + void check(const schema&, const mutation_partition&, + partition_key_view) const override {} + void register_sstable(sstables::shared_sstable) override {} + void rebuild(const std::unordered_set&) override {} +}; + +class large_data_guardrail final : public large_data_guardrail_base { +public: + explicit large_data_guardrail(guardrail_config cfg) noexcept + : _cfg(std::move(cfg)) {} + + void check(const schema& s, const mutation_partition& mp, + partition_key_view pk) const override; + void register_sstable(sstables::shared_sstable sst) override; + void rebuild(const std::unordered_set& sstables) override; + +private: + void check_partition(const schema& s, bytes_view pk_bytes, partition_key_view pk) const; + void check_rows_and_collections(const schema& s, bytes_view pk_bytes, const mutation_partition& mp, partition_key_view pk) const; + void check_row_size(const schema& s, bytes_view pk_bytes, partition_key_view pk, + bytes_view ck_bytes, const clustering_key_prefix* ck) const; + void check_collection_element_count(const schema& s, bytes_view pk_bytes, partition_key_view pk, + const column_definition& cdef, bytes_view ck_bytes, + const clustering_key_prefix* ck) const; + + guardrail_config _cfg; + large_data_record_index _index; +}; + class large_data_handler { public: struct stats {