restricted_mutation_reader: restrict based-on memory consumption
Restrict readers based on their memory consumption, instead of the count of the top-level readers. To do this an interposer is installed at the input_stream level which tracks buffers emmited by the stream. This way we can have an accurate picture of the readers' actual memory consumption. New readers will consume 16k units from the semaphore up-front. This is to account their own memory-consumption, apart from the buffers they will allocate. Creating the reader will be deferred to when there are enough resources to create it. As before only new readers will be blocked on an exhausted semaphore, existing readers can continue to work.
This commit is contained in:
98
database.cc
98
database.cc
@@ -381,6 +381,7 @@ class incremental_reader_selector : public reader_selector {
|
||||
lw_shared_ptr<sstables::sstable_set> _sstables;
|
||||
const io_priority_class& _pc;
|
||||
const query::partition_slice& _slice;
|
||||
reader_resource_tracker _resource_tracker;
|
||||
tracing::trace_state_ptr _trace_state;
|
||||
streamed_mutation::forwarding _fwd;
|
||||
mutation_reader::forwarding _fwd_mr;
|
||||
@@ -391,7 +392,7 @@ class incremental_reader_selector : public reader_selector {
|
||||
tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); }));
|
||||
// FIXME: make sstable::read_range_rows() return ::mutation_reader so that we can drop this wrapper.
|
||||
mutation_reader reader =
|
||||
make_mutation_reader<sstable_range_wrapping_reader>(sst, _s, *_pr, _slice, _pc, _fwd, _fwd_mr);
|
||||
make_mutation_reader<sstable_range_wrapping_reader>(sst, _s, *_pr, _slice, _pc, _resource_tracker, _fwd, _fwd_mr);
|
||||
if (sst->is_shared()) {
|
||||
reader = make_filtering_reader(std::move(reader), belongs_to_current_shard);
|
||||
}
|
||||
@@ -404,6 +405,7 @@ public:
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
reader_resource_tracker resource_tracker,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
@@ -412,6 +414,7 @@ public:
|
||||
, _sstables(std::move(sstables))
|
||||
, _pc(pc)
|
||||
, _slice(slice)
|
||||
, _resource_tracker(std::move(resource_tracker))
|
||||
, _trace_state(std::move(trace_state))
|
||||
, _fwd(fwd)
|
||||
, _fwd_mr(fwd_mr)
|
||||
@@ -484,6 +487,7 @@ class single_key_sstable_reader final : public mutation_reader::impl {
|
||||
// the priority changes.
|
||||
const io_priority_class& _pc;
|
||||
const query::partition_slice& _slice;
|
||||
reader_resource_tracker _resource_tracker;
|
||||
tracing::trace_state_ptr _trace_state;
|
||||
streamed_mutation::forwarding _fwd;
|
||||
public:
|
||||
@@ -494,6 +498,7 @@ public:
|
||||
const dht::partition_range& pr, // must be singular
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
reader_resource_tracker resource_tracker,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd)
|
||||
: _cf(cf)
|
||||
@@ -504,6 +509,7 @@ public:
|
||||
, _sstable_histogram(sstable_histogram)
|
||||
, _pc(pc)
|
||||
, _slice(slice)
|
||||
, _resource_tracker(std::move(resource_tracker))
|
||||
, _trace_state(std::move(trace_state))
|
||||
, _fwd(fwd)
|
||||
{ }
|
||||
@@ -516,7 +522,7 @@ public:
|
||||
return parallel_for_each(std::move(candidates),
|
||||
[this](const sstables::shared_sstable& sstable) {
|
||||
tracing::trace(_trace_state, "Reading key {} from sstable {}", _pr, seastar::value_of([&sstable] { return sstable->get_filename(); }));
|
||||
return sstable->read_row(_schema, _pr.start()->value(), _slice, _pc, _fwd).then([this](auto smo) {
|
||||
return sstable->read_row(_schema, _pr.start()->value(), _slice, _pc, _resource_tracker, _fwd).then([this](auto smo) {
|
||||
if (smo) {
|
||||
_mutations.emplace_back(std::move(*smo));
|
||||
}
|
||||
@@ -541,20 +547,9 @@ column_family::make_sstable_reader(schema_ptr s,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const {
|
||||
// restricts a reader's concurrency if the configuration specifies it
|
||||
auto restrict_reader = [&] (mutation_reader&& in) {
|
||||
auto&& config = [this, &pc] () -> const restricted_mutation_reader_config& {
|
||||
if (service::get_local_streaming_read_priority().id() == pc.id()) {
|
||||
return _config.streaming_read_concurrency_config;
|
||||
}
|
||||
return _config.read_concurrency_config;
|
||||
}();
|
||||
if (config.sem) {
|
||||
return make_restricted_reader(config, 1, std::move(in));
|
||||
} else {
|
||||
return std::move(in);
|
||||
}
|
||||
};
|
||||
auto& config = service::get_local_streaming_read_priority().id() == pc.id()
|
||||
? _config.streaming_read_concurrency_config
|
||||
: _config.read_concurrency_config;
|
||||
|
||||
// CAVEAT: if make_sstable_reader() is called on a single partition
|
||||
// we want to optimize and read exactly this partition. As a
|
||||
@@ -565,18 +560,46 @@ column_family::make_sstable_reader(schema_ptr s,
|
||||
if (dht::shard_of(pos.token()) != engine().cpu_id()) {
|
||||
return make_empty_reader(); // range doesn't belong to this shard
|
||||
}
|
||||
return restrict_reader(make_mutation_reader<single_key_sstable_reader>(const_cast<column_family*>(this), std::move(s), std::move(sstables),
|
||||
_stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd));
|
||||
|
||||
if (config.resources_sem) {
|
||||
auto ms = mutation_source([&config, sstables=std::move(sstables), this] (
|
||||
schema_ptr s,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_mutation_reader<single_key_sstable_reader>(const_cast<column_family*>(this), std::move(s), std::move(sstables),
|
||||
_stats.estimated_sstable_per_read, pr, slice, pc, reader_resource_tracker(config.resources_sem), std::move(trace_state), fwd);
|
||||
});
|
||||
return make_restricted_reader(config, std::move(ms), std::move(s), pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
} else {
|
||||
return make_mutation_reader<single_key_sstable_reader>(const_cast<column_family*>(this), std::move(s), std::move(sstables),
|
||||
_stats.estimated_sstable_per_read, pr, slice, pc, no_resource_tracking(), std::move(trace_state), fwd);
|
||||
}
|
||||
} else {
|
||||
return restrict_reader(make_mutation_reader<combined_mutation_reader>(
|
||||
std::make_unique<incremental_reader_selector>(std::move(s),
|
||||
std::move(sstables),
|
||||
pr,
|
||||
slice,
|
||||
pc,
|
||||
std::move(trace_state),
|
||||
fwd,
|
||||
fwd_mr), fwd_mr));
|
||||
if (config.resources_sem) {
|
||||
auto ms = mutation_source([&config, sstables=std::move(sstables)] (
|
||||
schema_ptr s,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_mutation_reader<combined_mutation_reader>(
|
||||
std::make_unique<incremental_reader_selector>(std::move(s), std::move(sstables), pr, slice, pc,
|
||||
reader_resource_tracker(config.resources_sem), std::move(trace_state), fwd, fwd_mr),
|
||||
fwd_mr);
|
||||
});
|
||||
return make_restricted_reader(config, std::move(ms), std::move(s), pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
} else {
|
||||
return make_mutation_reader<combined_mutation_reader>(
|
||||
std::make_unique<incremental_reader_selector>(std::move(s), std::move(sstables), pr, slice, pc,
|
||||
no_resource_tracking(), std::move(trace_state), fwd, fwd_mr),
|
||||
fwd_mr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2157,26 +2180,26 @@ database::setup_metrics() {
|
||||
sm::description("Counts the number of times the sstable read queue was overloaded. "
|
||||
"A non-zero value indicates that we have to drop read requests because they arrive faster than we can serve them.")),
|
||||
|
||||
sm::make_gauge("active_reads", [this] { return max_concurrent_reads() - _read_concurrency_sem.current(); },
|
||||
sm::make_gauge("active_reads", [this] { return max_memory_concurrent_reads() - _read_concurrency_sem.current(); },
|
||||
sm::description(seastar::format("Holds the number of currently active read operations. "
|
||||
"If this value gets close to {} we are likely to start dropping new read requests. "
|
||||
"In that case sstable_read_queue_overloads is going to get a non-zero value.", max_concurrent_reads()))),
|
||||
"In that case sstable_read_queue_overloads is going to get a non-zero value.", max_memory_concurrent_reads()))),
|
||||
|
||||
sm::make_gauge("queued_reads", [this] { return _read_concurrency_sem.waiters(); },
|
||||
sm::description("Holds the number of currently queued read operations.")),
|
||||
|
||||
sm::make_gauge("active_reads_streaming", [this] { return max_streaming_concurrent_reads() - _streaming_concurrency_sem.current(); },
|
||||
sm::make_gauge("active_reads_streaming", [this] { return max_memory_streaming_concurrent_reads() - _streaming_concurrency_sem.current(); },
|
||||
sm::description(seastar::format("Holds the number of currently active read operations issued on behalf of streaming "
|
||||
"If this value gets close to {} we are likely to start dropping new read requests. "
|
||||
"In that case sstable_read_queue_overloads is going to get a non-zero value.", max_streaming_concurrent_reads()))),
|
||||
"In that case sstable_read_queue_overloads is going to get a non-zero value.", max_memory_streaming_concurrent_reads()))),
|
||||
|
||||
sm::make_gauge("queued_reads_streaming", [this] { return _streaming_concurrency_sem.waiters(); },
|
||||
sm::description("Holds the number of currently queued read operations on behalf of streaming.")),
|
||||
|
||||
sm::make_gauge("active_reads_system_keyspace", [this] { return max_system_concurrent_reads() - _system_read_concurrency_sem.current(); },
|
||||
sm::make_gauge("active_reads_system_keyspace", [this] { return max_memory_system_concurrent_reads() - _system_read_concurrency_sem.current(); },
|
||||
sm::description(seastar::format("Holds the number of currently active read operations from \"system\" keyspace tables. "
|
||||
"If this value gets close to {} we are likely to start dropping new read requests. "
|
||||
"In that case sstable_read_queue_overloads is going to get a non-zero value.", max_system_concurrent_reads()))),
|
||||
"In that case sstable_read_queue_overloads is going to get a non-zero value.", max_memory_system_concurrent_reads()))),
|
||||
|
||||
sm::make_gauge("queued_reads_system_keyspace", [this] { return _system_read_concurrency_sem.waiters(); },
|
||||
sm::description("Holds the number of currently queued read operations from \"system\" keyspace tables.")),
|
||||
@@ -3396,17 +3419,16 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
|
||||
}
|
||||
cfg.dirty_memory_manager = &_dirty_memory_manager;
|
||||
cfg.streaming_dirty_memory_manager = &_streaming_dirty_memory_manager;
|
||||
cfg.read_concurrency_config.sem = &_read_concurrency_sem;
|
||||
cfg.read_concurrency_config.resources_sem = &_read_concurrency_sem;
|
||||
cfg.read_concurrency_config.timeout = _cfg->read_request_timeout_in_ms() * 1ms;
|
||||
// Assume a queued read takes up 10kB of memory, and allow 2% of memory to be filled up with such reads.
|
||||
cfg.read_concurrency_config.max_queue_length = memory::stats().total_memory() * 0.02 / 10000;
|
||||
cfg.read_concurrency_config.max_queue_length = 100;
|
||||
cfg.read_concurrency_config.raise_queue_overloaded_exception = [this] {
|
||||
++_stats->sstable_read_queue_overloaded;
|
||||
throw std::runtime_error("sstable inactive read queue overloaded");
|
||||
};
|
||||
// No timeouts or queue length limits - a failure here can kill an entire repair.
|
||||
// Trust the caller to limit concurrency.
|
||||
cfg.streaming_read_concurrency_config.sem = &_streaming_concurrency_sem;
|
||||
cfg.streaming_read_concurrency_config.resources_sem = &_streaming_concurrency_sem;
|
||||
cfg.cf_stats = &_cf_stats;
|
||||
cfg.enable_incremental_backups = _enable_incremental_backups;
|
||||
|
||||
@@ -4229,6 +4251,7 @@ mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
reader_resource_tracker resource_tracker,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
@@ -4238,6 +4261,7 @@ mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
pr,
|
||||
slice,
|
||||
pc,
|
||||
std::move(resource_tracker),
|
||||
std::move(trace_state),
|
||||
fwd,
|
||||
fwd_mr), fwd_mr);
|
||||
|
||||
13
database.hh
13
database.hh
@@ -837,6 +837,7 @@ mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
reader_resource_tracker resource_tracker,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr);
|
||||
@@ -1018,9 +1019,9 @@ public:
|
||||
using timeout_clock = lowres_clock;
|
||||
private:
|
||||
::cf_stats _cf_stats;
|
||||
static constexpr size_t max_concurrent_reads() { return 100; }
|
||||
static constexpr size_t max_streaming_concurrent_reads() { return 10; } // They're rather heavyweight, so limit more
|
||||
static constexpr size_t max_system_concurrent_reads() { return 10; }
|
||||
static size_t max_memory_concurrent_reads() { return memory::stats().total_memory() * 0.02; }
|
||||
static size_t max_memory_streaming_concurrent_reads() { return memory::stats().total_memory() * 0.02; }
|
||||
static size_t max_memory_system_concurrent_reads() { return memory::stats().total_memory() * 0.02; };
|
||||
static constexpr size_t max_concurrent_sstable_loads() { return 3; }
|
||||
struct db_stats {
|
||||
uint64_t total_writes = 0;
|
||||
@@ -1046,10 +1047,10 @@ private:
|
||||
seastar::thread_scheduling_group _background_writer_scheduling_group;
|
||||
flush_cpu_controller _memtable_cpu_controller;
|
||||
|
||||
semaphore _read_concurrency_sem{max_concurrent_reads()};
|
||||
semaphore _streaming_concurrency_sem{max_streaming_concurrent_reads()};
|
||||
semaphore _read_concurrency_sem{max_memory_concurrent_reads()};
|
||||
semaphore _streaming_concurrency_sem{max_memory_streaming_concurrent_reads()};
|
||||
restricted_mutation_reader_config _read_concurrency_config;
|
||||
semaphore _system_read_concurrency_sem{max_system_concurrent_reads()};
|
||||
semaphore _system_read_concurrency_sem{max_memory_system_concurrent_reads()};
|
||||
restricted_mutation_reader_config _system_read_concurrency_config;
|
||||
|
||||
semaphore _sstable_load_concurrency_sem{max_concurrent_sstable_loads()};
|
||||
|
||||
@@ -1577,7 +1577,7 @@ void make(database& db, bool durable, bool volatile_testing_only) {
|
||||
kscfg.enable_commitlog = !volatile_testing_only;
|
||||
kscfg.enable_cache = true;
|
||||
// don't make system keyspace reads wait for user reads
|
||||
kscfg.read_concurrency_config.sem = &db.system_keyspace_read_concurrency_sem();
|
||||
kscfg.read_concurrency_config.resources_sem = &db.system_keyspace_read_concurrency_sem();
|
||||
kscfg.read_concurrency_config.timeout = {};
|
||||
kscfg.read_concurrency_config.max_queue_length = std::numeric_limits<size_t>::max();
|
||||
// don't make system keyspace writes wait for user writes (if under pressure)
|
||||
|
||||
@@ -253,47 +253,188 @@ mutation_reader make_empty_reader() {
|
||||
return make_mutation_reader<empty_reader>();
|
||||
}
|
||||
|
||||
// A file that tracks the memory usage of buffers resulting from read
|
||||
// operations.
|
||||
class tracking_file_impl : public file_impl {
|
||||
file _tracked_file;
|
||||
semaphore* _semaphore;
|
||||
|
||||
// Shouldn't be called if semaphore is NULL.
|
||||
temporary_buffer<uint8_t> make_tracked_buf(temporary_buffer<uint8_t> buf) {
|
||||
return seastar::temporary_buffer<uint8_t>(buf.get_write(),
|
||||
buf.size(),
|
||||
make_deleter(buf.release(), std::bind(&semaphore::signal, _semaphore, buf.size())));
|
||||
}
|
||||
|
||||
public:
|
||||
tracking_file_impl(file file, reader_resource_tracker resource_tracker)
|
||||
: _tracked_file(std::move(file))
|
||||
, _semaphore(resource_tracker.get_semaphore()) {
|
||||
}
|
||||
|
||||
tracking_file_impl(const tracking_file_impl&) = delete;
|
||||
tracking_file_impl& operator=(const tracking_file_impl&) = delete;
|
||||
tracking_file_impl(tracking_file_impl&&) = default;
|
||||
tracking_file_impl& operator=(tracking_file_impl&&) = default;
|
||||
|
||||
virtual future<size_t> write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& pc) override {
|
||||
return get_file_impl(_tracked_file)->write_dma(pos, buffer, len, pc);
|
||||
}
|
||||
|
||||
virtual future<size_t> write_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) override {
|
||||
return get_file_impl(_tracked_file)->write_dma(pos, std::move(iov), pc);
|
||||
}
|
||||
|
||||
virtual future<size_t> read_dma(uint64_t pos, void* buffer, size_t len, const io_priority_class& pc) override {
|
||||
return get_file_impl(_tracked_file)->read_dma(pos, buffer, len, pc);
|
||||
}
|
||||
|
||||
virtual future<size_t> read_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) override {
|
||||
return get_file_impl(_tracked_file)->read_dma(pos, iov, pc);
|
||||
}
|
||||
|
||||
virtual future<> flush(void) override {
|
||||
return get_file_impl(_tracked_file)->flush();
|
||||
}
|
||||
|
||||
virtual future<struct stat> stat(void) override {
|
||||
return get_file_impl(_tracked_file)->stat();
|
||||
}
|
||||
|
||||
virtual future<> truncate(uint64_t length) override {
|
||||
return get_file_impl(_tracked_file)->truncate(length);
|
||||
}
|
||||
|
||||
virtual future<> discard(uint64_t offset, uint64_t length) override {
|
||||
return get_file_impl(_tracked_file)->discard(offset, length);
|
||||
}
|
||||
|
||||
virtual future<> allocate(uint64_t position, uint64_t length) override {
|
||||
return get_file_impl(_tracked_file)->allocate(position, length);
|
||||
}
|
||||
|
||||
virtual future<uint64_t> size(void) override {
|
||||
return get_file_impl(_tracked_file)->size();
|
||||
}
|
||||
|
||||
virtual future<> close() override {
|
||||
return get_file_impl(_tracked_file)->close();
|
||||
}
|
||||
|
||||
virtual std::unique_ptr<file_handle_impl> dup() override {
|
||||
return get_file_impl(_tracked_file)->dup();
|
||||
}
|
||||
|
||||
virtual subscription<directory_entry> list_directory(std::function<future<> (directory_entry de)> next) override {
|
||||
return get_file_impl(_tracked_file)->list_directory(std::move(next));
|
||||
}
|
||||
|
||||
virtual future<temporary_buffer<uint8_t>> dma_read_bulk(uint64_t offset, size_t range_size, const io_priority_class& pc) override {
|
||||
return get_file_impl(_tracked_file)->dma_read_bulk(offset, range_size, pc).then([this] (temporary_buffer<uint8_t> buf) {
|
||||
if (_semaphore) {
|
||||
buf = make_tracked_buf(std::move(buf));
|
||||
_semaphore->consume(buf.size());
|
||||
}
|
||||
return make_ready_future<temporary_buffer<uint8_t>>(std::move(buf));
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
file reader_resource_tracker::track(file f) const {
|
||||
return file(make_shared<tracking_file_impl>(f, *this));
|
||||
}
|
||||
|
||||
|
||||
class restricting_mutation_reader : public mutation_reader::impl {
|
||||
struct mutation_source_and_params {
|
||||
mutation_source _ms;
|
||||
schema_ptr _s;
|
||||
std::reference_wrapper<const dht::partition_range> _range;
|
||||
std::reference_wrapper<const query::partition_slice> _slice;
|
||||
std::reference_wrapper<const io_priority_class> _pc;
|
||||
tracing::trace_state_ptr _trace_state;
|
||||
streamed_mutation::forwarding _fwd;
|
||||
mutation_reader::forwarding _fwd_mr;
|
||||
|
||||
mutation_reader operator()() {
|
||||
return _ms(std::move(_s), _range.get(), _slice.get(), _pc.get(), std::move(_trace_state), _fwd, _fwd_mr);
|
||||
}
|
||||
};
|
||||
|
||||
const restricted_mutation_reader_config& _config;
|
||||
unsigned _weight = 0;
|
||||
bool _waited = false;
|
||||
mutation_reader _base;
|
||||
boost::variant<mutation_source_and_params, mutation_reader> _reader_or_mutation_source;
|
||||
|
||||
static const std::size_t new_reader_base_cost{16 * 1024};
|
||||
|
||||
future<> create_reader() {
|
||||
auto f = _config.timeout.count() != 0
|
||||
? _config.resources_sem->wait(_config.timeout, new_reader_base_cost)
|
||||
: _config.resources_sem->wait(new_reader_base_cost);
|
||||
|
||||
return f.then([this] {
|
||||
mutation_reader reader = boost::get<mutation_source_and_params>(_reader_or_mutation_source)();
|
||||
_reader_or_mutation_source = std::move(reader);
|
||||
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
public:
|
||||
restricting_mutation_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base)
|
||||
: _config(config), _weight(weight), _base(std::move(base)) {
|
||||
if (_config.sem->waiters() >= _config.max_queue_length) {
|
||||
restricting_mutation_reader(const restricted_mutation_reader_config& config,
|
||||
mutation_source ms,
|
||||
schema_ptr s,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: _config(config)
|
||||
, _reader_or_mutation_source(
|
||||
mutation_source_and_params{std::move(ms), std::move(s), range, slice, pc, std::move(trace_state), fwd, fwd_mr}) {
|
||||
if (_config.resources_sem->waiters() >= _config.max_queue_length) {
|
||||
_config.raise_queue_overloaded_exception();
|
||||
}
|
||||
}
|
||||
~restricting_mutation_reader() {
|
||||
if (_waited) {
|
||||
_config.sem->signal(_weight);
|
||||
if (boost::get<mutation_reader>(&_reader_or_mutation_source)) {
|
||||
_config.resources_sem->signal(new_reader_base_cost);
|
||||
}
|
||||
}
|
||||
future<streamed_mutation_opt> operator()() override {
|
||||
// FIXME: we should defer freeing until the mutation is freed, perhaps,
|
||||
// rather than just returned
|
||||
if (_waited) {
|
||||
return _base();
|
||||
if (auto* reader = boost::get<mutation_reader>(&_reader_or_mutation_source)) {
|
||||
return (*reader)();
|
||||
}
|
||||
auto waited = _config.timeout.count() != 0
|
||||
? _config.sem->wait(_config.timeout, _weight)
|
||||
: _config.sem->wait(_weight);
|
||||
return waited.then([this] {
|
||||
_waited = true;
|
||||
return _base();
|
||||
|
||||
return create_reader().then([this] {
|
||||
return boost::get<mutation_reader>(_reader_or_mutation_source)();
|
||||
});
|
||||
}
|
||||
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
|
||||
return _base.fast_forward_to(pr);
|
||||
if (auto* reader = boost::get<mutation_reader>(&_reader_or_mutation_source)) {
|
||||
return reader->fast_forward_to(pr);
|
||||
}
|
||||
|
||||
return create_reader().then([this, &pr] {
|
||||
return boost::get<mutation_reader>(_reader_or_mutation_source).fast_forward_to(pr);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
mutation_reader
|
||||
make_restricted_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base) {
|
||||
return make_mutation_reader<restricting_mutation_reader>(config, weight, std::move(base));
|
||||
make_restricted_reader(const restricted_mutation_reader_config& config,
|
||||
mutation_source ms,
|
||||
schema_ptr s,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_mutation_reader<restricting_mutation_reader>(config, std::move(ms), std::move(s), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
class multi_range_mutation_reader : public mutation_reader::impl {
|
||||
|
||||
@@ -374,8 +374,33 @@ public:
|
||||
mutation_source make_empty_mutation_source();
|
||||
snapshot_source make_empty_snapshot_source();
|
||||
|
||||
|
||||
class reader_resource_tracker {
|
||||
semaphore* _sem = nullptr;
|
||||
public:
|
||||
reader_resource_tracker() = default;
|
||||
explicit reader_resource_tracker(semaphore* sem)
|
||||
: _sem(sem) {
|
||||
}
|
||||
|
||||
bool operator==(const reader_resource_tracker& other) const {
|
||||
return _sem == other._sem;
|
||||
}
|
||||
|
||||
file track(file f) const;
|
||||
|
||||
semaphore* get_semaphore() const {
|
||||
return _sem;
|
||||
}
|
||||
};
|
||||
|
||||
inline reader_resource_tracker no_resource_tracking() {
|
||||
return reader_resource_tracker(nullptr);
|
||||
}
|
||||
|
||||
|
||||
struct restricted_mutation_reader_config {
|
||||
semaphore* sem = nullptr;
|
||||
semaphore* resources_sem = nullptr;
|
||||
std::chrono::nanoseconds timeout = {};
|
||||
size_t max_queue_length = std::numeric_limits<size_t>::max();
|
||||
std::function<void ()> raise_queue_overloaded_exception = default_raise_queue_overloaded_exception;
|
||||
@@ -385,11 +410,25 @@ struct restricted_mutation_reader_config {
|
||||
}
|
||||
};
|
||||
|
||||
// Restricts a given `mutation_reader` to a concurrency limited according to settings in
|
||||
// a restricted_mutation_reader_config. These settings include a semaphore for limiting the number
|
||||
// of active concurrent readers, a timeout for inactive readers, and a maximum queue size for
|
||||
// inactive readers.
|
||||
mutation_reader make_restricted_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base);
|
||||
// Creates a restricted reader whose resource usages will be tracked
|
||||
// during it's lifetime. If there are not enough resources (dues to
|
||||
// existing readers) to create the new reader, it's construction will
|
||||
// be deferred until there are sufficient resources.
|
||||
// The internal reader once created will not be hindered in it's work
|
||||
// anymore. Reusorce limits are determined by the config which contains
|
||||
// a semaphore to track and limit the memory usage of readers. It also
|
||||
// contains a timeout and a maximum queue size for inactive readers
|
||||
// whose construction is blocked.
|
||||
mutation_reader make_restricted_reader(const restricted_mutation_reader_config& config,
|
||||
mutation_source ms,
|
||||
schema_ptr s,
|
||||
const dht::partition_range& range = query::full_partition_range,
|
||||
const query::partition_slice& slice = query::full_slice,
|
||||
const io_priority_class& pc = default_priority_class(),
|
||||
tracing::trace_state_ptr trace_state = nullptr,
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
|
||||
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes);
|
||||
|
||||
|
||||
template<>
|
||||
struct move_constructor_disengages<mutation_source> {
|
||||
|
||||
@@ -34,10 +34,11 @@ public:
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
reader_resource_tracker resource_tracker,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: _sst(sst)
|
||||
, _smr(sst->read_range_rows(std::move(s), pr, slice, pc, fwd, fwd_mr)) {
|
||||
, _smr(sst->read_range_rows(std::move(s), pr, slice, pc, std::move(resource_tracker), fwd, fwd_mr)) {
|
||||
}
|
||||
virtual future<streamed_mutation_opt> operator()() override {
|
||||
return _smr.read();
|
||||
|
||||
@@ -242,6 +242,7 @@ private:
|
||||
query::full_partition_range,
|
||||
query::full_slice,
|
||||
service::get_local_compaction_priority(),
|
||||
no_resource_tracking(),
|
||||
nullptr,
|
||||
::streamed_mutation::forwarding::no,
|
||||
::mutation_reader::forwarding::no);
|
||||
|
||||
@@ -55,6 +55,7 @@ private:
|
||||
schema_ptr _schema;
|
||||
const io_priority_class& _pc;
|
||||
const query::partition_slice& _slice;
|
||||
reader_resource_tracker _resource_tracker;
|
||||
bool _out_of_range = false;
|
||||
stdx::optional<query::clustering_key_filter_ranges> _ck_ranges;
|
||||
stdx::optional<clustering_ranges_walker> _ck_ranges_walker;
|
||||
@@ -306,18 +307,21 @@ public:
|
||||
mp_row_consumer(const schema_ptr schema,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
reader_resource_tracker resource_tracker,
|
||||
streamed_mutation::forwarding fwd)
|
||||
: _schema(schema)
|
||||
, _pc(pc)
|
||||
, _slice(slice)
|
||||
, _resource_tracker(std::move(resource_tracker))
|
||||
, _fwd(fwd)
|
||||
, _range_tombstones(*_schema)
|
||||
{ }
|
||||
|
||||
mp_row_consumer(const schema_ptr schema,
|
||||
const io_priority_class& pc,
|
||||
reader_resource_tracker resource_tracker,
|
||||
streamed_mutation::forwarding fwd)
|
||||
: mp_row_consumer(schema, query::full_slice, pc, fwd) { }
|
||||
: mp_row_consumer(schema, query::full_slice, pc, std::move(resource_tracker), fwd) { }
|
||||
|
||||
virtual proceed consume_row_start(sstables::key_view key, sstables::deletion_time deltime) override {
|
||||
if (!_is_mutation_end) {
|
||||
@@ -672,6 +676,10 @@ public:
|
||||
return _pc;
|
||||
}
|
||||
|
||||
virtual reader_resource_tracker resource_tracker() override {
|
||||
return _resource_tracker;
|
||||
}
|
||||
|
||||
// Returns true if the consumer is positioned at partition boundary,
|
||||
// meaning that after next read either get_mutation() will
|
||||
// return engaged mutation or end of stream was reached.
|
||||
@@ -977,11 +985,14 @@ sstables::sstable::read_row(schema_ptr schema,
|
||||
const sstables::key& key,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
reader_resource_tracker resource_tracker,
|
||||
streamed_mutation::forwarding fwd)
|
||||
{
|
||||
return do_with(dht::global_partitioner().decorate_key(*schema, key.to_partition_key(*schema)), [this, schema, &slice, &pc, fwd] (auto& dk) {
|
||||
return this->read_row(schema, dk, slice, pc, fwd);
|
||||
});
|
||||
return do_with(dht::global_partitioner().decorate_key(*schema,
|
||||
key.to_partition_key(*schema)),
|
||||
[this, schema, &slice, &pc, resource_tracker = std::move(resource_tracker), fwd] (auto& dk) {
|
||||
return this->read_row(schema, dk, slice, pc, std::move(resource_tracker), fwd);
|
||||
});
|
||||
}
|
||||
|
||||
static inline void ensure_len(bytes_view v, size_t len) {
|
||||
@@ -1048,17 +1059,19 @@ private:
|
||||
public:
|
||||
impl(shared_sstable sst, schema_ptr schema, sstable::disk_read_range toread, uint64_t last_end,
|
||||
const io_priority_class &pc,
|
||||
reader_resource_tracker resource_tracker,
|
||||
streamed_mutation::forwarding fwd)
|
||||
: _get_data_source([this, sst = std::move(sst), s = std::move(schema), toread, last_end, &pc, fwd] {
|
||||
auto consumer = mp_row_consumer(s, query::full_slice, pc, fwd);
|
||||
: _get_data_source([this, sst = std::move(sst), s = std::move(schema), toread, last_end, &pc, resource_tracker = std::move(resource_tracker), fwd] {
|
||||
auto consumer = mp_row_consumer(s, query::full_slice, pc, std::move(resource_tracker), fwd);
|
||||
auto ds = make_lw_shared<sstable_data_source>(std::move(s), std::move(sst), std::move(consumer), std::move(toread), last_end);
|
||||
return make_ready_future<lw_shared_ptr<sstable_data_source>>(std::move(ds));
|
||||
}) { }
|
||||
impl(shared_sstable sst, schema_ptr schema,
|
||||
const io_priority_class &pc,
|
||||
reader_resource_tracker resource_tracker,
|
||||
streamed_mutation::forwarding fwd)
|
||||
: _get_data_source([this, sst = std::move(sst), s = std::move(schema), &pc, fwd] {
|
||||
auto consumer = mp_row_consumer(s, query::full_slice, pc, fwd);
|
||||
: _get_data_source([this, sst = std::move(sst), s = std::move(schema), &pc, resource_tracker = std::move(resource_tracker), fwd] {
|
||||
auto consumer = mp_row_consumer(s, query::full_slice, pc, std::move(resource_tracker), fwd);
|
||||
auto ds = make_lw_shared<sstable_data_source>(std::move(s), std::move(sst), std::move(consumer));
|
||||
return make_ready_future<lw_shared_ptr<sstable_data_source>>(std::move(ds));
|
||||
}) { }
|
||||
@@ -1067,16 +1080,17 @@ public:
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
reader_resource_tracker resource_tracker,
|
||||
streamed_mutation::forwarding fwd,
|
||||
::mutation_reader::forwarding fwd_mr)
|
||||
: _get_data_source([this, pr, sst = std::move(sst), s = std::move(schema), &pc, &slice, fwd, fwd_mr] () mutable {
|
||||
: _get_data_source([this, pr, sst = std::move(sst), s = std::move(schema), &pc, &slice, resource_tracker = std::move(resource_tracker), fwd, fwd_mr] () mutable {
|
||||
auto lh_index = sst->get_index_reader(pc); // lh = left hand
|
||||
auto rh_index = sst->get_index_reader(pc);
|
||||
auto f = seastar::when_all_succeed(lh_index->advance_to_start(pr), rh_index->advance_to_end(pr));
|
||||
return f.then([this, lh_index = std::move(lh_index), rh_index = std::move(rh_index), sst = std::move(sst), s = std::move(s), &pc, &slice, fwd, fwd_mr] () mutable {
|
||||
return f.then([this, lh_index = std::move(lh_index), rh_index = std::move(rh_index), sst = std::move(sst), s = std::move(s), &pc, &slice, resource_tracker = std::move(resource_tracker), fwd, fwd_mr] () mutable {
|
||||
sstable::disk_read_range drr{lh_index->data_file_position(),
|
||||
rh_index->data_file_position()};
|
||||
auto consumer = mp_row_consumer(s, slice, pc, fwd);
|
||||
auto consumer = mp_row_consumer(s, slice, pc, std::move(resource_tracker), fwd);
|
||||
auto ds = make_lw_shared<sstable_data_source>(std::move(s), std::move(sst), std::move(consumer), drr, (fwd_mr ? sst->data_size() : drr.end), std::move(lh_index), std::move(rh_index));
|
||||
ds->_index_in_current_partition = true;
|
||||
ds->_will_likely_slice = sstable_data_source::will_likely_slice(slice);
|
||||
@@ -1238,7 +1252,7 @@ future<> mutation_reader::fast_forward_to(const dht::partition_range& pr) {
|
||||
}
|
||||
|
||||
mutation_reader sstable::read_rows(schema_ptr schema, const io_priority_class& pc, streamed_mutation::forwarding fwd) {
|
||||
return std::make_unique<mutation_reader::impl>(shared_from_this(), schema, pc, fwd);
|
||||
return std::make_unique<mutation_reader::impl>(shared_from_this(), schema, pc, no_resource_tracking(), fwd);
|
||||
}
|
||||
|
||||
static
|
||||
@@ -1256,11 +1270,12 @@ sstables::sstable::read_row(schema_ptr schema,
|
||||
dht::ring_position_view key,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
reader_resource_tracker resource_tracker,
|
||||
streamed_mutation::forwarding fwd)
|
||||
{
|
||||
auto lh_index = get_index_reader(pc);
|
||||
auto f = lh_index->advance_and_check_if_present(key);
|
||||
return f.then([this, &slice, &pc, fwd, lh_index = std::move(lh_index), s = std::move(schema), key] (bool present) mutable {
|
||||
return f.then([this, &slice, &pc, resource_tracker = std::move(resource_tracker), fwd, lh_index = std::move(lh_index), s = std::move(schema), key] (bool present) mutable {
|
||||
if (!present) {
|
||||
_filter_tracker.add_false_positive();
|
||||
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
|
||||
@@ -1270,8 +1285,8 @@ sstables::sstable::read_row(schema_ptr schema,
|
||||
|
||||
auto rh_index = std::make_unique<index_reader>(*lh_index);
|
||||
auto f = advance_to_upper_bound(*rh_index, *_schema, slice, key);
|
||||
return f.then([this, &slice, &pc, fwd, lh_index = std::move(lh_index), rh_index = std::move(rh_index), s = std::move(s)] () mutable {
|
||||
auto consumer = mp_row_consumer(s, slice, pc, fwd);
|
||||
return f.then([this, &slice, &pc, resource_tracker = std::move(resource_tracker), fwd, lh_index = std::move(lh_index), rh_index = std::move(rh_index), s = std::move(s)] () mutable {
|
||||
auto consumer = mp_row_consumer(s, slice, pc, std::move(resource_tracker), fwd);
|
||||
auto ds = make_lw_shared<sstable_data_source>(sstable_data_source::single_partition_tag(), std::move(s),
|
||||
shared_from_this(), std::move(consumer), std::move(lh_index), std::move(rh_index));
|
||||
ds->_will_likely_slice = sstable_data_source::will_likely_slice(slice);
|
||||
@@ -1286,10 +1301,11 @@ sstable::read_range_rows(schema_ptr schema,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
reader_resource_tracker resource_tracker,
|
||||
streamed_mutation::forwarding fwd,
|
||||
::mutation_reader::forwarding fwd_mr) {
|
||||
return std::make_unique<mutation_reader::impl>(
|
||||
shared_from_this(), std::move(schema), range, slice, pc, fwd, fwd_mr);
|
||||
shared_from_this(), std::move(schema), range, slice, pc, std::move(resource_tracker), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -458,14 +458,14 @@ data_consume_context sstable::data_consume_rows(
|
||||
// returned context, and may make small skips.
|
||||
return std::make_unique<data_consume_context::impl>(shared_from_this(),
|
||||
consumer, data_stream(toread.start, last_end - toread.start,
|
||||
consumer.io_priority(), _partition_range_history), toread.start, toread.end - toread.start);
|
||||
consumer.io_priority(), consumer.resource_tracker(), _partition_range_history), toread.start, toread.end - toread.start);
|
||||
}
|
||||
|
||||
data_consume_context sstable::data_consume_single_partition(
|
||||
row_consumer& consumer, sstable::disk_read_range toread) {
|
||||
return std::make_unique<data_consume_context::impl>(shared_from_this(),
|
||||
consumer, data_stream(toread.start, toread.end - toread.start,
|
||||
consumer.io_priority(), _single_partition_history), toread.start, toread.end - toread.start);
|
||||
consumer.io_priority(), consumer.resource_tracker(), _single_partition_history), toread.start, toread.end - toread.start);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -28,6 +28,8 @@
|
||||
#include "consumer.hh"
|
||||
#include "sstables/types.hh"
|
||||
|
||||
class reader_resource_tracker;
|
||||
|
||||
// sstables::data_consume_row feeds the contents of a single row into a
|
||||
// row_consumer object:
|
||||
//
|
||||
@@ -94,5 +96,8 @@ public:
|
||||
// Under which priority class to place I/O coming from this consumer
|
||||
virtual const io_priority_class& io_priority() = 0;
|
||||
|
||||
// The restriction that applies to this consumer
|
||||
virtual reader_resource_tracker resource_tracker() = 0;
|
||||
|
||||
virtual ~row_consumer() { }
|
||||
};
|
||||
|
||||
@@ -2553,22 +2553,27 @@ sstable::component_type sstable::component_from_sstring(sstring &s) {
|
||||
}
|
||||
}
|
||||
|
||||
input_stream<char> sstable::data_stream(uint64_t pos, size_t len, const io_priority_class& pc, lw_shared_ptr<file_input_stream_history> history) {
|
||||
input_stream<char> sstable::data_stream(uint64_t pos, size_t len, const io_priority_class& pc, reader_resource_tracker resource_tracker, lw_shared_ptr<file_input_stream_history> history) {
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = sstable_buffer_size;
|
||||
options.io_priority_class = pc;
|
||||
options.read_ahead = 4;
|
||||
options.dynamic_adjustments = std::move(history);
|
||||
|
||||
auto f = resource_tracker.track(_data_file);
|
||||
|
||||
input_stream<char> stream;
|
||||
if (_components->compression) {
|
||||
return make_compressed_file_input_stream(_data_file, &_components->compression,
|
||||
return make_compressed_file_input_stream(f, &_components->compression,
|
||||
pos, len, std::move(options));
|
||||
} else {
|
||||
return make_file_input_stream(_data_file, pos, len, std::move(options));
|
||||
|
||||
}
|
||||
|
||||
return make_file_input_stream(f, pos, len, std::move(options));
|
||||
}
|
||||
|
||||
future<temporary_buffer<char>> sstable::data_read(uint64_t pos, size_t len, const io_priority_class& pc) {
|
||||
return do_with(data_stream(pos, len, pc, { }), [len] (auto& stream) {
|
||||
return do_with(data_stream(pos, len, pc, no_resource_tracking(), {}), [len] (auto& stream) {
|
||||
return stream.read_exactly(len).finally([&stream] {
|
||||
return stream.close();
|
||||
});
|
||||
@@ -3042,7 +3047,7 @@ public:
|
||||
return make_ready_future<streamed_mutation_opt>(stdx::nullopt);
|
||||
}
|
||||
auto sst = std::move(_sst);
|
||||
return sst->read_row(_s, _key, _slice, _pc, _fwd);
|
||||
return sst->read_row(_s, _key, _slice, _pc, no_resource_tracking(), _fwd);
|
||||
}
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
|
||||
throw std::bad_function_call();
|
||||
@@ -3065,7 +3070,7 @@ mutation_source sstable::as_mutation_source() {
|
||||
const dht::ring_position& pos = range.start()->value();
|
||||
return make_mutation_reader<single_partition_reader_adaptor>(sst, s, pos, slice, pc, fwd);
|
||||
} else {
|
||||
return make_mutation_reader<range_reader_adaptor>(sst, sst->read_range_rows(s, range, slice, pc, fwd, fwd_mr));
|
||||
return make_mutation_reader<range_reader_adaptor>(sst, sst->read_range_rows(s, range, slice, pc, no_resource_tracking(), fwd, fwd_mr));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -292,6 +292,7 @@ public:
|
||||
dht::ring_position_view key,
|
||||
const query::partition_slice& slice = query::full_slice,
|
||||
const io_priority_class& pc = default_priority_class(),
|
||||
reader_resource_tracker resource_tracker = no_resource_tracking(),
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no);
|
||||
|
||||
future<streamed_mutation_opt> read_row(
|
||||
@@ -299,6 +300,7 @@ public:
|
||||
const sstables::key& key,
|
||||
const query::partition_slice& slice = query::full_slice,
|
||||
const io_priority_class& pc = default_priority_class(),
|
||||
reader_resource_tracker resource_tracker = no_resource_tracking(),
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no);
|
||||
|
||||
// Returns a mutation_reader for given range of partitions
|
||||
@@ -307,6 +309,7 @@ public:
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice = query::full_slice,
|
||||
const io_priority_class& pc = default_priority_class(),
|
||||
reader_resource_tracker resource_tracker = no_resource_tracking(),
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
|
||||
::mutation_reader::forwarding fwd_mr = ::mutation_reader::forwarding::yes);
|
||||
|
||||
@@ -594,7 +597,7 @@ private:
|
||||
// about the buffer size to read, and where exactly to stop reading
|
||||
// (even when a large buffer size is used).
|
||||
input_stream<char> data_stream(uint64_t pos, size_t len, const io_priority_class& pc,
|
||||
lw_shared_ptr<file_input_stream_history> history);
|
||||
reader_resource_tracker resource_tracker, lw_shared_ptr<file_input_stream_history> history);
|
||||
|
||||
// Read exactly the specific byte range from the data file (after
|
||||
// uncompression, if the file is compressed). This can be used to read
|
||||
|
||||
@@ -151,6 +151,7 @@ SEASTAR_TEST_CASE(combined_mutation_reader_test) {
|
||||
query::full_partition_range,
|
||||
query::full_slice,
|
||||
seastar::default_priority_class(),
|
||||
no_resource_tracking(),
|
||||
streamed_mutation::forwarding::no,
|
||||
::mutation_reader::forwarding::yes));
|
||||
}
|
||||
@@ -163,6 +164,7 @@ SEASTAR_TEST_CASE(combined_mutation_reader_test) {
|
||||
query::full_partition_range,
|
||||
query::full_slice,
|
||||
seastar::default_priority_class(),
|
||||
no_resource_tracking(),
|
||||
nullptr,
|
||||
streamed_mutation::forwarding::no,
|
||||
::mutation_reader::forwarding::yes);
|
||||
|
||||
@@ -442,6 +442,9 @@ public:
|
||||
virtual const io_priority_class& io_priority() override {
|
||||
return default_priority_class();
|
||||
}
|
||||
virtual reader_resource_tracker resource_tracker() override {
|
||||
return no_resource_tracking();
|
||||
}
|
||||
virtual void reset(indexable_element) override { }
|
||||
};
|
||||
|
||||
@@ -554,6 +557,9 @@ public:
|
||||
virtual const io_priority_class& io_priority() override {
|
||||
return default_priority_class();
|
||||
}
|
||||
virtual reader_resource_tracker resource_tracker() override {
|
||||
return no_resource_tracking();
|
||||
}
|
||||
virtual void reset(indexable_element) override { }
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user