diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index 732b494f7c..2ec6977bf3 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -96,7 +96,7 @@ future<> large_data_handler::maybe_delete_large_data_entries(sstables::shared_ss }); } future<> large_cells = make_ready_future<>(); - if (above_threshold(ldt::cell_size)) { + if (above_threshold(ldt::cell_size) || above_threshold(ldt::elements_in_collection)) { large_cells = with_sem([schema, filename, this] () mutable { return delete_large_data_entries(*schema, std::move(filename), db::system_keyspace::LARGE_CELLS); }); @@ -108,12 +108,16 @@ cql_table_large_data_handler::cql_table_large_data_handler(gms::feature_service& uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes, uint64_t cell_threshold_bytes, uint64_t rows_count_threshold, uint64_t collection_elements_count_threshold) : large_data_handler(partition_threshold_bytes, row_threshold_bytes, cell_threshold_bytes, rows_count_threshold, collection_elements_count_threshold) , _feat(feat) -{ - _feat.large_collection_detection.when_enabled([this] { + , _record_large_cells([this] (const sstables::sstable& sst, const sstables::key& pk, const clustering_key_prefix* ck, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) { + return internal_record_large_cells(sst, pk, ck, cdef, cell_size, collection_elements); + }) + , _feat_listener(_feat.large_collection_detection.when_enabled([this] { large_data_logger.debug("Enabled large_collection detection"); - // FIXME: set the record_large_cell function - }); -} + _record_large_cells = [this] (const sstables::sstable& sst, const sstables::key& pk, const clustering_key_prefix* ck, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) { + return internal_record_large_cells_and_collections(sst, pk, ck, cdef, cell_size, collection_elements); + }; + })) +{} template static future<> try_record(std::string_view large_table, const sstables::sstable& sst, const sstables::key& partition_key, int64_t size, @@ -153,6 +157,11 @@ future<> cql_table_large_data_handler::record_large_partitions(const sstables::s future<> cql_table_large_data_handler::record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const { + return _record_large_cells(sst, partition_key, clustering_key, cdef, cell_size, collection_elements); +} + +future<> cql_table_large_data_handler::internal_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const { auto column_name = cdef.name_as_text(); std::string_view cell_type = cdef.is_atomic() ? "cell" : "collection"; static const std::vector extra_fields{"clustering_key", "column_name"}; @@ -166,6 +175,21 @@ future<> cql_table_large_data_handler::record_large_cells(const sstables::sstabl } } +future<> cql_table_large_data_handler::internal_record_large_cells_and_collections(const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const { + auto column_name = cdef.name_as_text(); + std::string_view cell_type = cdef.is_atomic() ? "cell" : "collection"; + static const std::vector extra_fields{"clustering_key", "column_name", "collection_elements"}; + if (clustering_key) { + const schema &s = *sst.get_schema(); + auto ck_str = key_to_str(*clustering_key, s); + return try_record("cell", sst, partition_key, int64_t(cell_size), cell_type, format("/{}/{}", ck_str, column_name), extra_fields, ck_str, column_name, data_value((int64_t)collection_elements)); + } else { + auto desc = format("static {}", cell_type); + return try_record("cell", sst, partition_key, int64_t(cell_size), desc, format("//{}", column_name), extra_fields, data_value::make_null(utf8_type), column_name, data_value((int64_t)collection_elements)); + } +} + future<> cql_table_large_data_handler::record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const { static const std::vector extra_fields{"clustering_key"}; diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index c1d8434553..7686221267 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -130,6 +130,9 @@ protected: class cql_table_large_data_handler : public large_data_handler { gms::feature_service& _feat; + std::function (const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_items)> _record_large_cells; + std::optional _feat_listener; public: explicit cql_table_large_data_handler(gms::feature_service& feat, uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes, uint64_t cell_threshold_bytes, uint64_t rows_count_threshold, uint64_t collection_elements_count_threshold); @@ -140,6 +143,12 @@ protected: virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const override; virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const override; + +private: + future<> internal_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_items) const; + future<> internal_record_large_cells_and_collections(const sstables::sstable& sst, const sstables::key& partition_key, + const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_items) const; }; class nop_large_data_handler : public large_data_handler {