diff --git a/sstables/mx/writer.cc b/sstables/mx/writer.cc
index f69d774f49..d1294f637d 100644
--- a/sstables/mx/writer.cc
+++ b/sstables/mx/writer.cc
@@ -608,6 +608,7 @@ private:
     } _pi_write_m;
     utils::UUID _run_identifier;
     bool _write_regular_as_static; // See #4139
+    scylla_metadata::large_data_stats _large_data_stats;
 
     void init_file_writers();
 
@@ -696,6 +697,17 @@ private:
         std::optional local_deletion_time;
     };
 
+    // Wrappers around the sstable's large_data_handler. Each one folds the
+    // observed value into the matching _large_data_stats entry (running
+    // max_value, plus above_threshold whenever the handler's result is true)
+    // in addition to notifying the handler itself.
+    void maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size);
+    void maybe_record_too_many_rows(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t rows_count);
+    void maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
+            const clustering_key_prefix* clustering_key, uint64_t row_size);
+    void maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
+            const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size);
+
     // Writes single atomic cell
     void write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell, const column_definition& cdef,
         const row_time_properties& properties, std::optional cell_path = {});
@@ -713,7 +725,7 @@ private:
     void write_static_row(const row&, column_kind);
     void collect_row_stats(uint64_t row_size, const clustering_key_prefix* clustering_key) {
         ++_c_stats.rows_count;
-        _sst.get_large_data_handler().maybe_record_large_rows(_sst, *_partition_key, clustering_key, row_size).get();
+        maybe_record_large_rows(_sst, *_partition_key, clustering_key, row_size);
     }
 
     // Clustered is a term used to denote an entity that has a clustering key prefix
@@ -759,6 +771,32 @@ public:
         , _sst_schema(make_sstable_schema(s, _enc_stats, _cfg))
         , _run_identifier(cfg.run_identifier)
         , _write_regular_as_static(cfg.correctly_serialize_static_compact_in_mc && s.is_static_compact_table())
+        , _large_data_stats({{
+            {
+                large_data_type::partition_size,
+                large_data_stats_entry{
+                    .threshold = _sst.get_large_data_handler().get_partition_threshold_bytes(),
+                }
+            },
+            {
+                large_data_type::row_size,
+                large_data_stats_entry{
+                    .threshold = _sst.get_large_data_handler().get_row_threshold_bytes(),
+                }
+            },
+            {
+                large_data_type::cell_size,
+                large_data_stats_entry{
+                    .threshold = _sst.get_large_data_handler().get_cell_threshold_bytes(),
+                }
+            },
+            {
+                large_data_type::rows_in_partition,
+                large_data_stats_entry{
+                    .threshold = _sst.get_large_data_handler().get_rows_count_threshold(),
+                }
+            },
+        }})
     {
         // This can be 0 in some cases, which is albeit benign, can wreak havoc
         // in lower-level writer code, so clamp it to [1, +inf) here, which is
@@ -1006,6 +1044,56 @@ void writer::consume(tombstone t) {
     }
 }
 
