db: implement large_data virtual tables with feature flag gating

Replace the physical system.large_partitions, system.large_rows, and
system.large_cells CQL tables with virtual tables that read from
LargeDataRecords stored in SSTable scylla metadata (tag 13).

The transition is gated by a new LARGE_DATA_VIRTUAL_TABLES cluster
feature flag:

- Before the feature is enabled: the old physical tables remain in
  all_tables(), CQL writes are active, no virtual tables are registered.
  This ensures safe rollback during rolling upgrades.

- After the feature is enabled: old physical tables are dropped from
  disk via legacy_drop_table_on_all_shards(), virtual tables are
  registered on all shards, and CQL writes are skipped via
  skip_cql_writes() in cql_table_large_data_handler.

Key implementation details:

- Three virtual table classes (large_partitions_virtual_table,
  large_rows_virtual_table, large_cells_virtual_table) extend
  streaming_virtual_table with cross-shard record collection.

- generate_legacy_id() gains a version parameter; virtual tables
  use version 1 to get different UUIDs than the old physical tables.

- compaction_time is derived from SSTable generation UUID at display
  time via UUID_gen::unix_timestamp().

- For legacy SSTables that lack LargeDataRecords, the virtual tables
  emit synthetic summary rows when above_threshold > 0 in LargeDataStats.

- The activation logic uses two paths: when the feature is already
  enabled (test env, restart), it runs as a coroutine; when not yet
  enabled, it registers a when_enabled callback that runs inside
  seastar::async from feature_service::enable().

- sstable_3_x_test updated to use a simplified large_data_test_handler
  and validate LargeDataRecords in SSTable metadata directly.
This commit is contained in:
Benny Halevy
2026-03-26 17:21:27 +02:00
parent cb6004b625
commit ce00d61917
13 changed files with 1003 additions and 192 deletions

View File

