Merge 'sstables/trie: implement BTI node format serialization and traversal' from Michał Chojnowski

This is the next part in the BTI index project.

Overarching issue: https://github.com/scylladb/scylladb/issues/19191
Previous part: https://github.com/scylladb/scylladb/pull/25154
Next part: implementing a trie cursor (the "set to key, step forwards, step backwards" thing) on top of the `node_reader` added here.

The new code added here is not used for anything yet, but it's posted as a separate PR
to keep things reviewably small.

This part implements the BTI trie node encoding, as described in https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.md#trie-nodes.
It contains the logic for encoding the abstract in-memory `writer_node`s (added in the previous PR)
into the on-disk format, and the logic for traversing the on-disk nodes during a read.

New functionality, no backporting needed.

Closes scylladb/scylladb#25317

* github.com:scylladb/scylladb:
  sstables/trie: add tests for BTI node serialization and traversal
  sstables/trie: implement BTI node traversal
  sstables/trie: implement BTI serialization
  utils/cached_file: add get_shared_page()
  utils/cached_file: replace a std::pair with a named struct
This commit is contained in:
Avi Kivity
2025-08-07 12:15:42 +03:00
13 changed files with 2072 additions and 10 deletions

View File

@@ -565,6 +565,7 @@ scylla_tests = set([
'test/boost/token_metadata_test',
'test/boost/top_k_test',
'test/boost/transport_test',
'test/boost/bti_node_sink_test',
'test/boost/trie_writer_test',
'test/boost/symmetric_key_test',
'test/boost/types_test',
@@ -866,6 +867,8 @@ scylla_core = (['message/messaging_service.cc',
'sstables/random_access_reader.cc',
'sstables/metadata_collector.cc',
'sstables/writer.cc',
'sstables/trie/bti_node_reader.cc',
'sstables/trie/bti_node_sink.cc',
'sstables/trie/trie_writer.cc',
'transport/cql_protocol_extension.cc',
'transport/event.cc',

View File

@@ -20,6 +20,8 @@ target_sources(sstables
sstables_manager.cc
sstable_version.cc
storage.cc
trie/bti_node_reader.cc
trie/bti_node_sink.cc
trie/trie_writer.cc
writer.cc)
target_include_directories(sstables

View File

@@ -29,6 +29,7 @@ public:
};
[[noreturn]] void on_parse_error(sstring message, std::optional<component_name> filename);
[[noreturn, gnu::noinline]] void on_bti_parse_error(uint64_t pos);
// Use this instead of SCYLLA_ASSERT() or assert() in code that is used while parsing SSTables.
// SSTables can be corrupted either by ScyllaDB itself or by a freak accident like cosmic background

View File

@@ -119,6 +119,10 @@ logging::logger sstlog("sstable");
on_internal_error(sstlog, std::move(ex));
}
[[noreturn, gnu::noinline]] void on_bti_parse_error(uint64_t pos) {
on_internal_error(sstlog, fmt::format("BTI parse error for node at pos {}", pos));
}
template <typename T>
const char* nullsafe_typename(T* x) noexcept {
try {

View File

@@ -0,0 +1,483 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "bti_node_reader.hh"
#include "bti_node_type.hh"
namespace sstables::trie {
// Returns the backward offset (relative to the node's own position) and the
// effective slot index of child number `child_idx` of the node serialized at `sp`.
// `pos` is the node's file position, used only for error reporting.
//
// For DENSE_* nodes, `child_idx` addresses a slot in the dense span, which may
// be empty; in that case the search continues towards higher slots (if
// `forward`) or lower slots (otherwise) until a present child is found.
get_child_result bti_get_child(uint64_t pos, const_bytes sp, int child_idx, bool forward) {
    // The node type lives in the high nibble of the first byte.
    auto type = uint8_t(sp[0]) >> 4;
    trie::get_child_result result;
    // SINGLE_*: exactly one child, whose offset is encoded inline in the header.
    auto single = [&](uint64_t offset) {
        result.offset = offset;
        result.idx = 0;
        return result;
    };
    // SPARSE_*: packed pointer array located after the list of sp[1] transition bytes.
    auto sparse = [&] [[gnu::always_inline]] (int type) {
        auto bpp = bits_per_pointer_arr[type];
        result.idx = child_idx;
        result.offset = read_offset(sp.subspan(2 + int(sp[1])), child_idx, bpp);
        return result;
    };
    // DENSE_*: one pointer slot per byte value in the span; offset 0 marks an
    // absent child.
    auto dense = [&] [[gnu::always_inline]] (int type) {
        auto bpp = bits_per_pointer_arr[type];
        auto dense_span = uint64_t(sp[2]) + 1;
        int idx = child_idx;
        int increment = forward ? 1 : -1;
        while (idx < int(dense_span) && idx >= 0) {
            if (auto off = read_offset(sp.subspan(3), idx, bpp)) {
                result.idx = idx;
                result.offset = off;
                return result;
            } else {
                // Empty slot: keep scanning in the requested direction.
                idx += increment;
            }
        }
        // Ran off the span without finding a present child: corrupted node
        // (or an out-of-range child_idx).
        [[unlikely]] sstables::on_bti_parse_error(pos);
    };
    switch (type) {
    case PAYLOAD_ONLY:
        // Payload-only nodes have no children; asking for one is a parse error.
        [[unlikely]] sstables::on_bti_parse_error(pos);
    case SINGLE_NOPAYLOAD_4:
        // Offset is the low nibble of the header byte.
        return single(uint64_t(sp[0]) & 0xf);
    case SINGLE_NOPAYLOAD_12:
        // 12-bit offset: low nibble of the header byte plus one full byte.
        return single((uint64_t(sp[0]) & 0xf) << 8 | uint64_t(sp[1]));
    case SINGLE_8:
        return single(uint64_t(sp[2]));
    case SINGLE_16:
        return single(uint64_t(sp[2]) << 8 | uint64_t(sp[3]));
    case SPARSE_8:
        return sparse(type);
    case SPARSE_12:
        return sparse(type);
    case SPARSE_16:
        return sparse(type);
    case SPARSE_24:
        return sparse(type);
    case SPARSE_40:
        return sparse(type);
    case DENSE_12:
        return dense(type);
    case DENSE_16:
        return dense(type);
    case DENSE_24:
        return dense(type);
    case DENSE_32:
        return dense(type);
    case DENSE_40:
        return dense(type);
    case LONG_DENSE:
        return dense(type);
    }
    // Unknown node type nibble: corrupted file.
    [[unlikely]] sstables::on_bti_parse_error(pos);
}
// Returns the transition byte of child `idx` of the node serialized at `raw`.
// `pos` is the node's file position, used only for error reporting.
//
// SPARSE_* nodes store transition bytes explicitly after the child count;
// DENSE_* nodes derive them from the start byte plus the slot index;
// SINGLE_* nodes store their only transition byte inline (its location
// depends on the specific layout).
std::byte bti_get_child_transition(uint64_t pos, const_bytes raw, int idx) {
    auto type = uint8_t(raw[0]) >> 4;
    switch (type) {
    case PAYLOAD_ONLY:
        // A payload-only node has no children, so this can only happen on a
        // corrupted node. Report it like every other parse error in this file
        // (previously this was an inconsistent abort()).
        [[unlikely]] sstables::on_bti_parse_error(pos);
    case SINGLE_NOPAYLOAD_4:
        return raw[1];
    case SINGLE_8:
        return raw[1];
    case SINGLE_NOPAYLOAD_12:
        // The 12-bit offset occupies bytes 0-1; the transition byte follows.
        return raw[2];
    case SINGLE_16:
        return raw[1];
    case SPARSE_8:
    case SPARSE_12:
    case SPARSE_16:
    case SPARSE_24:
    case SPARSE_40:
        // Transition bytes start right after the header and child count.
        return raw[2 + idx];
    case DENSE_12:
    case DENSE_16:
    case DENSE_24:
    case DENSE_32:
    case DENSE_40:
    case LONG_DENSE:
        // Dense slots map 1:1 to consecutive byte values from the start byte.
        return std::byte(uint8_t(raw[1]) + idx);
    }
    [[unlikely]] sstables::on_bti_parse_error(pos);
}
// Decodes the header of the node at `sp`: its child count and payload-bit
// nibble, without touching the child pointer arrays.
// `pos` is the node's file position, used only for error reporting.
load_final_node_result bti_read_node(int64_t pos, const_bytes sp) {
    const auto header = uint8_t(sp[0]);
    const auto type = header >> 4;
    load_final_node_result res;
    switch (type) {
    case PAYLOAD_ONLY:
        // Leaf node: no children; low nibble holds the payload bits.
        res.n_children = 0;
        res.payload_bits = header & 0xf;
        return res;
    case SINGLE_NOPAYLOAD_4:
    case SINGLE_NOPAYLOAD_12:
        // Single-child types which cannot carry a payload.
        res.n_children = 1;
        res.payload_bits = 0;
        return res;
    case SINGLE_8:
    case SINGLE_16:
        // Single-child types with an optional payload.
        res.n_children = 1;
        res.payload_bits = header & 0xf;
        return res;
    case SPARSE_8:
    case SPARSE_12:
    case SPARSE_16:
    case SPARSE_24:
    case SPARSE_40:
        // Sparse layout: byte 1 is the child count.
        res.n_children = int(sp[1]);
        res.payload_bits = header & 0xf;
        return res;
    case DENSE_12:
    case DENSE_16:
    case DENSE_24:
    case DENSE_32:
    case DENSE_40:
    case LONG_DENSE:
        // Dense layout: byte 2 holds (span - 1).
        res.n_children = uint64_t(sp[2]) + 1;
        res.payload_bits = header & 0xf;
        return res;
    }
    // Unknown node type nibble: corrupted file.
    [[unlikely]] sstables::on_bti_parse_error(pos);
}
// Returns a view of the payload bytes of the node at `sp`.
// The payload sits after the node's child arrays; the returned view extends to
// the end of `sp` — the caller uses the payload bits to decide how much to read.
const_bytes bti_get_payload(int64_t pos, const_bytes sp) {
    auto type = uint8_t(sp[0]) >> 4;
    switch (type) {
    case PAYLOAD_ONLY:
        // Payload starts right after the single header byte.
        return sp.subspan(1);
    case SINGLE_NOPAYLOAD_4:
    case SINGLE_NOPAYLOAD_12:
        // These types carry no payload (payload bits are 0), so the view
        // simply starts past the node.
        return sp.subspan(1 + div_ceil(bits_per_pointer_arr[type], 8));
    case SINGLE_8:
    case SINGLE_16:
        // Header + transition byte, then the pointer, then the payload.
        return sp.subspan(2 + div_ceil(bits_per_pointer_arr[type], 8));
    case SPARSE_8:
    case SPARSE_12:
    case SPARSE_16:
    case SPARSE_24:
    case SPARSE_40: {
        // Header + count, then per child one transition byte (8 bits) plus a
        // packed pointer, then the payload.
        auto n_children = uint8_t(sp[1]);
        return sp.subspan(2 + div_ceil(n_children * (8 + bits_per_pointer_arr[type]), 8));
    }
    case DENSE_12:
    case DENSE_16:
    case DENSE_24:
    case DENSE_32:
    case DENSE_40:
    case LONG_DENSE: {
        // Header + start byte + span byte, then the packed pointer array,
        // then the payload.
        auto dense_span = uint8_t(sp[2]) + 1;
        return sp.subspan(3 + div_ceil(dense_span * bits_per_pointer_arr[type], 8));
    }
    }
    [[unlikely]] sstables::on_bti_parse_error(pos);
}
// Performs one step of a key-guided descent: in the node at `sp`, finds the
// first child whose transition byte is >= key[0] (lower_bound semantics).
// Fills the result with the node's header info, the found child's slot index,
// its transition byte and its backward offset; if no such child exists,
// found_idx is set past the last child and found_byte/child_offset to -1.
node_traverse_result bti_walk_down_along_key(int64_t pos, const_bytes sp, const_bytes key) {
    auto type = uint8_t(sp[0]) >> 4;
    trie::node_traverse_result result;
    result.body_pos = pos;
    result.traversed_key_bytes = 0;
    // SINGLE_*: one child with transition byte `edge`.
    auto single = [&](std::byte edge, uint64_t offset, uint8_t payload_bits) {
        result.n_children = 1;
        result.payload_bits = payload_bits;
        if (key[0] <= edge) {
            // The single child is the lower_bound match.
            result.found_idx = 0;
            result.found_byte = int(edge);
            result.child_offset = offset;
        } else {
            // key[0] is past the only child: no match.
            result.found_idx = 1;
            result.found_byte = -1;
            result.child_offset = -1;
        }
        return result;
    };
    // SPARSE_*: binary-search the sorted transition-byte list.
    auto sparse = [&] [[gnu::always_inline]] (int type) {
        int n_children = int(sp[1]);
        auto idx = std::lower_bound(&sp[2], &sp[2 + n_children], key[0]) - &sp[2];
        result.n_children = n_children;
        result.payload_bits = uint8_t(sp[0]) & 0xf;
        result.found_idx = idx;
        if (idx < n_children) {
            auto bpp = bits_per_pointer_arr[type];
            result.child_offset = read_offset(sp.subspan(2 + n_children), idx, bpp);
            result.found_byte = int(sp[2 + idx]);
        } else {
            result.child_offset = -1;
            result.found_byte = -1;
        }
        return result;
    };
    // DENSE_*: jump to the slot for key[0] and scan forward over empty slots
    // (offset 0 marks an absent child).
    auto dense = [&] [[gnu::always_inline]] (int type) {
        auto start = int(sp[1]);
        auto idx = std::max<int>(0, int(key[0]) - start);
        auto dense_span = uint64_t(sp[2]) + 1;
        auto bpp = bits_per_pointer_arr[type];
        result.n_children = dense_span;
        result.payload_bits = uint8_t(sp[0]) & 0xf;
        while (idx < int(dense_span)) {
            if (auto off = read_offset(sp.subspan(3), idx, bpp)) {
                result.child_offset = off;
                result.found_idx = idx;
                // Note: found_byte may be greater than key[0] if slots were skipped.
                result.found_byte = start + idx;
                return result;
            } else {
                ++idx;
            }
        }
        // No present child at or after key[0].
        result.found_idx = dense_span;
        result.child_offset = -1;
        result.found_byte = -1;
        return result;
    };
    switch (type) {
    case PAYLOAD_ONLY:
        // Leaf: nothing to descend into.
        result.payload_bits = uint8_t(sp[0]) & 0xf;
        result.n_children = 0;
        result.found_idx = 0;
        result.found_byte = -1;
        result.child_offset = -1;
        return result;
    case SINGLE_NOPAYLOAD_4:
        return single(sp[1], uint64_t(sp[0]) & 0xf, 0);
    case SINGLE_NOPAYLOAD_12:
        return single(sp[2], (uint64_t(sp[0]) & 0xf) << 8 | uint64_t(sp[1]), 0);
    case SINGLE_8:
        return single(sp[1], uint64_t(sp[2]), uint8_t(sp[0]) & 0xf);
    case SINGLE_16:
        return single(sp[1], uint64_t(sp[2]) << 8 | uint64_t(sp[3]), uint8_t(sp[0]) & 0xf);
    case SPARSE_8:
        return sparse(type);
    case SPARSE_12:
        return sparse(type);
    case SPARSE_16:
        return sparse(type);
    case SPARSE_24:
        return sparse(type);
    case SPARSE_40:
        return sparse(type);
    case DENSE_12:
        return dense(type);
    case DENSE_16:
        return dense(type);
    case DENSE_24:
        return dense(type);
    case DENSE_32:
        return dense(type);
    case DENSE_40:
        return dense(type);
    case LONG_DENSE:
        return dense(type);
    }
    [[unlikely]] sstables::on_bti_parse_error(pos);
}
// Performs one step of a leftmost descent: decodes the node at `sp` and
// returns the backward offset of its first (smallest-transition) child,
// or -1 for a payload-only leaf.
node_traverse_sidemost_result bti_walk_down_leftmost_path(int64_t pos, const_bytes sp) {
    const auto header = uint8_t(sp[0]);
    const auto type = header >> 4;
    trie::node_traverse_sidemost_result res;
    res.body_pos = pos;
    // Helper for the single-child node types.
    auto one_child = [&](uint64_t offset, uint8_t payload_bits) {
        res.n_children = 1;
        res.payload_bits = payload_bits;
        res.child_offset = offset;
        return res;
    };
    switch (type) {
    case PAYLOAD_ONLY:
        // Leaf: nowhere further to go.
        res.payload_bits = header & 0xf;
        res.n_children = 0;
        res.child_offset = -1;
        return res;
    case SINGLE_NOPAYLOAD_4:
        return one_child(uint64_t(header) & 0xf, 0);
    case SINGLE_NOPAYLOAD_12:
        return one_child((uint64_t(header) & 0xf) << 8 | uint64_t(sp[1]), 0);
    case SINGLE_8:
        return one_child(uint64_t(sp[2]), header & 0xf);
    case SINGLE_16:
        return one_child(uint64_t(sp[2]) << 8 | uint64_t(sp[3]), header & 0xf);
    case SPARSE_8:
    case SPARSE_12:
    case SPARSE_16:
    case SPARSE_24:
    case SPARSE_40: {
        // First pointer of the packed array, which follows the transition bytes.
        const int n_children = int(sp[1]);
        res.n_children = n_children;
        res.payload_bits = header & 0xf;
        res.child_offset = read_offset(sp.subspan(2 + n_children), 0, bits_per_pointer_arr[type]);
        return res;
    }
    case DENSE_12:
    case DENSE_16:
    case DENSE_24:
    case DENSE_32:
    case DENSE_40:
    case LONG_DENSE: {
        // First slot of the dense pointer array.
        const auto dense_span = uint64_t(sp[2]) + 1;
        res.n_children = dense_span;
        res.payload_bits = header & 0xf;
        res.child_offset = read_offset(sp.subspan(3), 0, bits_per_pointer_arr[type]);
        return res;
    }
    }
    [[unlikely]] sstables::on_bti_parse_error(pos);
}
// Performs one step of a rightmost descent: decodes the node at `sp` and
// returns the backward offset of its last (greatest-transition) child,
// or -1 for a payload-only leaf.
node_traverse_sidemost_result bti_walk_down_rightmost_path(int64_t pos, const_bytes sp) {
    auto type = uint8_t(sp[0]) >> 4;
    trie::node_traverse_sidemost_result result;
    result.body_pos = pos;
    // Single-child node types: the only child is also the rightmost one.
    auto single = [&](uint64_t offset, uint8_t payload_bits) {
        result.n_children = 1;
        result.payload_bits = payload_bits;
        result.child_offset = offset;
        return result;
    };
    // SPARSE_*: last entry of the packed pointer array.
    auto sparse = [&] [[gnu::always_inline]] (int type) {
        int n_children = int(sp[1]);
        auto bpp = bits_per_pointer_arr[type];
        result.n_children = n_children;
        result.payload_bits = uint8_t(sp[0]) & 0xf;
        result.child_offset = read_offset(sp.subspan(2 + n_children), n_children - 1, bpp);
        return result;
    };
    // DENSE_*: last slot of the dense span. (The last slot corresponds to the
    // greatest transition byte, which the writer always populates.)
    auto dense = [&] [[gnu::always_inline]] (int type) {
        auto dense_span = uint64_t(sp[2]) + 1;
        auto bpp = bits_per_pointer_arr[type];
        result.n_children = dense_span;
        result.payload_bits = uint8_t(sp[0]) & 0xf;
        result.child_offset = read_offset(sp.subspan(3), dense_span - 1, bpp);
        return result;
    };
    switch (type) {
    case PAYLOAD_ONLY:
        // Leaf: nowhere further to go.
        result.payload_bits = uint8_t(sp[0]) & 0xf;
        result.n_children = 0;
        result.child_offset = -1;
        return result;
    case SINGLE_NOPAYLOAD_4:
        return single(uint64_t(sp[0]) & 0xf, 0);
    case SINGLE_NOPAYLOAD_12:
        return single((uint64_t(sp[0]) & 0xf) << 8 | uint64_t(sp[1]), 0);
    case SINGLE_8:
        return single(uint64_t(sp[2]), uint8_t(sp[0]) & 0xf);
    case SINGLE_16:
        return single(uint64_t(sp[2]) << 8 | uint64_t(sp[3]), uint8_t(sp[0]) & 0xf);
    case SPARSE_8:
        return sparse(type);
    case SPARSE_12:
        return sparse(type);
    case SPARSE_16:
        return sparse(type);
    case SPARSE_24:
        return sparse(type);
    case SPARSE_40:
        return sparse(type);
    case DENSE_12:
        return dense(type);
    case DENSE_16:
        return dense(type);
    case DENSE_24:
        return dense(type);
    case DENSE_32:
        return dense(type);
    case DENSE_40:
        return dense(type);
    case LONG_DENSE:
        return dense(type);
    }
    [[unlikely]] sstables::on_bti_parse_error(pos);
}
// Constructs a reader over `f`. No page is cached until load() is called.
bti_node_reader::bti_node_reader(cached_file& f)
    : _file(f) {
}
// True iff `pos` lies within the page currently held in _cached_page.
bool bti_node_reader::cached(int64_t pos) const {
    return _cached_page && _cached_page->pos() / cached_file::page_size == pos / cached_file::page_size;
}
// Asynchronously fetches the page containing `pos` from the underlying file
// and caches it, so the synchronous accessors below can be used for nodes
// on that page.
seastar::future<> bti_node_reader::load(int64_t pos) {
    return _file.get().get_shared_page(pos, nullptr).then([this](auto page) {
        _cached_page = std::move(page.ptr);
    });
}
// Decodes the header of the node at `pos`.
// Precondition: the page containing `pos` was load()ed beforehand.
trie::load_final_node_result bti_node_reader::read_node(int64_t pos) {
    SCYLLA_ASSERT(cached(pos));
    auto sp = _cached_page->get_view().subspan(pos % cached_file::page_size);
    return bti_read_node(pos, sp);
}
// Key-guided descent step over the node at `pos` (see bti_walk_down_along_key).
// Precondition: the page containing `pos` was load()ed beforehand.
trie::node_traverse_result bti_node_reader::walk_down_along_key(int64_t pos, const_bytes key) {
    SCYLLA_ASSERT(cached(pos));
    auto sp = _cached_page->get_view().subspan(pos % cached_file::page_size);
    return bti_walk_down_along_key(pos, sp, key);
}
// Leftmost descent step over the node at `pos` (see bti_walk_down_leftmost_path).
// Precondition: the page containing `pos` was load()ed beforehand.
trie::node_traverse_sidemost_result bti_node_reader::walk_down_leftmost_path(int64_t pos) {
    SCYLLA_ASSERT(cached(pos));
    auto sp = _cached_page->get_view().subspan(pos % cached_file::page_size);
    return bti_walk_down_leftmost_path(pos, sp);
}
// Rightmost descent step over the node at `pos` (see bti_walk_down_rightmost_path).
// Precondition: the page containing `pos` was load()ed beforehand.
trie::node_traverse_sidemost_result bti_node_reader::walk_down_rightmost_path(int64_t pos) {
    SCYLLA_ASSERT(cached(pos));
    auto sp = _cached_page->get_view().subspan(pos % cached_file::page_size);
    return bti_walk_down_rightmost_path(pos, sp);
}
// Looks up child `child_idx` of the node at `pos` (see bti_get_child).
// Precondition: the page containing `pos` was load()ed beforehand.
trie::get_child_result bti_node_reader::get_child(int64_t pos, int child_idx, bool forward) const {
    SCYLLA_ASSERT(cached(pos));
    auto sp = _cached_page->get_view().subspan(pos % cached_file::page_size);
    return bti_get_child(pos, sp, child_idx, forward);
}
// Returns a view of the payload of the node at `pos` (see bti_get_payload).
// The view is only valid while _cached_page is alive and unchanged.
// Precondition: the page containing `pos` was load()ed beforehand.
const_bytes bti_node_reader::get_payload(int64_t pos) const {
    SCYLLA_ASSERT(cached(pos));
    auto sp = _cached_page->get_view().subspan(pos % cached_file::page_size);
    return bti_get_payload(pos, sp);
}
} // namespace sstables::trie

View File

@@ -0,0 +1,53 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "utils/cached_file.hh"
#include "node_reader.hh"
namespace sstables {
[[noreturn, gnu::noinline]] void on_bti_parse_error(uint64_t pos);
}
namespace sstables::trie {
// Implementation of concept `node_reader`.
get_child_result bti_get_child(uint64_t pos, const_bytes sp, int child_idx, bool forward);
std::byte bti_get_child_transition(uint64_t pos, const_bytes raw, int idx);
load_final_node_result bti_read_node(int64_t pos, const_bytes sp);
const_bytes bti_get_payload(int64_t pos, const_bytes sp);
node_traverse_result bti_walk_down_along_key(int64_t pos, const_bytes sp, const_bytes key);
node_traverse_sidemost_result bti_walk_down_leftmost_path(int64_t pos, const_bytes sp);
node_traverse_sidemost_result bti_walk_down_rightmost_path(int64_t pos, const_bytes sp);
// Deals with BTI-specific parts of trie traversal.
// (I.e. the parts which understand the BTI serialization format).
//
// (We talk about "traversal" and not "deserialization" because we don't actually
// want to deserialize the full nodes, we only want to walk over them.)
//
// See comments for concept `node_reader` for a description of the methods.
struct bti_node_reader {
    using page_ptr = cached_file::ptr_type;
    // The most recently load()ed page. The synchronous accessors below assert
    // (via cached()) that the requested position lies within this page.
    page_ptr _cached_page;
    // The file the trie nodes are read from.
    std::reference_wrapper<cached_file> _file;
    bti_node_reader(cached_file& f);
    // True iff the page containing `pos` is the currently cached one.
    bool cached(int64_t pos) const;
    // Fetches and caches the page containing `pos`.
    future<> load(int64_t pos);
    // Decodes the header (type, child count, payload bits) of the node at `pos`.
    trie::load_final_node_result read_node(int64_t pos);
    // Single descent step towards `key` (lower_bound over child transitions).
    trie::node_traverse_result walk_down_along_key(int64_t pos, const_bytes key);
    // Single descent step to the first / last child, respectively.
    trie::node_traverse_sidemost_result walk_down_leftmost_path(int64_t pos);
    trie::node_traverse_sidemost_result walk_down_rightmost_path(int64_t pos);
    // Looks up child `child_idx`; for dense nodes, skips empty slots in the
    // given direction.
    trie::get_child_result get_child(int64_t pos, int child_idx, bool forward) const;
    // Returns a view of the node's payload bytes (valid while the page is cached).
    const_bytes get_payload(int64_t pos) const;
};
static_assert(node_reader<bti_node_reader>);
} // namespace sstables::trie

View File

@@ -0,0 +1,515 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "bti_node_sink.hh"
#include "utils/div_ceil.hh"
namespace sstables::trie {
// Computes the maximum backward offset from node `x` (assumed to be written at
// output position `xpos`) to any of its child branches: exact positions for
// children already written, size-based estimates for children not written yet.
// This bound determines the pointer width (and thus node type) `x` needs.
sink_offset max_offset_from_child(const writer_node& x, sink_pos xpos) {
    expensive_log("max_offset_from_child: x={} xpos={}", fmt::ptr(&x), xpos.value);
    // We iterate over children in reverse order and, for the ones which aren't written yet,
    // we compute their expected output position based on the accumulated values of _node_size and _branch_size.
    // Total size of unwritten child branches which we iterated over so far.
    // This is the offset between `x` and the next unwritten child branch we are going to see.
    auto offset = sink_offset{0};
    // Max offset noticed so far during iteration.
    auto result = sink_offset{0};
    for (auto it = x.get_children().rbegin(); it != x.get_children().rend(); ++it) {
        if ((*it)->_pos.valid()) {
            // This child has already been written out. Its position is set in stone.
            expensive_log("max_offset_from_child: offset={} child={} _pos={} _node_size={}, _branch_size={}",
                offset.value, fmt::ptr(&**it), (*it)->_pos.value, (*it)->_node_size.value, (*it)->_branch_size.value);
            expensive_assert((*it)->_pos < xpos);
            result = std::max<sink_offset>(result, xpos - (*it)->_pos);
        } else {
            // Child `*it` hasn't been written out yet.
            // We compute its offset from `x` based on the total size of all siblings after it
            // and the node size of `*it`.
            auto delta = offset;
            auto chain_length = (*it)->_transition_length;
            // For chained nodes, only the distance to the chain's entry node
            // matters; the entry node is 2 bytes, or up to 3 bytes when the
            // chain has exactly 2 nodes and the body is 16+ bytes away
            // (SINGLE_NOPAYLOAD_12 — see write_chain()).
            if (chain_length > 2) {
                delta = delta + sink_offset(2);
            } else if (chain_length == 2) {
                delta = delta + ((*it)->_node_size.value < 16 ? sink_offset(2) : sink_offset(3));
            } else {
                delta = delta + (*it)->_node_size;
            }
            expensive_log("max_offset_from_child: offset={} child={} _pos={} _node_size={}, _branch_size={} delta={}",
                offset.value, fmt::ptr(&**it), (*it)->_pos.value, (*it)->_node_size.value, (*it)->_branch_size.value, delta.value);
            result = std::max<sink_offset>(result, delta);
        }
        // Account for this child's entire branch when estimating the next one.
        offset = offset + (*it)->_node_size + (*it)->_branch_size;
    }
    expensive_log("max_offset_from_child: x={} xpos={} result={}", fmt::ptr(&x), xpos.value, result.value);
    return result;
}
// Chooses the best node type (i.e the one which will have the smallest valid on-disk representation)
// for the given writer_node, assuming that the values of _node_size, _branch_size and _output_pos
// in its children are accurate,
// and that the given node would be written at output position `xpos`.
static node_type choose_node_type_impl(const writer_node& x, sink_pos xpos) {
    const auto n_children = x.get_children().size();
    if (n_children == 0) {
        // If there are no children, the answer is obvious.
        return PAYLOAD_ONLY;
    }
    auto max_offset = max_offset_from_child(x, xpos);
    // For a given offset bitwidth, contains the index of the leanest node type
    // in singles[], sparses[] and denses[] which can be used to represent a node with
    // this offset bitwidth.
    // (Indexed by std::bit_width of the offset: entry 0 is for offset 0, then
    // groups of 4 entries per nibble of offset width, up to 64 bits.)
    constexpr static uint8_t widths_lookup[] = {
        0,
        0, 0, 0, 0,
        1, 1, 1, 1,
        2, 2, 2, 2,
        3, 3, 3, 3,
        4, 4, 4, 4,
        4, 4, 4, 4,
        5, 5, 5, 5,
        5, 5, 5, 5,
        6, 6, 6, 6,
        6, 6, 6, 6,
        7, 7, 7, 7,
        7, 7, 7, 7,
        7, 7, 7, 7,
        7, 7, 7, 7,
        7, 7, 7, 7,
        7, 7, 7, 7,
    };
    auto width_idx = widths_lookup[std::bit_width<uint64_t>(max_offset.value)];
    // Nodes with a single close child are very common, so they have dedicated node types.
    // For offset widths which don't fit into the dedicated types,
    // singles[] returns the smallest valid non-dedicated (multi-child) type.
    constexpr node_type singles[] = {SINGLE_NOPAYLOAD_4, SINGLE_8, SINGLE_NOPAYLOAD_12, SINGLE_16, DENSE_24, DENSE_32, DENSE_40, LONG_DENSE};
    if (n_children == 1) {
        const auto has_payload = x._payload._payload_bits;
        if (has_payload && (width_idx == 0 || width_idx == 2)) {
            // SINGLE_NOPAYLOAD_4 and SINGLE_NOPAYLOAD_12 can't hold nodes with a payload.
            // Fall back to the next smallest node type which can
            // (SINGLE_8 and SINGLE_16 respectively).
            return singles[width_idx + 1];
        }
        return singles[width_idx];
    }
    // For nodes with 2 or more children, we calculate the sizes that would result from choosing
    // either the leanest dense node or the leanest sparse node, and we pick the one which
    // results in smaller size.
    constexpr node_type sparses[] = {SPARSE_8, SPARSE_8, SPARSE_12, SPARSE_16, SPARSE_24, SPARSE_40, SPARSE_40, LONG_DENSE};
    constexpr node_type denses[] = {DENSE_12, DENSE_12, DENSE_12, DENSE_16, DENSE_24, DENSE_32, DENSE_40, LONG_DENSE};
    const size_t dense_span = 1 + size_t(x.get_children().back()->transition()) - size_t(x.get_children().front()->transition());
    // NOTE(review): the header terms below (16, 24) look like bit counts while
    // the div_ceil terms are byte counts; the actual headers are 2 and 3 bytes.
    // This skews the comparison in favor of sparse by ~7 bytes — confirm intended.
    auto sparse_size = 16 + div_ceil((bits_per_pointer_arr[sparses[width_idx]] + 8) * n_children, 8);
    auto dense_size = 24 + div_ceil(bits_per_pointer_arr[denses[width_idx]] * dense_span, 8);
    // If the sizes are equal, pick the dense type, because dense nodes are easier for the reader.
    if (sparse_size < dense_size) {
        return sparses[width_idx];
    } else {
        return denses[width_idx];
    }
}
// This wrapper exists just to log the return value.
// (Logging the return value of a function with multiple `return` points
// is awkward).
static node_type choose_node_type(const writer_node& x, sink_pos xpos) {
    const node_type chosen = choose_node_type_impl(x, xpos);
    expensive_log("choose_node_type: x={} res={}", fmt::ptr(&x), int(chosen));
    return chosen;
}
// Constructs a sink writing serialized nodes to `w`.
// `page_size` must not exceed max_page_size.
bti_node_sink::bti_node_sink(sstables::file_writer& w, size_t page_size) : _w(w), _page_size(page_size) {
    expensive_assert(_page_size <= max_page_size);
}
// Writes the lowest `bytes` bytes of `x` to the output, in big-endian order.
void bti_node_sink::write_int(uint64_t x, size_t bytes) {
    uint64_t be = cpu_to_be(x);
    // After the big-endian conversion, the wanted bytes are the trailing ones.
    expensive_log("write_int: {}", fmt_hex({reinterpret_cast<const signed char*>(&be) + sizeof(be) - bytes, bytes}));
    _w.write(reinterpret_cast<const char*>(&be) + sizeof(be) - bytes, bytes);
}
// Writes the given byte span verbatim to the output.
void bti_node_sink::write_bytes(const_bytes x) {
    expensive_log("write_bytes: {}", fmt_hex({reinterpret_cast<const signed char*>(x.data()), x.size()}));
    _w.write(reinterpret_cast<const char*>(x.data()), x.size());
}
// Writes the body of a SPARSE_* node with byte-aligned pointers
// (SPARSE_8/16/24/40; SPARSE_12 has a dedicated nibble-packing path in
// write_body). Layout: header byte, child count, transition bytes, child
// offsets, payload. Returns the number of bytes written.
[[gnu::always_inline]]
size_t bti_node_sink::write_sparse(const writer_node& x, node_type type, int bytes_per_pointer, sink_pos xpos) {
    write_int((type << 4) | x._payload._payload_bits, 1);
    write_int(x.get_children().size(), 1);
    for (const auto& c : x.get_children()) {
        write_int(uint8_t(c->transition()), 1);
    }
    for (const auto& c : x.get_children()) {
        // Child pointers are stored as backward offsets from this node's position.
        uint64_t offset = (xpos - c->_pos).value;
        write_int(offset, bytes_per_pointer);
    }
    write_bytes(x._payload.blob());
    return 2 + x.get_children().size() * (1+bytes_per_pointer) + x._payload.blob().size();
}
// Size of a SPARSE_* node body: header byte + child count byte, then per
// child one transition byte (8 bits) plus a pointer of `bits_per_pointer`
// bits (the packed array rounded up to whole bytes), then the payload blob.
[[gnu::always_inline]]
node_size bti_node_sink::size_sparse(const writer_node& x, int bits_per_pointer) const {
    const auto n = x.get_children().size();
    const auto child_arrays_bytes = div_ceil(n * (8 + bits_per_pointer), 8);
    return node_size(2 + child_arrays_bytes + x._payload.blob().size());
}
// Writes the body of a DENSE_* node with byte-aligned pointers
// (DENSE_16/24/32/40, LONG_DENSE; DENSE_12 has a dedicated nibble-packing
// path in write_body). Layout: header byte, start byte, (span - 1) byte,
// one pointer slot per byte value in the span (0 for absent children),
// payload. Returns the number of bytes written.
[[gnu::always_inline]]
size_t bti_node_sink::write_dense(const writer_node& x, node_type type, int bytes_per_pointer, sink_pos xpos) {
    int start = int(x.get_children().front()->transition());
    auto dense_span = 1 + int(x.get_children().back()->transition()) - int(x.get_children().front()->transition());
    write_int((type << 4) | x._payload._payload_bits, 1);
    write_int(start, 1);
    write_int(dense_span - 1, 1);
    auto it = x.get_children().begin();
    auto end_it = x.get_children().end();
    for (int next = start; next < start + dense_span; ++next) {
        // Offset 0 marks an absent child in this slot.
        uint64_t offset = 0;
        if (it != end_it && int((*it)->transition()) == next) {
            offset = (xpos - (*it)->_pos).value;
            ++it;
        }
        write_int(offset, bytes_per_pointer);
    }
    write_bytes(x._payload.blob());
    return 3 + dense_span * (bytes_per_pointer) + x._payload.blob().size();
}
// Size of a DENSE_* node body: header + start byte + span byte, then the
// packed pointer array (one slot per byte value between the first and last
// child transition, inclusive), then the payload blob.
[[gnu::always_inline]]
node_size bti_node_sink::size_dense(const writer_node& x, int bits_per_pointer) const {
    const int lo = int(x.get_children().front()->transition());
    const int hi = int(x.get_children().back()->transition());
    const int span = 1 + hi - lo;
    return node_size(3 + div_ceil(bits_per_pointer * span, 8) + x._payload.blob().size());
}
// Writes the serialized body of node `x` (the final node of its transition
// chain) in the representation `type`, assuming the body starts at output
// position `xpos`. Child pointers are encoded as backward offsets
// (xpos - child position). The caller must have picked a `type` wide enough
// for every offset (see choose_node_type()).
void bti_node_sink::write_body(const writer_node& x, sink_pos xpos, node_type type) {
    switch (type) {
    case PAYLOAD_ONLY: {
        // Leaf: header byte and payload only.
        write_int(type << 4 | x._payload._payload_bits, 1);
        write_bytes(x._payload.blob());
        return;
    }
    case SINGLE_NOPAYLOAD_4: {
        // One child, offset fits in the header's low nibble: 2 bytes total.
        uint64_t offset = (xpos - x.get_children().front()->_pos).value;
        uint8_t transition = uint8_t(x.get_children().front()->transition());
        uint8_t arr[2];
        arr[0] = (type << 4) | offset;
        arr[1] = transition;
        write_bytes({reinterpret_cast<const std::byte*>(arr), 2});
        return;
    }
    case SINGLE_8: {
        // One child, 8-bit offset, optional payload; assembled in a local
        // buffer and written in one call.
        uint64_t offset = (xpos - x.get_children().front()->_pos).value;
        uint8_t transition = uint8_t(x.get_children().front()->transition());
        uint8_t arr[3 + sizeof(trie_payload::_payload_buf)];
        arr[0] = (type << 4) | x._payload._payload_bits;
        arr[1] = transition;
        arr[2] = offset;
        auto sz = x._payload.blob().size();
        memcpy(&arr[3], x._payload.blob().data(), sz);
        write_bytes({reinterpret_cast<const std::byte*>(arr), 3 + sz});
        return;
    }
    case SINGLE_NOPAYLOAD_12: {
        // One child, 12-bit offset split across the header nibble and the
        // next byte, then the transition byte.
        uint64_t offset = (xpos - x.get_children().front()->_pos).value;
        uint8_t transition = uint8_t(x.get_children().front()->transition());
        write_int((type << 4) | (offset >> 8), 1);
        write_int(offset & 0xff, 1);
        write_int(transition, 1);
        return;
    }
    case SINGLE_16: {
        // One child, 16-bit offset, optional payload.
        uint64_t offset = (xpos - x.get_children().front()->_pos).value;
        uint8_t transition = uint8_t(x.get_children().front()->transition());
        write_int((type << 4) | x._payload._payload_bits, 1);
        write_int(transition, 1);
        write_int(offset, 2);
        write_bytes(x._payload.blob());
        return;
    }
    case SPARSE_8: {
        write_sparse(x, type, 1, xpos);
        return;
    }
    case SPARSE_12: {
        // Sparse with 12-bit pointers: like write_sparse(), but the pointer
        // array packs two offsets into each 3-byte group.
        write_int((type << 4) | x._payload._payload_bits, 1);
        write_int(x.get_children().size(), 1);
        for (const auto& c : x.get_children()) {
            write_int(uint8_t(c->transition()), 1);
        }
        size_t i;
        for (i = 0; i + 1 < x.get_children().size(); i += 2) {
            uint64_t offset1 = (xpos - x.get_children()[i]->_pos).value;
            uint64_t offset2 = (xpos - x.get_children()[i+1]->_pos).value;
            write_int(offset1 << 12 | offset2, 3);
        }
        // Odd child count: the last offset goes into the high 12 bits of a
        // final 2-byte group.
        if (i < x.get_children().size()) {
            uint64_t offset = (xpos - x.get_children()[i]->_pos).value;
            write_int(offset << 4, 2);
        }
        write_bytes(x._payload.blob());
        return;
    }
    case SPARSE_16: {
        write_sparse(x, type, 2, xpos);
        return;
    }
    case SPARSE_24: {
        write_sparse(x, type, 3, xpos);
        return;
    }
    case SPARSE_40: {
        write_sparse(x, type, 5, xpos);
        return;
    }
    case DENSE_12: {
        // Dense with 12-bit pointers: like write_dense(), but packing two
        // slots into each 3-byte group; empty slots are written as 0.
        int start = int(x.get_children().front()->transition());
        auto dense_span = 1 + int(x.get_children().back()->transition()) - int(x.get_children().front()->transition());
        write_int((type << 4) | x._payload._payload_bits, 1);
        write_int(start, 1);
        write_int(dense_span - 1, 1);
        auto it = x.get_children().begin();
        auto end_it = x.get_children().end();
        int next = start;
        for (; next + 1 < start + dense_span; next += 2) {
            uint64_t offset_1 = 0;
            uint64_t offset_2 = 0;
            if (it != end_it && int((*it)->transition()) == next) {
                offset_1 = (xpos - (*it)->_pos).value;
                ++it;
            }
            if (it != end_it && int((*it)->transition()) == next + 1) {
                offset_2 = (xpos - (*it)->_pos).value;
                ++it;
            }
            write_int(offset_1 << 12 | offset_2, 3);
        }
        // Odd span: the last slot goes into the high 12 bits of 2 bytes.
        if (next < start + dense_span) {
            uint64_t offset = 0;
            if (it != end_it && int((*it)->transition()) == next) {
                offset = (xpos - (*it)->_pos).value;
                ++it;
            }
            write_int(offset << 4, 2);
        }
        write_bytes(x._payload.blob());
        return;
    }
    case DENSE_16: {
        write_dense(x, type, 2, xpos);
        return;
    }
    case DENSE_24: {
        write_dense(x, type, 3, xpos);
        return;
    }
    case DENSE_32: {
        write_dense(x, type, 4, xpos);
        return;
    }
    case DENSE_40: {
        write_dense(x, type, 5, xpos);
        return;
    }
    case LONG_DENSE: {
        write_dense(x, type, 8, xpos);
        return;
    }
    case NODE_TYPE_COUNT: {
        // Not a real node type; reaching this is a programming error.
        abort();
    }
    }
    abort();
}
// Writes the chain of single-child nodes that encodes x's transition bytes
// (indices 1 .. _transition_length-1; index 0 is not emitted here), placed
// directly after the already-written body. `body_offset` is the body's size,
// i.e. the distance from the chain node closest to the body.
// The chain is emitted back-to-front (closest-to-body node first).
// Returns the output position of the chain's entry node (the one for
// transition byte 1).
sink_pos bti_node_sink::write_chain(const writer_node& x, node_size body_offset) {
    int i = x._transition_length;
    expensive_assert(i >= 2);
    const std::byte* __restrict__ transition = x._transition.get();
    sink_pos c1_pos = pos();
    // Second-to-last node in the chain (i.e. last node before the body) can have size 2 or 3 bytes,
    // depending on how big the last node (body) is.
    uint64_t offset = body_offset.value;
    if (offset >= 16) {
        // Doesn't fit SINGLE_NOPAYLOAD_4's nibble: use the 12-bit variant.
        expensive_assert(offset < 4096);
        write_int(uint64_t(transition[i - 1]) | offset << 8 | uint64_t(SINGLE_NOPAYLOAD_12 << 20), 3);
    } else {
        write_int(uint64_t(transition[i - 1]) | offset << 8 | uint64_t(SINGLE_NOPAYLOAD_4 << 12), 2);
    }
    i -= 1;
    if (i == 1) {
        return c1_pos;
    }
    // Third-to-last node in the chain has always size 2, but the offset can be equal to 3 or 2, depending
    // on how big the second-to-last node was.
    offset = (pos() - c1_pos).value;
    write_int(uint64_t(transition[i - 1]) | offset << 8 | uint64_t(SINGLE_NOPAYLOAD_4 << 12), 2);
    i -= 1;
    // Fourth-to-last and earlier nodes in the chain always have the form 0x12??, where ?? is the transition byte.
    // This is SIMDable -- we can load a vector of transition bytes from memory,
    // reverse the order (earlier bytes in the transition chain become later nodes in the file)
    // and add a 0x12 byte before every transition byte.
    constexpr int s = 16; // Vector size.
    typedef unsigned char vector2x __attribute__((__vector_size__(s*2))); // Output vector.
    typedef unsigned char vector1x __attribute__((__vector_size__(s))); // Input vector.
    auto z = uint8_t(SINGLE_NOPAYLOAD_4 << 4 | 2); // 0x12
    vector1x zv = {z};
    // An extra buffer above the file_writer. It allows us to do constexpr-sized
    // memcpy, but incurs an extra copy overall. Is it worth it?
    std::array<std::byte, 1024> outbuf;
    static_assert(std::size(outbuf) % s == 0);
    size_t outbuf_pos = 0;
    // Serialize the chain in SIMD blocks of `s` transition bytes.
    for (; i - s > 0; i -= s) {
        vector1x v;
        memcpy(&v, &transition[i - s], s);
        // Interleave the header byte (from zv, lane `s`) with the reversed
        // transition bytes.
        vector2x d = __builtin_shufflevector(v, zv,
            s, 15, s, 14, s, 13, s, 12, s, 11, s, 10, s, 9, s, 8,
            s, 7, s, 6, s, 5, s, 4, s, 3, s, 2, s, 1, s, 0
        );
        memcpy(&outbuf[outbuf_pos], &d, sizeof(d));
        outbuf_pos += sizeof(d);
        if (outbuf_pos == 1024) [[unlikely]] {
            // Buffer full: flush to the file writer.
            write_bytes(outbuf);
            outbuf_pos = 0;
        }
    }
    // Write the remaining `i - 1` first bytes (excluding idx 0) of the transition chain.
    // This is separated from the previous loop so that the previous loop enjoys the benefit
    // of constexpr-sized memory ops.
    {
        vector1x v;
        // Load into the `i - 1` last bytes of `v`.
        memcpy(reinterpret_cast<char*>(&v) + sizeof(v) - (i - 1), &transition[1], (i - 1));
        vector2x d = __builtin_shufflevector(v, zv,
            s, 15, s, 14, s, 13, s, 12, s, 11, s, 10, s, 9, s, 8,
            s, 7, s, 6, s, 5, s, 4, s, 3, s, 2, s, 1, s, 0
        );
        // Write out the first `2*(i-1)` first bytes of `v`.
        memcpy(&outbuf[outbuf_pos], &d, 2 * (i - 1));
        outbuf_pos += 2 * (i - 1);
        write_bytes(std::span(outbuf).subspan(0, outbuf_pos));
    }
    // The last 2-byte node written is the chain's entry point.
    return pos() - sink_offset(2);
}
// Serializes `x` in full: first the "body" node, then (if the transition
// string is longer than one byte) the chain of single-child nodes holding
// the remaining transition characters.
// Returns the position of the node which represents the whole writer_node,
// i.e. the entry point the parent should point at.
sink_pos bti_node_sink::write(const writer_node& x, sink_pos xpos) {
    expensive_assert(x._transition_length >= 1);
    const sink_pos body_start = pos();
    const auto type = choose_node_type(x, xpos);
    write_body(x, xpos, type);
    if (x._transition_length > 1) {
        // The chain follows the body immediately, so the offset between the
        // chain's first node and the body equals the body's size.
        return write_chain(x, node_size((pos() - body_start).value));
    }
    return body_start;
}
// Total serialized size (body plus chain) of `x`, assuming its body is
// laid out at position `xpos`.
node_size bti_node_sink::serialized_size(const writer_node& x, sink_pos xpos) const {
    expensive_assert(x._transition_length >= 1);
    const auto body = serialized_size_body(x, xpos);
    const auto chain = serialized_size_chain(x, body);
    return node_size((sink_offset(body) + chain).value);
}
// Size of the chain part of `x`, given that the chain starts `body_offset`
// bytes after the body.
node_size bti_node_sink::serialized_size_chain(const writer_node& x, node_size body_offset) const {
    if (x._transition_length == 1) {
        // No chain: the body alone covers the single-character transition.
        return node_size(0);
    }
    // Each chain node is a 2-byte SINGLE_NOPAYLOAD_4, except that the node
    // adjacent to the body needs a 3-byte SINGLE_NOPAYLOAD_12 when the body
    // is too far away for a 4-bit offset (i.e. when body_offset >= 16).
    const int widening = body_offset.value >= 16 ? 1 : 0;
    return node_size(widening + (x._transition_length - 1) * 2);
}
// Serialized size of the body of `x`, if it is encoded as the given BTI node type.
node_size bti_node_sink::serialized_size_body_type(const writer_node& x, node_type type) const {
    switch (type) {
    // Childless node: one header byte plus the payload blob.
    case PAYLOAD_ONLY:
        return node_size(1 + x._payload.blob().size());
    // Single-child nodes have a fixed-size layout.
    case SINGLE_NOPAYLOAD_4:
        return node_size(2);
    case SINGLE_8:
        return node_size(3 + x._payload.blob().size());
    case SINGLE_NOPAYLOAD_12:
        return node_size(3);
    case SINGLE_16:
        return node_size(4 + x._payload.blob().size());
    // Sparse and dense nodes: size depends on the child list and the
    // per-child offset width.
    case SPARSE_8:
        return size_sparse(x, 8);
    case SPARSE_12:
        return size_sparse(x, 12);
    case SPARSE_16:
        return size_sparse(x, 16);
    case SPARSE_24:
        return size_sparse(x, 24);
    case SPARSE_40:
        return size_sparse(x, 40);
    case DENSE_12:
        return size_dense(x, 12);
    case DENSE_16:
        return size_dense(x, 16);
    case DENSE_24:
        return size_dense(x, 24);
    case DENSE_32:
        return size_dense(x, 32);
    case DENSE_40:
        return size_dense(x, 40);
    case LONG_DENSE:
        return size_dense(x, 64);
    case NODE_TYPE_COUNT:
        abort();
    }
    abort();
}
// Serialized size of the body of `x`, encoded with the optimal node type
// for position `xpos`.
node_size bti_node_sink::serialized_size_body(const writer_node& x, sink_pos xpos) const {
    const auto type = choose_node_type(x, xpos);
    return serialized_size_body_type(x, type);
}
// Returns the page size that was passed to the constructor.
uint64_t bti_node_sink::page_size() const {
    return _page_size;
}
// Returns the number of bytes between the current file position and the
// start of the next page.
// Note: the next boundary is computed for pos() + 1, so the result is in
// range [1; page_size()] -- when the position is exactly at a page
// boundary, a full page_size() is returned.
uint64_t bti_node_sink::bytes_left_in_page() const {
    return round_up(pos().value + 1, page_size()) - pos().value;
}
// Appends bytes_left_in_page() NUL bytes to the file.
// NOTE(review): bytes_left_in_page() returns page_size() when the position
// is already exactly at a boundary, so calling this at a boundary emits a
// full page of zeros -- confirm callers expect that.
void bti_node_sink::pad_to_page_boundary() {
    // A zeroed buffer big enough for any supported page size.
    const static std::array<std::byte, max_page_size> zero_page = {};
    _w.write(reinterpret_cast<const char*>(zero_page.data()), bytes_left_in_page());
}
// Current output position, i.e. the number of bytes written so far to the
// underlying file_writer.
sink_pos bti_node_sink::pos() const {
    return sink_pos(_w.offset());
}
} // namespace sstables::trie

View File

@@ -0,0 +1,116 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "writer_node.hh"
#include "bti_node_type.hh"
#include "sstables/file_writer.hh"
namespace sstables::trie {
// Assuming the values of _node_size, _branch_size and _id in children are accurate,
// computes the distance between `x` and its farthest child.
//
// This is needed to pick the right BTI node type for `x`.
// (The node type must represent child offsets with enough bits to fit the biggest offset).
sink_offset max_offset_from_child(const writer_node& x, sink_pos pos);
// This object serializes a stream of writer_node nodes (abstract nodes in RAM)
// into a stream of bytes on disk as defined by the BTI format at:
// https://github.com/apache/cassandra/blob/f16fb6765b8a3ff8f49accf61c908791520c0d6e/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.md
//
class bti_node_sink {
    sstables::file_writer& _w;
    size_t _page_size;
    constexpr static size_t max_page_size = 64 * 1024;
public:
    bti_node_sink(sstables::file_writer& w, size_t page_size);
private:
    // Truncates `x` to the given number of least significant bytes,
    // and writes out the result in big endian.
    // E.g. write_int(0x1122334455667788ull, 3) writes 0x66 0x77 0x88 to the file.
    void write_int(uint64_t x, size_t bytes);
    // Writes raw bytes to the output file.
    void write_bytes(const_bytes x);
    // Writes a sparse-type node (SPARSE_*) to the file.
    size_t write_sparse(const writer_node& x, node_type type, int bytes_per_pointer, sink_pos pos);
    // Computes the size of a sparse-type node (SPARSE_*).
    node_size size_sparse(const writer_node& x, int bits_per_pointer) const;
    // Writes a dense-type node (DENSE_*) to the file.
    size_t write_dense(const writer_node& x, node_type type, int bytes_per_pointer, sink_pos pos);
    // Computes the size of a dense-type node (DENSE_*).
    node_size size_dense(const writer_node& x, int bits_per_pointer) const;
public:
    // Writes the final BTI node of the chain corresponding to the writer_node,
    // with the given BTI type.
    // Offsets to children are computed by taking the difference between `pos`
    // and the `_pos` of children.
    //
    // Note:
    // writer_node represents a trie node and the multi-character edge leading to it.
    // But BTI doesn't have a compact representation for a multi-character edge.
    // Each character in the trie gets its own BTI node,
    // and the character corresponding to this node's parent edge is stored in the parent node.
    //
    // For example, given a `writer_node` X, corresponding to edge "abc",
    // `write_body` would write the BTI node containing offsets
    // to X's children (and the first byte of their edges),
    // while `write_chain` would write the tiny BTI nodes
    // holding characters 'b' and 'c'.
    // (Character 'a' is held by the parent of X).
    //
    // Note:
    // For the purposes of serialization, `pos` is an abstract ID
    // used only to compute the delta between this node's ID and children's ID.
    // In real usage, this "abstract" node ID is of course the node's position
    // in the file, so that the reader can trivially use the "ID"
    // to jump directly to the node's position.
    // So the `pos` passed here is always equal to `current_pos()`.
    //
    // But in tests, the `pos` really is abstract, and the user
    // of bti_node_sink can fill the `_pos` of children and the `pos` passed
    // here via an argument with whatever it wants, something possibly
    // unrelated to `current_pos()`.
    void write_body(const writer_node& x, sink_pos pos, node_type type);
    // Writes all nodes, except the final node, of the chain corresponding to the writer_node.
    // (See write_body for what "chain" means).
    sink_pos write_chain(const writer_node& x, node_size body_offset);
    // Writes both the body and the chain.
    sink_pos write(const writer_node& x, sink_pos pos);
    // Computes the size of the given node after serialization, in bytes,
    // if it's serialized with the given BTI type.
    node_size serialized_size_body_type(const writer_node& x, node_type type) const;
    // Computes the size of the body (see write_body notes about "body" vs "chain")
    // of the given node, in bytes, assuming it's serialized with the optimal
    // BTI type for the given "position" of the body (see write_body notes about `pos`).
    node_size serialized_size_body(const writer_node& x, sink_pos pos) const;
    // Computes the size of the chain (see write_body notes about "body" vs "chain")
    // of the given node, in bytes, assuming it's serialized with the optimal
    // BTI types for the given offset between chain and body.
    // (Note: the chain is always written immediately after the body, so the offset between
    // body and chain is always equal to body size).
    node_size serialized_size_chain(const writer_node& x, node_size body_offset) const;
    // Computes the total size (body and chain, see write_body notes about "body" vs "chain")
    // of the given node, in bytes, assuming it's serialized with the optimal
    // BTI type for the given "position" of the body (see write_body notes about `pos`).
    node_size serialized_size(const writer_node& x, sink_pos pos) const;
    // Page size, same as the one passed via the constructor.
    uint64_t page_size() const;
    // Returns the number of bytes between current file position
    // and the start of the next page.
    // In range [1; page_size]: when the current position is exactly at a
    // page boundary, this returns a full page_size().
    uint64_t bytes_left_in_page() const;
    // Appends bytes_left_in_page() nul bytes to the file.
    void pad_to_page_boundary();
    // Returns the position reported by the underlying `file_writer`.
    sink_pos pos() const;
};
static_assert(trie_writer_sink<bti_node_sink>);
} // namespace sstables::trie

View File

@@ -0,0 +1,88 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <cstdint>
#include <iterator>
#include <span>
#include <seastar/core/byteorder.hh>
using const_bytes = std::span<const std::byte>;
namespace sstables::trie {
// Each node type has a 4-bit identifier which is used in the on-disk format.
// The order of this enum mustn't be modified.
// (The numeric value of each entry is its on-disk type identifier; see
// bits_per_pointer_arr for the child offset width used by each type).
enum node_type {
    PAYLOAD_ONLY,        // No children, payload only.
    SINGLE_NOPAYLOAD_4,  // 1 child, 4-bit offset, no payload.
    SINGLE_8,            // 1 child, 8-bit offset.
    SINGLE_NOPAYLOAD_12, // 1 child, 12-bit offset, no payload.
    SINGLE_16,           // 1 child, 16-bit offset.
    SPARSE_8,            // Sparse child list, 8-bit offsets.
    SPARSE_12,           // Sparse child list, 12-bit offsets.
    SPARSE_16,           // Sparse child list, 16-bit offsets.
    SPARSE_24,           // Sparse child list, 24-bit offsets.
    SPARSE_40,           // Sparse child list, 40-bit offsets.
    DENSE_12,            // Dense child range (may have unused slots), 12-bit offsets.
    DENSE_16,            // Dense child range, 16-bit offsets.
    DENSE_24,            // Dense child range, 24-bit offsets.
    DENSE_32,            // Dense child range, 32-bit offsets.
    DENSE_40,            // Dense child range, 40-bit offsets.
    LONG_DENSE,          // Dense child range, 64-bit offsets.
    NODE_TYPE_COUNT, // Not a valid value.
};
// For each node type, contains the number of bits used per child offset.
// (Indexed by node_type; must stay in sync with the enum).
constexpr static const uint8_t bits_per_pointer_arr[] = {
    0,  // PAYLOAD_ONLY
    4,  // SINGLE_NOPAYLOAD_4
    8,  // SINGLE_8
    12, // SINGLE_NOPAYLOAD_12
    16, // SINGLE_16
    8,  // SPARSE_8
    12, // SPARSE_12
    16, // SPARSE_16
    24, // SPARSE_24
    40, // SPARSE_40
    12, // DENSE_12
    16, // DENSE_16
    24, // DENSE_24
    32, // DENSE_32
    40, // DENSE_40
    64, // LONG_DENSE
};
static_assert(std::size(bits_per_pointer_arr) == NODE_TYPE_COUNT);
// Reads the offset with index `idx` from an array of big-endian offsets,
// each `bits_per_pointer` bits wide.
//
// Supports only widths divisible by 8, plus the special width 12.
//
// Ordering note: an array of 12-bit offsets [0x123, 0x456, 0x789, 0xabc] is
// represented as the byte array 123456789abc.
//
// Must be always inlined: when bits_per_pointer is substituted with a
// constant, this compiles to a simple load rather than a full-fledged memcpy.
[[gnu::always_inline]]
inline uint64_t read_offset(const_bytes sp, int idx, int bits_per_pointer) {
    if (bits_per_pointer % 8 != 0) {
        // The 12-bit case: two consecutive offsets share a 3-byte group.
        const char* group = (const char*)sp.data() + 3 * (idx / 2);
        if (idx % 2 == 0) {
            // First offset of the group: the top 12 bits of bytes [0, 2).
            return seastar::read_be<uint16_t>(group) >> 4;
        }
        // Second offset of the group: the bottom 12 bits of bytes [1, 3).
        return seastar::read_be<uint16_t>(group + 1) & 0xfff;
    }
    // Byte-aligned case: copy the n relevant bytes into the low end of a
    // big-endian u64 and convert to host order.
    const auto n = bits_per_pointer / 8;
    uint64_t be = 0;
    memcpy((char*)&be + 8 - n, (const char*)sp.data() + n * idx, n);
    return seastar::be_to_cpu(be);
}
} // namespace sstables::trie

View File

@@ -0,0 +1,168 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <seastar/core/future.hh>
// This file defines an interface between the format-agnostic part of a trie
// reader (the cursor which can be set to a specific key and then
// stepped forwards and backwards) and the format-specific part
// (which actually understand the bytes and bits of the nodes on disk).
namespace sstables::trie {
using const_bytes = std::span<const std::byte>;
// Contains information about the result of the (partial) walk down.
//
// For example, if the trie contains strings ("abc", "abd", "bc"),
// then the trie looks like this (where * is the root node):
//
//       *
//    a-----b
//    b     c
//   c-d
//
// and if node "abc" is on a different page than node "ab", then
// `walk_down_along_key(pos_of_root, "abc")` might
// return node "ab" as the result, with
//
// payload_bits = 0
// n_children = 2
// found_idx = idx_of_child_c_in_node_ab
// found_byte = 'c'
// traversed_key_bytes = 2
// body_pos = pos_of_node_ab
// child_offset = pos_of_node_ab - pos_of_node_abc
struct node_traverse_result {
    // Payload bits for this node.
    uint8_t payload_bits;
    // Number of child indexes (slots) for this node.
    // ("Slots" because not all indexes have to be occupied).
    int n_children;
    // Index of first child whose transition is greater or equal to the next key byte.
    // (Or, if such children don't exist, this is equal to n_children).
    int found_idx;
    // The transition byte of the child at found_idx.
    // (Or, if there is no such child, this is equal to -1).
    int found_byte;
    // The number of edges traversed while walking down.
    // I.e. the number of edges between this node and the ancestor
    // at which the walk started.
    //
    // E.g. in the example above, a walk which started at the root and
    // stopped at node "ab" has traversed_key_bytes == 2.
    int traversed_key_bytes;
    // File position of this node.
    int64_t body_pos;
    // Position of this node minus position of the child at found_idx.
    // Only valid if found_idx is valid in range [0, n_children).
    int64_t child_offset;
};
// Fields have same meaning as in node_traverse_result.
// (child_offset refers to the leftmost or rightmost child, depending on
// which of walk_down_leftmost_path/walk_down_rightmost_path produced it).
struct node_traverse_sidemost_result {
    uint8_t payload_bits;
    int n_children;
    int64_t body_pos;
    int64_t child_offset;
};
// Contains information about the result of the get_child call.
struct get_child_result {
    // The index of the child selected by the call.
    int idx;
    // The offset of the selected child from the parent.
    // (A child always has a position smaller than its parent's).
    uint64_t offset;
};
// Fields have same meaning as in node_traverse_result.
struct load_final_node_result {
    // Number of child slots (not all of them need to be occupied).
    uint16_t n_children;
    // Payload bits for this node.
    uint8_t payload_bits;
};
// This concept contains all operations which abstract trie nodes
// need to provide so that a trie cursor can be implemented over them.
template <typename T>
concept node_reader = requires(T& o, int64_t pos, const_bytes key, int child_idx, bool forwards) {
    // Checks if the page containing the given file position has been loaded.
    //
    // Note: forcing the caller to explicitly call `load` before use
    // is error-prone, but it lets us keep the performance-sensitive methods
    // synchronous.
    { o.cached(pos) } -> std::same_as<bool>;
    // Loads the page containing the given file position, and potentially
    // unloads previously loaded pages.
    //
    // Ensures `cached(pos)` until the next `load(pos)` call.
    //
    // Precondition: pos lies within the file.
    { o.load(pos) } -> std::same_as<seastar::future<>>;
    // Reads some basic information (payload and number of children)
    // about the node.
    //
    // Precondition: cached(pos)
    { o.read_node(pos) } -> std::same_as<load_final_node_result>;
    // Walks some distance down the trie
    // from node at `pos` along `key`.
    // Might walk over any number of *unimportant* nodes,
    // but does not walk past the first *important* node.
    //
    // An *important* node is one which fulfills at least one of the following conditions:
    // 1. Has a payload.
    // 2. Has more than one child.
    //
    // (A trie iterator has to remember the important nodes on the path from the root
    // to the current position, because stepping forwards or backwards might involve
    // going up to the closest important node. Unimportant nodes don't have to be revisited).
    //
    // Note: we allow this method to traverse many *unimportant* nodes
    // at once for optimization reasons. As of this writing, the method always
    // walks over one node, but we might want to add an optimization which
    // walks over long chains of SINGLE_NOPAYLOAD_4 nodes in batches.
    //
    // Precondition: cached(pos), key.size() > 0
    { o.walk_down_along_key(pos, key) } -> std::same_as<node_traverse_result>;
    // Like walk_down_along_key, but always chooses the leftmost child instead
    // of following a key.
    //
    // Precondition: cached(pos)
    { o.walk_down_leftmost_path(pos) } -> std::same_as<node_traverse_sidemost_result>;
    // Like walk_down_along_key, but always chooses the rightmost child instead
    // of following a key.
    //
    // Precondition: cached(pos)
    { o.walk_down_rightmost_path(pos) } -> std::same_as<node_traverse_sidemost_result>;
    // Returns some information about a child of node at `pos`.
    //
    // The child is specified by an index, but not all child indexes must be occupied by a child.
    // If an unoccupied index is passed, the method will return information for the nearest
    // occupied child index which is greater (if `forwards`) or smaller (if `!forwards`)
    //
    // (This is used for stepping forwards/backwards over the trie).
    //
    // Precondition: cached(pos), child_idx lies between the smallest and the largest
    // occupied child indexes (inclusive) for this node.
    { o.get_child(pos, child_idx, forwards) } -> std::same_as<get_child_result>;
    // Returns the payload for the payload-carrying node at `pos`.
    //
    // The returned byte range starts at the payload blob but may extend
    // past its end -- the caller is expected to know (e.g. from the
    // payload bits) how long the meaningful prefix is.
    //
    // Precondition: cached(pos), the node carries a payload
    { o.get_payload(pos) } -> std::same_as<const_bytes>;
};
} // namespace sstables::trie

View File

@@ -266,6 +266,8 @@ add_scylla_test(top_k_test
KIND BOOST)
add_scylla_test(transport_test
KIND SEASTAR)
add_scylla_test(bti_node_sink_test
KIND BOOST)
add_scylla_test(trie_writer_test
KIND BOOST)
add_scylla_test(types_test

View File

@@ -0,0 +1,605 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/testing/thread_test_case.hh>
#include <fmt/ranges.h>
#include <ranges>
#include "test/lib/log.hh"
#include "test/lib/random_utils.hh"
#include "utils/memory_data_sink.hh"
#include "sstables/trie/bti_node_reader.hh"
#include "sstables/trie/bti_node_sink.hh"
// For instantiation of `writer_node::recalc_sizes`.
#include "sstables/trie/writer_node.impl.hh" // IWYU pragma: keep
// Calling BOOST_REQUIRE unconditionally is stupidly expensive.
// Checking the condition first, and only calling the BOOST assertions
// if the condition is false, makes the test orders of magnitude faster.
// (The braced do { ... } while (0) wrapper makes each macro expand to a
// single statement which is safe inside un-braced if/else branches).
#define REQUIRE(a) do { if (!(a)) BOOST_REQUIRE(a); } while (0)
#define REQUIRE_EQUAL(a, b) do { if (!((a) == (b))) BOOST_REQUIRE_EQUAL(a, b); } while (0)
#define REQUIRE_GE(a, b) do { if (!((a) >= (b))) BOOST_REQUIRE_GE(a, b); } while (0)
#define REQUIRE_LE(a, b) do { if (!((a) <= (b))) BOOST_REQUIRE_LE(a, b); } while (0)
using namespace sstables::trie;
// Views the characters of `sv` as a read-only span of raw bytes (no copy).
inline const_bytes string_as_bytes(std::string_view sv) {
    const auto chars = std::span(sv.data(), sv.size());
    return std::as_bytes(chars);
}
// The inverse of string_as_bytes: views a byte span as characters (no copy).
inline std::string_view bytes_as_string(const_bytes sv) {
    const auto* chars = reinterpret_cast<const char*>(sv.data());
    return std::string_view(chars, sv.size());
}
// Flattens the fragment list accumulated by a memory_data_sink
// into a single contiguous byte vector.
static std::vector<std::byte> linearize(const memory_data_sink_buffers& bufs) {
    std::vector<std::byte> out;
    for (const auto& fragment : bufs.buffers()) {
        const auto bytes = std::as_bytes(std::span(fragment));
        out.insert(out.end(), bytes.begin(), bytes.end());
    }
    return out;
}
// Expands each input byte into 8 values (each 0 or 1),
// most significant bit first.
std::vector<uint8_t> unpack_bitstring(const_bytes packed) {
    std::vector<uint8_t> bits;
    bits.reserve(packed.size() * 8);
    for (const auto b : packed) {
        for (int shift = 7; shift >= 0; --shift) {
            bits.push_back((uint8_t(b) >> shift) & 1);
        }
    }
    return bits;
}
// Test read_offset() on a random blob, by unpacking the bits
// of the blob and the bits of read_offset() and checking that
// the relevant bitstrings are equal.
SEASTAR_THREAD_TEST_CASE(test_read_offset) {
    auto test_blob_buf = tests::random::get_bytes(256);
    auto test_string = std::as_bytes(std::span(test_blob_buf));
    auto test_bitstring = unpack_bitstring(test_string);
    for (const int width : {8, 12, 16, 24, 32, 48, 56, 64}) {
        // Iterate over every offset slot which fits entirely within the blob.
        // (Note: the bound is in bits -- size() is in bytes, hence the * 8.
        // The previous bound compared bits to bytes and only exercised the
        // first eighth of the blob).
        for (int idx = 0; idx * width + width <= int(test_string.size()) * 8; ++idx) {
            uint64_t result = read_offset(test_string, idx, width);
            auto read_blob = unpack_bitstring(object_representation(seastar::cpu_to_be(result)));
            auto actual_bitstring = std::span(read_blob).subspan(64 - width, width);
            auto expected_bitstring = std::span(test_bitstring).subspan(idx * width, width);
            REQUIRE(std::ranges::equal(actual_bitstring, expected_bitstring));
        }
    }
}
// BTI-encodes the body of `node` (as if placed at position `pos`) with the
// given node type, into a freshly allocated byte vector.
// Also checks that serialized_size_body_type() agrees with the number of
// bytes actually written.
std::vector<std::byte> serialize_body(const writer_node& node, sink_pos pos, node_type type) {
    constexpr size_t page_size = 4096;
    memory_data_sink_buffers buffers;
    sstables::file_writer writer(data_sink(std::make_unique<memory_data_sink>(buffers)));
    bti_node_sink sink(writer, page_size);
    const auto expected_size = sink.serialized_size_body_type(node, type);
    sink.write_body(node, pos, type);
    writer.close();
    REQUIRE(writer.offset() <= page_size);
    REQUIRE(writer.offset() == uint64_t(expected_size.value));
    return linearize(buffers);
}
// Result of serialize_chain().
struct serialize_chain_result {
    // The raw encoding of the chain.
    std::vector<std::byte> serialized;
    // Offset within `serialized` of the chain's entry node, i.e. the node
    // at which a reader should start traversing the chain.
    size_t starting_point;
};
// BTI-encodes the chain part of `node`, given that its body was laid out
// `body_offset` bytes before the chain, into a fresh byte vector.
// Also checks that serialized_size_chain() agrees with the number of bytes
// actually written.
serialize_chain_result serialize_chain(const writer_node& node, node_size body_offset) {
    constexpr size_t page_size = 4096;
    memory_data_sink_buffers buffers;
    sstables::file_writer writer(data_sink(std::make_unique<memory_data_sink>(buffers)));
    bti_node_sink sink(writer, page_size);
    const auto expected_size = sink.serialized_size_chain(node, body_offset);
    const auto starting_point = sink.write_chain(node, body_offset);
    writer.close();
    REQUIRE(writer.offset() <= page_size);
    REQUIRE(writer.offset() == uint64_t(expected_size.value));
    return {linearize(buffers), starting_point.value};
}
// A node's payload, as returned by get_payload().
struct payload_result {
    // The payload bits stored in the node's header.
    uint8_t bits;
    // A byte range starting at the node's payload blob.
    // (May extend past the payload -- the caller is expected to know how
    // long the meaningful prefix is).
    const_bytes bytes;
};
// Everything stored in a serialized node "body",
// as recovered by deserialize_body().
struct deserialize_node_result {
    // Child offsets, in child order.
    std::vector<uint64_t> offsets;
    // Child transition bytes, in child order.
    std::vector<std::byte> transitions;
    payload_result payload;
};
// Result of a get_child() lookup.
struct lookup_result {
    // Slot index of the found child (see get_child_result::idx).
    int idx;
    // The found child's transition byte, or -1 if there is no such child.
    int byte;
    // The found child's offset from the parent node.
    uint64_t offset;
};
// Looks up the child at (or nearest to, in the direction given by `forward`)
// slot `idx` of the node at `pos`, returning its slot index, transition byte
// (-1 if the returned slot is invalid) and offset.
inline lookup_result get_child(int64_t pos, const_bytes raw, int idx, bool forward) {
    const auto found = bti_get_child(pos, raw, idx, forward);
    const auto n_children = bti_read_node(pos, raw).n_children;
    lookup_result result;
    result.idx = found.idx;
    result.offset = found.offset;
    const bool in_range = result.idx >= 0 && result.idx < n_children;
    result.byte = in_range ? int(bti_get_child_transition(pos, raw, result.idx)) : -1;
    return result;
}
// Reads the payload bits and the payload byte range of the node at `pos`.
inline payload_result get_payload(int64_t pos, const_bytes raw) {
    return payload_result{
        .bits = bti_read_node(pos, raw).payload_bits,
        .bytes = bti_get_payload(pos, raw),
    };
}
// Returns the number of child slots of the node at `pos`.
inline int get_n_children(int64_t pos, const_bytes raw) {
    const auto header = bti_read_node(pos, raw);
    return header.n_children;
}
// Recovers all information (children and payload) from a serialized node
// body located at `pos`.
deserialize_node_result deserialize_body(int64_t pos, const_bytes raw) {
    deserialize_node_result out;
    const int n_children = get_n_children(pos, raw);
    int slot = 0;
    while (slot < n_children) {
        const auto child = get_child(pos, raw, slot, true);
        // Skip over unoccupied slots: the lookup lands on the nearest
        // occupied slot at or after `slot`.
        slot = child.idx;
        out.offsets.push_back(child.offset);
        REQUIRE_GE(child.byte, 0);
        out.transitions.push_back(std::byte(child.byte));
        ++slot;
    }
    out.payload = get_payload(pos, raw);
    return out;
}
// Result of deserialize_chain().
struct deserialize_chain_result {
    // The transition bytes recovered from the chain nodes, in walk order.
    std::vector<std::byte> transition;
    // Distance from the start of the chain buffer to the body node that
    // the chain points at (the body lies just before the buffer).
    node_size body_offset;
};
// Walks a serialized chain (as produced by serialize_chain()), starting at
// `start_point`, collecting the transition byte of each node, until the
// walk steps in front of the buffer -- which means it arrived at the body
// node, which is laid out immediately before the chain.
deserialize_chain_result deserialize_chain(const_bytes raw, size_t start_point) {
    REQUIRE(start_point < raw.size());
    deserialize_chain_result result;
    while (true) {
        // Every chain node must have exactly one child.
        auto n_children = get_n_children(start_point, raw.subspan(start_point));
        REQUIRE(n_children == 1);
        auto child = get_child(start_point, raw.subspan(start_point), 0, true);
        REQUIRE(child.idx == 0);
        // Children always live strictly before their parent, within a page.
        REQUIRE(child.offset > 0);
        REQUIRE(child.offset < 4096);
        REQUIRE_GE(child.byte, 0);
        result.transition.push_back(std::byte(child.byte));
        if (child.offset > start_point) {
            // The child lies before the buffer: it must be the body node,
            // and we must be standing at the chain's first (earliest) node.
            REQUIRE(start_point == 0);
            result.body_offset = node_size(child.offset);
            return result;
        }
        start_point -= child.offset;
    }
}
// Checks whether the given BTI node type is capable of encoding `node`,
// given the child offset widths implied by position `pos`.
bool eligible(node_type type, const writer_node& node, sink_pos pos) {
    auto max_offset = max_offset_from_child(node, pos);
    REQUIRE(max_offset.valid());
    const auto width = std::bit_width<uint64_t>(max_offset.value);
    const size_t n = node.get_children().size();
    const bool no_payload = node._payload._payload_bits == 0;
    switch (type) {
    case PAYLOAD_ONLY:        return n == 0;
    // The NOPAYLOAD types have no room for payload bits.
    case SINGLE_NOPAYLOAD_4:  return n == 1 && width <= 4 && no_payload;
    case SINGLE_8:            return n == 1 && width <= 8;
    case SINGLE_NOPAYLOAD_12: return n == 1 && width <= 12 && no_payload;
    case SINGLE_16:           return n == 1 && width <= 16;
    // SPARSE nodes can hold at most 255 children.
    case SPARSE_8:            return n >= 1 && n < 256 && width <= 8;
    case SPARSE_12:           return n >= 1 && n < 256 && width <= 12;
    case SPARSE_16:           return n >= 1 && n < 256 && width <= 16;
    case SPARSE_24:           return n >= 1 && n < 256 && width <= 24;
    case SPARSE_40:           return n >= 1 && n < 256 && width <= 40;
    case DENSE_12:            return n >= 1 && width <= 12;
    case DENSE_16:            return n >= 1 && width <= 16;
    case DENSE_24:            return n >= 1 && width <= 24;
    case DENSE_32:            return n >= 1 && width <= 32;
    case DENSE_40:            return n >= 1 && width <= 40;
    case LONG_DENSE:          return n >= 1;
    default: abort();
    }
}
// Generates multiple interesting sets of child edges for a trie node
// (chosen to stress various conditionals in the encoding).
std::vector<std::vector<uint8_t>> get_some_interesting_transition_sets() {
    std::vector<std::vector<uint8_t>> sets;
    // 0 children. Important edge case.
    sets.push_back({});
    // 1 child, at both extremes and in the middle.
    // (Extremes are useful to ensure that there is no weird wrapping).
    sets.push_back({0x00});
    sets.push_back({0x7f});
    sets.push_back({0xff});
    // 2 children, at both extremes, and also with a gap between them
    // (to test unused child slots in DENSE nodes).
    sets.push_back({0x00, 0x01});
    sets.push_back({0xfe, 0xff});
    sets.push_back({0x00, 0xff});
    // 256 children, covering every possible transition byte. Edge case.
    auto every_byte = std::ranges::iota_view(0x00, 0x100) | std::ranges::to<std::vector<uint8_t>>();
    sets.push_back(every_byte);
    // 255 children: every possible transition byte except one.
    auto all_but_one = every_byte;
    all_but_one.erase(all_but_one.begin() + 100);
    sets.push_back(all_but_one);
    return sets;
}
// For the given number of children and the given max supported integer width,
// generates a few sets of interesting child offsets.
// ("Interesting" here means close to the extremes of the supported integer
// range -- this checks that no bits are lost in the encoding).
std::vector<std::vector<int64_t>> get_some_interesting_child_offsets(int width, int n_children) {
    const int64_t max = (int64_t(1) << width) - 1;
    auto clamp_to_legal = [=] (int64_t v) {
        return std::clamp<int64_t>(v, 1, max);
    };
    auto clamped_iota = [=] (int64_t a, int64_t b) {
        return std::ranges::iota_view(a, b) | std::views::transform(clamp_to_legal) | std::ranges::to<std::vector>();
    };
    std::vector<std::vector<int64_t>> result;
    // The smallest legal offsets...
    result.emplace_back(clamped_iota(1, n_children + 1));
    // ...and the largest ones which still fit in `width` bits.
    result.emplace_back(clamped_iota(max - n_children + 1, max + 1));
    return result;
}
// Builds an in-memory writer_node with the given incoming transition,
// children (one per entry of child_transitions/child_offsets) and payload.
// Children are assigned fake positions `pos - child_offsets[i]`, so that a
// body written at `pos` encodes exactly `child_offsets` as its offsets.
writer_node::ptr<writer_node> make_node(
    sink_pos pos,
    const_bytes transition,
    std::span<const uint8_t> child_transitions,
    std::span<const int64_t> child_offsets,
    std::optional<trie_payload> payload,
    bump_allocator& alctr
) {
    auto node = writer_node::create(transition, alctr);
    for (size_t i = 0; i < child_transitions.size(); ++i) {
        // (Renamed from `transition` -- the local previously shadowed the
        // `transition` parameter above).
        std::byte child_edge[] = {std::byte(child_transitions[i])};
        auto child = node->add_child(child_edge, alctr);
        child->_pos = pos - sink_offset(child_offsets[i]);
        child->_node_size = node_size(1);
        child->_branch_size = sink_offset(0);
        child->_first_transition_byte = child_edge[0];
    }
    if (payload) {
        node->set_payload(*payload);
    }
    return node;
}
// Serializes the "body" of a single node (described by `child_transitions`,
// `offsets` and `payload_opt`, as if placed at position `pos`) with every
// eligible BTI node type, and checks that all the BTI traversal routines
// recover exactly the information that was serialized.
void test_one_body(
    std::span<const uint8_t> child_transitions,
    std::span<const int64_t> offsets,
    std::span<const std::byte> incoming_transition,
    const std::optional<trie_payload>& payload_opt,
    sink_pos pos
) {
    testlog.trace("transitions={} offsets={} payload={}", child_transitions, offsets, bool(payload_opt));
    SCYLLA_ASSERT(child_transitions.size() == offsets.size());
    bump_allocator alctr(128 * 1024);
    auto node = make_node(pos, incoming_transition, child_transitions, offsets, payload_opt, alctr);
    for (node_type type = node_type(0); type < NODE_TYPE_COUNT; type = node_type(int(type) + 1)) {
        testlog.trace("type={}", int(type));
        if (eligible(type, *node, pos)) {
            // Round-trip check: deserialize_body must recover the children
            // and the payload exactly.
            auto serialized = serialize_body(*node, pos, type);
            testlog.trace("serialized={}", fmt_hex(serialized));
            auto deserialized = deserialize_body(pos.value, serialized);
            testlog.trace("deserialized_transitions={:x}", fmt::join(deserialized.transitions, ", "));
            REQUIRE_EQUAL(deserialized.offsets.size(), child_transitions.size());
            for (size_t i = 0; i < child_transitions.size(); ++i) {
                REQUIRE_EQUAL(child_transitions[i], int(deserialized.transitions[i]));
                REQUIRE_EQUAL(uint64_t(offsets[i]), deserialized.offsets[i]);
            }
            if (payload_opt) {
                // The returned payload span may be longer than the payload
                // blob -- compare only the meaningful prefix.
                REQUIRE_EQUAL(deserialized.payload.bits, payload_opt->_payload_bits);
                REQUIRE(std::ranges::equal(
                    deserialized.payload.bytes.subspan(0, payload_opt->blob().size()),
                    payload_opt->blob()));
            } else {
                REQUIRE_EQUAL(deserialized.payload.bits, 0);
            }
            int n_slots = 0;
            testlog.trace("Test bti_read_node");
            {
                // DENSE nodes may report more slots than actual children.
                auto result = bti_read_node(pos.value, serialized);
                REQUIRE_GE(result.n_children, child_transitions.size());
                n_slots = result.n_children;
                REQUIRE_EQUAL(result.payload_bits, payload_opt ? payload_opt->_payload_bits : 0);
            }
            testlog.trace("Test bti_walk_down_leftmost_path");
            {
                auto result = bti_walk_down_leftmost_path(pos.value, serialized);
                REQUIRE_EQUAL(result.body_pos, pos.value);
                REQUIRE_EQUAL(result.n_children, n_slots);
                REQUIRE_EQUAL(result.child_offset, offsets.empty() ? -1 : offsets.front());
                REQUIRE_EQUAL(result.payload_bits, payload_opt ? payload_opt->_payload_bits : 0);
            }
            testlog.trace("Test bti_walk_down_rightmost_path");
            {
                auto result = bti_walk_down_rightmost_path(pos.value, serialized);
                REQUIRE_EQUAL(result.body_pos, pos.value);
                REQUIRE_EQUAL(result.n_children, n_slots);
                REQUIRE_EQUAL(result.child_offset, offsets.empty() ? -1 : offsets.back());
                REQUIRE_EQUAL(result.payload_bits, payload_opt ? payload_opt->_payload_bits : 0);
            }
            testlog.trace("Test bti_walk_down_along_key");
            {
                // For every possible key byte, the walk must find the first
                // child whose transition is >= the key byte (lower bound).
                for (int key_byte = 0; key_byte < 256; ++key_byte) {
                    //testlog.trace("key_byte={}", key_byte);
                    auto k = std::byte(key_byte);
                    auto result = bti_walk_down_along_key(pos.value, serialized, std::span<const std::byte>(&k, 1));
                    auto target_child = std::ranges::lower_bound(child_transitions, uint8_t(k)) - child_transitions.begin();
                    auto target_byte = target_child < int(child_transitions.size()) ? child_transitions[target_child] : -1;
                    auto target_offset = target_child < int(child_transitions.size()) ? offsets[target_child] : -1;
                    REQUIRE_EQUAL(result.found_byte, target_byte);
                    REQUIRE_EQUAL(result.traversed_key_bytes, 0);
                    REQUIRE_EQUAL(result.n_children, n_slots);
                    REQUIRE_LE(result.found_idx, result.n_children);
                    REQUIRE_GE(result.found_idx, 0);
                    REQUIRE_EQUAL(result.child_offset, target_offset);
                    REQUIRE_EQUAL(result.body_pos, pos.value);
                    REQUIRE_EQUAL(result.payload_bits, payload_opt ? payload_opt->_payload_bits : 0);
                    if (result.found_idx < result.n_children) {
                        auto child = get_child(pos.value, serialized, result.found_idx, true);
                        REQUIRE_EQUAL(result.found_byte, child.byte);
                        REQUIRE_EQUAL(result.found_idx, child.idx);
                        REQUIRE_EQUAL(result.child_offset, int64_t(child.offset));
                    }
                }
            }
            testlog.trace("Test bti_get_child");
            {
                // Unoccupied slots must resolve to the nearest occupied slot
                // in the requested direction.
                int closest_occupied_slot = -1;
                int n_observed_children = 0;
                testlog.trace("Forwards");
                for (int i = n_slots - 1; i >= 0; --i) {
                    //testlog.trace("i={}", i);
                    auto result = bti_get_child(pos.value, serialized, i, true);
                    if (result.idx == i) {
                        closest_occupied_slot = i;
                        ++n_observed_children;
                        REQUIRE_EQUAL(int64_t(result.offset), offsets[offsets.size() - n_observed_children]);
                    } else {
                        REQUIRE_EQUAL(result.idx, closest_occupied_slot);
                    }
                }
                REQUIRE_EQUAL(n_observed_children, int(offsets.size()));
                testlog.trace("Backwards");
                closest_occupied_slot = -1;
                n_observed_children = 0;
                for (int i = 0; i < n_slots; ++i) {
                    //testlog.trace("i={}", i);
                    auto result = bti_get_child(pos.value, serialized, i, false);
                    if (result.idx == i) {
                        closest_occupied_slot = i;
                        ++n_observed_children;
                        REQUIRE_EQUAL(int64_t(result.offset), offsets[n_observed_children - 1]);
                    } else {
                        REQUIRE_EQUAL(result.idx, closest_occupied_slot);
                    }
                }
                REQUIRE_EQUAL(n_observed_children, int(offsets.size()));
            }
        }
    }
}
// Tests the encoding of `writer_node`'s "body"
// (see the comment at the declaration of bti_node_sink::write_body for what "body" means).
//
// Tries to cover all BTI node types and interesting node "shapes"
// (e.g. with and without gaps in the list of occupied child indexes).
//
// Generates various "interesting" nodes. For each such node, it BTI-encodes it,
// and checks that all BTI node traversal routines give the expected result
// on it.
SEASTAR_THREAD_TEST_CASE(test_body) {
    const auto transition_sets = get_some_interesting_transition_sets();
    // Arbitrary, but large enough to cover all interesting widths.
    const auto pos = sink_pos((uint64_t(1) << 60) + 1);
    const auto incoming_transition = string_as_bytes("hahaha");
    const auto custom_payload = trie_payload(0x7, string_as_bytes("lololo"));
    for (int width = 1; width < 50; ++width) {
        for (const auto& child_transitions : transition_sets) {
            for (const auto& offsets : get_some_interesting_child_offsets(width, child_transitions.size())) {
                for (bool with_payload : {true, false}) {
                    std::optional<trie_payload> payload_opt;
                    if (with_payload) {
                        payload_opt = custom_payload;
                    }
                    test_one_body(child_transitions, offsets, incoming_transition, payload_opt, pos);
                }
            }
        }
    }
}
// Returns a mask with the lowest `width` bits set.
// `width` must be in the range [0, 64].
static uint64_t bitwidth_mask(uint64_t width) {
    return width ? (~uint64_t(0) >> (64 - width)) : 0;
}
// Draws a random integer in [0, max].
// The distribution is skewed towards small bit widths:
// a uniform sample is drawn first, then masked down to a
// randomly-chosen width, so short values show up about as
// often as long ones.
static uint64_t get_random_int_bitwidth_weighted(uint64_t max) {
    if (max == 0) {
        return 0;
    }
    const auto sample = tests::random::get_int<uint64_t>(1, max, tests::random::gen());
    const auto chosen_width = tests::random::get_int<uint64_t>(1, std::bit_width(sample), tests::random::gen());
    return sample & bitwidth_mask(chosen_width);
}
// Like `test_body` but with randomized parameters.
SEASTAR_THREAD_TEST_CASE(test_body_randomized) {
    for (uint64_t trial = 0; trial < 1337; ++trial) {
        const auto n_children = get_random_int_bitwidth_weighted(256);
        // Generate random positive child offsets, with a randomized maximum width.
        std::vector<int64_t> offsets;
        const auto max_offset_width = tests::random::get_int<uint64_t>(1, 63, tests::random::gen());
        for (uint64_t i = 0; i < n_children; ++i) {
            const auto raw = get_random_int_bitwidth_weighted(bitwidth_mask(max_offset_width));
            offsets.push_back(std::max<int64_t>(1, raw));
        }
        // Pick a random sorted subset of byte values as the child transitions.
        std::array<uint8_t, 256> all_bytes;
        std::ranges::iota(all_bytes, 0);
        std::array<uint8_t, 256> sample_buf;
        auto child_transitions = std::span(sample_buf.begin(), n_children);
        std::ranges::sample(all_bytes, child_transitions.begin(), child_transitions.size(), tests::random::gen());
        std::ranges::sort(child_transitions);
        // Optionally attach a random payload.
        std::optional<trie_payload> payload_opt;
        const bool has_payload = tests::random::get_int<int>(0, 1, tests::random::gen());
        if (has_payload) {
            const auto bits = tests::random::get_int<int>(1, 15, tests::random::gen());
            const auto length = tests::random::get_int<int>(1, trie_payload::MAX_PAYLOAD_SIZE, tests::random::gen());
            const auto bytes = tests::random::get_bytes(length);
            payload_opt.emplace(bits, std::as_bytes(std::span(bytes)));
        }
        // The node must be positioned after all of its children.
        const auto max_offset = offsets.empty() ? 0 : std::ranges::max(offsets);
        const auto pos = tests::random::get_int<int64_t>(max_offset, std::numeric_limits<int64_t>::max(), tests::random::gen());
        // The transition doesn't matter for body serialization.
        const auto incoming_transition = string_as_bytes("hahaha");
        test_one_body(child_transitions, offsets, incoming_transition, payload_opt, sink_pos(pos));
    }
}
// Tests the encoding of `writer_node`'s "chain"
// (see the comment at the declaration of bti_node_sink::write_body for what "chain" means).
// Tries to cover all BTI node types and interesting node "shapes"
// (e.g. with and without gaps in the list of occupied child indexes).
SEASTAR_THREAD_TEST_CASE(test_chain) {
    // 65 is supposed to be long enough to cover the SIMD code paths.
    std::vector<bytes> transitions;
    for (int len = 2; len < 65; ++len) {
        transitions.push_back(tests::random::get_bytes(len));
    }
    std::vector<node_size> body_offsets;
    for (int off : {1, 2, 15, 16, 17}) {
        body_offsets.emplace_back(off);
    }
    // Serializes the chain for the given transition/offset pair and checks
    // that deserializing it yields the same data back.
    auto check_roundtrip = [] (const_bytes transition, node_size body_offset) {
        bump_allocator alctr(128 * 1024);
        auto node = writer_node::create(transition, alctr);
        auto [serialized, starting_point] = serialize_chain(*node, body_offset);
        auto deserialized = deserialize_chain(serialized, starting_point);
        REQUIRE(deserialized.body_offset.value == body_offset.value);
        REQUIRE(std::ranges::equal(deserialized.transition, transition.subspan(1)));
    };
    // Vary the body offset with a fixed transition...
    const auto fixed_transition = std::as_bytes(std::span(transitions.back()));
    for (const auto& off : body_offsets) {
        check_roundtrip(fixed_transition, off);
    }
    // ...and vary the transition with a fixed body offset.
    const auto fixed_offset = body_offsets.back();
    for (const auto& transition : transitions) {
        check_roundtrip(std::as_bytes(std::span(transition)), fixed_offset);
    }
}
// Reproduces an issue with `max_offset_from_child` which I encountered
// during development and which wasn't detected by the previous unit tests.
// (It was only detected by randomized integration tests.
// `mutation_source_test`, IIRC).
//
// Specifically: I forgot to add an `if` for the special case when
// `chain_length == 2 && child->_node_size >= 16`,
// and this was only detected by random integration tests.
// The test hits that branch and would fail without it.
SEASTAR_THREAD_TEST_CASE(test_max_offset_from_child_consistent_across_write) {
    // Checks, for a single parent/child pair, that the parent-to-child offset
    // predicted by `max_offset_from_child` before the child is written matches
    // the offset observed after the child is actually serialized.
    auto test_one = [] (const_bytes transition, size_t payload_size) {
        testlog.trace("Testing transition={} payload_size={}", fmt_hex(transition), payload_size);
        // Dummy output stream.
        memory_data_sink_buffers bufs;
        sstables::file_writer fw(data_sink(std::make_unique<memory_data_sink>(bufs)));
        // Start at a nonzero stream position to make the test just a bit more general.
        fw.write(to_bytes_view(std::string_view("Let's offset the stream to make the test just a bit more general.")));
        // BTI node serializer.
        constexpr size_t page_size = 4096;
        bump_allocator alctr(128 * 1024);
        bti_node_sink serializer(fw, page_size);
        auto starting_pos = serializer.pos();
        // Create a parent and a child. Their node size and branch size are uninitialized.
        auto parent = writer_node::create(string_as_bytes("abc"), alctr);
        auto child = parent->add_child(transition, alctr);
        child->set_payload(trie_payload(1, string_as_bytes(std::string(payload_size, 'z'))));
        // max_offset_from_child needs to know the sizes (node size, branch size) of children.
        // We could compute them manually, but we can use `recalc_sizes` too.
        // But we have to set `_has_out_of_page_descendants` to force `recalc_sizes`
        // to recalculate the sizes.
        child->_has_out_of_page_descendants = true;
        auto child_size = writer_node::recalc_sizes(child, serializer, starting_pos);
        child->_has_out_of_page_descendants = false;
        // Check that the size prediction done by max_offset_from_child
        // is consistent with the actual distance to the child after it is written.
        auto before = max_offset_from_child(*parent, serializer.pos() + child_size);
        writer_node::write(child, serializer, true);
        auto after = max_offset_from_child(*parent, serializer.pos());
        REQUIRE_EQUAL(before.value, after.value);
    };
    // Pick various transition lengths and payload sizes to exercise all
    // branches in max_offset_from_child().
    std::vector<bytes> interesting_transitions;
    for (int i = 1; i < 32; ++i) {
        interesting_transitions.push_back(tests::random::get_bytes(i));
    }
    std::vector<size_t> interesting_payload_sizes;
    interesting_payload_sizes.emplace_back(1);
    interesting_payload_sizes.emplace_back(20);
    for (const auto& transition : interesting_transitions)
    for (const auto& payload_size : interesting_payload_sizes) {
        auto transition_view = std::as_bytes(std::span(transition));
        test_one(transition_view, payload_size);
    }
}

View File

@@ -104,6 +104,10 @@ private:
SCYLLA_ASSERT(!_use_count);
}
size_t pos() const {
return idx * page_size;
}
void on_evicted() noexcept override;
temporary_buffer<char> get_buf() {
@@ -124,6 +128,12 @@ private:
return _lsa_buf.get();
}
// Returns a read-only view of the contents of the page.
// The buffer can be invalidated when the page is evicted or when the owning LSA region invalidates references.
std::span<const std::byte> get_view() const {
return std::as_bytes(std::span<const char>(_lsa_buf.get(), _lsa_buf.size()));
}
size_t size_in_allocator() {
// lsa_buf occupies 4K in LSA even if the buf size is smaller.
// _buf is transient and not accounted here.
@@ -152,9 +162,21 @@ private:
offset_type _last_page_size;
page_idx_type _last_page;
public:
using ptr_type = cached_page::ptr_type;
struct page_read_result {
ptr_type ptr;
bool was_already_cached;
page_read_result(ptr_type p, bool cached)
: ptr(std::move(p))
, was_already_cached(cached)
{}
};
future<page_read_result> get_shared_page(size_t global_pos, tracing::trace_state_ptr trace_state) {
return get_page_ptr(global_pos / page_size, 1, trace_state);
}
private:
// Returns the page; `page_read_result::was_already_cached` is true if the page was
// served from the cache, false if it had to be read from disk.
future<std::pair<cached_page::ptr_type, bool>> get_page_ptr(page_idx_type idx,
future<page_read_result> get_page_ptr(page_idx_type idx,
page_count_type read_ahead,
tracing::trace_state_ptr trace_state,
std::optional<reader_permit> permit = {}) {
@@ -163,7 +185,7 @@ private:
++_metrics.page_hits;
tracing::trace(trace_state, "page cache hit: file={}, page={}", _file_name, idx);
cached_page& cp = *i;
return make_ready_future<std::pair<cached_page::ptr_type, bool>>(cp.share(), true);
return make_ready_future<page_read_result>(cp.share(), true);
}
tracing::trace(trace_state, "page cache miss: file={}, page={}, readahead={}", _file_name, idx, read_ahead);
++_metrics.page_misses;
@@ -206,7 +228,7 @@ private:
utils::get_local_injector().inject("cached_file_get_first_page", []() {
throw std::bad_alloc();
});
return std::pair<cached_page::ptr_type, bool>(std::move(first_page), false);
return page_read_result{std::move(first_page), false};
});
}
// Returns (page, true) if the page was cached, and (page, false) if the page was uncached.
@@ -214,14 +236,14 @@ private:
page_count_type count,
tracing::trace_state_ptr trace_state,
std::optional<reader_permit> permit = {}) {
return get_page_ptr(idx, count, std::move(trace_state), permit).then([permit] (std::pair<cached_page::ptr_type, bool> cp) mutable {
auto buf = cp.first->get_buf();
return get_page_ptr(idx, count, std::move(trace_state), permit).then([permit] (page_read_result read_result) mutable {
auto buf = read_result.ptr->get_buf();
if (permit) {
auto units = permit->consume_memory(buf.size());
buf = temporary_buffer<char>(buf.get_write(), buf.size(),
make_object_deleter(buf.release(), std::move(units)));
}
return std::make_pair(std::move(buf), cp.second);
return std::make_pair(std::move(buf), read_result.was_already_cached);
});
}
public:
@@ -385,15 +407,15 @@ public:
}
page_count_type readahead = div_ceil(_size_hint, page_size);
return _cached_file->get_page_ptr(_page_idx, readahead, _trace_state, _permit).then(
[this] (std::pair<cached_page::ptr_type, bool> read_result) mutable {
auto page = std::move(read_result.first);
[this] (page_read_result read_result) mutable {
auto page = std::move(read_result.ptr);
size_t size = _page_idx == _cached_file->_last_page
? _cached_file->_last_page_size
: page_size;
page_view buf(_offset_in_page, size - _offset_in_page, std::move(page), get_page_units());
_offset_in_page = 0;
++_page_idx;
shrink_size_hint(read_result.second);
shrink_size_hint(read_result.was_already_cached);
return buf;
});
}