/*
 * Copyright (C) 2024-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include <seastar/testing/test_case.hh>

#include "test/lib/simple_schema.hh"
#include "test/lib/sstable_test_env.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/make_random_string.hh"
#include "readers/from_mutations.hh"

using namespace sstables;

// For the main use case of index readers — queries —
// there's no need to support `abort_source`.
// An index query completes within several milliseconds,
// so it won't meaningfully delay node shutdown.
//
// But `index_consume_entry_context` supports `abort_source`
// for the sake of the optional bloom filter rebuild (from Index) that
// happens before sstable sealing (see `sstable::maybe_rebuild_filter_from_index`).
// This is a test for that.
SEASTAR_TEST_CASE(test_abort_during_index_read) {
    return test_env::do_with_async([] (test_env& env) {
        simple_schema ss;
        auto schema_ptr = ss.schema();

        // This test is designed specifically for BIG index readers.
        // It's not valid for BTI index readers.
        // (And we don't even want to support `abort_source` in BTI index readers.
        // There's no use case for that.)
        auto version = sstable_version_types::me;

        auto mut = mutation(schema_ptr, ss.make_pkey());
        auto mut_reader = make_mutation_reader_from_mutations(schema_ptr, env.make_reader_permit(), std::move(mut));
        auto sst = make_sstable_easy(env, std::move(mut_reader), env.manager().configure_writer(), version);

        struct dummy_index_consumer {
            dummy_index_consumer() {}
            void consume_entry(parsed_partition_index_entry&& e) {
                // should be aborted before reaching here
                BOOST_ERROR("index_consume_entry_context was not aborted");
            }
        } consumer;

        auto index_file = sst->index_file();
        auto index_file_size = sst->index_size();
        index_consume_entry_context<dummy_index_consumer> consumer_ctx(
                *sst, env.make_reader_permit(), consumer, trust_promoted_index::no,
                make_file_input_stream(index_file, 0, index_file_size), 0, index_file_size,
                sst->get_column_translation(), sst->manager().get_abort_source());

        // Request abort before starting to consume the index, so that the consumer
        // throws an exception as soon as it starts.
        env.request_abort();
        BOOST_CHECK_THROW(consumer_ctx.consume_input().get(), seastar::abort_requested_exception);
        consumer_ctx.close().get();
    });
}

SEASTAR_TEST_CASE(test_promoted_index_parsing_page_crossing_and_retries) {
    return test_env::do_with_async([] (test_env& env) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
        testlog.info("Skipped because error injection is not enabled");
#else
        simple_schema ss;
        auto s = ss.schema();

        // This test is designed specifically for BIG index readers.
        // It's not valid for BTI index readers.
        auto version = sstable_version_types::me;

        auto pk = ss.make_pkey();
        auto mut = mutation(s, pk);

        // Enough keys to have some index block whose clustering key is split across pages.
        std::vector<clustering_key> keys;
        const auto n_keys = 100;
        auto key_size = cached_file::page_size / 3; // guarantees that index blocks are not congruent with the page size
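        // Generate random clustering keys and remember them. After the sort below,
        // promoted index block i (one block per row, see set_promoted_index_block_size(1))
        // corresponds to keys[i].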
        keys.reserve(n_keys);
        for (int i = 0; i < n_keys; ++i) {
            keys.push_back(ss.make_ckey(make_random_string(key_size)));
            ss.add_row(mut, keys[i], "v");
        }
        clustering_key::less_compare less(*s);
        std::sort(keys.begin(), keys.end(), less);

        env.manager().set_promoted_index_block_size(1); // force an entry for each row
        auto mut_reader = make_mutation_reader_from_mutations(s, env.make_reader_permit(), std::move(mut));
        auto sst = make_sstable_easy(env, std::move(mut_reader), env.manager().configure_writer(), version);

        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto permit = semaphore.make_permit();
        tracing::trace_state_ptr trace = nullptr;
        auto index = std::make_unique<index_reader>(sst, permit, trace, use_caching::yes, true);
        auto close_index = deferred_close(*index);
        index->advance_to_definitely_present_partition(pk).get();
        index->read_partition_data().get();
        auto cur = dynamic_cast<mc::bsearch_clustered_cursor*>(index->current_clustered_cursor());
        BOOST_REQUIRE(cur);

        std::optional<uint64_t> prev_offset;
        int crossed_page = 0;
        utils::get_local_injector().enable("cached_promoted_index_parsing_invalidate_buf_across_page", false);
        for (int i = 0; i < n_keys - 1; ++i) {
            auto block_offset = cur->promoted_index().get_block_only_offset(i, trace).get()->offset;
            auto next_block_offset = cur->promoted_index().get_block_only_offset(i + 1, trace).get()->offset;
            auto start_page = block_offset / cached_file::page_size;
            auto end_page = (next_block_offset - 1) / cached_file::page_size;
            if (start_page != end_page) {
                auto pos = position_in_partition::for_key(keys[i]);
                position_in_partition::equal_compare eq(*s);
                testlog.info("Crossed page at block {}, offset [{}, {})", i, block_offset, next_block_offset);
                crossed_page++;

                auto* block = cur->promoted_index().get_block(i, trace).get();
                testlog.debug("key : {}", pos);
                testlog.debug("start : {}", *block->start);
                testlog.debug("end : {}", *block->end);
                BOOST_REQUIRE(eq(*block->start, pos));
                BOOST_REQUIRE(eq(*block->end, pos));
                if (prev_offset) {
                    BOOST_REQUIRE_LT(*prev_offset, block->data_file_offset);
                }

                // Re-parse the same block with the bad_alloc injection enabled
                // to exercise the retry path.
                cur->promoted_index().clear();
                utils::get_local_injector().enable("cached_promoted_index_bad_alloc_parsing_across_page", true);
                block = cur->promoted_index().get_block(i, trace).get();
                testlog.debug("start : {}", *block->start);
                testlog.debug("end : {}", *block->end);
                BOOST_REQUIRE(eq(*block->start, pos));
                BOOST_REQUIRE(eq(*block->end, pos));
                if (prev_offset) {
                    BOOST_REQUIRE_LT(*prev_offset, block->data_file_offset);
                }
                prev_offset = block->data_file_offset;
            }
        }
        BOOST_REQUIRE_GE(crossed_page, 6); // if not, increase n_keys
#endif
    });
}

SEASTAR_TEST_CASE(test_no_data_file_read_on_missing_clustering_keys_with_dense_index) {
    return test_env::do_with_async([] (test_env& env) {
        simple_schema ss;
        auto s = ss.schema();

        // The general idea of this test could be applied to `ms` sstables
        // as well, but the test uses APIs specific to `bsearch_clustered_cursor`,
        // so it's not easy to adapt it to `ms`.
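        //
        // Plan: write one partition with a dense promoted index (one block per row).
        // For clustering keys that are absent from the data file, the cursor's lower and
        // upper bounds should collapse to the same data file offset, so no data would be
        // read. For present keys, a single-row read should cover exactly one block.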
        auto version = sstable_version_types::me;

        auto pk = ss.make_pkey();

        clustering_key::less_compare less(*s);
        std::set<clustering_key, clustering_key::less_compare> keys(less); // present in the data file
        std::set<clustering_key, clustering_key::less_compare> missing_keys(less);
        const auto n_keys = 13;
        auto key_size = 32; // whatever
        while (keys.size() < n_keys) {
            auto key = ss.make_ckey(make_random_string(key_size));
            keys.emplace(key);
        }
        while (missing_keys.size() < n_keys) {
            auto key = ss.make_ckey(make_random_string(key_size));
            if (!keys.contains(key)) {
                missing_keys.emplace(key);
            }
        }
        // Make sure there are missing keys before and after all present keys
        // to stress edge cases.
        {
            auto first_key = *keys.begin();
            auto last_key = *std::prev(keys.end());
            missing_keys.emplace(first_key);
            missing_keys.emplace(last_key);
            keys.erase(first_key);
            keys.erase(last_key);
        }
        auto last_key = *std::prev(keys.end());

        auto mut = mutation(s, pk);
        for (auto key : keys) {
            ss.add_row(mut, key, "vvvv");
        }

        env.manager().set_promoted_index_block_size(1); // force an entry for each row
        auto mut_reader = make_mutation_reader_from_mutations(s, env.make_reader_permit(), std::move(mut));
        auto sst = make_sstable_easy(env, std::move(mut_reader), env.manager().configure_writer(), version);

        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto permit = semaphore.make_permit();
        tracing::trace_state_ptr trace = nullptr;
        auto index = std::make_unique<index_reader>(sst, permit, trace, use_caching::yes, true);
        auto close_index = deferred_close(*index);
        index->advance_to_definitely_present_partition(pk).get();
        index->read_partition_data().get();

        // A second index reader, used only to find where the partition ends in the data file.
        auto index2 = std::make_unique<index_reader>(sst, permit, trace, use_caching::yes, true);
        auto close_index2 = deferred_close(*index2);
        index2->advance_to_definitely_present_partition(pk).get();
        index2->advance_to_next_partition().get();
        auto next_partition_offset = index2->data_file_positions().start;

        for (auto key : missing_keys) {
            auto cur = dynamic_cast<mc::bsearch_clustered_cursor*>(index->current_clustered_cursor());
            BOOST_REQUIRE(cur);
            testlog.info("Testing missing key {}", key);
            auto skip_lb = cur->advance_to(position_in_partition::before_key(key)).get();
            auto skip_ub = cur->probe_upper_bound(position_in_partition::after_key(*s, key)).get();
            uint64_t lb_offset = index->get_data_file_position();
            uint64_t ub_offset = next_partition_offset;
            if (skip_lb) {
                lb_offset = skip_lb->offset;
            }
            if (skip_ub) {
                ub_offset = *skip_ub;
            }
            BOOST_REQUIRE_EQUAL(lb_offset, ub_offset);
            index->reset_clustered_cursor().get();
        }

        position_in_partition::equal_compare eq(*s);
        // Check that for each present key, single-row reads will read a single block.
        for (auto key : keys) {
            auto cur = dynamic_cast<mc::bsearch_clustered_cursor*>(index->current_clustered_cursor());
            BOOST_REQUIRE(cur);
            testlog.info("Testing present key {}", key);
            auto skip_lb = cur->advance_to(position_in_partition::before_key(key)).get();
            auto skip_ub = cur->probe_upper_bound(position_in_partition::after_key(*s, key)).get();
            uint64_t lb_offset = index->get_data_file_position();
            uint64_t ub_offset = next_partition_offset;
            if (skip_lb) {
                lb_offset = skip_lb->offset;
            }
            if (skip_ub) {
                ub_offset = *skip_ub;
            }
            // All blocks have the same width, take any of them.
            auto expected_width = cur->promoted_index().get_block(0, trace).get()->width;
            BOOST_REQUIRE_EQUAL(ub_offset - lb_offset, expected_width);
            index->reset_clustered_cursor().get();
        }
    });
}