Somehow, the line of code responsible for freeing flushed nodes in `trie_writer` is missing from the implementation. This effectively means that `trie_writer` keeps the whole index in memory until the index writer is closed, which for many datasets is a guaranteed OOM.

Fix that, and add a test that catches this.

Fixes scylladb/scylladb#27082
Closes scylladb/scylladb#27083
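
For context, the failure mode and the shape of the fix, as a minimal hedged sketch (the names below are hypothetical illustrations, not the actual trie_writer internals): an incremental trie writer serializes each completed subtree as soon as no future key can extend it, and must release the flushed nodes at that point; otherwise memory grows with the whole index instead of with the unflushed frontier.

    // Hypothetical sketch only, not the real trie_writer code.
    void flush_completed(writer_node* node) {
        // Serialize the finished node to the output sink.
        node->_pos = _out.write(*node, _out.pos());
        // The kind of line that was missing: reclaim the flushed node so
        // memory stays proportional to the unflushed part of the trie.
        _allocator.free(node);
    }

The test_finite_memory_usage test below feeds a million keys through the writer and asserts a fixed memory ceiling, which catches a regression of this kind.
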
/*
 * Copyright (C) 2024-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include <seastar/core/memory.hh>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/testing/test_case.hh>
#include <xxhash.h>
#include <fmt/ranges.h>
#include <numeric>
#include "test/lib/log.hh"
#include "test/lib/test_utils.hh"
#include "test/lib/key_utils.hh"
#include "utils/bit_cast.hh"
#include "sstables/trie/trie_writer.hh"

using namespace sstables::trie;

inline const_bytes string_as_bytes(std::string_view sv) {
    return std::as_bytes(std::span(sv.data(), sv.size()));
}

inline std::string_view bytes_as_string(const_bytes sv) {
    return {reinterpret_cast<const char*>(sv.data()), sv.size()};
}

// generate_all_subsets(4, 2) = {{0, 1}, {0, 2}, {0, 3}, {1, 2}, {1, 3}, {2, 3}}
std::vector<std::vector<size_t>> generate_all_subsets(size_t n, size_t k) {
    if (k == 0) {
        return {std::vector<size_t>()};
    }
    using sample = std::vector<size_t>;
    std::vector<size_t> wksp(k);
    auto first = wksp.begin();
    auto last = wksp.end();
    // Fill wksp with first possible sample.
    std::ranges::iota(wksp, 0);
    std::vector<sample> samples;
    while (true) {
        samples.push_back(wksp);
        // Advance wksp to next possible sample.
        auto mt = last;
        --mt;
        while (mt > first && *mt == n - (last - mt)) {
            --mt;
        }
        if (mt == first && *mt == n - (last - mt)) {
            break;
        }
        ++(*mt);
        while (++mt != last) {
            *mt = *(mt - 1) + 1;
        }
    }
    return samples;
}
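
// A couple of reference points for the enumeration above (illustrative,
// not executed anywhere):
//
//   generate_all_subsets(4, 2).size() == 6   // the example in the comment above
//   generate_all_subsets(3, 0) == {{}}       // k == 0 yields just the empty subset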

// Checks that the stream of nodes produced by a trie_writer
// with given parameters is consistent with the inputs.
void test_one_set_of_strings(
    const std::vector<std::string_view>& inputs,
    size_t branching_factor,
    size_t page_size,
    size_t max_chain_len,
    size_t start_pos,
    uint64_t seed
) {
    testlog.debug("test_one_set_of_strings: inputs={}, page_size={}, max_chain_len={}", inputs, page_size, max_chain_len);
    assert(std::ranges::is_sorted(inputs));
    struct trie_output_stream {
        sink_pos _pos{0};
        size_t _page_size = 0;
        uint64_t _seed = 0;
        uint64_t _padding = 0;
        struct serialized {
            trie_payload payload;
            std::vector<std::byte> transition;
            std::vector<sink_pos> children;
        };
        std::map<sink_pos, serialized> _output;

        trie_output_stream(size_t page_size, uint64_t seed) : _page_size(page_size), _seed(seed) {
        }
        node_size serialized_size(const writer_node& x, sink_pos start_pos) const {
            // Returns a "random" size in range [1, page_size].
            //
            // With enough test cases, this should be enough to test the full variety
            // of interesting situations in the writer.
            //
            // The writer assumes that the size depends only on the array of offsets
            // from the node to its children. Therefore we take the hash of the offsets
            // as the "random" number.
            std::vector<uint64_t> relevant_data;
            size_t total_offset = 0;
            relevant_data.reserve(x.get_children().size() * 2);
            std::span children = x.get_children();
            for (auto it = children.rbegin(); it != children.rend(); ++it) {
                const auto& c = *it;
                testlog.trace("serialized_size: child={}, startpos={} ns={}, bs={}, pos={}", fmt::ptr(&c), start_pos.value, c->_node_size.value, c->_branch_size.value, c->_pos.value);

                if (c->_pos.valid()) {
                    relevant_data.push_back((start_pos - c->_pos).value);
                } else {
                    total_offset += c->_node_size.value;
                    relevant_data.push_back(total_offset - 1);
                    total_offset += c->_branch_size.value;
                }
            }
            auto hash = XXH64(relevant_data.data(), std::span(relevant_data).size_bytes(), _seed);
            auto result = 1 + hash % _page_size;
            testlog.trace("serialized_size: x={}, result={}, offsets={}", fmt::ptr(&x), result, relevant_data);
            return node_size(result);
        }
        sink_pos write(const writer_node& x, sink_pos start_pos) {
            serialized s;
            for (const auto& c : x.get_children()) {
                BOOST_REQUIRE(c->_pos.valid());
                s.children.push_back(c->_pos);
            }
            const_bytes transition_view{x._transition.get(), x._transition_length};
            s.transition = std::vector<std::byte>(transition_view.begin(), transition_view.end());
            s.payload = x._payload;
            sink_pos result = _pos + sink_offset(1);
            auto sz = serialized_size(x, _pos);
            _pos = _pos + sz;
            _output.insert({result, std::move(s)});
            testlog.trace("write: {}", sz.value);
            return result;
        }
        size_t page_size() const {
            return _page_size;
        }
        void pad_to_page_boundary() {
            size_t pad = bytes_left_in_page();
            _padding += pad;
            testlog.trace("pad: {}", pad);
            _pos = _pos + sink_offset(pad);
        }
        size_t bytes_left_in_page() const {
            return round_up(_pos.value + 1, page_size()) - _pos.value;
        }
        sink_pos pos() const {
            return _pos;
        }
        void skip(size_t sz) {
            _pos = _pos + sink_offset(sz);
        }
    };
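
    // trie_output_stream stands in for a real output file. It has to model
    // the trie_writer_sink concept consumed by the writer (serialized_size(),
    // write(), and the page-arithmetic helpers), which the static_assert
    // below checks at compile time.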
    static_assert(trie_writer_sink<trie_output_stream>);

    trie_output_stream out(page_size, seed);

    out.skip(start_pos);

    const auto max_expected_alloc = branching_factor * sizeof(writer_node) + sizeof(std::max_align_t);
    auto wr = trie_writer(out, max_chain_len, std::bit_ceil(max_expected_alloc));

    std::vector<trie_payload> payloads;

    std::string_view prev = {};
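    // Keys are fed to the writer incrementally: `depth` is the length of the
    // longest common prefix with the previously added key, and only the
    // suffix past that point is passed to add().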
    for (size_t i = 0; i < inputs.size(); ++i) {
        size_t depth = std::ranges::mismatch(inputs[i], prev).in1 - inputs[i].begin();
        payloads.push_back(trie_payload(1 + i % 15, object_representation(42 + i)));
        wr.add(depth, string_as_bytes(inputs[i]).subspan(depth), payloads.back());
        prev = inputs[i];
    }

    auto root_pos = wr.finish();

    if (inputs.size() == 0) {
        BOOST_REQUIRE(!root_pos.valid());
        BOOST_REQUIRE_EQUAL(out.pos().value, start_pos);
        return;
    }

    assert(root_pos.valid());

    std::vector<std::string> outputs;
    std::vector<trie_payload> output_payloads;
    struct local_state {
        sink_pos _idx;
        int _stage;
    };
    std::vector<size_t> transition_lengths;
    std::vector<std::byte> transition_stack;
    std::vector<local_state> stack;
    stack.push_back({root_pos, -1});
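    // Walk the serialized node stream depth-first, rebuilding the full key for
    // every node that carries a payload. `_stage` is the traversal state of a
    // stack entry: -1 means "first visit" (append the node's transition bytes,
    // collect its payload), values 0..children-1 count children already
    // descended into, and when all children are done the transition bytes are
    // unwound and the entry is popped. The substr(1) drops the byte
    // contributed by the root's transition.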
    while (stack.size()) {
        auto& [idx, stage] = stack.back();
        const auto& node = out._output.at(idx);
        if (stage < 0) {
            transition_stack.insert(transition_stack.end(), node.transition.begin(), node.transition.end());
            transition_lengths.push_back(node.transition.size());
            if (node.payload._payload_bits) {
                outputs.push_back(std::string(bytes_as_string(transition_stack).substr(1)));
                output_payloads.push_back(node.payload);
            }
            stage += 1;
        }
        if (stage < int(node.children.size())) {
            stage += 1;
            stack.push_back({node.children[stage - 1], -1});
            continue;
        }
        transition_stack.resize(transition_stack.size() - transition_lengths.back());
        transition_lengths.pop_back();
        stack.pop_back();
    }
    testlog.debug("Outputs: {}", outputs);
    BOOST_REQUIRE(std::ranges::equal(inputs, outputs));
    BOOST_REQUIRE(std::ranges::equal(payloads, output_payloads));
}

// Checks that the writer produces the right results with all possible datasets
// smaller (w.r.t. various parameters) than some arbitrary limit.
//
// The limit should be small enough for the test to finish in reasonable time,
// and large enough to provide coverage for all logic.
// (If you modify the limits, you should probably at least check
// that you didn't reduce code coverage.)
SEASTAR_THREAD_TEST_CASE(test_exhaustive) {
    // testlog.set_level(seastar::log_level::trace);
    // trie_logger.set_level(seastar::log_level::trace);
    size_t max_input_length = 3;
    size_t max_set_size = 3;
    size_t max_page_size = 3;
    size_t max_start_pos = 1;
    const char chars[] = "abc";
    auto all_strings = tests::generate_all_strings(chars, max_input_length);
    size_t case_counter = 0;
    testlog.info("test_exhaustive: start");
    for (size_t set_size = 0; set_size <= max_set_size; ++set_size) {
        auto subsets = generate_all_subsets(all_strings.size(), set_size);
        std::vector<std::string_view> test_set;
        for (const auto& x : subsets) {
            test_set.clear();
            for (const auto& i : x) {
                test_set.push_back(all_strings[i]);
            }
            for (size_t page_size = 1; page_size <= max_page_size; ++page_size)
            for (size_t max_chain_len = 1; max_chain_len <= max_input_length; ++max_chain_len)
            for (size_t start_pos = 0; start_pos <= max_start_pos; ++start_pos) {
                if (case_counter % 1000 == 0) {
                    testlog.debug("test_exhaustive: in progress: cases={}", case_counter);
                }
                test_one_set_of_strings(test_set, std::size(chars), page_size, max_chain_len, start_pos, case_counter);
                case_counter += 1;
            }
        }
    }
    testlog.info("test_exhaustive: cases={}", case_counter);
}

struct null_trie_output_stream {
    sink_pos _pos{0};
    size_t _page_size = 0;
    null_trie_output_stream(size_t page_size) : _page_size(page_size) {
    }
    node_size serialized_size(const writer_node& x, sink_pos start_pos) const {
        return node_size(1);
    }
    sink_pos write(const writer_node& x, sink_pos start_pos) {
        _pos = _pos + sink_offset(1);
        return _pos;
    }
    size_t page_size() const {
        return _page_size;
    }
    void pad_to_page_boundary() {
        size_t pad = bytes_left_in_page();
        _pos = _pos + sink_offset(pad);
    }
    size_t bytes_left_in_page() const {
        return round_up(_pos.value + 1, page_size()) - _pos.value;
    }
    sink_pos pos() const {
        return _pos;
    }
    void skip(size_t sz) {
        _pos = _pos + sink_offset(sz);
    }
};
static_assert(trie_writer_sink<null_trie_output_stream>);

// Reproducer for scylladb/scylladb#27082
//
// Feeds many keys to a trie_writer and checks that memory usage doesn't grow
// without bound.
SEASTAR_THREAD_TEST_CASE(test_finite_memory_usage) {
    // trie_logger.set_level(seastar::log_level::trace);

    const size_t n_keys = 1000000;
    const size_t memory_usage_before = seastar::memory::stats().allocated_memory();

    // Max memory usage for a dense dataset is around 256 * 256 * 80 = 5242880 bytes
    // (~80 bytes per leaf node, 256 leaf nodes per internal node, 256 internal nodes),
    // comfortably under the 8000000-byte limit below.
    // With TRIE_SANITIZE_BUMP_ALLOCATOR, we allow way more memory for the extra
    // allocator metadata.
    const size_t max_allowed_memory_usage = memory_usage_before + 8000000 * (TRIE_SANITIZE_BUMP_ALLOCATOR ? 10 : 1);

    auto out = null_trie_output_stream(4096);
    auto wr = trie_writer(out);
    auto dummy_payload = trie_payload(1, std::array<std::byte, 1>{});
    std::vector<std::byte> prev_key;
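    // Keys are consecutive 32-bit integers in big-endian byte order, so the
    // key set is dense: consecutive keys share long prefixes and the writer
    // keeps completing and flushing subtrees, the exact path where the
    // missing free caused unbounded growth.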
    for (uint32_t i = 0; i < n_keys; ++i) {
        std::vector<std::byte> curr_key(std::from_range, object_representation(seastar::cpu_to_be(i)));
        size_t depth = std::ranges::mismatch(curr_key, prev_key).in1 - curr_key.begin();
        wr.add(depth, std::span(curr_key).subspan(depth), dummy_payload);
        prev_key = curr_key;
        tests::require_greater_equal(seastar::memory::stats().allocated_memory(), memory_usage_before);
        tests::require_less_equal(seastar::memory::stats().allocated_memory(), max_allowed_memory_usage);
    }
}