@@ -142,6 +142,10 @@ future<> large_data_handler::maybe_update_large_data_entries_sstable_name(sstabl
return when_all(std::move(large_partitions), std::move(large_rows), std::move(large_cells)).discard_result();
}
// True once the LARGE_DATA_VIRTUAL_TABLES cluster feature is active:
// large data entries are then served from SSTable metadata via virtual
// tables, so the legacy CQL writes must be suppressed.
bool cql_table_large_data_handler::skip_cql_writes() const {
    return static_cast<bool>(_feat.large_data_virtual_tables);
}
cql_table_large_data_handler::cql_table_large_data_handler(gms::feature_service& feat,
utils::updateable_value<uint32_t> partition_threshold_mb,
utils::updateable_value<uint32_t> row_threshold_mb,
@@ -180,6 +184,9 @@ future<> cql_table_large_data_handler::do_insert_large_data_entry(std::string_vi
sstring ks_name, sstring cf_name, sstring sstable_name,
int64_t size, sstring partition_key, db_clock::time_point compaction_time,
const std::vector<sstring>& extra_fields, Args&&... args) const {
if (skip_cql_writes()) {
co_return;
}
auto sys_ks = _sys_ks.get_permit();
if (!sys_ks) {
co_return;
@@ -284,6 +291,9 @@ future<> cql_table_large_data_handler::record_large_rows(const sstables::sstable
}
future<> cql_table_large_data_handler::delete_large_data_entries(const schema& s, sstring sstable_name, std::string_view large_table_name) const {
if (skip_cql_writes()) {
co_return;
}
auto sys_ks = _sys_ks.get_permit();
SCYLLA_ASSERT(sys_ks);
const sstring req =
@@ -329,6 +339,9 @@ cql_table_large_data_handler::row_reinsert_func cql_table_large_data_handler::ma
}
future<> cql_table_large_data_handler::update_large_data_entries_sstable_name(const schema& s, sstring old_name, sstring new_name, std::string_view large_table_name) const {
if (skip_cql_writes()) {
co_return;
}
auto sys_ks = _sys_ks.get_permit();
SCYLLA_ASSERT(sys_ks);
// sstable_name is a clustering key, so we can't update it in place.

View File

@@ -181,6 +181,12 @@ protected:
virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const override;
private:
// Returns true if CQL writes to system.large_* tables should be skipped.
// Once LARGE_DATA_VIRTUAL_TABLES is enabled, large data records are served
// from SSTable metadata via virtual tables and the physical CQL tables are
// dropped, so writing to them is both unnecessary and would fail.
bool skip_cql_writes() const;
future<> internal_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const;
future<> internal_record_large_cells_and_collections(const sstables::sstable& sst, const sstables::key& partition_key,

View File

@@ -26,6 +26,8 @@
#include "db/size_estimates_virtual_reader.hh"
#include "db/view/build_progress_virtual_reader.hh"
#include "index/built_indexes_virtual_reader.hh"
#include "keys/keys.hh"
#include "gms/feature_service.hh"
#include "gms/gossiper.hh"
#include "mutation/frozen_mutation.hh"
#include "transport/protocol_server.hh"
@@ -35,6 +37,7 @@
#include "service/raft/raft_group_registry.hh"
#include "service/storage_service.hh"
#include "service/tablet_allocator.hh"
#include "sstables/sstables.hh"
#include "locator/load_sketch.hh"
#include "types/list.hh"
#include "types/types.hh"
@@ -1461,6 +1464,595 @@ private:
}
};
// Helper: copy the payload of a disk_string (whose `value` member is of
// type `bytes`) into an owning sstring.
static sstring disk_string_to_sstring(const sstables::disk_string<uint32_t>& ds) {
    const auto view = to_string_view(ds.value);
    return sstring(view);
}
// Helper: build a db_clock::time_point from milliseconds since the Unix
// epoch, as expected by timestamp_type columns in virtual tables.
static db_clock::time_point millis_to_db_clock(int64_t ms) {
    const auto since_epoch = db_clock::duration(ms);
    return db_clock::time_point{since_epoch};
}
// Derive compaction_time (milliseconds since the Unix epoch) from the
// SSTable's generation identifier. A time-based UUID generation encodes
// its creation timestamp; legacy integer generations carry no timestamp,
// in which case 0 is returned as an "unknown" sentinel.
static int64_t compaction_time_from_generation(const sstables::sstable& sst) {
    const auto gen = sst.generation();
    if (!gen.is_uuid_based()) {
        return 0;
    }
    return utils::UUID_gen::unix_timestamp(gen.as_uuid()).count();
}
// Virtual table backed by LargeDataRecords in SSTable scylla metadata.
// Shows partition_size records from all live SSTables, matching the
// schema of the legacy system.large_partitions CQL table.
class large_partitions_virtual_table : public streaming_virtual_table {
sharded<replica::database>& _db;
// One candidate output row; records with identical clustering keys
// (sstable_name, partition_size, partition_key) are de-duplicated in execute().
struct record {
sstring sstable_name;
int64_t partition_size;
sstring partition_key;
int64_t compaction_time; // ms since Unix epoch; 0 = unknown (legacy integer generation)
int64_t rows;
int64_t range_tombstones;
int64_t dead_rows;
};
// Extract partition_size records from a single table's SSTables on the local shard.
// Called on each shard via map() to collect records cross-shard.
static future<std::vector<record>> collect_local_records(replica::table& table) {
std::vector<record> records;
auto table_schema = table.schema();
auto sstables = co_await table.take_sstable_set_snapshot();
for (const auto& sst : sstables) {
auto& records_opt = sst->get_large_data_records();
if (!records_opt) {
// Legacy SSTable without LargeDataRecords: emit synthetic row
// if above_threshold > 0 for partition_size or rows_in_partition stats.
auto stat = sst->get_large_data_stat(sstables::large_data_type::partition_size);
auto rows_stat = sst->get_large_data_stat(sstables::large_data_type::rows_in_partition);
if ((stat && stat->above_threshold > 0) || (rows_stat && rows_stat->above_threshold > 0)) {
// range_tombstones/dead_rows are left value-initialized to 0:
// legacy stats do not carry them.
records.push_back(record{
.sstable_name = sst->component_basename(sstables::component_type::Data),
.partition_size = static_cast<int64_t>(stat ? stat->max_value : 0),
.partition_key = "(details unavailable - legacy SSTable)",
.compaction_time = compaction_time_from_generation(*sst),
.rows = static_cast<int64_t>(rows_stat ? rows_stat->max_value : 0),
});
}
continue;
}
auto compaction_time = compaction_time_from_generation(*sst);
auto sst_name = sst->component_basename(sstables::component_type::Data);
for (const auto& rec : records_opt->elements) {
// Only partition-level record types are shown in this table.
if (rec.type != sstables::large_data_type::partition_size &&
rec.type != sstables::large_data_type::rows_in_partition) {
continue;
}
auto pk = sstables::key_view(rec.partition_key.value).to_partition_key(*table_schema);
records.push_back(record{
.sstable_name = sst_name,
.partition_size = static_cast<int64_t>(rec.value),
.partition_key = key_to_str(pk, *table_schema),
.compaction_time = compaction_time,
.rows = static_cast<int64_t>(rec.elements_count),
.range_tombstones = static_cast<int64_t>(rec.range_tombstones),
.dead_rows = static_cast<int64_t>(rec.dead_rows),
});
}
}
co_return records;
}
public:
// Shard-aware: execute() runs on every shard and each shard emits only
// the (keyspace, table) partitions it owns (see this_shard_owns()).
explicit large_partitions_virtual_table(sharded<replica::database>& db)
: streaming_virtual_table(build_schema())
, _db(db)
{
_shard_aware = true;
}
// Same column layout as the legacy physical system.large_partitions table.
// generate_legacy_id(..., 1): version 1 yields a UUID distinct from the
// one the old physical table used.
static schema_ptr build_schema() {
auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::LARGE_PARTITIONS, 1);
schema_builder builder(system_keyspace::NAME, system_keyspace::LARGE_PARTITIONS, std::make_optional(id));
builder.with_column("keyspace_name", utf8_type, column_kind::partition_key);
builder.with_column("table_name", utf8_type, column_kind::partition_key);
builder.with_column("sstable_name", utf8_type, column_kind::clustering_key);
builder.with_column("partition_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key);
builder.with_column("partition_key", utf8_type, column_kind::clustering_key);
builder.with_column("rows", long_type);
builder.with_column("compaction_time", timestamp_type);
builder.with_column("range_tombstones", long_type);
builder.with_column("dead_rows", long_type);
builder.set_comment("partitions larger than specified threshold");
builder.with_hash_version();
return builder.build();
}
// Build the decorated (keyspace_name, table_name) partition key.
dht::decorated_key make_partition_key(const sstring& ks_name, const sstring& cf_name) {
return dht::decorate_key(*_s, partition_key::from_exploded(*_s, {
data_value(ks_name).serialize_nonnull(),
data_value(cf_name).serialize_nonnull()
}));
}
// Build the (sstable_name ASC, partition_size DESC, partition_key ASC)
// clustering key; partition_size must use the schema's reversed long type.
clustering_key make_clustering_key(const sstring& sstable_name, int64_t partition_size, const sstring& pk) {
return clustering_key::from_exploded(*_s, {
data_value(sstable_name).serialize_nonnull(),
reversed_type_impl::get_instance(long_type)->decompose(partition_size),
data_value(pk).serialize_nonnull()
});
}
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
// Phase 1: Determine which (ks, table) partitions this shard owns.
// Table metadata is shared across all shards, so local iteration is sufficient.
struct table_info {
table_id tid;
dht::decorated_key dk;
};
std::vector<table_info> owned_tables;
auto& db = _db.local();
db.get_tables_metadata().for_each_table([&] (table_id tid, lw_shared_ptr<replica::table> table) {
auto& ks_name = table->schema()->ks_name();
auto& cf_name = table->schema()->cf_name();
auto dk = make_partition_key(ks_name, cf_name);
if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) {
owned_tables.push_back(table_info{tid, std::move(dk)});
}
});
// Sort by token order — streaming_virtual_table requires
// partitions to be emitted in token order.
std::ranges::sort(owned_tables, dht::ring_position_less_comparator(*_s),
std::mem_fn(&table_info::dk));
// Phase 2: For each owned table, collect records from ALL shards
// in parallel, then sort and emit immediately. We process one
// table at a time to bound memory usage: once we have emitted
// page_size records we stop and let CQL paging fetch the rest.
static constexpr size_t page_size = 1000;
size_t emitted = 0;
for (auto& ti : owned_tables) {
auto per_shard = co_await _db.map([tid = ti.tid] (replica::database& db) -> future<std::vector<record>> {
if (auto table = db.get_tables_metadata().get_table_if_exists(tid)) {
co_return co_await collect_local_records(*table);
}
co_return std::vector<record>{};
});
// Flatten per-shard results into a single vector for sorting.
std::vector<record> records;
for (auto& shard_records : per_shard) {
records.insert(records.end(),
std::make_move_iterator(shard_records.begin()),
std::make_move_iterator(shard_records.end()));
}
if (records.empty()) {
continue;
}
// Sort by (sstable_name ASC, partition_size DESC, partition_key ASC) to match clustering order
std::ranges::sort(records, [] (const record& a, const record& b) {
if (a.sstable_name != b.sstable_name) {
return a.sstable_name < b.sstable_name;
} else if (a.partition_size != b.partition_size) {
return a.partition_size > b.partition_size; // DESC
}
return a.partition_key < b.partition_key;
});
// A partition that exceeds both the size and the row-count
// thresholds produces two LargeDataRecords entries (one per
// type) that map to identical clustering keys. Remove the
// duplicates so that every emitted row has a unique key.
records.erase(std::unique(records.begin(), records.end(), [] (const record& a, const record& b) {
return a.sstable_name == b.sstable_name
&& a.partition_size == b.partition_size
&& a.partition_key == b.partition_key;
}), records.end());
co_await result.emit_partition_start(ti.dk);
for (const auto& rec : records) {
clustering_row cr(make_clustering_key(rec.sstable_name, rec.partition_size, rec.partition_key));
// Zero-valued fields are left as null cells rather than written as 0.
if (rec.compaction_time != 0) {
set_cell(cr.cells(), "compaction_time", millis_to_db_clock(rec.compaction_time));
}
if (rec.rows != 0) {
set_cell(cr.cells(), "rows", rec.rows);
}
if (rec.range_tombstones != 0) {
set_cell(cr.cells(), "range_tombstones", rec.range_tombstones);
}
if (rec.dead_rows != 0) {
set_cell(cr.cells(), "dead_rows", rec.dead_rows);
}
co_await result.emit_row(std::move(cr));
}
co_await result.emit_partition_end();
emitted += records.size();
if (emitted >= page_size) {
break;
}
}
}
};
// Virtual table backed by LargeDataRecords in SSTable scylla metadata.
// Shows row_size records from all live SSTables, matching the
// schema of the legacy system.large_rows CQL table.
class large_rows_virtual_table : public streaming_virtual_table {
sharded<replica::database>& _db;
// One output row of the virtual table; the clustering key is
// (sstable_name, row_size DESC, partition_key, clustering_key).
struct record {
sstring sstable_name;
int64_t row_size;
sstring partition_key;
sstring clustering_key;
int64_t compaction_time; // ms since Unix epoch; 0 = unknown (legacy integer generation)
};
// Extract row_size records from a single table's SSTables on the local shard.
static future<std::vector<record>> collect_local_records(replica::table& table) {
std::vector<record> records;
auto table_schema = table.schema();
auto sstables = co_await table.take_sstable_set_snapshot();
for (const auto& sst : sstables) {
auto& records_opt = sst->get_large_data_records();
if (!records_opt) {
// Legacy SSTable without LargeDataRecords: emit a synthetic
// summary row if the row_size stat crossed the threshold.
auto stat = sst->get_large_data_stat(sstables::large_data_type::row_size);
if (stat && stat->above_threshold > 0) {
records.push_back(record{
.sstable_name = sst->component_basename(sstables::component_type::Data),
.row_size = static_cast<int64_t>(stat->max_value),
.partition_key = "(details unavailable - legacy SSTable)",
.clustering_key = "",
.compaction_time = compaction_time_from_generation(*sst),
});
}
continue;
}
auto compaction_time = compaction_time_from_generation(*sst);
auto sst_name = sst->component_basename(sstables::component_type::Data);
for (const auto& rec : records_opt->elements) {
if (rec.type != sstables::large_data_type::row_size) {
continue;
}
auto pk = sstables::key_view(rec.partition_key.value).to_partition_key(*table_schema);
// An empty on-disk clustering key maps to an empty string cell.
auto ck_str = rec.clustering_key.value.empty()
? sstring()
: sstring(key_to_str(clustering_key_prefix::from_bytes(rec.clustering_key.value), *table_schema));
records.push_back(record{
.sstable_name = sst_name,
.row_size = static_cast<int64_t>(rec.value),
.partition_key = key_to_str(pk, *table_schema),
.clustering_key = std::move(ck_str),
.compaction_time = compaction_time,
});
}
}
co_return records;
}
public:
// Shard-aware: each shard emits only the (keyspace, table) partitions
// it owns (see this_shard_owns() in execute()).
explicit large_rows_virtual_table(sharded<replica::database>& db)
: streaming_virtual_table(build_schema())
, _db(db)
{
_shard_aware = true;
}
// Same column layout as the legacy physical system.large_rows table.
// generate_legacy_id(..., 1): version 1 yields a UUID distinct from the
// one the old physical table used.
static schema_ptr build_schema() {
auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::LARGE_ROWS, 1);
return schema_builder(system_keyspace::NAME, system_keyspace::LARGE_ROWS, std::make_optional(id))
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
.with_column("table_name", utf8_type, column_kind::partition_key)
.with_column("sstable_name", utf8_type, column_kind::clustering_key)
.with_column("row_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key)
.with_column("partition_key", utf8_type, column_kind::clustering_key)
.with_column("clustering_key", utf8_type, column_kind::clustering_key)
.with_column("compaction_time", timestamp_type)
.set_comment("rows larger than specified threshold")
.with_hash_version()
.build();
}
// Build the decorated (keyspace_name, table_name) partition key.
dht::decorated_key make_partition_key(const sstring& ks_name, const sstring& cf_name) {
return dht::decorate_key(*_s, partition_key::from_exploded(*_s, {
data_value(ks_name).serialize_nonnull(),
data_value(cf_name).serialize_nonnull()
}));
}
// Build the clustering key; row_size must use the schema's reversed long type.
clustering_key make_clustering_key(const sstring& sstable_name, int64_t row_size,
const sstring& pk, const sstring& ck) {
return clustering_key::from_exploded(*_s, {
data_value(sstable_name).serialize_nonnull(),
reversed_type_impl::get_instance(long_type)->decompose(row_size),
data_value(pk).serialize_nonnull(),
data_value(ck).serialize_nonnull()
});
}
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
// Phase 1: collect the (ks, table) partitions this shard owns, in token order.
struct table_info {
table_id tid;
dht::decorated_key dk;
};
std::vector<table_info> owned_tables;
auto& db = _db.local();
db.get_tables_metadata().for_each_table([&] (table_id tid, lw_shared_ptr<replica::table> table) {
auto& ks_name = table->schema()->ks_name();
auto& cf_name = table->schema()->cf_name();
auto dk = make_partition_key(ks_name, cf_name);
if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) {
owned_tables.push_back(table_info{tid, std::move(dk)});
}
});
std::ranges::sort(owned_tables, dht::ring_position_less_comparator(*_s),
std::mem_fn(&table_info::dk));
// Phase 2: per table, gather records from all shards, sort to match
// clustering order, emit; stop after page_size rows and let CQL
// paging fetch the rest.
static constexpr size_t page_size = 1000;
size_t emitted = 0;
for (auto& ti : owned_tables) {
auto per_shard = co_await _db.map([tid = ti.tid] (replica::database& db) -> future<std::vector<record>> {
if (auto table = db.get_tables_metadata().get_table_if_exists(tid)) {
co_return co_await collect_local_records(*table);
}
co_return std::vector<record>{};
});
std::vector<record> records;
for (auto& shard_records : per_shard) {
records.insert(records.end(),
std::make_move_iterator(shard_records.begin()),
std::make_move_iterator(shard_records.end()));
}
if (records.empty()) {
continue;
}
// Sort by (sstable_name ASC, row_size DESC, partition_key ASC,
// clustering_key ASC) to match the clustering order.
// NOTE(review): unlike large_partitions, there is no duplicate
// removal here — only the row_size record type is collected, so
// clustering keys are presumably unique per record; confirm.
std::ranges::sort(records, [] (const record& a, const record& b) {
if (a.sstable_name != b.sstable_name) {
return a.sstable_name < b.sstable_name;
} else if (a.row_size != b.row_size) {
return a.row_size > b.row_size; // DESC
} else if (a.partition_key != b.partition_key) {
return a.partition_key < b.partition_key;
}
return a.clustering_key < b.clustering_key;
});
co_await result.emit_partition_start(ti.dk);
for (const auto& rec : records) {
clustering_row cr(make_clustering_key(rec.sstable_name, rec.row_size, rec.partition_key, rec.clustering_key));
// compaction_time == 0 means "unknown" — leave the cell null.
if (rec.compaction_time != 0) {
set_cell(cr.cells(), "compaction_time", millis_to_db_clock(rec.compaction_time));
}
co_await result.emit_row(std::move(cr));
}
co_await result.emit_partition_end();
emitted += records.size();
if (emitted >= page_size) {
break;
}
}
}
};
// Virtual table backed by LargeDataRecords in SSTable scylla metadata.
// Shows cell_size records from all live SSTables, matching the
// schema of the legacy system.large_cells CQL table.
class large_cells_virtual_table : public streaming_virtual_table {
sharded<replica::database>& _db;
// One candidate output row; records with identical clustering keys
// are de-duplicated in execute().
struct record {
sstring sstable_name;
int64_t cell_size;
sstring partition_key;
sstring clustering_key;
sstring column_name;
int64_t collection_elements; // -1 means not applicable
int64_t compaction_time; // ms since Unix epoch; 0 = unknown (legacy integer generation)
};
// Extract cell_size records from a single table's SSTables on the local shard.
static future<std::vector<record>> collect_local_records(replica::table& table) {
std::vector<record> records;
auto table_schema = table.schema();
auto sstables = co_await table.take_sstable_set_snapshot();
for (const auto& sst : sstables) {
auto& records_opt = sst->get_large_data_records();
if (!records_opt) {
// Legacy SSTable without LargeDataRecords: emit a synthetic
// summary row if either cell-related stat crossed the threshold.
auto stat = sst->get_large_data_stat(sstables::large_data_type::cell_size);
auto elem_stat = sst->get_large_data_stat(sstables::large_data_type::elements_in_collection);
if ((stat && stat->above_threshold > 0) || (elem_stat && elem_stat->above_threshold > 0)) {
records.push_back(record{
.sstable_name = sst->component_basename(sstables::component_type::Data),
.cell_size = static_cast<int64_t>(stat ? stat->max_value : 0),
.partition_key = "(details unavailable - legacy SSTable)",
.clustering_key = "",
.column_name = "",
.collection_elements = elem_stat ? static_cast<int64_t>(elem_stat->max_value) : int64_t(-1),
.compaction_time = compaction_time_from_generation(*sst),
});
}
continue;
}
auto compaction_time = compaction_time_from_generation(*sst);
auto sst_name = sst->component_basename(sstables::component_type::Data);
for (const auto& rec : records_opt->elements) {
// Only cell-level record types are shown in this table.
if (rec.type != sstables::large_data_type::cell_size &&
rec.type != sstables::large_data_type::elements_in_collection) {
continue;
}
auto pk = sstables::key_view(rec.partition_key.value).to_partition_key(*table_schema);
// An empty on-disk clustering key maps to an empty string cell.
auto ck_str = rec.clustering_key.value.empty()
? sstring()
: sstring(key_to_str(clustering_key_prefix::from_bytes(rec.clustering_key.value), *table_schema));
records.push_back(record{
.sstable_name = sst_name,
.cell_size = static_cast<int64_t>(rec.value),
.partition_key = key_to_str(pk, *table_schema),
.clustering_key = std::move(ck_str),
.column_name = disk_string_to_sstring(rec.column_name),
.collection_elements = rec.elements_count > 0
? static_cast<int64_t>(rec.elements_count) : int64_t(-1),
.compaction_time = compaction_time,
});
}
}
co_return records;
}
public:
// Shard-aware: each shard emits only the (keyspace, table) partitions
// it owns (see this_shard_owns() in execute()).
explicit large_cells_virtual_table(sharded<replica::database>& db)
: streaming_virtual_table(build_schema())
, _db(db)
{
_shard_aware = true;
}
// Same column layout as the legacy physical system.large_cells table.
// generate_legacy_id(..., 1): version 1 yields a UUID distinct from the
// one the old physical table used.
static schema_ptr build_schema() {
auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::LARGE_CELLS, 1);
return schema_builder(system_keyspace::NAME, system_keyspace::LARGE_CELLS, std::make_optional(id))
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
.with_column("table_name", utf8_type, column_kind::partition_key)
.with_column("sstable_name", utf8_type, column_kind::clustering_key)
.with_column("cell_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key)
.with_column("partition_key", utf8_type, column_kind::clustering_key)
.with_column("clustering_key", utf8_type, column_kind::clustering_key)
.with_column("column_name", utf8_type, column_kind::clustering_key)
.with_column("collection_elements", long_type)
.with_column("compaction_time", timestamp_type)
.set_comment("cells larger than specified threshold")
.with_hash_version()
.build();
}
// Build the decorated (keyspace_name, table_name) partition key.
dht::decorated_key make_partition_key(const sstring& ks_name, const sstring& cf_name) {
return dht::decorate_key(*_s, partition_key::from_exploded(*_s, {
data_value(ks_name).serialize_nonnull(),
data_value(cf_name).serialize_nonnull()
}));
}
// Build the clustering key; cell_size must use the schema's reversed long type.
clustering_key make_clustering_key(const sstring& sstable_name, int64_t cell_size,
const sstring& pk, const sstring& ck, const sstring& col_name) {
return clustering_key::from_exploded(*_s, {
data_value(sstable_name).serialize_nonnull(),
reversed_type_impl::get_instance(long_type)->decompose(cell_size),
data_value(pk).serialize_nonnull(),
data_value(ck).serialize_nonnull(),
data_value(col_name).serialize_nonnull()
});
}
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
// Phase 1: collect the (ks, table) partitions this shard owns, in token order.
struct table_info {
table_id tid;
dht::decorated_key dk;
};
std::vector<table_info> owned_tables;
auto& db = _db.local();
db.get_tables_metadata().for_each_table([&] (table_id tid, lw_shared_ptr<replica::table> table) {
auto& ks_name = table->schema()->ks_name();
auto& cf_name = table->schema()->cf_name();
auto dk = make_partition_key(ks_name, cf_name);
if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) {
owned_tables.push_back(table_info{tid, std::move(dk)});
}
});
std::ranges::sort(owned_tables, dht::ring_position_less_comparator(*_s),
std::mem_fn(&table_info::dk));
// Phase 2: per table, gather records from all shards, sort to match
// clustering order, de-duplicate, emit; stop after page_size rows
// and let CQL paging fetch the rest.
static constexpr size_t page_size = 1000;
size_t emitted = 0;
for (auto& ti : owned_tables) {
auto per_shard = co_await _db.map([tid = ti.tid] (replica::database& db) -> future<std::vector<record>> {
if (auto table = db.get_tables_metadata().get_table_if_exists(tid)) {
co_return co_await collect_local_records(*table);
}
co_return std::vector<record>{};
});
std::vector<record> records;
for (auto& shard_records : per_shard) {
records.insert(records.end(),
std::make_move_iterator(shard_records.begin()),
std::make_move_iterator(shard_records.end()));
}
if (records.empty()) {
continue;
}
// Sort to match the clustering order of the schema.
std::ranges::sort(records, [] (const record& a, const record& b) {
if (a.sstable_name != b.sstable_name) {
return a.sstable_name < b.sstable_name;
} else if (a.cell_size != b.cell_size) {
return a.cell_size > b.cell_size; // DESC
} else if (a.partition_key != b.partition_key) {
return a.partition_key < b.partition_key;
} else if (a.clustering_key != b.clustering_key) {
return a.clustering_key < b.clustering_key;
}
return a.column_name < b.column_name;
});
// A collection cell that exceeds both the size and the
// element-count thresholds produces two LargeDataRecords
// entries that map to identical clustering keys. Remove the
// duplicates so that every emitted row has a unique key.
records.erase(std::unique(records.begin(), records.end(), [] (const record& a, const record& b) {
return a.sstable_name == b.sstable_name
&& a.cell_size == b.cell_size
&& a.partition_key == b.partition_key
&& a.clustering_key == b.clustering_key
&& a.column_name == b.column_name;
}), records.end());
co_await result.emit_partition_start(ti.dk);
for (const auto& rec : records) {
clustering_row cr(make_clustering_key(rec.sstable_name, rec.cell_size,
rec.partition_key, rec.clustering_key, rec.column_name));
// Sentinel values (0 / -1) are left as null cells.
if (rec.compaction_time != 0) {
set_cell(cr.cells(), "compaction_time", millis_to_db_clock(rec.compaction_time));
}
if (rec.collection_elements >= 0) {
set_cell(cr.cells(), "collection_elements", rec.collection_elements);
}
co_await result.emit_row(std::move(cr));
}
co_await result.emit_partition_end();
emitted += records.size();
if (emitted >= page_size) {
break;
}
}
}
};
}
// Helper: register a virtual table on the local shard.
// Stores the table in the shard-local virtual tables registry, creates the
// backing local system column family, and wires its reader/writer to the
// virtual table implementation.
static future<> add_virtual_table(
sharded<db::system_keyspace>& sys_ks,
sharded<replica::database>& dist_db,
sharded<service::storage_service>& dist_ss,
std::unique_ptr<virtual_table>&& tbl) {
auto& virtual_tables = *sys_ks.local().get_virtual_tables_registry();
auto& db = dist_db.local();
auto& ss = dist_ss.local();
// Capture the schema before `tbl` is moved into the registry below.
auto schema = tbl->schema();
virtual_tables[schema->id()] = std::move(tbl);
co_await db.create_local_system_table(schema, false, ss.get_erm_factory());
auto& cf = db.find_column_family(schema);
cf.mark_ready_for_writes(nullptr);
// Reader and writer hold references into the registry entry, so the
// registry must outlive the column family's use of them.
auto& vt = virtual_tables[schema->id()];
cf.set_virtual_reader(vt->as_mutation_source());
cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); });
}
future<> initialize_virtual_tables(
@@ -1469,43 +2061,86 @@ future<> initialize_virtual_tables(
sharded<db::system_keyspace>& sys_ks,
sharded<service::tablet_allocator>& tablet_allocator,
sharded<netw::messaging_service>& ms,
db::config& cfg) {
db::config& cfg,
gms::feature_service& feat) {
co_await smp::invoke_on_all([&] () -> future<> {
auto& virtual_tables_registry = sys_ks.local().get_virtual_tables_registry();
auto& virtual_tables = *virtual_tables_registry;
auto& db = dist_db.local();
auto& ss = dist_ss.local();
auto add_table = [&] (std::unique_ptr<virtual_table>&& tbl) -> future<> {
auto schema = tbl->schema();
virtual_tables[schema->id()] = std::move(tbl);
co_await db.create_local_system_table(schema, false, ss.get_erm_factory());
auto& cf = db.find_column_family(schema);
cf.mark_ready_for_writes(nullptr);
auto& vt = virtual_tables[schema->id()];
cf.set_virtual_reader(vt->as_mutation_source());
cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); });
co_await add_virtual_table(sys_ks, dist_db, dist_ss, std::move(tbl));
};
auto& db = dist_db.local();
// Add built-in virtual tables here.
co_await add_table(std::make_unique<cluster_status_table>(dist_ss, dist_gossiper));
co_await add_table(std::make_unique<token_ring_table>(db, ss));
co_await add_table(std::make_unique<token_ring_table>(db, dist_ss.local()));
co_await add_table(std::make_unique<snapshots_table>(dist_db));
co_await add_table(std::make_unique<protocol_servers_table>(ss));
co_await add_table(std::make_unique<runtime_info_table>(dist_db, ss));
co_await add_table(std::make_unique<protocol_servers_table>(dist_ss.local()));
co_await add_table(std::make_unique<runtime_info_table>(dist_db, dist_ss.local()));
co_await add_table(std::make_unique<versions_table>());
co_await add_table(std::make_unique<db_config_table>(cfg));
co_await add_table(std::make_unique<clients_table>(ss));
co_await add_table(std::make_unique<clients_table>(dist_ss.local()));
co_await add_table(std::make_unique<raft_state_table>(dist_raft_gr));
co_await add_table(std::make_unique<load_per_node>(tablet_allocator, dist_db, dist_raft_gr, ms, dist_gossiper));
co_await add_table(std::make_unique<tablet_sizes>(tablet_allocator, dist_db, dist_raft_gr, ms));
co_await add_table(std::make_unique<cdc_timestamps_table>(db, ss));
co_await add_table(std::make_unique<cdc_streams_table>(db, ss));
co_await add_table(std::make_unique<cdc_timestamps_table>(db, dist_ss.local()));
co_await add_table(std::make_unique<cdc_streams_table>(db, dist_ss.local()));
db.find_column_family(system_keyspace::size_estimates()).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader(db, sys_ks.local())));
db.find_column_family(system_keyspace::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db)));
db.find_column_family(system_keyspace::built_indexes()).set_virtual_reader(mutation_source(db::index::built_indexes_virtual_reader(db)));
});
// Drop old physical system.large_* tables and register virtual
// replacements. Must run on shard 0 (uses cross-shard operations).
//
// The lambda captures the sharded services by reference; it is only
// ever invoked from shard 0 (either directly below, or from the
// feature listener, which is registered on shard 0 only).
auto activate_large_data_virtual_tables = [&dist_db, &sys_ks, &dist_ss] () -> future<> {
    // Drop old physical system.large_* tables. Their data is now
    // served from SSTable metadata via virtual tables.
    // TODO: In a follow-up, read existing large data entries from
    // the old tables and populate per-sstable LargeDataRecords
    // metadata via component_rewrite before dropping.
    for (auto table_name : {
            db::system_keyspace::LARGE_PARTITIONS,
            db::system_keyspace::LARGE_ROWS,
            db::system_keyspace::LARGE_CELLS}) {
        try {
            co_await replica::database::legacy_drop_table_on_all_shards(
                dist_db, sys_ks, db::system_keyspace::NAME, table_name, false);
        } catch (const replica::no_such_column_family&) {
            // Already dropped (e.g. restart after feature was enabled).
            // Dropping is idempotent across restarts, so this is benign.
        }
    }
    // Add virtual tables on all shards. Each shard registers its own
    // instance of the three virtual table implementations.
    co_await smp::invoke_on_all([&dist_db, &sys_ks, &dist_ss] () -> future<> {
        co_await add_virtual_table(sys_ks, dist_db, dist_ss,
            std::make_unique<large_partitions_virtual_table>(dist_db));
        co_await add_virtual_table(sys_ks, dist_db, dist_ss,
            std::make_unique<large_rows_virtual_table>(dist_db));
        co_await add_virtual_table(sys_ks, dist_db, dist_ss,
            std::make_unique<large_cells_virtual_table>(dist_db));
    });
};
if (feat.large_data_virtual_tables) {
    // Feature already enabled (e.g. test environment or restart after
    // upgrade). Activate directly as a coroutine — no seastar::async
    // context needed.
    co_await activate_large_data_virtual_tables();
} else {
    // Feature not yet enabled. Register a callback that will fire
    // when the feature is enabled during rolling upgrade. The callback
    // runs inside seastar::async context (via feature_service::enable),
    // so .get() is safe.
    //
    // The listener_registration must outlive the feature, so we store
    // it in a static variable (process lifetime). This function is
    // only called on shard 0, so the listener fires only on shard 0.
    //
    // NOTE(review): the lambda is moved into the listener, so the
    // captured references must remain valid for the process lifetime —
    // presumably true for these sharded services; confirm at call site.
    static gms::feature::listener_registration large_data_vt_listener;
    large_data_vt_listener = feat.large_data_virtual_tables.when_enabled(
        [activate_large_data_virtual_tables = std::move(activate_large_data_virtual_tables)] {
            activate_large_data_virtual_tables().get();
        });
}
}
virtual_tables_registry::virtual_tables_registry() : unique_ptr(std::make_unique<virtual_tables_registry_impl>()) {

View File

@@ -24,6 +24,7 @@ class tablet_allocator;
}
namespace gms {
class feature_service;
class gossiper;
}
@@ -44,7 +45,8 @@ future<> initialize_virtual_tables(
sharded<db::system_keyspace>&,
sharded<service::tablet_allocator>&,
sharded<netw::messaging_service>&,
db::config&);
db::config&,
gms::feature_service&);
class virtual_table;

