The reader_lifecycle_policy API was created around the idea of shard readers (optionally) being saved and reused on the next page. To do this, the lifecycle policy also has to be able to control the lifecycle of the readers' by-reference parameters: the slice and the range. This was possible from day 1, as readers are created through the lifecycle policy, which can intercept said parameters and replace them with copies kept in stable storage.

There was one hole in the design though: fast-forwarding, which can change the range of the read without the lifecycle policy knowing about it. In practice this results in fast-forwarded readers being saved together with the wrong range, their range reference becoming stale. The only lifecycle policy implementation prone to this is the one in `multishard_mutation_query.cc`, as it is the only one that actually saves readers. It fast-forwards its reader when the query spans multiple ranges. There have been no problems related to this so far only because no one passes more than one range to said functions, but this is incidental.

This patch solves the problem by adding an `update_read_range()` method to the lifecycle policy, allowing the shard reader to update the read range when it is fast-forwarded. A shared pointer is used so that the shard reader also retains control over the lifecycle of this range. This control is required because, when an `evictable_reader` is the top-level reader on the shard, it can invoke `create_reader()` with an edited range after `update_read_range()`, replacing the fast-forwarded-to range with a new one and yanking it out from under the evictable reader itself. With a shared pointer, the range is guaranteed to stay alive while it is the current one.
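For illustration, a minimal sketch of the call sequence the patch enables. The wrapper class, its members and the fast_forward_to() signature are assumptions made up for this example; only update_read_range() is the API added by the patch. The shard reader copies the fast-forwarded-to range into shared storage, hands that shared pointer to the lifecycle policy, and keeps its own reference so the range survives even if a later create_reader() call swaps in an edited range.

    // Illustrative fragment only -- not the actual evictable_reader code.
    class shard_reader_sketch {
        shared_ptr<reader_lifecycle_policy> _lifecycle_policy;
        // Shared ownership: the range stays alive for this reader even if the
        // policy later replaces its own copy from create_reader().
        lw_shared_ptr<const dht::partition_range> _range;
        flat_mutation_reader _reader;

    public:
        future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
            // Copy the new range into stable, shared storage...
            _range = make_lw_shared<const dht::partition_range>(pr);
            // ...tell the lifecycle policy about it, so a reader saved later is
            // stored together with the correct range...
            _lifecycle_policy->update_read_range(_range);
            // ...and fast-forward the wrapped reader to the stored copy
            // (signature of the wrapped call is assumed here).
            return _reader.fast_forward_to(*_range, timeout);
        }
    };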
116 lines
4.9 KiB
C++
/*
 * Copyright (C) 2020-present ScyllaDB
 */

/*
 * This file is part of Scylla.
 *
 * Scylla is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Scylla is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
 */

#pragma once

#include "mutation_reader.hh"
#include <seastar/core/gate.hh>

class test_reader_lifecycle_policy
        : public reader_lifecycle_policy
        , public enable_shared_from_this<test_reader_lifecycle_policy> {
    using factory_function = std::function<flat_mutation_reader(
            schema_ptr,
            reader_permit,
            const dht::partition_range&,
            const query::partition_slice&,
            const io_priority_class&,
            tracing::trace_state_ptr,
            mutation_reader::forwarding)>;

    struct reader_context {
        std::optional<reader_concurrency_semaphore> semaphore;
        lw_shared_ptr<const dht::partition_range> range;
        std::optional<const query::partition_slice> slice;

        reader_context() = default;
        reader_context(dht::partition_range range, query::partition_slice slice)
            : range(make_lw_shared<const dht::partition_range>(std::move(range))), slice(std::move(slice)) {
        }
    };

    factory_function _factory_function;
    std::vector<foreign_ptr<std::unique_ptr<reader_context>>> _contexts;
    std::vector<future<>> _destroy_futures;
    bool _evict_paused_readers = false;

public:
    explicit test_reader_lifecycle_policy(factory_function f, bool evict_paused_readers = false)
        : _factory_function(std::move(f))
        , _contexts(smp::count)
        , _evict_paused_readers(evict_paused_readers) {
    }
    virtual flat_mutation_reader create_reader(
            schema_ptr schema,
            reader_permit permit,
            const dht::partition_range& range,
            const query::partition_slice& slice,
            const io_priority_class& pc,
            tracing::trace_state_ptr trace_state,
            mutation_reader::forwarding fwd_mr) override {
        const auto shard = this_shard_id();
        if (_contexts[shard]) {
            _contexts[shard]->range = make_lw_shared<const dht::partition_range>(range);
            _contexts[shard]->slice.emplace(slice);
        } else {
            _contexts[shard] = make_foreign(std::make_unique<reader_context>(range, slice));
        }
        return _factory_function(std::move(schema), std::move(permit), *_contexts[shard]->range, *_contexts[shard]->slice, pc, std::move(trace_state), fwd_mr);
    }
    void update_read_range(lw_shared_ptr<const dht::partition_range> range) override {
        const auto shard = this_shard_id();
        assert(_contexts[shard]);
        _contexts[shard]->range = std::move(range);
    }
    virtual future<> destroy_reader(stopped_reader reader) noexcept override {
        auto& ctx = _contexts[this_shard_id()];
        auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(reader.handle));
        auto ret = reader_opt ? reader_opt->close() : make_ready_future<>();
        return ret.finally([&ctx] {
            return ctx->semaphore->stop().finally([&ctx] {
                ctx.release();
            });
        });
    }
    virtual reader_concurrency_semaphore& semaphore() override {
        const auto shard = this_shard_id();
        if (!_contexts[shard]) {
            _contexts[shard] = make_foreign(std::make_unique<reader_context>());
        } else if (_contexts[shard]->semaphore) {
            return *_contexts[shard]->semaphore;
        }
        // To support multiple reader life-cycle instances alive at the same
        // time, incorporate `this` into the name, to make their names unique.
        auto name = format("tests::reader_lifecycle_policy@{}@shard_id={}", fmt::ptr(this), shard);
        if (_evict_paused_readers) {
            // Create with no memory, so all inactive reads are immediately evicted.
            _contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::for_tests{}, std::move(name), 1, 0);
        } else {
            _contexts[shard]->semaphore.emplace(reader_concurrency_semaphore::no_limits{}, std::move(name));
        }
        return *_contexts[shard]->semaphore;
    }
    virtual future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) override {
        return semaphore().obtain_permit(schema.get(), description, 128 * 1024, timeout);
    }
};