sstables/index_reader: short-circuit fast-forward-to when at EOF

Attempting to call advance_to() on the index after it is positioned at
EOF can result in an assert failure, because the operation results in
an attempt to move backwards in the index file (to read the last index
page, which was already read). This only happens if the index cache
entry belonging to the last index page is evicted; otherwise the advance
operation just looks up said entry and returns it.
To prevent this, we add an early return conditioned on eof() to all the
partition-level advance-to methods.
A regression unit test reproducing the above-described crash is also
added.
This commit is contained in:
Botond Dénes
2022-05-05 14:34:28 +03:00
parent 98f3d516a2
commit e8f3d7dd13
2 changed files with 67 additions and 0 deletions

View File

@@ -774,6 +774,9 @@ public:
// Advance index_reader bounds to the bounds of the supplied range
future<> advance_to(const dht::partition_range& range) {
if (eof()) {
return make_ready_future<>();
}
return seastar::when_all_succeed(
advance_lower_to_start(range),
advance_upper_to_end(range)).discard_result();
@@ -911,6 +914,9 @@ public:
// If upper_bound is provided, the upper bound within position is looked up
future<bool> advance_lower_and_check_if_present(
dht::ring_position_view key, std::optional<position_in_partition_view> pos = {}) {
if (eof()) {
return make_ready_future<bool>(false);
}
return advance_to(_lower_bound, key).then([this, key, pos] {
if (eof()) {
return make_ready_future<bool>(false);
@@ -1037,6 +1043,9 @@ public:
// Moves the cursor to the first partition that is not smaller than pos
// (the same contract as std::lower_bound).
// Callers must pass non-decreasing positions across successive calls.
// When the index is already at eof() the call is a no-op and resolves
// immediately, without touching _lower_bound.
future<> advance_to(dht::ring_position_view pos) {
    if (!eof()) {
        // Still inside the index: delegate to the bound-based overload.
        return advance_to(_lower_bound, pos);
    }
    // Already at EOF: nothing further to advance to.
    return make_ready_future<>();
}

View File

@@ -3090,3 +3090,61 @@ SEASTAR_TEST_CASE(partial_sstable_deletion_test) {
sst->unlink().get();
});
}
// Regression test: keeps fast-forwarding an sstable reader through several
// partition ranges, evicting cached index pages after each one, including
// ranges that come after the reader is expected to have reached EOF.
SEASTAR_TEST_CASE(test_index_fast_forwarding_after_eof) {
    return test_setup::do_with_tmp_directory([&] (test_env& env, sstring tmpdir_path) {
        tests::reader_concurrency_semaphore_wrapper semaphore;

        // Build a random schema and a couple of random mutations for it.
        auto schema_spec = tests::make_random_schema_specification(
                get_name(),
                std::uniform_int_distribution<size_t>(1, 4),
                std::uniform_int_distribution<size_t>(2, 4),
                std::uniform_int_distribution<size_t>(2, 8),
                std::uniform_int_distribution<size_t>(2, 8));
        auto rnd_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *schema_spec};
        auto schema = rnd_schema.schema();
        auto permit = env.make_reader_permit();

        testlog.info("Random schema:\n{}", rnd_schema.cql());

        const auto mutations = tests::generate_random_mutations(rnd_schema, 2).get();

        // Write the mutations into a fresh sstable and load it.
        auto sst = env.make_sstable(schema, tmpdir_path, 0, writable_sstable_versions.back(), big);
        {
            auto source_reader = make_flat_mutation_reader_from_mutations_v2(schema, permit, mutations);
            auto close_source_reader = deferred_close(source_reader);

            sstable_writer_config writer_cfg = env.manager().configure_writer();
            auto writer = sst->get_writer(*schema, 1, writer_cfg, encoding_stats{}, default_priority_class());
            source_reader.consume_in_thread(std::move(writer));

            sst->load().get();
        }

        // Tokens of the first and last generated mutation.
        // NOTE(review): presumably the mutations are in token order, so first_token <= last_token - confirm.
        const auto first_token = mutations.front().decorated_key()._token;
        const auto last_token = mutations.back().decorated_key()._token;

        dht::partition_range_vector ranges;
        // Appends the range [starting_at(start_raw), ending_at(end_raw)] over key tokens.
        const auto add_token_range = [&ranges] (auto start_raw, auto end_raw) {
            ranges.emplace_back(
                    dht::ring_position::starting_at({dht::token_kind::key, start_raw}),
                    dht::ring_position::ending_at({dht::token_kind::key, end_raw}));
        };

        add_token_range(first_token.raw() - 200, first_token.raw() - 100);
        add_token_range(first_token.raw() + 2, last_token.raw() + 2);
        // Should be at eof() after the above range is finished
        add_token_range(last_token.raw() + 100, last_token.raw() + 200);
        add_token_range(last_token.raw() + 300, last_token.raw() + 400);

        // Start on the first range and drain it.
        auto reader = sst->make_reader(schema, permit, ranges.front(), schema->full_slice());
        auto close_reader = deferred_close(reader);
        while (reader().get()) {
            // Fragments are discarded; only the traversal matters.
        }

        auto& region = env.manager().get_cache_tracker().region();

        // Fast-forward through every remaining range in order.
        for (size_t idx = 1; idx < ranges.size(); ++idx) {
            auto& range = ranges[idx];

            testlog.info("fast_forward_to({})", range);
            reader.fast_forward_to(range).get();
            while (reader().get()) {
                // Fragments are discarded; only the traversal matters.
            }

            // Make sure the index page linked into LRU after EOF is evicted:
            // keep evicting until evict_some() stops reporting progress.
            while (region.evict_some() == memory::reclaiming_result::reclaimed_something) {
            }
        }

        return make_ready_future<>();
    });
}