View File

@@ -128,6 +128,79 @@ Expiring Data
In order to prevent stale data from appearing, all rows in the ``system.large_partitions`` table are inserted with Time To Live (TTL) equal to 30 days.
.. _large-data-virtual-tables:
Virtual Tables (``LARGE_DATA_VIRTUAL_TABLES`` Feature)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Starting with ScyllaDB 2026.2, the ``system.large_partitions``,
``system.large_rows``, and ``system.large_cells`` tables are implemented as
**virtual tables** that read directly from metadata stored inside each SSTable
rather than from separate physical system tables.
How it works
""""""""""""
When an SSTable is written (during flush or compaction), the SSTable writer
records the top-N above-threshold large data entries into a
``LargeDataRecords`` component in the SSTable's ``.Scylla`` metadata file.
The virtual tables query this metadata from all live SSTables at read time,
so large data records automatically follow their SSTables when they are
migrated between shards or nodes (e.g. during tablet migration or repair).
The maximum number of records stored per large data type per SSTable is
controlled by the ``compaction_large_data_records_per_sstable`` configuration
option (default: 10).
Activation
""""""""""
The transition is gated by the ``LARGE_DATA_VIRTUAL_TABLES`` cluster feature
flag, which is automatically enabled once all nodes in the cluster are
upgraded to a version that supports it.
When the feature is enabled on a node:
1. The old physical ``system.large_partitions``, ``system.large_rows``, and
``system.large_cells`` tables are dropped.
2. Virtual table replacements are registered in their place.
3. New SSTable writes stop populating the old physical tables.
During a rolling upgrade, as long as any node in the cluster has not been
upgraded, the feature remains disabled and all nodes continue writing to
the old physical tables. This ensures safe rollback if the upgrade needs
to be reverted.
Upgrade considerations
""""""""""""""""""""""
After the ``LARGE_DATA_VIRTUAL_TABLES`` feature is enabled, the virtual tables
derive their content from ``LargeDataRecords`` metadata stored in SSTables.
SSTables written by older versions do not contain this metadata. As a result,
**the virtual tables may appear empty until those older SSTables are rewritten**
by compaction.
To populate the virtual tables promptly after upgrade, run one of the following
on each node:
* **Upgrade SSTables compaction** (recommended -- rewrites SSTables without
waiting for normal compaction triggers):
.. code-block:: console
nodetool upgradesstables --include-all-sstables
* **Major compaction** (rewrites all SSTables into a single set, but may
temporarily increase disk usage):
.. code-block:: console
nodetool compact
Once the SSTables have been rewritten, the virtual tables will show the
current large data records.
.. include:: /troubleshooting/_common/ts-return.rst
Additional Resources

