diff --git a/db/config.cc b/db/config.cc index b5f10e8aaa..c19a8e613d 100644 --- a/db/config.cc +++ b/db/config.cc @@ -790,6 +790,8 @@ db::config::config(std::shared_ptr exts) "Log a warning when writing a number of rows larger than this value.") , compaction_collection_elements_count_warning_threshold(this, "compaction_collection_elements_count_warning_threshold", liveness::LiveUpdate, value_status::Used, 10000, "Log a warning when writing a collection containing more elements than this value.") + , compaction_large_data_records_per_sstable(this, "compaction_large_data_records_per_sstable", liveness::LiveUpdate, value_status::Used, 10, + "Maximum number of large data records per type to store in each SSTable's scylla metadata.") /** * @Group Common memtable settings */ diff --git a/db/config.hh b/db/config.hh index 6c57f9b420..fd36d0112f 100644 --- a/db/config.hh +++ b/db/config.hh @@ -228,6 +228,7 @@ public: named_value compaction_large_cell_warning_threshold_mb; named_value compaction_rows_count_warning_threshold; named_value compaction_collection_elements_count_warning_threshold; + named_value compaction_large_data_records_per_sstable; named_value memtable_total_space_in_mb; named_value concurrent_reads; named_value concurrent_writes; diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index c406881603..d3b3e4aaf2 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -12,6 +12,7 @@ #include #include "db/system_keyspace.hh" #include "db/large_data_handler.hh" +#include "keys/keys.hh" #include "sstables/sstables.hh" #include "gms/feature_service.hh" #include "cql3/untyped_result_set.hh" @@ -39,19 +40,19 @@ large_data_handler::large_data_handler(uint64_t partition_threshold_bytes, uint6 partition_threshold_bytes, row_threshold_bytes, cell_threshold_bytes, rows_count_threshold, _collection_elements_count_threshold); } -future large_data_handler::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size, uint64_t rows, uint64_t range_tombstones, uint64_t dead_rows) { +future large_data_handler::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size, uint64_t rows, uint64_t range_tombstones, uint64_t dead_rows) { SCYLLA_ASSERT(running()); - partition_above_threshold above_threshold{partition_size > _partition_threshold_bytes, rows > _rows_count_threshold}; + above_threshold_result above_threshold{.size = partition_size > _partition_threshold_bytes, .elements = rows > _rows_count_threshold}; static_assert(std::is_same_v); _stats.partitions_bigger_than_threshold += above_threshold.size; // increment if true - if (above_threshold.size || above_threshold.rows) [[unlikely]] { + if (above_threshold.size || above_threshold.elements) [[unlikely]] { return with_sem([&sst, &key, partition_size, rows, range_tombstones, dead_rows, this] { return record_large_partitions(sst, key, partition_size, rows, range_tombstones, dead_rows); }).then([above_threshold] { return above_threshold; }); } - return make_ready_future(); + return make_ready_future(); } void large_data_handler::start() { @@ -75,10 +76,6 @@ future<> large_data_handler::unplug_system_keyspace() noexcept { co_await _sys_ks.unplug(); } -template static std::string key_to_str(const T& key, const schema& s) { - return fmt::to_string(key.with_schema(s)); -} - sstring large_data_handler::sst_filename(const sstables::sstable& sst) { return sst.component_basename(sstables::component_type::Data); } @@ -145,6 +142,10 @@ future<> large_data_handler::maybe_update_large_data_entries_sstable_name(sstabl return when_all(std::move(large_partitions), std::move(large_rows), std::move(large_cells)).discard_result(); } +bool cql_table_large_data_handler::skip_cql_writes() const { + return bool(_feat.large_data_virtual_tables); +} + cql_table_large_data_handler::cql_table_large_data_handler(gms::feature_service& feat, utils::updateable_value partition_threshold_mb, utils::updateable_value row_threshold_mb, @@ -183,6 +184,9 @@ future<> cql_table_large_data_handler::do_insert_large_data_entry(std::string_vi sstring ks_name, sstring cf_name, sstring sstable_name, int64_t size, sstring partition_key, db_clock::time_point compaction_time, const std::vector& extra_fields, Args&&... args) const { + if (skip_cql_writes()) { + co_return; + } auto sys_ks = _sys_ks.get_permit(); if (!sys_ks) { co_return; @@ -287,6 +291,9 @@ future<> cql_table_large_data_handler::record_large_rows(const sstables::sstable } future<> cql_table_large_data_handler::delete_large_data_entries(const schema& s, sstring sstable_name, std::string_view large_table_name) const { + if (skip_cql_writes()) { + co_return; + } auto sys_ks = _sys_ks.get_permit(); SCYLLA_ASSERT(sys_ks); const sstring req = @@ -332,6 +339,9 @@ cql_table_large_data_handler::row_reinsert_func cql_table_large_data_handler::ma } future<> cql_table_large_data_handler::update_large_data_entries_sstable_name(const schema& s, sstring old_name, sstring new_name, std::string_view large_table_name) const { + if (skip_cql_writes()) { + co_return; + } auto sys_ks = _sys_ks.get_permit(); SCYLLA_ASSERT(sys_ks); // sstable_name is a clustering key, so we can't update it in place. diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index 72df8c9f6d..1e68ee7bd7 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -91,24 +91,25 @@ public: return make_ready_future(false); } - struct partition_above_threshold { + struct above_threshold_result { bool size = false; - bool rows = false; + bool elements = false; }; - future maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, + future maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, uint64_t partition_size, uint64_t rows, uint64_t range_tombstones, uint64_t dead_rows); - future maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, + future maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) { SCYLLA_ASSERT(running()); - if (cell_size > _cell_threshold_bytes || collection_elements > _collection_elements_count_threshold) [[unlikely]] { + above_threshold_result above_threshold{.size = cell_size > _cell_threshold_bytes, .elements = collection_elements > _collection_elements_count_threshold}; + if (above_threshold.size || above_threshold.elements) [[unlikely]] { return with_sem([&sst, &partition_key, clustering_key, &cdef, cell_size, collection_elements, this] { return record_large_cells(sst, partition_key, clustering_key, cdef, cell_size, collection_elements); - }).then([] { - return true; + }).then([above_threshold] { + return above_threshold; }); } - return make_ready_future(false); + return make_ready_future(); } future<> maybe_delete_large_data_entries(sstables::shared_sstable sst); @@ -180,6 +181,12 @@ protected: virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const override; private: + // Returns true if CQL writes to system.large_* tables should be skipped. + // Once LARGE_DATA_VIRTUAL_TABLES is enabled, large data records are served + // from SSTable metadata via virtual tables and the physical CQL tables are + // dropped, so writing to them is both unnecessary and would fail. + bool skip_cql_writes() const; + future<> internal_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const; future<> internal_record_large_cells_and_collections(const sstables::sstable& sst, const sstables::key& partition_key, diff --git a/db/virtual_tables.cc b/db/virtual_tables.cc index 668c4ff4fa..076aad9a6e 100644 --- a/db/virtual_tables.cc +++ b/db/virtual_tables.cc @@ -26,6 +26,8 @@ #include "db/size_estimates_virtual_reader.hh" #include "db/view/build_progress_virtual_reader.hh" #include "index/built_indexes_virtual_reader.hh" +#include "keys/keys.hh" +#include "gms/feature_service.hh" #include "gms/gossiper.hh" #include "mutation/frozen_mutation.hh" #include "transport/protocol_server.hh" @@ -35,6 +37,7 @@ #include "service/raft/raft_group_registry.hh" #include "service/storage_service.hh" #include "service/tablet_allocator.hh" +#include "sstables/sstables.hh" #include "locator/load_sketch.hh" #include "types/list.hh" #include "types/types.hh" @@ -1461,6 +1464,595 @@ private: } }; +// Helper: convert bytes stored in a disk_string to sstring. +// disk_string::value is of type `bytes`. +static sstring disk_string_to_sstring(const sstables::disk_string& ds) { + return sstring(to_string_view(ds.value)); +} + +// Helper: convert int64_t millis-since-epoch to db_clock::time_point +// for use with timestamp_type columns in virtual tables. +static db_clock::time_point millis_to_db_clock(int64_t ms) { + return db_clock::time_point(db_clock::duration(ms)); +} + +// Derive compaction_time (milliseconds since Unix epoch) from the SSTable's +// generation UUID. If the generation is not UUID-based (legacy integer +// generations), returns 0. +static int64_t compaction_time_from_generation(const sstables::sstable& sst) { + auto gen = sst.generation(); + if (gen.is_uuid_based()) { + return utils::UUID_gen::unix_timestamp(gen.as_uuid()).count(); + } + return 0; +} + +// Virtual table backed by LargeDataRecords in SSTable scylla metadata. +// Shows partition_size records from all live SSTables, matching the +// schema of the legacy system.large_partitions CQL table. +class large_partitions_virtual_table : public streaming_virtual_table { + sharded& _db; + + struct record { + sstring sstable_name; + int64_t partition_size; + sstring partition_key; + int64_t compaction_time; + int64_t rows; + int64_t range_tombstones; + int64_t dead_rows; + }; + + // Extract partition_size records from a single table's SSTables on the local shard. + // Called on each shard via map() to collect records cross-shard. + static future> collect_local_records(replica::table& table) { + std::vector records; + auto table_schema = table.schema(); + auto sstables = co_await table.take_sstable_set_snapshot(); + for (const auto& sst : sstables) { + auto& records_opt = sst->get_large_data_records(); + if (!records_opt) { + // Legacy SSTable without LargeDataRecords: emit synthetic row + // if above_threshold > 0 for partition_size or rows_in_partition stats. + auto stat = sst->get_large_data_stat(sstables::large_data_type::partition_size); + auto rows_stat = sst->get_large_data_stat(sstables::large_data_type::rows_in_partition); + if ((stat && stat->above_threshold > 0) || (rows_stat && rows_stat->above_threshold > 0)) { + records.push_back(record{ + .sstable_name = sst->component_basename(sstables::component_type::Data), + .partition_size = static_cast(stat ? stat->max_value : 0), + .partition_key = "(details unavailable - legacy SSTable)", + .compaction_time = compaction_time_from_generation(*sst), + .rows = static_cast(rows_stat ? rows_stat->max_value : 0), + }); + } + continue; + } + auto compaction_time = compaction_time_from_generation(*sst); + auto sst_name = sst->component_basename(sstables::component_type::Data); + for (const auto& rec : records_opt->elements) { + if (rec.type != sstables::large_data_type::partition_size && + rec.type != sstables::large_data_type::rows_in_partition) { + continue; + } + auto pk = sstables::key_view(rec.partition_key.value).to_partition_key(*table_schema); + records.push_back(record{ + .sstable_name = sst_name, + .partition_size = static_cast(rec.value), + .partition_key = key_to_str(pk, *table_schema), + .compaction_time = compaction_time, + .rows = static_cast(rec.elements_count), + .range_tombstones = static_cast(rec.range_tombstones), + .dead_rows = static_cast(rec.dead_rows), + }); + } + } + co_return records; + } + +public: + explicit large_partitions_virtual_table(sharded& db) + : streaming_virtual_table(build_schema()) + , _db(db) + { + _shard_aware = true; + } + + static schema_ptr build_schema() { + auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::LARGE_PARTITIONS, 1); + schema_builder builder(system_keyspace::NAME, system_keyspace::LARGE_PARTITIONS, std::make_optional(id)); + builder.with_column("keyspace_name", utf8_type, column_kind::partition_key); + builder.with_column("table_name", utf8_type, column_kind::partition_key); + builder.with_column("sstable_name", utf8_type, column_kind::clustering_key); + builder.with_column("partition_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key); + builder.with_column("partition_key", utf8_type, column_kind::clustering_key); + builder.with_column("rows", long_type); + builder.with_column("compaction_time", timestamp_type); + builder.with_column("range_tombstones", long_type); + builder.with_column("dead_rows", long_type); + builder.set_comment("partitions larger than specified threshold"); + builder.with_hash_version(); + return builder.build(); + } + + dht::decorated_key make_partition_key(const sstring& ks_name, const sstring& cf_name) { + return dht::decorate_key(*_s, partition_key::from_exploded(*_s, { + data_value(ks_name).serialize_nonnull(), + data_value(cf_name).serialize_nonnull() + })); + } + + clustering_key make_clustering_key(const sstring& sstable_name, int64_t partition_size, const sstring& pk) { + return clustering_key::from_exploded(*_s, { + data_value(sstable_name).serialize_nonnull(), + reversed_type_impl::get_instance(long_type)->decompose(partition_size), + data_value(pk).serialize_nonnull() + }); + } + + future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override { + // Phase 1: Determine which (ks, table) partitions this shard owns. + // Table metadata is shared across all shards, so local iteration is sufficient. + struct table_info { + table_id tid; + dht::decorated_key dk; + }; + std::vector owned_tables; + + auto& db = _db.local(); + db.get_tables_metadata().for_each_table([&] (table_id tid, lw_shared_ptr table) { + auto& ks_name = table->schema()->ks_name(); + auto& cf_name = table->schema()->cf_name(); + auto dk = make_partition_key(ks_name, cf_name); + if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) { + owned_tables.push_back(table_info{tid, std::move(dk)}); + } + }); + + // Sort by token order — streaming_virtual_table requires + // partitions to be emitted in token order. + std::ranges::sort(owned_tables, dht::ring_position_less_comparator(*_s), + std::mem_fn(&table_info::dk)); + + // Phase 2: For each owned table, collect records from ALL shards + // in parallel, then sort and emit immediately. We process one + // table at a time to bound memory usage: once we have emitted + // page_size records we stop and let CQL paging fetch the rest. + static constexpr size_t page_size = 1000; + size_t emitted = 0; + + for (auto& ti : owned_tables) { + auto per_shard = co_await _db.map([tid = ti.tid] (replica::database& db) -> future> { + if (auto table = db.get_tables_metadata().get_table_if_exists(tid)) { + co_return co_await collect_local_records(*table); + } + co_return std::vector{}; + }); + std::vector records; + for (auto& shard_records : per_shard) { + records.insert(records.end(), + std::make_move_iterator(shard_records.begin()), + std::make_move_iterator(shard_records.end())); + } + if (records.empty()) { + continue; + } + // Sort by (sstable_name ASC, partition_size DESC, partition_key ASC) to match clustering order + std::ranges::sort(records, [] (const record& a, const record& b) { + if (a.sstable_name != b.sstable_name) { + return a.sstable_name < b.sstable_name; + } else if (a.partition_size != b.partition_size) { + return a.partition_size > b.partition_size; // DESC + } + return a.partition_key < b.partition_key; + }); + // A partition that exceeds both the size and the row-count + // thresholds produces two LargeDataRecords entries (one per + // type) that map to identical clustering keys. Remove the + // duplicates so that every emitted row has a unique key. + records.erase(std::unique(records.begin(), records.end(), [] (const record& a, const record& b) { + return a.sstable_name == b.sstable_name + && a.partition_size == b.partition_size + && a.partition_key == b.partition_key; + }), records.end()); + co_await result.emit_partition_start(ti.dk); + for (const auto& rec : records) { + clustering_row cr(make_clustering_key(rec.sstable_name, rec.partition_size, rec.partition_key)); + if (rec.compaction_time != 0) { + set_cell(cr.cells(), "compaction_time", millis_to_db_clock(rec.compaction_time)); + } + if (rec.rows != 0) { + set_cell(cr.cells(), "rows", rec.rows); + } + if (rec.range_tombstones != 0) { + set_cell(cr.cells(), "range_tombstones", rec.range_tombstones); + } + if (rec.dead_rows != 0) { + set_cell(cr.cells(), "dead_rows", rec.dead_rows); + } + co_await result.emit_row(std::move(cr)); + } + co_await result.emit_partition_end(); + emitted += records.size(); + if (emitted >= page_size) { + break; + } + } + } +}; + +// Virtual table backed by LargeDataRecords in SSTable scylla metadata. +// Shows row_size records from all live SSTables, matching the +// schema of the legacy system.large_rows CQL table. +class large_rows_virtual_table : public streaming_virtual_table { + sharded& _db; + + struct record { + sstring sstable_name; + int64_t row_size; + sstring partition_key; + sstring clustering_key; + int64_t compaction_time; + }; + + // Extract row_size records from a single table's SSTables on the local shard. + static future> collect_local_records(replica::table& table) { + std::vector records; + auto table_schema = table.schema(); + auto sstables = co_await table.take_sstable_set_snapshot(); + for (const auto& sst : sstables) { + auto& records_opt = sst->get_large_data_records(); + if (!records_opt) { + auto stat = sst->get_large_data_stat(sstables::large_data_type::row_size); + if (stat && stat->above_threshold > 0) { + records.push_back(record{ + .sstable_name = sst->component_basename(sstables::component_type::Data), + .row_size = static_cast(stat->max_value), + .partition_key = "(details unavailable - legacy SSTable)", + .clustering_key = "", + .compaction_time = compaction_time_from_generation(*sst), + }); + } + continue; + } + auto compaction_time = compaction_time_from_generation(*sst); + auto sst_name = sst->component_basename(sstables::component_type::Data); + for (const auto& rec : records_opt->elements) { + if (rec.type != sstables::large_data_type::row_size) { + continue; + } + auto pk = sstables::key_view(rec.partition_key.value).to_partition_key(*table_schema); + auto ck_str = rec.clustering_key.value.empty() + ? sstring() + : sstring(key_to_str(clustering_key_prefix::from_bytes(rec.clustering_key.value), *table_schema)); + records.push_back(record{ + .sstable_name = sst_name, + .row_size = static_cast(rec.value), + .partition_key = key_to_str(pk, *table_schema), + .clustering_key = std::move(ck_str), + .compaction_time = compaction_time, + }); + } + } + co_return records; + } + +public: + explicit large_rows_virtual_table(sharded& db) + : streaming_virtual_table(build_schema()) + , _db(db) + { + _shard_aware = true; + } + + static schema_ptr build_schema() { + auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::LARGE_ROWS, 1); + return schema_builder(system_keyspace::NAME, system_keyspace::LARGE_ROWS, std::make_optional(id)) + .with_column("keyspace_name", utf8_type, column_kind::partition_key) + .with_column("table_name", utf8_type, column_kind::partition_key) + .with_column("sstable_name", utf8_type, column_kind::clustering_key) + .with_column("row_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key) + .with_column("partition_key", utf8_type, column_kind::clustering_key) + .with_column("clustering_key", utf8_type, column_kind::clustering_key) + .with_column("compaction_time", timestamp_type) + .set_comment("rows larger than specified threshold") + .with_hash_version() + .build(); + } + + dht::decorated_key make_partition_key(const sstring& ks_name, const sstring& cf_name) { + return dht::decorate_key(*_s, partition_key::from_exploded(*_s, { + data_value(ks_name).serialize_nonnull(), + data_value(cf_name).serialize_nonnull() + })); + } + + clustering_key make_clustering_key(const sstring& sstable_name, int64_t row_size, + const sstring& pk, const sstring& ck) { + return clustering_key::from_exploded(*_s, { + data_value(sstable_name).serialize_nonnull(), + reversed_type_impl::get_instance(long_type)->decompose(row_size), + data_value(pk).serialize_nonnull(), + data_value(ck).serialize_nonnull() + }); + } + + future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override { + struct table_info { + table_id tid; + dht::decorated_key dk; + }; + std::vector owned_tables; + + auto& db = _db.local(); + db.get_tables_metadata().for_each_table([&] (table_id tid, lw_shared_ptr table) { + auto& ks_name = table->schema()->ks_name(); + auto& cf_name = table->schema()->cf_name(); + auto dk = make_partition_key(ks_name, cf_name); + if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) { + owned_tables.push_back(table_info{tid, std::move(dk)}); + } + }); + + std::ranges::sort(owned_tables, dht::ring_position_less_comparator(*_s), + std::mem_fn(&table_info::dk)); + + static constexpr size_t page_size = 1000; + size_t emitted = 0; + + for (auto& ti : owned_tables) { + auto per_shard = co_await _db.map([tid = ti.tid] (replica::database& db) -> future> { + if (auto table = db.get_tables_metadata().get_table_if_exists(tid)) { + co_return co_await collect_local_records(*table); + } + co_return std::vector{}; + }); + std::vector records; + for (auto& shard_records : per_shard) { + records.insert(records.end(), + std::make_move_iterator(shard_records.begin()), + std::make_move_iterator(shard_records.end())); + } + if (records.empty()) { + continue; + } + std::ranges::sort(records, [] (const record& a, const record& b) { + if (a.sstable_name != b.sstable_name) { + return a.sstable_name < b.sstable_name; + } else if (a.row_size != b.row_size) { + return a.row_size > b.row_size; // DESC + } else if (a.partition_key != b.partition_key) { + return a.partition_key < b.partition_key; + } + return a.clustering_key < b.clustering_key; + }); + co_await result.emit_partition_start(ti.dk); + for (const auto& rec : records) { + clustering_row cr(make_clustering_key(rec.sstable_name, rec.row_size, rec.partition_key, rec.clustering_key)); + if (rec.compaction_time != 0) { + set_cell(cr.cells(), "compaction_time", millis_to_db_clock(rec.compaction_time)); + } + co_await result.emit_row(std::move(cr)); + } + co_await result.emit_partition_end(); + emitted += records.size(); + if (emitted >= page_size) { + break; + } + } + } +}; + +// Virtual table backed by LargeDataRecords in SSTable scylla metadata. +// Shows cell_size records from all live SSTables, matching the +// schema of the legacy system.large_cells CQL table. +class large_cells_virtual_table : public streaming_virtual_table { + sharded& _db; + + struct record { + sstring sstable_name; + int64_t cell_size; + sstring partition_key; + sstring clustering_key; + sstring column_name; + int64_t collection_elements; // -1 means not applicable + int64_t compaction_time; + }; + + // Extract cell_size records from a single table's SSTables on the local shard. + static future> collect_local_records(replica::table& table) { + std::vector records; + auto table_schema = table.schema(); + auto sstables = co_await table.take_sstable_set_snapshot(); + for (const auto& sst : sstables) { + auto& records_opt = sst->get_large_data_records(); + if (!records_opt) { + auto stat = sst->get_large_data_stat(sstables::large_data_type::cell_size); + auto elem_stat = sst->get_large_data_stat(sstables::large_data_type::elements_in_collection); + if ((stat && stat->above_threshold > 0) || (elem_stat && elem_stat->above_threshold > 0)) { + records.push_back(record{ + .sstable_name = sst->component_basename(sstables::component_type::Data), + .cell_size = static_cast(stat ? stat->max_value : 0), + .partition_key = "(details unavailable - legacy SSTable)", + .clustering_key = "", + .column_name = "", + .collection_elements = elem_stat ? static_cast(elem_stat->max_value) : int64_t(-1), + .compaction_time = compaction_time_from_generation(*sst), + }); + } + continue; + } + auto compaction_time = compaction_time_from_generation(*sst); + auto sst_name = sst->component_basename(sstables::component_type::Data); + for (const auto& rec : records_opt->elements) { + if (rec.type != sstables::large_data_type::cell_size && + rec.type != sstables::large_data_type::elements_in_collection) { + continue; + } + auto pk = sstables::key_view(rec.partition_key.value).to_partition_key(*table_schema); + auto ck_str = rec.clustering_key.value.empty() + ? sstring() + : sstring(key_to_str(clustering_key_prefix::from_bytes(rec.clustering_key.value), *table_schema)); + records.push_back(record{ + .sstable_name = sst_name, + .cell_size = static_cast(rec.value), + .partition_key = key_to_str(pk, *table_schema), + .clustering_key = std::move(ck_str), + .column_name = disk_string_to_sstring(rec.column_name), + .collection_elements = rec.elements_count > 0 + ? static_cast(rec.elements_count) : int64_t(-1), + .compaction_time = compaction_time, + }); + } + } + co_return records; + } + +public: + explicit large_cells_virtual_table(sharded& db) + : streaming_virtual_table(build_schema()) + , _db(db) + { + _shard_aware = true; + } + + static schema_ptr build_schema() { + auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::LARGE_CELLS, 1); + return schema_builder(system_keyspace::NAME, system_keyspace::LARGE_CELLS, std::make_optional(id)) + .with_column("keyspace_name", utf8_type, column_kind::partition_key) + .with_column("table_name", utf8_type, column_kind::partition_key) + .with_column("sstable_name", utf8_type, column_kind::clustering_key) + .with_column("cell_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key) + .with_column("partition_key", utf8_type, column_kind::clustering_key) + .with_column("clustering_key", utf8_type, column_kind::clustering_key) + .with_column("column_name", utf8_type, column_kind::clustering_key) + .with_column("collection_elements", long_type) + .with_column("compaction_time", timestamp_type) + .set_comment("cells larger than specified threshold") + .with_hash_version() + .build(); + } + + dht::decorated_key make_partition_key(const sstring& ks_name, const sstring& cf_name) { + return dht::decorate_key(*_s, partition_key::from_exploded(*_s, { + data_value(ks_name).serialize_nonnull(), + data_value(cf_name).serialize_nonnull() + })); + } + + clustering_key make_clustering_key(const sstring& sstable_name, int64_t cell_size, + const sstring& pk, const sstring& ck, const sstring& col_name) { + return clustering_key::from_exploded(*_s, { + data_value(sstable_name).serialize_nonnull(), + reversed_type_impl::get_instance(long_type)->decompose(cell_size), + data_value(pk).serialize_nonnull(), + data_value(ck).serialize_nonnull(), + data_value(col_name).serialize_nonnull() + }); + } + + future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override { + struct table_info { + table_id tid; + dht::decorated_key dk; + }; + std::vector owned_tables; + + auto& db = _db.local(); + db.get_tables_metadata().for_each_table([&] (table_id tid, lw_shared_ptr table) { + auto& ks_name = table->schema()->ks_name(); + auto& cf_name = table->schema()->cf_name(); + auto dk = make_partition_key(ks_name, cf_name); + if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) { + owned_tables.push_back(table_info{tid, std::move(dk)}); + } + }); + + std::ranges::sort(owned_tables, dht::ring_position_less_comparator(*_s), + std::mem_fn(&table_info::dk)); + + static constexpr size_t page_size = 1000; + size_t emitted = 0; + + for (auto& ti : owned_tables) { + auto per_shard = co_await _db.map([tid = ti.tid] (replica::database& db) -> future> { + if (auto table = db.get_tables_metadata().get_table_if_exists(tid)) { + co_return co_await collect_local_records(*table); + } + co_return std::vector{}; + }); + std::vector records; + for (auto& shard_records : per_shard) { + records.insert(records.end(), + std::make_move_iterator(shard_records.begin()), + std::make_move_iterator(shard_records.end())); + } + if (records.empty()) { + continue; + } + std::ranges::sort(records, [] (const record& a, const record& b) { + if (a.sstable_name != b.sstable_name) { + return a.sstable_name < b.sstable_name; + } else if (a.cell_size != b.cell_size) { + return a.cell_size > b.cell_size; // DESC + } else if (a.partition_key != b.partition_key) { + return a.partition_key < b.partition_key; + } else if (a.clustering_key != b.clustering_key) { + return a.clustering_key < b.clustering_key; + } + return a.column_name < b.column_name; + }); + // A collection cell that exceeds both the size and the + // element-count thresholds produces two LargeDataRecords + // entries that map to identical clustering keys. Remove the + // duplicates so that every emitted row has a unique key. + records.erase(std::unique(records.begin(), records.end(), [] (const record& a, const record& b) { + return a.sstable_name == b.sstable_name + && a.cell_size == b.cell_size + && a.partition_key == b.partition_key + && a.clustering_key == b.clustering_key + && a.column_name == b.column_name; + }), records.end()); + co_await result.emit_partition_start(ti.dk); + for (const auto& rec : records) { + clustering_row cr(make_clustering_key(rec.sstable_name, rec.cell_size, + rec.partition_key, rec.clustering_key, rec.column_name)); + if (rec.compaction_time != 0) { + set_cell(cr.cells(), "compaction_time", millis_to_db_clock(rec.compaction_time)); + } + if (rec.collection_elements >= 0) { + set_cell(cr.cells(), "collection_elements", rec.collection_elements); + } + co_await result.emit_row(std::move(cr)); + } + co_await result.emit_partition_end(); + emitted += records.size(); + if (emitted >= page_size) { + break; + } + } + } +}; + +} + +// Helper: register a virtual table on the local shard. +static future<> add_virtual_table( + sharded& sys_ks, + sharded& dist_db, + sharded& dist_ss, + std::unique_ptr&& tbl) { + auto& virtual_tables = *sys_ks.local().get_virtual_tables_registry(); + auto& db = dist_db.local(); + auto& ss = dist_ss.local(); + + auto schema = tbl->schema(); + virtual_tables[schema->id()] = std::move(tbl); + co_await db.create_local_system_table(schema, false, ss.get_erm_factory()); + auto& cf = db.find_column_family(schema); + cf.mark_ready_for_writes(nullptr); + auto& vt = virtual_tables[schema->id()]; + cf.set_virtual_reader(vt->as_mutation_source()); + cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); }); } future<> initialize_virtual_tables( @@ -1469,41 +2061,86 @@ future<> initialize_virtual_tables( sharded& sys_ks, sharded& tablet_allocator, sharded& ms, - db::config& cfg) { - auto& virtual_tables_registry = sys_ks.local().get_virtual_tables_registry(); - auto& virtual_tables = *virtual_tables_registry; - auto& db = dist_db.local(); - auto& ss = dist_ss.local(); + db::config& cfg, + gms::feature_service& feat) { + co_await smp::invoke_on_all([&] () -> future<> { + auto add_table = [&] (std::unique_ptr&& tbl) -> future<> { + co_await add_virtual_table(sys_ks, dist_db, dist_ss, std::move(tbl)); + }; - auto add_table = [&] (std::unique_ptr&& tbl) -> future<> { - auto schema = tbl->schema(); - virtual_tables[schema->id()] = std::move(tbl); - co_await db.create_local_system_table(schema, false, ss.get_erm_factory()); - auto& cf = db.find_column_family(schema); - cf.mark_ready_for_writes(nullptr); - auto& vt = virtual_tables[schema->id()]; - cf.set_virtual_reader(vt->as_mutation_source()); - cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); }); + auto& db = dist_db.local(); + + // Add built-in virtual tables here. + co_await add_table(std::make_unique(dist_ss, dist_gossiper)); + co_await add_table(std::make_unique(db, dist_ss.local())); + co_await add_table(std::make_unique(dist_db)); + co_await add_table(std::make_unique(dist_ss.local())); + co_await add_table(std::make_unique(dist_db, dist_ss.local())); + co_await add_table(std::make_unique()); + co_await add_table(std::make_unique(cfg)); + co_await add_table(std::make_unique(dist_ss.local())); + co_await add_table(std::make_unique(dist_raft_gr)); + co_await add_table(std::make_unique(tablet_allocator, dist_db, dist_raft_gr, ms, dist_gossiper)); + co_await add_table(std::make_unique(tablet_allocator, dist_db, dist_raft_gr, ms)); + co_await add_table(std::make_unique(db, dist_ss.local())); + co_await add_table(std::make_unique(db, dist_ss.local())); + + db.find_column_family(system_keyspace::size_estimates()).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader(db, sys_ks.local()))); + db.find_column_family(system_keyspace::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db))); + db.find_column_family(system_keyspace::built_indexes()).set_virtual_reader(mutation_source(db::index::built_indexes_virtual_reader(db))); + }); + + // Drop old physical system.large_* tables and register virtual + // replacements. Must run on shard 0 (uses cross-shard operations). + auto activate_large_data_virtual_tables = [&dist_db, &sys_ks, &dist_ss] () -> future<> { + // Drop old physical system.large_* tables. Their data is now + // served from SSTable metadata via virtual tables. + // TODO: In a follow-up, read existing large data entries from + // the old tables and populate per-sstable LargeDataRecords + // metadata via component_rewrite before dropping. + for (auto table_name : { + db::system_keyspace::LARGE_PARTITIONS, + db::system_keyspace::LARGE_ROWS, + db::system_keyspace::LARGE_CELLS}) { + try { + co_await replica::database::legacy_drop_table_on_all_shards( + dist_db, sys_ks, db::system_keyspace::NAME, table_name, false); + } catch (const replica::no_such_column_family&) { + // Already dropped (e.g. restart after feature was enabled). + } + } + + // Add virtual tables on all shards. + co_await smp::invoke_on_all([&dist_db, &sys_ks, &dist_ss] () -> future<> { + co_await add_virtual_table(sys_ks, dist_db, dist_ss, + std::make_unique(dist_db)); + co_await add_virtual_table(sys_ks, dist_db, dist_ss, + std::make_unique(dist_db)); + co_await add_virtual_table(sys_ks, dist_db, dist_ss, + std::make_unique(dist_db)); + }); }; - // Add built-in virtual tables here. - co_await add_table(std::make_unique(dist_ss, dist_gossiper)); - co_await add_table(std::make_unique(db, ss)); - co_await add_table(std::make_unique(dist_db)); - co_await add_table(std::make_unique(ss)); - co_await add_table(std::make_unique(dist_db, ss)); - co_await add_table(std::make_unique()); - co_await add_table(std::make_unique(cfg)); - co_await add_table(std::make_unique(ss)); - co_await add_table(std::make_unique(dist_raft_gr)); - co_await add_table(std::make_unique(tablet_allocator, dist_db, dist_raft_gr, ms, dist_gossiper)); - co_await add_table(std::make_unique(tablet_allocator, dist_db, dist_raft_gr, ms)); - co_await add_table(std::make_unique(db, ss)); - co_await add_table(std::make_unique(db, ss)); - - db.find_column_family(system_keyspace::size_estimates()).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader(db, sys_ks.local()))); - db.find_column_family(system_keyspace::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db))); - db.find_column_family(system_keyspace::built_indexes()).set_virtual_reader(mutation_source(db::index::built_indexes_virtual_reader(db))); + if (feat.large_data_virtual_tables) { + // Feature already enabled (e.g. test environment or restart after + // upgrade). Activate directly as a coroutine — no seastar::async + // context needed. + co_await activate_large_data_virtual_tables(); + } else { + // Feature not yet enabled. Register a callback that will fire + // when the feature is enabled during rolling upgrade. The callback + // runs inside seastar::async context (via feature_service::enable), + // so .get() is safe. + // + // The listener_registration must outlive the feature, so we store + // it in a static variable (process lifetime). This function is + // only called on shard 0, so the listener fires only on shard 0. + static gms::feature::listener_registration large_data_vt_listener; + large_data_vt_listener = feat.large_data_virtual_tables.when_enabled( + [activate_large_data_virtual_tables = std::move(activate_large_data_virtual_tables)] { + activate_large_data_virtual_tables().get(); + }); + } } virtual_tables_registry::virtual_tables_registry() : unique_ptr(std::make_unique()) { diff --git a/db/virtual_tables.hh b/db/virtual_tables.hh index 94d9ec4ee3..7f1ebfc437 100644 --- a/db/virtual_tables.hh +++ b/db/virtual_tables.hh @@ -24,6 +24,7 @@ class tablet_allocator; } namespace gms { +class feature_service; class gossiper; } @@ -44,7 +45,8 @@ future<> initialize_virtual_tables( sharded&, sharded&, sharded&, - db::config&); + db::config&, + gms::feature_service&); class virtual_table; diff --git a/docs/dev/sstable-scylla-format.md b/docs/dev/sstable-scylla-format.md index d3bc598218..1b20c324a0 100644 --- a/docs/dev/sstable-scylla-format.md +++ b/docs/dev/sstable-scylla-format.md @@ -33,6 +33,7 @@ in individual sections | ext_timestamp_stats | schema | components_digests + | large_data_records `sharding_metadata` (tag 1): describes what token sub-ranges are included in this sstable. This is used, when loading the sstable, to determine which shard(s) @@ -80,6 +81,11 @@ all SSTable component files that are checksummed during write. Each entry maps a type (e.g., Data, Index, Filter, Statistics, etc.) to its CRC32 checksum. This allows verifying the integrity of individual component files. +`large_data_records` (tag 13): an `array` with the top-N individual large +data entries (partitions, rows, cells) found during the sstable write. Unlike `large_data_stats` +which only stores aggregate statistics, this records the actual keys and sizes so they survive +tablet/shard migration. + The [scylla sstable dump-scylla-metadata](https://github.com/scylladb/scylladb/blob/master/docs/operating-scylla/admin-tools/scylla-sstable.rst#dump-scylla-metadata) tool can be used to dump the scylla metadata in JSON format. @@ -203,3 +209,35 @@ in the statistics component, which lacks column names and other metadata. Unlike the full schema stored in the system schema tables, it is not intended to be comprehensive, but it contains enough information for tools like scylla-sstable to parse an sstable in a self-sufficient manner. + +## large_data_records subcomponent + + large_data_records = record_count large_data_record* + record_count = be32 + large_data_record = large_data_type partition_key clustering_key column_name value elements_count range_tombstones dead_rows + large_data_type = be32 // same enum as in large_data_stats + partition_key = string32 // binary serialized partition key (sstables::key::get_bytes()) + clustering_key = string32 // binary serialized clustering key (clustering_key_prefix::representation()), empty if N/A + column_name = string32 // column name as text, empty for partition/row entries + value = be64 // size in bytes (partition, row, or cell size depending on type) + elements_count = be64 // type-dependent element count (see below) + range_tombstones = be64 // number of range tombstones (partition_size records only, 0 otherwise) + dead_rows = be64 // number of dead rows (partition_size records only, 0 otherwise) + string32 = string32_size byte* + string32_size = be32 + +The large_data_records component holds individual top-N large data entries +(partitions, rows, cells) found during the sstable write. Unlike large_data_stats, +which only stores aggregate per-type statistics (max value, threshold, count above +threshold), large_data_records preserves the actual partition key, clustering key, +column name, and size for each above-threshold entry. This information is embedded +in the sstable file itself and therefore survives tablet/shard migration. + +The elements_count field carries a type-dependent element count: + +- For partition_size and rows_in_partition records: number of rows in the partition +- For cell_size and elements_in_collection records: number of elements in the collection (0 for non-collection cells) +- For row_size records: 0 + +The range_tombstones and dead_rows fields are meaningful only for +partition_size records and are zero for all other record types. diff --git a/docs/operating-scylla/admin-tools/scylla-sstable.rst b/docs/operating-scylla/admin-tools/scylla-sstable.rst index 173fdeb3a3..57c1921bb2 100644 --- a/docs/operating-scylla/admin-tools/scylla-sstable.rst +++ b/docs/operating-scylla/admin-tools/scylla-sstable.rst @@ -524,6 +524,7 @@ The content is dumped in JSON, using the following schema: "scylla_version": String "ext_timestamp_stats": {"$key": int64, ...} "sstable_identifier": String, // UUID + "large_data_records": [$LARGE_DATA_RECORD, ...] } $SHARDING_METADATA := { @@ -548,6 +549,17 @@ The content is dumped in JSON, using the following schema: "above_threshold": Uint } + $LARGE_DATA_RECORD := { + "type": String, // large_data_type name + "partition_key": String, // human-readable partition key (decoded from binary) + "clustering_key": String, // human-readable clustering key (decoded from binary), empty if N/A + "column_name": String, // column name, empty for partition/row entries + "value": Uint64, // size in bytes (partition, row, or cell size depending on type) + "elements_count": Uint64, // rows (partition_size, rows_in_partition) or collection elements (cell_size, elements_in_collection), 0 for row_size + "range_tombstones": Uint64, // range tombstones (partition_size records only, 0 otherwise) + "dead_rows": Uint64 // dead rows (partition_size records only, 0 otherwise) + } + dump-schema ^^^^^^^^^^^ diff --git a/docs/troubleshooting/large-partition-table.rst b/docs/troubleshooting/large-partition-table.rst index 7677f9fb0e..6d28a238dc 100644 --- a/docs/troubleshooting/large-partition-table.rst +++ b/docs/troubleshooting/large-partition-table.rst @@ -128,6 +128,79 @@ Expiring Data In order to prevent stale data from appearing, all rows in ``system.large_partitions`` table are inserted with Time To Live (TTL) equal to 30 days. + +.. _large-data-virtual-tables: + +Virtual Tables (``LARGE_DATA_VIRTUAL_TABLES`` Feature) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Starting with ScyllaDB 2026.2, the ``system.large_partitions``, +``system.large_rows``, and ``system.large_cells`` tables are implemented as +**virtual tables** that read directly from metadata stored inside each SSTable +rather than from separate physical system tables. + +How it works +"""""""""""" + +When an SSTable is written (during flush or compaction), the SSTable writer +records the top-N above-threshold large data entries into a +``LargeDataRecords`` component in the SSTable's ``.Scylla`` metadata file. +The virtual tables query this metadata from all live SSTables at read time, +so large data records automatically follow their SSTables when they are +migrated between shards or nodes (e.g. during tablet migration or repair). + +The maximum number of records stored per large data type per SSTable is +controlled by the ``compaction_large_data_records_per_sstable`` configuration +option (default: 10). + +Activation +"""""""""" + +The transition is gated by the ``LARGE_DATA_VIRTUAL_TABLES`` cluster feature +flag, which is automatically enabled once all nodes in the cluster are +upgraded to a version that supports it. + +When the feature is enabled on a node: + +1. The old physical ``system.large_partitions``, ``system.large_rows``, and + ``system.large_cells`` tables are dropped. +2. Virtual table replacements are registered in their place. +3. New SSTable writes stop populating the old physical tables. + +During a rolling upgrade, as long as any node in the cluster has not been +upgraded, the feature remains disabled and all nodes continue writing to +the old physical tables. This ensures safe rollback if the upgrade needs +to be reverted. + +Upgrade considerations +"""""""""""""""""""""" + +After the ``LARGE_DATA_VIRTUAL_TABLES`` feature is enabled, the virtual tables +derive their content from ``LargeDataRecords`` metadata stored in SSTables. +SSTables written by older versions do not contain this metadata. As a result, +**the virtual tables may appear empty until those older SSTables are rewritten** +by compaction. + +To populate the virtual tables promptly after upgrade, run one of the following +on each node: + +* **Upgrade SSTables compaction** (recommended -- rewrites SSTables without + waiting for normal compaction triggers): + + .. code-block:: console + + nodetool upgradesstables --include-all-sstables + +* **Major compaction** (rewrites all SSTables into a single set, but may + temporarily increase disk usage): + + .. code-block:: console + + nodetool compact + +Once the SSTables have been rewritten, the virtual tables will show the +current large data records. + .. include:: /troubleshooting/_common/ts-return.rst Additional Resources diff --git a/docs/troubleshooting/large-rows-large-cells-tables.rst b/docs/troubleshooting/large-rows-large-cells-tables.rst index f0c93175c3..a2beb356b4 100644 --- a/docs/troubleshooting/large-rows-large-cells-tables.rst +++ b/docs/troubleshooting/large-rows-large-cells-tables.rst @@ -170,4 +170,12 @@ Expiring Data ^^^^^^^^^^^^^ In order to prevent stale data from appearing, all rows in the ``system.large_rows`` and ``system.large_cells`` tables are inserted with Time To Live (TTL) equal to 30 days. +Virtual Tables +^^^^^^^^^^^^^^ + +Starting with ScyllaDB 2026.2, ``system.large_rows`` and ``system.large_cells`` +are implemented as virtual tables that read from SSTable metadata. See +:ref:`large-data-virtual-tables` for details on the +``LARGE_DATA_VIRTUAL_TABLES`` feature and upgrade considerations. + .. include:: /troubleshooting/_common/ts-return.rst diff --git a/gms/feature_service.hh b/gms/feature_service.hh index e7431ce548..9ef7e9e01d 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -181,6 +181,7 @@ public: gms::feature vnodes_to_tablets_migrations { *this, "VNODES_TO_TABLETS_MIGRATIONS"sv }; gms::feature writetime_ttl_individual_element { *this, "WRITETIME_TTL_INDIVIDUAL_ELEMENT"sv }; gms::feature arbitrary_tablet_boundaries { *this, "ARBITRARY_TABLET_BOUNDARIES"sv }; + gms::feature large_data_virtual_tables { *this, "LARGE_DATA_VIRTUAL_TABLES"sv }; public: const std::unordered_map>& registered_features() const; diff --git a/keys/keys.hh b/keys/keys.hh index f46f70e0e9..2148aac33c 100644 --- a/keys/keys.hh +++ b/keys/keys.hh @@ -932,6 +932,13 @@ struct fmt::formatter : fmt::formatt } }; +// Convert a key (partition_key, clustering_key_prefix, etc.) to a human-readable +// string using its schema. +template +std::string key_to_str(const T& key, const schema& s) { + return fmt::to_string(key.with_schema(s)); +} + template<> struct appending_hash { template diff --git a/main.cc b/main.cc index 95f5990b33..d7bf653d35 100644 --- a/main.cc +++ b/main.cc @@ -1964,9 +1964,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl }); checkpoint(stop_signal, "initializing virtual tables"); - smp::invoke_on_all([&] { - return db::initialize_virtual_tables(db, ss, gossiper, raft_gr, sys_ks, tablet_allocator, messaging, *cfg); - }).get(); + db::initialize_virtual_tables(db, ss, gossiper, raft_gr, sys_ks, tablet_allocator, messaging, *cfg, feature_service.local()).get(); // #293 - do not stop anything // engine().at_exit([&qp] { return qp.stop(); }); diff --git a/replica/database.cc b/replica/database.cc index 2472637eee..14bd15dc58 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -382,6 +382,7 @@ static auto configure_sstables_manager(const db::config& cfg, const database_con .memory_reclaim_threshold = cfg.components_memory_reclaim_threshold, .data_file_directories = cfg.data_file_directories(), .format = cfg.sstable_format, + .large_data_records_per_sstable = cfg.compaction_large_data_records_per_sstable, }; } diff --git a/schema/schema.cc b/schema/schema.cc index 461b37e8d7..de2c32ffdf 100644 --- a/schema/schema.cc +++ b/schema/schema.cc @@ -1306,9 +1306,20 @@ bool operator==(const column_definition& x, const column_definition& y) } // Based on org.apache.cassandra.config.CFMetaData#generateLegacyCfId +// +// The version suffix (e.g. "_v1") allows creating a table with a +// different UUID than the original (version 0). This is used when +// replacing a physical table with a virtual table of the same name: +// the virtual table uses version 1 so its UUID differs from the old +// physical table, allowing the old table to be properly dropped by +// UUID before the replacement is registered. table_id -generate_legacy_id(const sstring& ks_name, const sstring& cf_name) { - return table_id(utils::UUID_gen::get_name_UUID(ks_name + cf_name)); +generate_legacy_id(const sstring& ks_name, const sstring& cf_name, unsigned version) { + auto name = ks_name + cf_name; + if (version > 0) { + name += fmt::format("_v{}", version); + } + return table_id(utils::UUID_gen::get_name_UUID(name)); } schema_builder& schema_builder::set_compaction_strategy_options(std::map&& options) { diff --git a/schema/schema.hh b/schema/schema.hh index eac788c8c7..e3d70d393a 100644 --- a/schema/schema.hh +++ b/schema/schema.hh @@ -1075,7 +1075,7 @@ public: std::ostream& operator<<(std::ostream& os, const view_ptr& view); -table_id generate_legacy_id(const sstring& ks_name, const sstring& cf_name); +table_id generate_legacy_id(const sstring& ks_name, const sstring& cf_name, unsigned version = 0); // Thrown when attempted to access a schema-dependent object using diff --git a/sstables/mx/writer.cc b/sstables/mx/writer.cc index 059364d96d..0054f17787 100644 --- a/sstables/mx/writer.cc +++ b/sstables/mx/writer.cc @@ -20,8 +20,10 @@ #include "utils/exceptions.hh" #include "db/large_data_handler.hh" #include "db/corrupt_data_handler.hh" +#include "keys/keys.hh" #include +#include #include #include @@ -626,6 +628,46 @@ private: large_data_stats_entry _cell_size_entry; large_data_stats_entry _elements_in_collection_entry; + // Bounded min-heaps for top-N large data records, one per large_data_type. + // Size-type heaps (partition_size, row_size, cell_size) compare by `value`; + // element-count-type heaps (rows_in_partition, elements_in_collection) + // compare by `elements_count`. + struct large_data_record_cmp_by_value { + bool operator()(const large_data_record& a, const large_data_record& b) const { + return a.value > b.value; // min-heap: smallest value on top + } + }; + struct large_data_record_cmp_by_elements { + bool operator()(const large_data_record& a, const large_data_record& b) const { + return a.elements_count > b.elements_count; // min-heap: smallest elements_count on top + } + }; + using ld_size_heap = std::priority_queue, large_data_record_cmp_by_value>; + using ld_elements_heap = std::priority_queue, large_data_record_cmp_by_elements>; + ld_size_heap _ld_partition_size_records; + ld_elements_heap _ld_rows_in_partition_records; + ld_size_heap _ld_row_size_records; + ld_size_heap _ld_cell_size_records; + ld_elements_heap _ld_elements_in_collection_records; + + // Insert a record into a bounded min-heap, keeping at most N entries. + // Uses the heap's own comparator to decide eviction: since the comparator + // defines a min-heap (smallest on top), comp(rec, top) is true when rec + // is "greater" than top in the heap's ordering, meaning it should replace it. + template + void insert_into_ld_heap(Heap& heap, large_data_record rec) { + auto max_records = _cfg.large_data_records_per_sstable; + if (max_records == 0) { + return; + } + if (heap.size() < max_records) { + heap.push(std::move(rec)); + } else if (typename Heap::value_compare{}(rec, heap.top())) { + heap.pop(); + heap.push(std::move(rec)); + } + } + void init_file_writers(); // Returns the closed writer @@ -1098,7 +1140,48 @@ void writer::maybe_record_large_partitions(const sstables::sstable& sst, const s row_count_entry.max_value = std::max(row_count_entry.max_value, rows); auto ret = _sst.get_large_data_handler().maybe_record_large_partitions(sst, partition_key, partition_size, rows, range_rombstones, dead_rows).get(); size_entry.above_threshold += unsigned(bool(ret.size)); - row_count_entry.above_threshold += unsigned(bool(ret.rows)); + row_count_entry.above_threshold += unsigned(bool(ret.elements)); + + auto trace_log = [&] (large_data_type type) { + slogger.trace("Detected large partition: sstable={}, partition_key={}, record_type={}, partition_size={}, rows={}, range_tombstones={}, dead_rows={}", + _sst.component_basename(component_type::Data), + key_to_str(partition_key.to_partition_key(_schema), _schema), + type, + partition_size, + rows, + range_rombstones, + dead_rows); + }; + + // Populate top-N large data records into separate per-type heaps. + if (ret.size) { + trace_log(large_data_type::partition_size); + const auto& pk_bytes = partition_key.get_bytes(); + insert_into_ld_heap(_ld_partition_size_records, large_data_record{ + .type = large_data_type::partition_size, + .partition_key = disk_string{bytes(pk_bytes)}, + .clustering_key = disk_string{bytes()}, + .column_name = disk_string{bytes()}, + .value = partition_size, + .elements_count = rows, + .range_tombstones = range_rombstones, + .dead_rows = dead_rows, + }); + } + if (ret.elements) { + trace_log(large_data_type::rows_in_partition); + const auto& pk_bytes = partition_key.get_bytes(); + insert_into_ld_heap(_ld_rows_in_partition_records, large_data_record{ + .type = large_data_type::rows_in_partition, + .partition_key = disk_string{bytes(pk_bytes)}, + .clustering_key = disk_string{bytes()}, + .column_name = disk_string{bytes()}, + .value = partition_size, + .elements_count = rows, + .range_tombstones = range_rombstones, + .dead_rows = dead_rows, + }); + } } void writer::maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, @@ -1109,6 +1192,22 @@ void writer::maybe_record_large_rows(const sstables::sstable& sst, const sstable } if (_sst.get_large_data_handler().maybe_record_large_rows(sst, partition_key, clustering_key, row_size).get()) { entry.above_threshold++; + + slogger.trace("Detected large row: sstable={}, partition_key={}, clustering_key={}, record_type={}, row_size={}", + _sst.component_basename(component_type::Data), + key_to_str(partition_key.to_partition_key(_schema), _schema), + clustering_key ? key_to_str(*clustering_key, _schema) : "", + large_data_type::row_size, + row_size); + const auto& pk_bytes = partition_key.get_bytes(); + auto ck_bytes = clustering_key ? clustering_key->view().representation().linearize() : bytes(); + insert_into_ld_heap(_ld_row_size_records, large_data_record{ + .type = large_data_type::row_size, + .partition_key = disk_string{bytes(pk_bytes)}, + .clustering_key = disk_string{std::move(ck_bytes)}, + .column_name = disk_string{bytes()}, + .value = row_size, + }); }; } @@ -1122,14 +1221,54 @@ void writer::maybe_record_large_cells(const sstables::sstable& sst, const sstabl if (collection_elements_entry.max_value < collection_elements) { collection_elements_entry.max_value = collection_elements; } - if (_sst.get_large_data_handler().maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, cell_size, collection_elements).get()) { - if (cell_size > cell_size_entry.threshold) { - cell_size_entry.above_threshold++; - } - if (collection_elements > collection_elements_entry.threshold) { - collection_elements_entry.above_threshold++; - } + auto ret = _sst.get_large_data_handler().maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, cell_size, collection_elements).get(); + if (ret.size) { + cell_size_entry.above_threshold++; + } + if (ret.elements) { + collection_elements_entry.above_threshold++; + } + + auto trace_log = [&] (large_data_type type) { + slogger.trace("Detected large cell: sstable={}, partition_key={}, clustering_key={}, column={}, record_type={}, cell_size={}, collection_elements={}", + _sst.component_basename(component_type::Data), + key_to_str(partition_key.to_partition_key(_schema), _schema), + clustering_key ? key_to_str(*clustering_key, _schema) : "", + cdef.name_as_text(), + type, + cell_size, + collection_elements); }; + + // Populate top-N large data records into separate per-type heaps. + if (ret.size) { + trace_log(large_data_type::cell_size); + const auto& pk_bytes = partition_key.get_bytes(); + auto ck_bytes = clustering_key ? clustering_key->view().representation().linearize() : bytes(); + auto col_name = cdef.name_as_text(); + insert_into_ld_heap(_ld_cell_size_records, large_data_record{ + .type = large_data_type::cell_size, + .partition_key = disk_string{bytes(pk_bytes)}, + .clustering_key = disk_string{std::move(ck_bytes)}, + .column_name = disk_string{to_bytes(col_name)}, + .value = cell_size, + .elements_count = collection_elements, + }); + } + if (ret.elements) { + trace_log(large_data_type::elements_in_collection); + const auto& pk_bytes = partition_key.get_bytes(); + auto ck_bytes = clustering_key ? clustering_key->view().representation().linearize() : bytes(); + auto col_name = cdef.name_as_text(); + insert_into_ld_heap(_ld_elements_in_collection_records, large_data_record{ + .type = large_data_type::elements_in_collection, + .partition_key = disk_string{bytes(pk_bytes)}, + .clustering_key = disk_string{std::move(ck_bytes)}, + .column_name = disk_string{to_bytes(col_name)}, + .value = cell_size, + .elements_count = collection_elements, + }); + } } void writer::write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell, @@ -1719,7 +1858,32 @@ void writer::consume_end_of_stream() { std::optional ts_stats(scylla_metadata::ext_timestamp_stats{ .map = _collector.get_ext_timestamp_stats() }); - _sst.write_scylla_metadata(_shard, std::move(identifier), std::move(ld_stats), std::move(ts_stats)); + // Drain all per-type min-heaps into a single large_data_records array. + std::optional ld_records; + { + utils::chunked_vector records; + auto drain_size_heap = [&records](ld_size_heap& heap) { + while (!heap.empty()) { + records.push_back(std::move(const_cast(heap.top()))); + heap.pop(); + } + }; + auto drain_elements_heap = [&records](ld_elements_heap& heap) { + while (!heap.empty()) { + records.push_back(std::move(const_cast(heap.top()))); + heap.pop(); + } + }; + drain_size_heap(_ld_partition_size_records); + drain_elements_heap(_ld_rows_in_partition_records); + drain_size_heap(_ld_row_size_records); + drain_size_heap(_ld_cell_size_records); + drain_elements_heap(_ld_elements_in_collection_records); + if (!records.empty()) { + ld_records = scylla_metadata::large_data_records{.elements = std::move(records)}; + } + } + _sst.write_scylla_metadata(_shard, std::move(identifier), std::move(ld_stats), std::move(ts_stats), std::move(ld_records)); if (!_cfg.leave_unsealed) { _sst.seal_sstable(_cfg.backup).get(); } diff --git a/sstables/sstables.cc b/sstables/sstables.cc index f26420382b..b8553a540e 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1664,6 +1664,10 @@ future<> sstable::open_data(sstable_open_config cfg) noexcept { if (ld_stats) { _large_data_stats.emplace(*ld_stats); } + auto* ld_records = _components->scylla_metadata->data.get(); + if (ld_records) { + _large_data_records.emplace(*ld_records); + } auto* origin = _components->scylla_metadata->data.get(); if (origin) { _origin = sstring(to_string_view(bytes_view(origin->value))); @@ -2295,7 +2299,8 @@ static sstable_column_kind to_sstable_column_kind(column_kind k) { void sstable::write_scylla_metadata(shard_id shard, struct run_identifier identifier, - std::optional ld_stats, std::optional ts_stats) { + std::optional ld_stats, std::optional ts_stats, + std::optional ld_records) { auto&& first_key = get_first_decorated_key(); auto&& last_key = get_last_decorated_key(); @@ -2318,6 +2323,9 @@ sstable::write_scylla_metadata(shard_id shard, struct run_identifier identifier, if (ld_stats) { _components->scylla_metadata->data.set(std::move(*ld_stats)); } + if (ld_records) { + _components->scylla_metadata->data.set(std::move(*ld_records)); + } if (!_origin.empty()) { scylla_metadata::sstable_origin o; o.value = bytes(to_bytes_view(std::string_view(_origin))); diff --git a/sstables/sstables.hh b/sstables/sstables.hh index fe499ca7dd..5c02a5ea7c 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -123,6 +123,7 @@ struct sstable_writer_config { size_t summary_byte_cost; sstring origin; bool correct_pi_block_width = true; + uint32_t large_data_records_per_sstable = 10; private: explicit sstable_writer_config() {} @@ -628,6 +629,7 @@ private: // It can be disengaged normally when loading legacy sstables that do not have this // information in their scylla metadata. std::optional _large_data_stats; + std::optional _large_data_records; sstring _origin; std::optional _ext_timestamp_stats; optimized_optional _sstable_identifier; @@ -708,7 +710,8 @@ private: void write_scylla_metadata(shard_id shard, run_identifier identifier, std::optional ld_stats, - std::optional ts_stats); + std::optional ts_stats, + std::optional ld_records = std::nullopt); future<> read_filter(sstable_open_config cfg = {}); @@ -1092,6 +1095,12 @@ public: // the map. Otherwise, return a disengaged optional. std::optional get_large_data_stat(large_data_type t) const noexcept; + // Return the large_data_records stored in scylla_metadata, if present. + // Absent on legacy SSTables that predate LargeDataRecords support. + const std::optional& get_large_data_records() const noexcept { + return _large_data_records; + } + // Return the extended timestamp statistics map. // Some or all entries may be missing if not present in scylla_metadata scylla_metadata::ext_timestamp_stats::map_type get_ext_timestamp_stats() const noexcept; diff --git a/sstables/sstables_manager.cc b/sstables/sstables_manager.cc index 05657d99a7..d677391d48 100644 --- a/sstables/sstables_manager.cc +++ b/sstables/sstables_manager.cc @@ -226,6 +226,7 @@ sstable_writer_config sstables_manager::configure_writer(sstring origin) const { cfg.summary_byte_cost = summary_byte_cost(_config.sstable_summary_ratio); cfg.origin = std::move(origin); + cfg.large_data_records_per_sstable = _config.large_data_records_per_sstable(); return cfg; } diff --git a/sstables/sstables_manager.hh b/sstables/sstables_manager.hh index be0fde9459..6b161110fc 100644 --- a/sstables/sstables_manager.hh +++ b/sstables/sstables_manager.hh @@ -114,6 +114,7 @@ public: utils::updateable_value memory_reclaim_threshold = utils::updateable_value(0.2); const std::vector& data_file_directories; utils::updateable_value format = utils::updateable_value(fmt::to_string(sstable_version_types::me)); + utils::updateable_value large_data_records_per_sstable = utils::updateable_value(10); }; private: diff --git a/sstables/types.hh b/sstables/types.hh index 9c14b64d22..fad5d32779 100644 --- a/sstables/types.hh +++ b/sstables/types.hh @@ -548,6 +548,7 @@ enum class scylla_metadata_type : uint32_t { SSTableIdentifier = 10, Schema = 11, ComponentsDigests = 12, + LargeDataRecords = 13, }; // UUID is used for uniqueness across nodes, such that an imported sstable @@ -595,6 +596,31 @@ struct large_data_stats_entry { auto describe_type(sstable_version_types v, Describer f) { return f(max_value, threshold, above_threshold); } }; +// A single top-N large data record stored in the SSTable's scylla metadata. +// Records are written by the sstable writer and survive tablet/shard migration +// because they live in the SSTable file itself rather than in a CQL system table. +struct large_data_record { + large_data_type type; + disk_string partition_key; // binary serialized partition key (sstables::key::get_bytes()) + disk_string clustering_key; // binary serialized CK (clustering_key_prefix::representation()), empty if N/A + disk_string column_name; // column name as text, empty for partition/row entries + uint64_t value; // size in bytes (partition, row, or cell size depending on type) + // Type-dependent element count: + // partition_size, rows_in_partition: number of rows in the partition + // cell_size, elements_in_collection: number of elements in the collection (0 for non-collection cells) + // row_size: 0 + uint64_t elements_count; + // Partition-level auxiliary fields (meaningful only for partition_size records, 0 otherwise): + uint64_t range_tombstones; // number of range tombstones in the partition + uint64_t dead_rows; // number of dead rows in the partition + + template + auto describe_type(sstable_version_types v, Describer f) { + return f(type, partition_key, clustering_key, column_name, value, + elements_count, range_tombstones, dead_rows); + } +}; + // Types of extended timestamp statistics. // // Note: For extensibility, never reuse an identifier, @@ -639,6 +665,7 @@ struct sstable_schema_type { struct scylla_metadata { using extension_attributes = disk_hash, disk_string>; using large_data_stats = disk_hash; + using large_data_records = disk_array; using sstable_origin = disk_string; using scylla_build_id = disk_string; using scylla_version = disk_string; @@ -659,7 +686,8 @@ struct scylla_metadata { disk_tagged_union_member, disk_tagged_union_member, disk_tagged_union_member, - disk_tagged_union_member + disk_tagged_union_member, + disk_tagged_union_member > data; std::optional digest; @@ -898,3 +926,24 @@ struct fmt::formatter { dt.marked_for_delete_at, dt.local_deletion_time); } }; + +template <> +struct fmt::formatter : fmt::formatter { + template + auto format(sstables::large_data_type type, FormatContext& ctx) const { + using enum sstables::large_data_type; + switch (type) { + case partition_size: + return formatter::format("partition_size", ctx); + case row_size: + return formatter::format("row_size", ctx); + case cell_size: + return formatter::format("cell_size", ctx); + case rows_in_partition: + return formatter::format("rows_in_partition", ctx); + case elements_in_collection: + return formatter::format("elements_in_collection", ctx); + } + return formatter::format("unknown", ctx); + } +}; diff --git a/test/boost/cql_query_large_test.cc b/test/boost/cql_query_large_test.cc index 04aa0772cb..88e2fe433d 100644 --- a/test/boost/cql_query_large_test.cc +++ b/test/boost/cql_query_large_test.cc @@ -90,6 +90,9 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) { // Check the only the large row is added to system.large_rows. BOOST_REQUIRE_EQUAL(rows.size(), 1); auto row0 = rows[0]; + // The result has 3 columns: partition_key, row_size, and table_name + // (CQL adds the filtered partition key column to the result set when + // using ALLOW FILTERING with a partial partition key restriction). BOOST_REQUIRE_EQUAL(row0.size(), 3); BOOST_REQUIRE_EQUAL(to_bytes(*row0[0]), "44"); BOOST_REQUIRE_EQUAL(to_bytes(*row0[2]), "tbl"); @@ -118,10 +121,11 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) { e.execute_cql("delete from tbl where a = 44;").get(); - // In order to guarantee that system.large_rows, system.large_cells and system.large_partitions have been updated, we have to + // In order to guarantee that system.large_rows, system.large_cells and + // system.large_partitions are empty, we need to: // * flush, so that a tombstone for the above delete is created. // * do a major compaction, so that the tombstone is combined with the old entry, - // and the old sstable is deleted. + // and the old sstable (which holds the large data records in its metadata) is deleted. flush(e); e.db().invoke_on_all([] (replica::database& dbi) { return dbi.get_tables_metadata().parallel_for_each_table([&dbi] (table_id, lw_shared_ptr t) { @@ -165,6 +169,71 @@ SEASTAR_THREAD_TEST_CASE(test_large_row_count_warning) { }, cfg).get(); } +// Test that when a partition exceeds both the size threshold and the row +// count threshold, system.large_partitions still returns a single row +// (not two rows with the same clustering key). +SEASTAR_THREAD_TEST_CASE(test_large_partitions_dual_threshold) { + auto cfg = make_shared(); + // Set very low thresholds so that a single partition with a handful + // of rows containing modest data exceeds both size and row-count + // thresholds simultaneously. + cfg->compaction_large_partition_warning_threshold_mb(1); + cfg->compaction_rows_count_warning_threshold(10); + do_with_cql_env_thread([](cql_test_env& e) { + e.execute_cql("create table tbl (a int, b int, c text, primary key (a, b))").get(); + // Insert enough rows with enough data to exceed 1 MB partition size + // AND more than 10 rows. + sstring blob(128 * 1024, 'x'); // 128 KB per row + for (int i = 0; i < 11; ++i) { + e.execute_cql(format("insert into tbl (a, b, c) values (42, {}, '{}');", i, blob)).get(); + } + flush(e); + + // There must be exactly one row in system.large_partitions for + // this table. Before the fix, two rows with the same clustering + // key would have been emitted. + assert_that(e.execute_cql( + "select partition_key, rows from system.large_partitions " + "where table_name = 'tbl' allow filtering;").get()) + .is_rows() + .with_size(1); + + return make_ready_future<>(); + }, cfg).get(); +} + +// Test that when a collection cell exceeds both the cell size threshold +// and the collection element count threshold, system.large_cells still +// returns a single row (not two rows with the same clustering key). +SEASTAR_THREAD_TEST_CASE(test_large_cells_dual_threshold) { + auto cfg = make_shared(); + // Set very low thresholds so that a collection with modest elements + // exceeds both size and element-count thresholds simultaneously. + cfg->compaction_large_cell_warning_threshold_mb(1); + cfg->compaction_collection_elements_count_warning_threshold(10); + do_with_cql_env_thread([](cql_test_env& e) { + e.execute_cql("create table tbl (a int, b list, primary key (a))").get(); + // Insert 128 KB blobs, 11 times -> ~1.4 MB total, exceeding 1 MB + // threshold. Also exceeds 10 element threshold. + sstring blob(128 * 1024, 'x'); + for (int i = 0; i < 11; ++i) { + e.execute_cql("update tbl set b = ['" + blob + "'] + b where a = 42;").get(); + } + flush(e); + + // There must be exactly one row in system.large_cells for this + // table. Before the fix, two rows with the same clustering key + // would have been emitted. + assert_that(e.execute_cql( + "select partition_key, column_name from system.large_cells " + "where table_name = 'tbl' allow filtering;").get()) + .is_rows() + .with_size(1); + + return make_ready_future<>(); + }, cfg).get(); +} + SEASTAR_TEST_CASE(test_insert_large_collection_values) { return do_with_cql_env([] (cql_test_env& e) { return seastar::async([&e] { diff --git a/test/boost/sstable_3_x_test.cc b/test/boost/sstable_3_x_test.cc index fbe81269f6..b5aa442736 100644 --- a/test/boost/sstable_3_x_test.cc +++ b/test/boost/sstable_3_x_test.cc @@ -41,6 +41,7 @@ #include "test/lib/simple_schema.hh" #include "test/lib/exception_utils.hh" #include "db/large_data_handler.hh" +#include "db/config.hh" #include "readers/combined.hh" #include @@ -5091,70 +5092,34 @@ SEASTAR_TEST_CASE(test_sstable_reader_on_unknown_column) { } namespace { -struct large_row_handler : public db::large_data_handler { - using callback_t = std::function; - callback_t callback; - - large_row_handler(uint64_t large_rows_threshold, uint64_t rows_count_threshold, uint64_t cell_threshold_bytes, uint64_t collection_elements_threshold, callback_t callback) - : large_data_handler(std::numeric_limits::max(), large_rows_threshold, cell_threshold_bytes, - rows_count_threshold, collection_elements_threshold) - , callback(std::move(callback)) { +// A large_data_handler with configurable thresholds for testing. +// After refactoring, the handler only does threshold comparison + logging; +// actual large data recording goes into the SSTable's LargeDataRecords metadata. +struct large_data_test_handler : public db::large_data_handler { + large_data_test_handler(uint64_t partition_threshold_bytes, uint64_t large_rows_threshold, + uint64_t cell_threshold_bytes, uint64_t rows_count_threshold, + uint64_t collection_elements_threshold) + : large_data_handler(partition_threshold_bytes, large_rows_threshold, cell_threshold_bytes, + rows_count_threshold, collection_elements_threshold) { start(); } - - virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, - const clustering_key_prefix* clustering_key, uint64_t row_size) const override { - const schema_ptr s = sst.get_schema(); - callback(*s, partition_key, clustering_key, row_size, 0, 0, nullptr, 0, 0); - return make_ready_future<>(); - } - - virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, - const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const override { - const schema_ptr s = sst.get_schema(); - callback(*s, partition_key, clustering_key, 0, 0, 0, &cdef, cell_size, collection_elements); - return make_ready_future<>(); - } - - virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, - uint64_t partition_size, uint64_t rows_count, uint64_t range_tombstones_count, uint64_t dead_rows_count) const override { - const schema_ptr s = sst.get_schema(); - callback(*s, partition_key, nullptr, rows_count, range_tombstones_count, dead_rows_count, nullptr, 0, 0); - return make_ready_future<>(); - } - - virtual future<> delete_large_data_entries(const schema& s, sstring sstable_name, std::string_view) const override { - return make_ready_future<>(); - } - virtual future<> update_large_data_entries_sstable_name(const schema& s, sstring old_name, sstring new_name, std::string_view large_table_name) const override { - return make_ready_future<>(); - } +protected: + future<> record_large_cells(const sstables::sstable&, const sstables::key&, + const clustering_key_prefix*, const column_definition&, uint64_t, uint64_t) const override { return make_ready_future<>(); } + future<> record_large_rows(const sstables::sstable&, const sstables::key&, + const clustering_key_prefix*, uint64_t) const override { return make_ready_future<>(); } + future<> delete_large_data_entries(const schema&, sstring, std::string_view) const override { return make_ready_future<>(); } + future<> update_large_data_entries_sstable_name(const schema&, sstring, sstring, std::string_view) const override { return make_ready_future<>(); } + future<> record_large_partitions(const sstables::sstable&, const sstables::key&, + uint64_t, uint64_t, uint64_t, uint64_t) const override { return make_ready_future<>(); } }; } static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, replica::memtable& mt, const partition_key& pk, std::vector expected, uint64_t threshold, sstables::sstable_version_types version) { - unsigned i = 0; - auto f = [&i, &expected, &pk, &threshold](const schema& s, const sstables::key& partition_key, - const clustering_key_prefix* clustering_key, uint64_t row_size, uint64_t range_tombstones, uint64_t dead_rows, - const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) { - BOOST_REQUIRE(std::ranges::equal(pk.components(s), partition_key.to_partition_key(s).components(s))); - BOOST_REQUIRE_LT(i, expected.size()); - BOOST_REQUIRE_GT(row_size, threshold); - BOOST_REQUIRE_EQUAL(range_tombstones, 0); - BOOST_REQUIRE_EQUAL(dead_rows, 0); - - if (clustering_key) { - BOOST_REQUIRE(expected[i]->equal(s, *clustering_key)); - } else { - BOOST_REQUIRE_EQUAL(expected[i], nullptr); - } - ++i; - }; - - large_row_handler handler(threshold, std::numeric_limits::max(), std::numeric_limits::max(), std::numeric_limits::max(), f); + large_data_test_handler handler(std::numeric_limits::max(), threshold, + std::numeric_limits::max(), std::numeric_limits::max(), + std::numeric_limits::max()); sstables::test_env::do_with_async([&] (auto& env) { auto sst = env.make_sstable(s, version); @@ -5164,14 +5129,50 @@ static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, r // depends on the encoding statistics (because of variable-length encoding). The original values // were chosen with the default-constructed encoding_stats, so let's keep it that way. sst->write_components(mt.make_mutation_reader(s, std::move(permit)), 1, s, env.manager().configure_writer("test"), encoding_stats{}).get(); - BOOST_REQUIRE_EQUAL(i, expected.size()); + sstable_open_config cfg { .load_first_and_last_position_metadata = true }; + sst->open_data(cfg).get(); + + auto& records_opt = sst->get_large_data_records(); + // Collect row_size records (order is not guaranteed, so just count and validate). + unsigned row_size_count = 0; + unsigned static_row_count = 0; + unsigned clustering_row_count = 0; + if (records_opt) { + for (auto& rec : records_opt->elements) { + if (rec.type != large_data_type::row_size) { + continue; + } + BOOST_REQUIRE_GT(rec.value, threshold); + auto rec_ck_str = sstring(reinterpret_cast(rec.clustering_key.value.data()), rec.clustering_key.value.size()); + if (rec_ck_str.empty()) { + ++static_row_count; + } else { + ++clustering_row_count; + } + ++row_size_count; + } + } + // Count expected static rows (nullptr entries) and clustering rows. + unsigned expected_static = 0; + unsigned expected_clustering = 0; + for (auto* ck : expected) { + if (ck) { + ++expected_clustering; + } else { + ++expected_static; + } + } + BOOST_REQUIRE_EQUAL(row_size_count, expected.size()); + BOOST_REQUIRE_EQUAL(static_row_count, expected_static); + BOOST_REQUIRE_EQUAL(clustering_row_count, expected_clustering); }, { &handler }).get(); } SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) { simple_schema s; tests::reader_concurrency_semaphore_wrapper semaphore; - mutation partition = s.new_mutation("pv"); + // Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata"). + mutation partition(s.schema(), s.make_pkey()); const partition_key& pk = partition.key(); s.add_static_row(partition, "foo bar zed"); @@ -5189,26 +5190,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) { static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit, replica::memtable& mt, const partition_key& pk, std::vector expected, uint64_t threshold, sstables::sstable_version_types version) { - unsigned i = 0; - auto f = [&i, &expected, &pk, &threshold](const schema& s, const sstables::key& partition_key, - const clustering_key_prefix* clustering_key, uint64_t row_size, uint64_t range_tombstones, uint64_t dead_rows, - const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) { - BOOST_TEST_MESSAGE(format("i={} ck={} cell_size={} threshold={}", i, clustering_key ? format("{}", *clustering_key) : "null", cell_size, threshold)); - BOOST_REQUIRE(std::ranges::equal(pk.components(s), partition_key.to_partition_key(s).components(s))); - BOOST_REQUIRE_LT(i, expected.size()); - BOOST_REQUIRE_GT(cell_size, threshold); - BOOST_REQUIRE_EQUAL(range_tombstones, 0); - BOOST_REQUIRE_EQUAL(dead_rows, 0); - - if (clustering_key) { - BOOST_REQUIRE(expected[i]->equal(s, *clustering_key)); - } else { - BOOST_REQUIRE_EQUAL(expected[i], nullptr); - } - ++i; - }; - - large_row_handler handler(std::numeric_limits::max(), std::numeric_limits::max(), threshold, std::numeric_limits::max(), f); + large_data_test_handler handler(std::numeric_limits::max(), + std::numeric_limits::max(), threshold, + std::numeric_limits::max(), std::numeric_limits::max()); sstables::test_env::do_with_async([&] (auto& env) { auto sst = env.make_sstable(s, version); @@ -5218,14 +5202,50 @@ static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit, // depends on the encoding statistics (because of variable-length encoding). The original values // were chosen with the default-constructed encoding_stats, so let's keep it that way. sst->write_components(mt.make_mutation_reader(s, std::move(permit)), 1, s, env.manager().configure_writer("test"), encoding_stats{}).get(); - BOOST_REQUIRE_EQUAL(i, expected.size()); + sstable_open_config cfg { .load_first_and_last_position_metadata = true }; + sst->open_data(cfg).get(); + + auto& records_opt = sst->get_large_data_records(); + // Collect cell_size records (order is not guaranteed, so just count and validate). + unsigned cell_size_count = 0; + unsigned static_cell_count = 0; + unsigned clustering_cell_count = 0; + if (records_opt) { + for (auto& rec : records_opt->elements) { + if (rec.type != large_data_type::cell_size) { + continue; + } + BOOST_REQUIRE_GT(rec.value, threshold); + auto rec_ck_str = sstring(reinterpret_cast(rec.clustering_key.value.data()), rec.clustering_key.value.size()); + if (rec_ck_str.empty()) { + ++static_cell_count; + } else { + ++clustering_cell_count; + } + ++cell_size_count; + } + } + // Count expected static cells (nullptr entries) and clustering cells. + unsigned expected_static = 0; + unsigned expected_clustering = 0; + for (auto* ck : expected) { + if (ck) { + ++expected_clustering; + } else { + ++expected_static; + } + } + BOOST_REQUIRE_EQUAL(cell_size_count, expected.size()); + BOOST_REQUIRE_EQUAL(static_cell_count, expected_static); + BOOST_REQUIRE_EQUAL(clustering_cell_count, expected_clustering); }, { &handler }).get(); } SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) { simple_schema s; tests::reader_concurrency_semaphore_wrapper semaphore; - mutation partition = s.new_mutation("pv"); + // Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata"). + mutation partition(s.schema(), s.make_pkey()); const partition_key& pk = partition.key(); s.add_static_row(partition, "foo bar zed"); @@ -5244,8 +5264,8 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) { static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uint64_t threshold, bool expected, sstable_version_types version) { simple_schema s; tests::reader_concurrency_semaphore_wrapper semaphore; - mutation p = s.new_mutation("pv"); - const partition_key& pk = p.key(); + // Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata"). + mutation p(s.schema(), s.make_pkey()); sstring sv; for (auto idx = 0; idx < rows - 1; idx++) { sv += "foo "; @@ -5258,27 +5278,27 @@ static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uin schema_ptr sc = s.schema(); auto mt = make_memtable(sc, {p}); - bool logged = false; - auto f = [&logged, &pk, &threshold, &rows, &range_tombstones](const schema& sc, const sstables::key& partition_key, - const clustering_key_prefix* clustering_key, uint64_t rows_count, uint64_t range_tombstones_count, uint64_t dead_rows, - const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) { - BOOST_REQUIRE_GT(rows_count, threshold); - BOOST_REQUIRE(std::ranges::equal(pk.components(sc), partition_key.to_partition_key(sc).components(sc))); - BOOST_REQUIRE_EQUAL(dead_rows, 0); - - // Inserting one range tombstone creates two range tombstone marker rows - BOOST_REQUIRE_EQUAL(rows_count, rows + range_tombstones * 2); - BOOST_REQUIRE_EQUAL(range_tombstones_count, range_tombstones * 2); - logged = true; - }; - - large_row_handler handler(std::numeric_limits::max(), threshold, std::numeric_limits::max(), std::numeric_limits::max(), f); + // rows_count_threshold = threshold; all other thresholds at MAX. + large_data_test_handler handler(std::numeric_limits::max(), + std::numeric_limits::max(), std::numeric_limits::max(), + threshold, std::numeric_limits::max()); sstables::test_env::do_with_async([&] (auto& env) { auto sst = env.make_sstable(sc, version); sst->write_components(mt->make_mutation_reader(sc, semaphore.make_permit()), 1, sc, env.manager().configure_writer("test"), encoding_stats{}).get(); + sstable_open_config cfg { .load_first_and_last_position_metadata = true }; + sst->open_data(cfg).get(); - BOOST_REQUIRE_EQUAL(logged, expected); + auto& records_opt = sst->get_large_data_records(); + bool found = false; + if (records_opt) { + for (auto& rec : records_opt->elements) { + if (rec.type == large_data_type::rows_in_partition && rec.elements_count > threshold) { + found = true; + } + } + } + BOOST_REQUIRE_EQUAL(found, expected); }, { &handler }).get(); } @@ -5306,8 +5326,8 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) { static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold, bool expected, sstable_version_types version) { simple_schema s; tests::reader_concurrency_semaphore_wrapper semaphore; - mutation p = s.new_mutation("pv"); - const partition_key& pk = p.key(); + // Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata"). + mutation p(s.schema(), s.make_pkey()); sstring sv; int live_rows = 0; int expected_dead_rows = 0; @@ -5354,7 +5374,6 @@ static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold, } auto ck_start = ck; auto rt_start = bound_view(ck_start, tests::random::get_bool() ? bound_kind::incl_start : bound_kind::excl_start); - auto sv_end = sv; auto rt_size = tests::random::get_int(1, 10); for (auto i = 0; i < rt_size; i++) { sv += "X"; @@ -5373,25 +5392,27 @@ static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold, auto mt = make_lw_shared(sc); mt->apply(p); - bool logged = false; - auto f = [&] (const schema& sc, const sstables::key& partition_key, - const clustering_key_prefix* clustering_key, uint64_t rows_count, uint64_t range_tombstones, uint64_t dead_rows, - const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) { - BOOST_REQUIRE_GT(rows_count, threshold); - BOOST_REQUIRE_EQUAL(rows_count, rows); - BOOST_REQUIRE_EQUAL(dead_rows, expected_dead_rows + expected_expired_rows + expected_rows_with_dead_cell); - BOOST_REQUIRE_EQUAL(range_tombstones, expected_range_tombstones); - BOOST_REQUIRE(std::ranges::equal(pk.components(sc), partition_key.to_partition_key(sc).components(sc))); - logged = true; - }; - - large_row_handler handler(std::numeric_limits::max(), threshold, std::numeric_limits::max(), std::numeric_limits::max(), f); + // rows_count_threshold = threshold; all other thresholds at MAX. + large_data_test_handler handler(std::numeric_limits::max(), + std::numeric_limits::max(), std::numeric_limits::max(), + threshold, std::numeric_limits::max()); sstables::test_env::do_with_async([&] (auto& env) { auto sst = env.make_sstable(sc, version); sst->write_components(mt->make_mutation_reader(sc, semaphore.make_permit()), 1, sc, env.manager().configure_writer("test"), encoding_stats{}).get(); + sstable_open_config cfg { .load_first_and_last_position_metadata = true }; + sst->open_data(cfg).get(); - BOOST_REQUIRE_EQUAL(logged, expected); + auto& records_opt = sst->get_large_data_records(); + bool found = false; + if (records_opt) { + for (auto& rec : records_opt->elements) { + if (rec.type == large_data_type::rows_in_partition && rec.elements_count > threshold) { + found = true; + } + } + } + BOOST_REQUIRE_EQUAL(found, expected); }, { &handler }).get(); } @@ -5415,8 +5436,8 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_dead_rows) { static void test_sstable_too_many_collection_elements_f(int elements, uint64_t threshold, bool expected, sstable_version_types version) { simple_schema s(simple_schema::with_static::no, simple_schema::with_collection::yes); tests::reader_concurrency_semaphore_wrapper semaphore; - mutation p = s.new_mutation("pv"); - const partition_key& pk = p.key(); + // Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata"). + mutation p(s.schema(), s.make_pkey()); std::map kv_map; for (auto i = 0; i < elements; i++) { kv_map[to_bytes(format("key{}", i))] = to_bytes(format("val{}", i)); @@ -5425,25 +5446,28 @@ static void test_sstable_too_many_collection_elements_f(int elements, uint64_t t schema_ptr sc = s.schema(); auto mt = make_memtable(sc, {p}); - bool logged = false; - auto f = [&logged, &pk, &threshold](const schema& sc, const sstables::key& partition_key, - const clustering_key_prefix* clustering_key, uint64_t rows_count, uint64_t range_tombstones, uint64_t dead_rows, - const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) { - BOOST_REQUIRE_GT(collection_elements, threshold); - BOOST_REQUIRE(std::ranges::equal(pk.components(sc), partition_key.to_partition_key(sc).components(sc))); - BOOST_REQUIRE_EQUAL(range_tombstones, 0); - BOOST_REQUIRE_EQUAL(dead_rows, 0); - logged = true; - }; - BOOST_TEST_MESSAGE(format("elements={} threshold={} expected={}", elements, threshold, expected)); - large_row_handler handler(std::numeric_limits::max(), std::numeric_limits::max(), std::numeric_limits::max(), threshold, f); + // collection_elements_threshold = threshold; all other thresholds at MAX. + large_data_test_handler handler(std::numeric_limits::max(), + std::numeric_limits::max(), std::numeric_limits::max(), + std::numeric_limits::max(), threshold); sstables::test_env::do_with_async([&] (auto& env) { auto sst = env.make_sstable(sc, version); sst->write_components(mt->make_mutation_reader(sc, semaphore.make_permit()), 1, sc, env.manager().configure_writer("test"), encoding_stats{}).get(); + sstable_open_config cfg { .load_first_and_last_position_metadata = true }; + sst->open_data(cfg).get(); - BOOST_REQUIRE_EQUAL(logged, expected); + auto& records_opt = sst->get_large_data_records(); + bool found = false; + if (records_opt) { + for (auto& rec : records_opt->elements) { + if (rec.type == large_data_type::elements_in_collection && rec.elements_count > threshold) { + found = true; + } + } + } + BOOST_REQUIRE_EQUAL(found, expected); }, { &handler }).get(); } @@ -5463,6 +5487,220 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_too_many_collection_elements) { } } +// Handler for testing LargeDataRecords population in SSTable metadata. +// large_data_test_handler already serves this purpose; keep this alias for +// test readability / backward compatibility. +namespace { +using large_data_records_handler = large_data_test_handler; +} + +// Test that writing an SSTable with data exceeding large data thresholds +// populates LargeDataRecords in the SSTable's scylla metadata, and that +// the records survive a round-trip (write + open_data). +SEASTAR_THREAD_TEST_CASE(test_large_data_records_round_trip) { + // Use low thresholds so that our small test data exceeds them. + // partition_threshold=1 byte, row_threshold=1 byte, cell_threshold=1 byte, + // rows_count_threshold=MAX (not tested here), collection_elements_threshold=MAX + large_data_records_handler handler(1, 1, 1, + std::numeric_limits::max(), std::numeric_limits::max()); + + for (auto version : writable_sstable_versions) { + sstables::test_env::do_with_async([&] (auto& env) { + simple_schema ss; + auto s = ss.schema(); + + // Create a mutation with a clustering row whose serialized cell value + // exceeds the 1-byte thresholds, so partition_size, row_size, and + // cell_size records are all generated. + // Use make_pkey() (no argument) to generate a key on this shard. + auto pk = ss.make_pkey(); + mutation m(s, pk); + auto ck = ss.make_ckey("ck1"); + ss.add_row(m, ck, "a_value_that_is_larger_than_one_byte"); + + auto mt = make_memtable(s, {m}); + auto sst = env.make_sstable(s, version); + sst->write_components(mt->make_mutation_reader(s, env.make_reader_permit()), + 1, s, env.manager().configure_writer("test"), encoding_stats{}).get(); + sst->open_data().get(); + + auto& records_opt = sst->get_large_data_records(); + BOOST_REQUIRE(records_opt.has_value()); + auto& records = records_opt->elements; + BOOST_REQUIRE(!records.empty()); + + // We expect records for partition_size, row_size, and cell_size, + // since all exceed the 1-byte thresholds. + bool found_partition_size = false; + bool found_row_size = false; + bool found_cell_size = false; + + for (auto& rec : records) { + BOOST_TEST_MESSAGE(format("LargeDataRecord: type={}, pk='{}', ck='{}', col='{}', value={}", + static_cast(rec.type), + to_hex(rec.partition_key.value), + to_hex(rec.clustering_key.value), + sstring(reinterpret_cast(rec.column_name.value.data()), rec.column_name.value.size()), + rec.value)); + + BOOST_REQUIRE(rec.value > 0); + + // Verify partition_key field is non-empty (binary serialized key) + BOOST_REQUIRE(!rec.partition_key.value.empty()); + // Verify binary partition key round-trips to the original partition key + auto rec_pk = sstables::key_view(rec.partition_key.value).to_partition_key(*s); + BOOST_REQUIRE(rec_pk.equal(*s, pk.key())); + + switch (rec.type) { + case large_data_type::partition_size: + found_partition_size = true; + // clustering_key and column_name should be empty for partition-level entries + BOOST_REQUIRE(rec.clustering_key.value.empty()); + BOOST_REQUIRE(rec.column_name.value.empty()); + // Verify partition-level auxiliary fields: + // elements_count = rows in partition (1 clustering row + 1 implicit static row) + BOOST_REQUIRE_EQUAL(rec.elements_count, 2u); + BOOST_REQUIRE_EQUAL(rec.range_tombstones, 0u); + BOOST_REQUIRE_EQUAL(rec.dead_rows, 0u); + break; + case large_data_type::row_size: + found_row_size = true; + // Static rows have an empty clustering key; clustering rows have a non-empty one. + // Both are valid row_size records. + // Verify binary clustering key round-trips for non-static rows. + if (!rec.clustering_key.value.empty()) { + auto rec_ck = clustering_key_prefix::from_bytes(rec.clustering_key.value); + BOOST_REQUIRE(rec_ck.equal(*s, ck)); + } + BOOST_REQUIRE(rec.column_name.value.empty()); + // Non-partition records should have zero auxiliary fields + BOOST_REQUIRE_EQUAL(rec.elements_count, 0u); + BOOST_REQUIRE_EQUAL(rec.range_tombstones, 0u); + BOOST_REQUIRE_EQUAL(rec.dead_rows, 0u); + break; + case large_data_type::cell_size: + found_cell_size = true; + BOOST_REQUIRE(!rec.column_name.value.empty()); + // For cell_size records, elements_count may be non-zero + // (collection element count) but range_tombstones/dead_rows + // should be zero. + BOOST_REQUIRE_EQUAL(rec.range_tombstones, 0u); + BOOST_REQUIRE_EQUAL(rec.dead_rows, 0u); + break; + default: + break; + } + } + + BOOST_REQUIRE_MESSAGE(found_partition_size, "Expected a partition_size record"); + BOOST_REQUIRE_MESSAGE(found_row_size, "Expected a row_size record"); + BOOST_REQUIRE_MESSAGE(found_cell_size, "Expected a cell_size record"); + }, { &handler }).get(); + } +} + +// Test that the bounded top-N heap correctly keeps only the largest entries. +SEASTAR_THREAD_TEST_CASE(test_large_data_records_top_n_bounded) { + // Row threshold = 1 byte so every row triggers recording. + // partition threshold = MAX to avoid partition records (simplifies counting). + large_data_records_handler handler( + std::numeric_limits::max(), // partition threshold + 1, // row threshold + std::numeric_limits::max(), // cell threshold + std::numeric_limits::max(), // rows count threshold + std::numeric_limits::max() // collection elements threshold + ); + + for (auto version : writable_sstable_versions) { + sstables::test_env::do_with_async([&] (auto& env) { + // Set large_data_records_per_sstable to 3 so we can test bounding. + env.db_config().compaction_large_data_records_per_sstable(3); + + simple_schema ss; + auto s = ss.schema(); + + // Create 6 partitions, each with one row of increasing size. + // Since each partition has exactly one row, we get 6 row_size records + // competing for 3 slots. + // Use make_pkeys() to generate shard-local keys. + auto pkeys = ss.make_pkeys(6); + utils::chunked_vector muts; + for (int i = 0; i < 6; i++) { + auto& pk = pkeys[i]; + mutation m(s, pk); + auto ck = ss.make_ckey(format("ck{}", i)); + // Row value size increases with i. Pad with 'x' characters. + sstring val(100 * (i + 1), 'x'); + ss.add_row(m, ck, val); + muts.push_back(std::move(m)); + } + + auto mt = make_memtable(s, muts); + auto sst = env.make_sstable(s, version); + sst->write_components(mt->make_mutation_reader(s, env.make_reader_permit()), + 6, s, env.manager().configure_writer("test"), encoding_stats{}).get(); + sst->open_data().get(); + + auto& records_opt = sst->get_large_data_records(); + BOOST_REQUIRE(records_opt.has_value()); + + // Count row_size records. + size_t row_size_count = 0; + uint64_t min_row_value = std::numeric_limits::max(); + for (auto& rec : records_opt->elements) { + if (rec.type == large_data_type::row_size) { + row_size_count++; + min_row_value = std::min(min_row_value, rec.value); + } + } + + // We wrote 6 rows but limited to top-3, so we should have exactly 3 row_size records. + BOOST_REQUIRE_EQUAL(row_size_count, 3u); + + // The smallest of the top-3 should be from one of the larger rows (index 3, 4, or 5). + // Row at index 3 has value size ~ 400 bytes of 'x' chars. The actual row_size will be + // larger due to serialization overhead, but should be well above 100 * 1 = 100 bytes + // (the smallest row). This verifies the heap evicted smaller entries. + // We just check that the minimum value in the top-3 is larger than what the + // smallest row (index 0, ~100 bytes of payload) would produce. + BOOST_REQUIRE_GT(min_row_value, 100u); + }, { &handler }).get(); + } +} + +// Test that an SSTable with data below all thresholds produces no LargeDataRecords. +SEASTAR_THREAD_TEST_CASE(test_large_data_records_none_when_below_threshold) { + // All thresholds at maximum => nothing should be recorded. + large_data_records_handler handler( + std::numeric_limits::max(), + std::numeric_limits::max(), + std::numeric_limits::max(), + std::numeric_limits::max(), + std::numeric_limits::max() + ); + + for (auto version : writable_sstable_versions) { + sstables::test_env::do_with_async([&] (auto& env) { + simple_schema ss; + auto s = ss.schema(); + + auto pk = ss.make_pkey(); + mutation m(s, pk); + ss.add_row(m, ss.make_ckey("ck1"), "small_value"); + + auto mt = make_memtable(s, {m}); + auto sst = env.make_sstable(s, version); + sst->write_components(mt->make_mutation_reader(s, env.make_reader_permit()), + 1, s, env.manager().configure_writer("test"), encoding_stats{}).get(); + sst->open_data().get(); + + auto& records_opt = sst->get_large_data_records(); + // With all thresholds at max, no records should be written. + BOOST_REQUIRE(!records_opt.has_value()); + }, { &handler }).get(); + } +} + // The following test runs on test/resource/sstables/3.x/uncompressed/legacy_udt_in_collection // It was created using Scylla 3.0.x using the following CQL statements: // diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index c8f93be45f..0446b407f7 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -1008,9 +1008,7 @@ private: _mnotifier.local().unregister_listener(&_ss.local()).get(); }); - smp::invoke_on_all([&] { - return db::initialize_virtual_tables(_db, _ss, _gossiper, _group0_registry, _sys_ks, _tablet_allocator, _ms, *cfg); - }).get(); + db::initialize_virtual_tables(_db, _ss, _gossiper, _group0_registry, _sys_ks, _tablet_allocator, _ms, *cfg, _feature_service.local()).get(); _qp.invoke_on_all([this, &group0_client] (cql3::query_processor& qp) { qp.start_remote(_mm.local(), _mapreduce_service.local(), _ss.local(), group0_client, diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index 5a38204c86..eaca0cd546 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -260,6 +260,7 @@ test_env::impl::impl(test_env_config cfg, sstable_compressor_factory& scfarg, ss .memory_reclaim_threshold = db_config->components_memory_reclaim_threshold, .data_file_directories = db_config->data_file_directories(), .format = db_config->sstable_format, + .large_data_records_per_sstable = db_config->compaction_large_data_records_per_sstable, }, feature_service, cache_tracker, diff --git a/tools/scylla-sstable.cc b/tools/scylla-sstable.cc index c8c3380d4a..1674dd81ab 100644 --- a/tools/scylla-sstable.cc +++ b/tools/scylla-sstable.cc @@ -30,6 +30,7 @@ #include "db/corrupt_data_handler.hh" #include "db/object_storage_endpoint_param.hh" #include "gms/feature_service.hh" +#include "keys/keys.hh" #include "reader_concurrency_semaphore.hh" #include "readers/combined.hh" #include "readers/filtering.hh" @@ -1338,17 +1339,7 @@ const char* to_string(sstables::scylla_metadata_type t) { case sstables::scylla_metadata_type::SSTableIdentifier: return "sstable_identifier"; case sstables::scylla_metadata_type::Schema: return "schema"; case sstables::scylla_metadata_type::ComponentsDigests: return "components_digests"; - } - std::abort(); -} - -const char* to_string(sstables::large_data_type t) { - switch (t) { - case sstables::large_data_type::partition_size: return "partition_size"; - case sstables::large_data_type::row_size: return "row_size"; - case sstables::large_data_type::cell_size: return "cell_size"; - case sstables::large_data_type::rows_in_partition: return "rows_in_partition"; - case sstables::large_data_type::elements_in_collection: return "elements_in_collection"; + case sstables::scylla_metadata_type::LargeDataRecords: return "large_data_records"; } std::abort(); } @@ -1363,12 +1354,13 @@ const char* to_string(sstables::ext_timestamp_stats_type t) { class scylla_metadata_visitor { json_writer& _writer; + schema_ptr _schema; dht::token as_token(const sstables::disk_string& ds) const { return dht::token(dht::token::kind::key, bytes_view(ds)); } public: - scylla_metadata_visitor(json_writer& writer) : _writer(writer) { } + scylla_metadata_visitor(json_writer& writer, schema_ptr schema = nullptr) : _writer(writer), _schema(std::move(schema)) { } void operator()(const sstables::sharding_metadata& val) const { _writer.StartArray(); @@ -1440,7 +1432,7 @@ public: void operator()(const sstables::scylla_metadata::large_data_stats& val) const { _writer.StartObject(); for (const auto& [k, v] : val.map) { - _writer.Key(to_string(k)); + _writer.Key(fmt::format("{}", k)); _writer.StartObject(); _writer.Key("max_value"); _writer.Uint64(v.max_value); @@ -1452,6 +1444,32 @@ public: } _writer.EndObject(); } + void operator()(const sstables::large_data_record& val) const { + _writer.StartObject(); + _writer.Key("type"); + _writer.String(fmt::format("{}", val.type)); + _writer.Key("partition_key"); + auto pk = sstables::key_view(val.partition_key.value).to_partition_key(*_schema); + _writer.String(key_to_str(pk, *_schema)); + _writer.Key("clustering_key"); + if (!val.clustering_key.value.empty()) { + auto ck = clustering_key_prefix::from_bytes(val.clustering_key.value); + _writer.String(key_to_str(ck, *_schema)); + } else { + _writer.String(""); + } + _writer.Key("column_name"); + _writer.String(disk_string_to_string(val.column_name)); + _writer.Key("value"); + _writer.Uint64(val.value); + _writer.Key("elements_count"); + _writer.Uint64(val.elements_count); + _writer.Key("range_tombstones"); + _writer.Uint64(val.range_tombstones); + _writer.Key("dead_rows"); + _writer.Uint64(val.dead_rows); + _writer.EndObject(); + } void operator()(const sstables::scylla_metadata::ext_timestamp_stats& val) const { _writer.StartObject(); for (const auto& [k, v] : val.map) { @@ -1538,7 +1556,7 @@ void dump_scylla_metadata_operation(schema_ptr schema, reader_permit permit, con continue; } for (const auto& [k, v] : m->data.data) { - std::visit(scylla_metadata_visitor(writer), v); + std::visit(scylla_metadata_visitor(writer, schema), v); } if (m->digest.has_value()) { writer.Key("digest");