mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-01 05:35:48 +00:00
erge 'db: store large data records in SSTable metadata and serve via virtual tables' from Benny Halevy
`system.large_partitions`, `system.large_rows`, and `system.large_cells` store records keyed by SSTable name. When SSTables are migrated between shards or nodes (resharding, streaming, decommission), the records are lost because the destination never writes entries for the migrated SSTables. This patch series moves the source of truth for large data records into the SSTable's scylla metadata component (new `LargeDataRecords` tag 13) and reimplements the three `system.large_*` tables as virtual tables that query live SSTables on demand. A cluster feature flag (`LARGE_DATA_VIRTUAL_TABLES`) gates the transition for safe rolling upgrades. When the cluster feature is enabled, each node drops the old system large_* tables and starts serving the corresponding tables using virtual tables that represent the large data records now stored on the sstables. Note that the virtual tables will be empty after upgrade until the sstables that contained large data are rewritten, therefore it is recommended to run upgrade sstables compaction or major compaction to repopulate the sstables scylla-metadata with large data records. 1. **keys: move key_to_str() to keys/keys.hh** — make the helper reusable across large_data_handler, virtual tables, and scylla-sstable 2. **sstables: add LargeDataRecords metadata type (tag 13)** — new struct with binary-serialized key fields, scylla-sstable JSON support, format documentation 3. **large_data_handler: rename partition_above_threshold to above_threshold_result** — generalize the struct for reuse 4. **large_data_handler: return above_threshold_result from maybe_record_large_cells** — separate booleans for cell size vs collection elements thresholds 5. **sstables: populate LargeDataRecords from writer** — bounded min-heaps (one per large_data_type), configurable top-N via `compaction_large_data_records_per_sstable` 6. **test: add LargeDataRecords round-trip unit tests** — verify write/read, top-N bounding, below-threshold behavior 7. **db: call initialize_virtual_tables from shard 0 only** — preparatory refactoring to enable cross-shard coordination 8. **db: implement large_data virtual tables with feature flag gating** — three virtual table classes, feature flag activation, legacy SSTable fallback, dual-threshold dedup, cross-shard collection Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1276 * Although this fixes a bug where large data entries are effectively lost when sstables are renamed or migrated, the changes are intrusive and do not warrant a backport Closes scylladb/scylladb#29257 * github.com:scylladb/scylladb: db: implement large_data virtual tables with feature flag gating db: call initialize_virtual_tables from shard 0 only test: add LargeDataRecords round-trip unit tests sstables: populate LargeDataRecords from writer large_data_handler: return above_threshold_result from maybe_record_large_cells large_data_handler: rename partition_above_threshold to above_threshold_result sstables: add LargeDataRecords metadata type (tag 13) sstables: add fmt::formatter for large_data_type keys: move key_to_str() to keys/keys.hh
This commit is contained in:
@@ -790,6 +790,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"Log a warning when writing a number of rows larger than this value.")
|
||||
, compaction_collection_elements_count_warning_threshold(this, "compaction_collection_elements_count_warning_threshold", liveness::LiveUpdate, value_status::Used, 10000,
|
||||
"Log a warning when writing a collection containing more elements than this value.")
|
||||
, compaction_large_data_records_per_sstable(this, "compaction_large_data_records_per_sstable", liveness::LiveUpdate, value_status::Used, 10,
|
||||
"Maximum number of large data records per type to store in each SSTable's scylla metadata.")
|
||||
/**
|
||||
* @Group Common memtable settings
|
||||
*/
|
||||
|
||||
@@ -228,6 +228,7 @@ public:
|
||||
named_value<uint32_t> compaction_large_cell_warning_threshold_mb;
|
||||
named_value<uint32_t> compaction_rows_count_warning_threshold;
|
||||
named_value<uint32_t> compaction_collection_elements_count_warning_threshold;
|
||||
named_value<uint32_t> compaction_large_data_records_per_sstable;
|
||||
named_value<uint32_t> memtable_total_space_in_mb;
|
||||
named_value<uint32_t> concurrent_reads;
|
||||
named_value<uint32_t> concurrent_writes;
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include <seastar/core/when_all.hh>
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/large_data_handler.hh"
|
||||
#include "keys/keys.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
@@ -39,19 +40,19 @@ large_data_handler::large_data_handler(uint64_t partition_threshold_bytes, uint6
|
||||
partition_threshold_bytes, row_threshold_bytes, cell_threshold_bytes, rows_count_threshold, _collection_elements_count_threshold);
|
||||
}
|
||||
|
||||
future<large_data_handler::partition_above_threshold> large_data_handler::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size, uint64_t rows, uint64_t range_tombstones, uint64_t dead_rows) {
|
||||
future<large_data_handler::above_threshold_result> large_data_handler::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size, uint64_t rows, uint64_t range_tombstones, uint64_t dead_rows) {
|
||||
SCYLLA_ASSERT(running());
|
||||
partition_above_threshold above_threshold{partition_size > _partition_threshold_bytes, rows > _rows_count_threshold};
|
||||
above_threshold_result above_threshold{.size = partition_size > _partition_threshold_bytes, .elements = rows > _rows_count_threshold};
|
||||
static_assert(std::is_same_v<decltype(above_threshold.size), bool>);
|
||||
_stats.partitions_bigger_than_threshold += above_threshold.size; // increment if true
|
||||
if (above_threshold.size || above_threshold.rows) [[unlikely]] {
|
||||
if (above_threshold.size || above_threshold.elements) [[unlikely]] {
|
||||
return with_sem([&sst, &key, partition_size, rows, range_tombstones, dead_rows, this] {
|
||||
return record_large_partitions(sst, key, partition_size, rows, range_tombstones, dead_rows);
|
||||
}).then([above_threshold] {
|
||||
return above_threshold;
|
||||
});
|
||||
}
|
||||
return make_ready_future<partition_above_threshold>();
|
||||
return make_ready_future<above_threshold_result>();
|
||||
}
|
||||
|
||||
void large_data_handler::start() {
|
||||
@@ -75,10 +76,6 @@ future<> large_data_handler::unplug_system_keyspace() noexcept {
|
||||
co_await _sys_ks.unplug();
|
||||
}
|
||||
|
||||
template <typename T> static std::string key_to_str(const T& key, const schema& s) {
|
||||
return fmt::to_string(key.with_schema(s));
|
||||
}
|
||||
|
||||
sstring large_data_handler::sst_filename(const sstables::sstable& sst) {
|
||||
return sst.component_basename(sstables::component_type::Data);
|
||||
}
|
||||
@@ -145,6 +142,10 @@ future<> large_data_handler::maybe_update_large_data_entries_sstable_name(sstabl
|
||||
return when_all(std::move(large_partitions), std::move(large_rows), std::move(large_cells)).discard_result();
|
||||
}
|
||||
|
||||
bool cql_table_large_data_handler::skip_cql_writes() const {
|
||||
return bool(_feat.large_data_virtual_tables);
|
||||
}
|
||||
|
||||
cql_table_large_data_handler::cql_table_large_data_handler(gms::feature_service& feat,
|
||||
utils::updateable_value<uint32_t> partition_threshold_mb,
|
||||
utils::updateable_value<uint32_t> row_threshold_mb,
|
||||
@@ -183,6 +184,9 @@ future<> cql_table_large_data_handler::do_insert_large_data_entry(std::string_vi
|
||||
sstring ks_name, sstring cf_name, sstring sstable_name,
|
||||
int64_t size, sstring partition_key, db_clock::time_point compaction_time,
|
||||
const std::vector<sstring>& extra_fields, Args&&... args) const {
|
||||
if (skip_cql_writes()) {
|
||||
co_return;
|
||||
}
|
||||
auto sys_ks = _sys_ks.get_permit();
|
||||
if (!sys_ks) {
|
||||
co_return;
|
||||
@@ -287,6 +291,9 @@ future<> cql_table_large_data_handler::record_large_rows(const sstables::sstable
|
||||
}
|
||||
|
||||
future<> cql_table_large_data_handler::delete_large_data_entries(const schema& s, sstring sstable_name, std::string_view large_table_name) const {
|
||||
if (skip_cql_writes()) {
|
||||
co_return;
|
||||
}
|
||||
auto sys_ks = _sys_ks.get_permit();
|
||||
SCYLLA_ASSERT(sys_ks);
|
||||
const sstring req =
|
||||
@@ -332,6 +339,9 @@ cql_table_large_data_handler::row_reinsert_func cql_table_large_data_handler::ma
|
||||
}
|
||||
|
||||
future<> cql_table_large_data_handler::update_large_data_entries_sstable_name(const schema& s, sstring old_name, sstring new_name, std::string_view large_table_name) const {
|
||||
if (skip_cql_writes()) {
|
||||
co_return;
|
||||
}
|
||||
auto sys_ks = _sys_ks.get_permit();
|
||||
SCYLLA_ASSERT(sys_ks);
|
||||
// sstable_name is a clustering key, so we can't update it in place.
|
||||
|
||||
@@ -91,24 +91,25 @@ public:
|
||||
return make_ready_future<bool>(false);
|
||||
}
|
||||
|
||||
struct partition_above_threshold {
|
||||
struct above_threshold_result {
|
||||
bool size = false;
|
||||
bool rows = false;
|
||||
bool elements = false;
|
||||
};
|
||||
future<partition_above_threshold> maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
future<above_threshold_result> maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
uint64_t partition_size, uint64_t rows, uint64_t range_tombstones, uint64_t dead_rows);
|
||||
|
||||
future<bool> maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
future<above_threshold_result> maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) {
|
||||
SCYLLA_ASSERT(running());
|
||||
if (cell_size > _cell_threshold_bytes || collection_elements > _collection_elements_count_threshold) [[unlikely]] {
|
||||
above_threshold_result above_threshold{.size = cell_size > _cell_threshold_bytes, .elements = collection_elements > _collection_elements_count_threshold};
|
||||
if (above_threshold.size || above_threshold.elements) [[unlikely]] {
|
||||
return with_sem([&sst, &partition_key, clustering_key, &cdef, cell_size, collection_elements, this] {
|
||||
return record_large_cells(sst, partition_key, clustering_key, cdef, cell_size, collection_elements);
|
||||
}).then([] {
|
||||
return true;
|
||||
}).then([above_threshold] {
|
||||
return above_threshold;
|
||||
});
|
||||
}
|
||||
return make_ready_future<bool>(false);
|
||||
return make_ready_future<above_threshold_result>();
|
||||
}
|
||||
|
||||
future<> maybe_delete_large_data_entries(sstables::shared_sstable sst);
|
||||
@@ -180,6 +181,12 @@ protected:
|
||||
virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, uint64_t row_size) const override;
|
||||
|
||||
private:
|
||||
// Returns true if CQL writes to system.large_* tables should be skipped.
|
||||
// Once LARGE_DATA_VIRTUAL_TABLES is enabled, large data records are served
|
||||
// from SSTable metadata via virtual tables and the physical CQL tables are
|
||||
// dropped, so writing to them is both unnecessary and would fail.
|
||||
bool skip_cql_writes() const;
|
||||
|
||||
future<> internal_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const;
|
||||
future<> internal_record_large_cells_and_collections(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
|
||||
@@ -26,6 +26,8 @@
|
||||
#include "db/size_estimates_virtual_reader.hh"
|
||||
#include "db/view/build_progress_virtual_reader.hh"
|
||||
#include "index/built_indexes_virtual_reader.hh"
|
||||
#include "keys/keys.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "mutation/frozen_mutation.hh"
|
||||
#include "transport/protocol_server.hh"
|
||||
@@ -35,6 +37,7 @@
|
||||
#include "service/raft/raft_group_registry.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/tablet_allocator.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "locator/load_sketch.hh"
|
||||
#include "types/list.hh"
|
||||
#include "types/types.hh"
|
||||
@@ -1461,6 +1464,595 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
// Helper: convert bytes stored in a disk_string to sstring.
|
||||
// disk_string<uint32_t>::value is of type `bytes`.
|
||||
static sstring disk_string_to_sstring(const sstables::disk_string<uint32_t>& ds) {
|
||||
return sstring(to_string_view(ds.value));
|
||||
}
|
||||
|
||||
// Helper: convert int64_t millis-since-epoch to db_clock::time_point
|
||||
// for use with timestamp_type columns in virtual tables.
|
||||
static db_clock::time_point millis_to_db_clock(int64_t ms) {
|
||||
return db_clock::time_point(db_clock::duration(ms));
|
||||
}
|
||||
|
||||
// Derive compaction_time (milliseconds since Unix epoch) from the SSTable's
|
||||
// generation UUID. If the generation is not UUID-based (legacy integer
|
||||
// generations), returns 0.
|
||||
static int64_t compaction_time_from_generation(const sstables::sstable& sst) {
|
||||
auto gen = sst.generation();
|
||||
if (gen.is_uuid_based()) {
|
||||
return utils::UUID_gen::unix_timestamp(gen.as_uuid()).count();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Virtual table backed by LargeDataRecords in SSTable scylla metadata.
|
||||
// Shows partition_size records from all live SSTables, matching the
|
||||
// schema of the legacy system.large_partitions CQL table.
|
||||
class large_partitions_virtual_table : public streaming_virtual_table {
|
||||
sharded<replica::database>& _db;
|
||||
|
||||
struct record {
|
||||
sstring sstable_name;
|
||||
int64_t partition_size;
|
||||
sstring partition_key;
|
||||
int64_t compaction_time;
|
||||
int64_t rows;
|
||||
int64_t range_tombstones;
|
||||
int64_t dead_rows;
|
||||
};
|
||||
|
||||
// Extract partition_size records from a single table's SSTables on the local shard.
|
||||
// Called on each shard via map() to collect records cross-shard.
|
||||
static future<std::vector<record>> collect_local_records(replica::table& table) {
|
||||
std::vector<record> records;
|
||||
auto table_schema = table.schema();
|
||||
auto sstables = co_await table.take_sstable_set_snapshot();
|
||||
for (const auto& sst : sstables) {
|
||||
auto& records_opt = sst->get_large_data_records();
|
||||
if (!records_opt) {
|
||||
// Legacy SSTable without LargeDataRecords: emit synthetic row
|
||||
// if above_threshold > 0 for partition_size or rows_in_partition stats.
|
||||
auto stat = sst->get_large_data_stat(sstables::large_data_type::partition_size);
|
||||
auto rows_stat = sst->get_large_data_stat(sstables::large_data_type::rows_in_partition);
|
||||
if ((stat && stat->above_threshold > 0) || (rows_stat && rows_stat->above_threshold > 0)) {
|
||||
records.push_back(record{
|
||||
.sstable_name = sst->component_basename(sstables::component_type::Data),
|
||||
.partition_size = static_cast<int64_t>(stat ? stat->max_value : 0),
|
||||
.partition_key = "(details unavailable - legacy SSTable)",
|
||||
.compaction_time = compaction_time_from_generation(*sst),
|
||||
.rows = static_cast<int64_t>(rows_stat ? rows_stat->max_value : 0),
|
||||
});
|
||||
}
|
||||
continue;
|
||||
}
|
||||
auto compaction_time = compaction_time_from_generation(*sst);
|
||||
auto sst_name = sst->component_basename(sstables::component_type::Data);
|
||||
for (const auto& rec : records_opt->elements) {
|
||||
if (rec.type != sstables::large_data_type::partition_size &&
|
||||
rec.type != sstables::large_data_type::rows_in_partition) {
|
||||
continue;
|
||||
}
|
||||
auto pk = sstables::key_view(rec.partition_key.value).to_partition_key(*table_schema);
|
||||
records.push_back(record{
|
||||
.sstable_name = sst_name,
|
||||
.partition_size = static_cast<int64_t>(rec.value),
|
||||
.partition_key = key_to_str(pk, *table_schema),
|
||||
.compaction_time = compaction_time,
|
||||
.rows = static_cast<int64_t>(rec.elements_count),
|
||||
.range_tombstones = static_cast<int64_t>(rec.range_tombstones),
|
||||
.dead_rows = static_cast<int64_t>(rec.dead_rows),
|
||||
});
|
||||
}
|
||||
}
|
||||
co_return records;
|
||||
}
|
||||
|
||||
public:
|
||||
explicit large_partitions_virtual_table(sharded<replica::database>& db)
|
||||
: streaming_virtual_table(build_schema())
|
||||
, _db(db)
|
||||
{
|
||||
_shard_aware = true;
|
||||
}
|
||||
|
||||
static schema_ptr build_schema() {
|
||||
auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::LARGE_PARTITIONS, 1);
|
||||
schema_builder builder(system_keyspace::NAME, system_keyspace::LARGE_PARTITIONS, std::make_optional(id));
|
||||
builder.with_column("keyspace_name", utf8_type, column_kind::partition_key);
|
||||
builder.with_column("table_name", utf8_type, column_kind::partition_key);
|
||||
builder.with_column("sstable_name", utf8_type, column_kind::clustering_key);
|
||||
builder.with_column("partition_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key);
|
||||
builder.with_column("partition_key", utf8_type, column_kind::clustering_key);
|
||||
builder.with_column("rows", long_type);
|
||||
builder.with_column("compaction_time", timestamp_type);
|
||||
builder.with_column("range_tombstones", long_type);
|
||||
builder.with_column("dead_rows", long_type);
|
||||
builder.set_comment("partitions larger than specified threshold");
|
||||
builder.with_hash_version();
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
dht::decorated_key make_partition_key(const sstring& ks_name, const sstring& cf_name) {
|
||||
return dht::decorate_key(*_s, partition_key::from_exploded(*_s, {
|
||||
data_value(ks_name).serialize_nonnull(),
|
||||
data_value(cf_name).serialize_nonnull()
|
||||
}));
|
||||
}
|
||||
|
||||
clustering_key make_clustering_key(const sstring& sstable_name, int64_t partition_size, const sstring& pk) {
|
||||
return clustering_key::from_exploded(*_s, {
|
||||
data_value(sstable_name).serialize_nonnull(),
|
||||
reversed_type_impl::get_instance(long_type)->decompose(partition_size),
|
||||
data_value(pk).serialize_nonnull()
|
||||
});
|
||||
}
|
||||
|
||||
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
|
||||
// Phase 1: Determine which (ks, table) partitions this shard owns.
|
||||
// Table metadata is shared across all shards, so local iteration is sufficient.
|
||||
struct table_info {
|
||||
table_id tid;
|
||||
dht::decorated_key dk;
|
||||
};
|
||||
std::vector<table_info> owned_tables;
|
||||
|
||||
auto& db = _db.local();
|
||||
db.get_tables_metadata().for_each_table([&] (table_id tid, lw_shared_ptr<replica::table> table) {
|
||||
auto& ks_name = table->schema()->ks_name();
|
||||
auto& cf_name = table->schema()->cf_name();
|
||||
auto dk = make_partition_key(ks_name, cf_name);
|
||||
if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) {
|
||||
owned_tables.push_back(table_info{tid, std::move(dk)});
|
||||
}
|
||||
});
|
||||
|
||||
// Sort by token order — streaming_virtual_table requires
|
||||
// partitions to be emitted in token order.
|
||||
std::ranges::sort(owned_tables, dht::ring_position_less_comparator(*_s),
|
||||
std::mem_fn(&table_info::dk));
|
||||
|
||||
// Phase 2: For each owned table, collect records from ALL shards
|
||||
// in parallel, then sort and emit immediately. We process one
|
||||
// table at a time to bound memory usage: once we have emitted
|
||||
// page_size records we stop and let CQL paging fetch the rest.
|
||||
static constexpr size_t page_size = 1000;
|
||||
size_t emitted = 0;
|
||||
|
||||
for (auto& ti : owned_tables) {
|
||||
auto per_shard = co_await _db.map([tid = ti.tid] (replica::database& db) -> future<std::vector<record>> {
|
||||
if (auto table = db.get_tables_metadata().get_table_if_exists(tid)) {
|
||||
co_return co_await collect_local_records(*table);
|
||||
}
|
||||
co_return std::vector<record>{};
|
||||
});
|
||||
std::vector<record> records;
|
||||
for (auto& shard_records : per_shard) {
|
||||
records.insert(records.end(),
|
||||
std::make_move_iterator(shard_records.begin()),
|
||||
std::make_move_iterator(shard_records.end()));
|
||||
}
|
||||
if (records.empty()) {
|
||||
continue;
|
||||
}
|
||||
// Sort by (sstable_name ASC, partition_size DESC, partition_key ASC) to match clustering order
|
||||
std::ranges::sort(records, [] (const record& a, const record& b) {
|
||||
if (a.sstable_name != b.sstable_name) {
|
||||
return a.sstable_name < b.sstable_name;
|
||||
} else if (a.partition_size != b.partition_size) {
|
||||
return a.partition_size > b.partition_size; // DESC
|
||||
}
|
||||
return a.partition_key < b.partition_key;
|
||||
});
|
||||
// A partition that exceeds both the size and the row-count
|
||||
// thresholds produces two LargeDataRecords entries (one per
|
||||
// type) that map to identical clustering keys. Remove the
|
||||
// duplicates so that every emitted row has a unique key.
|
||||
records.erase(std::unique(records.begin(), records.end(), [] (const record& a, const record& b) {
|
||||
return a.sstable_name == b.sstable_name
|
||||
&& a.partition_size == b.partition_size
|
||||
&& a.partition_key == b.partition_key;
|
||||
}), records.end());
|
||||
co_await result.emit_partition_start(ti.dk);
|
||||
for (const auto& rec : records) {
|
||||
clustering_row cr(make_clustering_key(rec.sstable_name, rec.partition_size, rec.partition_key));
|
||||
if (rec.compaction_time != 0) {
|
||||
set_cell(cr.cells(), "compaction_time", millis_to_db_clock(rec.compaction_time));
|
||||
}
|
||||
if (rec.rows != 0) {
|
||||
set_cell(cr.cells(), "rows", rec.rows);
|
||||
}
|
||||
if (rec.range_tombstones != 0) {
|
||||
set_cell(cr.cells(), "range_tombstones", rec.range_tombstones);
|
||||
}
|
||||
if (rec.dead_rows != 0) {
|
||||
set_cell(cr.cells(), "dead_rows", rec.dead_rows);
|
||||
}
|
||||
co_await result.emit_row(std::move(cr));
|
||||
}
|
||||
co_await result.emit_partition_end();
|
||||
emitted += records.size();
|
||||
if (emitted >= page_size) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Virtual table backed by LargeDataRecords in SSTable scylla metadata.
|
||||
// Shows row_size records from all live SSTables, matching the
|
||||
// schema of the legacy system.large_rows CQL table.
|
||||
class large_rows_virtual_table : public streaming_virtual_table {
|
||||
sharded<replica::database>& _db;
|
||||
|
||||
struct record {
|
||||
sstring sstable_name;
|
||||
int64_t row_size;
|
||||
sstring partition_key;
|
||||
sstring clustering_key;
|
||||
int64_t compaction_time;
|
||||
};
|
||||
|
||||
// Extract row_size records from a single table's SSTables on the local shard.
|
||||
static future<std::vector<record>> collect_local_records(replica::table& table) {
|
||||
std::vector<record> records;
|
||||
auto table_schema = table.schema();
|
||||
auto sstables = co_await table.take_sstable_set_snapshot();
|
||||
for (const auto& sst : sstables) {
|
||||
auto& records_opt = sst->get_large_data_records();
|
||||
if (!records_opt) {
|
||||
auto stat = sst->get_large_data_stat(sstables::large_data_type::row_size);
|
||||
if (stat && stat->above_threshold > 0) {
|
||||
records.push_back(record{
|
||||
.sstable_name = sst->component_basename(sstables::component_type::Data),
|
||||
.row_size = static_cast<int64_t>(stat->max_value),
|
||||
.partition_key = "(details unavailable - legacy SSTable)",
|
||||
.clustering_key = "",
|
||||
.compaction_time = compaction_time_from_generation(*sst),
|
||||
});
|
||||
}
|
||||
continue;
|
||||
}
|
||||
auto compaction_time = compaction_time_from_generation(*sst);
|
||||
auto sst_name = sst->component_basename(sstables::component_type::Data);
|
||||
for (const auto& rec : records_opt->elements) {
|
||||
if (rec.type != sstables::large_data_type::row_size) {
|
||||
continue;
|
||||
}
|
||||
auto pk = sstables::key_view(rec.partition_key.value).to_partition_key(*table_schema);
|
||||
auto ck_str = rec.clustering_key.value.empty()
|
||||
? sstring()
|
||||
: sstring(key_to_str(clustering_key_prefix::from_bytes(rec.clustering_key.value), *table_schema));
|
||||
records.push_back(record{
|
||||
.sstable_name = sst_name,
|
||||
.row_size = static_cast<int64_t>(rec.value),
|
||||
.partition_key = key_to_str(pk, *table_schema),
|
||||
.clustering_key = std::move(ck_str),
|
||||
.compaction_time = compaction_time,
|
||||
});
|
||||
}
|
||||
}
|
||||
co_return records;
|
||||
}
|
||||
|
||||
public:
|
||||
explicit large_rows_virtual_table(sharded<replica::database>& db)
|
||||
: streaming_virtual_table(build_schema())
|
||||
, _db(db)
|
||||
{
|
||||
_shard_aware = true;
|
||||
}
|
||||
|
||||
static schema_ptr build_schema() {
|
||||
auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::LARGE_ROWS, 1);
|
||||
return schema_builder(system_keyspace::NAME, system_keyspace::LARGE_ROWS, std::make_optional(id))
|
||||
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
|
||||
.with_column("table_name", utf8_type, column_kind::partition_key)
|
||||
.with_column("sstable_name", utf8_type, column_kind::clustering_key)
|
||||
.with_column("row_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key)
|
||||
.with_column("partition_key", utf8_type, column_kind::clustering_key)
|
||||
.with_column("clustering_key", utf8_type, column_kind::clustering_key)
|
||||
.with_column("compaction_time", timestamp_type)
|
||||
.set_comment("rows larger than specified threshold")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}
|
||||
|
||||
dht::decorated_key make_partition_key(const sstring& ks_name, const sstring& cf_name) {
|
||||
return dht::decorate_key(*_s, partition_key::from_exploded(*_s, {
|
||||
data_value(ks_name).serialize_nonnull(),
|
||||
data_value(cf_name).serialize_nonnull()
|
||||
}));
|
||||
}
|
||||
|
||||
clustering_key make_clustering_key(const sstring& sstable_name, int64_t row_size,
|
||||
const sstring& pk, const sstring& ck) {
|
||||
return clustering_key::from_exploded(*_s, {
|
||||
data_value(sstable_name).serialize_nonnull(),
|
||||
reversed_type_impl::get_instance(long_type)->decompose(row_size),
|
||||
data_value(pk).serialize_nonnull(),
|
||||
data_value(ck).serialize_nonnull()
|
||||
});
|
||||
}
|
||||
|
||||
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
|
||||
struct table_info {
|
||||
table_id tid;
|
||||
dht::decorated_key dk;
|
||||
};
|
||||
std::vector<table_info> owned_tables;
|
||||
|
||||
auto& db = _db.local();
|
||||
db.get_tables_metadata().for_each_table([&] (table_id tid, lw_shared_ptr<replica::table> table) {
|
||||
auto& ks_name = table->schema()->ks_name();
|
||||
auto& cf_name = table->schema()->cf_name();
|
||||
auto dk = make_partition_key(ks_name, cf_name);
|
||||
if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) {
|
||||
owned_tables.push_back(table_info{tid, std::move(dk)});
|
||||
}
|
||||
});
|
||||
|
||||
std::ranges::sort(owned_tables, dht::ring_position_less_comparator(*_s),
|
||||
std::mem_fn(&table_info::dk));
|
||||
|
||||
static constexpr size_t page_size = 1000;
|
||||
size_t emitted = 0;
|
||||
|
||||
for (auto& ti : owned_tables) {
|
||||
auto per_shard = co_await _db.map([tid = ti.tid] (replica::database& db) -> future<std::vector<record>> {
|
||||
if (auto table = db.get_tables_metadata().get_table_if_exists(tid)) {
|
||||
co_return co_await collect_local_records(*table);
|
||||
}
|
||||
co_return std::vector<record>{};
|
||||
});
|
||||
std::vector<record> records;
|
||||
for (auto& shard_records : per_shard) {
|
||||
records.insert(records.end(),
|
||||
std::make_move_iterator(shard_records.begin()),
|
||||
std::make_move_iterator(shard_records.end()));
|
||||
}
|
||||
if (records.empty()) {
|
||||
continue;
|
||||
}
|
||||
std::ranges::sort(records, [] (const record& a, const record& b) {
|
||||
if (a.sstable_name != b.sstable_name) {
|
||||
return a.sstable_name < b.sstable_name;
|
||||
} else if (a.row_size != b.row_size) {
|
||||
return a.row_size > b.row_size; // DESC
|
||||
} else if (a.partition_key != b.partition_key) {
|
||||
return a.partition_key < b.partition_key;
|
||||
}
|
||||
return a.clustering_key < b.clustering_key;
|
||||
});
|
||||
co_await result.emit_partition_start(ti.dk);
|
||||
for (const auto& rec : records) {
|
||||
clustering_row cr(make_clustering_key(rec.sstable_name, rec.row_size, rec.partition_key, rec.clustering_key));
|
||||
if (rec.compaction_time != 0) {
|
||||
set_cell(cr.cells(), "compaction_time", millis_to_db_clock(rec.compaction_time));
|
||||
}
|
||||
co_await result.emit_row(std::move(cr));
|
||||
}
|
||||
co_await result.emit_partition_end();
|
||||
emitted += records.size();
|
||||
if (emitted >= page_size) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Virtual table backed by LargeDataRecords in SSTable scylla metadata.
|
||||
// Shows cell_size records from all live SSTables, matching the
|
||||
// schema of the legacy system.large_cells CQL table.
|
||||
class large_cells_virtual_table : public streaming_virtual_table {
|
||||
sharded<replica::database>& _db;
|
||||
|
||||
struct record {
|
||||
sstring sstable_name;
|
||||
int64_t cell_size;
|
||||
sstring partition_key;
|
||||
sstring clustering_key;
|
||||
sstring column_name;
|
||||
int64_t collection_elements; // -1 means not applicable
|
||||
int64_t compaction_time;
|
||||
};
|
||||
|
||||
// Extract cell_size records from a single table's SSTables on the local shard.
|
||||
static future<std::vector<record>> collect_local_records(replica::table& table) {
|
||||
std::vector<record> records;
|
||||
auto table_schema = table.schema();
|
||||
auto sstables = co_await table.take_sstable_set_snapshot();
|
||||
for (const auto& sst : sstables) {
|
||||
auto& records_opt = sst->get_large_data_records();
|
||||
if (!records_opt) {
|
||||
auto stat = sst->get_large_data_stat(sstables::large_data_type::cell_size);
|
||||
auto elem_stat = sst->get_large_data_stat(sstables::large_data_type::elements_in_collection);
|
||||
if ((stat && stat->above_threshold > 0) || (elem_stat && elem_stat->above_threshold > 0)) {
|
||||
records.push_back(record{
|
||||
.sstable_name = sst->component_basename(sstables::component_type::Data),
|
||||
.cell_size = static_cast<int64_t>(stat ? stat->max_value : 0),
|
||||
.partition_key = "(details unavailable - legacy SSTable)",
|
||||
.clustering_key = "",
|
||||
.column_name = "",
|
||||
.collection_elements = elem_stat ? static_cast<int64_t>(elem_stat->max_value) : int64_t(-1),
|
||||
.compaction_time = compaction_time_from_generation(*sst),
|
||||
});
|
||||
}
|
||||
continue;
|
||||
}
|
||||
auto compaction_time = compaction_time_from_generation(*sst);
|
||||
auto sst_name = sst->component_basename(sstables::component_type::Data);
|
||||
for (const auto& rec : records_opt->elements) {
|
||||
if (rec.type != sstables::large_data_type::cell_size &&
|
||||
rec.type != sstables::large_data_type::elements_in_collection) {
|
||||
continue;
|
||||
}
|
||||
auto pk = sstables::key_view(rec.partition_key.value).to_partition_key(*table_schema);
|
||||
auto ck_str = rec.clustering_key.value.empty()
|
||||
? sstring()
|
||||
: sstring(key_to_str(clustering_key_prefix::from_bytes(rec.clustering_key.value), *table_schema));
|
||||
records.push_back(record{
|
||||
.sstable_name = sst_name,
|
||||
.cell_size = static_cast<int64_t>(rec.value),
|
||||
.partition_key = key_to_str(pk, *table_schema),
|
||||
.clustering_key = std::move(ck_str),
|
||||
.column_name = disk_string_to_sstring(rec.column_name),
|
||||
.collection_elements = rec.elements_count > 0
|
||||
? static_cast<int64_t>(rec.elements_count) : int64_t(-1),
|
||||
.compaction_time = compaction_time,
|
||||
});
|
||||
}
|
||||
}
|
||||
co_return records;
|
||||
}
|
||||
|
||||
public:
|
||||
explicit large_cells_virtual_table(sharded<replica::database>& db)
|
||||
: streaming_virtual_table(build_schema())
|
||||
, _db(db)
|
||||
{
|
||||
_shard_aware = true;
|
||||
}
|
||||
|
||||
static schema_ptr build_schema() {
|
||||
auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::LARGE_CELLS, 1);
|
||||
return schema_builder(system_keyspace::NAME, system_keyspace::LARGE_CELLS, std::make_optional(id))
|
||||
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
|
||||
.with_column("table_name", utf8_type, column_kind::partition_key)
|
||||
.with_column("sstable_name", utf8_type, column_kind::clustering_key)
|
||||
.with_column("cell_size", reversed_type_impl::get_instance(long_type), column_kind::clustering_key)
|
||||
.with_column("partition_key", utf8_type, column_kind::clustering_key)
|
||||
.with_column("clustering_key", utf8_type, column_kind::clustering_key)
|
||||
.with_column("column_name", utf8_type, column_kind::clustering_key)
|
||||
.with_column("collection_elements", long_type)
|
||||
.with_column("compaction_time", timestamp_type)
|
||||
.set_comment("cells larger than specified threshold")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}
|
||||
|
||||
dht::decorated_key make_partition_key(const sstring& ks_name, const sstring& cf_name) {
|
||||
return dht::decorate_key(*_s, partition_key::from_exploded(*_s, {
|
||||
data_value(ks_name).serialize_nonnull(),
|
||||
data_value(cf_name).serialize_nonnull()
|
||||
}));
|
||||
}
|
||||
|
||||
clustering_key make_clustering_key(const sstring& sstable_name, int64_t cell_size,
|
||||
const sstring& pk, const sstring& ck, const sstring& col_name) {
|
||||
return clustering_key::from_exploded(*_s, {
|
||||
data_value(sstable_name).serialize_nonnull(),
|
||||
reversed_type_impl::get_instance(long_type)->decompose(cell_size),
|
||||
data_value(pk).serialize_nonnull(),
|
||||
data_value(ck).serialize_nonnull(),
|
||||
data_value(col_name).serialize_nonnull()
|
||||
});
|
||||
}
|
||||
|
||||
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
|
||||
struct table_info {
|
||||
table_id tid;
|
||||
dht::decorated_key dk;
|
||||
};
|
||||
std::vector<table_info> owned_tables;
|
||||
|
||||
auto& db = _db.local();
|
||||
db.get_tables_metadata().for_each_table([&] (table_id tid, lw_shared_ptr<replica::table> table) {
|
||||
auto& ks_name = table->schema()->ks_name();
|
||||
auto& cf_name = table->schema()->cf_name();
|
||||
auto dk = make_partition_key(ks_name, cf_name);
|
||||
if (this_shard_owns(dk) && contains_key(qr.partition_range(), dk)) {
|
||||
owned_tables.push_back(table_info{tid, std::move(dk)});
|
||||
}
|
||||
});
|
||||
|
||||
std::ranges::sort(owned_tables, dht::ring_position_less_comparator(*_s),
|
||||
std::mem_fn(&table_info::dk));
|
||||
|
||||
static constexpr size_t page_size = 1000;
|
||||
size_t emitted = 0;
|
||||
|
||||
for (auto& ti : owned_tables) {
|
||||
auto per_shard = co_await _db.map([tid = ti.tid] (replica::database& db) -> future<std::vector<record>> {
|
||||
if (auto table = db.get_tables_metadata().get_table_if_exists(tid)) {
|
||||
co_return co_await collect_local_records(*table);
|
||||
}
|
||||
co_return std::vector<record>{};
|
||||
});
|
||||
std::vector<record> records;
|
||||
for (auto& shard_records : per_shard) {
|
||||
records.insert(records.end(),
|
||||
std::make_move_iterator(shard_records.begin()),
|
||||
std::make_move_iterator(shard_records.end()));
|
||||
}
|
||||
if (records.empty()) {
|
||||
continue;
|
||||
}
|
||||
std::ranges::sort(records, [] (const record& a, const record& b) {
|
||||
if (a.sstable_name != b.sstable_name) {
|
||||
return a.sstable_name < b.sstable_name;
|
||||
} else if (a.cell_size != b.cell_size) {
|
||||
return a.cell_size > b.cell_size; // DESC
|
||||
} else if (a.partition_key != b.partition_key) {
|
||||
return a.partition_key < b.partition_key;
|
||||
} else if (a.clustering_key != b.clustering_key) {
|
||||
return a.clustering_key < b.clustering_key;
|
||||
}
|
||||
return a.column_name < b.column_name;
|
||||
});
|
||||
// A collection cell that exceeds both the size and the
|
||||
// element-count thresholds produces two LargeDataRecords
|
||||
// entries that map to identical clustering keys. Remove the
|
||||
// duplicates so that every emitted row has a unique key.
|
||||
records.erase(std::unique(records.begin(), records.end(), [] (const record& a, const record& b) {
|
||||
return a.sstable_name == b.sstable_name
|
||||
&& a.cell_size == b.cell_size
|
||||
&& a.partition_key == b.partition_key
|
||||
&& a.clustering_key == b.clustering_key
|
||||
&& a.column_name == b.column_name;
|
||||
}), records.end());
|
||||
co_await result.emit_partition_start(ti.dk);
|
||||
for (const auto& rec : records) {
|
||||
clustering_row cr(make_clustering_key(rec.sstable_name, rec.cell_size,
|
||||
rec.partition_key, rec.clustering_key, rec.column_name));
|
||||
if (rec.compaction_time != 0) {
|
||||
set_cell(cr.cells(), "compaction_time", millis_to_db_clock(rec.compaction_time));
|
||||
}
|
||||
if (rec.collection_elements >= 0) {
|
||||
set_cell(cr.cells(), "collection_elements", rec.collection_elements);
|
||||
}
|
||||
co_await result.emit_row(std::move(cr));
|
||||
}
|
||||
co_await result.emit_partition_end();
|
||||
emitted += records.size();
|
||||
if (emitted >= page_size) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
// Helper: register a virtual table on the local shard.
|
||||
static future<> add_virtual_table(
|
||||
sharded<db::system_keyspace>& sys_ks,
|
||||
sharded<replica::database>& dist_db,
|
||||
sharded<service::storage_service>& dist_ss,
|
||||
std::unique_ptr<virtual_table>&& tbl) {
|
||||
auto& virtual_tables = *sys_ks.local().get_virtual_tables_registry();
|
||||
auto& db = dist_db.local();
|
||||
auto& ss = dist_ss.local();
|
||||
|
||||
auto schema = tbl->schema();
|
||||
virtual_tables[schema->id()] = std::move(tbl);
|
||||
co_await db.create_local_system_table(schema, false, ss.get_erm_factory());
|
||||
auto& cf = db.find_column_family(schema);
|
||||
cf.mark_ready_for_writes(nullptr);
|
||||
auto& vt = virtual_tables[schema->id()];
|
||||
cf.set_virtual_reader(vt->as_mutation_source());
|
||||
cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); });
|
||||
}
|
||||
|
||||
future<> initialize_virtual_tables(
|
||||
@@ -1469,41 +2061,86 @@ future<> initialize_virtual_tables(
|
||||
sharded<db::system_keyspace>& sys_ks,
|
||||
sharded<service::tablet_allocator>& tablet_allocator,
|
||||
sharded<netw::messaging_service>& ms,
|
||||
db::config& cfg) {
|
||||
auto& virtual_tables_registry = sys_ks.local().get_virtual_tables_registry();
|
||||
auto& virtual_tables = *virtual_tables_registry;
|
||||
auto& db = dist_db.local();
|
||||
auto& ss = dist_ss.local();
|
||||
db::config& cfg,
|
||||
gms::feature_service& feat) {
|
||||
co_await smp::invoke_on_all([&] () -> future<> {
|
||||
auto add_table = [&] (std::unique_ptr<virtual_table>&& tbl) -> future<> {
|
||||
co_await add_virtual_table(sys_ks, dist_db, dist_ss, std::move(tbl));
|
||||
};
|
||||
|
||||
auto add_table = [&] (std::unique_ptr<virtual_table>&& tbl) -> future<> {
|
||||
auto schema = tbl->schema();
|
||||
virtual_tables[schema->id()] = std::move(tbl);
|
||||
co_await db.create_local_system_table(schema, false, ss.get_erm_factory());
|
||||
auto& cf = db.find_column_family(schema);
|
||||
cf.mark_ready_for_writes(nullptr);
|
||||
auto& vt = virtual_tables[schema->id()];
|
||||
cf.set_virtual_reader(vt->as_mutation_source());
|
||||
cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); });
|
||||
auto& db = dist_db.local();
|
||||
|
||||
// Add built-in virtual tables here.
|
||||
co_await add_table(std::make_unique<cluster_status_table>(dist_ss, dist_gossiper));
|
||||
co_await add_table(std::make_unique<token_ring_table>(db, dist_ss.local()));
|
||||
co_await add_table(std::make_unique<snapshots_table>(dist_db));
|
||||
co_await add_table(std::make_unique<protocol_servers_table>(dist_ss.local()));
|
||||
co_await add_table(std::make_unique<runtime_info_table>(dist_db, dist_ss.local()));
|
||||
co_await add_table(std::make_unique<versions_table>());
|
||||
co_await add_table(std::make_unique<db_config_table>(cfg));
|
||||
co_await add_table(std::make_unique<clients_table>(dist_ss.local()));
|
||||
co_await add_table(std::make_unique<raft_state_table>(dist_raft_gr));
|
||||
co_await add_table(std::make_unique<load_per_node>(tablet_allocator, dist_db, dist_raft_gr, ms, dist_gossiper));
|
||||
co_await add_table(std::make_unique<tablet_sizes>(tablet_allocator, dist_db, dist_raft_gr, ms));
|
||||
co_await add_table(std::make_unique<cdc_timestamps_table>(db, dist_ss.local()));
|
||||
co_await add_table(std::make_unique<cdc_streams_table>(db, dist_ss.local()));
|
||||
|
||||
db.find_column_family(system_keyspace::size_estimates()).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader(db, sys_ks.local())));
|
||||
db.find_column_family(system_keyspace::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db)));
|
||||
db.find_column_family(system_keyspace::built_indexes()).set_virtual_reader(mutation_source(db::index::built_indexes_virtual_reader(db)));
|
||||
});
|
||||
|
||||
// Drop old physical system.large_* tables and register virtual
|
||||
// replacements. Must run on shard 0 (uses cross-shard operations).
|
||||
auto activate_large_data_virtual_tables = [&dist_db, &sys_ks, &dist_ss] () -> future<> {
|
||||
// Drop old physical system.large_* tables. Their data is now
|
||||
// served from SSTable metadata via virtual tables.
|
||||
// TODO: In a follow-up, read existing large data entries from
|
||||
// the old tables and populate per-sstable LargeDataRecords
|
||||
// metadata via component_rewrite before dropping.
|
||||
for (auto table_name : {
|
||||
db::system_keyspace::LARGE_PARTITIONS,
|
||||
db::system_keyspace::LARGE_ROWS,
|
||||
db::system_keyspace::LARGE_CELLS}) {
|
||||
try {
|
||||
co_await replica::database::legacy_drop_table_on_all_shards(
|
||||
dist_db, sys_ks, db::system_keyspace::NAME, table_name, false);
|
||||
} catch (const replica::no_such_column_family&) {
|
||||
// Already dropped (e.g. restart after feature was enabled).
|
||||
}
|
||||
}
|
||||
|
||||
// Add virtual tables on all shards.
|
||||
co_await smp::invoke_on_all([&dist_db, &sys_ks, &dist_ss] () -> future<> {
|
||||
co_await add_virtual_table(sys_ks, dist_db, dist_ss,
|
||||
std::make_unique<large_partitions_virtual_table>(dist_db));
|
||||
co_await add_virtual_table(sys_ks, dist_db, dist_ss,
|
||||
std::make_unique<large_rows_virtual_table>(dist_db));
|
||||
co_await add_virtual_table(sys_ks, dist_db, dist_ss,
|
||||
std::make_unique<large_cells_virtual_table>(dist_db));
|
||||
});
|
||||
};
|
||||
|
||||
// Add built-in virtual tables here.
|
||||
co_await add_table(std::make_unique<cluster_status_table>(dist_ss, dist_gossiper));
|
||||
co_await add_table(std::make_unique<token_ring_table>(db, ss));
|
||||
co_await add_table(std::make_unique<snapshots_table>(dist_db));
|
||||
co_await add_table(std::make_unique<protocol_servers_table>(ss));
|
||||
co_await add_table(std::make_unique<runtime_info_table>(dist_db, ss));
|
||||
co_await add_table(std::make_unique<versions_table>());
|
||||
co_await add_table(std::make_unique<db_config_table>(cfg));
|
||||
co_await add_table(std::make_unique<clients_table>(ss));
|
||||
co_await add_table(std::make_unique<raft_state_table>(dist_raft_gr));
|
||||
co_await add_table(std::make_unique<load_per_node>(tablet_allocator, dist_db, dist_raft_gr, ms, dist_gossiper));
|
||||
co_await add_table(std::make_unique<tablet_sizes>(tablet_allocator, dist_db, dist_raft_gr, ms));
|
||||
co_await add_table(std::make_unique<cdc_timestamps_table>(db, ss));
|
||||
co_await add_table(std::make_unique<cdc_streams_table>(db, ss));
|
||||
|
||||
db.find_column_family(system_keyspace::size_estimates()).set_virtual_reader(mutation_source(db::size_estimates::virtual_reader(db, sys_ks.local())));
|
||||
db.find_column_family(system_keyspace::views_builds_in_progress()).set_virtual_reader(mutation_source(db::view::build_progress_virtual_reader(db)));
|
||||
db.find_column_family(system_keyspace::built_indexes()).set_virtual_reader(mutation_source(db::index::built_indexes_virtual_reader(db)));
|
||||
if (feat.large_data_virtual_tables) {
|
||||
// Feature already enabled (e.g. test environment or restart after
|
||||
// upgrade). Activate directly as a coroutine — no seastar::async
|
||||
// context needed.
|
||||
co_await activate_large_data_virtual_tables();
|
||||
} else {
|
||||
// Feature not yet enabled. Register a callback that will fire
|
||||
// when the feature is enabled during rolling upgrade. The callback
|
||||
// runs inside seastar::async context (via feature_service::enable),
|
||||
// so .get() is safe.
|
||||
//
|
||||
// The listener_registration must outlive the feature, so we store
|
||||
// it in a static variable (process lifetime). This function is
|
||||
// only called on shard 0, so the listener fires only on shard 0.
|
||||
static gms::feature::listener_registration large_data_vt_listener;
|
||||
large_data_vt_listener = feat.large_data_virtual_tables.when_enabled(
|
||||
[activate_large_data_virtual_tables = std::move(activate_large_data_virtual_tables)] {
|
||||
activate_large_data_virtual_tables().get();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
virtual_tables_registry::virtual_tables_registry() : unique_ptr(std::make_unique<virtual_tables_registry_impl>()) {
|
||||
|
||||
@@ -24,6 +24,7 @@ class tablet_allocator;
|
||||
}
|
||||
|
||||
namespace gms {
|
||||
class feature_service;
|
||||
class gossiper;
|
||||
}
|
||||
|
||||
@@ -44,7 +45,8 @@ future<> initialize_virtual_tables(
|
||||
sharded<db::system_keyspace>&,
|
||||
sharded<service::tablet_allocator>&,
|
||||
sharded<netw::messaging_service>&,
|
||||
db::config&);
|
||||
db::config&,
|
||||
gms::feature_service&);
|
||||
|
||||
|
||||
class virtual_table;
|
||||
|
||||
@@ -33,6 +33,7 @@ in individual sections
|
||||
| ext_timestamp_stats
|
||||
| schema
|
||||
| components_digests
|
||||
| large_data_records
|
||||
|
||||
`sharding_metadata` (tag 1): describes what token sub-ranges are included in this
|
||||
sstable. This is used, when loading the sstable, to determine which shard(s)
|
||||
@@ -80,6 +81,11 @@ all SSTable component files that are checksummed during write. Each entry maps a
|
||||
type (e.g., Data, Index, Filter, Statistics, etc.) to its CRC32 checksum. This allows
|
||||
verifying the integrity of individual component files.
|
||||
|
||||
`large_data_records` (tag 13): an `array<large_data_record>` with the top-N individual large
|
||||
data entries (partitions, rows, cells) found during the sstable write. Unlike `large_data_stats`
|
||||
which only stores aggregate statistics, this records the actual keys and sizes so they survive
|
||||
tablet/shard migration.
|
||||
|
||||
The [scylla sstable dump-scylla-metadata](https://github.com/scylladb/scylladb/blob/master/docs/operating-scylla/admin-tools/scylla-sstable.rst#dump-scylla-metadata) tool
|
||||
can be used to dump the scylla metadata in JSON format.
|
||||
|
||||
@@ -203,3 +209,35 @@ in the statistics component, which lacks column names and other metadata. Unlike
|
||||
the full schema stored in the system schema tables, it is not intended to be
|
||||
comprehensive, but it contains enough information for tools like scylla-sstable
|
||||
to parse an sstable in a self-sufficient manner.
|
||||
|
||||
## large_data_records subcomponent
|
||||
|
||||
large_data_records = record_count large_data_record*
|
||||
record_count = be32
|
||||
large_data_record = large_data_type partition_key clustering_key column_name value elements_count range_tombstones dead_rows
|
||||
large_data_type = be32 // same enum as in large_data_stats
|
||||
partition_key = string32 // binary serialized partition key (sstables::key::get_bytes())
|
||||
clustering_key = string32 // binary serialized clustering key (clustering_key_prefix::representation()), empty if N/A
|
||||
column_name = string32 // column name as text, empty for partition/row entries
|
||||
value = be64 // size in bytes (partition, row, or cell size depending on type)
|
||||
elements_count = be64 // type-dependent element count (see below)
|
||||
range_tombstones = be64 // number of range tombstones (partition_size records only, 0 otherwise)
|
||||
dead_rows = be64 // number of dead rows (partition_size records only, 0 otherwise)
|
||||
string32 = string32_size byte*
|
||||
string32_size = be32
|
||||
|
||||
The large_data_records component holds individual top-N large data entries
|
||||
(partitions, rows, cells) found during the sstable write. Unlike large_data_stats,
|
||||
which only stores aggregate per-type statistics (max value, threshold, count above
|
||||
threshold), large_data_records preserves the actual partition key, clustering key,
|
||||
column name, and size for each above-threshold entry. This information is embedded
|
||||
in the sstable file itself and therefore survives tablet/shard migration.
|
||||
|
||||
The elements_count field carries a type-dependent element count:
|
||||
|
||||
- For partition_size and rows_in_partition records: number of rows in the partition
|
||||
- For cell_size and elements_in_collection records: number of elements in the collection (0 for non-collection cells)
|
||||
- For row_size records: 0
|
||||
|
||||
The range_tombstones and dead_rows fields are meaningful only for
|
||||
partition_size records and are zero for all other record types.
|
||||
|
||||
@@ -524,6 +524,7 @@ The content is dumped in JSON, using the following schema:
|
||||
"scylla_version": String
|
||||
"ext_timestamp_stats": {"$key": int64, ...}
|
||||
"sstable_identifier": String, // UUID
|
||||
"large_data_records": [$LARGE_DATA_RECORD, ...]
|
||||
}
|
||||
|
||||
$SHARDING_METADATA := {
|
||||
@@ -548,6 +549,17 @@ The content is dumped in JSON, using the following schema:
|
||||
"above_threshold": Uint
|
||||
}
|
||||
|
||||
$LARGE_DATA_RECORD := {
|
||||
"type": String, // large_data_type name
|
||||
"partition_key": String, // human-readable partition key (decoded from binary)
|
||||
"clustering_key": String, // human-readable clustering key (decoded from binary), empty if N/A
|
||||
"column_name": String, // column name, empty for partition/row entries
|
||||
"value": Uint64, // size in bytes (partition, row, or cell size depending on type)
|
||||
"elements_count": Uint64, // rows (partition_size, rows_in_partition) or collection elements (cell_size, elements_in_collection), 0 for row_size
|
||||
"range_tombstones": Uint64, // range tombstones (partition_size records only, 0 otherwise)
|
||||
"dead_rows": Uint64 // dead rows (partition_size records only, 0 otherwise)
|
||||
}
|
||||
|
||||
dump-schema
|
||||
^^^^^^^^^^^
|
||||
|
||||
|
||||
@@ -128,6 +128,79 @@ Expiring Data
|
||||
|
||||
In order to prevent stale data from appearing, all rows in ``system.large_partitions`` table are inserted with Time To Live (TTL) equal to 30 days.
|
||||
|
||||
|
||||
.. _large-data-virtual-tables:
|
||||
|
||||
Virtual Tables (``LARGE_DATA_VIRTUAL_TABLES`` Feature)
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Starting with ScyllaDB 2026.2, the ``system.large_partitions``,
|
||||
``system.large_rows``, and ``system.large_cells`` tables are implemented as
|
||||
**virtual tables** that read directly from metadata stored inside each SSTable
|
||||
rather than from separate physical system tables.
|
||||
|
||||
How it works
|
||||
""""""""""""
|
||||
|
||||
When an SSTable is written (during flush or compaction), the SSTable writer
|
||||
records the top-N above-threshold large data entries into a
|
||||
``LargeDataRecords`` component in the SSTable's ``.Scylla`` metadata file.
|
||||
The virtual tables query this metadata from all live SSTables at read time,
|
||||
so large data records automatically follow their SSTables when they are
|
||||
migrated between shards or nodes (e.g. during tablet migration or repair).
|
||||
|
||||
The maximum number of records stored per large data type per SSTable is
|
||||
controlled by the ``compaction_large_data_records_per_sstable`` configuration
|
||||
option (default: 10).
|
||||
|
||||
Activation
|
||||
""""""""""
|
||||
|
||||
The transition is gated by the ``LARGE_DATA_VIRTUAL_TABLES`` cluster feature
|
||||
flag, which is automatically enabled once all nodes in the cluster are
|
||||
upgraded to a version that supports it.
|
||||
|
||||
When the feature is enabled on a node:
|
||||
|
||||
1. The old physical ``system.large_partitions``, ``system.large_rows``, and
|
||||
``system.large_cells`` tables are dropped.
|
||||
2. Virtual table replacements are registered in their place.
|
||||
3. New SSTable writes stop populating the old physical tables.
|
||||
|
||||
During a rolling upgrade, as long as any node in the cluster has not been
|
||||
upgraded, the feature remains disabled and all nodes continue writing to
|
||||
the old physical tables. This ensures safe rollback if the upgrade needs
|
||||
to be reverted.
|
||||
|
||||
Upgrade considerations
|
||||
""""""""""""""""""""""
|
||||
|
||||
After the ``LARGE_DATA_VIRTUAL_TABLES`` feature is enabled, the virtual tables
|
||||
derive their content from ``LargeDataRecords`` metadata stored in SSTables.
|
||||
SSTables written by older versions do not contain this metadata. As a result,
|
||||
**the virtual tables may appear empty until those older SSTables are rewritten**
|
||||
by compaction.
|
||||
|
||||
To populate the virtual tables promptly after upgrade, run one of the following
|
||||
on each node:
|
||||
|
||||
* **Upgrade SSTables compaction** (recommended -- rewrites SSTables without
|
||||
waiting for normal compaction triggers):
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
nodetool upgradesstables --include-all-sstables
|
||||
|
||||
* **Major compaction** (rewrites all SSTables into a single set, but may
|
||||
temporarily increase disk usage):
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
nodetool compact
|
||||
|
||||
Once the SSTables have been rewritten, the virtual tables will show the
|
||||
current large data records.
|
||||
|
||||
.. include:: /troubleshooting/_common/ts-return.rst
|
||||
|
||||
Additional Resources
|
||||
|
||||
@@ -170,4 +170,12 @@ Expiring Data
|
||||
^^^^^^^^^^^^^
|
||||
In order to prevent stale data from appearing, all rows in the ``system.large_rows`` and ``system.large_cells`` tables are inserted with Time To Live (TTL) equal to 30 days.
|
||||
|
||||
Virtual Tables
|
||||
^^^^^^^^^^^^^^
|
||||
|
||||
Starting with ScyllaDB 2026.2, ``system.large_rows`` and ``system.large_cells``
|
||||
are implemented as virtual tables that read from SSTable metadata. See
|
||||
:ref:`large-data-virtual-tables` for details on the
|
||||
``LARGE_DATA_VIRTUAL_TABLES`` feature and upgrade considerations.
|
||||
|
||||
.. include:: /troubleshooting/_common/ts-return.rst
|
||||
|
||||
@@ -181,6 +181,7 @@ public:
|
||||
gms::feature vnodes_to_tablets_migrations { *this, "VNODES_TO_TABLETS_MIGRATIONS"sv };
|
||||
gms::feature writetime_ttl_individual_element { *this, "WRITETIME_TTL_INDIVIDUAL_ELEMENT"sv };
|
||||
gms::feature arbitrary_tablet_boundaries { *this, "ARBITRARY_TABLET_BOUNDARIES"sv };
|
||||
gms::feature large_data_virtual_tables { *this, "LARGE_DATA_VIRTUAL_TABLES"sv };
|
||||
public:
|
||||
|
||||
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;
|
||||
|
||||
@@ -932,6 +932,13 @@ struct fmt::formatter<clustering_key_prefix::with_schema_wrapper> : fmt::formatt
|
||||
}
|
||||
};
|
||||
|
||||
// Convert a key (partition_key, clustering_key_prefix, etc.) to a human-readable
|
||||
// string using its schema.
|
||||
template <typename T>
|
||||
std::string key_to_str(const T& key, const schema& s) {
|
||||
return fmt::to_string(key.with_schema(s));
|
||||
}
|
||||
|
||||
template<>
|
||||
struct appending_hash<partition_key_view> {
|
||||
template<typename Hasher>
|
||||
|
||||
4
main.cc
4
main.cc
@@ -1964,9 +1964,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "initializing virtual tables");
|
||||
smp::invoke_on_all([&] {
|
||||
return db::initialize_virtual_tables(db, ss, gossiper, raft_gr, sys_ks, tablet_allocator, messaging, *cfg);
|
||||
}).get();
|
||||
db::initialize_virtual_tables(db, ss, gossiper, raft_gr, sys_ks, tablet_allocator, messaging, *cfg, feature_service.local()).get();
|
||||
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&qp] { return qp.stop(); });
|
||||
|
||||
@@ -382,6 +382,7 @@ static auto configure_sstables_manager(const db::config& cfg, const database_con
|
||||
.memory_reclaim_threshold = cfg.components_memory_reclaim_threshold,
|
||||
.data_file_directories = cfg.data_file_directories(),
|
||||
.format = cfg.sstable_format,
|
||||
.large_data_records_per_sstable = cfg.compaction_large_data_records_per_sstable,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -1306,9 +1306,20 @@ bool operator==(const column_definition& x, const column_definition& y)
|
||||
}
|
||||
|
||||
// Based on org.apache.cassandra.config.CFMetaData#generateLegacyCfId
|
||||
//
|
||||
// The version suffix (e.g. "_v1") allows creating a table with a
|
||||
// different UUID than the original (version 0). This is used when
|
||||
// replacing a physical table with a virtual table of the same name:
|
||||
// the virtual table uses version 1 so its UUID differs from the old
|
||||
// physical table, allowing the old table to be properly dropped by
|
||||
// UUID before the replacement is registered.
|
||||
table_id
|
||||
generate_legacy_id(const sstring& ks_name, const sstring& cf_name) {
|
||||
return table_id(utils::UUID_gen::get_name_UUID(ks_name + cf_name));
|
||||
generate_legacy_id(const sstring& ks_name, const sstring& cf_name, unsigned version) {
|
||||
auto name = ks_name + cf_name;
|
||||
if (version > 0) {
|
||||
name += fmt::format("_v{}", version);
|
||||
}
|
||||
return table_id(utils::UUID_gen::get_name_UUID(name));
|
||||
}
|
||||
|
||||
schema_builder& schema_builder::set_compaction_strategy_options(std::map<sstring, sstring>&& options) {
|
||||
|
||||
@@ -1075,7 +1075,7 @@ public:
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const view_ptr& view);
|
||||
|
||||
table_id generate_legacy_id(const sstring& ks_name, const sstring& cf_name);
|
||||
table_id generate_legacy_id(const sstring& ks_name, const sstring& cf_name, unsigned version = 0);
|
||||
|
||||
|
||||
// Thrown when attempted to access a schema-dependent object using
|
||||
|
||||
@@ -20,8 +20,10 @@
|
||||
#include "utils/exceptions.hh"
|
||||
#include "db/large_data_handler.hh"
|
||||
#include "db/corrupt_data_handler.hh"
|
||||
#include "keys/keys.hh"
|
||||
|
||||
#include <functional>
|
||||
#include <queue>
|
||||
#include <boost/iterator/iterator_facade.hpp>
|
||||
#include <boost/container/static_vector.hpp>
|
||||
|
||||
@@ -626,6 +628,46 @@ private:
|
||||
large_data_stats_entry _cell_size_entry;
|
||||
large_data_stats_entry _elements_in_collection_entry;
|
||||
|
||||
// Bounded min-heaps for top-N large data records, one per large_data_type.
|
||||
// Size-type heaps (partition_size, row_size, cell_size) compare by `value`;
|
||||
// element-count-type heaps (rows_in_partition, elements_in_collection)
|
||||
// compare by `elements_count`.
|
||||
struct large_data_record_cmp_by_value {
|
||||
bool operator()(const large_data_record& a, const large_data_record& b) const {
|
||||
return a.value > b.value; // min-heap: smallest value on top
|
||||
}
|
||||
};
|
||||
struct large_data_record_cmp_by_elements {
|
||||
bool operator()(const large_data_record& a, const large_data_record& b) const {
|
||||
return a.elements_count > b.elements_count; // min-heap: smallest elements_count on top
|
||||
}
|
||||
};
|
||||
using ld_size_heap = std::priority_queue<large_data_record, std::vector<large_data_record>, large_data_record_cmp_by_value>;
|
||||
using ld_elements_heap = std::priority_queue<large_data_record, std::vector<large_data_record>, large_data_record_cmp_by_elements>;
|
||||
ld_size_heap _ld_partition_size_records;
|
||||
ld_elements_heap _ld_rows_in_partition_records;
|
||||
ld_size_heap _ld_row_size_records;
|
||||
ld_size_heap _ld_cell_size_records;
|
||||
ld_elements_heap _ld_elements_in_collection_records;
|
||||
|
||||
// Insert a record into a bounded min-heap, keeping at most N entries.
|
||||
// Uses the heap's own comparator to decide eviction: since the comparator
|
||||
// defines a min-heap (smallest on top), comp(rec, top) is true when rec
|
||||
// is "greater" than top in the heap's ordering, meaning it should replace it.
|
||||
template <typename Heap>
|
||||
void insert_into_ld_heap(Heap& heap, large_data_record rec) {
|
||||
auto max_records = _cfg.large_data_records_per_sstable;
|
||||
if (max_records == 0) {
|
||||
return;
|
||||
}
|
||||
if (heap.size() < max_records) {
|
||||
heap.push(std::move(rec));
|
||||
} else if (typename Heap::value_compare{}(rec, heap.top())) {
|
||||
heap.pop();
|
||||
heap.push(std::move(rec));
|
||||
}
|
||||
}
|
||||
|
||||
void init_file_writers();
|
||||
|
||||
// Returns the closed writer
|
||||
@@ -1098,7 +1140,48 @@ void writer::maybe_record_large_partitions(const sstables::sstable& sst, const s
|
||||
row_count_entry.max_value = std::max(row_count_entry.max_value, rows);
|
||||
auto ret = _sst.get_large_data_handler().maybe_record_large_partitions(sst, partition_key, partition_size, rows, range_rombstones, dead_rows).get();
|
||||
size_entry.above_threshold += unsigned(bool(ret.size));
|
||||
row_count_entry.above_threshold += unsigned(bool(ret.rows));
|
||||
row_count_entry.above_threshold += unsigned(bool(ret.elements));
|
||||
|
||||
auto trace_log = [&] (large_data_type type) {
|
||||
slogger.trace("Detected large partition: sstable={}, partition_key={}, record_type={}, partition_size={}, rows={}, range_tombstones={}, dead_rows={}",
|
||||
_sst.component_basename(component_type::Data),
|
||||
key_to_str(partition_key.to_partition_key(_schema), _schema),
|
||||
type,
|
||||
partition_size,
|
||||
rows,
|
||||
range_rombstones,
|
||||
dead_rows);
|
||||
};
|
||||
|
||||
// Populate top-N large data records into separate per-type heaps.
|
||||
if (ret.size) {
|
||||
trace_log(large_data_type::partition_size);
|
||||
const auto& pk_bytes = partition_key.get_bytes();
|
||||
insert_into_ld_heap(_ld_partition_size_records, large_data_record{
|
||||
.type = large_data_type::partition_size,
|
||||
.partition_key = disk_string<uint32_t>{bytes(pk_bytes)},
|
||||
.clustering_key = disk_string<uint32_t>{bytes()},
|
||||
.column_name = disk_string<uint32_t>{bytes()},
|
||||
.value = partition_size,
|
||||
.elements_count = rows,
|
||||
.range_tombstones = range_rombstones,
|
||||
.dead_rows = dead_rows,
|
||||
});
|
||||
}
|
||||
if (ret.elements) {
|
||||
trace_log(large_data_type::rows_in_partition);
|
||||
const auto& pk_bytes = partition_key.get_bytes();
|
||||
insert_into_ld_heap(_ld_rows_in_partition_records, large_data_record{
|
||||
.type = large_data_type::rows_in_partition,
|
||||
.partition_key = disk_string<uint32_t>{bytes(pk_bytes)},
|
||||
.clustering_key = disk_string<uint32_t>{bytes()},
|
||||
.column_name = disk_string<uint32_t>{bytes()},
|
||||
.value = partition_size,
|
||||
.elements_count = rows,
|
||||
.range_tombstones = range_rombstones,
|
||||
.dead_rows = dead_rows,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void writer::maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
@@ -1109,6 +1192,22 @@ void writer::maybe_record_large_rows(const sstables::sstable& sst, const sstable
|
||||
}
|
||||
if (_sst.get_large_data_handler().maybe_record_large_rows(sst, partition_key, clustering_key, row_size).get()) {
|
||||
entry.above_threshold++;
|
||||
|
||||
slogger.trace("Detected large row: sstable={}, partition_key={}, clustering_key={}, record_type={}, row_size={}",
|
||||
_sst.component_basename(component_type::Data),
|
||||
key_to_str(partition_key.to_partition_key(_schema), _schema),
|
||||
clustering_key ? key_to_str(*clustering_key, _schema) : "",
|
||||
large_data_type::row_size,
|
||||
row_size);
|
||||
const auto& pk_bytes = partition_key.get_bytes();
|
||||
auto ck_bytes = clustering_key ? clustering_key->view().representation().linearize() : bytes();
|
||||
insert_into_ld_heap(_ld_row_size_records, large_data_record{
|
||||
.type = large_data_type::row_size,
|
||||
.partition_key = disk_string<uint32_t>{bytes(pk_bytes)},
|
||||
.clustering_key = disk_string<uint32_t>{std::move(ck_bytes)},
|
||||
.column_name = disk_string<uint32_t>{bytes()},
|
||||
.value = row_size,
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1122,14 +1221,54 @@ void writer::maybe_record_large_cells(const sstables::sstable& sst, const sstabl
|
||||
if (collection_elements_entry.max_value < collection_elements) {
|
||||
collection_elements_entry.max_value = collection_elements;
|
||||
}
|
||||
if (_sst.get_large_data_handler().maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, cell_size, collection_elements).get()) {
|
||||
if (cell_size > cell_size_entry.threshold) {
|
||||
cell_size_entry.above_threshold++;
|
||||
}
|
||||
if (collection_elements > collection_elements_entry.threshold) {
|
||||
collection_elements_entry.above_threshold++;
|
||||
}
|
||||
auto ret = _sst.get_large_data_handler().maybe_record_large_cells(_sst, *_partition_key, clustering_key, cdef, cell_size, collection_elements).get();
|
||||
if (ret.size) {
|
||||
cell_size_entry.above_threshold++;
|
||||
}
|
||||
if (ret.elements) {
|
||||
collection_elements_entry.above_threshold++;
|
||||
}
|
||||
|
||||
auto trace_log = [&] (large_data_type type) {
|
||||
slogger.trace("Detected large cell: sstable={}, partition_key={}, clustering_key={}, column={}, record_type={}, cell_size={}, collection_elements={}",
|
||||
_sst.component_basename(component_type::Data),
|
||||
key_to_str(partition_key.to_partition_key(_schema), _schema),
|
||||
clustering_key ? key_to_str(*clustering_key, _schema) : "",
|
||||
cdef.name_as_text(),
|
||||
type,
|
||||
cell_size,
|
||||
collection_elements);
|
||||
};
|
||||
|
||||
// Populate top-N large data records into separate per-type heaps.
|
||||
if (ret.size) {
|
||||
trace_log(large_data_type::cell_size);
|
||||
const auto& pk_bytes = partition_key.get_bytes();
|
||||
auto ck_bytes = clustering_key ? clustering_key->view().representation().linearize() : bytes();
|
||||
auto col_name = cdef.name_as_text();
|
||||
insert_into_ld_heap(_ld_cell_size_records, large_data_record{
|
||||
.type = large_data_type::cell_size,
|
||||
.partition_key = disk_string<uint32_t>{bytes(pk_bytes)},
|
||||
.clustering_key = disk_string<uint32_t>{std::move(ck_bytes)},
|
||||
.column_name = disk_string<uint32_t>{to_bytes(col_name)},
|
||||
.value = cell_size,
|
||||
.elements_count = collection_elements,
|
||||
});
|
||||
}
|
||||
if (ret.elements) {
|
||||
trace_log(large_data_type::elements_in_collection);
|
||||
const auto& pk_bytes = partition_key.get_bytes();
|
||||
auto ck_bytes = clustering_key ? clustering_key->view().representation().linearize() : bytes();
|
||||
auto col_name = cdef.name_as_text();
|
||||
insert_into_ld_heap(_ld_elements_in_collection_records, large_data_record{
|
||||
.type = large_data_type::elements_in_collection,
|
||||
.partition_key = disk_string<uint32_t>{bytes(pk_bytes)},
|
||||
.clustering_key = disk_string<uint32_t>{std::move(ck_bytes)},
|
||||
.column_name = disk_string<uint32_t>{to_bytes(col_name)},
|
||||
.value = cell_size,
|
||||
.elements_count = collection_elements,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void writer::write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell,
|
||||
@@ -1719,7 +1858,32 @@ void writer::consume_end_of_stream() {
|
||||
std::optional<scylla_metadata::ext_timestamp_stats> ts_stats(scylla_metadata::ext_timestamp_stats{
|
||||
.map = _collector.get_ext_timestamp_stats()
|
||||
});
|
||||
_sst.write_scylla_metadata(_shard, std::move(identifier), std::move(ld_stats), std::move(ts_stats));
|
||||
// Drain all per-type min-heaps into a single large_data_records array.
|
||||
std::optional<scylla_metadata::large_data_records> ld_records;
|
||||
{
|
||||
utils::chunked_vector<large_data_record> records;
|
||||
auto drain_size_heap = [&records](ld_size_heap& heap) {
|
||||
while (!heap.empty()) {
|
||||
records.push_back(std::move(const_cast<large_data_record&>(heap.top())));
|
||||
heap.pop();
|
||||
}
|
||||
};
|
||||
auto drain_elements_heap = [&records](ld_elements_heap& heap) {
|
||||
while (!heap.empty()) {
|
||||
records.push_back(std::move(const_cast<large_data_record&>(heap.top())));
|
||||
heap.pop();
|
||||
}
|
||||
};
|
||||
drain_size_heap(_ld_partition_size_records);
|
||||
drain_elements_heap(_ld_rows_in_partition_records);
|
||||
drain_size_heap(_ld_row_size_records);
|
||||
drain_size_heap(_ld_cell_size_records);
|
||||
drain_elements_heap(_ld_elements_in_collection_records);
|
||||
if (!records.empty()) {
|
||||
ld_records = scylla_metadata::large_data_records{.elements = std::move(records)};
|
||||
}
|
||||
}
|
||||
_sst.write_scylla_metadata(_shard, std::move(identifier), std::move(ld_stats), std::move(ts_stats), std::move(ld_records));
|
||||
if (!_cfg.leave_unsealed) {
|
||||
_sst.seal_sstable(_cfg.backup).get();
|
||||
}
|
||||
|
||||
@@ -1664,6 +1664,10 @@ future<> sstable::open_data(sstable_open_config cfg) noexcept {
|
||||
if (ld_stats) {
|
||||
_large_data_stats.emplace(*ld_stats);
|
||||
}
|
||||
auto* ld_records = _components->scylla_metadata->data.get<scylla_metadata_type::LargeDataRecords, scylla_metadata::large_data_records>();
|
||||
if (ld_records) {
|
||||
_large_data_records.emplace(*ld_records);
|
||||
}
|
||||
auto* origin = _components->scylla_metadata->data.get<scylla_metadata_type::SSTableOrigin, scylla_metadata::sstable_origin>();
|
||||
if (origin) {
|
||||
_origin = sstring(to_string_view(bytes_view(origin->value)));
|
||||
@@ -2295,7 +2299,8 @@ static sstable_column_kind to_sstable_column_kind(column_kind k) {
|
||||
|
||||
void
|
||||
sstable::write_scylla_metadata(shard_id shard, struct run_identifier identifier,
|
||||
std::optional<scylla_metadata::large_data_stats> ld_stats, std::optional<scylla_metadata::ext_timestamp_stats> ts_stats) {
|
||||
std::optional<scylla_metadata::large_data_stats> ld_stats, std::optional<scylla_metadata::ext_timestamp_stats> ts_stats,
|
||||
std::optional<scylla_metadata::large_data_records> ld_records) {
|
||||
auto&& first_key = get_first_decorated_key();
|
||||
auto&& last_key = get_last_decorated_key();
|
||||
|
||||
@@ -2318,6 +2323,9 @@ sstable::write_scylla_metadata(shard_id shard, struct run_identifier identifier,
|
||||
if (ld_stats) {
|
||||
_components->scylla_metadata->data.set<scylla_metadata_type::LargeDataStats>(std::move(*ld_stats));
|
||||
}
|
||||
if (ld_records) {
|
||||
_components->scylla_metadata->data.set<scylla_metadata_type::LargeDataRecords>(std::move(*ld_records));
|
||||
}
|
||||
if (!_origin.empty()) {
|
||||
scylla_metadata::sstable_origin o;
|
||||
o.value = bytes(to_bytes_view(std::string_view(_origin)));
|
||||
|
||||
@@ -123,6 +123,7 @@ struct sstable_writer_config {
|
||||
size_t summary_byte_cost;
|
||||
sstring origin;
|
||||
bool correct_pi_block_width = true;
|
||||
uint32_t large_data_records_per_sstable = 10;
|
||||
|
||||
private:
|
||||
explicit sstable_writer_config() {}
|
||||
@@ -628,6 +629,7 @@ private:
|
||||
// It can be disengaged normally when loading legacy sstables that do not have this
|
||||
// information in their scylla metadata.
|
||||
std::optional<scylla_metadata::large_data_stats> _large_data_stats;
|
||||
std::optional<scylla_metadata::large_data_records> _large_data_records;
|
||||
sstring _origin;
|
||||
std::optional<scylla_metadata::ext_timestamp_stats> _ext_timestamp_stats;
|
||||
optimized_optional<sstable_id> _sstable_identifier;
|
||||
@@ -708,7 +710,8 @@ private:
|
||||
void write_scylla_metadata(shard_id shard,
|
||||
run_identifier identifier,
|
||||
std::optional<scylla_metadata::large_data_stats> ld_stats,
|
||||
std::optional<scylla_metadata::ext_timestamp_stats> ts_stats);
|
||||
std::optional<scylla_metadata::ext_timestamp_stats> ts_stats,
|
||||
std::optional<scylla_metadata::large_data_records> ld_records = std::nullopt);
|
||||
|
||||
future<> read_filter(sstable_open_config cfg = {});
|
||||
|
||||
@@ -1092,6 +1095,12 @@ public:
|
||||
// the map. Otherwise, return a disengaged optional.
|
||||
std::optional<large_data_stats_entry> get_large_data_stat(large_data_type t) const noexcept;
|
||||
|
||||
// Return the large_data_records stored in scylla_metadata, if present.
|
||||
// Absent on legacy SSTables that predate LargeDataRecords support.
|
||||
const std::optional<scylla_metadata::large_data_records>& get_large_data_records() const noexcept {
|
||||
return _large_data_records;
|
||||
}
|
||||
|
||||
// Return the extended timestamp statistics map.
|
||||
// Some or all entries may be missing if not present in scylla_metadata
|
||||
scylla_metadata::ext_timestamp_stats::map_type get_ext_timestamp_stats() const noexcept;
|
||||
|
||||
@@ -226,6 +226,7 @@ sstable_writer_config sstables_manager::configure_writer(sstring origin) const {
|
||||
cfg.summary_byte_cost = summary_byte_cost(_config.sstable_summary_ratio);
|
||||
|
||||
cfg.origin = std::move(origin);
|
||||
cfg.large_data_records_per_sstable = _config.large_data_records_per_sstable();
|
||||
|
||||
return cfg;
|
||||
}
|
||||
|
||||
@@ -114,6 +114,7 @@ public:
|
||||
utils::updateable_value<double> memory_reclaim_threshold = utils::updateable_value<double>(0.2);
|
||||
const std::vector<sstring>& data_file_directories;
|
||||
utils::updateable_value<sstring> format = utils::updateable_value<sstring>(fmt::to_string(sstable_version_types::me));
|
||||
utils::updateable_value<uint32_t> large_data_records_per_sstable = utils::updateable_value<uint32_t>(10);
|
||||
};
|
||||
|
||||
private:
|
||||
|
||||
@@ -548,6 +548,7 @@ enum class scylla_metadata_type : uint32_t {
|
||||
SSTableIdentifier = 10,
|
||||
Schema = 11,
|
||||
ComponentsDigests = 12,
|
||||
LargeDataRecords = 13,
|
||||
};
|
||||
|
||||
// UUID is used for uniqueness across nodes, such that an imported sstable
|
||||
@@ -595,6 +596,31 @@ struct large_data_stats_entry {
|
||||
auto describe_type(sstable_version_types v, Describer f) { return f(max_value, threshold, above_threshold); }
|
||||
};
|
||||
|
||||
// A single top-N large data record stored in the SSTable's scylla metadata.
|
||||
// Records are written by the sstable writer and survive tablet/shard migration
|
||||
// because they live in the SSTable file itself rather than in a CQL system table.
|
||||
struct large_data_record {
|
||||
large_data_type type;
|
||||
disk_string<uint32_t> partition_key; // binary serialized partition key (sstables::key::get_bytes())
|
||||
disk_string<uint32_t> clustering_key; // binary serialized CK (clustering_key_prefix::representation()), empty if N/A
|
||||
disk_string<uint32_t> column_name; // column name as text, empty for partition/row entries
|
||||
uint64_t value; // size in bytes (partition, row, or cell size depending on type)
|
||||
// Type-dependent element count:
|
||||
// partition_size, rows_in_partition: number of rows in the partition
|
||||
// cell_size, elements_in_collection: number of elements in the collection (0 for non-collection cells)
|
||||
// row_size: 0
|
||||
uint64_t elements_count;
|
||||
// Partition-level auxiliary fields (meaningful only for partition_size records, 0 otherwise):
|
||||
uint64_t range_tombstones; // number of range tombstones in the partition
|
||||
uint64_t dead_rows; // number of dead rows in the partition
|
||||
|
||||
template <typename Describer>
|
||||
auto describe_type(sstable_version_types v, Describer f) {
|
||||
return f(type, partition_key, clustering_key, column_name, value,
|
||||
elements_count, range_tombstones, dead_rows);
|
||||
}
|
||||
};
|
||||
|
||||
// Types of extended timestamp statistics.
|
||||
//
|
||||
// Note: For extensibility, never reuse an identifier,
|
||||
@@ -639,6 +665,7 @@ struct sstable_schema_type {
|
||||
struct scylla_metadata {
|
||||
using extension_attributes = disk_hash<uint32_t, disk_string<uint32_t>, disk_string<uint32_t>>;
|
||||
using large_data_stats = disk_hash<uint32_t, large_data_type, large_data_stats_entry>;
|
||||
using large_data_records = disk_array<uint32_t, large_data_record>;
|
||||
using sstable_origin = disk_string<uint32_t>;
|
||||
using scylla_build_id = disk_string<uint32_t>;
|
||||
using scylla_version = disk_string<uint32_t>;
|
||||
@@ -659,7 +686,8 @@ struct scylla_metadata {
|
||||
disk_tagged_union_member<scylla_metadata_type, scylla_metadata_type::ExtTimestampStats, ext_timestamp_stats>,
|
||||
disk_tagged_union_member<scylla_metadata_type, scylla_metadata_type::SSTableIdentifier, sstable_identifier>,
|
||||
disk_tagged_union_member<scylla_metadata_type, scylla_metadata_type::Schema, sstable_schema>,
|
||||
disk_tagged_union_member<scylla_metadata_type, scylla_metadata_type::ComponentsDigests, components_digests>
|
||||
disk_tagged_union_member<scylla_metadata_type, scylla_metadata_type::ComponentsDigests, components_digests>,
|
||||
disk_tagged_union_member<scylla_metadata_type, scylla_metadata_type::LargeDataRecords, large_data_records>
|
||||
> data;
|
||||
std::optional<uint32_t> digest;
|
||||
|
||||
@@ -898,3 +926,24 @@ struct fmt::formatter<sstables::deletion_time> {
|
||||
dt.marked_for_delete_at, dt.local_deletion_time);
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<sstables::large_data_type> : fmt::formatter<string_view> {
|
||||
template <typename FormatContext>
|
||||
auto format(sstables::large_data_type type, FormatContext& ctx) const {
|
||||
using enum sstables::large_data_type;
|
||||
switch (type) {
|
||||
case partition_size:
|
||||
return formatter<string_view>::format("partition_size", ctx);
|
||||
case row_size:
|
||||
return formatter<string_view>::format("row_size", ctx);
|
||||
case cell_size:
|
||||
return formatter<string_view>::format("cell_size", ctx);
|
||||
case rows_in_partition:
|
||||
return formatter<string_view>::format("rows_in_partition", ctx);
|
||||
case elements_in_collection:
|
||||
return formatter<string_view>::format("elements_in_collection", ctx);
|
||||
}
|
||||
return formatter<string_view>::format("unknown", ctx);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -90,6 +90,9 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) {
|
||||
// Check the only the large row is added to system.large_rows.
|
||||
BOOST_REQUIRE_EQUAL(rows.size(), 1);
|
||||
auto row0 = rows[0];
|
||||
// The result has 3 columns: partition_key, row_size, and table_name
|
||||
// (CQL adds the filtered partition key column to the result set when
|
||||
// using ALLOW FILTERING with a partial partition key restriction).
|
||||
BOOST_REQUIRE_EQUAL(row0.size(), 3);
|
||||
BOOST_REQUIRE_EQUAL(to_bytes(*row0[0]), "44");
|
||||
BOOST_REQUIRE_EQUAL(to_bytes(*row0[2]), "tbl");
|
||||
@@ -118,10 +121,11 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) {
|
||||
|
||||
e.execute_cql("delete from tbl where a = 44;").get();
|
||||
|
||||
// In order to guarantee that system.large_rows, system.large_cells and system.large_partitions have been updated, we have to
|
||||
// In order to guarantee that system.large_rows, system.large_cells and
|
||||
// system.large_partitions are empty, we need to:
|
||||
// * flush, so that a tombstone for the above delete is created.
|
||||
// * do a major compaction, so that the tombstone is combined with the old entry,
|
||||
// and the old sstable is deleted.
|
||||
// and the old sstable (which holds the large data records in its metadata) is deleted.
|
||||
flush(e);
|
||||
e.db().invoke_on_all([] (replica::database& dbi) {
|
||||
return dbi.get_tables_metadata().parallel_for_each_table([&dbi] (table_id, lw_shared_ptr<replica::table> t) {
|
||||
@@ -165,6 +169,71 @@ SEASTAR_THREAD_TEST_CASE(test_large_row_count_warning) {
|
||||
}, cfg).get();
|
||||
}
|
||||
|
||||
// Test that when a partition exceeds both the size threshold and the row
|
||||
// count threshold, system.large_partitions still returns a single row
|
||||
// (not two rows with the same clustering key).
|
||||
SEASTAR_THREAD_TEST_CASE(test_large_partitions_dual_threshold) {
|
||||
auto cfg = make_shared<db::config>();
|
||||
// Set very low thresholds so that a single partition with a handful
|
||||
// of rows containing modest data exceeds both size and row-count
|
||||
// thresholds simultaneously.
|
||||
cfg->compaction_large_partition_warning_threshold_mb(1);
|
||||
cfg->compaction_rows_count_warning_threshold(10);
|
||||
do_with_cql_env_thread([](cql_test_env& e) {
|
||||
e.execute_cql("create table tbl (a int, b int, c text, primary key (a, b))").get();
|
||||
// Insert enough rows with enough data to exceed 1 MB partition size
|
||||
// AND more than 10 rows.
|
||||
sstring blob(128 * 1024, 'x'); // 128 KB per row
|
||||
for (int i = 0; i < 11; ++i) {
|
||||
e.execute_cql(format("insert into tbl (a, b, c) values (42, {}, '{}');", i, blob)).get();
|
||||
}
|
||||
flush(e);
|
||||
|
||||
// There must be exactly one row in system.large_partitions for
|
||||
// this table. Before the fix, two rows with the same clustering
|
||||
// key would have been emitted.
|
||||
assert_that(e.execute_cql(
|
||||
"select partition_key, rows from system.large_partitions "
|
||||
"where table_name = 'tbl' allow filtering;").get())
|
||||
.is_rows()
|
||||
.with_size(1);
|
||||
|
||||
return make_ready_future<>();
|
||||
}, cfg).get();
|
||||
}
|
||||
|
||||
// Test that when a collection cell exceeds both the cell size threshold
|
||||
// and the collection element count threshold, system.large_cells still
|
||||
// returns a single row (not two rows with the same clustering key).
|
||||
SEASTAR_THREAD_TEST_CASE(test_large_cells_dual_threshold) {
|
||||
auto cfg = make_shared<db::config>();
|
||||
// Set very low thresholds so that a collection with modest elements
|
||||
// exceeds both size and element-count thresholds simultaneously.
|
||||
cfg->compaction_large_cell_warning_threshold_mb(1);
|
||||
cfg->compaction_collection_elements_count_warning_threshold(10);
|
||||
do_with_cql_env_thread([](cql_test_env& e) {
|
||||
e.execute_cql("create table tbl (a int, b list<text>, primary key (a))").get();
|
||||
// Insert 128 KB blobs, 11 times -> ~1.4 MB total, exceeding 1 MB
|
||||
// threshold. Also exceeds 10 element threshold.
|
||||
sstring blob(128 * 1024, 'x');
|
||||
for (int i = 0; i < 11; ++i) {
|
||||
e.execute_cql("update tbl set b = ['" + blob + "'] + b where a = 42;").get();
|
||||
}
|
||||
flush(e);
|
||||
|
||||
// There must be exactly one row in system.large_cells for this
|
||||
// table. Before the fix, two rows with the same clustering key
|
||||
// would have been emitted.
|
||||
assert_that(e.execute_cql(
|
||||
"select partition_key, column_name from system.large_cells "
|
||||
"where table_name = 'tbl' allow filtering;").get())
|
||||
.is_rows()
|
||||
.with_size(1);
|
||||
|
||||
return make_ready_future<>();
|
||||
}, cfg).get();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_insert_large_collection_values) {
|
||||
return do_with_cql_env([] (cql_test_env& e) {
|
||||
return seastar::async([&e] {
|
||||
|
||||
@@ -41,6 +41,7 @@
|
||||
#include "test/lib/simple_schema.hh"
|
||||
#include "test/lib/exception_utils.hh"
|
||||
#include "db/large_data_handler.hh"
|
||||
#include "db/config.hh"
|
||||
#include "readers/combined.hh"
|
||||
|
||||
#include <fmt/ranges.h>
|
||||
@@ -5091,70 +5092,34 @@ SEASTAR_TEST_CASE(test_sstable_reader_on_unknown_column) {
|
||||
}
|
||||
|
||||
namespace {
|
||||
struct large_row_handler : public db::large_data_handler {
|
||||
using callback_t = std::function<void(const schema& s, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, uint64_t row_size, uint64_t range_tombstones, uint64_t dead_rows,
|
||||
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements)>;
|
||||
callback_t callback;
|
||||
|
||||
large_row_handler(uint64_t large_rows_threshold, uint64_t rows_count_threshold, uint64_t cell_threshold_bytes, uint64_t collection_elements_threshold, callback_t callback)
|
||||
: large_data_handler(std::numeric_limits<uint64_t>::max(), large_rows_threshold, cell_threshold_bytes,
|
||||
rows_count_threshold, collection_elements_threshold)
|
||||
, callback(std::move(callback)) {
|
||||
// A large_data_handler with configurable thresholds for testing.
|
||||
// After refactoring, the handler only does threshold comparison + logging;
|
||||
// actual large data recording goes into the SSTable's LargeDataRecords metadata.
|
||||
struct large_data_test_handler : public db::large_data_handler {
|
||||
large_data_test_handler(uint64_t partition_threshold_bytes, uint64_t large_rows_threshold,
|
||||
uint64_t cell_threshold_bytes, uint64_t rows_count_threshold,
|
||||
uint64_t collection_elements_threshold)
|
||||
: large_data_handler(partition_threshold_bytes, large_rows_threshold, cell_threshold_bytes,
|
||||
rows_count_threshold, collection_elements_threshold) {
|
||||
start();
|
||||
}
|
||||
|
||||
virtual future<> record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, uint64_t row_size) const override {
|
||||
const schema_ptr s = sst.get_schema();
|
||||
callback(*s, partition_key, clustering_key, row_size, 0, 0, nullptr, 0, 0);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
virtual future<> record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) const override {
|
||||
const schema_ptr s = sst.get_schema();
|
||||
callback(*s, partition_key, clustering_key, 0, 0, 0, &cdef, cell_size, collection_elements);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
virtual future<> record_large_partitions(const sstables::sstable& sst, const sstables::key& partition_key,
|
||||
uint64_t partition_size, uint64_t rows_count, uint64_t range_tombstones_count, uint64_t dead_rows_count) const override {
|
||||
const schema_ptr s = sst.get_schema();
|
||||
callback(*s, partition_key, nullptr, rows_count, range_tombstones_count, dead_rows_count, nullptr, 0, 0);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
virtual future<> delete_large_data_entries(const schema& s, sstring sstable_name, std::string_view) const override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
virtual future<> update_large_data_entries_sstable_name(const schema& s, sstring old_name, sstring new_name, std::string_view large_table_name) const override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
protected:
|
||||
future<> record_large_cells(const sstables::sstable&, const sstables::key&,
|
||||
const clustering_key_prefix*, const column_definition&, uint64_t, uint64_t) const override { return make_ready_future<>(); }
|
||||
future<> record_large_rows(const sstables::sstable&, const sstables::key&,
|
||||
const clustering_key_prefix*, uint64_t) const override { return make_ready_future<>(); }
|
||||
future<> delete_large_data_entries(const schema&, sstring, std::string_view) const override { return make_ready_future<>(); }
|
||||
future<> update_large_data_entries_sstable_name(const schema&, sstring, sstring, std::string_view) const override { return make_ready_future<>(); }
|
||||
future<> record_large_partitions(const sstables::sstable&, const sstables::key&,
|
||||
uint64_t, uint64_t, uint64_t, uint64_t) const override { return make_ready_future<>(); }
|
||||
};
|
||||
}
|
||||
|
||||
static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, replica::memtable& mt, const partition_key& pk,
|
||||
std::vector<clustering_key*> expected, uint64_t threshold, sstables::sstable_version_types version) {
|
||||
unsigned i = 0;
|
||||
auto f = [&i, &expected, &pk, &threshold](const schema& s, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, uint64_t row_size, uint64_t range_tombstones, uint64_t dead_rows,
|
||||
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
|
||||
BOOST_REQUIRE(std::ranges::equal(pk.components(s), partition_key.to_partition_key(s).components(s)));
|
||||
BOOST_REQUIRE_LT(i, expected.size());
|
||||
BOOST_REQUIRE_GT(row_size, threshold);
|
||||
BOOST_REQUIRE_EQUAL(range_tombstones, 0);
|
||||
BOOST_REQUIRE_EQUAL(dead_rows, 0);
|
||||
|
||||
if (clustering_key) {
|
||||
BOOST_REQUIRE(expected[i]->equal(s, *clustering_key));
|
||||
} else {
|
||||
BOOST_REQUIRE_EQUAL(expected[i], nullptr);
|
||||
}
|
||||
++i;
|
||||
};
|
||||
|
||||
large_row_handler handler(threshold, std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), f);
|
||||
large_data_test_handler handler(std::numeric_limits<uint64_t>::max(), threshold,
|
||||
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
|
||||
std::numeric_limits<uint64_t>::max());
|
||||
|
||||
sstables::test_env::do_with_async([&] (auto& env) {
|
||||
auto sst = env.make_sstable(s, version);
|
||||
@@ -5164,14 +5129,50 @@ static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, r
|
||||
// depends on the encoding statistics (because of variable-length encoding). The original values
|
||||
// were chosen with the default-constructed encoding_stats, so let's keep it that way.
|
||||
sst->write_components(mt.make_mutation_reader(s, std::move(permit)), 1, s, env.manager().configure_writer("test"), encoding_stats{}).get();
|
||||
BOOST_REQUIRE_EQUAL(i, expected.size());
|
||||
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
|
||||
sst->open_data(cfg).get();
|
||||
|
||||
auto& records_opt = sst->get_large_data_records();
|
||||
// Collect row_size records (order is not guaranteed, so just count and validate).
|
||||
unsigned row_size_count = 0;
|
||||
unsigned static_row_count = 0;
|
||||
unsigned clustering_row_count = 0;
|
||||
if (records_opt) {
|
||||
for (auto& rec : records_opt->elements) {
|
||||
if (rec.type != large_data_type::row_size) {
|
||||
continue;
|
||||
}
|
||||
BOOST_REQUIRE_GT(rec.value, threshold);
|
||||
auto rec_ck_str = sstring(reinterpret_cast<const char*>(rec.clustering_key.value.data()), rec.clustering_key.value.size());
|
||||
if (rec_ck_str.empty()) {
|
||||
++static_row_count;
|
||||
} else {
|
||||
++clustering_row_count;
|
||||
}
|
||||
++row_size_count;
|
||||
}
|
||||
}
|
||||
// Count expected static rows (nullptr entries) and clustering rows.
|
||||
unsigned expected_static = 0;
|
||||
unsigned expected_clustering = 0;
|
||||
for (auto* ck : expected) {
|
||||
if (ck) {
|
||||
++expected_clustering;
|
||||
} else {
|
||||
++expected_static;
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(row_size_count, expected.size());
|
||||
BOOST_REQUIRE_EQUAL(static_row_count, expected_static);
|
||||
BOOST_REQUIRE_EQUAL(clustering_row_count, expected_clustering);
|
||||
}, { &handler }).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
mutation partition = s.new_mutation("pv");
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation partition(s.schema(), s.make_pkey());
|
||||
const partition_key& pk = partition.key();
|
||||
s.add_static_row(partition, "foo bar zed");
|
||||
|
||||
@@ -5189,26 +5190,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) {
|
||||
|
||||
static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit, replica::memtable& mt, const partition_key& pk,
|
||||
std::vector<clustering_key*> expected, uint64_t threshold, sstables::sstable_version_types version) {
|
||||
unsigned i = 0;
|
||||
auto f = [&i, &expected, &pk, &threshold](const schema& s, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, uint64_t row_size, uint64_t range_tombstones, uint64_t dead_rows,
|
||||
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
|
||||
BOOST_TEST_MESSAGE(format("i={} ck={} cell_size={} threshold={}", i, clustering_key ? format("{}", *clustering_key) : "null", cell_size, threshold));
|
||||
BOOST_REQUIRE(std::ranges::equal(pk.components(s), partition_key.to_partition_key(s).components(s)));
|
||||
BOOST_REQUIRE_LT(i, expected.size());
|
||||
BOOST_REQUIRE_GT(cell_size, threshold);
|
||||
BOOST_REQUIRE_EQUAL(range_tombstones, 0);
|
||||
BOOST_REQUIRE_EQUAL(dead_rows, 0);
|
||||
|
||||
if (clustering_key) {
|
||||
BOOST_REQUIRE(expected[i]->equal(s, *clustering_key));
|
||||
} else {
|
||||
BOOST_REQUIRE_EQUAL(expected[i], nullptr);
|
||||
}
|
||||
++i;
|
||||
};
|
||||
|
||||
large_row_handler handler(std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), threshold, std::numeric_limits<uint64_t>::max(), f);
|
||||
large_data_test_handler handler(std::numeric_limits<uint64_t>::max(),
|
||||
std::numeric_limits<uint64_t>::max(), threshold,
|
||||
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max());
|
||||
|
||||
sstables::test_env::do_with_async([&] (auto& env) {
|
||||
auto sst = env.make_sstable(s, version);
|
||||
@@ -5218,14 +5202,50 @@ static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit,
|
||||
// depends on the encoding statistics (because of variable-length encoding). The original values
|
||||
// were chosen with the default-constructed encoding_stats, so let's keep it that way.
|
||||
sst->write_components(mt.make_mutation_reader(s, std::move(permit)), 1, s, env.manager().configure_writer("test"), encoding_stats{}).get();
|
||||
BOOST_REQUIRE_EQUAL(i, expected.size());
|
||||
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
|
||||
sst->open_data(cfg).get();
|
||||
|
||||
auto& records_opt = sst->get_large_data_records();
|
||||
// Collect cell_size records (order is not guaranteed, so just count and validate).
|
||||
unsigned cell_size_count = 0;
|
||||
unsigned static_cell_count = 0;
|
||||
unsigned clustering_cell_count = 0;
|
||||
if (records_opt) {
|
||||
for (auto& rec : records_opt->elements) {
|
||||
if (rec.type != large_data_type::cell_size) {
|
||||
continue;
|
||||
}
|
||||
BOOST_REQUIRE_GT(rec.value, threshold);
|
||||
auto rec_ck_str = sstring(reinterpret_cast<const char*>(rec.clustering_key.value.data()), rec.clustering_key.value.size());
|
||||
if (rec_ck_str.empty()) {
|
||||
++static_cell_count;
|
||||
} else {
|
||||
++clustering_cell_count;
|
||||
}
|
||||
++cell_size_count;
|
||||
}
|
||||
}
|
||||
// Count expected static cells (nullptr entries) and clustering cells.
|
||||
unsigned expected_static = 0;
|
||||
unsigned expected_clustering = 0;
|
||||
for (auto* ck : expected) {
|
||||
if (ck) {
|
||||
++expected_clustering;
|
||||
} else {
|
||||
++expected_static;
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(cell_size_count, expected.size());
|
||||
BOOST_REQUIRE_EQUAL(static_cell_count, expected_static);
|
||||
BOOST_REQUIRE_EQUAL(clustering_cell_count, expected_clustering);
|
||||
}, { &handler }).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
mutation partition = s.new_mutation("pv");
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation partition(s.schema(), s.make_pkey());
|
||||
const partition_key& pk = partition.key();
|
||||
s.add_static_row(partition, "foo bar zed");
|
||||
|
||||
@@ -5244,8 +5264,8 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
|
||||
static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uint64_t threshold, bool expected, sstable_version_types version) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
mutation p = s.new_mutation("pv");
|
||||
const partition_key& pk = p.key();
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation p(s.schema(), s.make_pkey());
|
||||
sstring sv;
|
||||
for (auto idx = 0; idx < rows - 1; idx++) {
|
||||
sv += "foo ";
|
||||
@@ -5258,27 +5278,27 @@ static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uin
|
||||
schema_ptr sc = s.schema();
|
||||
auto mt = make_memtable(sc, {p});
|
||||
|
||||
bool logged = false;
|
||||
auto f = [&logged, &pk, &threshold, &rows, &range_tombstones](const schema& sc, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, uint64_t rows_count, uint64_t range_tombstones_count, uint64_t dead_rows,
|
||||
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
|
||||
BOOST_REQUIRE_GT(rows_count, threshold);
|
||||
BOOST_REQUIRE(std::ranges::equal(pk.components(sc), partition_key.to_partition_key(sc).components(sc)));
|
||||
BOOST_REQUIRE_EQUAL(dead_rows, 0);
|
||||
|
||||
// Inserting one range tombstone creates two range tombstone marker rows
|
||||
BOOST_REQUIRE_EQUAL(rows_count, rows + range_tombstones * 2);
|
||||
BOOST_REQUIRE_EQUAL(range_tombstones_count, range_tombstones * 2);
|
||||
logged = true;
|
||||
};
|
||||
|
||||
large_row_handler handler(std::numeric_limits<uint64_t>::max(), threshold, std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), f);
|
||||
// rows_count_threshold = threshold; all other thresholds at MAX.
|
||||
large_data_test_handler handler(std::numeric_limits<uint64_t>::max(),
|
||||
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
|
||||
threshold, std::numeric_limits<uint64_t>::max());
|
||||
|
||||
sstables::test_env::do_with_async([&] (auto& env) {
|
||||
auto sst = env.make_sstable(sc, version);
|
||||
sst->write_components(mt->make_mutation_reader(sc, semaphore.make_permit()), 1, sc, env.manager().configure_writer("test"), encoding_stats{}).get();
|
||||
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
|
||||
sst->open_data(cfg).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(logged, expected);
|
||||
auto& records_opt = sst->get_large_data_records();
|
||||
bool found = false;
|
||||
if (records_opt) {
|
||||
for (auto& rec : records_opt->elements) {
|
||||
if (rec.type == large_data_type::rows_in_partition && rec.elements_count > threshold) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(found, expected);
|
||||
}, { &handler }).get();
|
||||
}
|
||||
|
||||
@@ -5306,8 +5326,8 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
|
||||
static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold, bool expected, sstable_version_types version) {
|
||||
simple_schema s;
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
mutation p = s.new_mutation("pv");
|
||||
const partition_key& pk = p.key();
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation p(s.schema(), s.make_pkey());
|
||||
sstring sv;
|
||||
int live_rows = 0;
|
||||
int expected_dead_rows = 0;
|
||||
@@ -5354,7 +5374,6 @@ static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold,
|
||||
}
|
||||
auto ck_start = ck;
|
||||
auto rt_start = bound_view(ck_start, tests::random::get_bool() ? bound_kind::incl_start : bound_kind::excl_start);
|
||||
auto sv_end = sv;
|
||||
auto rt_size = tests::random::get_int(1, 10);
|
||||
for (auto i = 0; i < rt_size; i++) {
|
||||
sv += "X";
|
||||
@@ -5373,25 +5392,27 @@ static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold,
|
||||
auto mt = make_lw_shared<replica::memtable>(sc);
|
||||
mt->apply(p);
|
||||
|
||||
bool logged = false;
|
||||
auto f = [&] (const schema& sc, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, uint64_t rows_count, uint64_t range_tombstones, uint64_t dead_rows,
|
||||
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
|
||||
BOOST_REQUIRE_GT(rows_count, threshold);
|
||||
BOOST_REQUIRE_EQUAL(rows_count, rows);
|
||||
BOOST_REQUIRE_EQUAL(dead_rows, expected_dead_rows + expected_expired_rows + expected_rows_with_dead_cell);
|
||||
BOOST_REQUIRE_EQUAL(range_tombstones, expected_range_tombstones);
|
||||
BOOST_REQUIRE(std::ranges::equal(pk.components(sc), partition_key.to_partition_key(sc).components(sc)));
|
||||
logged = true;
|
||||
};
|
||||
|
||||
large_row_handler handler(std::numeric_limits<uint64_t>::max(), threshold, std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), f);
|
||||
// rows_count_threshold = threshold; all other thresholds at MAX.
|
||||
large_data_test_handler handler(std::numeric_limits<uint64_t>::max(),
|
||||
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
|
||||
threshold, std::numeric_limits<uint64_t>::max());
|
||||
|
||||
sstables::test_env::do_with_async([&] (auto& env) {
|
||||
auto sst = env.make_sstable(sc, version);
|
||||
sst->write_components(mt->make_mutation_reader(sc, semaphore.make_permit()), 1, sc, env.manager().configure_writer("test"), encoding_stats{}).get();
|
||||
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
|
||||
sst->open_data(cfg).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(logged, expected);
|
||||
auto& records_opt = sst->get_large_data_records();
|
||||
bool found = false;
|
||||
if (records_opt) {
|
||||
for (auto& rec : records_opt->elements) {
|
||||
if (rec.type == large_data_type::rows_in_partition && rec.elements_count > threshold) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(found, expected);
|
||||
}, { &handler }).get();
|
||||
}
|
||||
|
||||
@@ -5415,8 +5436,8 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_dead_rows) {
|
||||
static void test_sstable_too_many_collection_elements_f(int elements, uint64_t threshold, bool expected, sstable_version_types version) {
|
||||
simple_schema s(simple_schema::with_static::no, simple_schema::with_collection::yes);
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
mutation p = s.new_mutation("pv");
|
||||
const partition_key& pk = p.key();
|
||||
// Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
|
||||
mutation p(s.schema(), s.make_pkey());
|
||||
std::map<bytes, bytes> kv_map;
|
||||
for (auto i = 0; i < elements; i++) {
|
||||
kv_map[to_bytes(format("key{}", i))] = to_bytes(format("val{}", i));
|
||||
@@ -5425,25 +5446,28 @@ static void test_sstable_too_many_collection_elements_f(int elements, uint64_t t
|
||||
schema_ptr sc = s.schema();
|
||||
auto mt = make_memtable(sc, {p});
|
||||
|
||||
bool logged = false;
|
||||
auto f = [&logged, &pk, &threshold](const schema& sc, const sstables::key& partition_key,
|
||||
const clustering_key_prefix* clustering_key, uint64_t rows_count, uint64_t range_tombstones, uint64_t dead_rows,
|
||||
const column_definition* cdef, uint64_t cell_size, uint64_t collection_elements) {
|
||||
BOOST_REQUIRE_GT(collection_elements, threshold);
|
||||
BOOST_REQUIRE(std::ranges::equal(pk.components(sc), partition_key.to_partition_key(sc).components(sc)));
|
||||
BOOST_REQUIRE_EQUAL(range_tombstones, 0);
|
||||
BOOST_REQUIRE_EQUAL(dead_rows, 0);
|
||||
logged = true;
|
||||
};
|
||||
|
||||
BOOST_TEST_MESSAGE(format("elements={} threshold={} expected={}", elements, threshold, expected));
|
||||
large_row_handler handler(std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), threshold, f);
|
||||
// collection_elements_threshold = threshold; all other thresholds at MAX.
|
||||
large_data_test_handler handler(std::numeric_limits<uint64_t>::max(),
|
||||
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(),
|
||||
std::numeric_limits<uint64_t>::max(), threshold);
|
||||
|
||||
sstables::test_env::do_with_async([&] (auto& env) {
|
||||
auto sst = env.make_sstable(sc, version);
|
||||
sst->write_components(mt->make_mutation_reader(sc, semaphore.make_permit()), 1, sc, env.manager().configure_writer("test"), encoding_stats{}).get();
|
||||
sstable_open_config cfg { .load_first_and_last_position_metadata = true };
|
||||
sst->open_data(cfg).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(logged, expected);
|
||||
auto& records_opt = sst->get_large_data_records();
|
||||
bool found = false;
|
||||
if (records_opt) {
|
||||
for (auto& rec : records_opt->elements) {
|
||||
if (rec.type == large_data_type::elements_in_collection && rec.elements_count > threshold) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE_EQUAL(found, expected);
|
||||
}, { &handler }).get();
|
||||
}
|
||||
|
||||
@@ -5463,6 +5487,220 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_too_many_collection_elements) {
|
||||
}
|
||||
}
|
||||
|
||||
// Handler for testing LargeDataRecords population in SSTable metadata.
|
||||
// large_data_test_handler already serves this purpose; keep this alias for
|
||||
// test readability / backward compatibility.
|
||||
namespace {
|
||||
using large_data_records_handler = large_data_test_handler;
|
||||
}
|
||||
|
||||
// Test that writing an SSTable with data exceeding large data thresholds
|
||||
// populates LargeDataRecords in the SSTable's scylla metadata, and that
|
||||
// the records survive a round-trip (write + open_data).
|
||||
SEASTAR_THREAD_TEST_CASE(test_large_data_records_round_trip) {
|
||||
// Use low thresholds so that our small test data exceeds them.
|
||||
// partition_threshold=1 byte, row_threshold=1 byte, cell_threshold=1 byte,
|
||||
// rows_count_threshold=MAX (not tested here), collection_elements_threshold=MAX
|
||||
large_data_records_handler handler(1, 1, 1,
|
||||
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max());
|
||||
|
||||
for (auto version : writable_sstable_versions) {
|
||||
sstables::test_env::do_with_async([&] (auto& env) {
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
// Create a mutation with a clustering row whose serialized cell value
|
||||
// exceeds the 1-byte thresholds, so partition_size, row_size, and
|
||||
// cell_size records are all generated.
|
||||
// Use make_pkey() (no argument) to generate a key on this shard.
|
||||
auto pk = ss.make_pkey();
|
||||
mutation m(s, pk);
|
||||
auto ck = ss.make_ckey("ck1");
|
||||
ss.add_row(m, ck, "a_value_that_is_larger_than_one_byte");
|
||||
|
||||
auto mt = make_memtable(s, {m});
|
||||
auto sst = env.make_sstable(s, version);
|
||||
sst->write_components(mt->make_mutation_reader(s, env.make_reader_permit()),
|
||||
1, s, env.manager().configure_writer("test"), encoding_stats{}).get();
|
||||
sst->open_data().get();
|
||||
|
||||
auto& records_opt = sst->get_large_data_records();
|
||||
BOOST_REQUIRE(records_opt.has_value());
|
||||
auto& records = records_opt->elements;
|
||||
BOOST_REQUIRE(!records.empty());
|
||||
|
||||
// We expect records for partition_size, row_size, and cell_size,
|
||||
// since all exceed the 1-byte thresholds.
|
||||
bool found_partition_size = false;
|
||||
bool found_row_size = false;
|
||||
bool found_cell_size = false;
|
||||
|
||||
for (auto& rec : records) {
|
||||
BOOST_TEST_MESSAGE(format("LargeDataRecord: type={}, pk='{}', ck='{}', col='{}', value={}",
|
||||
static_cast<uint32_t>(rec.type),
|
||||
to_hex(rec.partition_key.value),
|
||||
to_hex(rec.clustering_key.value),
|
||||
sstring(reinterpret_cast<const char*>(rec.column_name.value.data()), rec.column_name.value.size()),
|
||||
rec.value));
|
||||
|
||||
BOOST_REQUIRE(rec.value > 0);
|
||||
|
||||
// Verify partition_key field is non-empty (binary serialized key)
|
||||
BOOST_REQUIRE(!rec.partition_key.value.empty());
|
||||
// Verify binary partition key round-trips to the original partition key
|
||||
auto rec_pk = sstables::key_view(rec.partition_key.value).to_partition_key(*s);
|
||||
BOOST_REQUIRE(rec_pk.equal(*s, pk.key()));
|
||||
|
||||
switch (rec.type) {
|
||||
case large_data_type::partition_size:
|
||||
found_partition_size = true;
|
||||
// clustering_key and column_name should be empty for partition-level entries
|
||||
BOOST_REQUIRE(rec.clustering_key.value.empty());
|
||||
BOOST_REQUIRE(rec.column_name.value.empty());
|
||||
// Verify partition-level auxiliary fields:
|
||||
// elements_count = rows in partition (1 clustering row + 1 implicit static row)
|
||||
BOOST_REQUIRE_EQUAL(rec.elements_count, 2u);
|
||||
BOOST_REQUIRE_EQUAL(rec.range_tombstones, 0u);
|
||||
BOOST_REQUIRE_EQUAL(rec.dead_rows, 0u);
|
||||
break;
|
||||
case large_data_type::row_size:
|
||||
found_row_size = true;
|
||||
// Static rows have an empty clustering key; clustering rows have a non-empty one.
|
||||
// Both are valid row_size records.
|
||||
// Verify binary clustering key round-trips for non-static rows.
|
||||
if (!rec.clustering_key.value.empty()) {
|
||||
auto rec_ck = clustering_key_prefix::from_bytes(rec.clustering_key.value);
|
||||
BOOST_REQUIRE(rec_ck.equal(*s, ck));
|
||||
}
|
||||
BOOST_REQUIRE(rec.column_name.value.empty());
|
||||
// Non-partition records should have zero auxiliary fields
|
||||
BOOST_REQUIRE_EQUAL(rec.elements_count, 0u);
|
||||
BOOST_REQUIRE_EQUAL(rec.range_tombstones, 0u);
|
||||
BOOST_REQUIRE_EQUAL(rec.dead_rows, 0u);
|
||||
break;
|
||||
case large_data_type::cell_size:
|
||||
found_cell_size = true;
|
||||
BOOST_REQUIRE(!rec.column_name.value.empty());
|
||||
// For cell_size records, elements_count may be non-zero
|
||||
// (collection element count) but range_tombstones/dead_rows
|
||||
// should be zero.
|
||||
BOOST_REQUIRE_EQUAL(rec.range_tombstones, 0u);
|
||||
BOOST_REQUIRE_EQUAL(rec.dead_rows, 0u);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_REQUIRE_MESSAGE(found_partition_size, "Expected a partition_size record");
|
||||
BOOST_REQUIRE_MESSAGE(found_row_size, "Expected a row_size record");
|
||||
BOOST_REQUIRE_MESSAGE(found_cell_size, "Expected a cell_size record");
|
||||
}, { &handler }).get();
|
||||
}
|
||||
}
|
||||
|
||||
// Test that the bounded top-N heap correctly keeps only the largest entries.
|
||||
SEASTAR_THREAD_TEST_CASE(test_large_data_records_top_n_bounded) {
|
||||
// Row threshold = 1 byte so every row triggers recording.
|
||||
// partition threshold = MAX to avoid partition records (simplifies counting).
|
||||
large_data_records_handler handler(
|
||||
std::numeric_limits<uint64_t>::max(), // partition threshold
|
||||
1, // row threshold
|
||||
std::numeric_limits<uint64_t>::max(), // cell threshold
|
||||
std::numeric_limits<uint64_t>::max(), // rows count threshold
|
||||
std::numeric_limits<uint64_t>::max() // collection elements threshold
|
||||
);
|
||||
|
||||
for (auto version : writable_sstable_versions) {
|
||||
sstables::test_env::do_with_async([&] (auto& env) {
|
||||
// Set large_data_records_per_sstable to 3 so we can test bounding.
|
||||
env.db_config().compaction_large_data_records_per_sstable(3);
|
||||
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
// Create 6 partitions, each with one row of increasing size.
|
||||
// Since each partition has exactly one row, we get 6 row_size records
|
||||
// competing for 3 slots.
|
||||
// Use make_pkeys() to generate shard-local keys.
|
||||
auto pkeys = ss.make_pkeys(6);
|
||||
utils::chunked_vector<mutation> muts;
|
||||
for (int i = 0; i < 6; i++) {
|
||||
auto& pk = pkeys[i];
|
||||
mutation m(s, pk);
|
||||
auto ck = ss.make_ckey(format("ck{}", i));
|
||||
// Row value size increases with i. Pad with 'x' characters.
|
||||
sstring val(100 * (i + 1), 'x');
|
||||
ss.add_row(m, ck, val);
|
||||
muts.push_back(std::move(m));
|
||||
}
|
||||
|
||||
auto mt = make_memtable(s, muts);
|
||||
auto sst = env.make_sstable(s, version);
|
||||
sst->write_components(mt->make_mutation_reader(s, env.make_reader_permit()),
|
||||
6, s, env.manager().configure_writer("test"), encoding_stats{}).get();
|
||||
sst->open_data().get();
|
||||
|
||||
auto& records_opt = sst->get_large_data_records();
|
||||
BOOST_REQUIRE(records_opt.has_value());
|
||||
|
||||
// Count row_size records.
|
||||
size_t row_size_count = 0;
|
||||
uint64_t min_row_value = std::numeric_limits<uint64_t>::max();
|
||||
for (auto& rec : records_opt->elements) {
|
||||
if (rec.type == large_data_type::row_size) {
|
||||
row_size_count++;
|
||||
min_row_value = std::min(min_row_value, rec.value);
|
||||
}
|
||||
}
|
||||
|
||||
// We wrote 6 rows but limited to top-3, so we should have exactly 3 row_size records.
|
||||
BOOST_REQUIRE_EQUAL(row_size_count, 3u);
|
||||
|
||||
// The smallest of the top-3 should be from one of the larger rows (index 3, 4, or 5).
|
||||
// Row at index 3 has value size ~ 400 bytes of 'x' chars. The actual row_size will be
|
||||
// larger due to serialization overhead, but should be well above 100 * 1 = 100 bytes
|
||||
// (the smallest row). This verifies the heap evicted smaller entries.
|
||||
// We just check that the minimum value in the top-3 is larger than what the
|
||||
// smallest row (index 0, ~100 bytes of payload) would produce.
|
||||
BOOST_REQUIRE_GT(min_row_value, 100u);
|
||||
}, { &handler }).get();
|
||||
}
|
||||
}
|
||||
|
||||
// Test that an SSTable with data below all thresholds produces no LargeDataRecords.
|
||||
SEASTAR_THREAD_TEST_CASE(test_large_data_records_none_when_below_threshold) {
|
||||
// All thresholds at maximum => nothing should be recorded.
|
||||
large_data_records_handler handler(
|
||||
std::numeric_limits<uint64_t>::max(),
|
||||
std::numeric_limits<uint64_t>::max(),
|
||||
std::numeric_limits<uint64_t>::max(),
|
||||
std::numeric_limits<uint64_t>::max(),
|
||||
std::numeric_limits<uint64_t>::max()
|
||||
);
|
||||
|
||||
for (auto version : writable_sstable_versions) {
|
||||
sstables::test_env::do_with_async([&] (auto& env) {
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
|
||||
auto pk = ss.make_pkey();
|
||||
mutation m(s, pk);
|
||||
ss.add_row(m, ss.make_ckey("ck1"), "small_value");
|
||||
|
||||
auto mt = make_memtable(s, {m});
|
||||
auto sst = env.make_sstable(s, version);
|
||||
sst->write_components(mt->make_mutation_reader(s, env.make_reader_permit()),
|
||||
1, s, env.manager().configure_writer("test"), encoding_stats{}).get();
|
||||
sst->open_data().get();
|
||||
|
||||
auto& records_opt = sst->get_large_data_records();
|
||||
// With all thresholds at max, no records should be written.
|
||||
BOOST_REQUIRE(!records_opt.has_value());
|
||||
}, { &handler }).get();
|
||||
}
|
||||
}
|
||||
|
||||
// The following test runs on test/resource/sstables/3.x/uncompressed/legacy_udt_in_collection
|
||||
// It was created using Scylla 3.0.x using the following CQL statements:
|
||||
//
|
||||
|
||||
@@ -1008,9 +1008,7 @@ private:
|
||||
_mnotifier.local().unregister_listener(&_ss.local()).get();
|
||||
});
|
||||
|
||||
smp::invoke_on_all([&] {
|
||||
return db::initialize_virtual_tables(_db, _ss, _gossiper, _group0_registry, _sys_ks, _tablet_allocator, _ms, *cfg);
|
||||
}).get();
|
||||
db::initialize_virtual_tables(_db, _ss, _gossiper, _group0_registry, _sys_ks, _tablet_allocator, _ms, *cfg, _feature_service.local()).get();
|
||||
|
||||
_qp.invoke_on_all([this, &group0_client] (cql3::query_processor& qp) {
|
||||
qp.start_remote(_mm.local(), _mapreduce_service.local(), _ss.local(), group0_client,
|
||||
|
||||
@@ -260,6 +260,7 @@ test_env::impl::impl(test_env_config cfg, sstable_compressor_factory& scfarg, ss
|
||||
.memory_reclaim_threshold = db_config->components_memory_reclaim_threshold,
|
||||
.data_file_directories = db_config->data_file_directories(),
|
||||
.format = db_config->sstable_format,
|
||||
.large_data_records_per_sstable = db_config->compaction_large_data_records_per_sstable,
|
||||
},
|
||||
feature_service,
|
||||
cache_tracker,
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
#include "db/corrupt_data_handler.hh"
|
||||
#include "db/object_storage_endpoint_param.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "keys/keys.hh"
|
||||
#include "reader_concurrency_semaphore.hh"
|
||||
#include "readers/combined.hh"
|
||||
#include "readers/filtering.hh"
|
||||
@@ -1338,17 +1339,7 @@ const char* to_string(sstables::scylla_metadata_type t) {
|
||||
case sstables::scylla_metadata_type::SSTableIdentifier: return "sstable_identifier";
|
||||
case sstables::scylla_metadata_type::Schema: return "schema";
|
||||
case sstables::scylla_metadata_type::ComponentsDigests: return "components_digests";
|
||||
}
|
||||
std::abort();
|
||||
}
|
||||
|
||||
const char* to_string(sstables::large_data_type t) {
|
||||
switch (t) {
|
||||
case sstables::large_data_type::partition_size: return "partition_size";
|
||||
case sstables::large_data_type::row_size: return "row_size";
|
||||
case sstables::large_data_type::cell_size: return "cell_size";
|
||||
case sstables::large_data_type::rows_in_partition: return "rows_in_partition";
|
||||
case sstables::large_data_type::elements_in_collection: return "elements_in_collection";
|
||||
case sstables::scylla_metadata_type::LargeDataRecords: return "large_data_records";
|
||||
}
|
||||
std::abort();
|
||||
}
|
||||
@@ -1363,12 +1354,13 @@ const char* to_string(sstables::ext_timestamp_stats_type t) {
|
||||
|
||||
class scylla_metadata_visitor {
|
||||
json_writer& _writer;
|
||||
schema_ptr _schema;
|
||||
|
||||
dht::token as_token(const sstables::disk_string<uint16_t>& ds) const {
|
||||
return dht::token(dht::token::kind::key, bytes_view(ds));
|
||||
}
|
||||
public:
|
||||
scylla_metadata_visitor(json_writer& writer) : _writer(writer) { }
|
||||
scylla_metadata_visitor(json_writer& writer, schema_ptr schema = nullptr) : _writer(writer), _schema(std::move(schema)) { }
|
||||
|
||||
void operator()(const sstables::sharding_metadata& val) const {
|
||||
_writer.StartArray();
|
||||
@@ -1440,7 +1432,7 @@ public:
|
||||
void operator()(const sstables::scylla_metadata::large_data_stats& val) const {
|
||||
_writer.StartObject();
|
||||
for (const auto& [k, v] : val.map) {
|
||||
_writer.Key(to_string(k));
|
||||
_writer.Key(fmt::format("{}", k));
|
||||
_writer.StartObject();
|
||||
_writer.Key("max_value");
|
||||
_writer.Uint64(v.max_value);
|
||||
@@ -1452,6 +1444,32 @@ public:
|
||||
}
|
||||
_writer.EndObject();
|
||||
}
|
||||
void operator()(const sstables::large_data_record& val) const {
|
||||
_writer.StartObject();
|
||||
_writer.Key("type");
|
||||
_writer.String(fmt::format("{}", val.type));
|
||||
_writer.Key("partition_key");
|
||||
auto pk = sstables::key_view(val.partition_key.value).to_partition_key(*_schema);
|
||||
_writer.String(key_to_str(pk, *_schema));
|
||||
_writer.Key("clustering_key");
|
||||
if (!val.clustering_key.value.empty()) {
|
||||
auto ck = clustering_key_prefix::from_bytes(val.clustering_key.value);
|
||||
_writer.String(key_to_str(ck, *_schema));
|
||||
} else {
|
||||
_writer.String("");
|
||||
}
|
||||
_writer.Key("column_name");
|
||||
_writer.String(disk_string_to_string(val.column_name));
|
||||
_writer.Key("value");
|
||||
_writer.Uint64(val.value);
|
||||
_writer.Key("elements_count");
|
||||
_writer.Uint64(val.elements_count);
|
||||
_writer.Key("range_tombstones");
|
||||
_writer.Uint64(val.range_tombstones);
|
||||
_writer.Key("dead_rows");
|
||||
_writer.Uint64(val.dead_rows);
|
||||
_writer.EndObject();
|
||||
}
|
||||
void operator()(const sstables::scylla_metadata::ext_timestamp_stats& val) const {
|
||||
_writer.StartObject();
|
||||
for (const auto& [k, v] : val.map) {
|
||||
@@ -1538,7 +1556,7 @@ void dump_scylla_metadata_operation(schema_ptr schema, reader_permit permit, con
|
||||
continue;
|
||||
}
|
||||
for (const auto& [k, v] : m->data.data) {
|
||||
std::visit(scylla_metadata_visitor(writer), v);
|
||||
std::visit(scylla_metadata_visitor(writer, schema), v);
|
||||
}
|
||||
if (m->digest.has_value()) {
|
||||
writer.Key("digest");
|
||||
|
||||
Reference in New Issue
Block a user