db: implement large_data_record_index

Per-table index over large_data_records from all live SSTables.
Uses three intrusive multisets (partitions, rows, cells) with member
hooks directly on large_data_record.  Auto-unlink handles cleanup
when SSTables are destroyed.  Aggregation (max across SSTables for
the same key) happens at lookup time via equal_range.
This commit is contained in:
Taras Veretilnyk
2026-05-13 13:54:52 +02:00
parent e0fee41d25
commit ae8879d2f6
2 changed files with 162 additions and 0 deletions

View File

@@ -76,6 +76,82 @@ future<> large_data_handler::unplug_system_keyspace() noexcept {
co_await _sys_ks.unplug();
}
void large_data_record_index::register_sstable(sstables::shared_sstable sst) {
auto& records_opt = sst->get_large_data_records();
if (!records_opt) {
return;
}
for (auto& rec : records_opt->elements) {
switch (rec.type) {
case sstables::large_data_type::partition_size:
case sstables::large_data_type::rows_in_partition:
_partitions.insert(rec);
break;
case sstables::large_data_type::row_size:
_rows.insert(rec);
break;
case sstables::large_data_type::elements_in_collection:
_collections.insert(rec);
break;
default:
break;
}
}
}
void large_data_record_index::rebuild(
const std::unordered_set<sstables::shared_sstable>& sstables) {
_partitions.clear();
_rows.clear();
_collections.clear();
for (const auto& sst : sstables) {
register_sstable(sst);
}
}
std::optional<large_data_record_index::partition_entry>
large_data_record_index::lookup_partition(bytes_view pk_bytes) const {
lookup_key lk{pk_bytes, {}, {}};
auto [begin, end] = _partitions.equal_range(lk, _partitions.key_comp());
if (begin == end) {
return std::nullopt;
}
partition_entry result;
for (auto it = begin; it != end; ++it) {
result.partition_size = std::max(result.partition_size, it->value);
result.rows = std::max(result.rows, it->elements_count);
}
return result;
}
std::optional<uint64_t> large_data_record_index::lookup_row(bytes_view pk_bytes,
bytes_view ck_bytes) const {
lookup_key lk{pk_bytes, ck_bytes, {}};
auto [begin, end] = _rows.equal_range(lk, _rows.key_comp());
if (begin == end) {
return std::nullopt;
}
uint64_t result = 0;
for (auto it = begin; it != end; ++it) {
result = std::max(result, it->value);
}
return result;
}
std::optional<uint64_t> large_data_record_index::lookup_collection(bytes_view pk_bytes,
bytes_view ck_bytes, bytes_view column_name) const {
lookup_key lk{pk_bytes, ck_bytes, column_name};
auto [begin, end] = _collections.equal_range(lk, _collections.key_comp());
if (begin == end) {
return std::nullopt;
}
uint64_t result = 0;
for (auto it = begin; it != end; ++it) {
result = std::max(result, it->elements_count);
}
return result;
}
sstring large_data_handler::sst_filename(const sstables::sstable& sst) {
return sst.component_basename(sstables::component_type::Data);
}

View File

@@ -8,11 +8,17 @@
#pragma once
#include <concepts>
#include <cstdint>
#include <optional>
#include <boost/intrusive/set.hpp>
#include "bytes.hh"
#include "schema/schema_fwd.hh"
#include "system_keyspace.hh"
#include "sstables/shared_sstable.hh"
#include "sstables/types.hh"
#include "utils/assert.hh"
#include "utils/hash.hh"
#include "utils/updateable_value.hh"
#include "utils/pluggable.hh"
@@ -21,10 +27,90 @@ class sstable;
class key;
}
class partition_key_view;
namespace db {
class system_keyspace;
using sstables::large_data_record;
struct lookup_key {
bytes_view pk;
bytes_view ck;
bytes_view column_name;
};
// Compile-time comparison depth, one per multiset.
// partition → pk only
// row → pk + ck
// cell → pk + ck + column_name
enum class record_type { partition, row, collection };
template<record_type Depth>
struct record_compare {
private:
static bytes_view get_pk(const large_data_record& r) noexcept { return bytes_view(r.partition_key.value); }
static bytes_view get_pk(const lookup_key& k) noexcept { return k.pk; }
static bytes_view get_ck(const large_data_record& r) noexcept { return bytes_view(r.clustering_key.value); }
static bytes_view get_ck(const lookup_key& k) noexcept { return k.ck; }
static bytes_view get_col(const large_data_record& r) noexcept { return bytes_view(r.column_name.value); }
static bytes_view get_col(const lookup_key& k) noexcept { return k.column_name; }
public:
template<typename L, typename R>
requires (std::same_as<L, large_data_record> || std::same_as<R, large_data_record>)
bool operator()(const L& a, const R& b) const noexcept {
auto l_pk = get_pk(a), r_pk = get_pk(b);
if (l_pk != r_pk) {
return l_pk < r_pk;
}
if constexpr (Depth == record_type::partition) {
return false;
}
auto l_ck = get_ck(a), r_ck = get_ck(b);
if (l_ck != r_ck) {
return l_ck < r_ck;
}
if constexpr (Depth == record_type::row) {
return false;
}
return get_col(a) < get_col(b);
}
};
template<record_type Depth>
using record_set = boost::intrusive::multiset<large_data_record,
boost::intrusive::member_hook<large_data_record,
large_data_record::index_hook_type, &large_data_record::_index_hook>,
boost::intrusive::compare<record_compare<Depth>>,
boost::intrusive::constant_time_size<false>>;
// Per-table index over large_data_records from all live SSTables.
// Links directly into records stored in each SSTable's scylla_metadata
// via intrusive member hooks (auto_unlink). Aggregation (max across
// SSTables for the same key) happens at lookup time via equal_range.
class large_data_record_index {
public:
struct partition_entry {
uint64_t partition_size = 0;
uint64_t rows = 0;
};
void register_sstable(sstables::shared_sstable sst);
void rebuild(const std::unordered_set<sstables::shared_sstable>& sstables);
std::optional<partition_entry> lookup_partition(bytes_view pk_bytes) const;
std::optional<uint64_t> lookup_row(bytes_view pk_bytes, bytes_view ck_bytes) const;
std::optional<uint64_t> lookup_collection(bytes_view pk_bytes,
bytes_view ck_bytes, bytes_view column_name) const;
private:
record_set<record_type::partition> _partitions;
record_set<record_type::row> _rows;
record_set<record_type::collection> _collections;
};
class large_data_handler {
public:
struct stats {