View File

@@ -170,4 +170,12 @@ Expiring Data
^^^^^^^^^^^^^
In order to prevent stale data from appearing, all rows in the ``system.large_rows`` and ``system.large_cells`` tables are inserted with Time To Live (TTL) equal to 30 days.
Virtual Tables
^^^^^^^^^^^^^^
Starting with ScyllaDB 2026.2, ``system.large_rows`` and ``system.large_cells``
are implemented as virtual tables that read from SSTable metadata. See
:ref:`large-data-virtual-tables` for details on the
``LARGE_DATA_VIRTUAL_TABLES`` feature and upgrade considerations.
.. include:: /troubleshooting/_common/ts-return.rst

View File

@@ -181,6 +181,7 @@ public:
gms::feature vnodes_to_tablets_migrations { *this, "VNODES_TO_TABLETS_MIGRATIONS"sv };
gms::feature writetime_ttl_individual_element { *this, "WRITETIME_TTL_INDIVIDUAL_ELEMENT"sv };
gms::feature arbitrary_tablet_boundaries { *this, "ARBITRARY_TABLET_BOUNDARIES"sv };
gms::feature large_data_virtual_tables { *this, "LARGE_DATA_VIRTUAL_TABLES"sv };
public:
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;

View File

@@ -1963,7 +1963,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
});
checkpoint(stop_signal, "initializing virtual tables");
db::initialize_virtual_tables(db, ss, gossiper, raft_gr, sys_ks, tablet_allocator, messaging, *cfg).get();
db::initialize_virtual_tables(db, ss, gossiper, raft_gr, sys_ks, tablet_allocator, messaging, *cfg, feature_service.local()).get();
// #293 - do not stop anything
// engine().at_exit([&qp] { return qp.stop(); });

