evictable_reader: reset _range_override after fast-forwarding

`_range_override` is used to store the modified range the reader reads
after it has to be recreated (when recreating a reader it's read range
is reduced to account for partitions it already read). When engaged,
this field overrides the `_pr` field as the definitive range the reader
is supposed to be currently reading. Fast forwarding conceptually
overrides the range the reader is currently reading, however currently
it doesn't reset the `_range_override` field. This resulted in
`_range_override` (containing the modified pre-fast-forward range)
incorrectly overriding the fast-forwarded-to range in `_pr` when
validating the first partition produced by the just recreated reader,
resulting in a false-positive validation failure.

Fixes: #8059

Tests: unit(release)
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20210217164744.420100-1-bdenes@scylladb.com>
This commit is contained in:
Botond Dénes
2021-02-17 18:47:44 +02:00
committed by Avi Kivity
parent 4b46793c19
commit c3b4c3f451
2 changed files with 114 additions and 7 deletions

View File

@@ -1472,18 +1472,18 @@ future<> evictable_reader::fast_forward_to(const dht::partition_range& pr, db::t
_end_of_stream = false;
if (_reader) {
return _reader->fast_forward_to(pr, timeout);
co_await _reader->fast_forward_to(pr, timeout);
_range_override.reset();
co_return;
}
if (!_reader_created || !_irh) {
return make_ready_future<>();
co_return;
}
if (auto reader_opt = try_resume()) {
auto f = reader_opt->fast_forward_to(pr, timeout);
return f.then([this, reader = std::move(*reader_opt)] () mutable {
maybe_pause(std::move(reader));
});
co_await reader_opt->fast_forward_to(pr, timeout);
_range_override.reset();
maybe_pause(std::move(*reader_opt));
}
return make_ready_future<>();
}
evictable_reader_handle::evictable_reader_handle(evictable_reader& r) : _r(&r)

View File

@@ -3570,6 +3570,113 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) {
max_buffer_size);
}
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to) {
class test_reader : public flat_mutation_reader::impl {
simple_schema _s;
const std::vector<dht::decorated_key> _pkeys;
std::vector<dht::decorated_key>::const_iterator _it;
std::vector<dht::decorated_key>::const_iterator _end;
private:
void on_range_change(const dht::partition_range& pr) {
dht::ring_position_comparator cmp(*_schema);
_it = _pkeys.begin();
while (_it != _pkeys.end() && !pr.contains(*_it, cmp)) {
++_it;
}
_end = _it;
while (_end != _pkeys.end() && pr.contains(*_end, cmp)) {
++_end;
}
}
public:
test_reader(simple_schema s, reader_permit permit, const dht::partition_range& pr, std::vector<dht::decorated_key> pkeys)
: impl(s.schema(), std::move(permit))
, _s(std::move(s))
, _pkeys(std::move(pkeys)) {
on_range_change(pr);
}
virtual future<> fill_buffer(db::timeout_clock::time_point) override {
if (_it == _end) {
_end_of_stream = true;
return make_ready_future<>();
}
push_mutation_fragment(*_schema, _permit, partition_start(*_it++, {}));
uint32_t ck = 0;
while (!is_buffer_full()) {
auto ckey = _s.make_ckey(ck);
push_mutation_fragment(*_schema, _permit, _s.make_row(_s.make_ckey(ck++), make_random_string(1024)));
++ck;
}
push_mutation_fragment(*_schema, _permit, partition_end());
return make_ready_future<>();
}
virtual future<> next_partition() override {
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point) override {
on_range_change(pr);
clear_buffer();
_end_of_stream = false;
return make_ready_future<>();
}
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
};
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
simple_schema s;
auto permit = semaphore.make_permit(s.schema().get(), get_name());
auto pkeys = s.make_pkeys(6);
boost::sort(pkeys, dht::decorated_key::less_comparator(s.schema()));
auto ms = mutation_source([&] (schema_ptr schema,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr tr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
std::vector<dht::decorated_key> pkeys_with_data;
bool empty = false;
for (const auto& pkey : pkeys) {
empty = !empty;
if (empty) {
pkeys_with_data.push_back(pkey);
}
}
return make_flat_mutation_reader<test_reader>(
s,
std::move(permit),
range,
std::move(pkeys_with_data));
});
auto pr0 = dht::partition_range::make({pkeys[0], true}, {pkeys[3], true});
auto [reader, handle] = make_manually_paused_evictable_reader(std::move(ms), s.schema(), permit, pr0, s.schema()->full_slice(),
seastar::default_priority_class(), {}, mutation_reader::forwarding::yes);
auto reader_assert = assert_that(std::move(reader));
reader_assert.produces(pkeys[0]);
reader_assert.produces(pkeys[2]);
handle.pause();
BOOST_REQUIRE(semaphore.try_evict_one_inactive_read());
reader_assert.produces_end_of_stream();
auto pr1 = dht::partition_range::make({pkeys[4], true}, {pkeys[5], true});
reader_assert.fast_forward_to(pr1);
// Failure will happen in the form of `on_internal_error()`.
reader_assert.produces(pkeys[4]);
}
struct mutation_bounds {
mutation m;
position_in_partition lower;