sstables/index_reader: short-circuit fast-forward-to when at EOF
Attempting to call advance_to() on the index, after it is positioned at EOF, can result in an assert failure, because the operation results in an attempt to move backwards in the index-file (to read the last index page, which was already read). This only happens if the index cache entry belonging to the last index page is evicted, otherwise the advance operation just looks-up said entry and returns it. To prevent this, we add an early return conditioned on eof() to all the partition-level advance-to methods. A regression unit test reproducing the above described crash is also added.
This commit is contained in:
@@ -774,6 +774,9 @@ public:
|
||||
|
||||
// Advance index_reader bounds to the bounds of the supplied range
|
||||
future<> advance_to(const dht::partition_range& range) {
|
||||
if (eof()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return seastar::when_all_succeed(
|
||||
advance_lower_to_start(range),
|
||||
advance_upper_to_end(range)).discard_result();
|
||||
@@ -911,6 +914,9 @@ public:
|
||||
// If upper_bound is provided, the upper bound within position is looked up
|
||||
future<bool> advance_lower_and_check_if_present(
|
||||
dht::ring_position_view key, std::optional<position_in_partition_view> pos = {}) {
|
||||
if (eof()) {
|
||||
return make_ready_future<bool>(false);
|
||||
}
|
||||
return advance_to(_lower_bound, key).then([this, key, pos] {
|
||||
if (eof()) {
|
||||
return make_ready_future<bool>(false);
|
||||
@@ -1037,6 +1043,9 @@ public:
|
||||
// Positions the cursor on the first partition which is not smaller than pos (like std::lower_bound).
|
||||
// Must be called for non-decreasing positions.
|
||||
future<> advance_to(dht::ring_position_view pos) {
|
||||
if (eof()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return advance_to(_lower_bound, pos);
|
||||
}
|
||||
|
||||
|
||||
@@ -3090,3 +3090,61 @@ SEASTAR_TEST_CASE(partial_sstable_deletion_test) {
|
||||
sst->unlink().get();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_index_fast_forwarding_after_eof) {
|
||||
return test_setup::do_with_tmp_directory([&] (test_env& env, sstring tmpdir_path) {
|
||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||
auto random_spec = tests::make_random_schema_specification(
|
||||
get_name(),
|
||||
std::uniform_int_distribution<size_t>(1, 4),
|
||||
std::uniform_int_distribution<size_t>(2, 4),
|
||||
std::uniform_int_distribution<size_t>(2, 8),
|
||||
std::uniform_int_distribution<size_t>(2, 8));
|
||||
auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};
|
||||
auto schema = random_schema.schema();
|
||||
auto permit = env.make_reader_permit();
|
||||
|
||||
testlog.info("Random schema:\n{}", random_schema.cql());
|
||||
|
||||
const auto muts = tests::generate_random_mutations(random_schema, 2).get();
|
||||
|
||||
auto sst = env.make_sstable(schema, tmpdir_path, 0, writable_sstable_versions.back(), big);
|
||||
{
|
||||
auto mr = make_flat_mutation_reader_from_mutations_v2(schema, permit, muts);
|
||||
auto close_mr = deferred_close(mr);
|
||||
|
||||
sstable_writer_config cfg = env.manager().configure_writer();
|
||||
|
||||
auto wr = sst->get_writer(*schema, 1, cfg, encoding_stats{}, default_priority_class());
|
||||
mr.consume_in_thread(std::move(wr));
|
||||
|
||||
sst->load().get();
|
||||
}
|
||||
|
||||
const auto t1 = muts.front().decorated_key()._token;
|
||||
const auto t2 = muts.back().decorated_key()._token;
|
||||
dht::partition_range_vector prs;
|
||||
|
||||
prs.emplace_back(dht::ring_position::starting_at({dht::token_kind::key, t1.raw() - 200}), dht::ring_position::ending_at({dht::token_kind::key, t1.raw() - 100}));
|
||||
prs.emplace_back(dht::ring_position::starting_at({dht::token_kind::key, t1.raw() + 2}), dht::ring_position::ending_at({dht::token_kind::key, t2.raw() + 2}));
|
||||
// Should be at eof() after the above range is finished
|
||||
prs.emplace_back(dht::ring_position::starting_at({dht::token_kind::key, t2.raw() + 100}), dht::ring_position::ending_at({dht::token_kind::key, t2.raw() + 200}));
|
||||
prs.emplace_back(dht::ring_position::starting_at({dht::token_kind::key, t2.raw() + 300}), dht::ring_position::ending_at({dht::token_kind::key, t2.raw() + 400}));
|
||||
|
||||
auto reader = sst->make_reader(schema, permit, prs.front(), schema->full_slice());
|
||||
auto close_reader = deferred_close(reader);
|
||||
while (reader().get());
|
||||
|
||||
auto& region = env.manager().get_cache_tracker().region();
|
||||
|
||||
for (auto it = std::next(prs.begin()); it != prs.end(); ++it) {
|
||||
testlog.info("fast_forward_to({})", *it);
|
||||
reader.fast_forward_to(*it).get();
|
||||
while (reader().get());
|
||||
// Make sure the index page linked into LRU after EOF is evicted.
|
||||
while (region.evict_some() == memory::reclaiming_result::reclaimed_something);
|
||||
}
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user