From ce00d6191724fe5bbf3081c1e7900cb51e0a591d Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 26 Mar 2026 17:21:27 +0200 Subject: [PATCH] db: implement large_data virtual tables with feature flag gating Replace the physical system.large_partitions, system.large_rows, and system.large_cells CQL tables with virtual tables that read from LargeDataRecords stored in SSTable scylla metadata (tag 13). The transition is gated by a new LARGE_DATA_VIRTUAL_TABLES cluster feature flag: - Before the feature is enabled: the old physical tables remain in all_tables(), CQL writes are active, no virtual tables are registered. This ensures safe rollback during rolling upgrades. - After the feature is enabled: old physical tables are dropped from disk via legacy_drop_table_on_all_shards(), virtual tables are registered on all shards, and CQL writes are skipped via skip_cql_writes() in cql_table_large_data_handler. Key implementation details: - Three virtual table classes (large_partitions_virtual_table, large_rows_virtual_table, large_cells_virtual_table) extend streaming_virtual_table with cross-shard record collection. - generate_legacy_id() gains a version parameter; virtual tables use version 1 to get different UUIDs than the old physical tables. - compaction_time is derived from SSTable generation UUID at display time via UUID_gen::unix_timestamp(). - Legacy SSTables without LargeDataRecords emit synthetic summary rows based on above_threshold > 0 in LargeDataStats. - The activation logic uses two paths: when the feature is already enabled (test env, restart), it runs as a coroutine; when not yet enabled, it registers a when_enabled callback that runs inside seastar::async from feature_service::enable(). - sstable_3_x_test updated to use a simplified large_data_test_handler and validate LargeDataRecords in SSTable metadata directly. --- db/large_data_handler.cc | 13 + db/large_data_handler.hh | 6 + db/virtual_tables.cc | 675 +++++++++++++++++- db/virtual_tables.hh | 4 +- .../troubleshooting/large-partition-table.rst | 73 ++ .../large-rows-large-cells-tables.rst | 8 + gms/feature_service.hh | 1 + main.cc | 2 +- schema/schema.cc | 15 +- schema/schema.hh | 2 +- test/boost/cql_query_large_test.cc | 73 +- test/boost/sstable_3_x_test.cc | 321 ++++----- test/lib/cql_test_env.cc | 2 +- 13 files changed, 1003 insertions(+), 192 deletions(-) diff --git a/db/large_data_handler.cc b/db/large_data_handler.cc index ef8c5e6f10..d3b3e4aaf2 100644 --- a/db/large_data_handler.cc +++ b/db/large_data_handler.cc @@ -142,6 +142,10 @@ future<> large_data_handler::maybe_update_large_data_entries_sstable_name(sstabl return when_all(std::move(large_partitions), std::move(large_rows), std::move(large_cells)).discard_result(); } +bool cql_table_large_data_handler::skip_cql_writes() const { + return bool(_feat.large_data_virtual_tables); +} + cql_table_large_data_handler::cql_table_large_data_handler(gms::feature_service& feat, utils::updateable_value partition_threshold_mb, utils::updateable_value row_threshold_mb, @@ -180,6 +184,9 @@ future<> cql_table_large_data_handler::do_insert_large_data_entry(std::string_vi sstring ks_name, sstring cf_name, sstring sstable_name, int64_t size, sstring partition_key, db_clock::time_point compaction_time, const std::vector& extra_fields, Args&&... 
args) const { + if (skip_cql_writes()) { + co_return; + } auto sys_ks = _sys_ks.get_permit(); if (!sys_ks) { co_return; @@ -284,6 +291,9 @@ future<> cql_table_large_data_handler::record_large_rows(const sstables::sstable } future<> cql_table_large_data_handler::delete_large_data_entries(const schema& s, sstring sstable_name, std::string_view large_table_name) const { + if (skip_cql_writes()) { + co_return; + } auto sys_ks = _sys_ks.get_permit(); SCYLLA_ASSERT(sys_ks); const sstring req = @@ -329,6 +339,9 @@ cql_table_large_data_handler::row_reinsert_func cql_table_large_data_handler::ma } future<> cql_table_large_data_handler::update_large_data_entries_sstable_name(const schema& s, sstring old_name, sstring new_name, std::string_view large_table_name) const { + if (skip_cql_writes()) { + co_return; + } auto sys_ks = _sys_ks.get_permit(); SCYLLA_ASSERT(sys_ks); // sstable_name is a clustering key, so we can't update it in place. diff --git a/db/large_data_handler.hh b/db/large_data_handler.hh index e807a736de..1e68ee7bd7 100644 --- a/db/large_data_handler.hh +++ b/db/large_data_handler.hh @@ -181,6 +181,12 @@ protected: virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const override; private: + // Returns true if CQL writes to system.large_* tables should be skipped. + // Once LARGE_DATA_VIRTUAL_TABLES is enabled, large data records are served + // from SSTable metadata via virtual tables and the physical CQL tables are + // dropped, so writing to them is both unnecessary and would fail. + bool skip_cql_writes() const; + future<> internal_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const; future<> internal_record_large_cells_and_collections(const sstables::sstable& sst, const sstables::key& partition_key, diff --git a/db/virtual_tables.cc b/db/virtual_tables.cc index 0b9909562f..076aad9a6e 100644 --- a/db/virtual_tables.cc +++ b/db/virtual_tables.cc @@ -26,6 +26,8 @@ #include "db/size_estimates_virtual_reader.hh" #include "db/view/build_progress_virtual_reader.hh" #include "index/built_indexes_virtual_reader.hh" +#include "keys/keys.hh" +#include "gms/feature_service.hh" #include "gms/gossiper.hh" #include "mutation/frozen_mutation.hh" #include "transport/protocol_server.hh" @@ -35,6 +37,7 @@ #include "service/raft/raft_group_registry.hh" #include "service/storage_service.hh" #include "service/tablet_allocator.hh" +#include "sstables/sstables.hh" #include "locator/load_sketch.hh" #include "types/list.hh" #include "types/types.hh" @@ -1461,6 +1464,595 @@ private: } }; +// Helper: convert bytes stored in a disk_string to sstring. +// disk_string::value is of type `bytes`. +static sstring disk_string_to_sstring(const sstables::disk_string& ds) { + return sstring(to_string_view(ds.value)); +} + +// Helper: convert int64_t millis-since-epoch to db_clock::time_point +// for use with timestamp_type columns in virtual tables. +static db_clock::time_point millis_to_db_clock(int64_t ms) { + return db_clock::time_point(db_clock::duration(ms)); +} + +// Derive compaction_time (milliseconds since Unix epoch) from the SSTable's +// generation UUID. If the generation is not UUID-based (legacy integer +// generations), returns 0. 
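+// UUID-based generations are time-based UUIDs, so the embedded timestamp
+// corresponds to when the flush or compaction that produced the SSTable
+// created it, which is why it can stand in for the compaction_time column.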
+static int64_t compaction_time_from_generation(const sstables::sstable& sst) { + auto gen = sst.generation(); + if (gen.is_uuid_based()) { + return utils::UUID_gen::unix_timestamp(gen.as_uuid()).count(); + } + return 0; +} + +// Virtual table backed by LargeDataRecords in SSTable scylla metadata. +// Shows partition_size records from all live SSTables, matching the +// schema of the legacy system.large_partitions CQL table. +class large_partitions_virtual_table : public streaming_virtual_table { + sharded& _db; + + struct record { + sstring sstable_name; + int64_t partition_size; + sstring partition_key; + int64_t compaction_time; + int64_t rows; + int64_t range_tombstones; + int64_t dead_rows; + }; + + // Extract partition_size records from a single table's SSTables on the local shard. + // Called on each shard via map() to collect records cross-shard. + static future> collect_local_records(replica::table& table) { + std::vector records; + auto table_schema = table.schema(); + auto sstables = co_await table.take_sstable_set_snapshot(); + for (const auto& sst : sstables) { + auto& records_opt = sst->get_large_data_records(); + if (!records_opt) { + // Legacy SSTable without LargeDataRecords: emit synthetic row + // if above_threshold > 0 for partition_size or rows_in_partition stats. + auto stat = sst->get_large_data_stat(sstables::large_data_type::partition_size); + auto rows_stat = sst->get_large_data_stat(sstables::large_data_type::rows_in_partition); + if ((stat && stat->above_threshold > 0) || (rows_stat && rows_stat->above_threshold > 0)) { + records.push_back(record{ + .sstable_name = sst->component_basename(sstables::component_type::Data), + .partition_size = static_cast(stat ? stat->max_value : 0), + .partition_key = "(details unavailable - legacy SSTable)", + .compaction_time = compaction_time_from_generation(*sst), + .rows = static_cast(rows_stat ? 
rows_stat->max_value : 0), + }); + } + continue; + } + auto compaction_time = compaction_time_from_generation(*sst); + auto sst_name = sst->component_basename(sstables::component_type::Data); + for (const auto& rec : records_opt->elements) { + if (rec.type != sstables::large_data_type::partition_size && + rec.type != sstables::large_data_type::rows_in_partition) { + continue; + } + auto pk = sstables::key_view(rec.partition_key.value).to_partition_key(*table_schema); + records.push_back(record{ + .sstable_name = sst_name, + .partition_size = static_cast(rec.value), + .partition_key = key_to_str(pk, *table_schema), + .compaction_time = compaction_time, + .rows = static_cast(rec.elements_count), + .range_tombstones = static_cast(rec.range_tombstones), + .dead_rows = static_cast(rec.dead_rows), + }); + } + } + co_return records; + } + +public: + explicit large_partitions_virtual_table(sharded& db) + : streaming_virtual_table(build_schema()) + , _db(db) + { + _shard_aware = true; + } + + static schema_ptr build_schema() { + auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::LARGE_PARTITIONS, 1); + schema_builder builder(system_keyspace::NAME, system_keyspace::LARGE_PARTITIONS, std::make_optional(id)); + builder.with_column("keyspace_name", utf8_type, column_kind::partition_key); + builder.with_column("table_name", utf8_type, column_kind::partition_key); + builder.with_column("sstable_name", utf8_type, column_kind::clustering_key); + builder.with_column("partition_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key); + builder.with_column("partition_key", utf8_type, column_kind::clustering_key); + builder.with_column("rows", long_type); + builder.with_column("compaction_time", timestamp_type); + builder.with_column("range_tombstones", long_type); + builder.with_column("dead_rows", long_type); + builder.set_comment("partitions larger than specified threshold"); + builder.with_hash_version(); + return builder.build(); + } + + dht::decorated_key make_partition_key(const sstring& ks_name, const sstring& cf_name) { + return dht::decorate_key(*_s, partition_key::from_exploded(*_s, { + data_value(ks_name).serialize_nonnull(), + data_value(cf_name).serialize_nonnull() + })); + } + + clustering_key make_clustering_key(const sstring& sstable_name, int64_t partition_size, const sstring& pk) { + return clustering_key::from_exploded(*_s, { + data_value(sstable_name).serialize_nonnull(), + reversed_type_impl::get_instance(long_type)->decompose(partition_size), + data_value(pk).serialize_nonnull() + }); + } + + future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override { + // Phase 1: Determine which (ks, table) partitions this shard owns. + // Table metadata is shared across all shards, so local iteration is sufficient. + struct table_info { + table_id tid; + dht::decorated_key dk; + }; + std::vector owned_tables; + + auto& db = _db.local(); + db.get_tables_metadata().for_each_table([&] (table_id tid, lw_shared_ptr table) { + auto& ks_name = table->schema()->ks_name(); + auto& cf_name = table->schema()->cf_name(); + auto dk = make_partition_key(ks_name, cf_name); + if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) { + owned_tables.push_back(table_info{tid, std::move(dk)}); + } + }); + + // Sort by token order — streaming_virtual_table requires + // partitions to be emitted in token order. 
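+        // dht::ring_position_less_comparator compares decorated keys by token
+        // (and by key for equal tokens); std::mem_fn(&table_info::dk) is the
+        // projection, so table_info entries are ordered by their partition key.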
+ std::ranges::sort(owned_tables, dht::ring_position_less_comparator(*_s), + std::mem_fn(&table_info::dk)); + + // Phase 2: For each owned table, collect records from ALL shards + // in parallel, then sort and emit immediately. We process one + // table at a time to bound memory usage: once we have emitted + // page_size records we stop and let CQL paging fetch the rest. + static constexpr size_t page_size = 1000; + size_t emitted = 0; + + for (auto& ti : owned_tables) { + auto per_shard = co_await _db.map([tid = ti.tid] (replica::database& db) -> future> { + if (auto table = db.get_tables_metadata().get_table_if_exists(tid)) { + co_return co_await collect_local_records(*table); + } + co_return std::vector{}; + }); + std::vector records; + for (auto& shard_records : per_shard) { + records.insert(records.end(), + std::make_move_iterator(shard_records.begin()), + std::make_move_iterator(shard_records.end())); + } + if (records.empty()) { + continue; + } + // Sort by (sstable_name ASC, partition_size DESC, partition_key ASC) to match clustering order + std::ranges::sort(records, [] (const record& a, const record& b) { + if (a.sstable_name != b.sstable_name) { + return a.sstable_name < b.sstable_name; + } else if (a.partition_size != b.partition_size) { + return a.partition_size > b.partition_size; // DESC + } + return a.partition_key < b.partition_key; + }); + // A partition that exceeds both the size and the row-count + // thresholds produces two LargeDataRecords entries (one per + // type) that map to identical clustering keys. Remove the + // duplicates so that every emitted row has a unique key. + records.erase(std::unique(records.begin(), records.end(), [] (const record& a, const record& b) { + return a.sstable_name == b.sstable_name + && a.partition_size == b.partition_size + && a.partition_key == b.partition_key; + }), records.end()); + co_await result.emit_partition_start(ti.dk); + for (const auto& rec : records) { + clustering_row cr(make_clustering_key(rec.sstable_name, rec.partition_size, rec.partition_key)); + if (rec.compaction_time != 0) { + set_cell(cr.cells(), "compaction_time", millis_to_db_clock(rec.compaction_time)); + } + if (rec.rows != 0) { + set_cell(cr.cells(), "rows", rec.rows); + } + if (rec.range_tombstones != 0) { + set_cell(cr.cells(), "range_tombstones", rec.range_tombstones); + } + if (rec.dead_rows != 0) { + set_cell(cr.cells(), "dead_rows", rec.dead_rows); + } + co_await result.emit_row(std::move(cr)); + } + co_await result.emit_partition_end(); + emitted += records.size(); + if (emitted >= page_size) { + break; + } + } + } +}; + +// Virtual table backed by LargeDataRecords in SSTable scylla metadata. +// Shows row_size records from all live SSTables, matching the +// schema of the legacy system.large_rows CQL table. +class large_rows_virtual_table : public streaming_virtual_table { + sharded& _db; + + struct record { + sstring sstable_name; + int64_t row_size; + sstring partition_key; + sstring clustering_key; + int64_t compaction_time; + }; + + // Extract row_size records from a single table's SSTables on the local shard. 
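+    // Same structure as large_partitions_virtual_table::collect_local_records
+    // above, but it keeps only row_size records and also renders the clustering
+    // key (left empty for large static rows).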
+ static future> collect_local_records(replica::table& table) { + std::vector records; + auto table_schema = table.schema(); + auto sstables = co_await table.take_sstable_set_snapshot(); + for (const auto& sst : sstables) { + auto& records_opt = sst->get_large_data_records(); + if (!records_opt) { + auto stat = sst->get_large_data_stat(sstables::large_data_type::row_size); + if (stat && stat->above_threshold > 0) { + records.push_back(record{ + .sstable_name = sst->component_basename(sstables::component_type::Data), + .row_size = static_cast(stat->max_value), + .partition_key = "(details unavailable - legacy SSTable)", + .clustering_key = "", + .compaction_time = compaction_time_from_generation(*sst), + }); + } + continue; + } + auto compaction_time = compaction_time_from_generation(*sst); + auto sst_name = sst->component_basename(sstables::component_type::Data); + for (const auto& rec : records_opt->elements) { + if (rec.type != sstables::large_data_type::row_size) { + continue; + } + auto pk = sstables::key_view(rec.partition_key.value).to_partition_key(*table_schema); + auto ck_str = rec.clustering_key.value.empty() + ? sstring() + : sstring(key_to_str(clustering_key_prefix::from_bytes(rec.clustering_key.value), *table_schema)); + records.push_back(record{ + .sstable_name = sst_name, + .row_size = static_cast(rec.value), + .partition_key = key_to_str(pk, *table_schema), + .clustering_key = std::move(ck_str), + .compaction_time = compaction_time, + }); + } + } + co_return records; + } + +public: + explicit large_rows_virtual_table(sharded& db) + : streaming_virtual_table(build_schema()) + , _db(db) + { + _shard_aware = true; + } + + static schema_ptr build_schema() { + auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::LARGE_ROWS, 1); + return schema_builder(system_keyspace::NAME, system_keyspace::LARGE_ROWS, std::make_optional(id)) + .with_column("keyspace_name", utf8_type, column_kind::partition_key) + .with_column("table_name", utf8_type, column_kind::partition_key) + .with_column("sstable_name", utf8_type, column_kind::clustering_key) + .with_column("row_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key) + .with_column("partition_key", utf8_type, column_kind::clustering_key) + .with_column("clustering_key", utf8_type, column_kind::clustering_key) + .with_column("compaction_time", timestamp_type) + .set_comment("rows larger than specified threshold") + .with_hash_version() + .build(); + } + + dht::decorated_key make_partition_key(const sstring& ks_name, const sstring& cf_name) { + return dht::decorate_key(*_s, partition_key::from_exploded(*_s, { + data_value(ks_name).serialize_nonnull(), + data_value(cf_name).serialize_nonnull() + })); + } + + clustering_key make_clustering_key(const sstring& sstable_name, int64_t row_size, + const sstring& pk, const sstring& ck) { + return clustering_key::from_exploded(*_s, { + data_value(sstable_name).serialize_nonnull(), + reversed_type_impl::get_instance(long_type)->decompose(row_size), + data_value(pk).serialize_nonnull(), + data_value(ck).serialize_nonnull() + }); + } + + future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override { + struct table_info { + table_id tid; + dht::decorated_key dk; + }; + std::vector owned_tables; + + auto& db = _db.local(); + db.get_tables_metadata().for_each_table([&] (table_id tid, lw_shared_ptr table) { + auto& ks_name = table->schema()->ks_name(); + auto& cf_name = table->schema()->cf_name(); + auto dk = 
make_partition_key(ks_name, cf_name); + if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) { + owned_tables.push_back(table_info{tid, std::move(dk)}); + } + }); + + std::ranges::sort(owned_tables, dht::ring_position_less_comparator(*_s), + std::mem_fn(&table_info::dk)); + + static constexpr size_t page_size = 1000; + size_t emitted = 0; + + for (auto& ti : owned_tables) { + auto per_shard = co_await _db.map([tid = ti.tid] (replica::database& db) -> future> { + if (auto table = db.get_tables_metadata().get_table_if_exists(tid)) { + co_return co_await collect_local_records(*table); + } + co_return std::vector{}; + }); + std::vector records; + for (auto& shard_records : per_shard) { + records.insert(records.end(), + std::make_move_iterator(shard_records.begin()), + std::make_move_iterator(shard_records.end())); + } + if (records.empty()) { + continue; + } + std::ranges::sort(records, [] (const record& a, const record& b) { + if (a.sstable_name != b.sstable_name) { + return a.sstable_name < b.sstable_name; + } else if (a.row_size != b.row_size) { + return a.row_size > b.row_size; // DESC + } else if (a.partition_key != b.partition_key) { + return a.partition_key < b.partition_key; + } + return a.clustering_key < b.clustering_key; + }); + co_await result.emit_partition_start(ti.dk); + for (const auto& rec : records) { + clustering_row cr(make_clustering_key(rec.sstable_name, rec.row_size, rec.partition_key, rec.clustering_key)); + if (rec.compaction_time != 0) { + set_cell(cr.cells(), "compaction_time", millis_to_db_clock(rec.compaction_time)); + } + co_await result.emit_row(std::move(cr)); + } + co_await result.emit_partition_end(); + emitted += records.size(); + if (emitted >= page_size) { + break; + } + } + } +}; + +// Virtual table backed by LargeDataRecords in SSTable scylla metadata. +// Shows cell_size records from all live SSTables, matching the +// schema of the legacy system.large_cells CQL table. +class large_cells_virtual_table : public streaming_virtual_table { + sharded& _db; + + struct record { + sstring sstable_name; + int64_t cell_size; + sstring partition_key; + sstring clustering_key; + sstring column_name; + int64_t collection_elements; // -1 means not applicable + int64_t compaction_time; + }; + + // Extract cell_size records from a single table's SSTables on the local shard. + static future> collect_local_records(replica::table& table) { + std::vector records; + auto table_schema = table.schema(); + auto sstables = co_await table.take_sstable_set_snapshot(); + for (const auto& sst : sstables) { + auto& records_opt = sst->get_large_data_records(); + if (!records_opt) { + auto stat = sst->get_large_data_stat(sstables::large_data_type::cell_size); + auto elem_stat = sst->get_large_data_stat(sstables::large_data_type::elements_in_collection); + if ((stat && stat->above_threshold > 0) || (elem_stat && elem_stat->above_threshold > 0)) { + records.push_back(record{ + .sstable_name = sst->component_basename(sstables::component_type::Data), + .cell_size = static_cast(stat ? stat->max_value : 0), + .partition_key = "(details unavailable - legacy SSTable)", + .clustering_key = "", + .column_name = "", + .collection_elements = elem_stat ? 
static_cast(elem_stat->max_value) : int64_t(-1), + .compaction_time = compaction_time_from_generation(*sst), + }); + } + continue; + } + auto compaction_time = compaction_time_from_generation(*sst); + auto sst_name = sst->component_basename(sstables::component_type::Data); + for (const auto& rec : records_opt->elements) { + if (rec.type != sstables::large_data_type::cell_size && + rec.type != sstables::large_data_type::elements_in_collection) { + continue; + } + auto pk = sstables::key_view(rec.partition_key.value).to_partition_key(*table_schema); + auto ck_str = rec.clustering_key.value.empty() + ? sstring() + : sstring(key_to_str(clustering_key_prefix::from_bytes(rec.clustering_key.value), *table_schema)); + records.push_back(record{ + .sstable_name = sst_name, + .cell_size = static_cast(rec.value), + .partition_key = key_to_str(pk, *table_schema), + .clustering_key = std::move(ck_str), + .column_name = disk_string_to_sstring(rec.column_name), + .collection_elements = rec.elements_count > 0 + ? static_cast(rec.elements_count) : int64_t(-1), + .compaction_time = compaction_time, + }); + } + } + co_return records; + } + +public: + explicit large_cells_virtual_table(sharded& db) + : streaming_virtual_table(build_schema()) + , _db(db) + { + _shard_aware = true; + } + + static schema_ptr build_schema() { + auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::LARGE_CELLS, 1); + return schema_builder(system_keyspace::NAME, system_keyspace::LARGE_CELLS, std::make_optional(id)) + .with_column("keyspace_name", utf8_type, column_kind::partition_key) + .with_column("table_name", utf8_type, column_kind::partition_key) + .with_column("sstable_name", utf8_type, column_kind::clustering_key) + .with_column("cell_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key) + .with_column("partition_key", utf8_type, column_kind::clustering_key) + .with_column("clustering_key", utf8_type, column_kind::clustering_key) + .with_column("column_name", utf8_type, column_kind::clustering_key) + .with_column("collection_elements", long_type) + .with_column("compaction_time", timestamp_type) + .set_comment("cells larger than specified threshold") + .with_hash_version() + .build(); + } + + dht::decorated_key make_partition_key(const sstring& ks_name, const sstring& cf_name) { + return dht::decorate_key(*_s, partition_key::from_exploded(*_s, { + data_value(ks_name).serialize_nonnull(), + data_value(cf_name).serialize_nonnull() + })); + } + + clustering_key make_clustering_key(const sstring& sstable_name, int64_t cell_size, + const sstring& pk, const sstring& ck, const sstring& col_name) { + return clustering_key::from_exploded(*_s, { + data_value(sstable_name).serialize_nonnull(), + reversed_type_impl::get_instance(long_type)->decompose(cell_size), + data_value(pk).serialize_nonnull(), + data_value(ck).serialize_nonnull(), + data_value(col_name).serialize_nonnull() + }); + } + + future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override { + struct table_info { + table_id tid; + dht::decorated_key dk; + }; + std::vector owned_tables; + + auto& db = _db.local(); + db.get_tables_metadata().for_each_table([&] (table_id tid, lw_shared_ptr table) { + auto& ks_name = table->schema()->ks_name(); + auto& cf_name = table->schema()->cf_name(); + auto dk = make_partition_key(ks_name, cf_name); + if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) { + owned_tables.push_back(table_info{tid, std::move(dk)}); + } + }); + + 
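+        // Same two-phase flow as large_partitions_virtual_table::execute above:
+        // sort the owned (keyspace_name, table_name) partitions in token order,
+        // then collect records from every shard, one table at a time.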
std::ranges::sort(owned_tables, dht::ring_position_less_comparator(*_s), + std::mem_fn(&table_info::dk)); + + static constexpr size_t page_size = 1000; + size_t emitted = 0; + + for (auto& ti : owned_tables) { + auto per_shard = co_await _db.map([tid = ti.tid] (replica::database& db) -> future> { + if (auto table = db.get_tables_metadata().get_table_if_exists(tid)) { + co_return co_await collect_local_records(*table); + } + co_return std::vector{}; + }); + std::vector records; + for (auto& shard_records : per_shard) { + records.insert(records.end(), + std::make_move_iterator(shard_records.begin()), + std::make_move_iterator(shard_records.end())); + } + if (records.empty()) { + continue; + } + std::ranges::sort(records, [] (const record& a, const record& b) { + if (a.sstable_name != b.sstable_name) { + return a.sstable_name < b.sstable_name; + } else if (a.cell_size != b.cell_size) { + return a.cell_size > b.cell_size; // DESC + } else if (a.partition_key != b.partition_key) { + return a.partition_key < b.partition_key; + } else if (a.clustering_key != b.clustering_key) { + return a.clustering_key < b.clustering_key; + } + return a.column_name < b.column_name; + }); + // A collection cell that exceeds both the size and the + // element-count thresholds produces two LargeDataRecords + // entries that map to identical clustering keys. Remove the + // duplicates so that every emitted row has a unique key. + records.erase(std::unique(records.begin(), records.end(), [] (const record& a, const record& b) { + return a.sstable_name == b.sstable_name + && a.cell_size == b.cell_size + && a.partition_key == b.partition_key + && a.clustering_key == b.clustering_key + && a.column_name == b.column_name; + }), records.end()); + co_await result.emit_partition_start(ti.dk); + for (const auto& rec : records) { + clustering_row cr(make_clustering_key(rec.sstable_name, rec.cell_size, + rec.partition_key, rec.clustering_key, rec.column_name)); + if (rec.compaction_time != 0) { + set_cell(cr.cells(), "compaction_time", millis_to_db_clock(rec.compaction_time)); + } + if (rec.collection_elements >= 0) { + set_cell(cr.cells(), "collection_elements", rec.collection_elements); + } + co_await result.emit_row(std::move(cr)); + } + co_await result.emit_partition_end(); + emitted += records.size(); + if (emitted >= page_size) { + break; + } + } + } +}; + +} + +// Helper: register a virtual table on the local shard. 
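+// Factored out of initialize_virtual_tables() so that the same registration
+// path can be reused when the LARGE_DATA_VIRTUAL_TABLES feature activates the
+// large_data virtual tables (at startup or once the feature becomes enabled).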
+static future<> add_virtual_table( + sharded& sys_ks, + sharded& dist_db, + sharded& dist_ss, + std::unique_ptr&& tbl) { + auto& virtual_tables = *sys_ks.local().get_virtual_tables_registry(); + auto& db = dist_db.local(); + auto& ss = dist_ss.local(); + + auto schema = tbl->schema(); + virtual_tables[schema->id()] = std::move(tbl); + co_await db.create_local_system_table(schema, false, ss.get_erm_factory()); + auto& cf = db.find_column_family(schema); + cf.mark_ready_for_writes(nullptr); + auto& vt = virtual_tables[schema->id()]; + cf.set_virtual_reader(vt->as_mutation_source()); + cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); }); } future<> initialize_virtual_tables( @@ -1469,43 +2061,86 @@ future<> initialize_virtual_tables( sharded& sys_ks, sharded& tablet_allocator, sharded& ms, - db::config& cfg) { + db::config& cfg, + gms::feature_service& feat) { co_await smp::invoke_on_all([&] () -> future<> { - auto& virtual_tables_registry = sys_ks.local().get_virtual_tables_registry(); - auto& virtual_tables = *virtual_tables_registry; - auto& db = dist_db.local(); - auto& ss = dist_ss.local(); - auto add_table = [&] (std::unique_ptr&& tbl) -> future<> { - auto schema = tbl->schema(); - virtual_tables[schema->id()] = std::move(tbl); - co_await db.create_local_system_table(schema, false, ss.get_erm_factory()); - auto& cf = db.find_column_family(schema); - cf.mark_ready_for_writes(nullptr); - auto& vt = virtual_tables[schema->id()]; - cf.set_virtual_reader(vt->as_mutation_source()); - cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); }); + co_await add_virtual_table(sys_ks, dist_db, dist_ss, std::move(tbl)); }; + auto& db = dist_db.local(); + // Add built-in virtual tables here. co_await add_table(std::make_unique(dist_ss, dist_gossiper)); - co_await add_table(std::make_unique(db, ss)); + co_await add_table(std::make_unique(db, dist_ss.local())); co_await add_table(std::make_unique(dist_db)); - co_await add_table(std::make_unique(ss)); - co_await add_table(std::make_unique(dist_db, ss)); + co_await add_table(std::make_unique(dist_ss.local())); + co_await add_table(std::make_unique(dist_db, dist_ss.local())); co_await add_table(std::make_unique()); co_await add_table(std::make_unique(cfg)); - co_await add_table(std::make_unique(ss)); + co_await add_table(std::make_unique(dist_ss.local())); co_await add_table(std::make_unique(dist_raft_gr)); co_await add_table(std::make_unique(tablet_allocator, dist_db, dist_raft_gr, ms, dist_gossiper)); co_await add_table(std::make_unique(tablet_allocator, dist_db, dist_raft_gr, ms)); - co_await add_table(std::make_unique(db, ss)); - co_await add_table(std::make_unique(db, ss)); + co_await add_table(std::make_unique(db, dist_ss.local())); + co_await add_table(std::make_unique(db, dist_ss.local())); db.find_column_family(system_keyspace::size_estimates()).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader(db, sys_ks.local()))); db.find_column_family(system_keyspace::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db))); db.find_column_family(system_keyspace::built_indexes()).set_virtual_reader(mutation_source(db::index::built_indexes_virtual_reader(db))); }); + + // Drop old physical system.large_* tables and register virtual + // replacements. Must run on shard 0 (uses cross-shard operations). 
+ auto activate_large_data_virtual_tables = [&dist_db, &sys_ks, &dist_ss] () -> future<> { + // Drop old physical system.large_* tables. Their data is now + // served from SSTable metadata via virtual tables. + // TODO: In a follow-up, read existing large data entries from + // the old tables and populate per-sstable LargeDataRecords + // metadata via component_rewrite before dropping. + for (auto table_name : { + db::system_keyspace::LARGE_PARTITIONS, + db::system_keyspace::LARGE_ROWS, + db::system_keyspace::LARGE_CELLS}) { + try { + co_await replica::database::legacy_drop_table_on_all_shards( + dist_db, sys_ks, db::system_keyspace::NAME, table_name, false); + } catch (const replica::no_such_column_family&) { + // Already dropped (e.g. restart after feature was enabled). + } + } + + // Add virtual tables on all shards. + co_await smp::invoke_on_all([&dist_db, &sys_ks, &dist_ss] () -> future<> { + co_await add_virtual_table(sys_ks, dist_db, dist_ss, + std::make_unique(dist_db)); + co_await add_virtual_table(sys_ks, dist_db, dist_ss, + std::make_unique(dist_db)); + co_await add_virtual_table(sys_ks, dist_db, dist_ss, + std::make_unique(dist_db)); + }); + }; + + if (feat.large_data_virtual_tables) { + // Feature already enabled (e.g. test environment or restart after + // upgrade). Activate directly as a coroutine — no seastar::async + // context needed. + co_await activate_large_data_virtual_tables(); + } else { + // Feature not yet enabled. Register a callback that will fire + // when the feature is enabled during rolling upgrade. The callback + // runs inside seastar::async context (via feature_service::enable), + // so .get() is safe. + // + // The listener_registration must outlive the feature, so we store + // it in a static variable (process lifetime). This function is + // only called on shard 0, so the listener fires only on shard 0. + static gms::feature::listener_registration large_data_vt_listener; + large_data_vt_listener = feat.large_data_virtual_tables.when_enabled( + [activate_large_data_virtual_tables = std::move(activate_large_data_virtual_tables)] { + activate_large_data_virtual_tables().get(); + }); + } } virtual_tables_registry::virtual_tables_registry() : unique_ptr(std::make_unique()) { diff --git a/db/virtual_tables.hh b/db/virtual_tables.hh index 94d9ec4ee3..7f1ebfc437 100644 --- a/db/virtual_tables.hh +++ b/db/virtual_tables.hh @@ -24,6 +24,7 @@ class tablet_allocator; } namespace gms { +class feature_service; class gossiper; } @@ -44,7 +45,8 @@ future<> initialize_virtual_tables( sharded&, sharded&, sharded&, - db::config&); + db::config&, + gms::feature_service&); class virtual_table; diff --git a/docs/troubleshooting/large-partition-table.rst b/docs/troubleshooting/large-partition-table.rst index 7677f9fb0e..6d28a238dc 100644 --- a/docs/troubleshooting/large-partition-table.rst +++ b/docs/troubleshooting/large-partition-table.rst @@ -128,6 +128,79 @@ Expiring Data In order to prevent stale data from appearing, all rows in ``system.large_partitions`` table are inserted with Time To Live (TTL) equal to 30 days. + +.. _large-data-virtual-tables: + +Virtual Tables (``LARGE_DATA_VIRTUAL_TABLES`` Feature) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Starting with ScyllaDB 2026.2, the ``system.large_partitions``, +``system.large_rows``, and ``system.large_cells`` tables are implemented as +**virtual tables** that read directly from metadata stored inside each SSTable +rather than from separate physical system tables. 
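+
+They can be queried with the same CQL as the old physical tables; for example,
+with a placeholder keyspace ``ks`` and table ``tbl``:
+
+.. code-block:: cql
+
+    SELECT * FROM system.large_partitions
+    WHERE keyspace_name = 'ks' AND table_name = 'tbl';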
+ +How it works +"""""""""""" + +When an SSTable is written (during flush or compaction), the SSTable writer +records the top-N above-threshold large data entries into a +``LargeDataRecords`` component in the SSTable's ``.Scylla`` metadata file. +The virtual tables query this metadata from all live SSTables at read time, +so large data records automatically follow their SSTables when they are +migrated between shards or nodes (e.g. during tablet migration or repair). + +The maximum number of records stored per large data type per SSTable is +controlled by the ``compaction_large_data_records_per_sstable`` configuration +option (default: 10). + +Activation +"""""""""" + +The transition is gated by the ``LARGE_DATA_VIRTUAL_TABLES`` cluster feature +flag, which is automatically enabled once all nodes in the cluster are +upgraded to a version that supports it. + +When the feature is enabled on a node: + +1. The old physical ``system.large_partitions``, ``system.large_rows``, and + ``system.large_cells`` tables are dropped. +2. Virtual table replacements are registered in their place. +3. New SSTable writes stop populating the old physical tables. + +During a rolling upgrade, as long as any node in the cluster has not been +upgraded, the feature remains disabled and all nodes continue writing to +the old physical tables. This ensures safe rollback if the upgrade needs +to be reverted. + +Upgrade considerations +"""""""""""""""""""""" + +After the ``LARGE_DATA_VIRTUAL_TABLES`` feature is enabled, the virtual tables +derive their content from ``LargeDataRecords`` metadata stored in SSTables. +SSTables written by older versions do not contain this metadata. As a result, +**the virtual tables may appear empty until those older SSTables are rewritten** +by compaction. + +To populate the virtual tables promptly after upgrade, run one of the following +on each node: + +* **Upgrade SSTables compaction** (recommended -- rewrites SSTables without + waiting for normal compaction triggers): + + .. code-block:: console + + nodetool upgradesstables --include-all-sstables + +* **Major compaction** (rewrites all SSTables into a single set, but may + temporarily increase disk usage): + + .. code-block:: console + + nodetool compact + +Once the SSTables have been rewritten, the virtual tables will show the +current large data records. + .. include:: /troubleshooting/_common/ts-return.rst Additional Resources diff --git a/docs/troubleshooting/large-rows-large-cells-tables.rst b/docs/troubleshooting/large-rows-large-cells-tables.rst index f0c93175c3..a2beb356b4 100644 --- a/docs/troubleshooting/large-rows-large-cells-tables.rst +++ b/docs/troubleshooting/large-rows-large-cells-tables.rst @@ -170,4 +170,12 @@ Expiring Data ^^^^^^^^^^^^^ In order to prevent stale data from appearing, all rows in the ``system.large_rows`` and ``system.large_cells`` tables are inserted with Time To Live (TTL) equal to 30 days. +Virtual Tables +^^^^^^^^^^^^^^ + +Starting with ScyllaDB 2026.2, ``system.large_rows`` and ``system.large_cells`` +are implemented as virtual tables that read from SSTable metadata. See +:ref:`large-data-virtual-tables` for details on the +``LARGE_DATA_VIRTUAL_TABLES`` feature and upgrade considerations. + .. 
include:: /troubleshooting/_common/ts-return.rst diff --git a/gms/feature_service.hh b/gms/feature_service.hh index e7431ce548..9ef7e9e01d 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -181,6 +181,7 @@ public: gms::feature vnodes_to_tablets_migrations { *this, "VNODES_TO_TABLETS_MIGRATIONS"sv }; gms::feature writetime_ttl_individual_element { *this, "WRITETIME_TTL_INDIVIDUAL_ELEMENT"sv }; gms::feature arbitrary_tablet_boundaries { *this, "ARBITRARY_TABLET_BOUNDARIES"sv }; + gms::feature large_data_virtual_tables { *this, "LARGE_DATA_VIRTUAL_TABLES"sv }; public: const std::unordered_map>& registered_features() const; diff --git a/main.cc b/main.cc index f2cad1f061..f69592ee53 100644 --- a/main.cc +++ b/main.cc @@ -1963,7 +1963,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl }); checkpoint(stop_signal, "initializing virtual tables"); - db::initialize_virtual_tables(db, ss, gossiper, raft_gr, sys_ks, tablet_allocator, messaging, *cfg).get(); + db::initialize_virtual_tables(db, ss, gossiper, raft_gr, sys_ks, tablet_allocator, messaging, *cfg, feature_service.local()).get(); // #293 - do not stop anything // engine().at_exit([&qp] { return qp.stop(); }); diff --git a/schema/schema.cc b/schema/schema.cc index 461b37e8d7..de2c32ffdf 100644 --- a/schema/schema.cc +++ b/schema/schema.cc @@ -1306,9 +1306,20 @@ bool operator==(const column_definition& x, const column_definition& y) } // Based on org.apache.cassandra.config.CFMetaData#generateLegacyCfId +// +// The version suffix (e.g. "_v1") allows creating a table with a +// different UUID than the original (version 0). This is used when +// replacing a physical table with a virtual table of the same name: +// the virtual table uses version 1 so its UUID differs from the old +// physical table, allowing the old table to be properly dropped by +// UUID before the replacement is registered. table_id -generate_legacy_id(const sstring& ks_name, const sstring& cf_name) { - return table_id(utils::UUID_gen::get_name_UUID(ks_name + cf_name)); +generate_legacy_id(const sstring& ks_name, const sstring& cf_name, unsigned version) { + auto name = ks_name + cf_name; + if (version > 0) { + name += fmt::format("_v{}", version); + } + return table_id(utils::UUID_gen::get_name_UUID(name)); } schema_builder& schema_builder::set_compaction_strategy_options(std::map&& options) { diff --git a/schema/schema.hh b/schema/schema.hh index eac788c8c7..e3d70d393a 100644 --- a/schema/schema.hh +++ b/schema/schema.hh @@ -1075,7 +1075,7 @@ public: std::ostream& operator<<(std::ostream& os, const view_ptr& view); -table_id generate_legacy_id(const sstring& ks_name, const sstring& cf_name); +table_id generate_legacy_id(const sstring& ks_name, const sstring& cf_name, unsigned version = 0); // Thrown when attempted to access a schema-dependent object using diff --git a/test/boost/cql_query_large_test.cc b/test/boost/cql_query_large_test.cc index 04aa0772cb..88e2fe433d 100644 --- a/test/boost/cql_query_large_test.cc +++ b/test/boost/cql_query_large_test.cc @@ -90,6 +90,9 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) { // Check the only the large row is added to system.large_rows. BOOST_REQUIRE_EQUAL(rows.size(), 1); auto row0 = rows[0]; + // The result has 3 columns: partition_key, row_size, and table_name + // (CQL adds the filtered partition key column to the result set when + // using ALLOW FILTERING with a partial partition key restriction). 
BOOST_REQUIRE_EQUAL(row0.size(), 3); BOOST_REQUIRE_EQUAL(to_bytes(*row0[0]), "44"); BOOST_REQUIRE_EQUAL(to_bytes(*row0[2]), "tbl"); @@ -118,10 +121,11 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) { e.execute_cql("delete from tbl where a = 44;").get(); - // In order to guarantee that system.large_rows, system.large_cells and system.large_partitions have been updated, we have to + // In order to guarantee that system.large_rows, system.large_cells and + // system.large_partitions are empty, we need to: // * flush, so that a tombstone for the above delete is created. // * do a major compaction, so that the tombstone is combined with the old entry, - // and the old sstable is deleted. + // and the old sstable (which holds the large data records in its metadata) is deleted. flush(e); e.db().invoke_on_all([] (replica::database& dbi) { return dbi.get_tables_metadata().parallel_for_each_table([&dbi] (table_id, lw_shared_ptr t) { @@ -165,6 +169,71 @@ SEASTAR_THREAD_TEST_CASE(test_large_row_count_warning) { }, cfg).get(); } +// Test that when a partition exceeds both the size threshold and the row +// count threshold, system.large_partitions still returns a single row +// (not two rows with the same clustering key). +SEASTAR_THREAD_TEST_CASE(test_large_partitions_dual_threshold) { + auto cfg = make_shared(); + // Set very low thresholds so that a single partition with a handful + // of rows containing modest data exceeds both size and row-count + // thresholds simultaneously. + cfg->compaction_large_partition_warning_threshold_mb(1); + cfg->compaction_rows_count_warning_threshold(10); + do_with_cql_env_thread([](cql_test_env& e) { + e.execute_cql("create table tbl (a int, b int, c text, primary key (a, b))").get(); + // Insert enough rows with enough data to exceed 1 MB partition size + // AND more than 10 rows. + sstring blob(128 * 1024, 'x'); // 128 KB per row + for (int i = 0; i < 11; ++i) { + e.execute_cql(format("insert into tbl (a, b, c) values (42, {}, '{}');", i, blob)).get(); + } + flush(e); + + // There must be exactly one row in system.large_partitions for + // this table. Before the fix, two rows with the same clustering + // key would have been emitted. + assert_that(e.execute_cql( + "select partition_key, rows from system.large_partitions " + "where table_name = 'tbl' allow filtering;").get()) + .is_rows() + .with_size(1); + + return make_ready_future<>(); + }, cfg).get(); +} + +// Test that when a collection cell exceeds both the cell size threshold +// and the collection element count threshold, system.large_cells still +// returns a single row (not two rows with the same clustering key). +SEASTAR_THREAD_TEST_CASE(test_large_cells_dual_threshold) { + auto cfg = make_shared(); + // Set very low thresholds so that a collection with modest elements + // exceeds both size and element-count thresholds simultaneously. + cfg->compaction_large_cell_warning_threshold_mb(1); + cfg->compaction_collection_elements_count_warning_threshold(10); + do_with_cql_env_thread([](cql_test_env& e) { + e.execute_cql("create table tbl (a int, b list, primary key (a))").get(); + // Insert 128 KB blobs, 11 times -> ~1.4 MB total, exceeding 1 MB + // threshold. Also exceeds 10 element threshold. + sstring blob(128 * 1024, 'x'); + for (int i = 0; i < 11; ++i) { + e.execute_cql("update tbl set b = ['" + blob + "'] + b where a = 42;").get(); + } + flush(e); + + // There must be exactly one row in system.large_cells for this + // table. 
Before the fix, two rows with the same clustering key + // would have been emitted. + assert_that(e.execute_cql( + "select partition_key, column_name from system.large_cells " + "where table_name = 'tbl' allow filtering;").get()) + .is_rows() + .with_size(1); + + return make_ready_future<>(); + }, cfg).get(); +} + SEASTAR_TEST_CASE(test_insert_large_collection_values) { return do_with_cql_env([] (cql_test_env& e) { return seastar::async([&e] { diff --git a/test/boost/sstable_3_x_test.cc b/test/boost/sstable_3_x_test.cc index 28d8272a86..b5aa442736 100644 --- a/test/boost/sstable_3_x_test.cc +++ b/test/boost/sstable_3_x_test.cc @@ -5092,70 +5092,34 @@ SEASTAR_TEST_CASE(test_sstable_reader_on_unknown_column) { } namespace { -struct large_row_handler : public db::large_data_handler { - using callback_t = std::function; - callback_t callback; - - large_row_handler(uint64_t large_rows_threshold, uint64_t rows_count_threshold, uint64_t cell_threshold_bytes, uint64_t collection_elements_threshold, callback_t callback) - : large_data_handler(std::numeric_limits::max(), large_rows_threshold, cell_threshold_bytes, - rows_count_threshold, collection_elements_threshold) - , callback(std::move(callback)) { +// A large_data_handler with configurable thresholds for testing. +// After refactoring, the handler only does threshold comparison + logging; +// actual large data recording goes into the SSTable's LargeDataRecords metadata. +struct large_data_test_handler : public db::large_data_handler { + large_data_test_handler(uint64_t partition_threshold_bytes, uint64_t large_rows_threshold, + uint64_t cell_threshold_bytes, uint64_t rows_count_threshold, + uint64_t collection_elements_threshold) + : large_data_handler(partition_threshold_bytes, large_rows_threshold, cell_threshold_bytes, + rows_count_threshold, collection_elements_threshold) { start(); } - - virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, - const clustering_key_prefix* clustering_key, uint64_t row_size) const override { - const schema_ptr s = sst.get_schema(); - callback(*s, partition_key, clustering_key, row_size, 0, 0, nullptr, 0, 0); - return make_ready_future<>(); - } - - virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, - const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const override { - const schema_ptr s = sst.get_schema(); - callback(*s, partition_key, clustering_key, 0, 0, 0, &cdef, cell_size, collection_elements); - return make_ready_future<>(); - } - - virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key, - uint64_t partition_size, uint64_t rows_count, uint64_t range_tombstones_count, uint64_t dead_rows_count) const override { - const schema_ptr s = sst.get_schema(); - callback(*s, partition_key, nullptr, rows_count, range_tombstones_count, dead_rows_count, nullptr, 0, 0); - return make_ready_future<>(); - } - - virtual future<> delete_large_data_entries(const schema& s, sstring sstable_name, std::string_view) const override { - return make_ready_future<>(); - } - virtual future<> update_large_data_entries_sstable_name(const schema& s, sstring old_name, sstring new_name, std::string_view large_table_name) const override { - return make_ready_future<>(); - } +protected: + future<> record_large_cells(const sstables::sstable&, const sstables::key&, + const clustering_key_prefix*, const 
column_definition&, uint64_t, uint64_t) const override { return make_ready_future<>(); } + future<> record_large_rows(const sstables::sstable&, const sstables::key&, + const clustering_key_prefix*, uint64_t) const override { return make_ready_future<>(); } + future<> delete_large_data_entries(const schema&, sstring, std::string_view) const override { return make_ready_future<>(); } + future<> update_large_data_entries_sstable_name(const schema&, sstring, sstring, std::string_view) const override { return make_ready_future<>(); } + future<> record_large_partitions(const sstables::sstable&, const sstables::key&, + uint64_t, uint64_t, uint64_t, uint64_t) const override { return make_ready_future<>(); } }; } static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, replica::memtable& mt, const partition_key& pk, std::vector expected, uint64_t threshold, sstables::sstable_version_types version) { - unsigned i = 0; - auto f = [&i, &expected, &pk, &threshold](const schema& s, const sstables::key& partition_key, - const clustering_key_prefix* clustering_key, uint64_t row_size, uint64_t range_tombstones, uint64_t dead_rows, - const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) { - BOOST_REQUIRE(std::ranges::equal(pk.components(s), partition_key.to_partition_key(s).components(s))); - BOOST_REQUIRE_LT(i, expected.size()); - BOOST_REQUIRE_GT(row_size, threshold); - BOOST_REQUIRE_EQUAL(range_tombstones, 0); - BOOST_REQUIRE_EQUAL(dead_rows, 0); - - if (clustering_key) { - BOOST_REQUIRE(expected[i]->equal(s, *clustering_key)); - } else { - BOOST_REQUIRE_EQUAL(expected[i], nullptr); - } - ++i; - }; - - large_row_handler handler(threshold, std::numeric_limits::max(), std::numeric_limits::max(), std::numeric_limits::max(), f); + large_data_test_handler handler(std::numeric_limits::max(), threshold, + std::numeric_limits::max(), std::numeric_limits::max(), + std::numeric_limits::max()); sstables::test_env::do_with_async([&] (auto& env) { auto sst = env.make_sstable(s, version); @@ -5165,14 +5129,50 @@ static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, r // depends on the encoding statistics (because of variable-length encoding). The original values // were chosen with the default-constructed encoding_stats, so let's keep it that way. sst->write_components(mt.make_mutation_reader(s, std::move(permit)), 1, s, env.manager().configure_writer("test"), encoding_stats{}).get(); - BOOST_REQUIRE_EQUAL(i, expected.size()); + sstable_open_config cfg { .load_first_and_last_position_metadata = true }; + sst->open_data(cfg).get(); + + auto& records_opt = sst->get_large_data_records(); + // Collect row_size records (order is not guaranteed, so just count and validate). + unsigned row_size_count = 0; + unsigned static_row_count = 0; + unsigned clustering_row_count = 0; + if (records_opt) { + for (auto& rec : records_opt->elements) { + if (rec.type != large_data_type::row_size) { + continue; + } + BOOST_REQUIRE_GT(rec.value, threshold); + auto rec_ck_str = sstring(reinterpret_cast(rec.clustering_key.value.data()), rec.clustering_key.value.size()); + if (rec_ck_str.empty()) { + ++static_row_count; + } else { + ++clustering_row_count; + } + ++row_size_count; + } + } + // Count expected static rows (nullptr entries) and clustering rows. 
+ unsigned expected_static = 0; + unsigned expected_clustering = 0; + for (auto* ck : expected) { + if (ck) { + ++expected_clustering; + } else { + ++expected_static; + } + } + BOOST_REQUIRE_EQUAL(row_size_count, expected.size()); + BOOST_REQUIRE_EQUAL(static_row_count, expected_static); + BOOST_REQUIRE_EQUAL(clustering_row_count, expected_clustering); }, { &handler }).get(); } SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) { simple_schema s; tests::reader_concurrency_semaphore_wrapper semaphore; - mutation partition = s.new_mutation("pv"); + // Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata"). + mutation partition(s.schema(), s.make_pkey()); const partition_key& pk = partition.key(); s.add_static_row(partition, "foo bar zed"); @@ -5190,26 +5190,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) { static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit, replica::memtable& mt, const partition_key& pk, std::vector expected, uint64_t threshold, sstables::sstable_version_types version) { - unsigned i = 0; - auto f = [&i, &expected, &pk, &threshold](const schema& s, const sstables::key& partition_key, - const clustering_key_prefix* clustering_key, uint64_t row_size, uint64_t range_tombstones, uint64_t dead_rows, - const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) { - BOOST_TEST_MESSAGE(format("i={} ck={} cell_size={} threshold={}", i, clustering_key ? format("{}", *clustering_key) : "null", cell_size, threshold)); - BOOST_REQUIRE(std::ranges::equal(pk.components(s), partition_key.to_partition_key(s).components(s))); - BOOST_REQUIRE_LT(i, expected.size()); - BOOST_REQUIRE_GT(cell_size, threshold); - BOOST_REQUIRE_EQUAL(range_tombstones, 0); - BOOST_REQUIRE_EQUAL(dead_rows, 0); - - if (clustering_key) { - BOOST_REQUIRE(expected[i]->equal(s, *clustering_key)); - } else { - BOOST_REQUIRE_EQUAL(expected[i], nullptr); - } - ++i; - }; - - large_row_handler handler(std::numeric_limits::max(), std::numeric_limits::max(), threshold, std::numeric_limits::max(), f); + large_data_test_handler handler(std::numeric_limits::max(), + std::numeric_limits::max(), threshold, + std::numeric_limits::max(), std::numeric_limits::max()); sstables::test_env::do_with_async([&] (auto& env) { auto sst = env.make_sstable(s, version); @@ -5219,14 +5202,50 @@ static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit, // depends on the encoding statistics (because of variable-length encoding). The original values // were chosen with the default-constructed encoding_stats, so let's keep it that way. sst->write_components(mt.make_mutation_reader(s, std::move(permit)), 1, s, env.manager().configure_writer("test"), encoding_stats{}).get(); - BOOST_REQUIRE_EQUAL(i, expected.size()); + sstable_open_config cfg { .load_first_and_last_position_metadata = true }; + sst->open_data(cfg).get(); + + auto& records_opt = sst->get_large_data_records(); + // Collect cell_size records (order is not guaranteed, so just count and validate). 
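+        // As in the row-size test above, a record with empty clustering_key
+        // bytes comes from the static row and is counted as a static cell.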
+ unsigned cell_size_count = 0; + unsigned static_cell_count = 0; + unsigned clustering_cell_count = 0; + if (records_opt) { + for (auto& rec : records_opt->elements) { + if (rec.type != large_data_type::cell_size) { + continue; + } + BOOST_REQUIRE_GT(rec.value, threshold); + auto rec_ck_str = sstring(reinterpret_cast(rec.clustering_key.value.data()), rec.clustering_key.value.size()); + if (rec_ck_str.empty()) { + ++static_cell_count; + } else { + ++clustering_cell_count; + } + ++cell_size_count; + } + } + // Count expected static cells (nullptr entries) and clustering cells. + unsigned expected_static = 0; + unsigned expected_clustering = 0; + for (auto* ck : expected) { + if (ck) { + ++expected_clustering; + } else { + ++expected_static; + } + } + BOOST_REQUIRE_EQUAL(cell_size_count, expected.size()); + BOOST_REQUIRE_EQUAL(static_cell_count, expected_static); + BOOST_REQUIRE_EQUAL(clustering_cell_count, expected_clustering); }, { &handler }).get(); } SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) { simple_schema s; tests::reader_concurrency_semaphore_wrapper semaphore; - mutation partition = s.new_mutation("pv"); + // Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata"). + mutation partition(s.schema(), s.make_pkey()); const partition_key& pk = partition.key(); s.add_static_row(partition, "foo bar zed"); @@ -5245,8 +5264,8 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) { static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uint64_t threshold, bool expected, sstable_version_types version) { simple_schema s; tests::reader_concurrency_semaphore_wrapper semaphore; - mutation p = s.new_mutation("pv"); - const partition_key& pk = p.key(); + // Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata"). + mutation p(s.schema(), s.make_pkey()); sstring sv; for (auto idx = 0; idx < rows - 1; idx++) { sv += "foo "; @@ -5259,27 +5278,27 @@ static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uin schema_ptr sc = s.schema(); auto mt = make_memtable(sc, {p}); - bool logged = false; - auto f = [&logged, &pk, &threshold, &rows, &range_tombstones](const schema& sc, const sstables::key& partition_key, - const clustering_key_prefix* clustering_key, uint64_t rows_count, uint64_t range_tombstones_count, uint64_t dead_rows, - const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) { - BOOST_REQUIRE_GT(rows_count, threshold); - BOOST_REQUIRE(std::ranges::equal(pk.components(sc), partition_key.to_partition_key(sc).components(sc))); - BOOST_REQUIRE_EQUAL(dead_rows, 0); - - // Inserting one range tombstone creates two range tombstone marker rows - BOOST_REQUIRE_EQUAL(rows_count, rows + range_tombstones * 2); - BOOST_REQUIRE_EQUAL(range_tombstones_count, range_tombstones * 2); - logged = true; - }; - - large_row_handler handler(std::numeric_limits::max(), threshold, std::numeric_limits::max(), std::numeric_limits::max(), f); + // rows_count_threshold = threshold; all other thresholds at MAX. 
+    large_data_test_handler handler(std::numeric_limits<uint64_t>::max(),
+            std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
+            threshold, std::numeric_limits<uint64_t>::max());
 
     sstables::test_env::do_with_async([&] (auto& env) {
         auto sst = env.make_sstable(sc, version);
         sst->write_components(mt->make_mutation_reader(sc, semaphore.make_permit()), 1, sc, env.manager().configure_writer("test"), encoding_stats{}).get();
+        sstable_open_config cfg { .load_first_and_last_position_metadata = true };
+        sst->open_data(cfg).get();
 
-        BOOST_REQUIRE_EQUAL(logged, expected);
+        auto& records_opt = sst->get_large_data_records();
+        bool found = false;
+        if (records_opt) {
+            for (auto& rec : records_opt->elements) {
+                if (rec.type == large_data_type::rows_in_partition && rec.elements_count > threshold) {
+                    found = true;
+                }
+            }
+        }
+        BOOST_REQUIRE_EQUAL(found, expected);
     }, { &handler }).get();
 }
 
@@ -5307,8 +5326,8 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
 static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold, bool expected, sstable_version_types version) {
     simple_schema s;
     tests::reader_concurrency_semaphore_wrapper semaphore;
-    mutation p = s.new_mutation("pv");
-    const partition_key& pk = p.key();
+    // Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
+    mutation p(s.schema(), s.make_pkey());
     sstring sv;
     int live_rows = 0;
     int expected_dead_rows = 0;
@@ -5355,7 +5374,6 @@ static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold,
         }
         auto ck_start = ck;
         auto rt_start = bound_view(ck_start, tests::random::get_bool() ? bound_kind::incl_start : bound_kind::excl_start);
-        auto sv_end = sv;
         auto rt_size = tests::random::get_int(1, 10);
         for (auto i = 0; i < rt_size; i++) {
             sv += "X";
@@ -5374,25 +5392,27 @@ static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold,
     auto mt = make_lw_shared<replica::memtable>(sc);
     mt->apply(p);
 
-    bool logged = false;
-    auto f = [&] (const schema& sc, const sstables::key& partition_key,
-            const clustering_key_prefix* clustering_key, uint64_t rows_count, uint64_t range_tombstones, uint64_t dead_rows,
-            const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
-        BOOST_REQUIRE_GT(rows_count, threshold);
-        BOOST_REQUIRE_EQUAL(rows_count, rows);
-        BOOST_REQUIRE_EQUAL(dead_rows, expected_dead_rows + expected_expired_rows + expected_rows_with_dead_cell);
-        BOOST_REQUIRE_EQUAL(range_tombstones, expected_range_tombstones);
-        BOOST_REQUIRE(std::ranges::equal(pk.components(sc), partition_key.to_partition_key(sc).components(sc)));
-        logged = true;
-    };
-
-    large_row_handler handler(std::numeric_limits<uint64_t>::max(), threshold, std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), f);
+    // rows_count_threshold = threshold; all other thresholds at MAX.
+    large_data_test_handler handler(std::numeric_limits<uint64_t>::max(),
+            std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
+            threshold, std::numeric_limits<uint64_t>::max());
 
     sstables::test_env::do_with_async([&] (auto& env) {
         auto sst = env.make_sstable(sc, version);
         sst->write_components(mt->make_mutation_reader(sc, semaphore.make_permit()), 1, sc, env.manager().configure_writer("test"), encoding_stats{}).get();
+        sstable_open_config cfg { .load_first_and_last_position_metadata = true };
+        sst->open_data(cfg).get();
 
-        BOOST_REQUIRE_EQUAL(logged, expected);
+        auto& records_opt = sst->get_large_data_records();
+        bool found = false;
+        if (records_opt) {
+            for (auto& rec : records_opt->elements) {
+                if (rec.type == large_data_type::rows_in_partition && rec.elements_count > threshold) {
+                    found = true;
+                }
+            }
+        }
+        BOOST_REQUIRE_EQUAL(found, expected);
     }, { &handler }).get();
 }
 
@@ -5416,8 +5436,8 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_dead_rows) {
 static void test_sstable_too_many_collection_elements_f(int elements, uint64_t threshold, bool expected, sstable_version_types version) {
     simple_schema s(simple_schema::with_static::no, simple_schema::with_collection::yes);
     tests::reader_concurrency_semaphore_wrapper semaphore;
-    mutation p = s.new_mutation("pv");
-    const partition_key& pk = p.key();
+    // Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
+    mutation p(s.schema(), s.make_pkey());
     std::map<bytes, bytes> kv_map;
     for (auto i = 0; i < elements; i++) {
         kv_map[to_bytes(format("key{}", i))] = to_bytes(format("val{}", i));
@@ -5426,25 +5446,28 @@ static void test_sstable_too_many_collection_elements_f(int elements, uint64_t t
     schema_ptr sc = s.schema();
     auto mt = make_memtable(sc, {p});
 
-    bool logged = false;
-    auto f = [&logged, &pk, &threshold](const schema& sc, const sstables::key& partition_key,
-            const clustering_key_prefix* clustering_key, uint64_t rows_count, uint64_t range_tombstones, uint64_t dead_rows,
-            const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
-        BOOST_REQUIRE_GT(collection_elements, threshold);
-        BOOST_REQUIRE(std::ranges::equal(pk.components(sc), partition_key.to_partition_key(sc).components(sc)));
-        BOOST_REQUIRE_EQUAL(range_tombstones, 0);
-        BOOST_REQUIRE_EQUAL(dead_rows, 0);
-        logged = true;
-    };
-
     BOOST_TEST_MESSAGE(format("elements={} threshold={} expected={}", elements, threshold, expected));
-    large_row_handler handler(std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), threshold, f);
+    // collection_elements_threshold = threshold; all other thresholds at MAX.
+    large_data_test_handler handler(std::numeric_limits<uint64_t>::max(),
+            std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
+            std::numeric_limits<uint64_t>::max(), threshold);
 
     sstables::test_env::do_with_async([&] (auto& env) {
         auto sst = env.make_sstable(sc, version);
         sst->write_components(mt->make_mutation_reader(sc, semaphore.make_permit()), 1, sc, env.manager().configure_writer("test"), encoding_stats{}).get();
+        sstable_open_config cfg { .load_first_and_last_position_metadata = true };
+        sst->open_data(cfg).get();
 
-        BOOST_REQUIRE_EQUAL(logged, expected);
+        auto& records_opt = sst->get_large_data_records();
+        bool found = false;
+        if (records_opt) {
+            for (auto& rec : records_opt->elements) {
+                if (rec.type == large_data_type::elements_in_collection && rec.elements_count > threshold) {
+                    found = true;
+                }
+            }
+        }
+        BOOST_REQUIRE_EQUAL(found, expected);
     }, { &handler }).get();
 }
 
@@ -5465,40 +5488,10 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_too_many_collection_elements) {
 }
 
 // Handler for testing LargeDataRecords population in SSTable metadata.
-// Unlike large_row_handler above, this one allows setting the partition_threshold_bytes too.
+// large_data_test_handler already serves this purpose; keep this alias for
+// test readability / backward compatibility.
 namespace {
-struct large_data_records_handler : public db::large_data_handler {
-    large_data_records_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes,
-            uint64_t cell_threshold_bytes, uint64_t rows_count_threshold,
-            uint64_t collection_elements_threshold)
-        : large_data_handler(partition_threshold_bytes, row_threshold_bytes, cell_threshold_bytes,
-                rows_count_threshold, collection_elements_threshold) {
-        start();
-    }
-
-    virtual future<> record_large_rows(const sstables::sstable&, const sstables::key&,
-            const clustering_key_prefix*, uint64_t) const override {
-        return make_ready_future<>();
-    }
-
-    virtual future<> record_large_cells(const sstables::sstable&, const sstables::key&,
-            const clustering_key_prefix*, const column_definition&, uint64_t, uint64_t) const override {
-        return make_ready_future<>();
-    }
-
-    virtual future<> record_large_partitions(const sstables::sstable&, const sstables::key&,
-            uint64_t, uint64_t, uint64_t, uint64_t) const override {
-        return make_ready_future<>();
-    }
-
-    virtual future<> delete_large_data_entries(const schema&, sstring, std::string_view) const override {
-        return make_ready_future<>();
-    }
-
-    virtual future<> update_large_data_entries_sstable_name(const schema&, sstring, sstring, std::string_view) const override {
-        return make_ready_future<>();
-    }
-};
+using large_data_records_handler = large_data_test_handler;
 }
 
 // Test that writing an SSTable with data exceeding large data thresholds
diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc
index 5742e6c40e..0446b407f7 100644
--- a/test/lib/cql_test_env.cc
+++ b/test/lib/cql_test_env.cc
@@ -1008,7 +1008,7 @@ private:
             _mnotifier.local().unregister_listener(&_ss.local()).get();
         });
 
-        db::initialize_virtual_tables(_db, _ss, _gossiper, _group0_registry, _sys_ks, _tablet_allocator, _ms, *cfg).get();
+        db::initialize_virtual_tables(_db, _ss, _gossiper, _group0_registry, _sys_ks, _tablet_allocator, _ms, *cfg, _feature_service.local()).get();
 
         _qp.invoke_on_all([this, &group0_client] (cql3::query_processor& qp) {
            qp.start_remote(_mm.local(), _mapreduce_service.local(), _ss.local(), group0_client,
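
Note on the repeated validation pattern: the threshold tests above each scan sst->get_large_data_records() for a record of a given type whose elements_count exceeds the threshold. If more tests of this shape are added, the scan could be factored into a small helper along the lines of the sketch below. This is an illustration only, not part of the patch; the exact type returned by get_large_data_records() and the unqualified visibility of large_data_type are assumed from the usage in the hunks above.

    // Sketch (not part of the patch): true when the SSTable's LargeDataRecords metadata
    // contains a record of `type` whose element count exceeds `threshold`. Assumes
    // get_large_data_records() returns a reference to an optional-like value holding an
    // `elements` range of records with `type` and `elements_count`, as used above.
    static bool has_large_count_record(sstables::sstable& sst, large_data_type type, uint64_t threshold) {
        auto& records_opt = sst.get_large_data_records();
        if (!records_opt) {
            return false; // e.g. a legacy SSTable written without LargeDataRecords
        }
        for (auto& rec : records_opt->elements) {
            if (rec.type == type && rec.elements_count > threshold) {
                return true;
            }
        }
        return false;
    }

With such a helper, the found/expected checks above would reduce to, e.g., BOOST_REQUIRE_EQUAL(has_large_count_record(*sst, large_data_type::rows_in_partition, threshold), expected), and likewise for elements_in_collection.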
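For readers following the handler constructions above: large_data_test_handler is defined elsewhere in this patch and is not shown in these hunks. Judging by the removed large_data_records_handler, whose constructor it mirrors, a minimal no-op stand-in would look roughly like the sketch below (the real class may differ); the leading comment spells out the argument order that the call sites above rely on.

    // Sketch only: a minimal no-op db::large_data_handler for tests, modeled on the removed
    // large_data_records_handler. The five constructor arguments are, in order:
    //   partition_threshold_bytes, row_threshold_bytes, cell_threshold_bytes,
    //   rows_count_threshold, collection_elements_threshold
    // which is why each test above passes `threshold` in exactly one position and
    // std::numeric_limits<uint64_t>::max() everywhere else.
    struct minimal_large_data_test_handler : public db::large_data_handler {
        minimal_large_data_test_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes,
                uint64_t cell_threshold_bytes, uint64_t rows_count_threshold,
                uint64_t collection_elements_threshold)
            : large_data_handler(partition_threshold_bytes, row_threshold_bytes, cell_threshold_bytes,
                    rows_count_threshold, collection_elements_threshold) {
            start();
        }
        virtual future<> record_large_partitions(const sstables::sstable&, const sstables::key&,
                uint64_t, uint64_t, uint64_t, uint64_t) const override { return make_ready_future<>(); }
        virtual future<> record_large_rows(const sstables::sstable&, const sstables::key&,
                const clustering_key_prefix*, uint64_t) const override { return make_ready_future<>(); }
        virtual future<> record_large_cells(const sstables::sstable&, const sstables::key&,
                const clustering_key_prefix*, const column_definition&, uint64_t, uint64_t) const override { return make_ready_future<>(); }
        virtual future<> delete_large_data_entries(const schema&, sstring, std::string_view) const override { return make_ready_future<>(); }
        virtual future<> update_large_data_entries_sstable_name(const schema&, sstring, sstring, std::string_view) const override { return make_ready_future<>(); }
    };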