Files
scylladb/test/boost/index_reader_test.cc
Michał Chojnowski 622149a183 test/boost/index_reader_test: prepare for ms sstables
Adjust the incompatibilities between the tests and the upcoming
`ms` sstables. Refer to the individual tests for comments.
2025-09-29 22:15:25 +02:00

275 lines
11 KiB
C++

/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/testing/test_case.hh>
#include "test/lib/simple_schema.hh"
#include "test/lib/sstable_test_env.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/make_random_string.hh"
#include "readers/from_mutations.hh"
using namespace sstables;
// For the main use case of index readers — queries —
// there's no need to support `abort_source`.
// An index query will complete after several milliseconds,
// so it won't meaningfully delay node shutdown.
//
// But `index_consume_entry_context` supports `abort_source`
// for the sake of the optional bloom filter rebuild (from Index) that
// happens before sstable sealing (see `sstable::maybe_rebuild_filter_from_index`).
// This test exercises that abort path.
SEASTAR_TEST_CASE(test_abort_during_index_read) {
    return test_env::do_with_async([](test_env& env) {
        simple_schema table;
        auto s = table.schema();
        // Only BIG index readers are covered here; the test is not valid for
        // BTI index readers. (And we don't even want to support `abort_source`
        // in BTI index readers — there's no use case for that.)
        const auto fmt = sstable_version_types::me;
        // Write a minimal sstable: a single partition with no rows.
        auto m = mutation(s, table.make_pkey());
        auto rd = make_mutation_reader_from_mutations(s, env.make_reader_permit(), std::move(m));
        auto sst = make_sstable_easy(env, std::move(rd), env.manager().configure_writer(), fmt);
        // Consumer that fails the test if it ever receives an entry:
        // the abort is requested before consumption starts, so no entry
        // should be delivered.
        struct dummy_index_consumer {
            dummy_index_consumer() {}
            void consume_entry(parsed_partition_index_entry&& e) {
                // should be aborted before reaching here
                BOOST_ERROR("index_consume_entry_context was not aborted");
            }
        } consumer;
        auto idx_file = sst->index_file();
        auto idx_size = sst->index_size();
        index_consume_entry_context<dummy_index_consumer> ctx(
            *sst, env.make_reader_permit(), consumer, trust_promoted_index::no,
            make_file_input_stream(idx_file, 0, idx_size), 0, idx_size,
            sst->get_column_translation(),
            sst->manager().get_abort_source());
        // Request abort before starting to consume the index, so that the
        // context throws as soon as consumption begins.
        env.request_abort();
        BOOST_CHECK_THROW(ctx.consume_input().get(), seastar::abort_requested_exception);
        ctx.close().get();
    });
}
// Exercises parsing of the cached promoted index when an index block
// straddles a cached_file page boundary, including the retry path taken
// when parsing across a page fails mid-way (driven by error injection).
SEASTAR_TEST_CASE(test_promoted_index_parsing_page_crossing_and_retries) {
    return test_env::do_with_async([](test_env& env) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
        testlog.info("Skipped because error injection is not enabled");
#else
        simple_schema ss;
        auto s = ss.schema();
        // This test only is designed specifically for BIG index readers.
        // It's not valid for BTI index readers.
        auto version = sstable_version_types::me;
        auto pk = ss.make_pkey();
        auto mut = mutation(s, pk);
        // Enough rows to have some index blocks whose clustering key is
        // split across cached_file pages.
        std::vector<clustering_key> keys;
        const auto n_keys = 100;
        auto key_size = cached_file::page_size / 3; // guarantees that index blocks are not congruent with page size.
        keys.reserve(n_keys);
        for (int i = 0; i < n_keys; ++i) {
            keys.push_back(ss.make_ckey(make_random_string(key_size)));
            ss.add_row(mut, keys[i], "v");
        }
        // Rows were added in random-string order; sort the keys so that
        // keys[i] corresponds to promoted-index block i.
        clustering_key::less_compare less(*s);
        std::sort(keys.begin(), keys.end(), less);
        env.manager().set_promoted_index_block_size(1); // force entry for each row
        auto mut_reader = make_mutation_reader_from_mutations(s, env.make_reader_permit(), std::move(mut));
        auto sst = make_sstable_easy(env, std::move(mut_reader), env.manager().configure_writer(), version);
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto permit = semaphore.make_permit();
        tracing::trace_state_ptr trace = nullptr;
        auto index = std::make_unique<index_reader>(sst, permit, trace, use_caching::yes, true);
        auto close_index = deferred_close(*index);
        index->advance_to_definitely_present_partition(pk).get();
        index->read_partition_data().get();
        auto cur = dynamic_cast<mc::bsearch_clustered_cursor*>(index->current_clustered_cursor());
        BOOST_REQUIRE(cur);
        std::optional<cached_file::offset_type> prev_offset;
        int crossed_page = 0;
        // Inject a buffer invalidation whenever parsing crosses a page.
        // NOTE(review): the second argument is presumably `one_shot` (false =
        // stays enabled for the whole loop) — confirm against utils::error_injection.
        utils::get_local_injector().enable("cached_promoted_index_parsing_invalidate_buf_across_page", false);
        for (int i = 0; i < n_keys - 1; ++i) {
            auto block_offset = cur->promoted_index().get_block_only_offset(i, trace).get()->offset;
            auto next_block_offset = cur->promoted_index().get_block_only_offset(i + 1, trace).get()->offset;
            auto start_page = block_offset / cached_file::page_size;
            auto end_page = (next_block_offset - 1) / cached_file::page_size;
            if (start_page != end_page) {
                // Block i straddles a page boundary: parse it fully and verify
                // its start/end positions match the key it was created from.
                auto pos = position_in_partition::for_key(keys[i]);
                position_in_partition::equal_compare eq(*s);
                testlog.info("Crossed page at block {}, offset [{}, {})", i, block_offset, next_block_offset);
                crossed_page++;
                auto* block = cur->promoted_index().get_block(i, trace).get();
                testlog.debug("key : {}", pos);
                testlog.debug("start : {}", *block->start);
                testlog.debug("end : {}", *block->end);
                BOOST_REQUIRE(eq(*block->start, pos));
                BOOST_REQUIRE(eq(*block->end, pos));
                if (prev_offset) {
                    // Offsets must be strictly increasing across page-crossing blocks.
                    BOOST_REQUIRE_LT(*prev_offset, block->data_file_offset);
                }
                // Drop the cached parse and re-parse the same block with a
                // (presumably one-shot) allocation failure injected mid-parse,
                // verifying that the retry produces the same result.
                cur->promoted_index().clear();
                utils::get_local_injector().enable("cached_promoted_index_bad_alloc_parsing_across_page", true);
                block = cur->promoted_index().get_block(i, trace).get();
                testlog.debug("start : {}", *block->start);
                testlog.debug("end : {}", *block->end);
                BOOST_REQUIRE(eq(*block->start, pos));
                BOOST_REQUIRE(eq(*block->end, pos));
                if (prev_offset) {
                    BOOST_REQUIRE_LT(*prev_offset, block->data_file_offset);
                }
                prev_offset = block->data_file_offset;
            }
        }
        BOOST_REQUIRE_GE(crossed_page, 6); // If not, increase n_keys
#endif
    });
}
// Checks that with a dense promoted index (one entry per row), single-row
// lookups can be resolved from the index alone: for clustering keys absent
// from the sstable the computed data-file window is empty (lower bound ==
// upper bound, so no data-file read is needed), and for present keys the
// window spans exactly one promoted-index block.
SEASTAR_TEST_CASE(test_no_data_file_read_on_missing_clustering_keys_with_dense_index) {
    return test_env::do_with_async([](test_env& env) {
        simple_schema ss;
        auto s = ss.schema();
        // The general idea of this test could be applied to `ms` sstables
        // as well, but the test uses APIs specific to `bsearch_clustered_cursor`,
        // so it's not easy to adapt it to `ms`.
        auto version = sstable_version_types::me;
        auto pk = ss.make_pkey();
        clustering_key::less_compare less(*s);
        std::set<clustering_key, clustering_key::less_compare> keys(less); // present in data file
        std::set<clustering_key, clustering_key::less_compare> missing_keys(less);
        const auto n_keys = 13;
        auto key_size = 32; // whatever
        while (keys.size() < n_keys) {
            auto key = ss.make_ckey(make_random_string(key_size));
            keys.emplace(key);
        }
        while (missing_keys.size() < n_keys) {
            auto key = ss.make_ckey(make_random_string(key_size));
            if (!keys.contains(key)) {
                missing_keys.emplace(key);
            }
        }
        // Make sure there are missing keys before and after all present keys
        // to stress edge cases.
        {
            auto first_key = *keys.begin();
            auto last_key = *std::prev(keys.end());
            missing_keys.emplace(first_key);
            missing_keys.emplace(last_key);
            keys.erase(first_key);
            keys.erase(last_key);
        }
        auto mut = mutation(s, pk);
        for (const auto& key : keys) {
            ss.add_row(mut, key, "vvvv");
        }
        env.manager().set_promoted_index_block_size(1); // force entry for each row
        auto mut_reader = make_mutation_reader_from_mutations(s, env.make_reader_permit(), std::move(mut));
        auto sst = make_sstable_easy(env, std::move(mut_reader), env.manager().configure_writer(), version);
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto permit = semaphore.make_permit();
        tracing::trace_state_ptr trace = nullptr;
        auto index = std::make_unique<index_reader>(sst, permit, trace, use_caching::yes, true);
        auto close_index = deferred_close(*index);
        index->advance_to_definitely_present_partition(pk).get();
        index->read_partition_data().get();
        // A second reader is used only to find where the next partition starts;
        // that offset is the fallback upper bound for the last index block.
        auto index2 = std::make_unique<index_reader>(sst, permit, trace, use_caching::yes, true);
        auto close_index2 = deferred_close(*index2);
        index2->advance_to_definitely_present_partition(pk).get();
        index2->advance_to_next_partition().get();
        auto next_partition_offset = index2->data_file_positions().start;
        // For each absent key the lower-bound and upper-bound offsets must
        // coincide, i.e. the read window is empty.
        for (const auto& key : missing_keys) {
            auto cur = dynamic_cast<mc::bsearch_clustered_cursor*>(index->current_clustered_cursor());
            BOOST_REQUIRE(cur);
            testlog.info("Testing missing key {}", key);
            auto skip_lb = cur->advance_to(position_in_partition::before_key(key)).get();
            auto skip_ub = cur->probe_upper_bound(position_in_partition::after_key(*s, key)).get();
            uint64_t lb_offset = index->get_data_file_position();
            uint64_t ub_offset = next_partition_offset;
            if (skip_lb) {
                lb_offset = skip_lb->offset;
            }
            if (skip_ub) {
                ub_offset = *skip_ub;
            }
            BOOST_REQUIRE_EQUAL(lb_offset, ub_offset);
            index->reset_clustered_cursor().get();
        }
        position_in_partition::equal_compare eq(*s);
        // Check that for each present key, single-row reads will read a single block
        for (const auto& key : keys) {
            auto cur = dynamic_cast<mc::bsearch_clustered_cursor*>(index->current_clustered_cursor());
            BOOST_REQUIRE(cur);
            testlog.info("Testing present key {}", key);
            auto skip_lb = cur->advance_to(position_in_partition::before_key(key)).get();
            auto skip_ub = cur->probe_upper_bound(position_in_partition::after_key(*s, key)).get();
            uint64_t lb_offset = index->get_data_file_position();
            uint64_t ub_offset = next_partition_offset;
            if (skip_lb) {
                lb_offset = skip_lb->offset;
            }
            if (skip_ub) {
                ub_offset = *skip_ub;
            }
            // All blocks have the same width, take any of them.
            auto expected_width = cur->promoted_index().get_block(0, trace).get()->width;
            BOOST_REQUIRE_EQUAL(ub_offset - lb_offset, expected_width);
            index->reset_clustered_cursor().get();
        }
    });
}