sstables: writer: keep track of large data stats

In the next patch, this will be written to the
sstable's scylla_metadata component.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2020-11-22 12:45:38 +02:00
parent 8ab053bd44
commit 79c19a166c
2 changed files with 102 additions and 5 deletions

View File

@@ -608,6 +608,7 @@ private:
} _pi_write_m;
utils::UUID _run_identifier;
bool _write_regular_as_static; // See #4139
scylla_metadata::large_data_stats _large_data_stats;
void init_file_writers();
@@ -696,6 +697,13 @@ private:
std::optional<gc_clock::time_point> local_deletion_time;
};
// Large data bookkeeping helpers. Each one raises the max_value of the matching
// _large_data_stats entry if the given value exceeds it, forwards the value to
// the sstable's large data handler, and bumps the entry's above_threshold
// counter when the handler call returns true.

// Accounts for a partition of partition_size bytes.
void maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size);
// Accounts for a partition holding rows_count rows.
void maybe_record_too_many_rows(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t rows_count);
// Accounts for a row of row_size bytes.
void maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const uint64_t row_size);
// Accounts for a cell of column cdef that is cell_size bytes.
void maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size);
// Writes single atomic cell
void write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell, const column_definition& cdef,
const row_time_properties& properties, std::optional<bytes_view> cell_path = {});
@@ -713,7 +721,7 @@ private:
void write_static_row(const row&, column_kind);
// Bumps the row counter kept in _c_stats and passes the row's size on for
// large-row tracking.
void collect_row_stats(uint64_t row_size, const clustering_key_prefix* clustering_key) {
++_c_stats.rows_count;
_sst.get_large_data_handler().maybe_record_large_rows(_sst, *_partition_key, clustering_key, row_size).get();
maybe_record_large_rows(_sst, *_partition_key, clustering_key, row_size);
}
// Clustered is a term used to denote an entity that has a clustering key prefix
@@ -759,6 +767,32 @@ public:
, _sst_schema(make_sstable_schema(s, _enc_stats, _cfg))
, _run_identifier(cfg.run_identifier)
, _write_regular_as_static(cfg.correctly_serialize_static_compact_in_mc && s.is_static_compact_table())
, _large_data_stats({{
{
large_data_type::partition_size,
large_data_stats_entry{
.threshold = _sst.get_large_data_handler().get_partition_threshold_bytes(),
}
},
{
large_data_type::row_size,
large_data_stats_entry{
.threshold = _sst.get_large_data_handler().get_row_threshold_bytes(),
}
},
{
large_data_type::cell_size,
large_data_stats_entry{
.threshold = _sst.get_large_data_handler().get_cell_threshold_bytes(),
}
},
{
large_data_type::rows_in_partition,
large_data_stats_entry{
.threshold = _sst.get_large_data_handler().get_rows_count_threshold(),
}
},
}})
{
// This can be 0 in some cases, which is albeit benign, can wreak havoc
// in lower-level writer code, so clamp it to [1, +inf) here, which is
@@ -1006,6 +1040,48 @@ void writer::consume(tombstone t) {
}
}
// Accounts for a partition of `partition_size` bytes in the partition_size
// large data stats entry.
//
// The entry's running maximum is raised first; then the value is handed to the
// sstable's large data handler and, if that call yields true, the entry's
// above_threshold counter is incremented.
void writer::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size) {
    auto& stats = _large_data_stats.map.at(large_data_type::partition_size);
    if (partition_size > stats.max_value) {
        stats.max_value = partition_size;
    }

    auto& handler = _sst.get_large_data_handler();
    if (handler.maybe_record_large_partitions(sst, partition_key, partition_size).get0()) {
        ++stats.above_threshold;
    }
}
// Accounts for a partition holding `rows_count` rows in the rows_in_partition
// large data stats entry.
//
// The entry's running maximum is raised first; then the count is handed to the
// large data handler's maybe_log_too_many_rows and, if that call yields true,
// the entry's above_threshold counter is incremented.
void writer::maybe_record_too_many_rows(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t rows_count) {
    auto& stats = _large_data_stats.map.at(large_data_type::rows_in_partition);
    if (rows_count > stats.max_value) {
        stats.max_value = rows_count;
    }

    auto& handler = _sst.get_large_data_handler();
    if (handler.maybe_log_too_many_rows(sst, partition_key, rows_count)) {
        ++stats.above_threshold;
    }
}
// Accounts for a row of `row_size` bytes in the row_size large data stats
// entry.
//
// The entry's running maximum is raised first; then the value is handed to the
// sstable's large data handler and, if that call yields true, the entry's
// above_threshold counter is incremented.
void writer::maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
        const clustering_key_prefix* clustering_key, uint64_t row_size) {
    auto& stats = _large_data_stats.map.at(large_data_type::row_size);
    if (row_size > stats.max_value) {
        stats.max_value = row_size;
    }

    auto& handler = _sst.get_large_data_handler();
    if (handler.maybe_record_large_rows(sst, partition_key, clustering_key, row_size).get0()) {
        ++stats.above_threshold;
    }
}
// Accounts for a cell of column `cdef` that is `cell_size` bytes in the
// cell_size large data stats entry.
//
// The entry's running maximum is raised first; then the value is handed to the
// sstable's large data handler and, if that call yields true, the entry's
// above_threshold counter is incremented.
void writer::maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
        const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size) {
    auto& entry = _large_data_stats.map.at(large_data_type::cell_size);
    if (entry.max_value < cell_size) {
        entry.max_value = cell_size;
    }
    // Forward the caller-supplied sstable and partition key, as the sibling
    // maybe_record_* helpers do, instead of silently ignoring the parameters
    // in favour of _sst / *_partition_key.
    if (_sst.get_large_data_handler().maybe_record_large_cells(sst, partition_key, clustering_key, cdef, cell_size).get0()) {
        entry.above_threshold++;
    }
}
void writer::write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell,
const column_definition& cdef, const row_time_properties& properties, std::optional<bytes_view> cell_path) {
@@ -1073,7 +1149,7 @@ void writer::write_cell(bytes_ostream& writer, const clustering_key_prefix* clus
// We record collections in write_collection, so ignore them here
if (cdef.is_atomic()) {
uint64_t size = writer.size() - current_pos;
_sst.get_large_data_handler().maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, size).get();
maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, size);
}
_c_stats.update_timestamp(cell.timestamp());
@@ -1140,7 +1216,7 @@ void writer::write_collection(bytes_ostream& writer, const clustering_key_prefix
}
});
uint64_t size = writer.size() - current_pos;
_sst.get_large_data_handler().maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, size).get();
maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, size);
}
void writer::write_cells(bytes_ostream& writer, const clustering_key_prefix* clustering_key, column_kind kind, const row& row_body,
@@ -1387,8 +1463,8 @@ stop_iteration writer::consume_end_of_partition() {
// compute size of the current row.
_c_stats.partition_size = _data_writer->offset() - _c_stats.start_offset;
_sst.get_large_data_handler().maybe_record_large_partitions(_sst, *_partition_key, _c_stats.partition_size).get();
_sst.get_large_data_handler().maybe_log_too_many_rows(_sst, *_partition_key, _c_stats.rows_count);
maybe_record_large_partitions(_sst, *_partition_key, _c_stats.partition_size);
maybe_record_too_many_rows(_sst, *_partition_key, _c_stats.rows_count);
// update is about merging column_stats with the data being stored by collector.

View File

@@ -509,8 +509,29 @@ struct run_identifier {
auto describe_type(sstable_version_types v, Describer f) { return f(id); }
};
// Types of large data statistics. Each enumerator keys one
// large_data_stats_entry in scylla_metadata::large_data_stats.
//
// Note: these identifiers are stored on stable storage, so for
// extensibility never reuse an identifier; only add new ones.
enum class large_data_type : uint32_t {
partition_size = 1, // partition size, in bytes
row_size = 2, // row size, in bytes
cell_size = 3, // cell size, in bytes
rows_in_partition = 4, // number of rows in a partition
};
// Statistics kept for a single large_data_type.
//
// All members default to zero so that a default-initialized entry never
// carries indeterminate values; the type remains an aggregate, so designated
// initializers such as `{.threshold = x}` keep working.
struct large_data_stats_entry {
    // Largest value recorded so far for this type.
    uint64_t max_value = 0;
    // Threshold configured for this type.
    uint64_t threshold = 0;
    // Number of recorded values the large data handler reported on.
    uint32_t above_threshold = 0;

    // Hands the fields to the describer in the order
    // max_value, threshold, above_threshold.
    template <typename Describer>
    auto describe_type(sstable_version_types v, Describer f) { return f(max_value, threshold, above_threshold); }
};
struct scylla_metadata {
using extension_attributes = disk_hash<uint32_t, disk_string<uint32_t>, disk_string<uint32_t>>;
using large_data_stats = disk_hash<uint32_t, large_data_type, large_data_stats_entry>;
disk_set_of_tagged_union<scylla_metadata_type,
disk_tagged_union_member<scylla_metadata_type, scylla_metadata_type::Sharding, sharding_metadata>,