treewide: replace reader_resource_tracer with reader_permit

The former was never really more than a reader_permit with one
additional method. Currently using it doesn't even save one from any
includes. Now that readers will be using reader_permit we would have to
pass down both to mutation_source. Instead get rid of
reader_resource_tracker and just use reader_permit. Instead of making it
a last and optional parameter that is easy to ignore, make it a
first class parameter, right after schema, to signify that permits are
now a prominent part of the reader API.

This -- mostly mechanical -- patch essentially refactors mutation_source
to ask for the reader_permit instead of reader_resource_tracking and
updates all usage sites.
This commit is contained in:
Botond Dénes
2019-11-15 09:54:27 +02:00
parent dea24ca859
commit dfc8b2fc45
38 changed files with 297 additions and 266 deletions

View File

@@ -1960,6 +1960,7 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
}
};
auto ms = mutation_source([&db, &partitioner] (schema_ptr s,
reader_permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,

View File

@@ -1043,22 +1043,22 @@ using sstable_reader_factory_type = std::function<flat_mutation_reader(sstables:
// Filters out mutation that doesn't belong to current shard.
flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s,
reader_permit permit,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
reader_resource_tracker resource_tracker,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator());
flat_mutation_reader make_range_sstable_reader(schema_ptr s,
reader_permit permit,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
reader_resource_tracker resource_tracker,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,

View File

@@ -58,6 +58,7 @@ private:
struct virtual_reader {
flat_mutation_reader operator()(schema_ptr schema,
reader_permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,

View File

@@ -188,6 +188,7 @@ public:
flat_mutation_reader operator()(
schema_ptr s,
reader_permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,

View File

@@ -1231,11 +1231,11 @@ void view_builder::initialize_reader_at_current_token(build_step& step) {
step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max());
step.reader = make_local_shard_sstable_reader(
step.base->schema(),
no_reader_permit(),
make_lw_shared(sstables::sstable_set(step.base->get_sstable_set())),
step.prange,
step.pslice,
default_priority_class(),
no_resource_tracking(),
nullptr,
streamed_mutation::forwarding::no,
mutation_reader::forwarding::no);

View File

@@ -37,7 +37,7 @@ future<> view_update_generator::start() {
auto& [sst, t] = _sstables_with_tables.front();
try {
schema_ptr s = t->schema();
flat_mutation_reader staging_sstable_reader = sst->read_rows_flat(s);
flat_mutation_reader staging_sstable_reader = sst->read_rows_flat(s, no_reader_permit());
auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, _proxy, sst, _as), db::no_timeout);
if (result == stop_iteration::yes) {
break;

View File

@@ -613,7 +613,7 @@ public:
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
if (!_reader) {
_reader = _source.make_reader(_schema, pr, _slice, _pc, std::move(_trace_state), streamed_mutation::forwarding::no,
_reader = _source.make_reader(_schema, no_reader_permit(), pr, _slice, _pc, std::move(_trace_state), streamed_mutation::forwarding::no,
mutation_reader::forwarding::yes);
_end_of_stream = false;
return make_ready_future<>();
@@ -663,7 +663,7 @@ public:
tracing::trace_state_ptr trace_state)
: impl(s)
, _generator(std::move(generator))
, _reader(source.make_reader(s, first_range, slice, pc, trace_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::yes))
, _reader(source.make_reader(s, no_reader_permit(), first_range, slice, pc, trace_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::yes))
{
}
@@ -737,7 +737,7 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa
return make_empty_flat_reader(std::move(s));
}
} else if (ranges.size() == 1) {
return source.make_reader(std::move(s), ranges.front(), slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr);
return source.make_reader(std::move(s), no_reader_permit(), ranges.front(), slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr);
} else {
return make_flat_mutation_reader<flat_multi_range_mutation_reader<adapter>>(std::move(s), std::move(source),
ranges.front(), adapter(std::next(ranges.cbegin()), ranges.cend()), slice, pc, std::move(trace_state));

View File

@@ -118,6 +118,7 @@ public:
flat_mutation_reader operator()(
schema_ptr s,
reader_permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,

View File

@@ -346,7 +346,7 @@ protected:
const io_priority_class& pc,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
auto ret = _memtable->_underlying->make_reader(_schema, delegate, slice, pc, nullptr, fwd, fwd_mr);
auto ret = _memtable->_underlying->make_reader(_schema, no_reader_permit(), delegate, slice, pc, nullptr, fwd, fwd_mr);
_memtable = {};
_last = {};
return ret;
@@ -742,6 +742,7 @@ logalloc::occupancy_stats memtable::occupancy() const {
mutation_source memtable::as_data_source() {
return mutation_source([mt = shared_from_this()] (schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,

View File

@@ -295,7 +295,8 @@ flat_mutation_reader read_context::create_reader(
rm.rparts->read_operation = table.read_in_progress();
rm.state = reader_state::used;
return table.as_mutation_source().make_reader(std::move(schema), *rm.rparts->range, *rm.rparts->slice, pc, std::move(trace_state));
return table.as_mutation_source().make_reader(std::move(schema), no_reader_permit(), *rm.rparts->range, *rm.rparts->slice, pc,
std::move(trace_state));
}
void read_context::destroy_reader(shard_id shard, future<stopped_reader> reader_fut) noexcept {
@@ -584,6 +585,7 @@ static future<reconcilable_result> do_query_mutations(
return ctx->lookup_readers().then([&ctx, s = std::move(s), &cmd, &ranges, trace_state, timeout,
accounter = std::move(accounter)] () mutable {
auto ms = mutation_source([&] (schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
const io_priority_class& pc,
@@ -592,8 +594,8 @@ static future<reconcilable_result> do_query_mutations(
mutation_reader::forwarding fwd_mr) {
return make_multishard_combining_reader(ctx, dht::global_partitioner(), std::move(s), pr, ps, pc, std::move(trace_state), fwd_mr);
});
auto reader = make_flat_multi_range_reader(s, std::move(ms), ranges, cmd.slice, service::get_local_sstable_query_read_priority(),
trace_state, mutation_reader::forwarding::no);
auto reader = make_flat_multi_range_reader(s, std::move(ms), ranges, cmd.slice,
service::get_local_sstable_query_read_priority(), trace_state, mutation_reader::forwarding::no);
auto compaction_state = make_lw_shared<compact_for_mutation_query_state>(*s, cmd.timestamp, cmd.slice, cmd.row_limit,
cmd.partition_limit);

View File

@@ -2511,7 +2511,7 @@ future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& so
const query::partition_slice& slice,
tracing::trace_state_ptr trace_ptr)
: range(dht::partition_range::make_singular(dk))
, reader(source.make_reader(s, range, slice, service::get_local_sstable_query_read_priority(),
, reader(source.make_reader(s, no_reader_permit(), range, slice, service::get_local_sstable_query_read_priority(),
std::move(trace_ptr), streamed_mutation::forwarding::no,
mutation_reader::forwarding::no))
{ }

View File

@@ -673,8 +673,8 @@ class restricting_mutation_reader : public flat_mutation_reader::impl {
streamed_mutation::forwarding _fwd;
mutation_reader::forwarding _fwd_mr;
flat_mutation_reader operator()(reader_resource_tracker tracker) {
return _ms.make_reader(std::move(_s), _range.get(), _slice.get(), _pc.get(), std::move(_trace_state), _fwd, _fwd_mr, tracker);
flat_mutation_reader operator()(reader_permit permit) {
return _ms.make_reader(std::move(_s), std::move(permit), _range.get(), _slice.get(), _pc.get(), std::move(_trace_state), _fwd, _fwd_mr);
}
};
@@ -683,7 +683,6 @@ class restricting_mutation_reader : public flat_mutation_reader::impl {
mutation_source_and_params reader_factory;
};
struct admitted_state {
reader_permit permit;
flat_mutation_reader reader;
};
std::variant<pending_state, admitted_state> _state;
@@ -705,7 +704,7 @@ class restricting_mutation_reader : public flat_mutation_reader::impl {
return std::get<pending_state>(_state).semaphore.wait_admission(new_reader_base_cost,
timeout).then([this, fn = std::move(fn)] (reader_permit permit) mutable {
auto reader_factory = std::move(std::get<pending_state>(_state).reader_factory);
_state.emplace<admitted_state>(admitted_state{permit, reader_factory(reader_resource_tracker(permit))});
_state.emplace<admitted_state>(admitted_state{reader_factory(std::move(permit))});
return fn(std::get<admitted_state>(_state).reader);
});
}
@@ -786,13 +785,13 @@ snapshot_source make_empty_snapshot_source() {
mutation_source make_empty_mutation_source() {
return mutation_source([](schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr tr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding,
reader_resource_tracker) {
mutation_reader::forwarding) {
return make_empty_flat_reader(s);
}, [] {
return [] (const dht::decorated_key& key) {
@@ -803,6 +802,7 @@ mutation_source make_empty_mutation_source() {
mutation_source make_combined_mutation_source(std::vector<mutation_source> addends) {
return mutation_source([addends = std::move(addends)] (schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
@@ -811,7 +811,7 @@ mutation_source make_combined_mutation_source(std::vector<mutation_source> adden
std::vector<flat_mutation_reader> rd;
rd.reserve(addends.size());
for (auto&& ms : addends) {
rd.emplace_back(ms.make_reader(s, pr, slice, pc, tr, fwd));
rd.emplace_back(ms.make_reader(s, permit, pr, slice, pc, tr, fwd));
}
return make_combined_reader(s, std::move(rd), fwd);
});

View File

@@ -171,13 +171,13 @@ class mutation_source {
using partition_range = const dht::partition_range&;
using io_priority = const io_priority_class&;
using flat_reader_factory_type = std::function<flat_mutation_reader(schema_ptr,
reader_permit,
partition_range,
const query::partition_slice&,
io_priority,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding,
reader_resource_tracker)>;
mutation_reader::forwarding)>;
// We could have our own version of std::function<> that is nothrow
// move constructible and save some indirection and allocation.
// Probably not worth the effort though.
@@ -193,70 +193,54 @@ public:
, _presence_checker_factory(make_lw_shared(std::move(pcf)))
{ }
mutation_source(std::function<flat_mutation_reader(schema_ptr, partition_range, const query::partition_slice&, io_priority,
tracing::trace_state_ptr, streamed_mutation::forwarding, mutation_reader::forwarding)> fn,
std::function<partition_presence_checker()> pcf = [] { return make_default_partition_presence_checker(); })
: mutation_source([fn = std::move(fn)] (schema_ptr s,
partition_range range,
const query::partition_slice& slice,
io_priority pc,
tracing::trace_state_ptr tr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
reader_resource_tracker) {
return fn(s, range, slice, pc, std::move(tr), fwd, fwd_mr);
}
, std::move(pcf))
{ }
// For sources which don't care about the mutation_reader::forwarding flag (always fast forwardable)
mutation_source(std::function<flat_mutation_reader(schema_ptr, partition_range, const query::partition_slice&, io_priority,
mutation_source(std::function<flat_mutation_reader(schema_ptr, reader_permit, partition_range, const query::partition_slice&, io_priority,
tracing::trace_state_ptr, streamed_mutation::forwarding)> fn)
: mutation_source([fn = std::move(fn)] (schema_ptr s,
reader_permit permit,
partition_range range,
const query::partition_slice& slice,
io_priority pc,
tracing::trace_state_ptr tr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding,
reader_resource_tracker) {
return fn(s, range, slice, pc, std::move(tr), fwd);
mutation_reader::forwarding) {
return fn(std::move(s), std::move(permit), range, slice, pc, std::move(tr), fwd);
}) {}
mutation_source(std::function<flat_mutation_reader(schema_ptr, partition_range, const query::partition_slice&, io_priority)> fn)
mutation_source(std::function<flat_mutation_reader(schema_ptr, reader_permit, partition_range, const query::partition_slice&, io_priority)> fn)
: mutation_source([fn = std::move(fn)] (schema_ptr s,
reader_permit permit,
partition_range range,
const query::partition_slice& slice,
io_priority pc,
tracing::trace_state_ptr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding,
reader_resource_tracker) {
mutation_reader::forwarding) {
assert(!fwd);
return fn(s, range, slice, pc);
return fn(std::move(s), std::move(permit), range, slice, pc);
}) {}
mutation_source(std::function<flat_mutation_reader(schema_ptr, partition_range, const query::partition_slice&)> fn)
mutation_source(std::function<flat_mutation_reader(schema_ptr, reader_permit, partition_range, const query::partition_slice&)> fn)
: mutation_source([fn = std::move(fn)] (schema_ptr s,
reader_permit permit,
partition_range range,
const query::partition_slice& slice,
io_priority,
tracing::trace_state_ptr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding,
reader_resource_tracker) {
mutation_reader::forwarding) {
assert(!fwd);
return fn(s, range, slice);
return fn(std::move(s), std::move(permit), range, slice);
}) {}
mutation_source(std::function<flat_mutation_reader(schema_ptr, partition_range range)> fn)
mutation_source(std::function<flat_mutation_reader(schema_ptr, reader_permit, partition_range range)> fn)
: mutation_source([fn = std::move(fn)] (schema_ptr s,
reader_permit permit,
partition_range range,
const query::partition_slice&,
io_priority,
tracing::trace_state_ptr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding,
reader_resource_tracker) {
mutation_reader::forwarding) {
assert(!fwd);
return fn(s, range);
return fn(std::move(s), std::move(permit), range);
}) {}
mutation_source(const mutation_source& other) = default;
@@ -271,24 +255,25 @@ public:
flat_mutation_reader
make_reader(
schema_ptr s,
reader_permit permit,
partition_range range,
const query::partition_slice& slice,
io_priority pc = default_priority_class(),
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes,
reader_resource_tracker tracker = no_resource_tracking()) const
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const
{
return (*_fn)(std::move(s), range, slice, pc, std::move(trace_state), fwd, fwd_mr, tracker);
return (*_fn)(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
}
flat_mutation_reader
make_reader(
schema_ptr s,
reader_permit permit = no_reader_permit(),
partition_range range = query::full_partition_range) const
{
auto& full_slice = s->full_slice();
return this->make_reader(std::move(s), range, full_slice);
return this->make_reader(std::move(s), std::move(permit), range, full_slice);
}
partition_presence_checker make_partition_presence_checker() {

View File

@@ -155,7 +155,7 @@ public:
: _schema(schema)
, _range(std::make_unique<dht::partition_range>(std::move(range)))
, _slice(std::make_unique<query::partition_slice>(std::move(slice)))
, _reader(ms.make_reader(schema, *_range, *_slice, pc, std::move(trace_ptr),
, _reader(ms.make_reader(schema, no_reader_permit(), *_range, *_slice, pc, std::move(trace_ptr),
streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
, _compaction_state(make_lw_shared<compact_for_query_state<OnlyLive>>(*schema, gc_clock::time_point{}, *_slice, 0, 0)) {
}

View File

@@ -47,7 +47,7 @@ using namespace cache;
flat_mutation_reader
row_cache::create_underlying_reader(read_context& ctx, mutation_source& src, const dht::partition_range& pr) {
ctx.on_underlying_created();
return src.make_reader(_schema, pr, ctx.slice(), ctx.pc(), ctx.trace_state(), streamed_mutation::forwarding::yes);
return src.make_reader(_schema, no_reader_permit(), pr, ctx.slice(), ctx.pc(), ctx.trace_state(), streamed_mutation::forwarding::yes);
}
static thread_local mutation_application_stats dummy_app_stats;

View File

@@ -710,11 +710,11 @@ public:
flat_mutation_reader make_sstable_reader() const override {
return ::make_local_shard_sstable_reader(_schema,
no_reader_permit(),
_compacting,
query::full_partition_range,
_schema->full_slice(),
service::get_local_compaction_priority(),
no_resource_tracking(),
tracing::trace_state_ptr(),
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no,
@@ -967,11 +967,11 @@ public:
// Use reader that makes sure no non-local mutation will not be filtered out.
flat_mutation_reader make_sstable_reader() const override {
return ::make_range_sstable_reader(_schema,
no_reader_permit(),
_compacting,
query::full_partition_range,
_schema->full_slice(),
service::get_local_compaction_priority(),
no_resource_tracking(),
nullptr,
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no);

View File

@@ -180,14 +180,14 @@ inline data_consume_context<DataConsumeRowsContext> data_consume_rows(const sche
// can be beneficial if the user wants to fast_forward_to() on the
// returned context, and may make small skips.
auto input = sst->data_stream(toread.start, last_end - toread.start, consumer.io_priority(),
consumer.resource_tracker(), consumer.trace_state(), sst->_partition_range_history);
consumer.permit(), consumer.trace_state(), sst->_partition_range_history);
return {s, std::move(sst), consumer, std::move(input), toread.start, toread.end - toread.start };
}
template <typename DataConsumeRowsContext>
inline data_consume_context<DataConsumeRowsContext> data_consume_single_partition(const schema& s, shared_sstable sst, typename DataConsumeRowsContext::consumer& consumer, sstable::disk_read_range toread) {
auto input = sst->data_stream(toread.start, toread.end - toread.start, consumer.io_priority(),
consumer.resource_tracker(), consumer.trace_state(), sst->_single_partition_history);
consumer.permit(), consumer.trace_state(), sst->_single_partition_history);
return {s, std::move(sst), consumer, std::move(input), toread.start, toread.end - toread.start };
}

View File

@@ -381,13 +381,13 @@ public:
mp_row_consumer_k_l(mp_row_consumer_reader* reader,
const schema_ptr schema,
reader_permit permit,
const query::partition_slice& slice,
const io_priority_class& pc,
reader_resource_tracker resource_tracker,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
const shared_sstable& sst)
: row_consumer(std::move(resource_tracker), std::move(trace_state), pc)
: row_consumer(std::move(permit), std::move(trace_state), pc)
, _reader(reader)
, _schema(schema)
, _slice(slice)
@@ -398,12 +398,12 @@ public:
mp_row_consumer_k_l(mp_row_consumer_reader* reader,
const schema_ptr schema,
reader_permit permit,
const io_priority_class& pc,
reader_resource_tracker resource_tracker,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
const shared_sstable& sst)
: mp_row_consumer_k_l(reader, schema, schema->full_slice(), pc, std::move(resource_tracker), std::move(trace_state), fwd, sst) { }
: mp_row_consumer_k_l(reader, schema, std::move(permit), schema->full_slice(), pc, std::move(trace_state), fwd, sst) { }
virtual proceed consume_row_start(sstables::key_view key, sstables::deletion_time deltime) override {
if (!_is_mutation_end) {
@@ -991,13 +991,13 @@ public:
mp_row_consumer_m(mp_row_consumer_reader* reader,
const schema_ptr schema,
reader_permit permit,
const query::partition_slice& slice,
const io_priority_class& pc,
reader_resource_tracker resource_tracker,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
const shared_sstable& sst)
: consumer_m(resource_tracker, std::move(trace_state), pc)
: consumer_m(std::move(permit), std::move(trace_state), pc)
, _reader(reader)
, _schema(schema)
, _slice(slice)
@@ -1010,12 +1010,12 @@ public:
mp_row_consumer_m(mp_row_consumer_reader* reader,
const schema_ptr schema,
reader_permit permit,
const io_priority_class& pc,
reader_resource_tracker resource_tracker,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
const shared_sstable& sst)
: mp_row_consumer_m(reader, schema, schema->full_slice(), pc, std::move(resource_tracker), std::move(trace_state), fwd, sst)
: mp_row_consumer_m(reader, schema, std::move(permit), schema->full_slice(), pc, std::move(trace_state), fwd, sst)
{ }
virtual ~mp_row_consumer_m() {}

View File

@@ -139,13 +139,13 @@ class sstable_mutation_reader : public mp_row_consumer_reader {
read_monitor& _monitor;
public:
sstable_mutation_reader(shared_sstable sst, schema_ptr schema,
reader_permit permit,
const io_priority_class &pc,
reader_resource_tracker resource_tracker,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
read_monitor& mon)
: mp_row_consumer_reader(std::move(schema), std::move(sst))
, _consumer(this, _schema, _schema->full_slice(), pc, std::move(resource_tracker), std::move(trace_state), fwd, _sst)
, _consumer(this, _schema, std::move(permit), _schema->full_slice(), pc, std::move(trace_state), fwd, _sst)
, _initialize([this] {
_context = data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer);
_monitor.on_read_started(_context->reader_position());
@@ -155,16 +155,16 @@ public:
, _monitor(mon) { }
sstable_mutation_reader(shared_sstable sst,
schema_ptr schema,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
reader_resource_tracker resource_tracker,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
read_monitor& mon)
: mp_row_consumer_reader(std::move(schema), std::move(sst))
, _consumer(this, _schema, slice, pc, std::move(resource_tracker), std::move(trace_state), fwd, _sst)
, _consumer(this, _schema, std::move(permit), slice, pc, std::move(trace_state), fwd, _sst)
, _initialize([this, pr, &pc, &slice, fwd_mr] () mutable {
auto f = get_index_reader().advance_to(pr);
return f.then([this, &pc, &slice, fwd_mr] () mutable {
@@ -183,16 +183,16 @@ public:
, _monitor(mon) { }
sstable_mutation_reader(shared_sstable sst,
schema_ptr schema,
reader_permit permit,
dht::ring_position_view key,
const query::partition_slice& slice,
const io_priority_class& pc,
reader_resource_tracker resource_tracker,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
read_monitor& mon)
: mp_row_consumer_reader(std::move(schema), std::move(sst))
, _consumer(this, _schema, slice, pc, std::move(resource_tracker), std::move(trace_state), fwd, _sst)
, _consumer(this, _schema, std::move(permit), slice, pc, std::move(trace_state), fwd, _sst)
, _single_partition_read(true)
, _initialize([this, key = std::move(key), &pc, &slice, fwd_mr] () mutable {
position_in_partition_view pos = get_slice_upper_bound(*_schema, slice, key);
@@ -479,22 +479,23 @@ void mp_row_consumer_reader::on_next_partition(dht::decorated_key key, tombstone
_sst->get_stats().on_partition_read();
}
flat_mutation_reader sstable::read_rows_flat(schema_ptr schema, const io_priority_class& pc, streamed_mutation::forwarding fwd) {
flat_mutation_reader sstable::read_rows_flat(schema_ptr schema, reader_permit permit, const io_priority_class& pc,
streamed_mutation::forwarding fwd) {
get_stats().on_sstable_partition_read();
if (_version == version_types::mc) {
return make_flat_mutation_reader<sstable_mutation_reader<data_consume_rows_context_m, mp_row_consumer_m>>(
shared_from_this(), std::move(schema), pc, no_resource_tracking(), tracing::trace_state_ptr(), fwd, default_read_monitor());
shared_from_this(), std::move(schema), std::move(permit), pc, tracing::trace_state_ptr(), fwd, default_read_monitor());
}
return make_flat_mutation_reader<sstable_mutation_reader<>>(shared_from_this(), std::move(schema), pc,
no_resource_tracking(), tracing::trace_state_ptr(), fwd, default_read_monitor());
return make_flat_mutation_reader<sstable_mutation_reader<>>(shared_from_this(), std::move(schema), std::move(permit), pc,
tracing::trace_state_ptr(), fwd, default_read_monitor());
}
flat_mutation_reader
sstables::sstable::read_row_flat(schema_ptr schema,
reader_permit permit,
dht::ring_position_view key,
const query::partition_slice& slice,
const io_priority_class& pc,
reader_resource_tracker resource_tracker,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
read_monitor& mon)
@@ -502,19 +503,19 @@ sstables::sstable::read_row_flat(schema_ptr schema,
get_stats().on_single_partition_read();
if (_version == version_types::mc) {
return make_flat_mutation_reader<sstable_mutation_reader<data_consume_rows_context_m, mp_row_consumer_m>>(
shared_from_this(), std::move(schema), std::move(key), slice, pc,
std::move(resource_tracker), std::move(trace_state), fwd, mutation_reader::forwarding::no, mon);
shared_from_this(), std::move(schema), std::move(permit), std::move(key), slice, pc,
std::move(trace_state), fwd, mutation_reader::forwarding::no, mon);
}
return make_flat_mutation_reader<sstable_mutation_reader<>>(shared_from_this(), std::move(schema), std::move(key), slice, pc,
std::move(resource_tracker), std::move(trace_state), fwd, mutation_reader::forwarding::no, mon);
return make_flat_mutation_reader<sstable_mutation_reader<>>(shared_from_this(), std::move(schema), std::move(permit), std::move(key), slice, pc,
std::move(trace_state), fwd, mutation_reader::forwarding::no, mon);
}
flat_mutation_reader
sstable::read_range_rows_flat(schema_ptr schema,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
reader_resource_tracker resource_tracker,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
@@ -522,10 +523,10 @@ sstable::read_range_rows_flat(schema_ptr schema,
get_stats().on_range_partition_read();
if (_version == version_types::mc) {
return make_flat_mutation_reader<sstable_mutation_reader<data_consume_rows_context_m, mp_row_consumer_m>>(
shared_from_this(), std::move(schema), range, slice, pc, std::move(resource_tracker), std::move(trace_state), fwd, fwd_mr, mon);
shared_from_this(), std::move(schema), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr, mon);
}
return make_flat_mutation_reader<sstable_mutation_reader<>>(
shared_from_this(), std::move(schema), range, slice, pc, std::move(resource_tracker), std::move(trace_state), fwd, fwd_mr, mon);
shared_from_this(), std::move(schema), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr, mon);
}
}

View File

@@ -65,7 +65,7 @@
// row into one buffer, the byte_views remain valid until consume_row_end()
// is called.]
class row_consumer {
reader_resource_tracker _resource_tracker;
reader_permit _permit;
tracing::trace_state_ptr _trace_state;
const io_priority_class& _pc;
@@ -78,8 +78,8 @@ public:
*/
constexpr static bool is_setting_range_tombstone_start_supported = false;
row_consumer(reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, const io_priority_class& pc)
: _resource_tracker(resource_tracker)
row_consumer(reader_permit permit, tracing::trace_state_ptr trace_state, const io_priority_class& pc)
: _permit(std::move(permit))
, _trace_state(std::move(trace_state))
, _pc(pc) {
}
@@ -133,9 +133,9 @@ public:
return _pc;
}
// The restriction that applies to this consumer
reader_resource_tracker resource_tracker() const {
return _resource_tracker;
// The permit for this read
reader_permit permit() const {
return _permit;
}
tracing::trace_state_ptr trace_state() const {
@@ -144,7 +144,7 @@ public:
};
class consumer_m {
reader_resource_tracker _resource_tracker;
reader_permit _permit;
tracing::trace_state_ptr _trace_state;
const io_priority_class& _pc;
public:
@@ -162,8 +162,8 @@ public:
skip_row
};
consumer_m(reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, const io_priority_class& pc)
: _resource_tracker(resource_tracker)
consumer_m(reader_permit permit, tracing::trace_state_ptr trace_state, const io_priority_class& pc)
: _permit(std::move(permit))
, _trace_state(std::move(trace_state))
, _pc(pc) {
}
@@ -227,9 +227,9 @@ public:
return _pc;
}
// The restriction that applies to this consumer
reader_resource_tracker resource_tracker() const {
return _resource_tracker;
// The permit for this read
reader_permit permit() const {
return _permit;
}
tracing::trace_state_ptr trace_state() const {

View File

@@ -2789,14 +2789,14 @@ component_type sstable::component_from_sstring(version_types v, sstring &s) {
}
input_stream<char> sstable::data_stream(uint64_t pos, size_t len, const io_priority_class& pc,
reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, lw_shared_ptr<file_input_stream_history> history) {
reader_permit permit, tracing::trace_state_ptr trace_state, lw_shared_ptr<file_input_stream_history> history) {
file_input_stream_options options;
options.buffer_size = sstable_buffer_size;
options.io_priority_class = pc;
options.read_ahead = 4;
options.dynamic_adjustments = std::move(history);
file f = resource_tracker.track(_data_file);
file f = make_tracked_file(_data_file, std::move(permit));
if (trace_state) {
f = tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", get_filename()));
}
@@ -2816,7 +2816,7 @@ input_stream<char> sstable::data_stream(uint64_t pos, size_t len, const io_prior
}
future<temporary_buffer<char>> sstable::data_read(uint64_t pos, size_t len, const io_priority_class& pc) {
return do_with(data_stream(pos, len, pc, no_resource_tracking(), tracing::trace_state_ptr(), {}), [len] (auto& stream) {
return do_with(data_stream(pos, len, pc, no_reader_permit(), tracing::trace_state_ptr(), {}), [len] (auto& stream) {
return stream.read_exactly(len).finally([&stream] {
return stream.close();
});
@@ -3431,6 +3431,7 @@ future<> init_metrics() {
mutation_source sstable::as_mutation_source() {
return mutation_source([sst = shared_from_this()] (schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
@@ -3442,9 +3443,9 @@ mutation_source sstable::as_mutation_source() {
// consequence, fast_forward_to() will *NOT* work on the result,
// regardless of what the fwd_mr parameter says.
if (range.is_singular() && range.start()->value().has_key()) {
return sst->read_row_flat(s, range.start()->value(), slice, pc, no_resource_tracking(), std::move(trace_state), fwd);
return sst->read_row_flat(s, std::move(permit), range.start()->value(), slice, pc, std::move(trace_state), fwd);
} else {
return sst->read_range_rows_flat(s, range, slice, pc, no_resource_tracking(), std::move(trace_state), fwd, fwd_mr);
return sst->read_range_rows_flat(s, std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
}
});
}

View File

@@ -206,34 +206,34 @@ public:
// returned in the result.
flat_mutation_reader read_row_flat(
schema_ptr schema,
reader_permit permit,
dht::ring_position_view key,
const query::partition_slice& slice,
const io_priority_class& pc = default_priority_class(),
reader_resource_tracker resource_tracker = no_resource_tracking(),
tracing::trace_state_ptr trace_state = {},
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
read_monitor& monitor = default_read_monitor());
flat_mutation_reader read_row_flat(schema_ptr schema, dht::ring_position_view key) {
flat_mutation_reader read_row_flat(schema_ptr schema, reader_permit permit, dht::ring_position_view key) {
auto& full_slice = schema->full_slice();
return read_row_flat(std::move(schema), std::move(key), full_slice);
return read_row_flat(std::move(schema), std::move(permit), std::move(key), full_slice);
}
// Returns a mutation_reader for given range of partitions
flat_mutation_reader read_range_rows_flat(
schema_ptr schema,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc = default_priority_class(),
reader_resource_tracker resource_tracker = no_resource_tracking(),
tracing::trace_state_ptr trace_state = {},
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes,
read_monitor& monitor = default_read_monitor());
flat_mutation_reader read_range_rows_flat(schema_ptr schema, const dht::partition_range& range) {
flat_mutation_reader read_range_rows_flat(schema_ptr schema, reader_permit permit, const dht::partition_range& range) {
auto& full_slice = schema->full_slice();
return read_range_rows_flat(std::move(schema), range, full_slice);
return read_range_rows_flat(std::move(schema), std::move(permit), range, full_slice);
}
// read_rows_flat() returns each of the rows in the sstable, in sequence,
@@ -248,6 +248,7 @@ public:
// as well as the sstable, remains alive as long as a read() is in
// progress (i.e., returned a future which hasn't completed yet).
flat_mutation_reader read_rows_flat(schema_ptr schema,
reader_permit permit,
const io_priority_class& pc = default_priority_class(),
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no);
@@ -628,7 +629,7 @@ private:
// about the buffer size to read, and where exactly to stop reading
// (even when a large buffer size is used).
input_stream<char> data_stream(uint64_t pos, size_t len, const io_priority_class& pc,
reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, lw_shared_ptr<file_input_stream_history> history);
reader_permit permit, tracing::trace_state_ptr trace_state, lw_shared_ptr<file_input_stream_history> history);
// Read exactly the specific byte range from the data file (after
// uncompression, if the file is compressed). This can be used to read

View File

@@ -277,12 +277,12 @@ public:
static flat_mutation_reader
create_single_key_sstable_reader(column_family* cf,
schema_ptr schema,
reader_permit permit,
lw_shared_ptr<sstables::sstable_set> sstables,
utils::estimated_histogram& sstable_histogram,
const dht::partition_range& pr, // must be singular
const query::partition_slice& slice,
const io_priority_class& pc,
reader_resource_tracker resource_tracker,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr)
@@ -292,7 +292,7 @@ create_single_key_sstable_reader(column_family* cf,
filter_sstable_for_reader(sstables->select(pr), *cf, schema, pr, key, slice)
| boost::adaptors::transformed([&] (const sstables::shared_sstable& sstable) {
tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); }));
return sstable->read_row_flat(schema, pr.start()->value(), slice, pc, resource_tracker, trace_state, fwd);
return sstable->read_row_flat(schema, permit, pr.start()->value(), slice, pc, trace_state, fwd);
})
);
if (readers.empty()) {
@@ -303,19 +303,19 @@ create_single_key_sstable_reader(column_family* cf,
}
flat_mutation_reader make_range_sstable_reader(schema_ptr s,
reader_permit permit,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
reader_resource_tracker resource_tracker,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
sstables::read_monitor_generator& monitor_generator)
{
auto reader_factory_fn = [s, &slice, &pc, resource_tracker, trace_state, fwd, fwd_mr, &monitor_generator]
auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator]
(sstables::shared_sstable& sst, const dht::partition_range& pr) mutable {
return sst->read_range_rows_flat(s, pr, slice, pc, resource_tracker, trace_state, fwd, fwd_mr, monitor_generator(sst));
return sst->read_range_rows_flat(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst));
};
return make_combined_reader(s, std::make_unique<incremental_reader_selector>(s,
std::move(sstables),
@@ -349,41 +349,41 @@ table::make_sstable_reader(schema_ptr s,
if (dht::shard_of(pos.token()) != engine().cpu_id()) {
return mutation_source([] (
schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
reader_resource_tracker tracker) {
mutation_reader::forwarding fwd_mr) {
return make_empty_flat_reader(s); // range doesn't belong to this shard
});
}
return mutation_source([semaphore, this, sstables=std::move(sstables)] (
schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
reader_resource_tracker tracker) {
return create_single_key_sstable_reader(const_cast<column_family*>(this), std::move(s), std::move(sstables),
_stats.estimated_sstable_per_read, pr, slice, pc, tracker, std::move(trace_state), fwd, fwd_mr);
mutation_reader::forwarding fwd_mr) {
return create_single_key_sstable_reader(const_cast<column_family*>(this), std::move(s), std::move(permit), std::move(sstables),
_stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
});
} else {
return mutation_source([semaphore, sstables=std::move(sstables)] (
schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
reader_resource_tracker tracker) {
return make_local_shard_sstable_reader(std::move(s), std::move(sstables), pr, slice, pc,
tracker, std::move(trace_state), fwd, fwd_mr);
mutation_reader::forwarding fwd_mr) {
return make_local_shard_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc,
std::move(trace_state), fwd, fwd_mr);
});
}
}();
@@ -391,7 +391,7 @@ table::make_sstable_reader(schema_ptr s,
if (semaphore) {
return make_restricted_flat_reader(*semaphore, std::move(ms), std::move(s), pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
} else {
return ms.make_reader(std::move(s), pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
return ms.make_reader(std::move(s), no_reader_permit(), pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
}
}
@@ -440,7 +440,7 @@ table::make_reader(schema_ptr s,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const {
if (_virtual_reader) {
return (*_virtual_reader).make_reader(s, range, slice, pc, trace_state, fwd, fwd_mr);
return (*_virtual_reader).make_reader(s, no_reader_permit(), range, slice, pc, trace_state, fwd, fwd_mr);
}
std::vector<flat_mutation_reader> readers;
@@ -500,7 +500,7 @@ table::make_streaming_reader(schema_ptr s,
auto& slice = s->full_slice();
auto& pc = service::get_local_streaming_read_priority();
auto source = mutation_source([this] (schema_ptr s, const dht::partition_range& range, const query::partition_slice& slice,
auto source = mutation_source([this] (schema_ptr s, reader_permit, const dht::partition_range& range, const query::partition_slice& slice,
const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
std::vector<flat_mutation_reader> readers;
readers.reserve(_memtables->size() + 1);
@@ -574,20 +574,20 @@ static bool belongs_to_other_shard(const std::vector<shard_id>& shards) {
}
flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s,
reader_permit permit,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
reader_resource_tracker resource_tracker,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
sstables::read_monitor_generator& monitor_generator)
{
auto reader_factory_fn = [s, &slice, &pc, resource_tracker, trace_state, fwd, fwd_mr, &monitor_generator]
auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator]
(sstables::shared_sstable& sst, const dht::partition_range& pr) mutable {
flat_mutation_reader reader = sst->read_range_rows_flat(s, pr, slice, pc,
resource_tracker, trace_state, fwd, fwd_mr, monitor_generator(sst));
flat_mutation_reader reader = sst->read_range_rows_flat(s, permit, pr, slice, pc,
trace_state, fwd, fwd_mr, monitor_generator(sst));
if (sst->is_shared()) {
using sig = bool (&)(const dht::decorated_key&);
reader = make_filtering_reader(std::move(reader), sig(belongs_to_current_shard));
@@ -1611,6 +1611,7 @@ table::sstables_as_snapshot_source() {
return snapshot_source([this] () {
auto sst_set = _sstables;
return mutation_source([this, sst_set] (schema_ptr s,
reader_permit,
const dht::partition_range& r,
const query::partition_slice& slice,
const io_priority_class& pc,
@@ -2439,6 +2440,7 @@ table::query(schema_ptr s,
mutation_source
table::as_mutation_source() const {
return mutation_source([this] (schema_ptr s,
reader_permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
@@ -2579,7 +2581,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
std::move(slice),
std::move(m),
[base, views = std::move(views), lock = std::move(lock), this, timeout, source = std::move(source), &io_priority] (auto& pk, auto& slice, auto& m) mutable {
auto reader = source.make_reader(base, pk, slice, io_priority, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
auto reader = source.make_reader(base, no_reader_permit(), pk, slice, io_priority, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader)).then([lock = std::move(lock)] () mutable {
// return the local partition/row lock we have taken so it
// remains locked until the caller is done modifying this
@@ -2601,6 +2603,7 @@ future<row_locker::lock_holder> table::stream_view_replica_updates(const schema_
mutation_source
table::as_mutation_source_excluding(sstables::shared_sstable sst) const {
return mutation_source([this, sst = std::move(sst)] (schema_ptr s,
reader_permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,

View File

@@ -43,7 +43,7 @@ static void broken_sst(sstring dir, unsigned long generation, schema_ptr s, sstr
try {
sstables::test_env env;
sstable_ptr sstp = std::get<0>(env.reusable_sst(s, dir, generation, version).get());
auto r = sstp->read_rows_flat(s);
auto r = sstp->read_rows_flat(s, no_reader_permit());
r.consume(my_consumer{}, db::no_timeout).get();
BOOST_FAIL("expecting exception");
} catch (malformed_sstable_exception& e) {

View File

@@ -143,6 +143,7 @@ SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_sourc
cf.flush().get();
cf.get_row_cache().invalidate([] {}).get();
return mutation_source([&] (schema_ptr s,
reader_permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,

View File

@@ -412,7 +412,7 @@ SEASTAR_TEST_CASE(test_multi_range_reader) {
return m;
}));
auto source = mutation_source([&] (schema_ptr, const dht::partition_range& range) {
auto source = mutation_source([&] (schema_ptr, reader_permit, const dht::partition_range& range) {
return flat_mutation_reader_from_mutations(ms, range);
});
@@ -745,6 +745,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_reader_from_mutations_as_mutation_source)
auto populate = [] (schema_ptr, const std::vector<mutation> &muts) {
return mutation_source([=] (
schema_ptr schema,
reader_permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class&,
@@ -761,6 +762,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_reader_from_fragments_as_mutation_source)
auto populate = [] (schema_ptr, const std::vector<mutation> &muts) {
return mutation_source([=] (
schema_ptr schema,
reader_permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class&,

View File

@@ -117,6 +117,7 @@ SEASTAR_TEST_CASE(test_mutation_merger_conforms_to_mutation_source) {
}
return mutation_source([memtables] (schema_ptr s,
reader_permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,

View File

@@ -61,7 +61,7 @@ struct mutation_less_cmp {
}
};
mutation_source make_source(std::vector<mutation> mutations) {
return mutation_source([mutations = std::move(mutations)] (schema_ptr s, const dht::partition_range& range, const query::partition_slice& slice,
return mutation_source([mutations = std::move(mutations)] (schema_ptr s, reader_permit, const dht::partition_range& range, const query::partition_slice& slice,
const io_priority_class& pc, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
assert(range.is_full()); // slicing not implemented yet
for (auto&& m : mutations) {

View File

@@ -632,6 +632,7 @@ SEASTAR_THREAD_TEST_CASE(combined_mutation_reader_test) {
sstable_mutation_readers.emplace_back(
sst->as_mutation_source().make_reader(
s.schema(),
no_reader_permit(),
query::full_partition_range,
s.schema()->full_slice(),
seastar::default_priority_class(),
@@ -645,11 +646,11 @@ SEASTAR_THREAD_TEST_CASE(combined_mutation_reader_test) {
auto incremental_reader = make_local_shard_sstable_reader(
s.schema(),
no_reader_permit(),
sstable_set,
query::full_partition_range,
s.schema()->full_slice(),
seastar::default_priority_class(),
no_resource_tracking(),
nullptr,
streamed_mutation::forwarding::no,
mutation_reader::forwarding::no);
@@ -906,14 +907,14 @@ class tracking_reader : public flat_mutation_reader::impl {
std::size_t _call_count{0};
std::size_t _ff_count{0};
public:
tracking_reader(schema_ptr schema, lw_shared_ptr<sstables::sstable> sst, reader_resource_tracker tracker)
tracking_reader(schema_ptr schema, reader_permit permit, lw_shared_ptr<sstables::sstable> sst)
: impl(schema)
, _reader(sst->read_range_rows_flat(
schema,
permit,
query::full_partition_range,
schema->full_slice(),
default_priority_class(),
tracker,
tracing::trace_state_ptr(),
streamed_mutation::forwarding::no,
mutation_reader::forwarding::yes)) {
@@ -972,14 +973,14 @@ public:
, _timeout(timeout)
{
auto ms = mutation_source([this, sst=std::move(sst)] (schema_ptr schema,
reader_permit permit,
const dht::partition_range&,
const query::partition_slice&,
const io_priority_class&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding,
reader_resource_tracker res_tracker) {
auto tracker_ptr = std::make_unique<tracking_reader>(std::move(schema), std::move(sst), res_tracker);
mutation_reader::forwarding) {
auto tracker_ptr = std::make_unique<tracking_reader>(std::move(schema), std::move(permit), std::move(sst));
_tracker = tracker_ptr.get();
return flat_mutation_reader(std::move(tracker_ptr));
});
@@ -1336,13 +1337,13 @@ SEASTAR_TEST_CASE(test_restricted_reader_as_mutation_source) {
auto ms = mt->as_data_source();
return mutation_source([&semaphore, ms = std::move(ms)](schema_ptr schema,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr tr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
reader_resource_tracker res_tracker) {
mutation_reader::forwarding fwd_mr) {
return make_restricted_flat_reader(semaphore, std::move(ms), std::move(schema), range, slice, pc, tr,
fwd, fwd_mr);
});
@@ -1383,6 +1384,7 @@ SEASTAR_TEST_CASE(test_fast_forwarding_combined_reader_is_consistent_with_slicin
}
mutation_source ds = create_sstable(env, s, muts)->as_mutation_source();
readers.push_back(ds.make_reader(s,
no_reader_permit(),
dht::partition_range::make({keys[0]}, {keys[0]}),
s->full_slice(), default_priority_class(), nullptr,
streamed_mutation::forwarding::yes,
@@ -1457,8 +1459,8 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones
{
auto slice = partition_slice_builder(*s).with_range(range).build();
readers.push_back(ds1.make_reader(s, query::full_partition_range, slice));
readers.push_back(ds2.make_reader(s, query::full_partition_range, slice));
readers.push_back(ds1.make_reader(s, no_reader_permit(), query::full_partition_range, slice));
readers.push_back(ds2.make_reader(s, no_reader_permit(), query::full_partition_range, slice));
auto rd = make_combined_reader(s, std::move(readers),
streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
@@ -1480,9 +1482,9 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones
// Check fast_forward_to()
{
readers.push_back(ds1.make_reader(s, query::full_partition_range, s->full_slice(), default_priority_class(),
readers.push_back(ds1.make_reader(s, no_reader_permit(), query::full_partition_range, s->full_slice(), default_priority_class(),
nullptr, streamed_mutation::forwarding::yes));
readers.push_back(ds2.make_reader(s, query::full_partition_range, s->full_slice(), default_priority_class(),
readers.push_back(ds2.make_reader(s, no_reader_permit(), query::full_partition_range, s->full_slice(), default_priority_class(),
nullptr, streamed_mutation::forwarding::yes));
auto rd = make_combined_reader(s, std::move(readers),
@@ -1600,6 +1602,7 @@ SEASTAR_THREAD_TEST_CASE(test_foreign_reader_as_mutation_source) {
auto reader_factory_ptr = make_lw_shared<decltype(reader_factory)>(std::move(reader_factory));
return mutation_source([reader_factory_ptr] (schema_ptr s,
reader_permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
@@ -2118,6 +2121,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
}
return mutation_source([partitioner, remote_memtables, evict_paused_readers, single_fragment_buffer] (schema_ptr s,
reader_permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
@@ -2498,6 +2502,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) {
auto& table = db->local().find_column_family(schema);
auto reader = table.as_mutation_source().make_reader(
schema,
no_reader_permit(),
range,
slice,
service::get_local_sstable_query_read_priority(),
@@ -2700,8 +2705,8 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) {
auto& table = db->local().find_column_family(s);
//TODO need a way to transport io_priority_calls across shards
auto& pc = service::get_local_sstable_query_read_priority();
return table.as_mutation_source().make_reader(std::move(s), range, slice, pc, std::move(trace_state), streamed_mutation::forwarding::no,
fwd_mr);
return table.as_mutation_source().make_reader(std::move(s), no_reader_permit(), range, slice, pc, std::move(trace_state),
streamed_mutation::forwarding::no, fwd_mr);
};
auto reference_reader = make_filtering_reader(
make_multishard_combining_reader(seastar::make_shared<test_reader_lifecycle_policy>(std::move(reader_factory)), local_partitioner,

View File

@@ -162,7 +162,7 @@ public:
: _sem(reader_concurrency_semaphore::no_limits{})
, _cache(_sem, cache_size, entry_ttl)
, _mutations(make_mutations(_s, external_make_value))
, _mutation_source([this] (schema_ptr, const dht::partition_range& range) {
, _mutation_source([this] (schema_ptr, reader_permit, const dht::partition_range& range) {
auto rd = flat_mutation_reader_from_mutations(_mutations, range);
rd.set_max_buffer_size(max_reader_buffer_size);
return rd;

View File

@@ -110,7 +110,7 @@ snapshot_source make_decorated_snapshot_source(snapshot_source src, std::functio
}
mutation_source make_source_with(mutation m) {
return mutation_source([m] (schema_ptr s, const dht::partition_range&, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding fwd) {
return mutation_source([m] (schema_ptr s, reader_permit, const dht::partition_range&, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding fwd) {
assert(m.schema() == s);
return flat_mutation_reader_from_mutations({m}, std::move(fwd));
});
@@ -210,7 +210,14 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_empty_full_range)
auto s = make_schema();
int secondary_calls_count = 0;
cache_tracker tracker;
row_cache cache(s, snapshot_source_from_snapshot(mutation_source([&secondary_calls_count] (schema_ptr s, const dht::partition_range& range, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding fwd) {
row_cache cache(s, snapshot_source_from_snapshot(mutation_source([&secondary_calls_count] (
schema_ptr s,
reader_permit,
const dht::partition_range& range,
const query::partition_slice&,
const io_priority_class&,
tracing::trace_state_ptr,
streamed_mutation::forwarding fwd) {
return make_counting_reader(make_empty_flat_reader(s), secondary_calls_count);
})), tracker);
@@ -234,7 +241,14 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_empty_single_part
auto s = make_schema();
int secondary_calls_count = 0;
cache_tracker tracker;
row_cache cache(s, snapshot_source_from_snapshot(mutation_source([&secondary_calls_count] (schema_ptr s, const dht::partition_range& range, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding fwd) {
row_cache cache(s, snapshot_source_from_snapshot(mutation_source([&secondary_calls_count] (
schema_ptr s,
reader_permit,
const dht::partition_range& range,
const query::partition_slice&,
const io_priority_class&,
tracing::trace_state_ptr,
streamed_mutation::forwarding fwd) {
return make_counting_reader(make_empty_flat_reader(s), secondary_calls_count);
})), tracker);
auto range = make_single_partition_range(s, 100);
@@ -252,7 +266,14 @@ SEASTAR_TEST_CASE(test_cache_uses_continuity_info_for_single_partition_query) {
auto s = make_schema();
int secondary_calls_count = 0;
cache_tracker tracker;
row_cache cache(s, snapshot_source_from_snapshot(mutation_source([&secondary_calls_count] (schema_ptr s, const dht::partition_range& range, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding fwd) {
row_cache cache(s, snapshot_source_from_snapshot(mutation_source([&secondary_calls_count] (
schema_ptr s,
reader_permit,
const dht::partition_range& range,
const query::partition_slice&,
const io_priority_class&,
tracing::trace_state_ptr,
streamed_mutation::forwarding fwd) {
return make_counting_reader(make_empty_flat_reader(s), secondary_calls_count);
})), tracker);
@@ -274,7 +295,14 @@ void test_cache_delegates_to_underlying_only_once_with_single_partition(schema_p
int calls_to_secondary) {
int secondary_calls_count = 0;
cache_tracker tracker;
row_cache cache(s, snapshot_source_from_snapshot(mutation_source([m, &secondary_calls_count] (schema_ptr s, const dht::partition_range& range, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding fwd) {
row_cache cache(s, snapshot_source_from_snapshot(mutation_source([m, &secondary_calls_count] (
schema_ptr s,
reader_permit,
const dht::partition_range& range,
const query::partition_slice&,
const io_priority_class&,
tracing::trace_state_ptr,
streamed_mutation::forwarding fwd) {
assert(m.schema() == s);
if (range.contains(dht::ring_position(m.decorated_key()), dht::ring_position_comparator(*s))) {
return make_counting_reader(flat_mutation_reader_from_mutations({m}, std::move(fwd)), secondary_calls_count);
@@ -370,7 +398,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation
}
auto make_cache = [&tracker, &mt](schema_ptr s, int& secondary_calls_count) -> lw_shared_ptr<row_cache> {
auto secondary = mutation_source([&mt, &secondary_calls_count] (schema_ptr s, const dht::partition_range& range,
auto secondary = mutation_source([&mt, &secondary_calls_count] (schema_ptr s, reader_permit, const dht::partition_range& range,
const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace, streamed_mutation::forwarding fwd) {
return make_counting_reader(mt->make_flat_reader(s, range, slice, pc, std::move(trace), std::move(fwd)), secondary_calls_count);
});
@@ -380,7 +408,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation
auto make_ds = [&make_cache](schema_ptr s, int& secondary_calls_count) -> mutation_source {
auto cache = make_cache(s, secondary_calls_count);
return mutation_source([cache] (schema_ptr s, const dht::partition_range& range,
return mutation_source([cache] (schema_ptr s, reader_permit, const dht::partition_range& range,
const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace, streamed_mutation::forwarding fwd) {
return cache->make_reader(s, range, slice, pc, std::move(trace), std::move(fwd));
});
@@ -388,7 +416,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation
auto do_test = [&s, &partitions] (const mutation_source& ds, const dht::partition_range& range,
int& secondary_calls_count, int expected_calls) {
assert_that(ds.make_reader(s, range))
assert_that(ds.make_reader(s, no_reader_permit(), range))
.produces(slice(partitions, range))
.produces_end_of_stream();
BOOST_CHECK_EQUAL(expected_calls, secondary_calls_count);
@@ -475,25 +503,25 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation
auto range = dht::partition_range::make(
{partitions[0].decorated_key(), true},
{partitions[1].decorated_key(), true});
assert_that(ds.make_reader(s, range))
assert_that(ds.make_reader(s, no_reader_permit(), range))
.produces(slice(partitions, range))
.produces_end_of_stream();
BOOST_CHECK_EQUAL(3, secondary_calls_count);
assert_that(ds.make_reader(s, range))
assert_that(ds.make_reader(s, no_reader_permit(), range))
.produces(slice(partitions, range))
.produces_end_of_stream();
BOOST_CHECK_EQUAL(3, secondary_calls_count);
auto range2 = dht::partition_range::make(
{partitions[0].decorated_key(), true},
{partitions[1].decorated_key(), false});
assert_that(ds.make_reader(s, range2))
assert_that(ds.make_reader(s, no_reader_permit(), range2))
.produces(slice(partitions, range2))
.produces_end_of_stream();
BOOST_CHECK_EQUAL(3, secondary_calls_count);
auto range3 = dht::partition_range::make(
{dht::ring_position::starting_at(key_before_all.token())},
{partitions[2].decorated_key(), false});
assert_that(ds.make_reader(s, range3))
assert_that(ds.make_reader(s, no_reader_permit(), range3))
.produces(slice(partitions, range3))
.produces_end_of_stream();
BOOST_CHECK_EQUAL(5, secondary_calls_count);
@@ -505,7 +533,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation
};
auto cache = make_cache(s, secondary_calls_count);
auto ds = mutation_source([cache] (schema_ptr s, const dht::partition_range& range,
auto ds = mutation_source([cache] (schema_ptr s, reader_permit, const dht::partition_range& range,
const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace, streamed_mutation::forwarding fwd) {
return cache->make_reader(s, range, slice, pc, std::move(trace), std::move(fwd));
});
@@ -515,7 +543,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation
cache->invalidate([] {}, key_after_all).get();
assert_that(ds.make_reader(s, query::full_partition_range))
assert_that(ds.make_reader(s, no_reader_permit(), query::full_partition_range))
.produces(slice(partitions, query::full_partition_range))
.produces_end_of_stream();
BOOST_CHECK_EQUAL(partitions.size() + 2, secondary_calls_count);
@@ -693,6 +721,7 @@ SEASTAR_TEST_CASE(test_row_cache_conforms_to_mutation_source) {
auto cache = make_lw_shared<row_cache>(s, snapshot_source_from_snapshot(mt->as_data_source()), tracker);
return mutation_source([cache] (schema_ptr s,
reader_permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
@@ -761,14 +790,14 @@ SEASTAR_TEST_CASE(test_presence_checker_runs_under_right_allocator) {
auto src = snapshot_source([&] {
auto ms = underlying();
return mutation_source([ms = std::move(ms)] (schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr tr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding mr_fwd,
reader_resource_tracker rrt) mutable {
return ms.make_reader(s, pr, slice, pc, std::move(tr), fwd, mr_fwd, std::move(rrt));
mutation_reader::forwarding mr_fwd) {
return ms.make_reader(s, std::move(permit), pr, slice, pc, std::move(tr), fwd, mr_fwd);
}, [] {
return [saved = managed_bytes()] (const dht::decorated_key& key) mutable {
// size large enough to defeat the small blob optimization
@@ -1227,7 +1256,7 @@ private:
flat_mutation_reader make_reader(schema_ptr s, const dht::partition_range& pr,
const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace, streamed_mutation::forwarding fwd) {
return make_flat_mutation_reader<reader>(_throttle, _underlying.make_reader(s, pr, slice, pc, std::move(trace), std::move(fwd)));
return make_flat_mutation_reader<reader>(_throttle, _underlying.make_reader(s, no_reader_permit(), pr, slice, pc, std::move(trace), std::move(fwd)));
}
};
lw_shared_ptr<impl> _impl;
@@ -1237,7 +1266,7 @@ public:
{ }
operator mutation_source() const {
return mutation_source([impl = _impl] (schema_ptr s, const dht::partition_range& pr,
return mutation_source([impl = _impl] (schema_ptr s, reader_permit, const dht::partition_range& pr,
const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace, streamed_mutation::forwarding fwd) {
return impl->make_reader(std::move(s), pr, slice, pc, std::move(trace), std::move(fwd));
});

View File

@@ -92,7 +92,7 @@ public:
return sstables::test(_sst).read_indexes();
}
flat_mutation_reader read_rows_flat() {
return _sst->read_rows_flat(_sst->_schema);
return _sst->read_rows_flat(_sst->_schema, no_reader_permit());
}
const stats_metadata& get_stats_metadata() const {
@@ -103,16 +103,15 @@ public:
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc = default_priority_class(),
reader_resource_tracker resource_tracker = no_resource_tracking(),
tracing::trace_state_ptr trace_state = {},
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes,
read_monitor& monitor = default_read_monitor()) {
return _sst->read_range_rows_flat(_sst->_schema,
no_reader_permit(),
range,
slice,
pc,
std::move(resource_tracker),
std::move(trace_state),
fwd,
fwd_mr,
@@ -263,7 +262,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_filtering_and_forwarding_read) {
auto r = assert_that(sst.read_range_rows_flat(query::full_partition_range,
UNCOMPRESSED_FILTERING_AND_FORWARDING_SCHEMA->full_slice(),
default_priority_class(),
no_resource_tracking(),
tracing::trace_state_ptr(),
streamed_mutation::forwarding::yes));
r.produces_partition_start(to_key(1))
@@ -312,7 +310,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_filtering_and_forwarding_read) {
auto r = assert_that(sst.read_range_rows_flat(query::full_partition_range,
slice,
default_priority_class(),
no_resource_tracking(),
tracing::trace_state_ptr(),
streamed_mutation::forwarding::yes));
@@ -523,7 +520,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_skip_using_index_rows) {
auto rd = sst.read_range_rows_flat(query::full_partition_range,
UNCOMPRESSED_SKIP_USING_INDEX_ROWS_SCHEMA->full_slice(),
default_priority_class(),
no_resource_tracking(),
tracing::trace_state_ptr(),
streamed_mutation::forwarding::yes);
rd.set_max_buffer_size(1);
@@ -564,7 +560,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_skip_using_index_rows) {
auto rd = sst.read_range_rows_flat(query::full_partition_range,
slice,
default_priority_class(),
no_resource_tracking(),
tracing::trace_state_ptr(),
streamed_mutation::forwarding::yes);
rd.set_max_buffer_size(1);
@@ -752,7 +747,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_filtering_and_forwarding_range_tombst
auto r = make_assertions(sst.read_range_rows_flat(query::full_partition_range,
UNCOMPRESSED_FILTERING_AND_FORWARDING_SCHEMA->full_slice(),
default_priority_class(),
no_resource_tracking(),
tracing::trace_state_ptr(),
streamed_mutation::forwarding::yes));
std::array<int32_t, 2> rt_deletion_times {1534898600, 1534899416};
@@ -890,7 +884,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_filtering_and_forwarding_range_tombst
auto r = make_assertions(sst.read_range_rows_flat(query::full_partition_range,
slice,
default_priority_class(),
no_resource_tracking(),
tracing::trace_state_ptr(),
streamed_mutation::forwarding::yes));
@@ -1087,7 +1080,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_slicing_interleaved_rows_and_rts_read
auto r = make_assertions(sst.read_range_rows_flat(query::full_partition_range,
UNCOMPRESSED_SLICING_INTERLEAVED_ROWS_AND_RTS_SCHEMA->full_slice(),
default_priority_class(),
no_resource_tracking(),
tracing::trace_state_ptr(),
streamed_mutation::forwarding::yes));
@@ -1156,7 +1148,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_slicing_interleaved_rows_and_rts_read
auto r = make_assertions(sst.read_range_rows_flat(query::full_partition_range,
slice,
default_priority_class(),
no_resource_tracking(),
tracing::trace_state_ptr(),
streamed_mutation::forwarding::yes));
const tombstone tomb = make_tombstone(1525385507816568, 1535592075);
@@ -3016,7 +3007,7 @@ static flat_mutation_reader compacted_sstable_reader(test_env& env, schema_ptr s
sstables::compact_sstables(sstables::compaction_descriptor(std::move(sstables)), *cf, new_sstable, replacer_fn_no_op()).get();
auto compacted_sst = open_sstable(env, s, tmp.path().string(), new_generation);
return compacted_sst->as_mutation_source().make_reader(s, query::full_partition_range, s->full_slice());
return compacted_sst->as_mutation_source().make_reader(s, no_reader_permit(), query::full_partition_range, s->full_slice());
}
SEASTAR_THREAD_TEST_CASE(compact_deleted_row) {
@@ -4765,7 +4756,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_read_two_rows_fast_forwarding) {
auto r = assert_that(sst.read_range_rows_flat(query::full_partition_range,
s->full_slice(),
default_priority_class(),
no_resource_tracking(),
tracing::trace_state_ptr(),
streamed_mutation::forwarding::yes));
r.produces_partition_start(to_pkey(0))
@@ -5113,7 +5103,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_reader_on_unknown_column) {
sst->load().get();
BOOST_REQUIRE_EXCEPTION(
assert_that(sst->read_rows_flat(read_schema))
assert_that(sst->read_rows_flat(read_schema, no_reader_permit()))
.produces_partition_start(dk)
.produces_row(to_ck(0), {{val2_cdef, int32_type->decompose(int32_t(200))}})
.produces_row(to_ck(1), {{val2_cdef, int32_type->decompose(int32_t(201))}})

View File

@@ -830,7 +830,7 @@ SEASTAR_TEST_CASE(datafile_generation_11) {
return write_memtable_to_sstable_for_test(*mt, sst).then([&env, s, sst, mt, verifier, tomb, &static_set_col, tmpdir_path] {
return env.reusable_sst(s, tmpdir_path, 11).then([s, verifier, tomb, &static_set_col] (auto sstp) mutable {
return do_with(make_dkey(s, "key1"), [sstp, s, verifier, tomb, &static_set_col] (auto& key) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, no_reader_permit(), key));
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, verifier, tomb, &static_set_col, rd] (auto mutation) {
auto verify_set = [&tomb] (const collection_mutation_description& m) {
BOOST_REQUIRE(bool(m.tomb) == true);
@@ -858,7 +858,7 @@ SEASTAR_TEST_CASE(datafile_generation_11) {
});
}).then([sstp, s, verifier] {
return do_with(make_dkey(s, "key2"), [sstp, s, verifier] (auto& key) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, no_reader_permit(), key));
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, verifier, rd] (auto mutation) {
auto m = verifier(mutation);
BOOST_REQUIRE(!m.tomb);
@@ -891,7 +891,7 @@ SEASTAR_TEST_CASE(datafile_generation_12) {
return write_memtable_to_sstable_for_test(*mt, sst).then([&env, s, tomb, tmpdir_path] {
return env.reusable_sst(s, tmpdir_path, 12).then([s, tomb] (auto sstp) mutable {
return do_with(make_dkey(s, "key1"), [sstp, s, tomb] (auto& key) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, no_reader_permit(), key));
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, tomb, rd] (auto mutation) {
auto& mp = mutation->partition();
BOOST_REQUIRE(mp.row_tombstones().size() == 1);
@@ -927,7 +927,7 @@ static future<> sstable_compression_test(compressor_ptr c, unsigned generation)
return write_memtable_to_sstable_for_test(*mtp, sst).then([&env, s, tomb, generation, tmpdir_path] {
return env.reusable_sst(s, tmpdir_path, generation).then([s, tomb] (auto sstp) mutable {
return do_with(make_dkey(s, "key1"), [sstp, s, tomb] (auto& key) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, no_reader_permit(), key));
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, tomb, rd] (auto mutation) {
auto& mp = mutation->partition();
BOOST_REQUIRE(mp.row_tombstones().size() == 1);
@@ -1008,19 +1008,19 @@ static future<std::vector<sstables::shared_sstable>> open_sstables(test_env& env
// mutation_reader for sstable keeping all the required objects alive.
static flat_mutation_reader sstable_reader(shared_sstable sst, schema_ptr s) {
return sst->as_mutation_source().make_reader(s, query::full_partition_range, s->full_slice());
return sst->as_mutation_source().make_reader(s, no_reader_permit(), query::full_partition_range, s->full_slice());
}
static flat_mutation_reader sstable_reader(shared_sstable sst, schema_ptr s, const dht::partition_range& pr) {
return sst->as_mutation_source().make_reader(s, pr, s->full_slice());
return sst->as_mutation_source().make_reader(s, no_reader_permit(), pr, s->full_slice());
}
// We don't need to normalize the sstable reader for 'mc' format
// because it is naturally normalized now.
static flat_mutation_reader make_normalizing_sstable_reader(
shared_sstable sst, schema_ptr s, const dht::partition_range& pr) {
auto sstable_reader = sst->as_mutation_source().make_reader(s, pr, s->full_slice());
auto sstable_reader = sst->as_mutation_source().make_reader(s, no_reader_permit(), pr, s->full_slice());
if (sst->get_version() == sstables::sstable::version_types::mc) {
return sstable_reader;
}
@@ -1419,7 +1419,7 @@ SEASTAR_TEST_CASE(datafile_generation_37) {
return write_memtable_to_sstable_for_test(*mtp, sst).then([&env, s, tmpdir_path] {
return env.reusable_sst(s, tmpdir_path, 37).then([s, tmpdir_path] (auto sstp) {
return do_with(make_dkey(s, "key1"), [sstp, s] (auto& key) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, no_reader_permit(), key));
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, rd] (auto mutation) {
auto& mp = mutation->partition();
@@ -1454,7 +1454,7 @@ SEASTAR_TEST_CASE(datafile_generation_38) {
return write_memtable_to_sstable_for_test(*mtp, sst).then([&env, s, tmpdir_path] {
return env.reusable_sst(s, tmpdir_path, 38).then([s] (auto sstp) {
return do_with(make_dkey(s, "key1"), [sstp, s] (auto& key) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, no_reader_permit(), key));
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, rd] (auto mutation) {
auto& mp = mutation->partition();
auto clustering = clustering_key_prefix::from_exploded(*s, {to_bytes("cl1"), to_bytes("cl2")});
@@ -1490,7 +1490,7 @@ SEASTAR_TEST_CASE(datafile_generation_39) {
return write_memtable_to_sstable_for_test(*mtp, sst).then([&env, s, tmpdir_path] {
return env.reusable_sst(s, tmpdir_path, 39).then([s] (auto sstp) {
return do_with(make_dkey(s, "key1"), [sstp, s] (auto& key) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, no_reader_permit(), key));
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, rd] (auto mutation) {
auto& mp = mutation->partition();
auto& row = mp.clustered_row(*s, clustering_key::make_empty());
@@ -1586,7 +1586,7 @@ SEASTAR_TEST_CASE(datafile_generation_41) {
return write_memtable_to_sstable_for_test(*mt, sst).then([&env, s, tomb, tmpdir_path] {
return env.reusable_sst(s, tmpdir_path, 41).then([s, tomb] (auto sstp) mutable {
return do_with(make_dkey(s, "key1"), [sstp, s, tomb] (auto& key) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, no_reader_permit(), key));
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, tomb, rd] (auto mutation) {
auto& mp = mutation->partition();
BOOST_REQUIRE(mp.clustered_rows().calculate_size() == 1);
@@ -2546,7 +2546,7 @@ SEASTAR_TEST_CASE(sstable_rewrite) {
void test_sliced_read_row_presence(shared_sstable sst, schema_ptr s, const query::partition_slice& ps,
std::vector<std::pair<partition_key, std::vector<clustering_key>>> expected)
{
auto reader = sst->as_mutation_source().make_reader(s, query::full_partition_range, ps);
auto reader = sst->as_mutation_source().make_reader(s, no_reader_permit(), query::full_partition_range, ps);
partition_key::equality pk_eq(*s);
clustering_key::equality ck_eq(*s);
@@ -3698,7 +3698,7 @@ SEASTAR_TEST_CASE(test_repeated_tombstone_skipping) {
.with_range(query::clustering_range::make_singular(ck2))
.with_range(query::clustering_range::make_singular(ck3))
.build();
flat_mutation_reader rd = ms.make_reader(table.schema(), query::full_partition_range, slice);
flat_mutation_reader rd = ms.make_reader(table.schema(), no_reader_permit(), query::full_partition_range, slice);
assert_that(std::move(rd)).has_monotonic_positions();
}
}
@@ -3739,6 +3739,7 @@ SEASTAR_TEST_CASE(test_skipping_using_index) {
auto ms = as_mutation_source(sst);
auto rd = ms.make_reader(table.schema(),
no_reader_permit(),
query::full_partition_range,
table.schema()->full_slice(),
default_priority_class(),
@@ -4260,7 +4261,7 @@ SEASTAR_TEST_CASE(test_summary_entry_spanning_more_keys_than_min_interval) {
std::set<mutation, mutation_decorated_key_less_comparator> merged;
merged.insert(mutations.begin(), mutations.end());
auto rd = assert_that(sst->as_mutation_source().make_reader(s, query::full_partition_range));
auto rd = assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit(), query::full_partition_range));
auto keys_read = 0;
for (auto&& m : merged) {
keys_read++;
@@ -4270,7 +4271,7 @@ SEASTAR_TEST_CASE(test_summary_entry_spanning_more_keys_than_min_interval) {
BOOST_REQUIRE(keys_read == keys_written);
auto r = dht::partition_range::make({mutations.back().decorated_key(), true}, {mutations.back().decorated_key(), true});
assert_that(sst->as_mutation_source().make_reader(s, r))
assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit(), r))
.produces(slice(mutations, r))
.produces_end_of_stream();
});
@@ -4532,7 +4533,7 @@ SEASTAR_TEST_CASE(test_old_format_non_compound_range_tombstone_is_read) {
{
auto slice = partition_slice_builder(*s).with_range(query::clustering_range::make_singular({ck})).build();
assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice))
assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit(), dht::partition_range::make_singular(dk), slice))
.produces(m)
.produces_end_of_stream();
}
@@ -4940,7 +4941,7 @@ SEASTAR_TEST_CASE(test_reads_cassandra_static_compact) {
m.set_clustered_cell(clustering_key::make_empty(), *s->get_column_definition("c2"),
atomic_cell::make_live(*utf8_type, 1551785032379079, utf8_type->decompose("cde"), {}));
assert_that(sst->as_mutation_source().make_reader(s))
assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit()))
.produces(m)
.produces_end_of_stream();
});
@@ -5191,11 +5192,11 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) {
compacting->insert(std::move(sst));
}
auto reader = ::make_range_sstable_reader(s,
no_reader_permit(),
compacting,
query::full_partition_range,
s->full_slice(),
service::get_local_compaction_priority(),
no_resource_tracking(),
nullptr,
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no);

View File

@@ -55,7 +55,7 @@ SEASTAR_THREAD_TEST_CASE(nonexistent_key) {
env.reusable_sst(uncompressed_schema(), uncompressed_dir(), 1).then([] (auto sstp) {
return do_with(make_dkey(uncompressed_schema(), "invalid_key"), [sstp] (auto& key) {
auto s = uncompressed_schema();
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, no_reader_permit(), key));
return (*rd)(db::no_timeout).then([sstp, s, &key, rd] (auto mutation) {
BOOST_REQUIRE(!mutation);
return make_ready_future<>();
@@ -68,7 +68,7 @@ future<> test_no_clustered(sstables::test_env& env, bytes&& key, std::unordered_
return env.reusable_sst(uncompressed_schema(), uncompressed_dir(), 1).then([k = std::move(key), map = std::move(map)] (auto sstp) mutable {
return do_with(make_dkey(uncompressed_schema(), std::move(k)), [sstp, map = std::move(map)] (auto& key) {
auto s = uncompressed_schema();
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, no_reader_permit(), key));
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd, map = std::move(map)] (auto mutation) {
BOOST_REQUIRE(mutation);
auto& mp = mutation->partition();
@@ -143,7 +143,7 @@ future<mutation> generate_clustered(sstables::test_env& env, bytes&& key) {
return env.reusable_sst(complex_schema(), "test/resource/sstables/complex", Generation).then([k = std::move(key)] (auto sstp) mutable {
return do_with(make_dkey(complex_schema(), std::move(k)), [sstp] (auto& key) {
auto s = complex_schema();
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, no_reader_permit(), key));
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) {
BOOST_REQUIRE(mutation);
return std::move(*mutation);
@@ -343,7 +343,7 @@ future<> test_range_reads(sstables::test_env& env, const dht::token& min, const
auto stop = make_lw_shared<bool>(false);
return do_with(dht::partition_range::make(dht::ring_position::starting_at(min),
dht::ring_position::ending_at(max)), [&, sstp, s] (auto& pr) {
auto mutations = make_lw_shared<flat_mutation_reader>(sstp->read_range_rows_flat(s, pr));
auto mutations = make_lw_shared<flat_mutation_reader>(sstp->read_range_rows_flat(s, no_reader_permit(), pr));
return do_until([stop] { return *stop; },
// Note: The data in the following lambda, including
// "mutations", continues to live until after the last
@@ -475,7 +475,7 @@ SEASTAR_TEST_CASE(test_sstable_can_write_and_read_range_tombstone) {
sstables::sstable::format_types::big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst->load().get();
auto mr = sst->read_rows_flat(s);
auto mr = sst->read_rows_flat(s, no_reader_permit());
auto mut = read_mutation_from_flat_mutation_reader(mr, db::no_timeout).get0();
BOOST_REQUIRE(bool(mut));
auto& rts = mut->partition().row_tombstones();
@@ -496,7 +496,7 @@ SEASTAR_THREAD_TEST_CASE(compact_storage_sparse_read) {
env.reusable_sst(compact_sparse_schema(), "test/resource/sstables/compact_sparse", 1).then([] (auto sstp) {
return do_with(make_dkey(compact_sparse_schema(), "first_row"), [sstp] (auto& key) {
auto s = compact_sparse_schema();
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, no_reader_permit(), key));
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) {
BOOST_REQUIRE(mutation);
auto& mp = mutation->partition();
@@ -515,7 +515,7 @@ SEASTAR_THREAD_TEST_CASE(compact_storage_simple_dense_read) {
env.reusable_sst(compact_simple_dense_schema(), "test/resource/sstables/compact_simple_dense", 1).then([] (auto sstp) {
return do_with(make_dkey(compact_simple_dense_schema(), "first_row"), [sstp] (auto& key) {
auto s = compact_simple_dense_schema();
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, no_reader_permit(), key));
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) {
auto& mp = mutation->partition();
@@ -536,7 +536,7 @@ SEASTAR_THREAD_TEST_CASE(compact_storage_dense_read) {
env.reusable_sst(compact_dense_schema(), "test/resource/sstables/compact_dense", 1).then([] (auto sstp) {
return do_with(make_dkey(compact_dense_schema(), "first_row"), [sstp] (auto& key) {
auto s = compact_dense_schema();
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, no_reader_permit(), key));
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) {
auto& mp = mutation->partition();
@@ -560,7 +560,7 @@ SEASTAR_THREAD_TEST_CASE(broken_ranges_collection) {
sstables::test_env env;
env.reusable_sst(peers_schema(), "test/resource/sstables/broken_ranges", 2).then([] (auto sstp) {
auto s = peers_schema();
auto reader = make_lw_shared<flat_mutation_reader>(sstp->as_mutation_source().make_reader(s, query::full_partition_range));
auto reader = make_lw_shared<flat_mutation_reader>(sstp->as_mutation_source().make_reader(s, no_reader_permit(), query::full_partition_range));
return repeat([s, reader] {
return read_mutation_from_flat_mutation_reader(*reader, db::no_timeout).then([s, reader] (mutation_opt mut) {
auto key_equal = [s, &mut] (sstring ip) {
@@ -628,7 +628,7 @@ SEASTAR_THREAD_TEST_CASE(tombstone_in_tombstone) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
ka_sst(tombstone_overlap_schema(), "test/resource/sstables/tombstone_overlap", 1).then([] (auto sstp) {
auto s = tombstone_overlap_schema();
return do_with(sstp->read_rows_flat(s), [sstp, s] (auto& reader) {
return do_with(sstp->read_rows_flat(s, no_reader_permit()), [sstp, s] (auto& reader) {
return repeat([sstp, s, &reader] {
return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([s] (mutation_opt mut) {
if (!mut) {
@@ -692,7 +692,7 @@ SEASTAR_THREAD_TEST_CASE(range_tombstone_reading) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
ka_sst(tombstone_overlap_schema(), "test/resource/sstables/tombstone_overlap", 4).then([] (auto sstp) {
auto s = tombstone_overlap_schema();
return do_with(sstp->read_rows_flat(s), [sstp, s] (auto& reader) {
return do_with(sstp->read_rows_flat(s, no_reader_permit()), [sstp, s] (auto& reader) {
return repeat([sstp, s, &reader] {
return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([s] (mutation_opt mut) {
if (!mut) {
@@ -770,7 +770,7 @@ SEASTAR_THREAD_TEST_CASE(tombstone_in_tombstone2) {
auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
ka_sst(tombstone_overlap_schema2(), "test/resource/sstables/tombstone_overlap", 3).then([] (auto sstp) {
auto s = tombstone_overlap_schema2();
return do_with(sstp->read_rows_flat(s), [sstp, s] (auto& reader) {
return do_with(sstp->read_rows_flat(s, no_reader_permit()), [sstp, s] (auto& reader) {
return repeat([sstp, s, &reader] {
return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([s] (mutation_opt mut) {
if (!mut) {
@@ -850,7 +850,7 @@ static schema_ptr buffer_overflow_schema() {
SEASTAR_THREAD_TEST_CASE(buffer_overflow) {
auto s = buffer_overflow_schema();
auto sstp = ka_sst(s, "test/resource/sstables/buffer_overflow", 5).get0();
auto r = sstp->read_rows_flat(s);
auto r = sstp->read_rows_flat(s, no_reader_permit());
auto pk1 = partition_key::from_exploded(*s, { int32_type->decompose(4) });
auto dk1 = dht::global_partitioner().decorate_key(*s, pk1);
auto pk2 = partition_key::from_exploded(*s, { int32_type->decompose(3) });
@@ -910,7 +910,7 @@ SEASTAR_TEST_CASE(test_non_compound_table_row_is_not_marked_as_static) {
sstables::sstable::format_types::big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst->load().get();
auto mr = sst->read_rows_flat(s);
auto mr = sst->read_rows_flat(s, no_reader_permit());
auto mut = read_mutation_from_flat_mutation_reader(mr, db::no_timeout).get0();
BOOST_REQUIRE(bool(mut));
}
@@ -949,7 +949,7 @@ SEASTAR_TEST_CASE(test_has_partition_key) {
dht::decorated_key dk(dht::global_partitioner().decorate_key(*s, k));
auto hk = sstables::sstable::make_hashed_key(*s, dk.key());
sst->load().get();
auto mr = sst->read_rows_flat(s);
auto mr = sst->read_rows_flat(s, no_reader_permit());
auto res = sst->has_partition_key(hk, dk).get0();
BOOST_REQUIRE(bool(res));
@@ -1073,7 +1073,7 @@ SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_compound_dense) {
{
auto slice = partition_slice_builder(*s).with_range(query::clustering_range::make_starting_with({ck1})).build();
assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice))
assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit(), dht::partition_range::make_singular(dk), slice))
.produces(m)
.produces_end_of_stream();
}
@@ -1133,7 +1133,7 @@ SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_non_compound_dense) {
{
auto slice = partition_slice_builder(*s).with_range(query::clustering_range::make_starting_with({ck1})).build();
assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice))
assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit(), dht::partition_range::make_singular(dk), slice))
.produces(m)
.produces_end_of_stream();
}
@@ -1186,7 +1186,7 @@ SEASTAR_TEST_CASE(test_promoted_index_repeats_open_tombstones) {
{
auto slice = partition_slice_builder(*s).with_range(query::clustering_range::make_starting_with({ck})).build();
assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice))
assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit(), dht::partition_range::make_singular(dk), slice))
.produces(m)
.produces_end_of_stream();
}
@@ -1231,7 +1231,7 @@ SEASTAR_TEST_CASE(test_range_tombstones_are_correctly_seralized_for_non_compound
{
auto slice = partition_slice_builder(*s).build();
assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice))
assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit(), dht::partition_range::make_singular(dk), slice))
.produces(m)
.produces_end_of_stream();
}
@@ -1313,7 +1313,7 @@ SEASTAR_TEST_CASE(test_can_write_and_read_non_compound_range_tombstone_as_compou
{
auto slice = partition_slice_builder(*s).build();
assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice))
assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit(), dht::partition_range::make_singular(dk), slice))
.produces(m)
.produces_end_of_stream();
}
@@ -1556,7 +1556,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_index_pages_do_not_cause_large_allocations)
auto t0 = std::chrono::steady_clock::now();
auto large_allocs_before = memory::stats().large_allocations();
auto sst_reader = sst->as_mutation_source().make_reader(s, pr);
auto sst_reader = sst->as_mutation_source().make_reader(s, no_reader_permit(), pr);
mutation actual = *read_mutation_from_flat_mutation_reader(sst_reader, db::no_timeout).get0();
auto large_allocs_after = memory::stats().large_allocations();
auto duration = std::chrono::steady_clock::now() - t0;
@@ -1604,14 +1604,14 @@ SEASTAR_THREAD_TEST_CASE(test_schema_changes) {
}
auto mr = assert_that(created_with_base_schema->as_mutation_source()
.make_reader(changed, dht::partition_range::make_open_ended_both_sides(), changed->full_slice()));
.make_reader(changed, no_reader_permit(), dht::partition_range::make_open_ended_both_sides(), changed->full_slice()));
for (auto& m : changed_mutations) {
mr.produces(m);
}
mr.produces_end_of_stream();
mr = assert_that(created_with_changed_schema->as_mutation_source()
.make_reader(changed, dht::partition_range::make_open_ended_both_sides(), changed->full_slice()));
.make_reader(changed, no_reader_permit(), dht::partition_range::make_open_ended_both_sides(), changed->full_slice()));
for (auto& m : changed_mutations) {
mr.produces(m);
}

View File

@@ -132,7 +132,7 @@ void run_sstable_resharding_test() {
auto shard = shards.front();
BOOST_REQUIRE(column_family_test::calculate_shard_from_sstable_generation(new_sst->generation()) == shard);
auto rd = assert_that(new_sst->as_mutation_source().make_reader(s));
auto rd = assert_that(new_sst->as_mutation_source().make_reader(s, no_reader_permit()));
BOOST_REQUIRE(muts[shard].size() == keys_per_shard);
for (auto k : boost::irange(0u, keys_per_shard)) {
rd.produces(muts[shard][k]);

View File

@@ -339,7 +339,7 @@ public:
int count_row_end = 0;
test_row_consumer(int64_t t)
: row_consumer(no_resource_tracking(), tracing::trace_state_ptr()
: row_consumer(no_reader_permit(), tracing::trace_state_ptr()
, default_priority_class()), desired_timestamp(t) {
}
@@ -460,7 +460,7 @@ public:
int count_range_tombstone = 0;
count_row_consumer()
: row_consumer(no_resource_tracking(), tracing::trace_state_ptr(), default_priority_class()) {
: row_consumer(no_reader_permit(), tracing::trace_state_ptr(), default_priority_class()) {
}
virtual proceed consume_row_start(sstables::key_view key, sstables::deletion_time deltime) override {
@@ -838,7 +838,7 @@ SEASTAR_TEST_CASE(wrong_range) {
return test_using_reusable_sst(uncompressed_schema(), "test/resource/sstables/wrongrange", 114, [] (auto sstp) {
return do_with(make_dkey(uncompressed_schema(), "todata"), [sstp] (auto& key) {
auto s = columns_schema();
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, no_reader_permit(), key));
return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) {
return make_ready_future<>();
});
@@ -1017,7 +1017,7 @@ static future<int> count_rows(sstable_ptr sstp, schema_ptr s, sstring key, sstri
return seastar::async([sstp, s, key, ck1, ck2] () mutable {
auto ps = make_partition_slice(*s, ck1, ck2);
auto dkey = make_dkey(s, key.c_str());
auto rd = sstp->read_row_flat(s, dkey, ps);
auto rd = sstp->read_row_flat(s, no_reader_permit(), dkey, ps);
auto mfopt = rd(db::no_timeout).get0();
if (!mfopt) {
return 0;
@@ -1038,7 +1038,7 @@ static future<int> count_rows(sstable_ptr sstp, schema_ptr s, sstring key, sstri
static future<int> count_rows(sstable_ptr sstp, schema_ptr s, sstring key) {
return seastar::async([sstp, s, key] () mutable {
auto dkey = make_dkey(s, key.c_str());
auto rd = sstp->read_row_flat(s, dkey);
auto rd = sstp->read_row_flat(s, no_reader_permit(), dkey);
auto mfopt = rd(db::no_timeout).get0();
if (!mfopt) {
return 0;
@@ -1060,7 +1060,7 @@ static future<int> count_rows(sstable_ptr sstp, schema_ptr s, sstring key) {
static future<int> count_rows(sstable_ptr sstp, schema_ptr s, sstring ck1, sstring ck2) {
return seastar::async([sstp, s, ck1, ck2] () mutable {
auto ps = make_partition_slice(*s, ck1, ck2);
auto reader = sstp->read_range_rows_flat(s, query::full_partition_range, ps);
auto reader = sstp->read_range_rows_flat(s, no_reader_permit(), query::full_partition_range, ps);
int nrows = 0;
auto mfopt = reader(db::no_timeout).get0();
while (mfopt) {

View File

@@ -175,7 +175,7 @@ static void test_slicing_and_fast_forwarding(populate_fn_ex populate) {
auto test_common = [&] (const query::partition_slice& slice) {
BOOST_TEST_MESSAGE("Read whole partitions at once");
auto pranges_walker = partition_range_walker(pranges);
auto mr = ms.make_reader(s.schema(), pranges_walker.initial_range(), slice,
auto mr = ms.make_reader(s.schema(), no_reader_permit(), pranges_walker.initial_range(), slice,
default_priority_class(), nullptr, streamed_mutation::forwarding::no, fwd_mr);
auto actual = assert_that(std::move(mr));
for (auto& expected : mutations) {
@@ -201,7 +201,7 @@ static void test_slicing_and_fast_forwarding(populate_fn_ex populate) {
BOOST_TEST_MESSAGE("Read partitions with fast-forwarding to each individual row");
pranges_walker = partition_range_walker(pranges);
mr = ms.make_reader(s.schema(), pranges_walker.initial_range(), slice,
mr = ms.make_reader(s.schema(), no_reader_permit(), pranges_walker.initial_range(), slice,
default_priority_class(), nullptr, streamed_mutation::forwarding::yes, fwd_mr);
actual = assert_that(std::move(mr));
for (auto& expected : mutations) {
@@ -237,14 +237,14 @@ static void test_slicing_and_fast_forwarding(populate_fn_ex populate) {
test_common(slice);
BOOST_TEST_MESSAGE("Test monotonic positions");
auto mr = ms.make_reader(s.schema(), query::full_partition_range, slice,
auto mr = ms.make_reader(s.schema(), no_reader_permit(), query::full_partition_range, slice,
default_priority_class(), nullptr, streamed_mutation::forwarding::no, fwd_mr);
assert_that(std::move(mr)).has_monotonic_positions();
if (range_size != 1) {
BOOST_TEST_MESSAGE("Read partitions fast-forwarded to the range of interest");
auto pranges_walker = partition_range_walker(pranges);
mr = ms.make_reader(s.schema(), pranges_walker.initial_range(), slice,
mr = ms.make_reader(s.schema(), no_reader_permit(), pranges_walker.initial_range(), slice,
default_priority_class(), nullptr, streamed_mutation::forwarding::yes, fwd_mr);
auto actual = assert_that(std::move(mr));
for (auto& expected : mutations) {
@@ -284,7 +284,7 @@ static void test_slicing_and_fast_forwarding(populate_fn_ex populate) {
BOOST_TEST_MESSAGE("Read partitions with just static rows");
auto pranges_walker = partition_range_walker(pranges);
mr = ms.make_reader(s.schema(), pranges_walker.initial_range(), slice,
mr = ms.make_reader(s.schema(), no_reader_permit(), pranges_walker.initial_range(), slice,
default_priority_class(), nullptr, streamed_mutation::forwarding::no, fwd_mr);
auto actual = assert_that(std::move(mr));
for (auto& expected : mutations) {
@@ -311,7 +311,7 @@ static void test_slicing_and_fast_forwarding(populate_fn_ex populate) {
test_common(slice);
BOOST_TEST_MESSAGE("Test monotonic positions");
auto mr = ms.make_reader(s.schema(), query::full_partition_range, slice,
auto mr = ms.make_reader(s.schema(), no_reader_permit(), query::full_partition_range, slice,
default_priority_class(), nullptr, streamed_mutation::forwarding::no, fwd_mr);
assert_that(std::move(mr)).has_monotonic_positions();
}
@@ -383,10 +383,10 @@ static void test_streamed_mutation_forwarding_is_consistent_with_slicing(populat
mutation_source ms = populate(m.schema(), {m}, gc_clock::now());
flat_mutation_reader sliced_reader =
ms.make_reader(m.schema(), prange, slice_with_ranges);
ms.make_reader(m.schema(), no_reader_permit(), prange, slice_with_ranges);
flat_mutation_reader fwd_reader =
ms.make_reader(m.schema(), prange, full_slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes);
ms.make_reader(m.schema(), no_reader_permit(), prange, full_slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes);
std::optional<mutation_rebuilder> builder{};
struct consumer {
@@ -474,6 +474,7 @@ static void test_streamed_mutation_forwarding_guarantees(populate_fn_ex populate
auto new_stream = [&ms, s, &m] () -> flat_reader_assertions {
BOOST_TEST_MESSAGE("Creating new streamed_mutation");
auto res = assert_that(ms.make_reader(s,
no_reader_permit(),
query::full_partition_range,
s->full_slice(),
default_priority_class(),
@@ -609,6 +610,7 @@ static void test_fast_forwarding_across_partitions_to_empty_range(populate_fn_ex
auto pr = dht::partition_range::make({keys[0]}, {keys[1]});
auto rd = assert_that(ms.make_reader(s,
no_reader_permit(),
pr,
s->full_slice(),
default_priority_class(),
@@ -710,7 +712,7 @@ static void test_streamed_mutation_slicing_returns_only_relevant_tombstones(popu
))
.build();
auto rd = assert_that(ms.make_reader(s, pr, slice));
auto rd = assert_that(ms.make_reader(s, no_reader_permit(), pr, slice));
rd.produces_partition_start(m.decorated_key());
rd.produces_row_with_key(keys[2]);
@@ -729,7 +731,7 @@ static void test_streamed_mutation_slicing_returns_only_relevant_tombstones(popu
))
.build();
auto rd = assert_that(ms.make_reader(s, pr, slice));
auto rd = assert_that(ms.make_reader(s, no_reader_permit(), pr, slice));
rd.produces_partition_start(m.decorated_key())
.produces_range_tombstone(rt3, slice.row_ranges(*s, m.key()))
@@ -784,6 +786,7 @@ static void test_streamed_mutation_forwarding_across_range_tombstones(populate_f
mutation_source ms = populate(s, std::vector<mutation>({m}), gc_clock::now());
auto rd = assert_that(ms.make_reader(s,
no_reader_permit(),
query::full_partition_range,
s->full_slice(),
default_priority_class(),
@@ -867,7 +870,7 @@ static void test_range_queries(populate_fn_ex populate) {
auto test_slice = [&] (dht::partition_range r) {
BOOST_TEST_MESSAGE(format("Testing range {}", r));
assert_that(ds.make_reader(s, r))
assert_that(ds.make_reader(s, no_reader_permit(), r))
.produces(slice(partitions, r))
.produces_end_of_stream();
};
@@ -1008,7 +1011,7 @@ static void test_date_tiered_clustering_slicing(populate_fn_ex populate) {
.with_range(ss.make_ckey_range(1, 2))
.build();
auto prange = dht::partition_range::make_singular(pkey);
assert_that(ms.make_reader(s, prange, slice))
assert_that(ms.make_reader(s, no_reader_permit(), prange, slice))
.produces(m1, slice.row_ranges(*s, pkey.key()))
.produces_end_of_stream();
}
@@ -1018,7 +1021,7 @@ static void test_date_tiered_clustering_slicing(populate_fn_ex populate) {
.with_range(query::clustering_range::make_singular(ss.make_ckey(0)))
.build();
auto prange = dht::partition_range::make_singular(pkey);
assert_that(ms.make_reader(s, prange, slice))
assert_that(ms.make_reader(s, no_reader_permit(), prange, slice))
.produces(m1)
.produces_end_of_stream();
}
@@ -1105,14 +1108,14 @@ static void test_clustering_slices(populate_fn_ex populate) {
auto slice = partition_slice_builder(*s)
.with_range(query::clustering_range::make_singular(make_ck(0)))
.build();
assert_that(ds.make_reader(s, pr, slice))
assert_that(ds.make_reader(s, no_reader_permit(), pr, slice))
.produces_eos_or_empty_mutation();
}
{
auto slice = partition_slice_builder(*s)
.build();
auto rd = assert_that(ds.make_reader(s, pr, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes));
auto rd = assert_that(ds.make_reader(s, no_reader_permit(), pr, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes));
rd.produces_partition_start(pk)
.fast_forward_to(position_range(position_in_partition::for_key(ck1), position_in_partition::after_key(ck2)))
.produces_row_with_key(ck1)
@@ -1123,7 +1126,7 @@ static void test_clustering_slices(populate_fn_ex populate) {
{
auto slice = partition_slice_builder(*s)
.build();
auto rd = assert_that(ds.make_reader(s, pr, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes));
auto rd = assert_that(ds.make_reader(s, no_reader_permit(), pr, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes));
rd.produces_partition_start(pk)
.produces_end_of_stream()
.fast_forward_to(position_range(position_in_partition::for_key(ck1), position_in_partition::after_key(ck2)))
@@ -1135,7 +1138,7 @@ static void test_clustering_slices(populate_fn_ex populate) {
auto slice = partition_slice_builder(*s)
.with_range(query::clustering_range::make_singular(make_ck(1)))
.build();
assert_that(ds.make_reader(s, pr, slice))
assert_that(ds.make_reader(s, no_reader_permit(), pr, slice))
.produces(row1 + row2 + row3 + row4 + row5 + del_1, slice.row_ranges(*s, pk.key()))
.produces_end_of_stream();
}
@@ -1143,7 +1146,7 @@ static void test_clustering_slices(populate_fn_ex populate) {
auto slice = partition_slice_builder(*s)
.with_range(query::clustering_range::make_singular(make_ck(2)))
.build();
assert_that(ds.make_reader(s, pr, slice))
assert_that(ds.make_reader(s, no_reader_permit(), pr, slice))
.produces(row6 + row7 + del_1 + del_2, slice.row_ranges(*s, pk.key()))
.produces_end_of_stream();
}
@@ -1152,7 +1155,7 @@ static void test_clustering_slices(populate_fn_ex populate) {
auto slice = partition_slice_builder(*s)
.with_range(query::clustering_range::make_singular(make_ck(1, 2)))
.build();
assert_that(ds.make_reader(s, pr, slice))
assert_that(ds.make_reader(s, no_reader_permit(), pr, slice))
.produces(row3 + row4 + del_1, slice.row_ranges(*s, pk.key()))
.produces_end_of_stream();
}
@@ -1161,7 +1164,7 @@ static void test_clustering_slices(populate_fn_ex populate) {
auto slice = partition_slice_builder(*s)
.with_range(query::clustering_range::make_singular(make_ck(3)))
.build();
assert_that(ds.make_reader(s, pr, slice))
assert_that(ds.make_reader(s, no_reader_permit(), pr, slice))
.produces(row8 + del_3, slice.row_ranges(*s, pk.key()))
.produces_end_of_stream();
}
@@ -1169,12 +1172,12 @@ static void test_clustering_slices(populate_fn_ex populate) {
// Test out-of-range partition keys
{
auto pr = dht::partition_range::make_singular(keys[0]);
assert_that(ds.make_reader(s, pr, s->full_slice()))
assert_that(ds.make_reader(s, no_reader_permit(), pr, s->full_slice()))
.produces_eos_or_empty_mutation();
}
{
auto pr = dht::partition_range::make_singular(keys[2]);
assert_that(ds.make_reader(s, pr, s->full_slice()))
assert_that(ds.make_reader(s, no_reader_permit(), pr, s->full_slice()))
.produces_eos_or_empty_mutation();
}
}
@@ -1195,7 +1198,7 @@ static void test_query_only_static_row(populate_fn_ex populate) {
// fully populate cache
{
auto prange = dht::partition_range::make_ending_with(dht::ring_position(m1.decorated_key()));
assert_that(ms.make_reader(s.schema(), prange, s.schema()->full_slice()))
assert_that(ms.make_reader(s.schema(), no_reader_permit(), prange, s.schema()->full_slice()))
.produces(m1)
.produces_end_of_stream();
}
@@ -1206,7 +1209,7 @@ static void test_query_only_static_row(populate_fn_ex populate) {
.with_ranges({})
.build();
auto prange = dht::partition_range::make_ending_with(dht::ring_position(m1.decorated_key()));
assert_that(ms.make_reader(s.schema(), prange, slice))
assert_that(ms.make_reader(s.schema(), no_reader_permit(), prange, slice))
.produces(m1, slice.row_ranges(*s.schema(), m1.key()))
.produces_end_of_stream();
}
@@ -1217,7 +1220,7 @@ static void test_query_only_static_row(populate_fn_ex populate) {
.with_ranges({})
.build();
auto prange = dht::partition_range::make_singular(m1.decorated_key());
assert_that(ms.make_reader(s.schema(), prange, slice))
assert_that(ms.make_reader(s.schema(), no_reader_permit(), prange, slice))
.produces(m1, slice.row_ranges(*s.schema(), m1.key()))
.produces_end_of_stream();
}
@@ -1237,7 +1240,7 @@ static void test_query_no_clustering_ranges_no_static_columns(populate_fn_ex pop
{
auto prange = dht::partition_range::make_ending_with(dht::ring_position(m1.decorated_key()));
assert_that(ms.make_reader(s.schema(), prange, s.schema()->full_slice()))
assert_that(ms.make_reader(s.schema(), no_reader_permit(), prange, s.schema()->full_slice()))
.produces(m1)
.produces_end_of_stream();
}
@@ -1248,7 +1251,7 @@ static void test_query_no_clustering_ranges_no_static_columns(populate_fn_ex pop
.with_ranges({})
.build();
auto prange = dht::partition_range::make_ending_with(dht::ring_position(m1.decorated_key()));
assert_that(ms.make_reader(s.schema(), prange, slice))
assert_that(ms.make_reader(s.schema(), no_reader_permit(), prange, slice))
.produces(m1, slice.row_ranges(*s.schema(), m1.key()))
.produces_end_of_stream();
}
@@ -1259,7 +1262,7 @@ static void test_query_no_clustering_ranges_no_static_columns(populate_fn_ex pop
.with_ranges({})
.build();
auto prange = dht::partition_range::make_singular(m1.decorated_key());
assert_that(ms.make_reader(s.schema(), prange, slice))
assert_that(ms.make_reader(s.schema(), no_reader_permit(), prange, slice))
.produces(m1, slice.row_ranges(*s.schema(), m1.key()))
.produces_end_of_stream();
}
@@ -1275,6 +1278,7 @@ void test_streamed_mutation_forwarding_succeeds_with_no_data(populate_fn_ex popu
auto source = populate(s.schema(), {m}, gc_clock::now());
assert_that(source.make_reader(s.schema(),
no_reader_permit(),
query::full_partition_range,
s.schema()->full_slice(),
default_priority_class(),
@@ -1323,7 +1327,7 @@ void test_slicing_with_overlapping_range_tombstones(populate_fn_ex populate) {
{
auto slice = partition_slice_builder(*s).with_range(range).build();
auto rd = ds.make_reader(s, query::full_partition_range, slice);
auto rd = ds.make_reader(s, no_reader_permit(), query::full_partition_range, slice);
auto prange = position_range(range);
mutation result(m1.schema(), m1.decorated_key());
@@ -1341,7 +1345,7 @@ void test_slicing_with_overlapping_range_tombstones(populate_fn_ex populate) {
// Check fast_forward_to()
{
auto rd = ds.make_reader(s, query::full_partition_range, s->full_slice(), default_priority_class(),
auto rd = ds.make_reader(s, no_reader_permit(), query::full_partition_range, s->full_slice(), default_priority_class(),
nullptr, streamed_mutation::forwarding::yes);
auto prange = position_range(range);

View File

@@ -199,7 +199,7 @@ public:
}
future<double> read_sequential_partitions(int idx) {
return do_with(_sst[0]->read_rows_flat(s), [this] (flat_mutation_reader& r) {
return do_with(_sst[0]->read_rows_flat(s, no_reader_permit()), [this] (flat_mutation_reader& r) {
auto start = perf_sstable_test_env::now();
auto total = make_lw_shared<size_t>(0);
auto done = make_lw_shared<bool>(false);