View File

@@ -1306,9 +1306,20 @@ bool operator==(const column_definition& x, const column_definition& y)
}
// Based on org.apache.cassandra.config.CFMetaData#generateLegacyCfId
//
// The version suffix (e.g. "_v1") allows creating a table with a
// different UUID than the original (version 0). This is used when
// replacing a physical table with a virtual table of the same name:
// the virtual table uses version 1 so its UUID differs from the old
// physical table, allowing the old table to be properly dropped by
// UUID before the replacement is registered.
table_id
generate_legacy_id(const sstring& ks_name, const sstring& cf_name) {
return table_id(utils::UUID_gen::get_name_UUID(ks_name + cf_name));
generate_legacy_id(const sstring& ks_name, const sstring& cf_name, unsigned version) {
auto name = ks_name + cf_name;
if (version > 0) {
name += fmt::format("_v{}", version);
}
return table_id(utils::UUID_gen::get_name_UUID(name));
}
schema_builder& schema_builder::set_compaction_strategy_options(std::map<sstring, sstring>&& options) {

View File

@@ -1075,7 +1075,7 @@ public:
std::ostream& operator<<(std::ostream& os, const view_ptr& view);
table_id generate_legacy_id(const sstring& ks_name, const sstring& cf_name);
table_id generate_legacy_id(const sstring& ks_name, const sstring& cf_name, unsigned version = 0);
// Thrown when attempted to access a schema-dependent object using

View File

@@ -90,6 +90,9 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) {
// Check that only the large row is added to system.large_rows.
BOOST_REQUIRE_EQUAL(rows.size(), 1);
auto row0 = rows[0];
// The result has 3 columns: partition_key, row_size, and table_name
// (CQL adds the filtered partition key column to the result set when
// using ALLOW FILTERING with a partial partition key restriction).
BOOST_REQUIRE_EQUAL(row0.size(), 3);
BOOST_REQUIRE_EQUAL(to_bytes(*row0[0]), "44");
BOOST_REQUIRE_EQUAL(to_bytes(*row0[2]), "tbl");
@@ -118,10 +121,11 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) {
e.execute_cql("delete from tbl where a = 44;").get();
// In order to guarantee that system.large_rows, system.large_cells and system.large_partitions have been updated, we have to
// In order to guarantee that system.large_rows, system.large_cells and
// system.large_partitions are empty, we need to:
// * flush, so that a tombstone for the above delete is created.
// * do a major compaction, so that the tombstone is combined with the old entry,
// and the old sstable is deleted.
// and the old sstable (which holds the large data records in its metadata) is deleted.
flush(e);
e.db().invoke_on_all([] (replica::database& dbi) {
return dbi.get_tables_metadata().parallel_for_each_table([&dbi] (table_id, lw_shared_ptr<replica::table> t) {
@@ -165,6 +169,71 @@ SEASTAR_THREAD_TEST_CASE(test_large_row_count_warning) {
}, cfg).get();
}
// Test that when a partition exceeds both the size threshold and the row
// count threshold, system.large_partitions still returns a single row
// (not two rows with the same clustering key).
// Verify that a partition crossing both the size threshold and the
// row-count threshold yields exactly one row in system.large_partitions
// rather than two rows sharing the same clustering key.
SEASTAR_THREAD_TEST_CASE(test_large_partitions_dual_threshold) {
    auto cfg = make_shared<db::config>();
    // Thresholds are deliberately tiny so a single partition holding a
    // handful of modest rows trips the size limit and the row-count
    // limit at the same time.
    cfg->compaction_large_partition_warning_threshold_mb(1);
    cfg->compaction_rows_count_warning_threshold(10);
    do_with_cql_env_thread([](cql_test_env& e) {
        e.execute_cql("create table tbl (a int, b int, c text, primary key (a, b))").get();
        // Eleven rows of 128 KB each: the partition exceeds 1 MB and
        // also holds more than 10 rows.
        const sstring payload(128 * 1024, 'x');
        for (int row = 0; row < 11; ++row) {
            e.execute_cql(format("insert into tbl (a, b, c) values (42, {}, '{}');", row, payload)).get();
        }
        flush(e);
        // Expect exactly one entry for this table. Before the fix, two
        // entries with an identical clustering key were produced.
        assert_that(e.execute_cql(
                "select partition_key, rows from system.large_partitions "
                "where table_name = 'tbl' allow filtering;").get())
            .is_rows()
            .with_size(1);
        return make_ready_future<>();
    }, cfg).get();
}
// Test that when a collection cell exceeds both the cell size threshold
// and the collection element count threshold, system.large_cells still
// returns a single row (not two rows with the same clustering key).
// Verify that a collection cell crossing both the cell-size threshold
// and the element-count threshold yields exactly one row in
// system.large_cells rather than two rows sharing the same clustering key.
SEASTAR_THREAD_TEST_CASE(test_large_cells_dual_threshold) {
    auto cfg = make_shared<db::config>();
    // Thresholds are deliberately tiny so a collection of modest
    // elements trips the size limit and the element-count limit at
    // the same time.
    cfg->compaction_large_cell_warning_threshold_mb(1);
    cfg->compaction_collection_elements_count_warning_threshold(10);
    do_with_cql_env_thread([](cql_test_env& e) {
        e.execute_cql("create table tbl (a int, b list<text>, primary key (a))").get();
        // Prepend eleven 128 KB blobs: ~1.4 MB total (over the 1 MB
        // size limit) and 11 elements (over the 10-element limit).
        const sstring payload(128 * 1024, 'x');
        for (int elem = 0; elem < 11; ++elem) {
            e.execute_cql("update tbl set b = ['" + payload + "'] + b where a = 42;").get();
        }
        flush(e);
        // Expect exactly one entry for this table. Before the fix, two
        // entries with an identical clustering key were produced.
        assert_that(e.execute_cql(
                "select partition_key, column_name from system.large_cells "
                "where table_name = 'tbl' allow filtering;").get())
            .is_rows()
            .with_size(1);
        return make_ready_future<>();
    }, cfg).get();
}
SEASTAR_TEST_CASE(test_insert_large_collection_values) {
return do_with_cql_env([] (cql_test_env& e) {
return seastar::async([&e] {

View File

@@ -5092,70 +5092,34 @@ SEASTAR_TEST_CASE(test_sstable_reader_on_unknown_column) {
}
namespace {
struct large_row_handler : public db::large_data_handler {
using callback_t = std::function<void(const schema& s, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, uint64_t row_size, uint64_t range_tombstones, uint64_t dead_rows,
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements)>;
callback_t callback;
large_row_handler(uint64_t large_rows_threshold, uint64_t rows_count_threshold, uint64_t cell_threshold_bytes, uint64_t collection_elements_threshold, callback_t callback)
: large_data_handler(std::numeric_limits<uint64_t>::max(), large_rows_threshold, cell_threshold_bytes,
rows_count_threshold, collection_elements_threshold)
, callback(std::move(callback)) {
// A large_data_handler with configurable thresholds for testing.
// After refactoring, the handler only does threshold comparison + logging;
// actual large data recording goes into the SSTable's LargeDataRecords metadata.
struct large_data_test_handler : public db::large_data_handler {
large_data_test_handler(uint64_t partition_threshold_bytes, uint64_t large_rows_threshold,
uint64_t cell_threshold_bytes, uint64_t rows_count_threshold,
uint64_t collection_elements_threshold)
: large_data_handler(partition_threshold_bytes, large_rows_threshold, cell_threshold_bytes,
rows_count_threshold, collection_elements_threshold) {
start();
}
virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, uint64_t row_size) const override {
const schema_ptr s = sst.get_schema();
callback(*s, partition_key, clustering_key, row_size, 0, 0, nullptr, 0, 0);
return make_ready_future<>();
}
virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const override {
const schema_ptr s = sst.get_schema();
callback(*s, partition_key, clustering_key, 0, 0, 0, &cdef, cell_size, collection_elements);
return make_ready_future<>();
}
virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key,
uint64_t partition_size, uint64_t rows_count, uint64_t range_tombstones_count, uint64_t dead_rows_count) const override {
const schema_ptr s = sst.get_schema();
callback(*s, partition_key, nullptr, rows_count, range_tombstones_count, dead_rows_count, nullptr, 0, 0);
return make_ready_future<>();
}
virtual future<> delete_large_data_entries(const schema& s, sstring sstable_name, std::string_view) const override {
return make_ready_future<>();
}
virtual future<> update_large_data_entries_sstable_name(const schema& s, sstring old_name, sstring new_name, std::string_view large_table_name) const override {
return make_ready_future<>();
}
protected:
future<> record_large_cells(const sstables::sstable&, const sstables::key&,
const clustering_key_prefix*, const column_definition&, uint64_t, uint64_t) const override { return make_ready_future<>(); }
future<> record_large_rows(const sstables::sstable&, const sstables::key&,
const clustering_key_prefix*, uint64_t) const override { return make_ready_future<>(); }
future<> delete_large_data_entries(const schema&, sstring, std::string_view) const override { return make_ready_future<>(); }
future<> update_large_data_entries_sstable_name(const schema&, sstring, sstring, std::string_view) const override { return make_ready_future<>(); }
future<> record_large_partitions(const sstables::sstable&, const sstables::key&,
uint64_t, uint64_t, uint64_t, uint64_t) const override { return make_ready_future<>(); }
};
}
static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, replica::memtable& mt, const partition_key& pk,
std::vector<clustering_key*> expected, uint64_t threshold, sstables::sstable_version_types version) {
unsigned i = 0;
auto f = [&i, &expected, &pk, &threshold](const schema& s, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, uint64_t row_size, uint64_t range_tombstones, uint64_t dead_rows,
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
BOOST_REQUIRE(std::ranges::equal(pk.components(s), partition_key.to_partition_key(s).components(s)));
BOOST_REQUIRE_LT(i, expected.size());
BOOST_REQUIRE_GT(row_size, threshold);
BOOST_REQUIRE_EQUAL(range_tombstones, 0);
BOOST_REQUIRE_EQUAL(dead_rows, 0);
if (clustering_key) {
BOOST_REQUIRE(expected[i]->equal(s, *clustering_key));
} else {
BOOST_REQUIRE_EQUAL(expected[i], nullptr);
}
++i;
};
large_row_handler handler(threshold, std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), f);
large_data_test_handler handler(std::numeric_limits<uint64_t>::max(), threshold,
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
std::numeric_limits<uint64_t>::max());
sstables::test_env::do_with_async([&] (auto& env) {
auto sst = env.make_sstable(s, version);
@@ -5165,14 +5129,50 @@ static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, r
// depends on the encoding statistics (because of variable-length encoding). The original values
// were chosen with the default-constructed encoding_stats, so let's keep it that way.
sst->write_components(mt.make_mutation_reader(s, std::move(permit)), 1, s, env.manager().configure_writer("test"), encoding_stats{}).get();
BOOST_REQUIRE_EQUAL(i, expected.size());
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
sst->open_data(cfg).get();
auto& records_opt = sst->get_large_data_records();
// Collect row_size records (order is not guaranteed, so just count and validate).
unsigned row_size_count = 0;
unsigned static_row_count = 0;
unsigned clustering_row_count = 0;
if (records_opt) {
for (auto& rec : records_opt->elements) {
if (rec.type != large_data_type::row_size) {
continue;
}
BOOST_REQUIRE_GT(rec.value, threshold);
auto rec_ck_str = sstring(reinterpret_cast<const char*>(rec.clustering_key.value.data()), rec.clustering_key.value.size());
if (rec_ck_str.empty()) {
++static_row_count;
} else {
++clustering_row_count;
}
++row_size_count;
}
}
// Count expected static rows (nullptr entries) and clustering rows.
unsigned expected_static = 0;
unsigned expected_clustering = 0;
for (auto* ck : expected) {
if (ck) {
++expected_clustering;
} else {
++expected_static;
}
}
BOOST_REQUIRE_EQUAL(row_size_count, expected.size());
BOOST_REQUIRE_EQUAL(static_row_count, expected_static);
BOOST_REQUIRE_EQUAL(clustering_row_count, expected_clustering);
}, { &handler }).get();
}
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
mutation partition = s.new_mutation("pv");
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation partition(s.schema(), s.make_pkey());
const partition_key& pk = partition.key();
s.add_static_row(partition, "foo bar zed");
@@ -5190,26 +5190,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) {
static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit, replica::memtable& mt, const partition_key& pk,
std::vector<clustering_key*> expected, uint64_t threshold, sstables::sstable_version_types version) {
unsigned i = 0;
auto f = [&i, &expected, &pk, &threshold](const schema& s, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, uint64_t row_size, uint64_t range_tombstones, uint64_t dead_rows,
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
BOOST_TEST_MESSAGE(format("i={} ck={} cell_size={} threshold={}", i, clustering_key ? format("{}", *clustering_key) : "null", cell_size, threshold));
BOOST_REQUIRE(std::ranges::equal(pk.components(s), partition_key.to_partition_key(s).components(s)));
BOOST_REQUIRE_LT(i, expected.size());
BOOST_REQUIRE_GT(cell_size, threshold);
BOOST_REQUIRE_EQUAL(range_tombstones, 0);
BOOST_REQUIRE_EQUAL(dead_rows, 0);
if (clustering_key) {
BOOST_REQUIRE(expected[i]->equal(s, *clustering_key));
} else {
BOOST_REQUIRE_EQUAL(expected[i], nullptr);
}
++i;
};
large_row_handler handler(std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), threshold, std::numeric_limits<uint64_t>::max(), f);
large_data_test_handler handler(std::numeric_limits<uint64_t>::max(),
std::numeric_limits<uint64_t>::max(), threshold,
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max());
sstables::test_env::do_with_async([&] (auto& env) {
auto sst = env.make_sstable(s, version);
@@ -5219,14 +5202,50 @@ static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit,
// depends on the encoding statistics (because of variable-length encoding). The original values
// were chosen with the default-constructed encoding_stats, so let's keep it that way.
sst->write_components(mt.make_mutation_reader(s, std::move(permit)), 1, s, env.manager().configure_writer("test"), encoding_stats{}).get();
BOOST_REQUIRE_EQUAL(i, expected.size());
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
sst->open_data(cfg).get();
auto& records_opt = sst->get_large_data_records();
// Collect cell_size records (order is not guaranteed, so just count and validate).
unsigned cell_size_count = 0;
unsigned static_cell_count = 0;
unsigned clustering_cell_count = 0;
if (records_opt) {
for (auto& rec : records_opt->elements) {
if (rec.type != large_data_type::cell_size) {
continue;
}
BOOST_REQUIRE_GT(rec.value, threshold);
auto rec_ck_str = sstring(reinterpret_cast<const char*>(rec.clustering_key.value.data()), rec.clustering_key.value.size());
if (rec_ck_str.empty()) {
++static_cell_count;
} else {
++clustering_cell_count;
}
++cell_size_count;
}
}
// Count expected static cells (nullptr entries) and clustering cells.
unsigned expected_static = 0;
unsigned expected_clustering = 0;
for (auto* ck : expected) {
if (ck) {
++expected_clustering;
} else {
++expected_static;
}
}
BOOST_REQUIRE_EQUAL(cell_size_count, expected.size());
BOOST_REQUIRE_EQUAL(static_cell_count, expected_static);
BOOST_REQUIRE_EQUAL(clustering_cell_count, expected_clustering);
}, { &handler }).get();
}
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
mutation partition = s.new_mutation("pv");
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation partition(s.schema(), s.make_pkey());
const partition_key& pk = partition.key();
s.add_static_row(partition, "foo bar zed");
@@ -5245,8 +5264,8 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
mutation p = s.new_mutation("pv");
const partition_key& pk = p.key();
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
sstring sv;
for (auto idx = 0; idx < rows - 1; idx++) {
sv += "foo ";
@@ -5259,27 +5278,27 @@ static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uin
schema_ptr sc = s.schema();
auto mt = make_memtable(sc, {p});
bool logged = false;
auto f = [&logged, &pk, &threshold, &rows, &range_tombstones](const schema& sc, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, uint64_t rows_count, uint64_t range_tombstones_count, uint64_t dead_rows,
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
BOOST_REQUIRE_GT(rows_count, threshold);
BOOST_REQUIRE(std::ranges::equal(pk.components(sc), partition_key.to_partition_key(sc).components(sc)));
BOOST_REQUIRE_EQUAL(dead_rows, 0);
// Inserting one range tombstone creates two range tombstone marker rows
BOOST_REQUIRE_EQUAL(rows_count, rows + range_tombstones * 2);
BOOST_REQUIRE_EQUAL(range_tombstones_count, range_tombstones * 2);
logged = true;
};
large_row_handler handler(std::numeric_limits<uint64_t>::max(), threshold, std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), f);
// rows_count_threshold = threshold; all other thresholds at MAX.
large_data_test_handler handler(std::numeric_limits<uint64_t>::max(),
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
threshold, std::numeric_limits<uint64_t>::max());
sstables::test_env::do_with_async([&] (auto& env) {
auto sst = env.make_sstable(sc, version);
sst->write_components(mt->make_mutation_reader(sc, semaphore.make_permit()), 1, sc, env.manager().configure_writer("test"), encoding_stats{}).get();
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
sst->open_data(cfg).get();
BOOST_REQUIRE_EQUAL(logged, expected);
auto& records_opt = sst->get_large_data_records();
bool found = false;
if (records_opt) {
for (auto& rec : records_opt->elements) {
if (rec.type == large_data_type::rows_in_partition && rec.elements_count > threshold) {
found = true;
}
}
}
BOOST_REQUIRE_EQUAL(found, expected);
}, { &handler }).get();
}
@@ -5307,8 +5326,8 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
mutation p = s.new_mutation("pv");
const partition_key& pk = p.key();
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
sstring sv;
int live_rows = 0;
int expected_dead_rows = 0;
@@ -5355,7 +5374,6 @@ static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold,
}
auto ck_start = ck;
auto rt_start = bound_view(ck_start, tests::random::get_bool() ? bound_kind::incl_start : bound_kind::excl_start);
auto sv_end = sv;
auto rt_size = tests::random::get_int(1, 10);
for (auto i = 0; i < rt_size; i++) {
sv += "X";
@@ -5374,25 +5392,27 @@ static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold,
auto mt = make_lw_shared<replica::memtable>(sc);
mt->apply(p);
bool logged = false;
auto f = [&] (const schema& sc, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, uint64_t rows_count, uint64_t range_tombstones, uint64_t dead_rows,
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
BOOST_REQUIRE_GT(rows_count, threshold);
BOOST_REQUIRE_EQUAL(rows_count, rows);
BOOST_REQUIRE_EQUAL(dead_rows, expected_dead_rows + expected_expired_rows + expected_rows_with_dead_cell);
BOOST_REQUIRE_EQUAL(range_tombstones, expected_range_tombstones);
BOOST_REQUIRE(std::ranges::equal(pk.components(sc), partition_key.to_partition_key(sc).components(sc)));
logged = true;
};
large_row_handler handler(std::numeric_limits<uint64_t>::max(), threshold, std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), f);
// rows_count_threshold = threshold; all other thresholds at MAX.
large_data_test_handler handler(std::numeric_limits<uint64_t>::max(),
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
threshold, std::numeric_limits<uint64_t>::max());
sstables::test_env::do_with_async([&] (auto& env) {
auto sst = env.make_sstable(sc, version);
sst->write_components(mt->make_mutation_reader(sc, semaphore.make_permit()), 1, sc, env.manager().configure_writer("test"), encoding_stats{}).get();
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
sst->open_data(cfg).get();
BOOST_REQUIRE_EQUAL(logged, expected);
auto& records_opt = sst->get_large_data_records();
bool found = false;
if (records_opt) {
for (auto& rec : records_opt->elements) {
if (rec.type == large_data_type::rows_in_partition && rec.elements_count > threshold) {
found = true;
}
}
}
BOOST_REQUIRE_EQUAL(found, expected);
}, { &handler }).get();
}
@@ -5416,8 +5436,8 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_dead_rows) {
static void test_sstable_too_many_collection_elements_f(int elements, uint64_t threshold, bool expected, sstable_version_types version) {
simple_schema s(simple_schema::with_static::no, simple_schema::with_collection::yes);
tests::reader_concurrency_semaphore_wrapper semaphore;
mutation p = s.new_mutation("pv");
const partition_key& pk = p.key();
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
mutation p(s.schema(), s.make_pkey());
std::map<bytes, bytes> kv_map;
for (auto i = 0; i < elements; i++) {
kv_map[to_bytes(format("key{}", i))] = to_bytes(format("val{}", i));
@@ -5426,25 +5446,28 @@ static void test_sstable_too_many_collection_elements_f(int elements, uint64_t t
schema_ptr sc = s.schema();
auto mt = make_memtable(sc, {p});
bool logged = false;
auto f = [&logged, &pk, &threshold](const schema& sc, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, uint64_t rows_count, uint64_t range_tombstones, uint64_t dead_rows,
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
BOOST_REQUIRE_GT(collection_elements, threshold);
BOOST_REQUIRE(std::ranges::equal(pk.components(sc), partition_key.to_partition_key(sc).components(sc)));
BOOST_REQUIRE_EQUAL(range_tombstones, 0);
BOOST_REQUIRE_EQUAL(dead_rows, 0);
logged = true;
};
BOOST_TEST_MESSAGE(format("elements={} threshold={} expected={}", elements, threshold, expected));
large_row_handler handler(std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), threshold, f);
// collection_elements_threshold = threshold; all other thresholds at MAX.
large_data_test_handler handler(std::numeric_limits<uint64_t>::max(),
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
std::numeric_limits<uint64_t>::max(), threshold);
sstables::test_env::do_with_async([&] (auto& env) {
auto sst = env.make_sstable(sc, version);
sst->write_components(mt->make_mutation_reader(sc, semaphore.make_permit()), 1, sc, env.manager().configure_writer("test"), encoding_stats{}).get();
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
sst->open_data(cfg).get();
BOOST_REQUIRE_EQUAL(logged, expected);
auto& records_opt = sst->get_large_data_records();
bool found = false;
if (records_opt) {
for (auto& rec : records_opt->elements) {
if (rec.type == large_data_type::elements_in_collection && rec.elements_count > threshold) {
found = true;
}
}
}
BOOST_REQUIRE_EQUAL(found, expected);
}, { &handler }).get();
}
@@ -5465,40 +5488,10 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_too_many_collection_elements) {
}
// Handler for testing LargeDataRecords population in SSTable metadata.
// Unlike large_row_handler above, this one allows setting the partition_threshold_bytes too.
// large_data_test_handler already serves this purpose; keep this alias for
// test readability / backward compatibility.
namespace {
struct large_data_records_handler : public db::large_data_handler {
large_data_records_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes,
uint64_t cell_threshold_bytes, uint64_t rows_count_threshold,
uint64_t collection_elements_threshold)
: large_data_handler(partition_threshold_bytes, row_threshold_bytes, cell_threshold_bytes,
rows_count_threshold, collection_elements_threshold) {
start();
}
virtual future<> record_large_rows(const sstables::sstable&, const sstables::key&,
const clustering_key_prefix*, uint64_t) const override {
return make_ready_future<>();
}
virtual future<> record_large_cells(const sstables::sstable&, const sstables::key&,
const clustering_key_prefix*, const column_definition&, uint64_t, uint64_t) const override {
return make_ready_future<>();
}
virtual future<> record_large_partitions(const sstables::sstable&, const sstables::key&,
uint64_t, uint64_t, uint64_t, uint64_t) const override {
return make_ready_future<>();
}
virtual future<> delete_large_data_entries(const schema&, sstring, std::string_view) const override {
return make_ready_future<>();
}
virtual future<> update_large_data_entries_sstable_name(const schema&, sstring, sstring, std::string_view) const override {
return make_ready_future<>();
}
};
using large_data_records_handler = large_data_test_handler;
}
// Test that writing an SSTable with data exceeding large data thresholds

View File

@@ -1008,7 +1008,7 @@ private:
_mnotifier.local().unregister_listener(&_ss.local()).get();
});
db::initialize_virtual_tables(_db, _ss, _gossiper, _group0_registry, _sys_ks, _tablet_allocator, _ms, *cfg).get();
db::initialize_virtual_tables(_db, _ss, _gossiper, _group0_registry, _sys_ks, _tablet_allocator, _ms, *cfg, _feature_service.local()).get();
_qp.invoke_on_all([this, &group0_client] (cql3::query_processor& qp) {
qp.start_remote(_mm.local(), _mapreduce_service.local(), _ss.local(), group0_client,