mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-26 03:20:37 +00:00
db: implement large_data virtual tables with feature flag gating
Replace the physical system.large_partitions, system.large_rows, and system.large_cells CQL tables with virtual tables that read from LargeDataRecords stored in SSTable scylla metadata (tag 13). The transition is gated by a new LARGE_DATA_VIRTUAL_TABLES cluster feature flag: - Before the feature is enabled: the old physical tables remain in all_tables(), CQL writes are active, no virtual tables are registered. This ensures safe rollback during rolling upgrades. - After the feature is enabled: old physical tables are dropped from disk via legacy_drop_table_on_all_shards(), virtual tables are registered on all shards, and CQL writes are skipped via skip_cql_writes() in cql_table_large_data_handler. Key implementation details: - Three virtual table classes (large_partitions_virtual_table, large_rows_virtual_table, large_cells_virtual_table) extend streaming_virtual_table with cross-shard record collection. - generate_legacy_id() gains a version parameter; virtual tables use version 1 to get different UUIDs than the old physical tables. - compaction_time is derived from SSTable generation UUID at display time via UUID_gen::unix_timestamp(). - Legacy SSTables without LargeDataRecords emit synthetic summary rows based on above_threshold > 0 in LargeDataStats. - The activation logic uses two paths: when the feature is already enabled (test env, restart), it runs as a coroutine; when not yet enabled, it registers a when_enabled callback that runs inside seastar::async from feature_service::enable(). - sstable_3_x_test updated to use a simplified large_data_test_handler and validate LargeDataRecords in SSTable metadata directly.
This commit is contained in:
@@ -142,6 +142,10 @@ future<> large_data_handler::maybe_update_large_data_entries_sstable_name(sstabl
|
||||
return when_all(std::move(large_partitions), std::move(large_rows), std::move(large_cells)).discard_result();
|
||||
}
|
||||
|
||||
bool cql_table_large_data_handler::skip_cql_writes() const {
|
||||
return bool(_feat.large_data_virtual_tables);
|
||||
}
|
||||
|
||||
cql_table_large_data_handler::cql_table_large_data_handler(gms::feature_service& feat,
|
||||
utils::updateable_value<uint32_t> partition_threshold_mb,
|
||||
utils::updateable_value<uint32_t> row_threshold_mb,
|
||||
@@ -180,6 +184,9 @@ future<> cql_table_large_data_handler::do_insert_large_data_entry(std::string_vi
|
||||
sstring ks_name, sstring cf_name, sstring sstable_name,
|
||||
int64_t size, sstring partition_key, db_clock::time_point compaction_time,
|
||||
const std::vector<sstring>& extra_fields, Args&&... args) const {
|
||||
if (skip_cql_writes()) {
|
||||
co_return;
|
||||
}
|
||||
auto sys_ks = _sys_ks.get_permit();
|
||||
if (!sys_ks) {
|
||||
co_return;
|
||||
@@ -284,6 +291,9 @@ future<> cql_table_large_data_handler::record_large_rows(const sstables::sstable
|
||||
}
|
||||
|
||||
future<> cql_table_large_data_handler::delete_large_data_entries(const schema& s, sstring sstable_name, std::string_view large_table_name) const {
|
||||
if (skip_cql_writes()) {
|
||||
co_return;
|
||||
}
|
||||
auto sys_ks = _sys_ks.get_permit();
|
||||
SCYLLA_ASSERT(sys_ks);
|
||||
const sstring req =
|
||||
@@ -329,6 +339,9 @@ cql_table_large_data_handler::row_reinsert_func cql_table_large_data_handler::ma
|
||||
}
|
||||
|
||||
future<> cql_table_large_data_handler::update_large_data_entries_sstable_name(const schema& s, sstring old_name, sstring new_name, std::string_view large_table_name) const {
|
||||
if (skip_cql_writes()) {
|
||||
co_return;
|
||||
}
|
||||
auto sys_ks = _sys_ks.get_permit();
|
||||
SCYLLA_ASSERT(sys_ks);
|
||||
// sstable_name is a clustering key, so we can't update it in place.
|
||||
|
||||
@@ -181,6 +181,12 @@ protected:
|
||||
virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const override;
|
||||
|
||||
private:
|
||||
// Returns true if CQL writes to system.large_* tables should be skipped.
|
||||
// Once LARGE_DATA_VIRTUAL_TABLES is enabled, large data records are served
|
||||
// from SSTable metadata via virtual tables and the physical CQL tables are
|
||||
// dropped, so writing to them is both unnecessary and would fail.
|
||||
bool skip_cql_writes() const;
|
||||
|
||||
future<> internal_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const;
|
||||
future<> internal_record_large_cells_and_collections(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
|
||||
@@ -26,6 +26,8 @@
|
||||
#include "db/size_estimates_virtual_reader.hh"
|
||||
#include "db/view/build_progress_virtual_reader.hh"
|
||||
#include "index/built_indexes_virtual_reader.hh"
|
||||
#include "keys/keys.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "mutation/frozen_mutation.hh"
|
||||
#include "transport/protocol_server.hh"
|
||||
@@ -35,6 +37,7 @@
|
||||
#include "service/raft/raft_group_registry.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/tablet_allocator.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "locator/load_sketch.hh"
|
||||
#include "types/list.hh"
|
||||
#include "types/types.hh"
|
||||
@@ -1461,6 +1464,595 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
// Helper: convert bytes stored in a disk_string to sstring.
|
||||
// disk_string<uint32_t>::value is of type `bytes`.
|
||||
static sstring disk_string_to_sstring(const sstables::disk_string<uint32_t>& ds) {
|
||||
return sstring(to_string_view(ds.value));
|
||||
}
|
||||
|
||||
// Helper: convert int64_t millis-since-epoch to db_clock::time_point
|
||||
// for use with timestamp_type columns in virtual tables.
|
||||
static db_clock::time_point millis_to_db_clock(int64_t ms) {
|
||||
return db_clock::time_point(db_clock::duration(ms));
|
||||
}
|
||||
|
||||
// Derive compaction_time (milliseconds since Unix epoch) from the SSTable's
|
||||
// generation UUID. If the generation is not UUID-based (legacy integer
|
||||
// generations), returns 0.
|
||||
static int64_t compaction_time_from_generation(const sstables::sstable& sst) {
|
||||
auto gen = sst.generation();
|
||||
if (gen.is_uuid_based()) {
|
||||
return utils::UUID_gen::unix_timestamp(gen.as_uuid()).count();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Virtual table backed by LargeDataRecords in SSTable scylla metadata.
// Shows partition_size records from all live SSTables, matching the
// schema of the legacy system.large_partitions CQL table.
class large_partitions_virtual_table : public streaming_virtual_table {
    sharded<replica::database>& _db;

    // One display-ready output row. compaction_time is ms-since-epoch,
    // with 0 meaning "unknown" (legacy integer SSTable generation).
    struct record {
        sstring sstable_name;
        int64_t partition_size;
        sstring partition_key;
        int64_t compaction_time;
        int64_t rows;
        int64_t range_tombstones;
        int64_t dead_rows;
    };

    // Extract partition_size records from a single table's SSTables on the local shard.
    // Called on each shard via map() to collect records cross-shard.
    static future<std::vector<record>> collect_local_records(replica::table& table) {
        std::vector<record> records;
        auto table_schema = table.schema();
        // Snapshot the SSTable set so the list stays stable while we iterate.
        auto sstables = co_await table.take_sstable_set_snapshot();
        for (const auto& sst : sstables) {
            auto& records_opt = sst->get_large_data_records();
            if (!records_opt) {
                // Legacy SSTable without LargeDataRecords: emit synthetic row
                // if above_threshold > 0 for partition_size or rows_in_partition stats.
                auto stat = sst->get_large_data_stat(sstables::large_data_type::partition_size);
                auto rows_stat = sst->get_large_data_stat(sstables::large_data_type::rows_in_partition);
                if ((stat && stat->above_threshold > 0) || (rows_stat && rows_stat->above_threshold > 0)) {
                    // range_tombstones/dead_rows are value-initialized to 0:
                    // legacy stats don't carry that detail.
                    records.push_back(record{
                        .sstable_name = sst->component_basename(sstables::component_type::Data),
                        .partition_size = static_cast<int64_t>(stat ? stat->max_value : 0),
                        .partition_key = "(details unavailable - legacy SSTable)",
                        .compaction_time = compaction_time_from_generation(*sst),
                        .rows = static_cast<int64_t>(rows_stat ? rows_stat->max_value : 0),
                    });
                }
                continue;
            }
            // Hoist per-SSTable values out of the per-record loop.
            auto compaction_time = compaction_time_from_generation(*sst);
            auto sst_name = sst->component_basename(sstables::component_type::Data);
            for (const auto& rec : records_opt->elements) {
                // Only partition-level record types belong in this table.
                if (rec.type != sstables::large_data_type::partition_size &&
                    rec.type != sstables::large_data_type::rows_in_partition) {
                    continue;
                }
                auto pk = sstables::key_view(rec.partition_key.value).to_partition_key(*table_schema);
                records.push_back(record{
                    .sstable_name = sst_name,
                    .partition_size = static_cast<int64_t>(rec.value),
                    .partition_key = key_to_str(pk, *table_schema),
                    .compaction_time = compaction_time,
                    .rows = static_cast<int64_t>(rec.elements_count),
                    .range_tombstones = static_cast<int64_t>(rec.range_tombstones),
                    .dead_rows = static_cast<int64_t>(rec.dead_rows),
                });
            }
        }
        co_return records;
    }

public:
    explicit large_partitions_virtual_table(sharded<replica::database>& db)
        : streaming_virtual_table(build_schema())
        , _db(db)
    {
        // Each shard serves only the partitions it owns; execute() filters
        // with this_shard_owns().
        _shard_aware = true;
    }

    // Builds a schema mirroring the legacy system.large_partitions table.
    // generate_legacy_id() version 1 yields a UUID different from the old
    // physical table, so both can coexist during the upgrade window.
    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::LARGE_PARTITIONS, 1);
        schema_builder builder(system_keyspace::NAME, system_keyspace::LARGE_PARTITIONS, std::make_optional(id));
        builder.with_column("keyspace_name", utf8_type, column_kind::partition_key);
        builder.with_column("table_name", utf8_type, column_kind::partition_key);
        builder.with_column("sstable_name", utf8_type, column_kind::clustering_key);
        // Reversed type: largest partitions sort first within an sstable.
        builder.with_column("partition_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key);
        builder.with_column("partition_key", utf8_type, column_kind::clustering_key);
        builder.with_column("rows", long_type);
        builder.with_column("compaction_time", timestamp_type);
        builder.with_column("range_tombstones", long_type);
        builder.with_column("dead_rows", long_type);
        builder.set_comment("partitions larger than specified threshold");
        builder.with_hash_version();
        return builder.build();
    }

    // Partition key of this virtual table: (keyspace_name, table_name).
    dht::decorated_key make_partition_key(const sstring& ks_name, const sstring& cf_name) {
        return dht::decorate_key(*_s, partition_key::from_exploded(*_s, {
            data_value(ks_name).serialize_nonnull(),
            data_value(cf_name).serialize_nonnull()
        }));
    }

    // Clustering key: (sstable_name, partition_size DESC, partition_key).
    clustering_key make_clustering_key(const sstring& sstable_name, int64_t partition_size, const sstring& pk) {
        return clustering_key::from_exploded(*_s, {
            data_value(sstable_name).serialize_nonnull(),
            reversed_type_impl::get_instance(long_type)->decompose(partition_size),
            data_value(pk).serialize_nonnull()
        });
    }

    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        // Phase 1: Determine which (ks, table) partitions this shard owns.
        // Table metadata is shared across all shards, so local iteration is sufficient.
        struct table_info {
            table_id tid;
            dht::decorated_key dk;
        };
        std::vector<table_info> owned_tables;

        auto& db = _db.local();
        db.get_tables_metadata().for_each_table([&] (table_id tid, lw_shared_ptr<replica::table> table) {
            auto& ks_name = table->schema()->ks_name();
            auto& cf_name = table->schema()->cf_name();
            auto dk = make_partition_key(ks_name, cf_name);
            if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) {
                owned_tables.push_back(table_info{tid, std::move(dk)});
            }
        });

        // Sort by token order — streaming_virtual_table requires
        // partitions to be emitted in token order.
        std::ranges::sort(owned_tables, dht::ring_position_less_comparator(*_s),
                std::mem_fn(&table_info::dk));

        // Phase 2: For each owned table, collect records from ALL shards
        // in parallel, then sort and emit immediately. We process one
        // table at a time to bound memory usage: once we have emitted
        // page_size records we stop and let CQL paging fetch the rest.
        static constexpr size_t page_size = 1000;
        size_t emitted = 0;

        for (auto& ti : owned_tables) {
            // Fan out to every shard; the tid (not the table pointer) is
            // captured because table objects are per-shard.
            auto per_shard = co_await _db.map([tid = ti.tid] (replica::database& db) -> future<std::vector<record>> {
                if (auto table = db.get_tables_metadata().get_table_if_exists(tid)) {
                    co_return co_await collect_local_records(*table);
                }
                // Table may have been dropped since phase 1 — emit nothing.
                co_return std::vector<record>{};
            });
            std::vector<record> records;
            for (auto& shard_records : per_shard) {
                records.insert(records.end(),
                        std::make_move_iterator(shard_records.begin()),
                        std::make_move_iterator(shard_records.end()));
            }
            if (records.empty()) {
                continue;
            }
            // Sort by (sstable_name ASC, partition_size DESC, partition_key ASC) to match clustering order
            std::ranges::sort(records, [] (const record& a, const record& b) {
                if (a.sstable_name != b.sstable_name) {
                    return a.sstable_name < b.sstable_name;
                } else if (a.partition_size != b.partition_size) {
                    return a.partition_size > b.partition_size; // DESC
                }
                return a.partition_key < b.partition_key;
            });
            // A partition that exceeds both the size and the row-count
            // thresholds produces two LargeDataRecords entries (one per
            // type) that map to identical clustering keys. Remove the
            // duplicates so that every emitted row has a unique key.
            // (std::unique is valid here: the preceding sort makes
            // duplicates adjacent.)
            records.erase(std::unique(records.begin(), records.end(), [] (const record& a, const record& b) {
                return a.sstable_name == b.sstable_name
                    && a.partition_size == b.partition_size
                    && a.partition_key == b.partition_key;
            }), records.end());
            co_await result.emit_partition_start(ti.dk);
            for (const auto& rec : records) {
                clustering_row cr(make_clustering_key(rec.sstable_name, rec.partition_size, rec.partition_key));
                // Zero-valued fields are left as null cells rather than
                // written as explicit zeros.
                if (rec.compaction_time != 0) {
                    set_cell(cr.cells(), "compaction_time", millis_to_db_clock(rec.compaction_time));
                }
                if (rec.rows != 0) {
                    set_cell(cr.cells(), "rows", rec.rows);
                }
                if (rec.range_tombstones != 0) {
                    set_cell(cr.cells(), "range_tombstones", rec.range_tombstones);
                }
                if (rec.dead_rows != 0) {
                    set_cell(cr.cells(), "dead_rows", rec.dead_rows);
                }
                co_await result.emit_row(std::move(cr));
            }
            co_await result.emit_partition_end();
            emitted += records.size();
            if (emitted >= page_size) {
                break;
            }
        }
    }
};
|
||||
|
||||
// Virtual table backed by LargeDataRecords in SSTable scylla metadata.
// Shows row_size records from all live SSTables, matching the
// schema of the legacy system.large_rows CQL table.
class large_rows_virtual_table : public streaming_virtual_table {
    sharded<replica::database>& _db;

    // One display-ready output row. compaction_time is ms-since-epoch,
    // with 0 meaning "unknown" (legacy integer SSTable generation).
    struct record {
        sstring sstable_name;
        int64_t row_size;
        sstring partition_key;
        sstring clustering_key;
        int64_t compaction_time;
    };

    // Extract row_size records from a single table's SSTables on the local shard.
    // Called on each shard via map() to collect records cross-shard.
    static future<std::vector<record>> collect_local_records(replica::table& table) {
        std::vector<record> records;
        auto table_schema = table.schema();
        // Snapshot the SSTable set so the list stays stable while we iterate.
        auto sstables = co_await table.take_sstable_set_snapshot();
        for (const auto& sst : sstables) {
            auto& records_opt = sst->get_large_data_records();
            if (!records_opt) {
                // Legacy SSTable without LargeDataRecords: emit a synthetic
                // summary row when the row_size stat crossed its threshold.
                auto stat = sst->get_large_data_stat(sstables::large_data_type::row_size);
                if (stat && stat->above_threshold > 0) {
                    records.push_back(record{
                        .sstable_name = sst->component_basename(sstables::component_type::Data),
                        .row_size = static_cast<int64_t>(stat->max_value),
                        .partition_key = "(details unavailable - legacy SSTable)",
                        .clustering_key = "",
                        .compaction_time = compaction_time_from_generation(*sst),
                    });
                }
                continue;
            }
            // Hoist per-SSTable values out of the per-record loop.
            auto compaction_time = compaction_time_from_generation(*sst);
            auto sst_name = sst->component_basename(sstables::component_type::Data);
            for (const auto& rec : records_opt->elements) {
                // Only row_size records belong in this table.
                if (rec.type != sstables::large_data_type::row_size) {
                    continue;
                }
                auto pk = sstables::key_view(rec.partition_key.value).to_partition_key(*table_schema);
                // An empty clustering key (e.g. a static row) is shown as "".
                auto ck_str = rec.clustering_key.value.empty()
                        ? sstring()
                        : sstring(key_to_str(clustering_key_prefix::from_bytes(rec.clustering_key.value), *table_schema));
                records.push_back(record{
                    .sstable_name = sst_name,
                    .row_size = static_cast<int64_t>(rec.value),
                    .partition_key = key_to_str(pk, *table_schema),
                    .clustering_key = std::move(ck_str),
                    .compaction_time = compaction_time,
                });
            }
        }
        co_return records;
    }

public:
    explicit large_rows_virtual_table(sharded<replica::database>& db)
        : streaming_virtual_table(build_schema())
        , _db(db)
    {
        // Each shard serves only the partitions it owns; execute() filters
        // with this_shard_owns().
        _shard_aware = true;
    }

    // Builds a schema mirroring the legacy system.large_rows table.
    // generate_legacy_id() version 1 yields a UUID different from the old
    // physical table, so both can coexist during the upgrade window.
    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::LARGE_ROWS, 1);
        return schema_builder(system_keyspace::NAME, system_keyspace::LARGE_ROWS, std::make_optional(id))
            .with_column("keyspace_name", utf8_type, column_kind::partition_key)
            .with_column("table_name", utf8_type, column_kind::partition_key)
            .with_column("sstable_name", utf8_type, column_kind::clustering_key)
            // Reversed type: largest rows sort first within an sstable.
            .with_column("row_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key)
            .with_column("partition_key", utf8_type, column_kind::clustering_key)
            .with_column("clustering_key", utf8_type, column_kind::clustering_key)
            .with_column("compaction_time", timestamp_type)
            .set_comment("rows larger than specified threshold")
            .with_hash_version()
            .build();
    }

    // Partition key of this virtual table: (keyspace_name, table_name).
    dht::decorated_key make_partition_key(const sstring& ks_name, const sstring& cf_name) {
        return dht::decorate_key(*_s, partition_key::from_exploded(*_s, {
            data_value(ks_name).serialize_nonnull(),
            data_value(cf_name).serialize_nonnull()
        }));
    }

    // Clustering key: (sstable_name, row_size DESC, partition_key, clustering_key).
    clustering_key make_clustering_key(const sstring& sstable_name, int64_t row_size,
            const sstring& pk, const sstring& ck) {
        return clustering_key::from_exploded(*_s, {
            data_value(sstable_name).serialize_nonnull(),
            reversed_type_impl::get_instance(long_type)->decompose(row_size),
            data_value(pk).serialize_nonnull(),
            data_value(ck).serialize_nonnull()
        });
    }

    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        // Phase 1: determine which (ks, table) partitions this shard owns.
        struct table_info {
            table_id tid;
            dht::decorated_key dk;
        };
        std::vector<table_info> owned_tables;

        auto& db = _db.local();
        db.get_tables_metadata().for_each_table([&] (table_id tid, lw_shared_ptr<replica::table> table) {
            auto& ks_name = table->schema()->ks_name();
            auto& cf_name = table->schema()->cf_name();
            auto dk = make_partition_key(ks_name, cf_name);
            if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) {
                owned_tables.push_back(table_info{tid, std::move(dk)});
            }
        });

        // streaming_virtual_table requires partitions in token order.
        std::ranges::sort(owned_tables, dht::ring_position_less_comparator(*_s),
                std::mem_fn(&table_info::dk));

        // Phase 2: one table at a time to bound memory; stop after
        // page_size rows and let CQL paging fetch the rest.
        static constexpr size_t page_size = 1000;
        size_t emitted = 0;

        for (auto& ti : owned_tables) {
            auto per_shard = co_await _db.map([tid = ti.tid] (replica::database& db) -> future<std::vector<record>> {
                if (auto table = db.get_tables_metadata().get_table_if_exists(tid)) {
                    co_return co_await collect_local_records(*table);
                }
                // Table may have been dropped since phase 1 — emit nothing.
                co_return std::vector<record>{};
            });
            std::vector<record> records;
            for (auto& shard_records : per_shard) {
                records.insert(records.end(),
                        std::make_move_iterator(shard_records.begin()),
                        std::make_move_iterator(shard_records.end()));
            }
            if (records.empty()) {
                continue;
            }
            // Sort to match clustering order:
            // (sstable_name ASC, row_size DESC, partition_key ASC, clustering_key ASC).
            std::ranges::sort(records, [] (const record& a, const record& b) {
                if (a.sstable_name != b.sstable_name) {
                    return a.sstable_name < b.sstable_name;
                } else if (a.row_size != b.row_size) {
                    return a.row_size > b.row_size; // DESC
                } else if (a.partition_key != b.partition_key) {
                    return a.partition_key < b.partition_key;
                }
                return a.clustering_key < b.clustering_key;
            });
            // Note: no dedup pass is needed here — only one record type
            // (row_size) feeds this table, so keys cannot collide.
            co_await result.emit_partition_start(ti.dk);
            for (const auto& rec : records) {
                clustering_row cr(make_clustering_key(rec.sstable_name, rec.row_size, rec.partition_key, rec.clustering_key));
                // compaction_time == 0 means "unknown": leave the cell null.
                if (rec.compaction_time != 0) {
                    set_cell(cr.cells(), "compaction_time", millis_to_db_clock(rec.compaction_time));
                }
                co_await result.emit_row(std::move(cr));
            }
            co_await result.emit_partition_end();
            emitted += records.size();
            if (emitted >= page_size) {
                break;
            }
        }
    }
};
|
||||
|
||||
// Virtual table backed by LargeDataRecords in SSTable scylla metadata.
// Shows cell_size records from all live SSTables, matching the
// schema of the legacy system.large_cells CQL table.
class large_cells_virtual_table : public streaming_virtual_table {
    sharded<replica::database>& _db;

    // One display-ready output row. compaction_time is ms-since-epoch,
    // with 0 meaning "unknown" (legacy integer SSTable generation).
    struct record {
        sstring sstable_name;
        int64_t cell_size;
        sstring partition_key;
        sstring clustering_key;
        sstring column_name;
        int64_t collection_elements; // -1 means not applicable
        int64_t compaction_time;
    };

    // Extract cell_size records from a single table's SSTables on the local shard.
    // Called on each shard via map() to collect records cross-shard.
    static future<std::vector<record>> collect_local_records(replica::table& table) {
        std::vector<record> records;
        auto table_schema = table.schema();
        // Snapshot the SSTable set so the list stays stable while we iterate.
        auto sstables = co_await table.take_sstable_set_snapshot();
        for (const auto& sst : sstables) {
            auto& records_opt = sst->get_large_data_records();
            if (!records_opt) {
                // Legacy SSTable without LargeDataRecords: emit a synthetic
                // summary row when cell_size or elements_in_collection
                // crossed its threshold.
                auto stat = sst->get_large_data_stat(sstables::large_data_type::cell_size);
                auto elem_stat = sst->get_large_data_stat(sstables::large_data_type::elements_in_collection);
                if ((stat && stat->above_threshold > 0) || (elem_stat && elem_stat->above_threshold > 0)) {
                    records.push_back(record{
                        .sstable_name = sst->component_basename(sstables::component_type::Data),
                        .cell_size = static_cast<int64_t>(stat ? stat->max_value : 0),
                        .partition_key = "(details unavailable - legacy SSTable)",
                        .clustering_key = "",
                        .column_name = "",
                        .collection_elements = elem_stat ? static_cast<int64_t>(elem_stat->max_value) : int64_t(-1),
                        .compaction_time = compaction_time_from_generation(*sst),
                    });
                }
                continue;
            }
            // Hoist per-SSTable values out of the per-record loop.
            auto compaction_time = compaction_time_from_generation(*sst);
            auto sst_name = sst->component_basename(sstables::component_type::Data);
            for (const auto& rec : records_opt->elements) {
                // Only cell-level record types belong in this table.
                if (rec.type != sstables::large_data_type::cell_size &&
                    rec.type != sstables::large_data_type::elements_in_collection) {
                    continue;
                }
                auto pk = sstables::key_view(rec.partition_key.value).to_partition_key(*table_schema);
                // An empty clustering key (e.g. a static cell) is shown as "".
                auto ck_str = rec.clustering_key.value.empty()
                        ? sstring()
                        : sstring(key_to_str(clustering_key_prefix::from_bytes(rec.clustering_key.value), *table_schema));
                records.push_back(record{
                    .sstable_name = sst_name,
                    .cell_size = static_cast<int64_t>(rec.value),
                    .partition_key = key_to_str(pk, *table_schema),
                    .clustering_key = std::move(ck_str),
                    .column_name = disk_string_to_sstring(rec.column_name),
                    // elements_count == 0 means "not a collection": encode as -1
                    // so the display layer can leave the cell null.
                    .collection_elements = rec.elements_count > 0
                            ? static_cast<int64_t>(rec.elements_count) : int64_t(-1),
                    .compaction_time = compaction_time,
                });
            }
        }
        co_return records;
    }

public:
    explicit large_cells_virtual_table(sharded<replica::database>& db)
        : streaming_virtual_table(build_schema())
        , _db(db)
    {
        // Each shard serves only the partitions it owns; execute() filters
        // with this_shard_owns().
        _shard_aware = true;
    }

    // Builds a schema mirroring the legacy system.large_cells table.
    // generate_legacy_id() version 1 yields a UUID different from the old
    // physical table, so both can coexist during the upgrade window.
    static schema_ptr build_schema() {
        auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::LARGE_CELLS, 1);
        return schema_builder(system_keyspace::NAME, system_keyspace::LARGE_CELLS, std::make_optional(id))
            .with_column("keyspace_name", utf8_type, column_kind::partition_key)
            .with_column("table_name", utf8_type, column_kind::partition_key)
            .with_column("sstable_name", utf8_type, column_kind::clustering_key)
            // Reversed type: largest cells sort first within an sstable.
            .with_column("cell_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key)
            .with_column("partition_key", utf8_type, column_kind::clustering_key)
            .with_column("clustering_key", utf8_type, column_kind::clustering_key)
            .with_column("column_name", utf8_type, column_kind::clustering_key)
            .with_column("collection_elements", long_type)
            .with_column("compaction_time", timestamp_type)
            .set_comment("cells larger than specified threshold")
            .with_hash_version()
            .build();
    }

    // Partition key of this virtual table: (keyspace_name, table_name).
    dht::decorated_key make_partition_key(const sstring& ks_name, const sstring& cf_name) {
        return dht::decorate_key(*_s, partition_key::from_exploded(*_s, {
            data_value(ks_name).serialize_nonnull(),
            data_value(cf_name).serialize_nonnull()
        }));
    }

    // Clustering key:
    // (sstable_name, cell_size DESC, partition_key, clustering_key, column_name).
    clustering_key make_clustering_key(const sstring& sstable_name, int64_t cell_size,
            const sstring& pk, const sstring& ck, const sstring& col_name) {
        return clustering_key::from_exploded(*_s, {
            data_value(sstable_name).serialize_nonnull(),
            reversed_type_impl::get_instance(long_type)->decompose(cell_size),
            data_value(pk).serialize_nonnull(),
            data_value(ck).serialize_nonnull(),
            data_value(col_name).serialize_nonnull()
        });
    }

    future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
        // Phase 1: determine which (ks, table) partitions this shard owns.
        struct table_info {
            table_id tid;
            dht::decorated_key dk;
        };
        std::vector<table_info> owned_tables;

        auto& db = _db.local();
        db.get_tables_metadata().for_each_table([&] (table_id tid, lw_shared_ptr<replica::table> table) {
            auto& ks_name = table->schema()->ks_name();
            auto& cf_name = table->schema()->cf_name();
            auto dk = make_partition_key(ks_name, cf_name);
            if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) {
                owned_tables.push_back(table_info{tid, std::move(dk)});
            }
        });

        // streaming_virtual_table requires partitions in token order.
        std::ranges::sort(owned_tables, dht::ring_position_less_comparator(*_s),
                std::mem_fn(&table_info::dk));

        // Phase 2: one table at a time to bound memory; stop after
        // page_size rows and let CQL paging fetch the rest.
        static constexpr size_t page_size = 1000;
        size_t emitted = 0;

        for (auto& ti : owned_tables) {
            auto per_shard = co_await _db.map([tid = ti.tid] (replica::database& db) -> future<std::vector<record>> {
                if (auto table = db.get_tables_metadata().get_table_if_exists(tid)) {
                    co_return co_await collect_local_records(*table);
                }
                // Table may have been dropped since phase 1 — emit nothing.
                co_return std::vector<record>{};
            });
            std::vector<record> records;
            for (auto& shard_records : per_shard) {
                records.insert(records.end(),
                        std::make_move_iterator(shard_records.begin()),
                        std::make_move_iterator(shard_records.end()));
            }
            if (records.empty()) {
                continue;
            }
            // Sort to match clustering order:
            // (sstable ASC, cell_size DESC, pk ASC, ck ASC, column ASC).
            std::ranges::sort(records, [] (const record& a, const record& b) {
                if (a.sstable_name != b.sstable_name) {
                    return a.sstable_name < b.sstable_name;
                } else if (a.cell_size != b.cell_size) {
                    return a.cell_size > b.cell_size; // DESC
                } else if (a.partition_key != b.partition_key) {
                    return a.partition_key < b.partition_key;
                } else if (a.clustering_key != b.clustering_key) {
                    return a.clustering_key < b.clustering_key;
                }
                return a.column_name < b.column_name;
            });
            // A collection cell that exceeds both the size and the
            // element-count thresholds produces two LargeDataRecords
            // entries that map to identical clustering keys. Remove the
            // duplicates so that every emitted row has a unique key.
            // (std::unique is valid here: the preceding sort makes
            // duplicates adjacent.)
            records.erase(std::unique(records.begin(), records.end(), [] (const record& a, const record& b) {
                return a.sstable_name == b.sstable_name
                    && a.cell_size == b.cell_size
                    && a.partition_key == b.partition_key
                    && a.clustering_key == b.clustering_key
                    && a.column_name == b.column_name;
            }), records.end());
            co_await result.emit_partition_start(ti.dk);
            for (const auto& rec : records) {
                clustering_row cr(make_clustering_key(rec.sstable_name, rec.cell_size,
                        rec.partition_key, rec.clustering_key, rec.column_name));
                // Sentinel values (0 time, -1 elements) render as null cells.
                if (rec.compaction_time != 0) {
                    set_cell(cr.cells(), "compaction_time", millis_to_db_clock(rec.compaction_time));
                }
                if (rec.collection_elements >= 0) {
                    set_cell(cr.cells(), "collection_elements", rec.collection_elements);
                }
                co_await result.emit_row(std::move(cr));
            }
            co_await result.emit_partition_end();
            emitted += records.size();
            if (emitted >= page_size) {
                break;
            }
        }
    }
};
|
||||
|
||||
}
|
||||
|
||||
// Helper: register a virtual table on the local shard.
//
// Inserts the table into the system_keyspace virtual-tables registry,
// creates the backing local system table, and wires up the virtual
// reader/writer. The registry entry is created BEFORE the reader/writer
// lambdas capture a reference into it, so `vt` stays valid for the
// table's lifetime.
static future<> add_virtual_table(
        sharded<db::system_keyspace>& sys_ks,
        sharded<replica::database>& dist_db,
        sharded<service::storage_service>& dist_ss,
        std::unique_ptr<virtual_table>&& tbl) {
    auto& virtual_tables = *sys_ks.local().get_virtual_tables_registry();
    auto& db = dist_db.local();
    auto& ss = dist_ss.local();

    // Take the schema before moving `tbl` into the registry.
    auto schema = tbl->schema();
    virtual_tables[schema->id()] = std::move(tbl);
    co_await db.create_local_system_table(schema, false, ss.get_erm_factory());
    auto& cf = db.find_column_family(schema);
    cf.mark_ready_for_writes(nullptr);
    // Route reads and writes for this column family through the
    // virtual table implementation instead of real storage.
    auto& vt = virtual_tables[schema->id()];
    cf.set_virtual_reader(vt->as_mutation_source());
    cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); });
}
|
||||
|
||||
future<> initialize_virtual_tables(
|
||||
@@ -1469,43 +2061,86 @@ future<> initialize_virtual_tables(
|
||||
sharded<db::system_keyspace>& sys_ks,
|
||||
sharded<service::tablet_allocator>& tablet_allocator,
|
||||
sharded<netw::messaging_service>& ms,
|
||||
db::config& cfg) {
|
||||
db::config& cfg,
|
||||
gms::feature_service& feat) {
|
||||
co_await smp::invoke_on_all([&] () -> future<> {
|
||||
auto& virtual_tables_registry = sys_ks.local().get_virtual_tables_registry();
|
||||
auto& virtual_tables = *virtual_tables_registry;
|
||||
auto& db = dist_db.local();
|
||||
auto& ss = dist_ss.local();
|
||||
|
||||
auto add_table = [&] (std::unique_ptr<virtual_table>&& tbl) -> future<> {
|
||||
auto schema = tbl->schema();
|
||||
virtual_tables[schema->id()] = std::move(tbl);
|
||||
co_await db.create_local_system_table(schema, false, ss.get_erm_factory());
|
||||
auto& cf = db.find_column_family(schema);
|
||||
cf.mark_ready_for_writes(nullptr);
|
||||
auto& vt = virtual_tables[schema->id()];
|
||||
cf.set_virtual_reader(vt->as_mutation_source());
|
||||
cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); });
|
||||
co_await add_virtual_table(sys_ks, dist_db, dist_ss, std::move(tbl));
|
||||
};
|
||||
|
||||
auto& db = dist_db.local();
|
||||
|
||||
// Add built-in virtual tables here.
|
||||
co_await add_table(std::make_unique<cluster_status_table>(dist_ss, dist_gossiper));
|
||||
co_await add_table(std::make_unique<token_ring_table>(db, ss));
|
||||
co_await add_table(std::make_unique<token_ring_table>(db, dist_ss.local()));
|
||||
co_await add_table(std::make_unique<snapshots_table>(dist_db));
|
||||
co_await add_table(std::make_unique<protocol_servers_table>(ss));
|
||||
co_await add_table(std::make_unique<runtime_info_table>(dist_db, ss));
|
||||
co_await add_table(std::make_unique<protocol_servers_table>(dist_ss.local()));
|
||||
co_await add_table(std::make_unique<runtime_info_table>(dist_db, dist_ss.local()));
|
||||
co_await add_table(std::make_unique<versions_table>());
|
||||
co_await add_table(std::make_unique<db_config_table>(cfg));
|
||||
co_await add_table(std::make_unique<clients_table>(ss));
|
||||
co_await add_table(std::make_unique<clients_table>(dist_ss.local()));
|
||||
co_await add_table(std::make_unique<raft_state_table>(dist_raft_gr));
|
||||
co_await add_table(std::make_unique<load_per_node>(tablet_allocator, dist_db, dist_raft_gr, ms, dist_gossiper));
|
||||
co_await add_table(std::make_unique<tablet_sizes>(tablet_allocator, dist_db, dist_raft_gr, ms));
|
||||
co_await add_table(std::make_unique<cdc_timestamps_table>(db, ss));
|
||||
co_await add_table(std::make_unique<cdc_streams_table>(db, ss));
|
||||
co_await add_table(std::make_unique<cdc_timestamps_table>(db, dist_ss.local()));
|
||||
co_await add_table(std::make_unique<cdc_streams_table>(db, dist_ss.local()));
|
||||
|
||||
db.find_column_family(system_keyspace::size_estimates()).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader(db, sys_ks.local())));
|
||||
db.find_column_family(system_keyspace::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db)));
|
||||
db.find_column_family(system_keyspace::built_indexes()).set_virtual_reader(mutation_source(db::index::built_indexes_virtual_reader(db)));
|
||||
});
|
||||
|
||||
// Drop old physical system.large_* tables and register virtual
|
||||
// replacements. Must run on shard 0 (uses cross-shard operations).
|
||||
auto activate_large_data_virtual_tables = [&dist_db, &sys_ks, &dist_ss] () -> future<> {
|
||||
// Drop old physical system.large_* tables. Their data is now
|
||||
// served from SSTable metadata via virtual tables.
|
||||
// TODO: In a follow-up, read existing large data entries from
|
||||
// the old tables and populate per-sstable LargeDataRecords
|
||||
// metadata via component_rewrite before dropping.
|
||||
for (auto table_name : {
|
||||
db::system_keyspace::LARGE_PARTITIONS,
|
||||
db::system_keyspace::LARGE_ROWS,
|
||||
db::system_keyspace::LARGE_CELLS}) {
|
||||
try {
|
||||
co_await replica::database::legacy_drop_table_on_all_shards(
|
||||
dist_db, sys_ks, db::system_keyspace::NAME, table_name, false);
|
||||
} catch (const replica::no_such_column_family&) {
|
||||
// Already dropped (e.g. restart after feature was enabled).
|
||||
}
|
||||
}
|
||||
|
||||
// Add virtual tables on all shards.
|
||||
co_await smp::invoke_on_all([&dist_db, &sys_ks, &dist_ss] () -> future<> {
|
||||
co_await add_virtual_table(sys_ks, dist_db, dist_ss,
|
||||
std::make_unique<large_partitions_virtual_table>(dist_db));
|
||||
co_await add_virtual_table(sys_ks, dist_db, dist_ss,
|
||||
std::make_unique<large_rows_virtual_table>(dist_db));
|
||||
co_await add_virtual_table(sys_ks, dist_db, dist_ss,
|
||||
std::make_unique<large_cells_virtual_table>(dist_db));
|
||||
});
|
||||
};
|
||||
|
||||
if (feat.large_data_virtual_tables) {
|
||||
// Feature already enabled (e.g. test environment or restart after
|
||||
// upgrade). Activate directly as a coroutine — no seastar::async
|
||||
// context needed.
|
||||
co_await activate_large_data_virtual_tables();
|
||||
} else {
|
||||
// Feature not yet enabled. Register a callback that will fire
|
||||
// when the feature is enabled during rolling upgrade. The callback
|
||||
// runs inside seastar::async context (via feature_service::enable),
|
||||
// so .get() is safe.
|
||||
//
|
||||
// The listener_registration must outlive the feature, so we store
|
||||
// it in a static variable (process lifetime). This function is
|
||||
// only called on shard 0, so the listener fires only on shard 0.
|
||||
static gms::feature::listener_registration large_data_vt_listener;
|
||||
large_data_vt_listener = feat.large_data_virtual_tables.when_enabled(
|
||||
[activate_large_data_virtual_tables = std::move(activate_large_data_virtual_tables)] {
|
||||
activate_large_data_virtual_tables().get();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
virtual_tables_registry::virtual_tables_registry() : unique_ptr(std::make_unique<virtual_tables_registry_impl>()) {
|
||||
|
||||
@@ -24,6 +24,7 @@ class tablet_allocator;
|
||||
}
|
||||
|
||||
namespace gms {
|
||||
class feature_service;
|
||||
class gossiper;
|
||||
}
|
||||
|
||||
@@ -44,7 +45,8 @@ future<> initialize_virtual_tables(
|
||||
sharded<db::system_keyspace>&,
|
||||
sharded<service::tablet_allocator>&,
|
||||
sharded<netw::messaging_service>&,
|
||||
db::config&);
|
||||
db::config&,
|
||||
gms::feature_service&);
|
||||
|
||||
|
||||
class virtual_table;
|
||||
|
||||
@@ -128,6 +128,79 @@ Expiring Data
|
||||
|
||||
In order to prevent stale data from appearing, all rows in the ``system.large_partitions`` table are inserted with Time To Live (TTL) equal to 30 days.
|
||||
|
||||
|
||||
.. _large-data-virtual-tables:
|
||||
|
||||
Virtual Tables (``LARGE_DATA_VIRTUAL_TABLES`` Feature)
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Starting with ScyllaDB 2026.2, the ``system.large_partitions``,
|
||||
``system.large_rows``, and ``system.large_cells`` tables are implemented as
|
||||
**virtual tables** that read directly from metadata stored inside each SSTable
|
||||
rather than from separate physical system tables.
|
||||
|
||||
How it works
|
||||
""""""""""""
|
||||
|
||||
When an SSTable is written (during flush or compaction), the SSTable writer
|
||||
records the top-N above-threshold large data entries into a
|
||||
``LargeDataRecords`` component in the SSTable's ``.Scylla`` metadata file.
|
||||
The virtual tables query this metadata from all live SSTables at read time,
|
||||
so large data records automatically follow their SSTables when they are
|
||||
migrated between shards or nodes (e.g. during tablet migration or repair).
|
||||
|
||||
The maximum number of records stored per large data type per SSTable is
|
||||
controlled by the ``compaction_large_data_records_per_sstable`` configuration
|
||||
option (default: 10).
|
||||
|
||||
Activation
|
||||
""""""""""
|
||||
|
||||
The transition is gated by the ``LARGE_DATA_VIRTUAL_TABLES`` cluster feature
|
||||
flag, which is automatically enabled once all nodes in the cluster are
|
||||
upgraded to a version that supports it.
|
||||
|
||||
When the feature is enabled on a node:
|
||||
|
||||
1. The old physical ``system.large_partitions``, ``system.large_rows``, and
|
||||
``system.large_cells`` tables are dropped.
|
||||
2. Virtual table replacements are registered in their place.
|
||||
3. New SSTable writes stop populating the old physical tables.
|
||||
|
||||
During a rolling upgrade, as long as any node in the cluster has not been
|
||||
upgraded, the feature remains disabled and all nodes continue writing to
|
||||
the old physical tables. This ensures safe rollback if the upgrade needs
|
||||
to be reverted.
|
||||
|
||||
Upgrade considerations
|
||||
""""""""""""""""""""""
|
||||
|
||||
After the ``LARGE_DATA_VIRTUAL_TABLES`` feature is enabled, the virtual tables
|
||||
derive their content from ``LargeDataRecords`` metadata stored in SSTables.
|
||||
SSTables written by older versions do not contain this metadata. As a result,
|
||||
**the virtual tables may appear empty until those older SSTables are rewritten**
|
||||
by compaction.
|
||||
|
||||
To populate the virtual tables promptly after upgrade, run one of the following
|
||||
on each node:
|
||||
|
||||
* **Upgrade SSTables compaction** (recommended -- rewrites SSTables without
|
||||
waiting for normal compaction triggers):
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
nodetool upgradesstables --include-all-sstables
|
||||
|
||||
* **Major compaction** (rewrites all SSTables into a single set, but may
|
||||
temporarily increase disk usage):
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
nodetool compact
|
||||
|
||||
Once the SSTables have been rewritten, the virtual tables will show the
|
||||
current large data records.
|
||||
|
||||
.. include:: /troubleshooting/_common/ts-return.rst
|
||||
|
||||
Additional Resources
|
||||
|
||||
@@ -170,4 +170,12 @@ Expiring Data
|
||||
^^^^^^^^^^^^^
|
||||
In order to prevent stale data from appearing, all rows in the ``system.large_rows`` and ``system.large_cells`` tables are inserted with Time To Live (TTL) equal to 30 days.
|
||||
|
||||
Virtual Tables
|
||||
^^^^^^^^^^^^^^
|
||||
|
||||
Starting with ScyllaDB 2026.2, ``system.large_rows`` and ``system.large_cells``
|
||||
are implemented as virtual tables that read from SSTable metadata. See
|
||||
:ref:`large-data-virtual-tables` for details on the
|
||||
``LARGE_DATA_VIRTUAL_TABLES`` feature and upgrade considerations.
|
||||
|
||||
.. include:: /troubleshooting/_common/ts-return.rst
|
||||
|
||||
@@ -181,6 +181,7 @@ public:
|
||||
gms::feature vnodes_to_tablets_migrations { *this, "VNODES_TO_TABLETS_MIGRATIONS"sv };
|
||||
gms::feature writetime_ttl_individual_element { *this, "WRITETIME_TTL_INDIVIDUAL_ELEMENT"sv };
|
||||
gms::feature arbitrary_tablet_boundaries { *this, "ARBITRARY_TABLET_BOUNDARIES"sv };
|
||||
gms::feature large_data_virtual_tables { *this, "LARGE_DATA_VIRTUAL_TABLES"sv };
|
||||
public:
|
||||
|
||||
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;
|
||||
|
||||
2
main.cc
2
main.cc
@@ -1963,7 +1963,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "initializing virtual tables");
|
||||
db::initialize_virtual_tables(db, ss, gossiper, raft_gr, sys_ks, tablet_allocator, messaging, *cfg).get();
|
||||
db::initialize_virtual_tables(db, ss, gossiper, raft_gr, sys_ks, tablet_allocator, messaging, *cfg, feature_service.local()).get();
|
||||
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&qp] { return qp.stop(); });
|
||||
|
||||
@@ -1306,9 +1306,20 @@ bool operator==(const column_definition& x, const column_definition& y)
|
||||
}
|
||||
|
||||
// Based on org.apache.cassandra.config.CFMetaData#generateLegacyCfId
|
||||
//
|
||||
// The version suffix (e.g. "_v1") allows creating a table with a
|
||||
// different UUID than the original (version 0). This is used when
|
||||
// replacing a physical table with a virtual table of the same name:
|
||||
// the virtual table uses version 1 so its UUID differs from the old
|
||||
// physical table, allowing the old table to be properly dropped by
|
||||
// UUID before the replacement is registered.
|
||||
table_id
|
||||
generate_legacy_id(const sstring& ks_name, const sstring& cf_name) {
|
||||
return table_id(utils::UUID_gen::get_name_UUID(ks_name + cf_name));
|
||||
generate_legacy_id(const sstring& ks_name, const sstring& cf_name, unsigned version) {
|
||||
auto name = ks_name + cf_name;
|
||||
if (version > 0) {
|
||||
name += fmt::format("_v{}", version);
|
||||
}
|
||||
return table_id(utils::UUID_gen::get_name_UUID(name));
|
||||
}
|
||||
|
||||
schema_builder& schema_builder::set_compaction_strategy_options(std::map<sstring, sstring>&& options) {
|
||||
|
||||
@@ -1075,7 +1075,7 @@ public:
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const view_ptr& view);
|
||||
|
||||
table_id generate_legacy_id(const sstring& ks_name, const sstring& cf_name);
|
||||
table_id generate_legacy_id(const sstring& ks_name, const sstring& cf_name, unsigned version = 0);
|
||||
|
||||
|
||||
// Thrown when attempted to access a schema-dependent object using
|
||||
|
||||
@@ -90,6 +90,9 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) {
|
||||
// Check that only the large row is added to system.large_rows.
|
||||
BOOST_REQUIRE_EQUAL(rows.size(), 1);
|
||||
auto row0 = rows[0];
|
||||
// The result has 3 columns: partition_key, row_size, and table_name
|
||||
// (CQL adds the filtered partition key column to the result set when
|
||||
// using ALLOW FILTERING with a partial partition key restriction).
|
||||
BOOST_REQUIRE_EQUAL(row0.size(), 3);
|
||||
BOOST_REQUIRE_EQUAL(to_bytes(*row0[0]), "44");
|
||||
BOOST_REQUIRE_EQUAL(to_bytes(*row0[2]), "tbl");
|
||||
@@ -118,10 +121,11 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) {
|
||||
|
||||
e.execute_cql("delete from tbl where a = 44;").get();
|
||||
|
||||
// In order to guarantee that system.large_rows, system.large_cells and system.large_partitions have been updated, we have to
|
||||
// In order to guarantee that system.large_rows, system.large_cells and
|
||||
// system.large_partitions are empty, we need to:
|
||||
// * flush, so that a tombstone for the above delete is created.
|
||||
// * do a major compaction, so that the tombstone is combined with the old entry,
|
||||
// and the old sstable is deleted.
|
||||
// and the old sstable (which holds the large data records in its metadata) is deleted.
|
||||
flush(e);
|
||||
e.db().invoke_on_all([] (replica::database& dbi) {
|
||||
return dbi.get_tables_metadata().parallel_for_each_table([&dbi] (table_id, lw_shared_ptr<replica::table> t) {
|
||||
@@ -165,6 +169,71 @@ SEASTAR_THREAD_TEST_CASE(test_large_row_count_warning) {
|
||||
}, cfg).get();
|
||||
}
|
||||
|
||||
// Test that when a partition exceeds both the size threshold and the row
|
||||
// count threshold, system.large_partitions still returns a single row
|
||||
// (not two rows with the same clustering key).
|
||||
SEASTAR_THREAD_TEST_CASE(test_large_partitions_dual_threshold) {
|
||||
auto cfg = make_shared<db::config>();
|
||||
// Set very low thresholds so that a single partition with a handful
|
||||
// of rows containing modest data exceeds both size and row-count
|
||||
// thresholds simultaneously.
|
||||
cfg->compaction_large_partition_warning_threshold_mb(1);
|
||||
cfg->compaction_rows_count_warning_threshold(10);
|
||||
do_with_cql_env_thread([](cql_test_env& e) {
|
||||
e.execute_cql("create table tbl (a int, b int, c text, primary key (a, b))").get();
|
||||
// Insert enough rows with enough data to exceed 1 MB partition size
|
||||
// AND more than 10 rows.
|
||||
sstring blob(128 * 1024, 'x'); // 128 KB per row
|
||||
for (int i = 0; i < 11; ++i) {
|
||||
e.execute_cql(format("insert into tbl (a, b, c) values (42, {}, '{}');", i, blob)).get();
|
||||
}
|
||||
flush(e);
|
||||
|
||||
// There must be exactly one row in system.large_partitions for
|
||||
// this table. Before the fix, two rows with the same clustering
|
||||
// key would have been emitted.
|
||||
assert_that(e.execute_cql(
|
||||
"select partition_key, rows from system.large_partitions "
|
||||
"where table_name = 'tbl' allow filtering;").get())
|
||||
.is_rows()
|
||||
.with_size(1);
|
||||
|
||||
return make_ready_future<>();
|
||||
}, cfg).get();
|
||||
}
|
||||
|
||||
// Test that when a collection cell exceeds both the cell size threshold
|
||||
// and the collection element count threshold, system.large_cells still
|
||||
// returns a single row (not two rows with the same clustering key).
|
||||
SEASTAR_THREAD_TEST_CASE(test_large_cells_dual_threshold) {
|
||||
auto cfg = make_shared<db::config>();
|
||||
// Set very low thresholds so that a collection with modest elements
|
||||
// exceeds both size and element-count thresholds simultaneously.
|
||||
cfg->compaction_large_cell_warning_threshold_mb(1);
|
||||
cfg->compaction_collection_elements_count_warning_threshold(10);
|
||||
do_with_cql_env_thread([](cql_test_env& e) {
|
||||
e.execute_cql("create table tbl (a int, b list<text>, primary key (a))").get();
|
||||
// Insert 128 KB blobs, 11 times -> ~1.4 MB total, exceeding 1 MB
|
||||
// threshold. Also exceeds 10 element threshold.
|
||||
sstring blob(128 * 1024, 'x');
|
||||
for (int i = 0; i < 11; ++i) {
|
||||
e.execute_cql("update tbl set b = ['" + blob + "'] + b where a = 42;").get();
|
||||
}
|
||||
flush(e);
|
||||
|
||||
// There must be exactly one row in system.large_cells for this
|
||||
// table. Before the fix, two rows with the same clustering key
|
||||
// would have been emitted.
|
||||
assert_that(e.execute_cql(
|
||||
"select partition_key, column_name from system.large_cells "
|
||||
"where table_name = 'tbl' allow filtering;").get())
|
||||
.is_rows()
|
||||
.with_size(1);
|
||||
|
||||
return make_ready_future<>();
|
||||
}, cfg).get();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_insert_large_collection_values) {
|
||||
return do_with_cql_env([] (cql_test_env& e) {
|
||||
return seastar::async([&e] {
|
||||
|
||||
@@ -5092,70 +5092,34 @@ SEASTAR_TEST_CASE(test_sstable_reader_on_unknown_column) {
|
||||
}
|
||||
|
||||
namespace {
|
||||
struct large_row_handler : public db::large_data_handler {
|
||||
using callback_t = std::function<void(const schema& s, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, uint64_t row_size, uint64_t range_tombstones, uint64_t dead_rows,
|
||||
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements)>;
|
||||
callback_t callback;
|
||||
|
||||
large_row_handler(uint64_t large_rows_threshold, uint64_t rows_count_threshold, uint64_t cell_threshold_bytes, uint64_t collection_elements_threshold, callback_t callback)
|
||||
: large_data_handler(std::numeric_limits<uint64_t>::max(), large_rows_threshold, cell_threshold_bytes,
|
||||
rows_count_threshold, collection_elements_threshold)
|
||||
, callback(std::move(callback)) {
|
||||
// A large_data_handler with configurable thresholds for testing.
|
||||
// After refactoring, the handler only does threshold comparison + logging;
|
||||
// actual large data recording goes into the SSTable's LargeDataRecords metadata.
|
||||
struct large_data_test_handler : public db::large_data_handler {
|
||||
large_data_test_handler(uint64_t partition_threshold_bytes, uint64_t large_rows_threshold,
|
||||
uint64_t cell_threshold_bytes, uint64_t rows_count_threshold,
|
||||
uint64_t collection_elements_threshold)
|
||||
: large_data_handler(partition_threshold_bytes, large_rows_threshold, cell_threshold_bytes,
|
||||
rows_count_threshold, collection_elements_threshold) {
|
||||
start();
|
||||
}
|
||||
|
||||
virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, uint64_t row_size) const override {
|
||||
const schema_ptr s = sst.get_schema();
|
||||
callback(*s, partition_key, clustering_key, row_size, 0, 0, nullptr, 0, 0);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const override {
|
||||
const schema_ptr s = sst.get_schema();
|
||||
callback(*s, partition_key, clustering_key, 0, 0, 0, &cdef, cell_size, collection_elements);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
uint64_t partition_size, uint64_t rows_count, uint64_t range_tombstones_count, uint64_t dead_rows_count) const override {
|
||||
const schema_ptr s = sst.get_schema();
|
||||
callback(*s, partition_key, nullptr, rows_count, range_tombstones_count, dead_rows_count, nullptr, 0, 0);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
virtual future<> delete_large_data_entries(const schema& s, sstring sstable_name, std::string_view) const override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
virtual future<> update_large_data_entries_sstable_name(const schema& s, sstring old_name, sstring new_name, std::string_view large_table_name) const override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
protected:
|
||||
future<> record_large_cells(const sstables::sstable&, const sstables::key&,
|
||||
const clustering_key_prefix*, const column_definition&, uint64_t, uint64_t) const override { return make_ready_future<>(); }
|
||||
future<> record_large_rows(const sstables::sstable&, const sstables::key&,
|
||||
const clustering_key_prefix*, uint64_t) const override { return make_ready_future<>(); }
|
||||
future<> delete_large_data_entries(const schema&, sstring, std::string_view) const override { return make_ready_future<>(); }
|
||||
future<> update_large_data_entries_sstable_name(const schema&, sstring, sstring, std::string_view) const override { return make_ready_future<>(); }
|
||||
future<> record_large_partitions(const sstables::sstable&, const sstables::key&,
|
||||
uint64_t, uint64_t, uint64_t, uint64_t) const override { return make_ready_future<>(); }
|
||||
};
|
||||
}
|
||||
|
||||
static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, replica::memtable& mt, const partition_key& pk,
|
||||
std::vector<clustering_key*> expected, uint64_t threshold, sstables::sstable_version_types version) {
|
||||
unsigned i = 0;
|
||||
auto f = [&i, &expected, &pk, &threshold](const schema& s, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, uint64_t row_size, uint64_t range_tombstones, uint64_t dead_rows,
|
||||
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
|
||||
BOOST_REQUIRE(std::ranges::equal(pk.components(s), partition_key.to_partition_key(s).components(s)));
|
||||
BOOST_REQUIRE_LT(i, expected.size());
|
||||
BOOST_REQUIRE_GT(row_size, threshold);
|
||||
BOOST_REQUIRE_EQUAL(range_tombstones, 0);
|
||||
BOOST_REQUIRE_EQUAL(dead_rows, 0);
|
||||
|
||||
if (clustering_key) {
|
||||
BOOST_REQUIRE(expected[i]->equal(s, *clustering_key));
|
||||
} else {
|
||||
BOOST_REQUIRE_EQUAL(expected[i], nullptr);
|
||||
}
|
||||
++i;
|
||||
};
|
||||
|
||||
large_row_handler handler(threshold, std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), f);
|
||||
large_data_test_handler handler(std::numeric_limits<uint64_t>::max(), threshold,
|
||||
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
|
||||
std::numeric_limits<uint64_t>::max());
|
||||
|
||||
sstables::test_env::do_with_async([&] (auto& env) {
|
||||
auto sst = env.make_sstable(s, version);
|
||||
@@ -5165,14 +5129,50 @@ static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, r
|
||||
// depends on the encoding statistics (because of variable-length encoding). The original values
|
||||
// were chosen with the default-constructed encoding_stats, so let's keep it that way.
|
||||
sst->write_components(mt.make_mutation_reader(s, std::move(permit)), 1, s, env.manager().configure_writer("test"), encoding_stats{}).get();
|
||||
BOOST_REQUIRE_EQUAL(i, expected.size());
|
||||
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
|
||||
sst->open_data(cfg).get();
|
||||
|
||||
auto& records_opt = sst->get_large_data_records();
|
||||
// Collect row_size records (order is not guaranteed, so just count and validate).
|
||||
unsigned row_size_count = 0;
|
||||
unsigned static_row_count = 0;
|
||||
unsigned clustering_row_count = 0;
|
||||
if (records_opt) {
|
||||
for (auto& rec : records_opt->elements) {
|
||||
if (rec.type != large_data_type::row_size) {
|
||||
continue;
|
||||
}
|
||||
BOOST_REQUIRE_GT(rec.value, threshold);
|
||||
auto rec_ck_str = sstring(reinterpret_cast<const char*>(rec.clustering_key.value.data()), rec.clustering_key.value.size());
|
||||
if (rec_ck_str.empty()) {
|
||||
++static_row_count;
|
||||
} else {
|
||||
++clustering_row_count;
|
||||
}
|
||||
++row_size_count;
|
||||
}
|
||||
}
|
||||
// Count expected static rows (nullptr entries) and clustering rows.
|
||||
unsigned expected_static = 0;
|
||||
unsigned expected_clustering = 0;
|
||||
for (auto* ck : expected) {
|
||||
if (ck) {
|
||||
++expected_clustering;
|
||||
} else {
|
||||
++expected_static;
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(row_size_count, expected.size());
|
||||
BOOST_REQUIRE_EQUAL(static_row_count, expected_static);
|
||||
BOOST_REQUIRE_EQUAL(clustering_row_count, expected_clustering);
|
||||
}, { &handler }).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
mutation partition = s.new_mutation("pv");
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation partition(s.schema(), s.make_pkey());
|
||||
const partition_key& pk = partition.key();
|
||||
s.add_static_row(partition, "foo bar zed");
|
||||
|
||||
@@ -5190,26 +5190,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) {
|
||||
|
||||
static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit, replica::memtable& mt, const partition_key& pk,
|
||||
std::vector<clustering_key*> expected, uint64_t threshold, sstables::sstable_version_types version) {
|
||||
unsigned i = 0;
|
||||
auto f = [&i, &expected, &pk, &threshold](const schema& s, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, uint64_t row_size, uint64_t range_tombstones, uint64_t dead_rows,
|
||||
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
|
||||
BOOST_TEST_MESSAGE(format("i={} ck={} cell_size={} threshold={}", i, clustering_key ? format("{}", *clustering_key) : "null", cell_size, threshold));
|
||||
BOOST_REQUIRE(std::ranges::equal(pk.components(s), partition_key.to_partition_key(s).components(s)));
|
||||
BOOST_REQUIRE_LT(i, expected.size());
|
||||
BOOST_REQUIRE_GT(cell_size, threshold);
|
||||
BOOST_REQUIRE_EQUAL(range_tombstones, 0);
|
||||
BOOST_REQUIRE_EQUAL(dead_rows, 0);
|
||||
|
||||
if (clustering_key) {
|
||||
BOOST_REQUIRE(expected[i]->equal(s, *clustering_key));
|
||||
} else {
|
||||
BOOST_REQUIRE_EQUAL(expected[i], nullptr);
|
||||
}
|
||||
++i;
|
||||
};
|
||||
|
||||
large_row_handler handler(std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), threshold, std::numeric_limits<uint64_t>::max(), f);
|
||||
large_data_test_handler handler(std::numeric_limits<uint64_t>::max(),
|
||||
std::numeric_limits<uint64_t>::max(), threshold,
|
||||
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max());
|
||||
|
||||
sstables::test_env::do_with_async([&] (auto& env) {
|
||||
auto sst = env.make_sstable(s, version);
|
||||
@@ -5219,14 +5202,50 @@ static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit,
|
||||
// depends on the encoding statistics (because of variable-length encoding). The original values
|
||||
// were chosen with the default-constructed encoding_stats, so let's keep it that way.
|
||||
sst->write_components(mt.make_mutation_reader(s, std::move(permit)), 1, s, env.manager().configure_writer("test"), encoding_stats{}).get();
|
||||
BOOST_REQUIRE_EQUAL(i, expected.size());
|
||||
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
|
||||
sst->open_data(cfg).get();
|
||||
|
||||
auto& records_opt = sst->get_large_data_records();
|
||||
// Collect cell_size records (order is not guaranteed, so just count and validate).
|
||||
unsigned cell_size_count = 0;
|
||||
unsigned static_cell_count = 0;
|
||||
unsigned clustering_cell_count = 0;
|
||||
if (records_opt) {
|
||||
for (auto& rec : records_opt->elements) {
|
||||
if (rec.type != large_data_type::cell_size) {
|
||||
continue;
|
||||
}
|
||||
BOOST_REQUIRE_GT(rec.value, threshold);
|
||||
auto rec_ck_str = sstring(reinterpret_cast<const char*>(rec.clustering_key.value.data()), rec.clustering_key.value.size());
|
||||
if (rec_ck_str.empty()) {
|
||||
++static_cell_count;
|
||||
} else {
|
||||
++clustering_cell_count;
|
||||
}
|
||||
++cell_size_count;
|
||||
}
|
||||
}
|
||||
// Count expected static cells (nullptr entries) and clustering cells.
|
||||
unsigned expected_static = 0;
|
||||
unsigned expected_clustering = 0;
|
||||
for (auto* ck : expected) {
|
||||
if (ck) {
|
||||
++expected_clustering;
|
||||
} else {
|
||||
++expected_static;
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(cell_size_count, expected.size());
|
||||
BOOST_REQUIRE_EQUAL(static_cell_count, expected_static);
|
||||
BOOST_REQUIRE_EQUAL(clustering_cell_count, expected_clustering);
|
||||
}, { &handler }).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
mutation partition = s.new_mutation("pv");
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation partition(s.schema(), s.make_pkey());
|
||||
const partition_key& pk = partition.key();
|
||||
s.add_static_row(partition, "foo bar zed");
|
||||
|
||||
@@ -5245,8 +5264,8 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
|
||||
static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uint64_t threshold, bool expected, sstable_version_types version) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
mutation p = s.new_mutation("pv");
|
||||
const partition_key& pk = p.key();
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation p(s.schema(), s.make_pkey());
|
||||
sstring sv;
|
||||
for (auto idx = 0; idx < rows - 1; idx++) {
|
||||
sv += "foo ";
|
||||
@@ -5259,27 +5278,27 @@ static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uin
|
||||
schema_ptr sc = s.schema();
|
||||
auto mt = make_memtable(sc, {p});
|
||||
|
||||
bool logged = false;
|
||||
auto f = [&logged, &pk, &threshold, &rows, &range_tombstones](const schema& sc, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, uint64_t rows_count, uint64_t range_tombstones_count, uint64_t dead_rows,
|
||||
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
|
||||
BOOST_REQUIRE_GT(rows_count, threshold);
|
||||
BOOST_REQUIRE(std::ranges::equal(pk.components(sc), partition_key.to_partition_key(sc).components(sc)));
|
||||
BOOST_REQUIRE_EQUAL(dead_rows, 0);
|
||||
|
||||
// Inserting one range tombstone creates two range tombstone marker rows
|
||||
BOOST_REQUIRE_EQUAL(rows_count, rows + range_tombstones * 2);
|
||||
BOOST_REQUIRE_EQUAL(range_tombstones_count, range_tombstones * 2);
|
||||
logged = true;
|
||||
};
|
||||
|
||||
large_row_handler handler(std::numeric_limits<uint64_t>::max(), threshold, std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), f);
|
||||
// rows_count_threshold = threshold; all other thresholds at MAX.
|
||||
large_data_test_handler handler(std::numeric_limits<uint64_t>::max(),
|
||||
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
|
||||
threshold, std::numeric_limits<uint64_t>::max());
|
||||
|
||||
sstables::test_env::do_with_async([&] (auto& env) {
|
||||
auto sst = env.make_sstable(sc, version);
|
||||
sst->write_components(mt->make_mutation_reader(sc, semaphore.make_permit()), 1, sc, env.manager().configure_writer("test"), encoding_stats{}).get();
|
||||
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
|
||||
sst->open_data(cfg).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(logged, expected);
|
||||
auto& records_opt = sst->get_large_data_records();
|
||||
bool found = false;
|
||||
if (records_opt) {
|
||||
for (auto& rec : records_opt->elements) {
|
||||
if (rec.type == large_data_type::rows_in_partition && rec.elements_count > threshold) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(found, expected);
|
||||
}, { &handler }).get();
|
||||
}
|
||||
|
||||
@@ -5307,8 +5326,8 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
|
||||
static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold, bool expected, sstable_version_types version) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
mutation p = s.new_mutation("pv");
|
||||
const partition_key& pk = p.key();
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation p(s.schema(), s.make_pkey());
|
||||
sstring sv;
|
||||
int live_rows = 0;
|
||||
int expected_dead_rows = 0;
|
||||
@@ -5355,7 +5374,6 @@ static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold,
|
||||
}
|
||||
auto ck_start = ck;
|
||||
auto rt_start = bound_view(ck_start, tests::random::get_bool() ? bound_kind::incl_start : bound_kind::excl_start);
|
||||
auto sv_end = sv;
|
||||
auto rt_size = tests::random::get_int(1, 10);
|
||||
for (auto i = 0; i < rt_size; i++) {
|
||||
sv += "X";
|
||||
@@ -5374,25 +5392,27 @@ static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold,
|
||||
auto mt = make_lw_shared<replica::memtable>(sc);
|
||||
mt->apply(p);
|
||||
|
||||
bool logged = false;
|
||||
auto f = [&] (const schema& sc, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, uint64_t rows_count, uint64_t range_tombstones, uint64_t dead_rows,
|
||||
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
|
||||
BOOST_REQUIRE_GT(rows_count, threshold);
|
||||
BOOST_REQUIRE_EQUAL(rows_count, rows);
|
||||
BOOST_REQUIRE_EQUAL(dead_rows, expected_dead_rows + expected_expired_rows + expected_rows_with_dead_cell);
|
||||
BOOST_REQUIRE_EQUAL(range_tombstones, expected_range_tombstones);
|
||||
BOOST_REQUIRE(std::ranges::equal(pk.components(sc), partition_key.to_partition_key(sc).components(sc)));
|
||||
logged = true;
|
||||
};
|
||||
|
||||
large_row_handler handler(std::numeric_limits<uint64_t>::max(), threshold, std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), f);
|
||||
// rows_count_threshold = threshold; all other thresholds at MAX.
|
||||
large_data_test_handler handler(std::numeric_limits<uint64_t>::max(),
|
||||
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
|
||||
threshold, std::numeric_limits<uint64_t>::max());
|
||||
|
||||
sstables::test_env::do_with_async([&] (auto& env) {
|
||||
auto sst = env.make_sstable(sc, version);
|
||||
sst->write_components(mt->make_mutation_reader(sc, semaphore.make_permit()), 1, sc, env.manager().configure_writer("test"), encoding_stats{}).get();
|
||||
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
|
||||
sst->open_data(cfg).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(logged, expected);
|
||||
auto& records_opt = sst->get_large_data_records();
|
||||
bool found = false;
|
||||
if (records_opt) {
|
||||
for (auto& rec : records_opt->elements) {
|
||||
if (rec.type == large_data_type::rows_in_partition && rec.elements_count > threshold) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(found, expected);
|
||||
}, { &handler }).get();
|
||||
}
|
||||
|
||||
@@ -5416,8 +5436,8 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_dead_rows) {
|
||||
static void test_sstable_too_many_collection_elements_f(int elements, uint64_t threshold, bool expected, sstable_version_types version) {
|
||||
simple_schema s(simple_schema::with_static::no, simple_schema::with_collection::yes);
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
mutation p = s.new_mutation("pv");
|
||||
const partition_key& pk = p.key();
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation p(s.schema(), s.make_pkey());
|
||||
std::map<bytes, bytes> kv_map;
|
||||
for (auto i = 0; i < elements; i++) {
|
||||
kv_map[to_bytes(format("key{}", i))] = to_bytes(format("val{}", i));
|
||||
@@ -5426,25 +5446,28 @@ static void test_sstable_too_many_collection_elements_f(int elements, uint64_t t
|
||||
schema_ptr sc = s.schema();
|
||||
auto mt = make_memtable(sc, {p});
|
||||
|
||||
bool logged = false;
|
||||
auto f = [&logged, &pk, &threshold](const schema& sc, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, uint64_t rows_count, uint64_t range_tombstones, uint64_t dead_rows,
|
||||
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
|
||||
BOOST_REQUIRE_GT(collection_elements, threshold);
|
||||
BOOST_REQUIRE(std::ranges::equal(pk.components(sc), partition_key.to_partition_key(sc).components(sc)));
|
||||
BOOST_REQUIRE_EQUAL(range_tombstones, 0);
|
||||
BOOST_REQUIRE_EQUAL(dead_rows, 0);
|
||||
logged = true;
|
||||
};
|
||||
|
||||
BOOST_TEST_MESSAGE(format("elements={} threshold={} expected={}", elements, threshold, expected));
|
||||
large_row_handler handler(std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), threshold, f);
|
||||
// collection_elements_threshold = threshold; all other thresholds at MAX.
|
||||
large_data_test_handler handler(std::numeric_limits<uint64_t>::max(),
|
||||
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
|
||||
std::numeric_limits<uint64_t>::max(), threshold);
|
||||
|
||||
sstables::test_env::do_with_async([&] (auto& env) {
|
||||
auto sst = env.make_sstable(sc, version);
|
||||
sst->write_components(mt->make_mutation_reader(sc, semaphore.make_permit()), 1, sc, env.manager().configure_writer("test"), encoding_stats{}).get();
|
||||
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
|
||||
sst->open_data(cfg).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(logged, expected);
|
||||
auto& records_opt = sst->get_large_data_records();
|
||||
bool found = false;
|
||||
if (records_opt) {
|
||||
for (auto& rec : records_opt->elements) {
|
||||
if (rec.type == large_data_type::elements_in_collection && rec.elements_count > threshold) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(found, expected);
|
||||
}, { &handler }).get();
|
||||
}
|
||||
|
||||
@@ -5465,40 +5488,10 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_too_many_collection_elements) {
|
||||
}
|
||||
|
||||
// Handler for testing LargeDataRecords population in SSTable metadata.
|
||||
// Unlike large_row_handler above, this one allows setting the partition_threshold_bytes too.
|
||||
// large_data_test_handler already serves this purpose; keep this alias for
|
||||
// test readability / backward compatibility.
|
||||
namespace {
|
||||
struct large_data_records_handler : public db::large_data_handler {
|
||||
large_data_records_handler(uint64_t partition_threshold_bytes, uint64_t row_threshold_bytes,
|
||||
uint64_t cell_threshold_bytes, uint64_t rows_count_threshold,
|
||||
uint64_t collection_elements_threshold)
|
||||
: large_data_handler(partition_threshold_bytes, row_threshold_bytes, cell_threshold_bytes,
|
||||
rows_count_threshold, collection_elements_threshold) {
|
||||
start();
|
||||
}
|
||||
|
||||
virtual future<> record_large_rows(const sstables::sstable&, const sstables::key&,
|
||||
const clustering_key_prefix*, uint64_t) const override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
virtual future<> record_large_cells(const sstables::sstable&, const sstables::key&,
|
||||
const clustering_key_prefix*, const column_definition&, uint64_t, uint64_t) const override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
virtual future<> record_large_partitions(const sstables::sstable&, const sstables::key&,
|
||||
uint64_t, uint64_t, uint64_t, uint64_t) const override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
virtual future<> delete_large_data_entries(const schema&, sstring, std::string_view) const override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
virtual future<> update_large_data_entries_sstable_name(const schema&, sstring, sstring, std::string_view) const override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
};
|
||||
using large_data_records_handler = large_data_test_handler;
|
||||
}
|
||||
|
||||
// Test that writing an SSTable with data exceeding large data thresholds
|
||||
|
||||
@@ -1008,7 +1008,7 @@ private:
|
||||
_mnotifier.local().unregister_listener(&_ss.local()).get();
|
||||
});
|
||||
|
||||
db::initialize_virtual_tables(_db, _ss, _gossiper, _group0_registry, _sys_ks, _tablet_allocator, _ms, *cfg).get();
|
||||
db::initialize_virtual_tables(_db, _ss, _gossiper, _group0_registry, _sys_ks, _tablet_allocator, _ms, *cfg, _feature_service.local()).get();
|
||||
|
||||
_qp.invoke_on_all([this, &group0_client] (cql3::query_processor& qp) {
|
||||
qp.start_remote(_mm.local(), _mapreduce_service.local(), _ss.local(), group0_client,
|
||||
|
||||
Reference in New Issue
Block a user