diff --git a/configure.py b/configure.py index 7689dddd88..6c5d9d9f4f 100755 --- a/configure.py +++ b/configure.py @@ -565,6 +565,7 @@ scylla_tests = set([ 'test/boost/token_metadata_test', 'test/boost/top_k_test', 'test/boost/transport_test', + 'test/boost/bti_node_sink_test', 'test/boost/trie_writer_test', 'test/boost/symmetric_key_test', 'test/boost/types_test', @@ -866,6 +867,8 @@ scylla_core = (['message/messaging_service.cc', 'sstables/random_access_reader.cc', 'sstables/metadata_collector.cc', 'sstables/writer.cc', + 'sstables/trie/bti_node_reader.cc', + 'sstables/trie/bti_node_sink.cc', 'sstables/trie/trie_writer.cc', 'transport/cql_protocol_extension.cc', 'transport/event.cc', diff --git a/sstables/CMakeLists.txt b/sstables/CMakeLists.txt index 96be2dc50a..2b3d0fcd49 100644 --- a/sstables/CMakeLists.txt +++ b/sstables/CMakeLists.txt @@ -20,6 +20,8 @@ target_sources(sstables sstables_manager.cc sstable_version.cc storage.cc + trie/bti_node_reader.cc + trie/bti_node_sink.cc trie/trie_writer.cc writer.cc) target_include_directories(sstables diff --git a/sstables/exceptions.hh b/sstables/exceptions.hh index a78d001432..741a132531 100644 --- a/sstables/exceptions.hh +++ b/sstables/exceptions.hh @@ -29,6 +29,7 @@ public: }; [[noreturn]] void on_parse_error(sstring message, std::optional filename); +[[noreturn, gnu::noinline]] void on_bti_parse_error(uint64_t pos); // Use this instead of SCYLLA_ASSERT() or assert() in code that is used while parsing SSTables. 
// SSTables can be corrupted either by ScyllaDB itself or by a freak accident like cosmic background diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 7a4c80bad3..981afb2202 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -119,6 +119,10 @@ logging::logger sstlog("sstable"); on_internal_error(sstlog, std::move(ex)); } +[[noreturn, gnu::noinline]] void on_bti_parse_error(uint64_t pos) { + on_internal_error(sstlog, fmt::format("BTI parse error for node at pos {}", pos)); +} + template const char* nullsafe_typename(T* x) noexcept { try { diff --git a/sstables/trie/bti_node_reader.cc b/sstables/trie/bti_node_reader.cc new file mode 100644 index 0000000000..6c3de3fb7e --- /dev/null +++ b/sstables/trie/bti_node_reader.cc @@ -0,0 +1,483 @@ +/* + * Copyright (C) 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "bti_node_reader.hh" +#include "bti_node_type.hh" + +namespace sstables::trie { + +get_child_result bti_get_child(uint64_t pos, const_bytes sp, int child_idx, bool forward) { + auto type = uint8_t(sp[0]) >> 4; + trie::get_child_result result; + auto single = [&](uint64_t offset) { + result.offset = offset; + result.idx = 0; + return result; + }; + auto sparse = [&] [[gnu::always_inline]] (int type) { + auto bpp = bits_per_pointer_arr[type]; + result.idx = child_idx; + result.offset = read_offset(sp.subspan(2 + int(sp[1])), child_idx, bpp); + return result; + }; + auto dense = [&] [[gnu::always_inline]] (int type) { + auto bpp = bits_per_pointer_arr[type]; + auto dense_span = uint64_t(sp[2]) + 1; + int idx = child_idx; + int increment = forward ? 
1 : -1; + while (idx < int(dense_span) && idx >= 0) { + if (auto off = read_offset(sp.subspan(3), idx, bpp)) { + result.idx = idx; + result.offset = off; + return result; + } else { + idx += increment; + } + } + [[unlikely]] sstables::on_bti_parse_error(pos); + }; + switch (type) { + case PAYLOAD_ONLY: + [[unlikely]] sstables::on_bti_parse_error(pos); + case SINGLE_NOPAYLOAD_4: + return single(uint64_t(sp[0]) & 0xf); + case SINGLE_NOPAYLOAD_12: + return single((uint64_t(sp[0]) & 0xf) << 8 | uint64_t(sp[1])); + case SINGLE_8: + return single(uint64_t(sp[2])); + case SINGLE_16: + return single(uint64_t(sp[2]) << 8 | uint64_t(sp[3])); + case SPARSE_8: + return sparse(type); + case SPARSE_12: + return sparse(type); + case SPARSE_16: + return sparse(type); + case SPARSE_24: + return sparse(type); + case SPARSE_40: + return sparse(type); + case DENSE_12: + return dense(type); + case DENSE_16: + return dense(type); + case DENSE_24: + return dense(type); + case DENSE_32: + return dense(type); + case DENSE_40: + return dense(type); + case LONG_DENSE: + return dense(type); + } + [[unlikely]] sstables::on_bti_parse_error(pos); +} + +std::byte bti_get_child_transition(uint64_t pos, const_bytes raw, int idx) { + auto type = uint8_t(raw[0]) >> 4; + switch (type) { + case PAYLOAD_ONLY: + abort(); + case SINGLE_NOPAYLOAD_4: + return raw[1]; + case SINGLE_8: + return raw[1]; + case SINGLE_NOPAYLOAD_12: + return raw[2]; + case SINGLE_16: + return raw[1]; + case SPARSE_8: + case SPARSE_12: + case SPARSE_16: + case SPARSE_24: + case SPARSE_40: + return raw[2 + idx]; + case DENSE_12: + case DENSE_16: + case DENSE_24: + case DENSE_32: + case DENSE_40: + case LONG_DENSE: + return std::byte(uint8_t(raw[1]) + idx); + } + [[unlikely]] sstables::on_bti_parse_error(pos); +} + +load_final_node_result bti_read_node(int64_t pos, const_bytes sp) { + load_final_node_result result; + auto type = uint8_t(sp[0]) >> 4; + auto single = [&](uint8_t payload_bits) { + result.n_children = 1; + 
result.payload_bits = payload_bits; + return result; + }; + auto sparse = [&] [[gnu::always_inline]] (int type) { + int n_children = int(sp[1]); + result.n_children = n_children; + result.payload_bits = uint8_t(sp[0]) & 0xf; + return result; + }; + auto dense = [&] [[gnu::always_inline]] (int type) { + auto dense_span = uint64_t(sp[2]) + 1; + result.n_children = dense_span; + result.payload_bits = uint8_t(sp[0]) & 0xf; + return result; + }; + switch (type) { + case PAYLOAD_ONLY: + result.payload_bits = uint8_t(sp[0]) & 0xf; + result.n_children = 0; + return result; + case SINGLE_NOPAYLOAD_4: + return single(0); + case SINGLE_NOPAYLOAD_12: + return single(0); + case SINGLE_8: + return single(uint8_t(sp[0]) & 0xf); + case SINGLE_16: + return single(uint8_t(sp[0]) & 0xf); + case SPARSE_8: + return sparse(type); + case SPARSE_12: + return sparse(type); + case SPARSE_16: + return sparse(type); + case SPARSE_24: + return sparse(type); + case SPARSE_40: + return sparse(type); + case DENSE_12: + return dense(type); + case DENSE_16: + return dense(type); + case DENSE_24: + return dense(type); + case DENSE_32: + return dense(type); + case DENSE_40: + return dense(type); + case LONG_DENSE: + return dense(type); + } + [[unlikely]] sstables::on_bti_parse_error(pos); +} + +const_bytes bti_get_payload(int64_t pos, const_bytes sp) { + auto type = uint8_t(sp[0]) >> 4; + switch (type) { + case PAYLOAD_ONLY: + return sp.subspan(1); + case SINGLE_NOPAYLOAD_4: + case SINGLE_NOPAYLOAD_12: + return sp.subspan(1 + div_ceil(bits_per_pointer_arr[type], 8)); + case SINGLE_8: + case SINGLE_16: + return sp.subspan(2 + div_ceil(bits_per_pointer_arr[type], 8)); + case SPARSE_8: + case SPARSE_12: + case SPARSE_16: + case SPARSE_24: + case SPARSE_40: { + auto n_children = uint8_t(sp[1]); + return sp.subspan(2 + div_ceil(n_children * (8 + bits_per_pointer_arr[type]), 8)); + } + case DENSE_12: + case DENSE_16: + case DENSE_24: + case DENSE_32: + case DENSE_40: + case LONG_DENSE: { + auto dense_span 
= uint8_t(sp[2]) + 1; + return sp.subspan(3 + div_ceil(dense_span * bits_per_pointer_arr[type], 8)); + } + } + [[unlikely]] sstables::on_bti_parse_error(pos); +} + +node_traverse_result bti_walk_down_along_key(int64_t pos, const_bytes sp, const_bytes key) { + auto type = uint8_t(sp[0]) >> 4; + trie::node_traverse_result result; + result.body_pos = pos; + result.traversed_key_bytes = 0; + auto single = [&](std::byte edge, uint64_t offset, uint8_t payload_bits) { + result.n_children = 1; + result.payload_bits = payload_bits; + if (key[0] <= edge) { + result.found_idx = 0; + result.found_byte = int(edge); + result.child_offset = offset; + } else { + result.found_idx = 1; + result.found_byte = -1; + result.child_offset = -1; + } + return result; + }; + auto sparse = [&] [[gnu::always_inline]] (int type) { + int n_children = int(sp[1]); + auto idx = std::lower_bound(&sp[2], &sp[2 + n_children], key[0]) - &sp[2]; + result.n_children = n_children; + result.payload_bits = uint8_t(sp[0]) & 0xf; + result.found_idx = idx; + if (idx < n_children) { + auto bpp = bits_per_pointer_arr[type]; + result.child_offset = read_offset(sp.subspan(2 + n_children), idx, bpp); + result.found_byte = int(sp[2 + idx]); + } else { + result.child_offset = -1; + result.found_byte = -1; + } + return result; + }; + auto dense = [&] [[gnu::always_inline]] (int type) { + auto start = int(sp[1]); + auto idx = std::max(0, int(key[0]) - start); + auto dense_span = uint64_t(sp[2]) + 1; + auto bpp = bits_per_pointer_arr[type]; + result.n_children = dense_span; + result.payload_bits = uint8_t(sp[0]) & 0xf; + while (idx < int(dense_span)) { + if (auto off = read_offset(sp.subspan(3), idx, bpp)) { + result.child_offset = off; + result.found_idx = idx; + result.found_byte = start + idx; + return result; + } else { + ++idx; + } + } + result.found_idx = dense_span; + result.child_offset = -1; + result.found_byte = -1; + return result; + }; + switch (type) { + case PAYLOAD_ONLY: + result.payload_bits = 
uint8_t(sp[0]) & 0xf; + result.n_children = 0; + result.found_idx = 0; + result.found_byte = -1; + result.child_offset = -1; + return result; + case SINGLE_NOPAYLOAD_4: + return single(sp[1], uint64_t(sp[0]) & 0xf, 0); + case SINGLE_NOPAYLOAD_12: + return single(sp[2], (uint64_t(sp[0]) & 0xf) << 8 | uint64_t(sp[1]), 0); + case SINGLE_8: + return single(sp[1], uint64_t(sp[2]), uint8_t(sp[0]) & 0xf); + case SINGLE_16: + return single(sp[1], uint64_t(sp[2]) << 8 | uint64_t(sp[3]), uint8_t(sp[0]) & 0xf); + case SPARSE_8: + return sparse(type); + case SPARSE_12: + return sparse(type); + case SPARSE_16: + return sparse(type); + case SPARSE_24: + return sparse(type); + case SPARSE_40: + return sparse(type); + case DENSE_12: + return dense(type); + case DENSE_16: + return dense(type); + case DENSE_24: + return dense(type); + case DENSE_32: + return dense(type); + case DENSE_40: + return dense(type); + case LONG_DENSE: + return dense(type); + } + [[unlikely]] sstables::on_bti_parse_error(pos); +} + +node_traverse_sidemost_result bti_walk_down_leftmost_path(int64_t pos, const_bytes sp) { + auto type = uint8_t(sp[0]) >> 4; + trie::node_traverse_sidemost_result result; + result.body_pos = pos; + auto single = [&](uint64_t offset, uint8_t payload_bits) { + result.n_children = 1; + result.payload_bits = payload_bits; + result.child_offset = offset; + return result; + }; + auto sparse = [&] [[gnu::always_inline]] (int type) { + int n_children = int(sp[1]); + auto bpp = bits_per_pointer_arr[type]; + result.n_children = n_children; + result.payload_bits = uint8_t(sp[0]) & 0xf; + result.child_offset = read_offset(sp.subspan(2 + n_children), 0, bpp); + return result; + }; + auto dense = [&] [[gnu::always_inline]] (int type) { + auto dense_span = uint64_t(sp[2]) + 1; + auto bpp = bits_per_pointer_arr[type]; + result.n_children = dense_span; + result.payload_bits = uint8_t(sp[0]) & 0xf; + result.child_offset = read_offset(sp.subspan(3), 0, bpp); + return result; + }; + switch (type) { 
+ case PAYLOAD_ONLY: + result.payload_bits = uint8_t(sp[0]) & 0xf; + result.n_children = 0; + result.child_offset = -1; + return result; + case SINGLE_NOPAYLOAD_4: + return single(uint64_t(sp[0]) & 0xf, 0); + case SINGLE_NOPAYLOAD_12: + return single((uint64_t(sp[0]) & 0xf) << 8 | uint64_t(sp[1]), 0); + case SINGLE_8: + return single(uint64_t(sp[2]), uint8_t(sp[0]) & 0xf); + case SINGLE_16: + return single(uint64_t(sp[2]) << 8 | uint64_t(sp[3]), uint8_t(sp[0]) & 0xf); + case SPARSE_8: + return sparse(type); + case SPARSE_12: + return sparse(type); + case SPARSE_16: + return sparse(type); + case SPARSE_24: + return sparse(type); + case SPARSE_40: + return sparse(type); + case DENSE_12: + return dense(type); + case DENSE_16: + return dense(type); + case DENSE_24: + return dense(type); + case DENSE_32: + return dense(type); + case DENSE_40: + return dense(type); + case LONG_DENSE: + return dense(type); + } + [[unlikely]] sstables::on_bti_parse_error(pos); +} + +node_traverse_sidemost_result bti_walk_down_rightmost_path(int64_t pos, const_bytes sp) { + auto type = uint8_t(sp[0]) >> 4; + trie::node_traverse_sidemost_result result; + result.body_pos = pos; + auto single = [&](uint64_t offset, uint8_t payload_bits) { + result.n_children = 1; + result.payload_bits = payload_bits; + result.child_offset = offset; + return result; + }; + auto sparse = [&] [[gnu::always_inline]] (int type) { + int n_children = int(sp[1]); + auto bpp = bits_per_pointer_arr[type]; + result.n_children = n_children; + result.payload_bits = uint8_t(sp[0]) & 0xf; + result.child_offset = read_offset(sp.subspan(2 + n_children), n_children - 1, bpp); + return result; + }; + auto dense = [&] [[gnu::always_inline]] (int type) { + auto dense_span = uint64_t(sp[2]) + 1; + auto bpp = bits_per_pointer_arr[type]; + result.n_children = dense_span; + result.payload_bits = uint8_t(sp[0]) & 0xf; + result.child_offset = read_offset(sp.subspan(3), dense_span - 1, bpp); + return result; + }; + switch (type) { + case 
PAYLOAD_ONLY: + result.payload_bits = uint8_t(sp[0]) & 0xf; + result.n_children = 0; + result.child_offset = -1; + return result; + case SINGLE_NOPAYLOAD_4: + return single(uint64_t(sp[0]) & 0xf, 0); + case SINGLE_NOPAYLOAD_12: + return single((uint64_t(sp[0]) & 0xf) << 8 | uint64_t(sp[1]), 0); + case SINGLE_8: + return single(uint64_t(sp[2]), uint8_t(sp[0]) & 0xf); + case SINGLE_16: + return single(uint64_t(sp[2]) << 8 | uint64_t(sp[3]), uint8_t(sp[0]) & 0xf); + case SPARSE_8: + return sparse(type); + case SPARSE_12: + return sparse(type); + case SPARSE_16: + return sparse(type); + case SPARSE_24: + return sparse(type); + case SPARSE_40: + return sparse(type); + case DENSE_12: + return dense(type); + case DENSE_16: + return dense(type); + case DENSE_24: + return dense(type); + case DENSE_32: + return dense(type); + case DENSE_40: + return dense(type); + case LONG_DENSE: + return dense(type); + } + [[unlikely]] sstables::on_bti_parse_error(pos); +} + +bti_node_reader::bti_node_reader(cached_file& f) + : _file(f) { +} + +bool bti_node_reader::cached(int64_t pos) const { + return _cached_page && _cached_page->pos() / cached_file::page_size == pos / cached_file::page_size; +} + +seastar::future<> bti_node_reader::load(int64_t pos) { + return _file.get().get_shared_page(pos, nullptr).then([this](auto page) { + _cached_page = std::move(page.ptr); + }); +} + +trie::load_final_node_result bti_node_reader::read_node(int64_t pos) { + SCYLLA_ASSERT(cached(pos)); + auto sp = _cached_page->get_view().subspan(pos % cached_file::page_size); + return bti_read_node(pos, sp); +} + +trie::node_traverse_result bti_node_reader::walk_down_along_key(int64_t pos, const_bytes key) { + SCYLLA_ASSERT(cached(pos)); + auto sp = _cached_page->get_view().subspan(pos % cached_file::page_size); + return bti_walk_down_along_key(pos, sp, key); +} + +trie::node_traverse_sidemost_result bti_node_reader::walk_down_leftmost_path(int64_t pos) { + SCYLLA_ASSERT(cached(pos)); + auto sp = 
_cached_page->get_view().subspan(pos % cached_file::page_size); + return bti_walk_down_leftmost_path(pos, sp); +} + +trie::node_traverse_sidemost_result bti_node_reader::walk_down_rightmost_path(int64_t pos) { + SCYLLA_ASSERT(cached(pos)); + auto sp = _cached_page->get_view().subspan(pos % cached_file::page_size); + return bti_walk_down_rightmost_path(pos, sp); +} + +trie::get_child_result bti_node_reader::get_child(int64_t pos, int child_idx, bool forward) const { + SCYLLA_ASSERT(cached(pos)); + auto sp = _cached_page->get_view().subspan(pos % cached_file::page_size); + return bti_get_child(pos, sp, child_idx, forward); +} + +const_bytes bti_node_reader::get_payload(int64_t pos) const { + SCYLLA_ASSERT(cached(pos)); + auto sp = _cached_page->get_view().subspan(pos % cached_file::page_size); + return bti_get_payload(pos, sp); +} + +} // namespace sstables::trie diff --git a/sstables/trie/bti_node_reader.hh b/sstables/trie/bti_node_reader.hh new file mode 100644 index 0000000000..9094db39fa --- /dev/null +++ b/sstables/trie/bti_node_reader.hh @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once + +#include "utils/cached_file.hh" +#include "node_reader.hh" + +namespace sstables { + [[noreturn, gnu::noinline]] void on_bti_parse_error(uint64_t pos); +} + +namespace sstables::trie { + +// Implementation of concept `node_reader`. 
+get_child_result bti_get_child(uint64_t pos, const_bytes sp, int child_idx, bool forward); +std::byte bti_get_child_transition(uint64_t pos, const_bytes raw, int idx); +load_final_node_result bti_read_node(int64_t pos, const_bytes sp); +const_bytes bti_get_payload(int64_t pos, const_bytes sp); +node_traverse_result bti_walk_down_along_key(int64_t pos, const_bytes sp, const_bytes key); +node_traverse_sidemost_result bti_walk_down_leftmost_path(int64_t pos, const_bytes sp); +node_traverse_sidemost_result bti_walk_down_rightmost_path(int64_t pos, const_bytes sp); + +// Deals with BTI-specific parts of trie traversal. +// (I.e. the parts which understand the BTI serialization format). +// +// (We talk about "traversal" and not "deserialization" because we don't actually +// want to deserialize the full nodes, we only want to walk over them.) +// +// See comments for concept `node_reader` for a description of the methods. +struct bti_node_reader { + using page_ptr = cached_file::ptr_type; + page_ptr _cached_page; + std::reference_wrapper _file; + + bti_node_reader(cached_file& f); + bool cached(int64_t pos) const; + future<> load(int64_t pos); + trie::load_final_node_result read_node(int64_t pos); + trie::node_traverse_result walk_down_along_key(int64_t pos, const_bytes key); + trie::node_traverse_sidemost_result walk_down_leftmost_path(int64_t pos); + trie::node_traverse_sidemost_result walk_down_rightmost_path(int64_t pos); + trie::get_child_result get_child(int64_t pos, int child_idx, bool forward) const; + const_bytes get_payload(int64_t pos) const; +}; +static_assert(node_reader); + +} // namespace sstables::trie diff --git a/sstables/trie/bti_node_sink.cc b/sstables/trie/bti_node_sink.cc new file mode 100644 index 0000000000..451309b44e --- /dev/null +++ b/sstables/trie/bti_node_sink.cc @@ -0,0 +1,515 @@ +/* + * Copyright (C) 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "bti_node_sink.hh" 
+#include "utils/div_ceil.hh" + +namespace sstables::trie { + +sink_offset max_offset_from_child(const writer_node& x, sink_pos xpos) { + expensive_log("max_offset_from_child: x={} xpos={}", fmt::ptr(&x), xpos.value); + // We iterate over children in reverse order and, for the ones which aren't written yet, + // we compute their expected output position based on the accumulated values of _node_size and _branch_size. + + // Total size of unwritten child branches which we iterated over so far. + // This is the offset between `x` and the next unwritten child branch we are going to see. + auto offset = sink_offset{0}; + // Max offset noticed so far during iteration. + auto result = sink_offset{0}; + for (auto it = x.get_children().rbegin(); it != x.get_children().rend(); ++it) { + if ((*it)->_pos.valid()) { + // This child has already been written out. It's position is set in stone. + expensive_log("max_offset_from_child: offset={} child={} _pos={} _node_size={}, _branch_size={}", + offset.value, fmt::ptr(&**it), (*it)->_pos.value, (*it)->_node_size.value, (*it)->_branch_size.value); + expensive_assert((*it)->_pos < xpos); + result = std::max(result, xpos - (*it)->_pos); + } else { + // Child `*it` hasn't been written out yet. + // We compute its offset from `x` based on the total size of all siblings after it + // and the node size of `*it`. + auto delta = offset; + auto chain_length = (*it)->_transition_length; + if (chain_length > 2) { + delta = delta + sink_offset(2); + } else if (chain_length == 2) { + delta = delta + ((*it)->_node_size.value < 16 ? 
sink_offset(2) : sink_offset(3)); + } else { + delta = delta + (*it)->_node_size; + } + expensive_log("max_offset_from_child: offset={} child={} _pos={} _node_size={}, _branch_size={} delta={}", + offset.value, fmt::ptr(&**it), (*it)->_pos.value, (*it)->_node_size.value, (*it)->_branch_size.value, delta.value); + result = std::max(result, delta); + } + offset = offset + (*it)->_node_size + (*it)->_branch_size; + } + expensive_log("max_offset_from_child: x={} xpos={} result={}", fmt::ptr(&x), xpos.value, result.value); + return result; +} + +// Chooses the best node type (i.e the one which will have the smallest valid on-disk representation) +// for the given writer_node, assuming that the values of _node_size, _branch_size and _output_pos +// in its children are accurate, +// and that the given node would be written at output position `xpos`. +static node_type choose_node_type_impl(const writer_node& x, sink_pos xpos) { + const auto n_children = x.get_children().size(); + if (n_children == 0) { + // If there is no children, the answer is obvious. + return PAYLOAD_ONLY; + } + auto max_offset = max_offset_from_child(x, xpos); + // For a given offset bitwidth, contains the index of the leanest node type + // in singles[], sparses[] and denses[] which can be used to represent a node with + // this offset bitwidth. + constexpr static uint8_t widths_lookup[] = { + 0, + 0, 0, 0, 0, + 1, 1, 1, 1, + 2, 2, 2, 2, + 3, 3, 3, 3, + 4, 4, 4, 4, + 4, 4, 4, 4, + 5, 5, 5, 5, + 5, 5, 5, 5, + 6, 6, 6, 6, + 6, 6, 6, 6, + 7, 7, 7, 7, + 7, 7, 7, 7, + 7, 7, 7, 7, + 7, 7, 7, 7, + 7, 7, 7, 7, + 7, 7, 7, 7, + }; + auto width_idx = widths_lookup[std::bit_width(max_offset.value)]; + // Nodes with a single close child are very common, so they have dedicated node types. + // For offset widths which don't fit into the dedicated types, + // singles[] returns the smallest valid non-dedicated (multi-child) type. 
+ constexpr node_type singles[] = {SINGLE_NOPAYLOAD_4, SINGLE_8, SINGLE_NOPAYLOAD_12, SINGLE_16, DENSE_24, DENSE_32, DENSE_40, LONG_DENSE}; + if (n_children == 1) { + const auto has_payload = x._payload._payload_bits; + if (has_payload && (width_idx == 0 || width_idx == 2)) { + // SINGLE_NOPAYLOAD_4 and SINGLE_NOPAYLOAD_12 can't hold nodes with a payload. + // Fall back to the next smallest node type which can + // (SINGLE_8 and SINGLE_16 respectively). + return singles[width_idx + 1]; + } + return singles[width_idx]; + } + // For nodes with 2 or more children, we calculate the sizes that would result from choosing + // either the leanest dense node or the leanest sparse node, and we pick the one which + // results in smaller size. + constexpr node_type sparses[] = {SPARSE_8, SPARSE_8, SPARSE_12, SPARSE_16, SPARSE_24, SPARSE_40, SPARSE_40, LONG_DENSE}; + constexpr node_type denses[] = {DENSE_12, DENSE_12, DENSE_12, DENSE_16, DENSE_24, DENSE_32, DENSE_40, LONG_DENSE}; + const size_t dense_span = 1 + size_t(x.get_children().back()->transition()) - size_t(x.get_children().front()->transition()); + auto sparse_size = 16 + div_ceil((bits_per_pointer_arr[sparses[width_idx]] + 8) * n_children, 8); + auto dense_size = 24 + div_ceil(bits_per_pointer_arr[denses[width_idx]] * dense_span, 8); + // If the sizes are equal, pick the dense type, because dense nodes are easier for the reader. + if (sparse_size < dense_size) { + return sparses[width_idx]; + } else { + return denses[width_idx]; + } +} + +// This wrapper exists just to log the return value. +// (Logging the return value of a function with multiple `return` points +// is awkward). 
+static node_type choose_node_type(const writer_node& x, sink_pos xpos) { + auto res = choose_node_type_impl(x, xpos); + expensive_log("choose_node_type: x={} res={}", fmt::ptr(&x), int(res)); + return res; +} + +bti_node_sink::bti_node_sink(sstables::file_writer& w, size_t page_size) : _w(w), _page_size(page_size) { + expensive_assert(_page_size <= max_page_size); +} + +void bti_node_sink::write_int(uint64_t x, size_t bytes) { + uint64_t be = cpu_to_be(x); + expensive_log("write_int: {}", fmt_hex({reinterpret_cast(&be) + sizeof(be) - bytes, bytes})); + _w.write(reinterpret_cast(&be) + sizeof(be) - bytes, bytes); +} + +void bti_node_sink::write_bytes(const_bytes x) { + expensive_log("write_bytes: {}", fmt_hex({reinterpret_cast(x.data()), x.size()})); + _w.write(reinterpret_cast(x.data()), x.size()); +} + +[[gnu::always_inline]] +size_t bti_node_sink::write_sparse(const writer_node& x, node_type type, int bytes_per_pointer, sink_pos xpos) { + write_int((type << 4) | x._payload._payload_bits, 1); + write_int(x.get_children().size(), 1); + for (const auto& c : x.get_children()) { + write_int(uint8_t(c->transition()), 1); + } + for (const auto& c : x.get_children()) { + uint64_t offset = (xpos - c->_pos).value; + write_int(offset, bytes_per_pointer); + } + write_bytes(x._payload.blob()); + return 2 + x.get_children().size() * (1+bytes_per_pointer) + x._payload.blob().size(); +} + +[[gnu::always_inline]] +node_size bti_node_sink::size_sparse(const writer_node& x, int bits_per_pointer) const { + return node_size(2 + div_ceil(x.get_children().size() * (8+bits_per_pointer), 8) + x._payload.blob().size()); +} + +[[gnu::always_inline]] +size_t bti_node_sink::write_dense(const writer_node& x, node_type type, int bytes_per_pointer, sink_pos xpos) { + int start = int(x.get_children().front()->transition()); + auto dense_span = 1 + int(x.get_children().back()->transition()) - int(x.get_children().front()->transition()); + write_int((type << 4) | x._payload._payload_bits, 1); + 
write_int(start, 1); + write_int(dense_span - 1, 1); + auto it = x.get_children().begin(); + auto end_it = x.get_children().end(); + for (int next = start; next < start + dense_span; ++next) { + uint64_t offset = 0; + if (it != end_it && int((*it)->transition()) == next) { + offset = (xpos - (*it)->_pos).value; + ++it; + } + write_int(offset, bytes_per_pointer); + } + write_bytes(x._payload.blob()); + return 3 + dense_span * (bytes_per_pointer) + x._payload.blob().size(); +} + +[[gnu::always_inline]] +node_size bti_node_sink::size_dense(const writer_node& x, int bits_per_pointer) const { + int first = int(x.get_children().front()->transition()); + int last = int(x.get_children().back()->transition()); + return node_size(3 + div_ceil(bits_per_pointer * (1 + last - first), 8) + x._payload.blob().size()); +} + +void bti_node_sink::write_body(const writer_node& x, sink_pos xpos, node_type type) { + switch (type) { + case PAYLOAD_ONLY: { + write_int(type << 4 | x._payload._payload_bits, 1); + write_bytes(x._payload.blob()); + return; + } + case SINGLE_NOPAYLOAD_4: { + uint64_t offset = (xpos - x.get_children().front()->_pos).value; + uint8_t transition = uint8_t(x.get_children().front()->transition()); + uint8_t arr[2]; + arr[0] = (type << 4) | offset; + arr[1] = transition; + write_bytes({reinterpret_cast(arr), 2}); + return; + } + case SINGLE_8: { + uint64_t offset = (xpos - x.get_children().front()->_pos).value; + uint8_t transition = uint8_t(x.get_children().front()->transition()); + uint8_t arr[3 + sizeof(trie_payload::_payload_buf)]; + arr[0] = (type << 4) | x._payload._payload_bits; + arr[1] = transition; + arr[2] = offset; + auto sz = x._payload.blob().size(); + memcpy(&arr[3], x._payload.blob().data(), sz); + write_bytes({reinterpret_cast(arr), 3 + sz}); + return; + } + case SINGLE_NOPAYLOAD_12: { + uint64_t offset = (xpos - x.get_children().front()->_pos).value; + uint8_t transition = uint8_t(x.get_children().front()->transition()); + write_int((type << 4) | 
(offset >> 8), 1); + write_int(offset & 0xff, 1); + write_int(transition, 1); + return; + } + case SINGLE_16: { + uint64_t offset = (xpos - x.get_children().front()->_pos).value; + uint8_t transition = uint8_t(x.get_children().front()->transition()); + write_int((type << 4) | x._payload._payload_bits, 1); + write_int(transition, 1); + write_int(offset, 2); + write_bytes(x._payload.blob()); + return; + } + case SPARSE_8: { + write_sparse(x, type, 1, xpos); + return; + } + case SPARSE_12: { + write_int((type << 4) | x._payload._payload_bits, 1); + write_int(x.get_children().size(), 1); + for (const auto& c : x.get_children()) { + write_int(uint8_t(c->transition()), 1); + } + size_t i; + for (i = 0; i + 1 < x.get_children().size(); i += 2) { + uint64_t offset1 = (xpos - x.get_children()[i]->_pos).value; + uint64_t offset2 = (xpos - x.get_children()[i+1]->_pos).value; + write_int(offset1 << 12 | offset2, 3); + } + if (i < x.get_children().size()) { + uint64_t offset = (xpos - x.get_children()[i]->_pos).value; + write_int(offset << 4, 2); + } + write_bytes(x._payload.blob()); + return; + } + case SPARSE_16: { + write_sparse(x, type, 2, xpos); + return; + } + case SPARSE_24: { + write_sparse(x, type, 3, xpos); + return; + } + case SPARSE_40: { + write_sparse(x, type, 5, xpos); + return; + } + case DENSE_12: { + int start = int(x.get_children().front()->transition()); + auto dense_span = 1 + int(x.get_children().back()->transition()) - int(x.get_children().front()->transition()); + write_int((type << 4) | x._payload._payload_bits, 1); + write_int(start, 1); + write_int(dense_span - 1, 1); + auto it = x.get_children().begin(); + auto end_it = x.get_children().end(); + int next = start; + for (; next + 1 < start + dense_span; next += 2) { + uint64_t offset_1 = 0; + uint64_t offset_2 = 0; + if (it != end_it && int((*it)->transition()) == next) { + offset_1 = (xpos - (*it)->_pos).value; + ++it; + } + if (it != end_it && int((*it)->transition()) == next + 1) { + offset_2 = 
(xpos - (*it)->_pos).value; + ++it; + } + write_int(offset_1 << 12 | offset_2, 3); + } + if (next < start + dense_span) { + uint64_t offset = 0; + if (it != end_it && int((*it)->transition()) == next) { + offset = (xpos - (*it)->_pos).value; + ++it; + } + write_int(offset << 4, 2); + } + write_bytes(x._payload.blob()); + return; + } + case DENSE_16: { + write_dense(x, type, 2, xpos); + return; + } + case DENSE_24: { + write_dense(x, type, 3, xpos); + return; + } + case DENSE_32: { + write_dense(x, type, 4, xpos); + return; + } + case DENSE_40: { + write_dense(x, type, 5, xpos); + return; + } + case LONG_DENSE: { + write_dense(x, type, 8, xpos); + return; + } + case NODE_TYPE_COUNT: { + abort(); + } + } + abort(); +} + +sink_pos bti_node_sink::write_chain(const writer_node& x, node_size body_offset) { + int i = x._transition_length; + expensive_assert(i >= 2); + + const std::byte* __restrict__ transition = x._transition.get(); + sink_pos c1_pos = pos(); + + // Second-to-last node in the chain (i.e. last node before the body) can have size 2 or 3 bytes, + // depending on how big the last node (body) is. + uint64_t offset = body_offset.value; + if (offset >= 16) { + expensive_assert(offset < 4096); + write_int(uint64_t(transition[i - 1]) | offset << 8 | uint64_t(SINGLE_NOPAYLOAD_12 << 20), 3); + } else { + write_int(uint64_t(transition[i - 1]) | offset << 8 | uint64_t(SINGLE_NOPAYLOAD_4 << 12), 2); + } + + i -= 1; + if (i == 1) { + return c1_pos; + } + + // Third-to-last node in the chain has always size 2, but the offset can be equal to 3 or 2, depending + // on how big the second-to-last node was. + offset = (pos() - c1_pos).value; + write_int(uint64_t(transition[i - 1]) | offset << 8 | uint64_t(SINGLE_NOPAYLOAD_4 << 12), 2); + + i -= 1; + + // Fourth-to-last and earlier nodes in the chain always have the form 0x12??, where ?? is the transition byte. 
+ // This is SIMDable -- we can load a vector of transition bytes from memory, + // reverse the order (earlier bytes in the transition chain become later nodes in the file) + // and add a 0x12 byte before every transition byte. + constexpr int s = 16; // Vector size. + typedef unsigned char vector2x __attribute__((__vector_size__(s*2))); // Output vector. + typedef unsigned char vector1x __attribute__((__vector_size__(s))); // Input vector. + auto z = uint8_t(SINGLE_NOPAYLOAD_4 << 4 | 2); // 0x12 + vector1x zv = {z}; + + // An extra buffer above the file_writer. It allows us to do constexpr-sized + // memcpy, but incurs an extra copy overall. Is it worth it? + std::array outbuf; + static_assert(std::size(outbuf) % s == 0); + size_t outbuf_pos = 0; + + // Serialize the chain in SIMD blocks of `s` transition bytes. + for (; i - s > 0; i -= s) { + vector1x v; + memcpy(&v, &transition[i - s], s); + vector2x d = __builtin_shufflevector(v, zv, + s, 15, s, 14, s, 13, s, 12, s, 11, s, 10, s, 9, s, 8, + s, 7, s, 6, s, 5, s, 4, s, 3, s, 2, s, 1, s, 0 + ); + memcpy(&outbuf[outbuf_pos], &d, sizeof(d)); + outbuf_pos += sizeof(d); + if (outbuf_pos == 1024) [[unlikely]] { + write_bytes(outbuf); + outbuf_pos = 0; + } + } + + // Write the remaining `i - 1` first bytes (excluding idx 0) of the transition chain. + // This is separated from the previous loop so that the previous loop enjoys the benefit + // of constexpr-sized memory ops. + { + vector1x v; + // Load into the `i - 1` last bytes of `v`. + memcpy(reinterpret_cast(&v) + sizeof(v) - (i - 1), &transition[1], (i - 1)); + vector2x d = __builtin_shufflevector(v, zv, + s, 15, s, 14, s, 13, s, 12, s, 11, s, 10, s, 9, s, 8, + s, 7, s, 6, s, 5, s, 4, s, 3, s, 2, s, 1, s, 0 + ); + // Write out the first `2*(i-1)` first bytes of `v`. 
+ memcpy(&outbuf[outbuf_pos], &d, 2 * (i - 1)); + outbuf_pos += 2 * (i - 1); + write_bytes(std::span(outbuf).subspan(0, outbuf_pos)); + } + + return pos() - sink_offset(2); +} + +sink_pos bti_node_sink::write(const writer_node& x, sink_pos xpos) { + expensive_assert(x._transition_length >= 1); + + // Write last node in the chain. + sink_pos start_pos = pos(); + auto type = choose_node_type(x, xpos); + write_body(x, xpos, type); + + if (x._transition_length == 1) { + return start_pos; + } + + return write_chain(x, node_size((pos() - start_pos).value)); +} + +node_size bti_node_sink::serialized_size(const writer_node& x, sink_pos xpos) const { + expensive_assert(x._transition_length >= 1); + auto inner = serialized_size_body(x, xpos); + auto outer = serialized_size_chain(x, inner); + return node_size((sink_offset(inner) + outer).value); +} + +node_size bti_node_sink::serialized_size_chain(const writer_node& x, node_size body_offset) const { + return node_size(x._transition_length == 1 ? 0 : (body_offset.value >= 16 ? 
1 : 0) + (x._transition_length - 1) * 2); +} + +node_size bti_node_sink::serialized_size_body_type(const writer_node& x, node_type type) const { + switch (type) { + case PAYLOAD_ONLY: { + return node_size(1 + x._payload.blob().size()); + } + case SINGLE_NOPAYLOAD_4: { + return node_size(2); + } + case SINGLE_8: { + return node_size(3 + x._payload.blob().size()); + } + case SINGLE_NOPAYLOAD_12: { + return node_size(3); + } + case SINGLE_16: { + return node_size(4 + x._payload.blob().size()); + } + case SPARSE_8: { + return size_sparse(x, 8); + } + case SPARSE_12: { + return size_sparse(x, 12); + } + case SPARSE_16: { + return size_sparse(x, 16); + } + case SPARSE_24: { + return size_sparse(x, 24); + } + case SPARSE_40: { + return size_sparse(x, 40); + } + case DENSE_12: { + return size_dense(x, 12); + } + case DENSE_16: { + return size_dense(x, 16); + } + case DENSE_24: { + return size_dense(x, 24); + } + case DENSE_32: { + return size_dense(x, 32); + } + case DENSE_40: { + return size_dense(x, 40); + } + case LONG_DENSE: { + return size_dense(x, 64); + } + case NODE_TYPE_COUNT: { + abort(); + } + } + abort(); +} + +node_size bti_node_sink::serialized_size_body(const writer_node& x, sink_pos xpos) const { + return serialized_size_body_type(x, choose_node_type(x, xpos)); +} + +uint64_t bti_node_sink::page_size() const { + return _page_size; +} + +uint64_t bti_node_sink::bytes_left_in_page() const { + return round_up(pos().value + 1, page_size()) - pos().value; +}; + +void bti_node_sink::pad_to_page_boundary() { + const static std::array zero_page = {}; + _w.write(reinterpret_cast(zero_page.data()), bytes_left_in_page()); +} + +sink_pos bti_node_sink::pos() const { + return sink_pos(_w.offset()); +} + +} // namespace sstables::trie diff --git a/sstables/trie/bti_node_sink.hh b/sstables/trie/bti_node_sink.hh new file mode 100644 index 0000000000..037f3bc77d --- /dev/null +++ b/sstables/trie/bti_node_sink.hh @@ -0,0 +1,116 @@ +/* + * Copyright (C) 2024-present ScyllaDB 
+ */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once + +#include "writer_node.hh" +#include "bti_node_type.hh" +#include "sstables/file_writer.hh" + +namespace sstables::trie { + +// Assuming the values of _node_size, _branch_size and _id in children are accurate, +// computes the distance between `x` and its farthest child. +// +// This is needed to pick the right BTI node type for `x`. +// (The node type must represent child offsets with enough bits to fit the biggest offset). +sink_offset max_offset_from_child(const writer_node& x, sink_pos pos); + +// This object a stream of writer_node nodes (abstract nodes in RAM) +// into a stream of bytes on disk as defined by the BTI format at: +// https://github.com/apache/cassandra/blob/f16fb6765b8a3ff8f49accf61c908791520c0d6e/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.md +// +class bti_node_sink { + sstables::file_writer& _w; + size_t _page_size; + constexpr static size_t max_page_size = 64 * 1024; +public: + bti_node_sink(sstables::file_writer& w, size_t page_size); +private: + // Truncates `x` to the given number of least significant bytes, + // and writes out the result in big endian. + // E.g. write_int(0x1122334455667788ull, 3) writes 0x66 0x77 0x88 to the file. + void write_int(uint64_t x, size_t bytes); + // Writes raw bytes to the output file. + void write_bytes(const_bytes x); + // Writes a sparse-type node (SPARSE_*) to the file. + size_t write_sparse(const writer_node& x, node_type type, int bytes_per_pointer, sink_pos pos); + // Computes the size of a sparse-type node (SPARSE_*). + node_size size_sparse(const writer_node& x, int bits_per_pointer) const; + // Writes a dense-type node (SPARSE_*) to the file. + size_t write_dense(const writer_node& x, node_type type, int bytes_per_pointer, sink_pos pos); + // Computes the size of a dense-type node (DENSE_*). 
+ node_size size_dense(const writer_node& x, int bits_per_pointer) const; +public: + // Writes the final BTI node of the chain corresponding to the writer_node, + // with the given BTI type. + // Offsets to children are computed by taking the difference between `pos` + // and the `_pos` of children. + // + // Note: + // writer_node represents a trie node and the multi-character edge leading to it. + // But BTI doesn't have a compact representation for a multi-character edge. + // Each character in the trie gets its own BTI node, + // and the character corresponding to this node's parent edge is stored in the parent node. + // + // For example, given a `writer_node` X, corresponding to edge "abc", + // `write_body` would write the BTI node containing offsets + // to X's children (and the first byte of their edges), + // while `write_chain` would write the tiny BTI nodes + // holding characters 'b' and 'c'. + // (Character 'a' is held by the parent of X). + // + // Note: + // For the purposes of serialization, `pos` is an abstract ID + // used only to compute the delta between this node's ID and children's ID. + // In real usage, this "abstract" node ID is of course the node's position + // in the file, so that the reader can trivially use the "ID" + // to jump directly to the node's position. + // So the `pos` passed here is always equal to `current_pos()`. + // + // But in tests, the `pos` really is abstract, and the user + // of bti_node_sink can fill the `_pos` of children and the `pos` passed + // here via an argument with whatever it wants, something possibly + // unrelated to `current_pos()`. + void write_body(const writer_node& x, sink_pos pos, node_type type); + // Writes all nodes, except the final node, of the chain corresponding to the writer_node. + // (See write_body for what "chain" means). + sink_pos write_chain(const writer_node& x, node_size body_offset); + // Writes both the body and the chain. 
+ sink_pos write(const writer_node& x, sink_pos pos); + // Computes the size of the given node after serialization, in bytes, + // if it's serialized with the given BTI type. + node_size serialized_size_body_type(const writer_node& x, node_type type) const; + // Computes the size of the body (see write_body notes about "body" vs "chain") + // of the given node, in bytes, assuming it's serialized with the optimal + // BTI type for the given "position" of the body (see write_body notes about `pos`). + node_size serialized_size_body(const writer_node& x, sink_pos pos) const; + // Computes the size of the chain (see write_body notes about "body" vs "chain") + // of the given node, in bytes, assuming it's serialized with the optimal + // BTI types for the given offset between chain and body. + // (Note: the chain is always written immediately after the body, so the offset between + // body and chain is always equal to body size). + node_size serialized_size_chain(const writer_node& x, node_size body_offset) const; + // Computes the total size (body and chain, see write_body notes about "body" vs "chain") + // of the given node, in bytes, assuming it's serialized with the optimal + // BTI type for the given "position" of the body (see write_body notes about `pos`). + node_size serialized_size(const writer_node& x, sink_pos pos) const; + // Page size, same as the one passed via the constructor. + uint64_t page_size() const; + // Returns the number of bytes between current file position + // and the start of the next page. + // In range [0; page_size). + uint64_t bytes_left_in_page() const; + // Appends bytes_left_in_page() nul bytes to the file. + void pad_to_page_boundary(); + // Returns the position reported by the underlying `file_writer`. 
+ sink_pos pos() const; +}; +static_assert(trie_writer_sink); + +} // namespace sstables::trie diff --git a/sstables/trie/bti_node_type.hh b/sstables/trie/bti_node_type.hh new file mode 100644 index 0000000000..5f0d422807 --- /dev/null +++ b/sstables/trie/bti_node_type.hh @@ -0,0 +1,88 @@ +/* + * Copyright (C) 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once + +#include +#include +#include +#include + +using const_bytes = std::span; + +namespace sstables::trie { + +// Each node type has a 4-bit identifier which is used in the on-disk format. +// The order of this enum mustn't be modified. +enum node_type { + PAYLOAD_ONLY, + SINGLE_NOPAYLOAD_4, + SINGLE_8, + SINGLE_NOPAYLOAD_12, + SINGLE_16, + SPARSE_8, + SPARSE_12, + SPARSE_16, + SPARSE_24, + SPARSE_40, + DENSE_12, + DENSE_16, + DENSE_24, + DENSE_32, + DENSE_40, + LONG_DENSE, + NODE_TYPE_COUNT, // Not a valid value. +}; + +// For each node type, contains the number of bits used per child offset. +constexpr static const uint8_t bits_per_pointer_arr[] = { + 0, + 4, + 8, + 12, + 16, + 8, + 12, + 16, + 24, + 40, + 12, + 16, + 24, + 32, + 40, + 64, +}; +static_assert(std::size(bits_per_pointer_arr) == NODE_TYPE_COUNT); + +// Given an array of offsets, each of size bits_per_pointer, read the one with index `idx`. +// +// Assumes that bits_per_pointer is divisible by 8 or equal to 12. +// +// Ordering note: an array of 12-bit offsets [0x123, 0x456, 0x789, 0xabc] is +// represented as the byte array 123456789abc. +// +// We want this to be always inlined so that bits_per_pointer is substituted with a constant, +// and this compiles to a simple load, not to a full-fledged memcpy. 
+[[gnu::always_inline]] +inline uint64_t read_offset(const_bytes sp, int idx, int bits_per_pointer) { + if (bits_per_pointer % 8 == 0) { + auto n = bits_per_pointer / 8; + uint64_t be = 0; + memcpy((char*)&be + 8 - n, (const char*)sp.data() + n * idx, n); + return seastar::be_to_cpu(be); + } else { + if (idx % 2 == 0) { + return seastar::read_be((const char*)sp.data() + 3 * (idx / 2)) >> 4; + } else { + return seastar::read_be((const char*)sp.data() + 3 * (idx / 2) + 1) & 0xfff; + } + } +} + +} // namespace sstables::trie diff --git a/sstables/trie/node_reader.hh b/sstables/trie/node_reader.hh new file mode 100644 index 0000000000..ce0732cdfb --- /dev/null +++ b/sstables/trie/node_reader.hh @@ -0,0 +1,168 @@ +/* + * Copyright (C) 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once + +#include + +// This file defines an interface between the format-agnostic part of a trie +// reader (the cursor which can be set to a specific key and then +// stepped forwards and backwards) and the format-specific part +// (which actually understand the bytes and bits of the nodes on disk). + +namespace sstables::trie { + +using const_bytes = std::span; + +// Contains information about the result of the (partial) walk down. +// +// For example, if the trie contains strings ("abc", "abd", "bc"), +// then the trie looks like this (where * is the root node): +// +// * +// a-----b +// b c +// c-d +// +// and if node "abd" is on a different page than node "ab", then +// `walk_down_along_key(pos_of_root, "abc")` might +// return node "ab" as the result, with +// +// payload_bits = 0 +// n_children = 2 +// found_idx = idx_of_child_d_in_node_ab +// found_byte = 'd' +// traversed_key_bytes = 2 +// body_pos = pos_of_node_ab +// child_offset = pos_of_node_ab - pos_of_node_abc +struct node_traverse_result { + // Payload bits for this node. + uint8_t payload_bits; + // Number of child indexes (slots) for this node. 
+ // ("Slots" because not all indexes have to be occupied). + int n_children; + // Index of first child whose transition is greater or equal to the next key byte. + // (Or, if such children don't exist, this is equal to n_children). + int found_idx; + // The transition byte of the child at found_idx. + // (Or, if there is no such child, this is equal to -1). + int found_byte; + // The number of edges traversed while walking down. + // I.e. the number of edges between this node and the ancestor + // at which the walk started. + // + // E.g. if we started at node X, and the + // at which the walk started. + int traversed_key_bytes; + // File position of this node. + int64_t body_pos; + // Position of this node minus position of the child at found_idx. + // Only valid if found_idx is valid in range [0, n_children). + int64_t child_offset; +}; + +// Fields have same meaning as in node_traverse_result. +struct node_traverse_sidemost_result { + uint8_t payload_bits; + int n_children; + int64_t body_pos; + int64_t child_offset; +}; + +// Contains information about the result of the get_child call. +struct get_child_result { + // The index of the child selected by the call. + int idx; + // The offset of the selected child from the parent. + // (A children always has position smaller than its parent). + uint64_t offset; +}; + +// Fields have same meaning as in node_traverse_result. +struct load_final_node_result { + uint16_t n_children; + uint8_t payload_bits; +}; + +// This concept contains all operations which abstract trie nodes +// needs to provide in order so that a trie cursor can be implemented +// over them. +template +concept node_reader = requires(T& o, int64_t pos, const_bytes key, int child_idx, bool forwards) { + // Checks if the page containing the given file position has been loaded. + // + // Note: forcing the caller to explicitly call `load` before use + // is error-prone, but it lets us keep the performance-sensitive methods + // synchronous. 
+ { o.cached(pos) } -> std::same_as; + // Loads the page containing the given file position, and potentially + // unloads previously loaded pages. + // + // Ensures `cached(pos)` until the next `load(pos)` call. + // + // Precondition: pos lies within the file. + { o.load(pos) } -> std::same_as>; + // Reads some basic information (payload and number of children) + // about the node. + // + // Precondition: cached(pos) + { o.read_node(pos) } -> std::same_as; + // Walks some distance down the trie + // from node at `pos` along `key`. + // Might walk over any number of *unimportant* nodes, + // but does not walk past the first *important* node. + // + // An *important* node is one which fullfills at least one of the following conditions: + // 1. Has a payload. + // 2. Has more than one child. + // + // (A trie iterator has to remember the important nodes on the path from the root + // to the current position, because stepping forwards or backwards might involve + // going up to the closest important node. Unimportant nodes don't have to be revisited). + // + // Note: we allow this method to traverse many *unimportant* nodes + // at once for optimization reasons. As of this writing, the method always + // walks over one node, but we might want to add an optimization which + // walks over long chains of SINGLE_NOPAYLOAD_4 nodes in batches. + // + // Precondition: cached(pos), key.size() > 0 + { o.walk_down_along_key(pos, key) } -> std::same_as; + // Like walk_down_along_key, but always chooses the leftmost child instead + // of following a key. + // + // Precondition: cached(pos) + { o.walk_down_leftmost_path(pos) } -> std::same_as; + // Like walk_down_along_key, but always chooses the rightmost child instead + // of following a key. + // + // Precondition: cached(pos) + { o.walk_down_rightmost_path(pos) } -> std::same_as; + // Returns some information about a child of node at `pos`. 
+ // + // The child is specified by an index, but not all child indexes must be occupied by a child. + // If an unoccupied index is passed, the method will return information for the nearest + // occupied child index which is greater (if `forwards`) or smaller (if `!forwards`) + // + // (This is used for stepping forwards/backwards over the trie). + // + // Precondition: cached(pos), child_idx lies between the smallest and the largest + // occupied child indexes (inclusive) for this node. + { o.get_child(pos, child_idx, forwards) } -> std::same_as; + // Returns the payload for the payload-carrying node at `pos`. + // + // The child is specified by an index, but not all child indexes must be occupied by a child. + // If an unoccupied index is passed, the method will return information for the nearest + // occupied child index which is greater (if `forwards`) or smaller (if `!forwards`) + // + // (This is used for stepping forwards/backwards over the trie). + // + // Precondition: cached(pos), the node carries a payload + { o.get_payload(pos) } -> std::same_as; +}; + +} // namespace sstables::trie diff --git a/test/boost/CMakeLists.txt b/test/boost/CMakeLists.txt index 681fbb1bf3..38ab2043a7 100644 --- a/test/boost/CMakeLists.txt +++ b/test/boost/CMakeLists.txt @@ -266,6 +266,8 @@ add_scylla_test(top_k_test KIND BOOST) add_scylla_test(transport_test KIND SEASTAR) +add_scylla_test(bti_node_sink_test + KIND BOOST) add_scylla_test(trie_writer_test KIND BOOST) add_scylla_test(types_test diff --git a/test/boost/bti_node_sink_test.cc b/test/boost/bti_node_sink_test.cc new file mode 100644 index 0000000000..2d12f26e6c --- /dev/null +++ b/test/boost/bti_node_sink_test.cc @@ -0,0 +1,605 @@ +/* + * Copyright (C) 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include + +#include +#include +#include "test/lib/log.hh" +#include "test/lib/random_utils.hh" +#include "utils/memory_data_sink.hh" +#include 
"sstables/trie/bti_node_reader.hh" +#include "sstables/trie/bti_node_sink.hh" +// For instantiation of `writer_node::recalc_sizes`. +#include "sstables/trie/writer_node.impl.hh" // IWYU pragma: keep + +// Calling BOOST_REQUIRE unconditionally is stupidly expensive. +// Checking the condition first, and only calling the BOOST assertions +// if the condition is false, makes the test orders of magnitude faster. +#define REQUIRE(a) do if (!(a)) BOOST_REQUIRE(a); while (0) +#define REQUIRE_EQUAL(a, b) do if (!((a) == (b))) BOOST_REQUIRE_EQUAL(a, b); while (0) +#define REQUIRE_GE(a, b) do if (!((a) >= (b))) BOOST_REQUIRE_GE(a, b); while (0) +#define REQUIRE_LE(a, b) do if (!((a) <= (b))) BOOST_REQUIRE_LE(a, b); while (0) + +using namespace sstables::trie; + +inline const_bytes string_as_bytes(std::string_view sv) { + return std::as_bytes(std::span(sv.data(), sv.size())); +} + +inline std::string_view bytes_as_string(const_bytes sv) { + return {reinterpret_cast(sv.data()), sv.size()}; +} + +static std::vector linearize(const memory_data_sink_buffers& bufs) { + std::vector retval; + for (const auto& frag : bufs.buffers()) { + auto v = std::as_bytes(std::span(frag)); + retval.insert(retval.end(), v.begin(), v.end()); + } + return retval; +} + +std::vector unpack_bitstring(const_bytes packed) { + std::vector unpacked; + for (const auto byte : packed) { + for (int i = 7; i >= 0; --i) { + unpacked.push_back((uint8_t(byte) >> i) & 1); + } + } + return unpacked; +} + +// Test read_offset() on a random blob, by unpacking the bits +// of the blob and the bits of read_offset() and checking that +// the relevant bitstrings are equal. 
+SEASTAR_THREAD_TEST_CASE(test_read_offset) { + auto test_blob_buf = tests::random::get_bytes(256); + auto test_string = std::as_bytes(std::span(test_blob_buf)); + auto test_bitstring = unpack_bitstring(test_string); + + for (const int width : {8, 12, 16, 24, 32, 48, 56, 64}) { + for (int idx = 0; idx * width + width <= int(test_string.size()); ++idx) { + uint64_t result = read_offset(test_string, idx, width); + auto read_blob = unpack_bitstring(object_representation(seastar::cpu_to_be(result))); + auto actual_bitstring = std::span(read_blob).subspan(64 - width, width); + auto expected_bitstring = std::span(test_bitstring).subspan(idx * width, width); + REQUIRE(std::ranges::equal(actual_bitstring, expected_bitstring)); + } + } +} + +std::vector serialize_body(const writer_node& node, sink_pos pos, node_type type) { + memory_data_sink_buffers bufs; + constexpr size_t page_size = 4096; + sstables::file_writer fw(data_sink(std::make_unique(bufs))); + + bti_node_sink serializer(fw, page_size); + auto sz = serializer.serialized_size_body_type(node, type); + serializer.write_body(node, pos, type); + fw.close(); + REQUIRE(fw.offset() <= page_size); + REQUIRE(fw.offset() == uint64_t(sz.value)); + + return linearize(bufs); +} + +struct serialize_chain_result { + std::vector serialized; + size_t starting_point; +}; +serialize_chain_result serialize_chain(const writer_node& node, node_size body_offset) { + memory_data_sink_buffers bufs; + constexpr size_t page_size = 4096; + sstables::file_writer fw(data_sink(std::make_unique(bufs))); + + bti_node_sink serializer(fw, page_size); + auto sz = serializer.serialized_size_chain(node, body_offset); + auto starting_point = serializer.write_chain(node, body_offset); + fw.close(); + REQUIRE(fw.offset() <= page_size); + REQUIRE(fw.offset() == uint64_t(sz.value)); + + return {linearize(bufs), starting_point.value}; +} + +struct payload_result { + uint8_t bits; + const_bytes bytes; +}; + +struct deserialize_node_result { + std::vector 
offsets; + std::vector transitions; + payload_result payload; +}; + +struct lookup_result { + int idx; + int byte; + uint64_t offset; +}; + +inline lookup_result get_child(int64_t pos, const_bytes raw, int idx, bool forward) { + auto n_children = bti_read_node(pos, raw).n_children; + auto found_child = bti_get_child(pos, raw, idx, forward); + lookup_result result; + result.idx = found_child.idx; + result.offset = found_child.offset; + if (0 <= result.idx && result.idx < n_children) { + result.byte = int(bti_get_child_transition(pos, raw, result.idx)); + } else { + result.byte = -1; + } + return result; +} + +inline payload_result get_payload(int64_t pos, const_bytes raw) { + auto bits = bti_read_node(pos, raw).payload_bits; + auto bytes = bti_get_payload(pos, raw); + return {bits, bytes}; +} + +inline int get_n_children(int64_t pos, const_bytes raw) { + return bti_read_node(pos, raw).n_children; +} + +deserialize_node_result deserialize_body(int64_t pos, const_bytes raw) { + deserialize_node_result result; + auto n_children = get_n_children(pos, raw); + for (int i = 0; i < n_children; ++i) { + auto child = get_child(pos, raw, i, true); + i = child.idx; + result.offsets.push_back(child.offset); + REQUIRE_GE(child.byte, 0); + result.transitions.push_back(std::byte(child.byte)); + } + result.payload = get_payload(pos, raw); + return result; +} + +struct deserialize_chain_result { + std::vector transition; + node_size body_offset; +}; + +deserialize_chain_result deserialize_chain(const_bytes raw, size_t start_point) { + REQUIRE(start_point < raw.size()); + deserialize_chain_result result; + while (true) { + auto n_children = get_n_children(start_point, raw.subspan(start_point)); + REQUIRE(n_children == 1); + auto child = get_child(start_point, raw.subspan(start_point), 0, true); + REQUIRE(child.idx == 0); + REQUIRE(child.offset > 0); + REQUIRE(child.offset < 4096); + REQUIRE_GE(child.byte, 0); + result.transition.push_back(std::byte(child.byte)); + if (child.offset > 
start_point) { + REQUIRE(start_point == 0); + result.body_offset = node_size(child.offset); + return result; + } + start_point -= child.offset; + } +} + +bool eligible(node_type type, const writer_node& node, sink_pos pos) { + auto max_offset = max_offset_from_child(node, pos); + REQUIRE(max_offset.valid()); + auto width = std::bit_width(max_offset.value); + switch (type) { + case PAYLOAD_ONLY: + return node.get_children().size() == 0; + case SINGLE_NOPAYLOAD_4: + return node.get_children().size() == 1 && width <= 4 && node._payload._payload_bits == 0; + case SINGLE_8: + return node.get_children().size() == 1 && width <= 8; + case SINGLE_NOPAYLOAD_12: + return node.get_children().size() == 1 && width <= 12 && node._payload._payload_bits == 0; + case SINGLE_16: + return node.get_children().size() == 1 && width <= 16; + case SPARSE_8: + return node.get_children().size() >= 1 && node.get_children().size() < 256 && width <= 8; + case SPARSE_12: + return node.get_children().size() >= 1 && node.get_children().size() < 256 && width <= 12; + case SPARSE_16: + return node.get_children().size() >= 1 && node.get_children().size() < 256 && width <= 16; + case SPARSE_24: + return node.get_children().size() >= 1 && node.get_children().size() < 256 && width <= 24; + case SPARSE_40: + return node.get_children().size() >= 1 && node.get_children().size() < 256 && width <= 40; + case DENSE_12: + return node.get_children().size() >= 1 && width <= 12; + case DENSE_16: + return node.get_children().size() >= 1 && width <= 16; + case DENSE_24: + return node.get_children().size() >= 1 && width <= 24; + case DENSE_32: + return node.get_children().size() >= 1 && width <= 32; + case DENSE_40: + return node.get_children().size() >= 1 && width <= 40; + case LONG_DENSE: + return node.get_children().size() >= 1; + default: abort(); + } +} + +// Generates multiple interesting sets of child edges for a trie node, +// (which try to stress various conditionals in the encoding). 
+std::vector> get_some_interesting_transition_sets() { + std::vector> result; + // 0 children. Important edge case. + result.push_back({}); + // 1 child, at both extremes and in the middle. (Extremes are useful to ensure that there is no weird wrapping). + result.push_back({0x00}); + result.push_back({0x7f}); + result.push_back({0xff}); + // 2 children, at both extremes, and also with a gap between them (to test unused child slots in DENSE nodes). + result.push_back({0x00, 0x01}); + result.push_back({0xfe, 0xff}); + result.push_back({0x00, 0xff}); + // 256 children, with all possible transition bytes. + // Edge case. + auto full = std::ranges::iota_view(0x00, 0x100) | std::ranges::to>(); + result.push_back(full); + // 255 children, with all possible transition bytes except one. + auto almost_full = full; + almost_full.erase(almost_full.begin() + 100); + result.push_back(almost_full); + return result; +} + +// For the given number of children and the given max supported integer width, +// generates a few sets of interesting child offsets. +// (Where "interesting" in this case means that they are just close to extremes of the supported integer +// range. This checks that no bits are lost). 
+std::vector> get_some_interesting_child_offsets(int width, int n_children) { + std::vector> result; + int64_t max = (int64_t(1) << width) - 1; + auto clamp_to_legal = [=] (int64_t v) { + return std::clamp(v, 1, max); + }; + auto clamped_iota = [=] (int64_t a, int64_t b) { + return std::ranges::iota_view(a, b) | std::views::transform(clamp_to_legal) | std::ranges::to(); + }; + result.emplace_back(clamped_iota(1, n_children + 1)); + result.emplace_back(clamped_iota(max - n_children + 1, max + 1)); + return result; +} + +writer_node::ptr make_node( + sink_pos pos, + const_bytes transition, + std::span child_transitions, + std::span child_offsets, + std::optional payload, + bump_allocator& alctr +) { + auto node = writer_node::create(transition, alctr); + for (size_t i = 0; i < child_transitions.size(); ++i) { + std::byte transition[] = {std::byte(child_transitions[i])}; + auto child = node->add_child(transition, alctr); + child->_pos = pos - sink_offset(child_offsets[i]); + child->_node_size = node_size(1); + child->_branch_size = sink_offset(0); + child->_first_transition_byte = transition[0]; + } + if (payload) { + node->set_payload(*payload); + } + return node; +} + +void test_one_body( + std::span child_transitions, + std::span offsets, + std::span incoming_transition, + const std::optional& payload_opt, + sink_pos pos +) { + testlog.trace("transitions={} offsets={} payload={}", child_transitions, offsets, bool(payload_opt)); + SCYLLA_ASSERT(child_transitions.size() == offsets.size()); + bump_allocator alctr(128 * 1024); + auto node = make_node(pos, incoming_transition, child_transitions, offsets, payload_opt, alctr); + for (node_type type = node_type(0); type < NODE_TYPE_COUNT; type = node_type(int(type) + 1)) { + testlog.trace("type={}", int(type)); + if (eligible(type, *node, pos)) { + auto serialized = serialize_body(*node, pos, type); + + testlog.trace("serialized={}", fmt_hex(serialized)); + auto deserialized = deserialize_body(pos.value, serialized); + 
testlog.trace("deserialized_transitions={:x}", fmt::join(deserialized.transitions, ", ")); + REQUIRE_EQUAL(deserialized.offsets.size(), child_transitions.size()); + for (size_t i = 0; i < child_transitions.size(); ++i) { + REQUIRE_EQUAL(child_transitions[i], int(deserialized.transitions[i])); + REQUIRE_EQUAL(uint64_t(offsets[i]), deserialized.offsets[i]); + } + if (payload_opt) { + REQUIRE_EQUAL(deserialized.payload.bits, payload_opt->_payload_bits); + REQUIRE(std::ranges::equal( + deserialized.payload.bytes.subspan(0, payload_opt->blob().size()), + payload_opt->blob())); + } else { + REQUIRE_EQUAL(deserialized.payload.bits, 0); + } + + int n_slots = 0; + + testlog.trace("Test bti_read_node"); + { + auto result = bti_read_node(pos.value, serialized); + REQUIRE_GE(result.n_children, child_transitions.size()); + n_slots = result.n_children; + REQUIRE_EQUAL(result.payload_bits, payload_opt ? payload_opt->_payload_bits : 0); + } + + testlog.trace("Test bti_walk_down_leftmost_path"); + { + auto result = bti_walk_down_leftmost_path(pos.value, serialized); + REQUIRE_EQUAL(result.body_pos, pos.value); + REQUIRE_EQUAL(result.n_children, n_slots); + REQUIRE_EQUAL(result.child_offset, offsets.empty() ? -1 : offsets.front()); + REQUIRE_EQUAL(result.payload_bits, payload_opt ? payload_opt->_payload_bits : 0); + } + + testlog.trace("Test bti_walk_down_rightmost_path"); + { + auto result = bti_walk_down_rightmost_path(pos.value, serialized); + REQUIRE_EQUAL(result.body_pos, pos.value); + REQUIRE_EQUAL(result.n_children, n_slots); + REQUIRE_EQUAL(result.child_offset, offsets.empty() ? -1 : offsets.back()); + REQUIRE_EQUAL(result.payload_bits, payload_opt ? 
payload_opt->_payload_bits : 0); + } + + testlog.trace("Test bti_walk_down_along_key"); + { + for (int key_byte = 0; key_byte < 256; ++key_byte) { + //testlog.trace("key_byte={}", key_byte); + auto k = std::byte(key_byte); + auto result = bti_walk_down_along_key(pos.value, serialized, std::span(&k, 1)); + + auto target_child = std::ranges::lower_bound(child_transitions, uint8_t(k)) - child_transitions.begin(); + auto target_byte = target_child < int(child_transitions.size()) ? child_transitions[target_child] : -1; + auto target_offset = target_child < int(child_transitions.size()) ? offsets[target_child] : -1; + + REQUIRE_EQUAL(result.found_byte, target_byte); + REQUIRE_EQUAL(result.traversed_key_bytes, 0); + REQUIRE_EQUAL(result.n_children, n_slots); + REQUIRE_LE(result.found_idx, result.n_children); + REQUIRE_GE(result.found_idx, 0); + REQUIRE_EQUAL(result.child_offset, target_offset); + REQUIRE_EQUAL(result.body_pos, pos.value); + REQUIRE_EQUAL(result.payload_bits, payload_opt ? payload_opt->_payload_bits : 0); + + if (result.found_idx < result.n_children) { + auto child = get_child(pos.value, serialized, result.found_idx, true); + REQUIRE_EQUAL(result.found_byte, child.byte); + REQUIRE_EQUAL(result.found_idx, child.idx); + REQUIRE_EQUAL(result.child_offset, int64_t(child.offset)); + } + } + } + + testlog.trace("Test bti_get_child"); + { + int closest_occupied_slot = -1; + int n_observed_children = 0; + testlog.trace("Forwards"); + for (int i = n_slots - 1; i >= 0; --i) { + //testlog.trace("i={}", i); + auto result = bti_get_child(pos.value, serialized, i, true); + if (result.idx == i) { + closest_occupied_slot = i; + ++n_observed_children; + REQUIRE_EQUAL(int64_t(result.offset), offsets[offsets.size() - n_observed_children]); + } else { + REQUIRE_EQUAL(result.idx, closest_occupied_slot); + } + } + REQUIRE_EQUAL(n_observed_children, int(offsets.size())); + testlog.trace("Backwards"); + closest_occupied_slot = -1; + n_observed_children = 0; + for (int i = 0; i < 
n_slots; ++i) { + //testlog.trace("i={}", i); + auto result = bti_get_child(pos.value, serialized, i, false); + if (result.idx == i) { + closest_occupied_slot = i; + ++n_observed_children; + REQUIRE_EQUAL(int64_t(result.offset), offsets[n_observed_children - 1]); + } else { + REQUIRE_EQUAL(result.idx, closest_occupied_slot); + } + } + REQUIRE_EQUAL(n_observed_children, int(offsets.size())); + } + } + } +} + +// Tests the encoding of `writer_node`'s "body" +// (see the comment at the declaration of bti_node_sink::write_body for what "body" means). +// +// Tries to cover all BTI node types and interesting node "shapes" +// (e.g. with and without gaps in the list of occupied child indexes). +// +// Generates various "interesting" nodes. For each such node, it BTI-encodes it, +// and checks that all BTI node traversal routines give the expected result +// on it. +SEASTAR_THREAD_TEST_CASE(test_body) { + std::vector> interesting_transition_sets = get_some_interesting_transition_sets(); + + // Arbitrary, but large enough to cover all interesting widths. + auto pos = sink_pos((uint64_t(1) << 60) + 1); + auto whatever = string_as_bytes("hahaha"); + const auto custom_payload = trie_payload(0x7, string_as_bytes("lololo")); + + for (int width = 1; width < 50; ++width) + for (const auto& child_transitions : interesting_transition_sets) + for (const auto& offsets : get_some_interesting_child_offsets(width, child_transitions.size())) + for (bool payload : {true, false}) { + auto payload_opt = payload ? 
std::optional(custom_payload) : std::optional(); + test_one_body(child_transitions, offsets, whatever, payload_opt, pos); + } +} + +static uint64_t bitwidth_mask(uint64_t width) { + if (width == 0) { + return 0; + } + return uint64_t(-1) >> (64 - width); +} + +static uint64_t get_random_int_bitwidth_weighted(uint64_t max) { + if (max == 0) { + return 0; + } + auto x = tests::random::get_int(1, max, tests::random::gen()); + auto max_width = std::bit_width(x); + auto width = tests::random::get_int(1, max_width, tests::random::gen()); + return x & bitwidth_mask(width); +} + +// Like `test_body` but with randomized parameters. +SEASTAR_THREAD_TEST_CASE(test_body_randomized) { + for (uint64_t trial = 0; trial < 1337; ++trial) { + auto n_children = get_random_int_bitwidth_weighted(256); + + std::vector offsets; + auto max_offset_width = tests::random::get_int(1, 63, tests::random::gen()); + for (uint64_t i = 0; i < n_children; ++i) { + auto off = get_random_int_bitwidth_weighted(bitwidth_mask(max_offset_width)); + offsets.push_back(std::max(1, off)); + } + + std::array possible_transitions; + std::ranges::iota(possible_transitions, 0); + + std::array transitions_buf; + auto child_transitions = std::span(transitions_buf.begin(), n_children); + std::ranges::sample(possible_transitions, child_transitions.begin(), child_transitions.size(), tests::random::gen()); + std::ranges::sort(child_transitions); + + std::optional payload_opt; + bool has_payload = tests::random::get_int(0, 1, tests::random::gen()); + if (has_payload) { + auto bits = tests::random::get_int(1, 15, tests::random::gen()); + auto length = tests::random::get_int(1, trie_payload::MAX_PAYLOAD_SIZE, tests::random::gen()); + auto bytes = tests::random::get_bytes(length); + payload_opt.emplace(bits, std::as_bytes(std::span(bytes))); + } + + auto max_offset = offsets.size() ? 
std::ranges::max(offsets) : 0; + auto pos = tests::random::get_int(max_offset, std::numeric_limits::max(), tests::random::gen()); + + // The transition doesn't matter for body serialization. + auto incoming_transition = string_as_bytes("hahaha"); + + test_one_body(child_transitions, offsets, incoming_transition, payload_opt, sink_pos(pos)); + } +} + +// Tests the encoding of `writer_node`'s "chain" +// (see the comment at the declaration of bti_node_sink::write_body for what "chain" means). +// Tries to cover all BTI node types and interesting node "shapes" +// (e.g. with and without gaps in the list of occupied child indexes). +SEASTAR_THREAD_TEST_CASE(test_chain) { + std::vector interesting_transitions; + // 65 is supposed to be long enough to cover the SIMD code paths. + for (int i = 2; i < 65; ++i) { + interesting_transitions.push_back(tests::random::get_bytes(i)); + } + + std::vector interesting_body_offsets; + interesting_body_offsets.emplace_back(1); + interesting_body_offsets.emplace_back(2); + interesting_body_offsets.emplace_back(15); + interesting_body_offsets.emplace_back(16); + interesting_body_offsets.emplace_back(17); + + auto test_one = [] (const_bytes transition, node_size body_offset) { + bump_allocator alctr(128 * 1024); + auto node = writer_node::create(transition, alctr); + auto [serialized, starting_point] = serialize_chain(*node, body_offset); + auto deserialized = deserialize_chain(serialized, starting_point); + REQUIRE(deserialized.body_offset.value == body_offset.value); + REQUIRE(std::ranges::equal(deserialized.transition, transition.subspan(1))); + }; + + for (const auto& off : interesting_body_offsets) { + auto some_transition = std::as_bytes(std::span(interesting_transitions.back())); + test_one(some_transition, off); + } + for (const auto& transition : interesting_transitions) { + auto transition_view = std::as_bytes(std::span(transition)); + auto some_offset = interesting_body_offsets.back(); + test_one(transition_view, some_offset); 
+ } +} + +// Reproduces an issue with `max_offset_from_child` which I encountered +// during development and which wasn't detected by the previous unit tests. +// (It was only detected by randomized integration tests. +// `mutation_source_test`, IIRC). +// +// Specifically: I forgot to add an `if` for the special case when +// `chain_length == 2 && child->_node_size >= 16`, +// and this was only detected by random integration tests. +// The test hits that branch and would fail without it. +SEASTAR_THREAD_TEST_CASE(test_max_offset_from_child_consistent_across_write) { + auto test_one = [] (const_bytes transition, size_t payload_size) { + testlog.trace("Testing transition={} payload_size={}", fmt_hex(transition), payload_size); + // Dummy output stream. + memory_data_sink_buffers bufs; + sstables::file_writer fw(data_sink(std::make_unique(bufs))); + fw.write(to_bytes_view(std::string_view("Let's offset the stream to make the test just a bit more general."))); + + // BTI node serializer. + constexpr size_t page_size = 4096; + bump_allocator alctr(128 * 1024); + bti_node_sink serializer(fw, page_size); + auto starting_pos = serializer.pos(); + + // Create a parent and a child. Their node size and branch size are uninitialized. + auto parent = writer_node::create(string_as_bytes("abc"), alctr); + auto child = parent->add_child(transition, alctr); + child->set_payload(trie_payload(1, string_as_bytes(std::string(payload_size, 'z')))); + + // max_offset_from_child needs to know the sizes (node size, branch size) of children. + // We could compute them manually, but we can use `recalc_sizes` too. + // But we have to set `_has_out_of_page_descendants` to force `recalc_sizes` + // to recalculate the sizes. 
+ child->_has_out_of_page_descendants = true; + auto child_size = writer_node::recalc_sizes(child, serializer, starting_pos); + child->_has_out_of_page_descendants = false; + + // Check that the size prediction done by max_offset_from_child + // is consistent with the actual distance to the child after it is written. + auto before = max_offset_from_child(*parent, serializer.pos() + child_size); + writer_node::write(child, serializer, true); + auto after = max_offset_from_child(*parent, serializer.pos()); + REQUIRE_EQUAL(before.value, after.value); + }; + + // Pick various transition lengths and payload sizes to exercise all + // branches in max_offset_from_child() + std::vector interesting_transitions; + for (int i = 1; i < 32; ++i) { + interesting_transitions.push_back(tests::random::get_bytes(i)); + } + std::vector interesting_payload_sizes; + interesting_payload_sizes.emplace_back(1); + interesting_payload_sizes.emplace_back(20); + + for (const auto& transition : interesting_transitions) + for (const auto& payload_size : interesting_payload_sizes) { + auto transition_view = std::as_bytes(std::span(transition)); + test_one(transition_view, payload_size); + } +} diff --git a/utils/cached_file.hh b/utils/cached_file.hh index 3b320a10ab..fe41fa31ec 100644 --- a/utils/cached_file.hh +++ b/utils/cached_file.hh @@ -104,6 +104,10 @@ private: SCYLLA_ASSERT(!_use_count); } + size_t pos() const { + return idx * page_size; + } + void on_evicted() noexcept override; temporary_buffer get_buf() { @@ -124,6 +128,12 @@ private: return _lsa_buf.get(); } + // Returns a pointer to the contents of the page. + // The buffer can be invalidated when the page is evicted or when the owning LSA region invalidates references. + std::span get_view() const { + return std::as_bytes(std::span(_lsa_buf.get(), _lsa_buf.size())); + } + size_t size_in_allocator() { // lsa_buf occupies 4K in LSA even if the buf size is smaller. // _buf is transient and not accounted here. 
@@ -152,9 +162,21 @@ private: offset_type _last_page_size; page_idx_type _last_page; +public: + using ptr_type = cached_page::ptr_type; + struct page_read_result { + ptr_type ptr; + bool was_already_cached; + page_read_result(ptr_type p, bool cached) + : ptr(std::move(p)) + , was_already_cached(cached) + {} + }; + future get_shared_page(size_t global_pos, tracing::trace_state_ptr trace_state) { + return get_page_ptr(global_pos / page_size, 1, trace_state); + } private: - // Returns (page, true) if the page was cached, and (page, false) if the page was uncached. - future> get_page_ptr(page_idx_type idx, + future get_page_ptr(page_idx_type idx, page_count_type read_ahead, tracing::trace_state_ptr trace_state, std::optional permit = {}) { @@ -163,7 +185,7 @@ private: ++_metrics.page_hits; tracing::trace(trace_state, "page cache hit: file={}, page={}", _file_name, idx); cached_page& cp = *i; - return make_ready_future>(cp.share(), true); + return make_ready_future(cp.share(), true); } tracing::trace(trace_state, "page cache miss: file={}, page={}, readahead={}", _file_name, idx, read_ahead); ++_metrics.page_misses; @@ -206,7 +228,7 @@ private: utils::get_local_injector().inject("cached_file_get_first_page", []() { throw std::bad_alloc(); }); - return std::pair(std::move(first_page), false); + return page_read_result{std::move(first_page), false}; }); } // Returns (page, true) if the page was cached, and (page, false) if the page was uncached. 
@@ -214,14 +236,14 @@ private: page_count_type count, tracing::trace_state_ptr trace_state, std::optional permit = {}) { - return get_page_ptr(idx, count, std::move(trace_state), permit).then([permit] (std::pair cp) mutable { - auto buf = cp.first->get_buf(); + return get_page_ptr(idx, count, std::move(trace_state), permit).then([permit] (page_read_result read_result) mutable { + auto buf = read_result.ptr->get_buf(); if (permit) { auto units = permit->consume_memory(buf.size()); buf = temporary_buffer(buf.get_write(), buf.size(), make_object_deleter(buf.release(), std::move(units))); } - return std::make_pair(std::move(buf), cp.second); + return std::make_pair(std::move(buf), read_result.was_already_cached); }); } public: @@ -385,15 +407,15 @@ public: } page_count_type readahead = div_ceil(_size_hint, page_size); return _cached_file->get_page_ptr(_page_idx, readahead, _trace_state, _permit).then( - [this] (std::pair read_result) mutable { - auto page = std::move(read_result.first); + [this] (page_read_result read_result) mutable { + auto page = std::move(read_result.ptr); size_t size = _page_idx == _cached_file->_last_page ? _cached_file->_last_page_size : page_size; page_view buf(_offset_in_page, size - _offset_in_page, std::move(page), get_page_units()); _offset_in_page = 0; ++_page_idx; - shrink_size_hint(read_result.second); + shrink_size_hint(read_result.was_already_cached); return buf; }); }