+// Updates the partition_size entry's running maximum and notifies the
+// large_data_handler, counting the partition when the handler's result is true.
+void writer::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) {
+    auto& entry = _large_data_stats.map.at(large_data_type::partition_size);
+    if (entry.max_value < partition_size) {
+        entry.max_value = partition_size;
+    }
+    if (_sst.get_large_data_handler().maybe_record_large_partitions(sst, partition_key, partition_size).get0()) {
+        entry.above_threshold++;
+    }
+}
+
+// Updates the rows_in_partition entry's running maximum and notifies the
+// large_data_handler, counting the partition when the handler's result is true.
+void writer::maybe_record_too_many_rows(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t rows_count) {
+    auto& entry = _large_data_stats.map.at(large_data_type::rows_in_partition);
+    if (entry.max_value < rows_count) {
+        entry.max_value = rows_count;
+    }
+    if (_sst.get_large_data_handler().maybe_log_too_many_rows(sst, partition_key, rows_count)) {
+        entry.above_threshold++;
+    }
+}
+
+// Updates the row_size entry's running maximum and notifies the
+// large_data_handler, counting the row when the handler's result is true.
+void writer::maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
+        const clustering_key_prefix* clustering_key, uint64_t row_size) {
+    auto& entry = _large_data_stats.map.at(large_data_type::row_size);
+    if (entry.max_value < row_size) {
+        entry.max_value = row_size;
+    }
+    if (_sst.get_large_data_handler().maybe_record_large_rows(sst, partition_key, clustering_key, row_size).get0()) {
+        entry.above_threshold++;
+    }
+}
+
+// Updates the cell_size entry's running maximum and notifies the
+// large_data_handler, counting the cell when the handler's result is true.
+void writer::maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
+        const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) {
+    auto& entry = _large_data_stats.map.at(large_data_type::cell_size);
+    if (entry.max_value < cell_size) {
+        entry.max_value = cell_size;
+    }
+    if (_sst.get_large_data_handler().maybe_record_large_cells(sst, partition_key, clustering_key, cdef, cell_size).get0()) {
+        entry.above_threshold++;
+    }
+}
+
 void writer::write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell,
          const column_definition& cdef, const row_time_properties& properties, std::optional cell_path) {
 
@@ -1073,7 +1161,7 @@ void writer::write_cell(bytes_ostream& writer, const clustering_key_prefix* clus
     // We record collections in write_collection, so ignore them here
     if (cdef.is_atomic()) {
         uint64_t size = writer.size() - current_pos;
-        _sst.get_large_data_handler().maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, size).get();
+        maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, size);
     }
 
     _c_stats.update_timestamp(cell.timestamp());
@@ -1140,7 +1228,7 @@ void writer::write_collection(bytes_ostream& writer, const clustering_key_prefix
         }
     });
     uint64_t size = writer.size() - current_pos;
-    _sst.get_large_data_handler().maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, size).get();
+    maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, size);
 }
 
 void writer::write_cells(bytes_ostream& writer, const clustering_key_prefix* clustering_key, column_kind kind, const row& row_body,
@@ -1387,8 +1475,8 @@ stop_iteration writer::consume_end_of_partition() {
     // compute size of the current row.
     _c_stats.partition_size = _data_writer->offset() - _c_stats.start_offset;
 
-    _sst.get_large_data_handler().maybe_record_large_partitions(_sst, *_partition_key, _c_stats.partition_size).get();
-    _sst.get_large_data_handler().maybe_log_too_many_rows(_sst, *_partition_key, _c_stats.rows_count);
+    maybe_record_large_partitions(_sst, *_partition_key, _c_stats.partition_size);
+    maybe_record_too_many_rows(_sst, *_partition_key, _c_stats.rows_count);
 
     // update is about merging column_stats with the data being stored by collector.
diff --git a/sstables/types.hh b/sstables/types.hh
index d10e0c19fd..30f701f8d2 100644
--- a/sstables/types.hh
+++ b/sstables/types.hh
@@ -509,8 +509,32 @@ struct run_identifier {
     auto describe_type(sstable_version_types v, Describer f) { return f(id); }
 };
 
+// Types of large data statistics.
+//
+// Note: For extensibility, never reuse an identifier,
+// only add new ones, since these are stored on stable storage.
+enum class large_data_type : uint32_t {
+    partition_size = 1,     // partition size, in bytes
+    row_size = 2,           // row size, in bytes
+    cell_size = 3,          // cell size, in bytes
+    rows_in_partition = 4,  // number of rows in a partition
+};
+
+// One entry of scylla_metadata::large_data_stats, keyed by large_data_type.
+// Counters start at zero so that an entry built with only `.threshold` set
+// (or default-constructed) never carries indeterminate values.
+struct large_data_stats_entry {
+    uint64_t max_value = 0;        // largest value recorded by the writer
+    uint64_t threshold = 0;        // threshold obtained from the large_data_handler
+    uint32_t above_threshold = 0;  // bumped each time the handler's result was true
+
+    template <typename Describer>
+    auto describe_type(sstable_version_types v, Describer f) { return f(max_value, threshold, above_threshold); }
+};
+
 struct scylla_metadata {
     using extension_attributes = disk_hash, disk_string>;
+    using large_data_stats = disk_hash;
     disk_set_of_tagged_union,