From dfc8b2fc45d79932af09b198a7c32a9b8b7eff38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 15 Nov 2019 09:54:27 +0200 Subject: [PATCH] treewide: replace reader_resource_tracer with reader_permit The former was never really more than a reader_permit with one additional method. Currently using it doesn't even save one from any includes. Now that readers will be using reader_permit we would have to pass down both to mutation_source. Instead get rid of reader_resource_tracker and just use reader_permit. Instead of making it a last and optional parameter that is easy to ignore, make it a first class parameter, right after schema, to signify that permits are now a prominent part of the reader API. This -- mostly mechanical -- patch essentially refactors mutation_source to ask for the reader_permit instead of reader_resource_tracking and updates all usage sites. --- database.cc | 1 + database.hh | 4 +- db/size_estimates_virtual_reader.hh | 1 + db/view/build_progress_virtual_reader.hh | 1 + db/view/view.cc | 2 +- db/view/view_update_generator.cc | 2 +- flat_mutation_reader.cc | 6 +-- index/built_indexes_virtual_reader.hh | 1 + memtable.cc | 3 +- multishard_mutation_query.cc | 8 +-- mutation_partition.cc | 2 +- mutation_reader.cc | 14 ++--- mutation_reader.hh | 61 ++++++++------------- querier.hh | 2 +- row_cache.cc | 2 +- sstables/compaction.cc | 4 +- sstables/data_consume_context.hh | 4 +- sstables/mp_row_consumer.hh | 16 +++--- sstables/partition.cc | 37 ++++++------- sstables/row.hh | 24 ++++----- sstables/sstables.cc | 11 ++-- sstables/sstables.hh | 15 +++--- table.cc | 49 +++++++++-------- test/boost/broken_sstable_test.cc | 2 +- test/boost/database_test.cc | 1 + test/boost/flat_mutation_reader_test.cc | 4 +- test/boost/mutation_fragment_test.cc | 1 + test/boost/mutation_query_test.cc | 2 +- test/boost/mutation_reader_test.cc | 33 +++++++----- test/boost/querier_cache_test.cc | 2 +- test/boost/row_cache_test.cc | 67 +++++++++++++++++------- test/boost/sstable_3_x_test.cc | 18 ++----- test/boost/sstable_datafile_test.cc | 37 ++++++------- test/boost/sstable_mutation_test.cc | 46 ++++++++-------- test/boost/sstable_resharding_test.cc | 2 +- test/boost/sstable_test.cc | 12 ++--- test/lib/mutation_source_test.cc | 64 +++++++++++----------- test/perf/perf_sstable.hh | 2 +- 38 files changed, 297 insertions(+), 266 deletions(-) diff --git a/database.cc b/database.cc index 7ea3aac87f..0f5859ca0f 100644 --- a/database.cc +++ b/database.cc @@ -1960,6 +1960,7 @@ flat_mutation_reader make_multishard_streaming_reader(distributed& db, } }; auto ms = mutation_source([&db, &partitioner] (schema_ptr s, + reader_permit, const dht::partition_range& pr, const query::partition_slice& ps, const io_priority_class& pc, diff --git a/database.hh b/database.hh index a353fec006..10b3a9a33d 100644 --- a/database.hh +++ b/database.hh @@ -1043,22 +1043,22 @@ using sstable_reader_factory_type = std::function sstables, const dht::partition_range& pr, const query::partition_slice& slice, const io_priority_class& pc, - reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr, sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); flat_mutation_reader make_range_sstable_reader(schema_ptr s, + reader_permit permit, lw_shared_ptr sstables, const dht::partition_range& pr, const query::partition_slice& slice, const io_priority_class& pc, - reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr, diff --git a/db/size_estimates_virtual_reader.hh b/db/size_estimates_virtual_reader.hh index 4bf738dba9..25922e94e4 100644 --- a/db/size_estimates_virtual_reader.hh +++ b/db/size_estimates_virtual_reader.hh @@ -58,6 +58,7 @@ private: struct virtual_reader { flat_mutation_reader operator()(schema_ptr schema, + reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, diff --git a/db/view/build_progress_virtual_reader.hh b/db/view/build_progress_virtual_reader.hh index 238101e447..c864f07451 100644 --- a/db/view/build_progress_virtual_reader.hh +++ b/db/view/build_progress_virtual_reader.hh @@ -188,6 +188,7 @@ public: flat_mutation_reader operator()( schema_ptr s, + reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, diff --git a/db/view/view.cc b/db/view/view.cc index cddec04fb8..b833b6b170 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1231,11 +1231,11 @@ void view_builder::initialize_reader_at_current_token(build_step& step) { step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max()); step.reader = make_local_shard_sstable_reader( step.base->schema(), + no_reader_permit(), make_lw_shared(sstables::sstable_set(step.base->get_sstable_set())), step.prange, step.pslice, default_priority_class(), - no_resource_tracking(), nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no); diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 03c26cf752..de084e0830 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -37,7 +37,7 @@ future<> view_update_generator::start() { auto& [sst, t] = _sstables_with_tables.front(); try { schema_ptr s = t->schema(); - flat_mutation_reader staging_sstable_reader = sst->read_rows_flat(s); + flat_mutation_reader staging_sstable_reader = sst->read_rows_flat(s, no_reader_permit()); auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, _proxy, sst, _as), db::no_timeout); if (result == stop_iteration::yes) { break; diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index 449c692118..6edfe4a9f0 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -613,7 +613,7 @@ public: } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { if (!_reader) { - _reader = _source.make_reader(_schema, pr, _slice, _pc, std::move(_trace_state), streamed_mutation::forwarding::no, + _reader = _source.make_reader(_schema, no_reader_permit(), pr, _slice, _pc, std::move(_trace_state), streamed_mutation::forwarding::no, mutation_reader::forwarding::yes); _end_of_stream = false; return make_ready_future<>(); @@ -663,7 +663,7 @@ public: tracing::trace_state_ptr trace_state) : impl(s) , _generator(std::move(generator)) - , _reader(source.make_reader(s, first_range, slice, pc, trace_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::yes)) + , _reader(source.make_reader(s, no_reader_permit(), first_range, slice, pc, trace_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::yes)) { } @@ -737,7 +737,7 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa return make_empty_flat_reader(std::move(s)); } } else if (ranges.size() == 1) { - return source.make_reader(std::move(s), ranges.front(), slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr); + return source.make_reader(std::move(s), no_reader_permit(), ranges.front(), slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr); } else { return make_flat_mutation_reader>(std::move(s), std::move(source), ranges.front(), adapter(std::next(ranges.cbegin()), ranges.cend()), slice, pc, std::move(trace_state)); diff --git a/index/built_indexes_virtual_reader.hh b/index/built_indexes_virtual_reader.hh index b5da0ecd8b..84a7b357fa 100644 --- a/index/built_indexes_virtual_reader.hh +++ b/index/built_indexes_virtual_reader.hh @@ -118,6 +118,7 @@ public: flat_mutation_reader operator()( schema_ptr s, + reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, diff --git a/memtable.cc b/memtable.cc index 31ff5e913e..80b639eb07 100644 --- a/memtable.cc +++ b/memtable.cc @@ -346,7 +346,7 @@ protected: const io_priority_class& pc, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - auto ret = _memtable->_underlying->make_reader(_schema, delegate, slice, pc, nullptr, fwd, fwd_mr); + auto ret = _memtable->_underlying->make_reader(_schema, no_reader_permit(), delegate, slice, pc, nullptr, fwd, fwd_mr); _memtable = {}; _last = {}; return ret; @@ -742,6 +742,7 @@ logalloc::occupancy_stats memtable::occupancy() const { mutation_source memtable::as_data_source() { return mutation_source([mt = shared_from_this()] (schema_ptr s, + reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 284388af9a..5e9198f651 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -295,7 +295,8 @@ flat_mutation_reader read_context::create_reader( rm.rparts->read_operation = table.read_in_progress(); rm.state = reader_state::used; - return table.as_mutation_source().make_reader(std::move(schema), *rm.rparts->range, *rm.rparts->slice, pc, std::move(trace_state)); + return table.as_mutation_source().make_reader(std::move(schema), no_reader_permit(), *rm.rparts->range, *rm.rparts->slice, pc, + std::move(trace_state)); } void read_context::destroy_reader(shard_id shard, future reader_fut) noexcept { @@ -584,6 +585,7 @@ static future do_query_mutations( return ctx->lookup_readers().then([&ctx, s = std::move(s), &cmd, &ranges, trace_state, timeout, accounter = std::move(accounter)] () mutable { auto ms = mutation_source([&] (schema_ptr s, + reader_permit permit, const dht::partition_range& pr, const query::partition_slice& ps, const io_priority_class& pc, @@ -592,8 +594,8 @@ static future do_query_mutations( mutation_reader::forwarding fwd_mr) { return make_multishard_combining_reader(ctx, dht::global_partitioner(), std::move(s), pr, ps, pc, std::move(trace_state), fwd_mr); }); - auto reader = make_flat_multi_range_reader(s, std::move(ms), ranges, cmd.slice, service::get_local_sstable_query_read_priority(), - trace_state, mutation_reader::forwarding::no); + auto reader = make_flat_multi_range_reader(s, std::move(ms), ranges, cmd.slice, + service::get_local_sstable_query_read_priority(), trace_state, mutation_reader::forwarding::no); auto compaction_state = make_lw_shared(*s, cmd.timestamp, cmd.slice, cmd.row_limit, cmd.partition_limit); diff --git a/mutation_partition.cc b/mutation_partition.cc index d86d15ef52..a44ec8dd56 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -2511,7 +2511,7 @@ future counter_write_query(schema_ptr s, const mutation_source& so const query::partition_slice& slice, tracing::trace_state_ptr trace_ptr) : range(dht::partition_range::make_singular(dk)) - , reader(source.make_reader(s, range, slice, service::get_local_sstable_query_read_priority(), + , reader(source.make_reader(s, no_reader_permit(), range, slice, service::get_local_sstable_query_read_priority(), std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) { } diff --git a/mutation_reader.cc b/mutation_reader.cc index 8818724f5e..0784b24674 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -673,8 +673,8 @@ class restricting_mutation_reader : public flat_mutation_reader::impl { streamed_mutation::forwarding _fwd; mutation_reader::forwarding _fwd_mr; - flat_mutation_reader operator()(reader_resource_tracker tracker) { - return _ms.make_reader(std::move(_s), _range.get(), _slice.get(), _pc.get(), std::move(_trace_state), _fwd, _fwd_mr, tracker); + flat_mutation_reader operator()(reader_permit permit) { + return _ms.make_reader(std::move(_s), std::move(permit), _range.get(), _slice.get(), _pc.get(), std::move(_trace_state), _fwd, _fwd_mr); } }; @@ -683,7 +683,6 @@ class restricting_mutation_reader : public flat_mutation_reader::impl { mutation_source_and_params reader_factory; }; struct admitted_state { - reader_permit permit; flat_mutation_reader reader; }; std::variant _state; @@ -705,7 +704,7 @@ class restricting_mutation_reader : public flat_mutation_reader::impl { return std::get(_state).semaphore.wait_admission(new_reader_base_cost, timeout).then([this, fn = std::move(fn)] (reader_permit permit) mutable { auto reader_factory = std::move(std::get(_state).reader_factory); - _state.emplace(admitted_state{permit, reader_factory(reader_resource_tracker(permit))}); + _state.emplace(admitted_state{reader_factory(std::move(permit))}); return fn(std::get(_state).reader); }); } @@ -786,13 +785,13 @@ snapshot_source make_empty_snapshot_source() { mutation_source make_empty_mutation_source() { return mutation_source([](schema_ptr s, + reader_permit permit, const dht::partition_range& pr, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr tr, streamed_mutation::forwarding fwd, - mutation_reader::forwarding, - reader_resource_tracker) { + mutation_reader::forwarding) { return make_empty_flat_reader(s); }, [] { return [] (const dht::decorated_key& key) { @@ -803,6 +802,7 @@ mutation_source make_empty_mutation_source() { mutation_source make_combined_mutation_source(std::vector addends) { return mutation_source([addends = std::move(addends)] (schema_ptr s, + reader_permit permit, const dht::partition_range& pr, const query::partition_slice& slice, const io_priority_class& pc, @@ -811,7 +811,7 @@ mutation_source make_combined_mutation_source(std::vector adden std::vector rd; rd.reserve(addends.size()); for (auto&& ms : addends) { - rd.emplace_back(ms.make_reader(s, pr, slice, pc, tr, fwd)); + rd.emplace_back(ms.make_reader(s, permit, pr, slice, pc, tr, fwd)); } return make_combined_reader(s, std::move(rd), fwd); }); diff --git a/mutation_reader.hh b/mutation_reader.hh index b717cad91f..55a69c46fb 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -171,13 +171,13 @@ class mutation_source { using partition_range = const dht::partition_range&; using io_priority = const io_priority_class&; using flat_reader_factory_type = std::function; + mutation_reader::forwarding)>; // We could have our own version of std::function<> that is nothrow // move constructible and save some indirection and allocation. // Probably not worth the effort though. @@ -193,70 +193,54 @@ public: , _presence_checker_factory(make_lw_shared(std::move(pcf))) { } - mutation_source(std::function fn, - std::function pcf = [] { return make_default_partition_presence_checker(); }) - : mutation_source([fn = std::move(fn)] (schema_ptr s, - partition_range range, - const query::partition_slice& slice, - io_priority pc, - tracing::trace_state_ptr tr, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - reader_resource_tracker) { - return fn(s, range, slice, pc, std::move(tr), fwd, fwd_mr); - } - , std::move(pcf)) - { } - // For sources which don't care about the mutation_reader::forwarding flag (always fast forwardable) - mutation_source(std::function fn) : mutation_source([fn = std::move(fn)] (schema_ptr s, + reader_permit permit, partition_range range, const query::partition_slice& slice, io_priority pc, tracing::trace_state_ptr tr, streamed_mutation::forwarding fwd, - mutation_reader::forwarding, - reader_resource_tracker) { - return fn(s, range, slice, pc, std::move(tr), fwd); + mutation_reader::forwarding) { + return fn(std::move(s), std::move(permit), range, slice, pc, std::move(tr), fwd); }) {} - mutation_source(std::function fn) + mutation_source(std::function fn) : mutation_source([fn = std::move(fn)] (schema_ptr s, + reader_permit permit, partition_range range, const query::partition_slice& slice, io_priority pc, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, - mutation_reader::forwarding, - reader_resource_tracker) { + mutation_reader::forwarding) { assert(!fwd); - return fn(s, range, slice, pc); + return fn(std::move(s), std::move(permit), range, slice, pc); }) {} - mutation_source(std::function fn) + mutation_source(std::function fn) : mutation_source([fn = std::move(fn)] (schema_ptr s, + reader_permit permit, partition_range range, const query::partition_slice& slice, io_priority, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, - mutation_reader::forwarding, - reader_resource_tracker) { + mutation_reader::forwarding) { assert(!fwd); - return fn(s, range, slice); + return fn(std::move(s), std::move(permit), range, slice); }) {} - mutation_source(std::function fn) + mutation_source(std::function fn) : mutation_source([fn = std::move(fn)] (schema_ptr s, + reader_permit permit, partition_range range, const query::partition_slice&, io_priority, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, - mutation_reader::forwarding, - reader_resource_tracker) { + mutation_reader::forwarding) { assert(!fwd); - return fn(s, range); + return fn(std::move(s), std::move(permit), range); }) {} mutation_source(const mutation_source& other) = default; @@ -271,24 +255,25 @@ public: flat_mutation_reader make_reader( schema_ptr s, + reader_permit permit, partition_range range, const query::partition_slice& slice, io_priority pc = default_priority_class(), tracing::trace_state_ptr trace_state = nullptr, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, - mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes, - reader_resource_tracker tracker = no_resource_tracking()) const + mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const { - return (*_fn)(std::move(s), range, slice, pc, std::move(trace_state), fwd, fwd_mr, tracker); + return (*_fn)(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr); } flat_mutation_reader make_reader( schema_ptr s, + reader_permit permit = no_reader_permit(), partition_range range = query::full_partition_range) const { auto& full_slice = s->full_slice(); - return this->make_reader(std::move(s), range, full_slice); + return this->make_reader(std::move(s), std::move(permit), range, full_slice); } partition_presence_checker make_partition_presence_checker() { diff --git a/querier.hh b/querier.hh index 7d608132e9..c00bc54728 100644 --- a/querier.hh +++ b/querier.hh @@ -155,7 +155,7 @@ public: : _schema(schema) , _range(std::make_unique(std::move(range))) , _slice(std::make_unique(std::move(slice))) - , _reader(ms.make_reader(schema, *_range, *_slice, pc, std::move(trace_ptr), + , _reader(ms.make_reader(schema, no_reader_permit(), *_range, *_slice, pc, std::move(trace_ptr), streamed_mutation::forwarding::no, mutation_reader::forwarding::no)) , _compaction_state(make_lw_shared>(*schema, gc_clock::time_point{}, *_slice, 0, 0)) { } diff --git a/row_cache.cc b/row_cache.cc index 173c7745b1..a8ce4d43bb 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -47,7 +47,7 @@ using namespace cache; flat_mutation_reader row_cache::create_underlying_reader(read_context& ctx, mutation_source& src, const dht::partition_range& pr) { ctx.on_underlying_created(); - return src.make_reader(_schema, pr, ctx.slice(), ctx.pc(), ctx.trace_state(), streamed_mutation::forwarding::yes); + return src.make_reader(_schema, no_reader_permit(), pr, ctx.slice(), ctx.pc(), ctx.trace_state(), streamed_mutation::forwarding::yes); } static thread_local mutation_application_stats dummy_app_stats; diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 2df8c77535..90dfe0ad0e 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -710,11 +710,11 @@ public: flat_mutation_reader make_sstable_reader() const override { return ::make_local_shard_sstable_reader(_schema, + no_reader_permit(), _compacting, query::full_partition_range, _schema->full_slice(), service::get_local_compaction_priority(), - no_resource_tracking(), tracing::trace_state_ptr(), ::streamed_mutation::forwarding::no, ::mutation_reader::forwarding::no, @@ -967,11 +967,11 @@ public: // Use reader that makes sure no non-local mutation will not be filtered out. flat_mutation_reader make_sstable_reader() const override { return ::make_range_sstable_reader(_schema, + no_reader_permit(), _compacting, query::full_partition_range, _schema->full_slice(), service::get_local_compaction_priority(), - no_resource_tracking(), nullptr, ::streamed_mutation::forwarding::no, ::mutation_reader::forwarding::no); diff --git a/sstables/data_consume_context.hh b/sstables/data_consume_context.hh index 90930b8585..61ca5efa83 100644 --- a/sstables/data_consume_context.hh +++ b/sstables/data_consume_context.hh @@ -180,14 +180,14 @@ inline data_consume_context data_consume_rows(const sche // can be beneficial if the user wants to fast_forward_to() on the // returned context, and may make small skips. auto input = sst->data_stream(toread.start, last_end - toread.start, consumer.io_priority(), - consumer.resource_tracker(), consumer.trace_state(), sst->_partition_range_history); + consumer.permit(), consumer.trace_state(), sst->_partition_range_history); return {s, std::move(sst), consumer, std::move(input), toread.start, toread.end - toread.start }; } template inline data_consume_context data_consume_single_partition(const schema& s, shared_sstable sst, typename DataConsumeRowsContext::consumer& consumer, sstable::disk_read_range toread) { auto input = sst->data_stream(toread.start, toread.end - toread.start, consumer.io_priority(), - consumer.resource_tracker(), consumer.trace_state(), sst->_single_partition_history); + consumer.permit(), consumer.trace_state(), sst->_single_partition_history); return {s, std::move(sst), consumer, std::move(input), toread.start, toread.end - toread.start }; } diff --git a/sstables/mp_row_consumer.hh b/sstables/mp_row_consumer.hh index da269e0525..509dd34393 100644 --- a/sstables/mp_row_consumer.hh +++ b/sstables/mp_row_consumer.hh @@ -381,13 +381,13 @@ public: mp_row_consumer_k_l(mp_row_consumer_reader* reader, const schema_ptr schema, + reader_permit permit, const query::partition_slice& slice, const io_priority_class& pc, - reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, const shared_sstable& sst) - : row_consumer(std::move(resource_tracker), std::move(trace_state), pc) + : row_consumer(std::move(permit), std::move(trace_state), pc) , _reader(reader) , _schema(schema) , _slice(slice) @@ -398,12 +398,12 @@ public: mp_row_consumer_k_l(mp_row_consumer_reader* reader, const schema_ptr schema, + reader_permit permit, const io_priority_class& pc, - reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, const shared_sstable& sst) - : mp_row_consumer_k_l(reader, schema, schema->full_slice(), pc, std::move(resource_tracker), std::move(trace_state), fwd, sst) { } + : mp_row_consumer_k_l(reader, schema, std::move(permit), schema->full_slice(), pc, std::move(trace_state), fwd, sst) { } virtual proceed consume_row_start(sstables::key_view key, sstables::deletion_time deltime) override { if (!_is_mutation_end) { @@ -991,13 +991,13 @@ public: mp_row_consumer_m(mp_row_consumer_reader* reader, const schema_ptr schema, + reader_permit permit, const query::partition_slice& slice, const io_priority_class& pc, - reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, const shared_sstable& sst) - : consumer_m(resource_tracker, std::move(trace_state), pc) + : consumer_m(std::move(permit), std::move(trace_state), pc) , _reader(reader) , _schema(schema) , _slice(slice) @@ -1010,12 +1010,12 @@ public: mp_row_consumer_m(mp_row_consumer_reader* reader, const schema_ptr schema, + reader_permit permit, const io_priority_class& pc, - reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, const shared_sstable& sst) - : mp_row_consumer_m(reader, schema, schema->full_slice(), pc, std::move(resource_tracker), std::move(trace_state), fwd, sst) + : mp_row_consumer_m(reader, schema, std::move(permit), schema->full_slice(), pc, std::move(trace_state), fwd, sst) { } virtual ~mp_row_consumer_m() {} diff --git a/sstables/partition.cc b/sstables/partition.cc index 5d33cb944e..c8fb935b6d 100644 --- a/sstables/partition.cc +++ b/sstables/partition.cc @@ -139,13 +139,13 @@ class sstable_mutation_reader : public mp_row_consumer_reader { read_monitor& _monitor; public: sstable_mutation_reader(shared_sstable sst, schema_ptr schema, + reader_permit permit, const io_priority_class &pc, - reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, read_monitor& mon) : mp_row_consumer_reader(std::move(schema), std::move(sst)) - , _consumer(this, _schema, _schema->full_slice(), pc, std::move(resource_tracker), std::move(trace_state), fwd, _sst) + , _consumer(this, _schema, std::move(permit), _schema->full_slice(), pc, std::move(trace_state), fwd, _sst) , _initialize([this] { _context = data_consume_rows(*_schema, _sst, _consumer); _monitor.on_read_started(_context->reader_position()); @@ -155,16 +155,16 @@ public: , _monitor(mon) { } sstable_mutation_reader(shared_sstable sst, schema_ptr schema, + reader_permit permit, const dht::partition_range& pr, const query::partition_slice& slice, const io_priority_class& pc, - reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr, read_monitor& mon) : mp_row_consumer_reader(std::move(schema), std::move(sst)) - , _consumer(this, _schema, slice, pc, std::move(resource_tracker), std::move(trace_state), fwd, _sst) + , _consumer(this, _schema, std::move(permit), slice, pc, std::move(trace_state), fwd, _sst) , _initialize([this, pr, &pc, &slice, fwd_mr] () mutable { auto f = get_index_reader().advance_to(pr); return f.then([this, &pc, &slice, fwd_mr] () mutable { @@ -183,16 +183,16 @@ public: , _monitor(mon) { } sstable_mutation_reader(shared_sstable sst, schema_ptr schema, + reader_permit permit, dht::ring_position_view key, const query::partition_slice& slice, const io_priority_class& pc, - reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr, read_monitor& mon) : mp_row_consumer_reader(std::move(schema), std::move(sst)) - , _consumer(this, _schema, slice, pc, std::move(resource_tracker), std::move(trace_state), fwd, _sst) + , _consumer(this, _schema, std::move(permit), slice, pc, std::move(trace_state), fwd, _sst) , _single_partition_read(true) , _initialize([this, key = std::move(key), &pc, &slice, fwd_mr] () mutable { position_in_partition_view pos = get_slice_upper_bound(*_schema, slice, key); @@ -479,22 +479,23 @@ void mp_row_consumer_reader::on_next_partition(dht::decorated_key key, tombstone _sst->get_stats().on_partition_read(); } -flat_mutation_reader sstable::read_rows_flat(schema_ptr schema, const io_priority_class& pc, streamed_mutation::forwarding fwd) { +flat_mutation_reader sstable::read_rows_flat(schema_ptr schema, reader_permit permit, const io_priority_class& pc, + streamed_mutation::forwarding fwd) { get_stats().on_sstable_partition_read(); if (_version == version_types::mc) { return make_flat_mutation_reader>( - shared_from_this(), std::move(schema), pc, no_resource_tracking(), tracing::trace_state_ptr(), fwd, default_read_monitor()); + shared_from_this(), std::move(schema), std::move(permit), pc, tracing::trace_state_ptr(), fwd, default_read_monitor()); } - return make_flat_mutation_reader>(shared_from_this(), std::move(schema), pc, - no_resource_tracking(), tracing::trace_state_ptr(), fwd, default_read_monitor()); + return make_flat_mutation_reader>(shared_from_this(), std::move(schema), std::move(permit), pc, + tracing::trace_state_ptr(), fwd, default_read_monitor()); } flat_mutation_reader sstables::sstable::read_row_flat(schema_ptr schema, + reader_permit permit, dht::ring_position_view key, const query::partition_slice& slice, const io_priority_class& pc, - reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, read_monitor& mon) @@ -502,19 +503,19 @@ sstables::sstable::read_row_flat(schema_ptr schema, get_stats().on_single_partition_read(); if (_version == version_types::mc) { return make_flat_mutation_reader>( - shared_from_this(), std::move(schema), std::move(key), slice, pc, - std::move(resource_tracker), std::move(trace_state), fwd, mutation_reader::forwarding::no, mon); + shared_from_this(), std::move(schema), std::move(permit), std::move(key), slice, pc, + std::move(trace_state), fwd, mutation_reader::forwarding::no, mon); } - return make_flat_mutation_reader>(shared_from_this(), std::move(schema), std::move(key), slice, pc, - std::move(resource_tracker), std::move(trace_state), fwd, mutation_reader::forwarding::no, mon); + return make_flat_mutation_reader>(shared_from_this(), std::move(schema), std::move(permit), std::move(key), slice, pc, + std::move(trace_state), fwd, mutation_reader::forwarding::no, mon); } flat_mutation_reader sstable::read_range_rows_flat(schema_ptr schema, + reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, - reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr, @@ -522,10 +523,10 @@ sstable::read_range_rows_flat(schema_ptr schema, get_stats().on_range_partition_read(); if (_version == version_types::mc) { return make_flat_mutation_reader>( - shared_from_this(), std::move(schema), range, slice, pc, std::move(resource_tracker), std::move(trace_state), fwd, fwd_mr, mon); + shared_from_this(), std::move(schema), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr, mon); } return make_flat_mutation_reader>( - shared_from_this(), std::move(schema), range, slice, pc, std::move(resource_tracker), std::move(trace_state), fwd, fwd_mr, mon); + shared_from_this(), std::move(schema), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr, mon); } } diff --git a/sstables/row.hh b/sstables/row.hh index 7e1e413fbc..456096131d 100644 --- a/sstables/row.hh +++ b/sstables/row.hh @@ -65,7 +65,7 @@ // row into one buffer, the byte_views remain valid until consume_row_end() // is called.] class row_consumer { - reader_resource_tracker _resource_tracker; + reader_permit _permit; tracing::trace_state_ptr _trace_state; const io_priority_class& _pc; @@ -78,8 +78,8 @@ public: */ constexpr static bool is_setting_range_tombstone_start_supported = false; - row_consumer(reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, const io_priority_class& pc) - : _resource_tracker(resource_tracker) + row_consumer(reader_permit permit, tracing::trace_state_ptr trace_state, const io_priority_class& pc) + : _permit(std::move(permit)) , _trace_state(std::move(trace_state)) , _pc(pc) { } @@ -133,9 +133,9 @@ public: return _pc; } - // The restriction that applies to this consumer - reader_resource_tracker resource_tracker() const { - return _resource_tracker; + // The permit for this read + reader_permit permit() const { + return _permit; } tracing::trace_state_ptr trace_state() const { @@ -144,7 +144,7 @@ public: }; class consumer_m { - reader_resource_tracker _resource_tracker; + reader_permit _permit; tracing::trace_state_ptr _trace_state; const io_priority_class& _pc; public: @@ -162,8 +162,8 @@ public: skip_row }; - consumer_m(reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, const io_priority_class& pc) - : _resource_tracker(resource_tracker) + consumer_m(reader_permit permit, tracing::trace_state_ptr trace_state, const io_priority_class& pc) + : _permit(std::move(permit)) , _trace_state(std::move(trace_state)) , _pc(pc) { } @@ -227,9 +227,9 @@ public: return _pc; } - // The restriction that applies to this consumer - reader_resource_tracker resource_tracker() const { - return _resource_tracker; + // The permit for this read + reader_permit permit() const { + return _permit; } tracing::trace_state_ptr trace_state() const { diff --git a/sstables/sstables.cc b/sstables/sstables.cc index ceef7e9e07..bf080534f9 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -2789,14 +2789,14 @@ component_type sstable::component_from_sstring(version_types v, sstring &s) { } input_stream sstable::data_stream(uint64_t pos, size_t len, const io_priority_class& pc, - reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, lw_shared_ptr history) { + reader_permit permit, tracing::trace_state_ptr trace_state, lw_shared_ptr history) { file_input_stream_options options; options.buffer_size = sstable_buffer_size; options.io_priority_class = pc; options.read_ahead = 4; options.dynamic_adjustments = std::move(history); - file f = resource_tracker.track(_data_file); + file f = make_tracked_file(_data_file, std::move(permit)); if (trace_state) { f = tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", get_filename())); } @@ -2816,7 +2816,7 @@ input_stream sstable::data_stream(uint64_t pos, size_t len, const io_prior } future> sstable::data_read(uint64_t pos, size_t len, const io_priority_class& pc) { - return do_with(data_stream(pos, len, pc, no_resource_tracking(), tracing::trace_state_ptr(), {}), [len] (auto& stream) { + return do_with(data_stream(pos, len, pc, no_reader_permit(), tracing::trace_state_ptr(), {}), [len] (auto& stream) { return stream.read_exactly(len).finally([&stream] { return stream.close(); }); @@ -3431,6 +3431,7 @@ future<> init_metrics() { mutation_source sstable::as_mutation_source() { return mutation_source([sst = shared_from_this()] (schema_ptr s, + reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, @@ -3442,9 +3443,9 @@ mutation_source sstable::as_mutation_source() { // consequence, fast_forward_to() will *NOT* work on the result, // regardless of what the fwd_mr parameter says. if (range.is_singular() && range.start()->value().has_key()) { - return sst->read_row_flat(s, range.start()->value(), slice, pc, no_resource_tracking(), std::move(trace_state), fwd); + return sst->read_row_flat(s, std::move(permit), range.start()->value(), slice, pc, std::move(trace_state), fwd); } else { - return sst->read_range_rows_flat(s, range, slice, pc, no_resource_tracking(), std::move(trace_state), fwd, fwd_mr); + return sst->read_range_rows_flat(s, std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr); } }); } diff --git a/sstables/sstables.hh b/sstables/sstables.hh index b3268ac427..c01c80f6dd 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -206,34 +206,34 @@ public: // returned in the result. flat_mutation_reader read_row_flat( schema_ptr schema, + reader_permit permit, dht::ring_position_view key, const query::partition_slice& slice, const io_priority_class& pc = default_priority_class(), - reader_resource_tracker resource_tracker = no_resource_tracking(), tracing::trace_state_ptr trace_state = {}, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, read_monitor& monitor = default_read_monitor()); - flat_mutation_reader read_row_flat(schema_ptr schema, dht::ring_position_view key) { + flat_mutation_reader read_row_flat(schema_ptr schema, reader_permit permit, dht::ring_position_view key) { auto& full_slice = schema->full_slice(); - return read_row_flat(std::move(schema), std::move(key), full_slice); + return read_row_flat(std::move(schema), std::move(permit), std::move(key), full_slice); } // Returns a mutation_reader for given range of partitions flat_mutation_reader read_range_rows_flat( schema_ptr schema, + reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc = default_priority_class(), - reader_resource_tracker resource_tracker = no_resource_tracking(), tracing::trace_state_ptr trace_state = {}, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes, read_monitor& monitor = default_read_monitor()); - flat_mutation_reader read_range_rows_flat(schema_ptr schema, const dht::partition_range& range) { + flat_mutation_reader read_range_rows_flat(schema_ptr schema, reader_permit permit, const dht::partition_range& range) { auto& full_slice = schema->full_slice(); - return read_range_rows_flat(std::move(schema), range, full_slice); + return read_range_rows_flat(std::move(schema), std::move(permit), range, full_slice); } // read_rows_flat() returns each of the rows in the sstable, in sequence, @@ -248,6 +248,7 @@ public: // as well as the sstable, remains alive as long as a read() is in // progress (i.e., returned a future which hasn't completed yet). flat_mutation_reader read_rows_flat(schema_ptr schema, + reader_permit permit, const io_priority_class& pc = default_priority_class(), streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no); @@ -628,7 +629,7 @@ private: // about the buffer size to read, and where exactly to stop reading // (even when a large buffer size is used). input_stream data_stream(uint64_t pos, size_t len, const io_priority_class& pc, - reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, lw_shared_ptr history); + reader_permit permit, tracing::trace_state_ptr trace_state, lw_shared_ptr history); // Read exactly the specific byte range from the data file (after // uncompression, if the file is compressed). This can be used to read diff --git a/table.cc b/table.cc index 43811b6efa..ff77296c1f 100644 --- a/table.cc +++ b/table.cc @@ -277,12 +277,12 @@ public: static flat_mutation_reader create_single_key_sstable_reader(column_family* cf, schema_ptr schema, + reader_permit permit, lw_shared_ptr sstables, utils::estimated_histogram& sstable_histogram, const dht::partition_range& pr, // must be singular const query::partition_slice& slice, const io_priority_class& pc, - reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) @@ -292,7 +292,7 @@ create_single_key_sstable_reader(column_family* cf, filter_sstable_for_reader(sstables->select(pr), *cf, schema, pr, key, slice) | boost::adaptors::transformed([&] (const sstables::shared_sstable& sstable) { tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); })); - return sstable->read_row_flat(schema, pr.start()->value(), slice, pc, resource_tracker, trace_state, fwd); + return sstable->read_row_flat(schema, permit, pr.start()->value(), slice, pc, trace_state, fwd); }) ); if (readers.empty()) { @@ -303,19 +303,19 @@ create_single_key_sstable_reader(column_family* cf, } flat_mutation_reader make_range_sstable_reader(schema_ptr s, + reader_permit permit, lw_shared_ptr sstables, const dht::partition_range& pr, const query::partition_slice& slice, const io_priority_class& pc, - reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr, sstables::read_monitor_generator& monitor_generator) { - auto reader_factory_fn = [s, &slice, &pc, resource_tracker, trace_state, fwd, fwd_mr, &monitor_generator] + auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] (sstables::shared_sstable& sst, const dht::partition_range& pr) mutable { - return sst->read_range_rows_flat(s, pr, slice, pc, resource_tracker, trace_state, fwd, fwd_mr, monitor_generator(sst)); + return sst->read_range_rows_flat(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst)); }; return make_combined_reader(s, std::make_unique(s, std::move(sstables), @@ -349,41 +349,41 @@ table::make_sstable_reader(schema_ptr s, if (dht::shard_of(pos.token()) != engine().cpu_id()) { return mutation_source([] ( schema_ptr s, + reader_permit permit, const dht::partition_range& pr, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - reader_resource_tracker tracker) { + mutation_reader::forwarding fwd_mr) { return make_empty_flat_reader(s); // range doesn't belong to this shard }); } return mutation_source([semaphore, this, sstables=std::move(sstables)] ( schema_ptr s, + reader_permit permit, const dht::partition_range& pr, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - reader_resource_tracker tracker) { - return create_single_key_sstable_reader(const_cast(this), std::move(s), std::move(sstables), - _stats.estimated_sstable_per_read, pr, slice, pc, tracker, std::move(trace_state), fwd, fwd_mr); + mutation_reader::forwarding fwd_mr) { + return create_single_key_sstable_reader(const_cast(this), std::move(s), std::move(permit), std::move(sstables), + _stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd, fwd_mr); }); } else { return mutation_source([semaphore, sstables=std::move(sstables)] ( schema_ptr s, + reader_permit permit, const dht::partition_range& pr, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - reader_resource_tracker tracker) { - return make_local_shard_sstable_reader(std::move(s), std::move(sstables), pr, slice, pc, - tracker, std::move(trace_state), fwd, fwd_mr); + mutation_reader::forwarding fwd_mr) { + return make_local_shard_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc, + std::move(trace_state), fwd, fwd_mr); }); } }(); @@ -391,7 +391,7 @@ table::make_sstable_reader(schema_ptr s, if (semaphore) { return make_restricted_flat_reader(*semaphore, std::move(ms), std::move(s), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); } else { - return ms.make_reader(std::move(s), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); + return ms.make_reader(std::move(s), no_reader_permit(), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); } } @@ -440,7 +440,7 @@ table::make_reader(schema_ptr s, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) const { if (_virtual_reader) { - return (*_virtual_reader).make_reader(s, range, slice, pc, trace_state, fwd, fwd_mr); + return (*_virtual_reader).make_reader(s, no_reader_permit(), range, slice, pc, trace_state, fwd, fwd_mr); } std::vector readers; @@ -500,7 +500,7 @@ table::make_streaming_reader(schema_ptr s, auto& slice = s->full_slice(); auto& pc = service::get_local_streaming_read_priority(); - auto source = mutation_source([this] (schema_ptr s, const dht::partition_range& range, const query::partition_slice& slice, + auto source = mutation_source([this] (schema_ptr s, reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { std::vector readers; readers.reserve(_memtables->size() + 1); @@ -574,20 +574,20 @@ static bool belongs_to_other_shard(const std::vector& shards) { } flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s, + reader_permit permit, lw_shared_ptr sstables, const dht::partition_range& pr, const query::partition_slice& slice, const io_priority_class& pc, - reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr, sstables::read_monitor_generator& monitor_generator) { - auto reader_factory_fn = [s, &slice, &pc, resource_tracker, trace_state, fwd, fwd_mr, &monitor_generator] + auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] (sstables::shared_sstable& sst, const dht::partition_range& pr) mutable { - flat_mutation_reader reader = sst->read_range_rows_flat(s, pr, slice, pc, - resource_tracker, trace_state, fwd, fwd_mr, monitor_generator(sst)); + flat_mutation_reader reader = sst->read_range_rows_flat(s, permit, pr, slice, pc, + trace_state, fwd, fwd_mr, monitor_generator(sst)); if (sst->is_shared()) { using sig = bool (&)(const dht::decorated_key&); reader = make_filtering_reader(std::move(reader), sig(belongs_to_current_shard)); @@ -1611,6 +1611,7 @@ table::sstables_as_snapshot_source() { return snapshot_source([this] () { auto sst_set = _sstables; return mutation_source([this, sst_set] (schema_ptr s, + reader_permit, const dht::partition_range& r, const query::partition_slice& slice, const io_priority_class& pc, @@ -2439,6 +2440,7 @@ table::query(schema_ptr s, mutation_source table::as_mutation_source() const { return mutation_source([this] (schema_ptr s, + reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, @@ -2579,7 +2581,7 @@ future table::do_push_view_replica_updates(const schema std::move(slice), std::move(m), [base, views = std::move(views), lock = std::move(lock), this, timeout, source = std::move(source), &io_priority] (auto& pk, auto& slice, auto& m) mutable { - auto reader = source.make_reader(base, pk, slice, io_priority, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no); + auto reader = source.make_reader(base, no_reader_permit(), pk, slice, io_priority, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no); return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader)).then([lock = std::move(lock)] () mutable { // return the local partition/row lock we have taken so it // remains locked until the caller is done modifying this @@ -2601,6 +2603,7 @@ future table::stream_view_replica_updates(const schema_ mutation_source table::as_mutation_source_excluding(sstables::shared_sstable sst) const { return mutation_source([this, sst = std::move(sst)] (schema_ptr s, + reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, diff --git a/test/boost/broken_sstable_test.cc b/test/boost/broken_sstable_test.cc index d77268fd5e..1679ac39fc 100644 --- a/test/boost/broken_sstable_test.cc +++ b/test/boost/broken_sstable_test.cc @@ -43,7 +43,7 @@ static void broken_sst(sstring dir, unsigned long generation, schema_ptr s, sstr try { sstables::test_env env; sstable_ptr sstp = std::get<0>(env.reusable_sst(s, dir, generation, version).get()); - auto r = sstp->read_rows_flat(s); + auto r = sstp->read_rows_flat(s, no_reader_permit()); r.consume(my_consumer{}, db::no_timeout).get(); BOOST_FAIL("expecting exception"); } catch (malformed_sstable_exception& e) { diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index 86a4a1d76e..d76fd39555 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -143,6 +143,7 @@ SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_sourc cf.flush().get(); cf.get_row_cache().invalidate([] {}).get(); return mutation_source([&] (schema_ptr s, + reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index e388a8994b..a548ff51e7 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -412,7 +412,7 @@ SEASTAR_TEST_CASE(test_multi_range_reader) { return m; })); - auto source = mutation_source([&] (schema_ptr, const dht::partition_range& range) { + auto source = mutation_source([&] (schema_ptr, reader_permit, const dht::partition_range& range) { return flat_mutation_reader_from_mutations(ms, range); }); @@ -745,6 +745,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_reader_from_mutations_as_mutation_source) auto populate = [] (schema_ptr, const std::vector &muts) { return mutation_source([=] ( schema_ptr schema, + reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class&, @@ -761,6 +762,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_reader_from_fragments_as_mutation_source) auto populate = [] (schema_ptr, const std::vector &muts) { return mutation_source([=] ( schema_ptr schema, + reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class&, diff --git a/test/boost/mutation_fragment_test.cc b/test/boost/mutation_fragment_test.cc index c3b63a302f..c7f89f09e8 100644 --- a/test/boost/mutation_fragment_test.cc +++ b/test/boost/mutation_fragment_test.cc @@ -117,6 +117,7 @@ SEASTAR_TEST_CASE(test_mutation_merger_conforms_to_mutation_source) { } return mutation_source([memtables] (schema_ptr s, + reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, diff --git a/test/boost/mutation_query_test.cc b/test/boost/mutation_query_test.cc index 9d865cc915..ec442b735b 100644 --- a/test/boost/mutation_query_test.cc +++ b/test/boost/mutation_query_test.cc @@ -61,7 +61,7 @@ struct mutation_less_cmp { } }; mutation_source make_source(std::vector mutations) { - return mutation_source([mutations = std::move(mutations)] (schema_ptr s, const dht::partition_range& range, const query::partition_slice& slice, + return mutation_source([mutations = std::move(mutations)] (schema_ptr s, reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { assert(range.is_full()); // slicing not implemented yet for (auto&& m : mutations) { diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 4acc135f97..d6697f0935 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -632,6 +632,7 @@ SEASTAR_THREAD_TEST_CASE(combined_mutation_reader_test) { sstable_mutation_readers.emplace_back( sst->as_mutation_source().make_reader( s.schema(), + no_reader_permit(), query::full_partition_range, s.schema()->full_slice(), seastar::default_priority_class(), @@ -645,11 +646,11 @@ SEASTAR_THREAD_TEST_CASE(combined_mutation_reader_test) { auto incremental_reader = make_local_shard_sstable_reader( s.schema(), + no_reader_permit(), sstable_set, query::full_partition_range, s.schema()->full_slice(), seastar::default_priority_class(), - no_resource_tracking(), nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no); @@ -906,14 +907,14 @@ class tracking_reader : public flat_mutation_reader::impl { std::size_t _call_count{0}; std::size_t _ff_count{0}; public: - tracking_reader(schema_ptr schema, lw_shared_ptr sst, reader_resource_tracker tracker) + tracking_reader(schema_ptr schema, reader_permit permit, lw_shared_ptr sst) : impl(schema) , _reader(sst->read_range_rows_flat( schema, + permit, query::full_partition_range, schema->full_slice(), default_priority_class(), - tracker, tracing::trace_state_ptr(), streamed_mutation::forwarding::no, mutation_reader::forwarding::yes)) { @@ -972,14 +973,14 @@ public: , _timeout(timeout) { auto ms = mutation_source([this, sst=std::move(sst)] (schema_ptr schema, + reader_permit permit, const dht::partition_range&, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding, - reader_resource_tracker res_tracker) { - auto tracker_ptr = std::make_unique(std::move(schema), std::move(sst), res_tracker); + mutation_reader::forwarding) { + auto tracker_ptr = std::make_unique(std::move(schema), std::move(permit), std::move(sst)); _tracker = tracker_ptr.get(); return flat_mutation_reader(std::move(tracker_ptr)); }); @@ -1336,13 +1337,13 @@ SEASTAR_TEST_CASE(test_restricted_reader_as_mutation_source) { auto ms = mt->as_data_source(); return mutation_source([&semaphore, ms = std::move(ms)](schema_ptr schema, + reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr tr, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - reader_resource_tracker res_tracker) { + mutation_reader::forwarding fwd_mr) { return make_restricted_flat_reader(semaphore, std::move(ms), std::move(schema), range, slice, pc, tr, fwd, fwd_mr); }); @@ -1383,6 +1384,7 @@ SEASTAR_TEST_CASE(test_fast_forwarding_combined_reader_is_consistent_with_slicin } mutation_source ds = create_sstable(env, s, muts)->as_mutation_source(); readers.push_back(ds.make_reader(s, + no_reader_permit(), dht::partition_range::make({keys[0]}, {keys[0]}), s->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::yes, @@ -1457,8 +1459,8 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones { auto slice = partition_slice_builder(*s).with_range(range).build(); - readers.push_back(ds1.make_reader(s, query::full_partition_range, slice)); - readers.push_back(ds2.make_reader(s, query::full_partition_range, slice)); + readers.push_back(ds1.make_reader(s, no_reader_permit(), query::full_partition_range, slice)); + readers.push_back(ds2.make_reader(s, no_reader_permit(), query::full_partition_range, slice)); auto rd = make_combined_reader(s, std::move(readers), streamed_mutation::forwarding::no, mutation_reader::forwarding::no); @@ -1480,9 +1482,9 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones // Check fast_forward_to() { - readers.push_back(ds1.make_reader(s, query::full_partition_range, s->full_slice(), default_priority_class(), + readers.push_back(ds1.make_reader(s, no_reader_permit(), query::full_partition_range, s->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::yes)); - readers.push_back(ds2.make_reader(s, query::full_partition_range, s->full_slice(), default_priority_class(), + readers.push_back(ds2.make_reader(s, no_reader_permit(), query::full_partition_range, s->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::yes)); auto rd = make_combined_reader(s, std::move(readers), @@ -1600,6 +1602,7 @@ SEASTAR_THREAD_TEST_CASE(test_foreign_reader_as_mutation_source) { auto reader_factory_ptr = make_lw_shared(std::move(reader_factory)); return mutation_source([reader_factory_ptr] (schema_ptr s, + reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, @@ -2118,6 +2121,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) { } return mutation_source([partitioner, remote_memtables, evict_paused_readers, single_fragment_buffer] (schema_ptr s, + reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, @@ -2498,6 +2502,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) { auto& table = db->local().find_column_family(schema); auto reader = table.as_mutation_source().make_reader( schema, + no_reader_permit(), range, slice, service::get_local_sstable_query_read_priority(), @@ -2700,8 +2705,8 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) { auto& table = db->local().find_column_family(s); //TODO need a way to transport io_priority_calls across shards auto& pc = service::get_local_sstable_query_read_priority(); - return table.as_mutation_source().make_reader(std::move(s), range, slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, - fwd_mr); + return table.as_mutation_source().make_reader(std::move(s), no_reader_permit(), range, slice, pc, std::move(trace_state), + streamed_mutation::forwarding::no, fwd_mr); }; auto reference_reader = make_filtering_reader( make_multishard_combining_reader(seastar::make_shared(std::move(reader_factory)), local_partitioner, diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index eaba959a7a..26da41f27a 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -162,7 +162,7 @@ public: : _sem(reader_concurrency_semaphore::no_limits{}) , _cache(_sem, cache_size, entry_ttl) , _mutations(make_mutations(_s, external_make_value)) - , _mutation_source([this] (schema_ptr, const dht::partition_range& range) { + , _mutation_source([this] (schema_ptr, reader_permit, const dht::partition_range& range) { auto rd = flat_mutation_reader_from_mutations(_mutations, range); rd.set_max_buffer_size(max_reader_buffer_size); return rd; diff --git a/test/boost/row_cache_test.cc b/test/boost/row_cache_test.cc index 8d4e96cf45..8829704e5d 100644 --- a/test/boost/row_cache_test.cc +++ b/test/boost/row_cache_test.cc @@ -110,7 +110,7 @@ snapshot_source make_decorated_snapshot_source(snapshot_source src, std::functio } mutation_source make_source_with(mutation m) { - return mutation_source([m] (schema_ptr s, const dht::partition_range&, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding fwd) { + return mutation_source([m] (schema_ptr s, reader_permit, const dht::partition_range&, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding fwd) { assert(m.schema() == s); return flat_mutation_reader_from_mutations({m}, std::move(fwd)); }); @@ -210,7 +210,14 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_empty_full_range) auto s = make_schema(); int secondary_calls_count = 0; cache_tracker tracker; - row_cache cache(s, snapshot_source_from_snapshot(mutation_source([&secondary_calls_count] (schema_ptr s, const dht::partition_range& range, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding fwd) { + row_cache cache(s, snapshot_source_from_snapshot(mutation_source([&secondary_calls_count] ( + schema_ptr s, + reader_permit, + const dht::partition_range& range, + const query::partition_slice&, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding fwd) { return make_counting_reader(make_empty_flat_reader(s), secondary_calls_count); })), tracker); @@ -234,7 +241,14 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_empty_single_part auto s = make_schema(); int secondary_calls_count = 0; cache_tracker tracker; - row_cache cache(s, snapshot_source_from_snapshot(mutation_source([&secondary_calls_count] (schema_ptr s, const dht::partition_range& range, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding fwd) { + row_cache cache(s, snapshot_source_from_snapshot(mutation_source([&secondary_calls_count] ( + schema_ptr s, + reader_permit, + const dht::partition_range& range, + const query::partition_slice&, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding fwd) { return make_counting_reader(make_empty_flat_reader(s), secondary_calls_count); })), tracker); auto range = make_single_partition_range(s, 100); @@ -252,7 +266,14 @@ SEASTAR_TEST_CASE(test_cache_uses_continuity_info_for_single_partition_query) { auto s = make_schema(); int secondary_calls_count = 0; cache_tracker tracker; - row_cache cache(s, snapshot_source_from_snapshot(mutation_source([&secondary_calls_count] (schema_ptr s, const dht::partition_range& range, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding fwd) { + row_cache cache(s, snapshot_source_from_snapshot(mutation_source([&secondary_calls_count] ( + schema_ptr s, + reader_permit, + const dht::partition_range& range, + const query::partition_slice&, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding fwd) { return make_counting_reader(make_empty_flat_reader(s), secondary_calls_count); })), tracker); @@ -274,7 +295,14 @@ void test_cache_delegates_to_underlying_only_once_with_single_partition(schema_p int calls_to_secondary) { int secondary_calls_count = 0; cache_tracker tracker; - row_cache cache(s, snapshot_source_from_snapshot(mutation_source([m, &secondary_calls_count] (schema_ptr s, const dht::partition_range& range, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding fwd) { + row_cache cache(s, snapshot_source_from_snapshot(mutation_source([m, &secondary_calls_count] ( + schema_ptr s, + reader_permit, + const dht::partition_range& range, + const query::partition_slice&, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding fwd) { assert(m.schema() == s); if (range.contains(dht::ring_position(m.decorated_key()), dht::ring_position_comparator(*s))) { return make_counting_reader(flat_mutation_reader_from_mutations({m}, std::move(fwd)), secondary_calls_count); @@ -370,7 +398,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation } auto make_cache = [&tracker, &mt](schema_ptr s, int& secondary_calls_count) -> lw_shared_ptr { - auto secondary = mutation_source([&mt, &secondary_calls_count] (schema_ptr s, const dht::partition_range& range, + auto secondary = mutation_source([&mt, &secondary_calls_count] (schema_ptr s, reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace, streamed_mutation::forwarding fwd) { return make_counting_reader(mt->make_flat_reader(s, range, slice, pc, std::move(trace), std::move(fwd)), secondary_calls_count); }); @@ -380,7 +408,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation auto make_ds = [&make_cache](schema_ptr s, int& secondary_calls_count) -> mutation_source { auto cache = make_cache(s, secondary_calls_count); - return mutation_source([cache] (schema_ptr s, const dht::partition_range& range, + return mutation_source([cache] (schema_ptr s, reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace, streamed_mutation::forwarding fwd) { return cache->make_reader(s, range, slice, pc, std::move(trace), std::move(fwd)); }); @@ -388,7 +416,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation auto do_test = [&s, &partitions] (const mutation_source& ds, const dht::partition_range& range, int& secondary_calls_count, int expected_calls) { - assert_that(ds.make_reader(s, range)) + assert_that(ds.make_reader(s, no_reader_permit(), range)) .produces(slice(partitions, range)) .produces_end_of_stream(); BOOST_CHECK_EQUAL(expected_calls, secondary_calls_count); @@ -475,25 +503,25 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation auto range = dht::partition_range::make( {partitions[0].decorated_key(), true}, {partitions[1].decorated_key(), true}); - assert_that(ds.make_reader(s, range)) + assert_that(ds.make_reader(s, no_reader_permit(), range)) .produces(slice(partitions, range)) .produces_end_of_stream(); BOOST_CHECK_EQUAL(3, secondary_calls_count); - assert_that(ds.make_reader(s, range)) + assert_that(ds.make_reader(s, no_reader_permit(), range)) .produces(slice(partitions, range)) .produces_end_of_stream(); BOOST_CHECK_EQUAL(3, secondary_calls_count); auto range2 = dht::partition_range::make( {partitions[0].decorated_key(), true}, {partitions[1].decorated_key(), false}); - assert_that(ds.make_reader(s, range2)) + assert_that(ds.make_reader(s, no_reader_permit(), range2)) .produces(slice(partitions, range2)) .produces_end_of_stream(); BOOST_CHECK_EQUAL(3, secondary_calls_count); auto range3 = dht::partition_range::make( {dht::ring_position::starting_at(key_before_all.token())}, {partitions[2].decorated_key(), false}); - assert_that(ds.make_reader(s, range3)) + assert_that(ds.make_reader(s, no_reader_permit(), range3)) .produces(slice(partitions, range3)) .produces_end_of_stream(); BOOST_CHECK_EQUAL(5, secondary_calls_count); @@ -505,7 +533,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation }; auto cache = make_cache(s, secondary_calls_count); - auto ds = mutation_source([cache] (schema_ptr s, const dht::partition_range& range, + auto ds = mutation_source([cache] (schema_ptr s, reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace, streamed_mutation::forwarding fwd) { return cache->make_reader(s, range, slice, pc, std::move(trace), std::move(fwd)); }); @@ -515,7 +543,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation cache->invalidate([] {}, key_after_all).get(); - assert_that(ds.make_reader(s, query::full_partition_range)) + assert_that(ds.make_reader(s, no_reader_permit(), query::full_partition_range)) .produces(slice(partitions, query::full_partition_range)) .produces_end_of_stream(); BOOST_CHECK_EQUAL(partitions.size() + 2, secondary_calls_count); @@ -693,6 +721,7 @@ SEASTAR_TEST_CASE(test_row_cache_conforms_to_mutation_source) { auto cache = make_lw_shared(s, snapshot_source_from_snapshot(mt->as_data_source()), tracker); return mutation_source([cache] (schema_ptr s, + reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, @@ -761,14 +790,14 @@ SEASTAR_TEST_CASE(test_presence_checker_runs_under_right_allocator) { auto src = snapshot_source([&] { auto ms = underlying(); return mutation_source([ms = std::move(ms)] (schema_ptr s, + reader_permit permit, const dht::partition_range& pr, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr tr, streamed_mutation::forwarding fwd, - mutation_reader::forwarding mr_fwd, - reader_resource_tracker rrt) mutable { - return ms.make_reader(s, pr, slice, pc, std::move(tr), fwd, mr_fwd, std::move(rrt)); + mutation_reader::forwarding mr_fwd) { + return ms.make_reader(s, std::move(permit), pr, slice, pc, std::move(tr), fwd, mr_fwd); }, [] { return [saved = managed_bytes()] (const dht::decorated_key& key) mutable { // size large enough to defeat the small blob optimization @@ -1227,7 +1256,7 @@ private: flat_mutation_reader make_reader(schema_ptr s, const dht::partition_range& pr, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace, streamed_mutation::forwarding fwd) { - return make_flat_mutation_reader(_throttle, _underlying.make_reader(s, pr, slice, pc, std::move(trace), std::move(fwd))); + return make_flat_mutation_reader(_throttle, _underlying.make_reader(s, no_reader_permit(), pr, slice, pc, std::move(trace), std::move(fwd))); } }; lw_shared_ptr _impl; @@ -1237,7 +1266,7 @@ public: { } operator mutation_source() const { - return mutation_source([impl = _impl] (schema_ptr s, const dht::partition_range& pr, + return mutation_source([impl = _impl] (schema_ptr s, reader_permit, const dht::partition_range& pr, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace, streamed_mutation::forwarding fwd) { return impl->make_reader(std::move(s), pr, slice, pc, std::move(trace), std::move(fwd)); }); diff --git a/test/boost/sstable_3_x_test.cc b/test/boost/sstable_3_x_test.cc index 20368e0a7b..d2ca1755ab 100644 --- a/test/boost/sstable_3_x_test.cc +++ b/test/boost/sstable_3_x_test.cc @@ -92,7 +92,7 @@ public: return sstables::test(_sst).read_indexes(); } flat_mutation_reader read_rows_flat() { - return _sst->read_rows_flat(_sst->_schema); + return _sst->read_rows_flat(_sst->_schema, no_reader_permit()); } const stats_metadata& get_stats_metadata() const { @@ -103,16 +103,15 @@ public: const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc = default_priority_class(), - reader_resource_tracker resource_tracker = no_resource_tracking(), tracing::trace_state_ptr trace_state = {}, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes, read_monitor& monitor = default_read_monitor()) { return _sst->read_range_rows_flat(_sst->_schema, + no_reader_permit(), range, slice, pc, - std::move(resource_tracker), std::move(trace_state), fwd, fwd_mr, @@ -263,7 +262,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_filtering_and_forwarding_read) { auto r = assert_that(sst.read_range_rows_flat(query::full_partition_range, UNCOMPRESSED_FILTERING_AND_FORWARDING_SCHEMA->full_slice(), default_priority_class(), - no_resource_tracking(), tracing::trace_state_ptr(), streamed_mutation::forwarding::yes)); r.produces_partition_start(to_key(1)) @@ -312,7 +310,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_filtering_and_forwarding_read) { auto r = assert_that(sst.read_range_rows_flat(query::full_partition_range, slice, default_priority_class(), - no_resource_tracking(), tracing::trace_state_ptr(), streamed_mutation::forwarding::yes)); @@ -523,7 +520,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_skip_using_index_rows) { auto rd = sst.read_range_rows_flat(query::full_partition_range, UNCOMPRESSED_SKIP_USING_INDEX_ROWS_SCHEMA->full_slice(), default_priority_class(), - no_resource_tracking(), tracing::trace_state_ptr(), streamed_mutation::forwarding::yes); rd.set_max_buffer_size(1); @@ -564,7 +560,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_skip_using_index_rows) { auto rd = sst.read_range_rows_flat(query::full_partition_range, slice, default_priority_class(), - no_resource_tracking(), tracing::trace_state_ptr(), streamed_mutation::forwarding::yes); rd.set_max_buffer_size(1); @@ -752,7 +747,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_filtering_and_forwarding_range_tombst auto r = make_assertions(sst.read_range_rows_flat(query::full_partition_range, UNCOMPRESSED_FILTERING_AND_FORWARDING_SCHEMA->full_slice(), default_priority_class(), - no_resource_tracking(), tracing::trace_state_ptr(), streamed_mutation::forwarding::yes)); std::array rt_deletion_times {1534898600, 1534899416}; @@ -890,7 +884,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_filtering_and_forwarding_range_tombst auto r = make_assertions(sst.read_range_rows_flat(query::full_partition_range, slice, default_priority_class(), - no_resource_tracking(), tracing::trace_state_ptr(), streamed_mutation::forwarding::yes)); @@ -1087,7 +1080,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_slicing_interleaved_rows_and_rts_read auto r = make_assertions(sst.read_range_rows_flat(query::full_partition_range, UNCOMPRESSED_SLICING_INTERLEAVED_ROWS_AND_RTS_SCHEMA->full_slice(), default_priority_class(), - no_resource_tracking(), tracing::trace_state_ptr(), streamed_mutation::forwarding::yes)); @@ -1156,7 +1148,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_slicing_interleaved_rows_and_rts_read auto r = make_assertions(sst.read_range_rows_flat(query::full_partition_range, slice, default_priority_class(), - no_resource_tracking(), tracing::trace_state_ptr(), streamed_mutation::forwarding::yes)); const tombstone tomb = make_tombstone(1525385507816568, 1535592075); @@ -3016,7 +3007,7 @@ static flat_mutation_reader compacted_sstable_reader(test_env& env, schema_ptr s sstables::compact_sstables(sstables::compaction_descriptor(std::move(sstables)), *cf, new_sstable, replacer_fn_no_op()).get(); auto compacted_sst = open_sstable(env, s, tmp.path().string(), new_generation); - return compacted_sst->as_mutation_source().make_reader(s, query::full_partition_range, s->full_slice()); + return compacted_sst->as_mutation_source().make_reader(s, no_reader_permit(), query::full_partition_range, s->full_slice()); } SEASTAR_THREAD_TEST_CASE(compact_deleted_row) { @@ -4765,7 +4756,6 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_read_two_rows_fast_forwarding) { auto r = assert_that(sst.read_range_rows_flat(query::full_partition_range, s->full_slice(), default_priority_class(), - no_resource_tracking(), tracing::trace_state_ptr(), streamed_mutation::forwarding::yes)); r.produces_partition_start(to_pkey(0)) @@ -5113,7 +5103,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_reader_on_unknown_column) { sst->load().get(); BOOST_REQUIRE_EXCEPTION( - assert_that(sst->read_rows_flat(read_schema)) + assert_that(sst->read_rows_flat(read_schema, no_reader_permit())) .produces_partition_start(dk) .produces_row(to_ck(0), {{val2_cdef, int32_type->decompose(int32_t(200))}}) .produces_row(to_ck(1), {{val2_cdef, int32_type->decompose(int32_t(201))}}) diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index eb56cf216e..9572332c42 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -830,7 +830,7 @@ SEASTAR_TEST_CASE(datafile_generation_11) { return write_memtable_to_sstable_for_test(*mt, sst).then([&env, s, sst, mt, verifier, tomb, &static_set_col, tmpdir_path] { return env.reusable_sst(s, tmpdir_path, 11).then([s, verifier, tomb, &static_set_col] (auto sstp) mutable { return do_with(make_dkey(s, "key1"), [sstp, s, verifier, tomb, &static_set_col] (auto& key) { - auto rd = make_lw_shared(sstp->read_row_flat(s, key)); + auto rd = make_lw_shared(sstp->read_row_flat(s, no_reader_permit(), key)); return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, verifier, tomb, &static_set_col, rd] (auto mutation) { auto verify_set = [&tomb] (const collection_mutation_description& m) { BOOST_REQUIRE(bool(m.tomb) == true); @@ -858,7 +858,7 @@ SEASTAR_TEST_CASE(datafile_generation_11) { }); }).then([sstp, s, verifier] { return do_with(make_dkey(s, "key2"), [sstp, s, verifier] (auto& key) { - auto rd = make_lw_shared(sstp->read_row_flat(s, key)); + auto rd = make_lw_shared(sstp->read_row_flat(s, no_reader_permit(), key)); return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, verifier, rd] (auto mutation) { auto m = verifier(mutation); BOOST_REQUIRE(!m.tomb); @@ -891,7 +891,7 @@ SEASTAR_TEST_CASE(datafile_generation_12) { return write_memtable_to_sstable_for_test(*mt, sst).then([&env, s, tomb, tmpdir_path] { return env.reusable_sst(s, tmpdir_path, 12).then([s, tomb] (auto sstp) mutable { return do_with(make_dkey(s, "key1"), [sstp, s, tomb] (auto& key) { - auto rd = make_lw_shared(sstp->read_row_flat(s, key)); + auto rd = make_lw_shared(sstp->read_row_flat(s, no_reader_permit(), key)); return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, tomb, rd] (auto mutation) { auto& mp = mutation->partition(); BOOST_REQUIRE(mp.row_tombstones().size() == 1); @@ -927,7 +927,7 @@ static future<> sstable_compression_test(compressor_ptr c, unsigned generation) return write_memtable_to_sstable_for_test(*mtp, sst).then([&env, s, tomb, generation, tmpdir_path] { return env.reusable_sst(s, tmpdir_path, generation).then([s, tomb] (auto sstp) mutable { return do_with(make_dkey(s, "key1"), [sstp, s, tomb] (auto& key) { - auto rd = make_lw_shared(sstp->read_row_flat(s, key)); + auto rd = make_lw_shared(sstp->read_row_flat(s, no_reader_permit(), key)); return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, tomb, rd] (auto mutation) { auto& mp = mutation->partition(); BOOST_REQUIRE(mp.row_tombstones().size() == 1); @@ -1008,19 +1008,19 @@ static future> open_sstables(test_env& env // mutation_reader for sstable keeping all the required objects alive. static flat_mutation_reader sstable_reader(shared_sstable sst, schema_ptr s) { - return sst->as_mutation_source().make_reader(s, query::full_partition_range, s->full_slice()); + return sst->as_mutation_source().make_reader(s, no_reader_permit(), query::full_partition_range, s->full_slice()); } static flat_mutation_reader sstable_reader(shared_sstable sst, schema_ptr s, const dht::partition_range& pr) { - return sst->as_mutation_source().make_reader(s, pr, s->full_slice()); + return sst->as_mutation_source().make_reader(s, no_reader_permit(), pr, s->full_slice()); } // We don't need to normalize the sstable reader for 'mc' format // because it is naturally normalized now. static flat_mutation_reader make_normalizing_sstable_reader( shared_sstable sst, schema_ptr s, const dht::partition_range& pr) { - auto sstable_reader = sst->as_mutation_source().make_reader(s, pr, s->full_slice()); + auto sstable_reader = sst->as_mutation_source().make_reader(s, no_reader_permit(), pr, s->full_slice()); if (sst->get_version() == sstables::sstable::version_types::mc) { return sstable_reader; } @@ -1419,7 +1419,7 @@ SEASTAR_TEST_CASE(datafile_generation_37) { return write_memtable_to_sstable_for_test(*mtp, sst).then([&env, s, tmpdir_path] { return env.reusable_sst(s, tmpdir_path, 37).then([s, tmpdir_path] (auto sstp) { return do_with(make_dkey(s, "key1"), [sstp, s] (auto& key) { - auto rd = make_lw_shared(sstp->read_row_flat(s, key)); + auto rd = make_lw_shared(sstp->read_row_flat(s, no_reader_permit(), key)); return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, rd] (auto mutation) { auto& mp = mutation->partition(); @@ -1454,7 +1454,7 @@ SEASTAR_TEST_CASE(datafile_generation_38) { return write_memtable_to_sstable_for_test(*mtp, sst).then([&env, s, tmpdir_path] { return env.reusable_sst(s, tmpdir_path, 38).then([s] (auto sstp) { return do_with(make_dkey(s, "key1"), [sstp, s] (auto& key) { - auto rd = make_lw_shared(sstp->read_row_flat(s, key)); + auto rd = make_lw_shared(sstp->read_row_flat(s, no_reader_permit(), key)); return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, rd] (auto mutation) { auto& mp = mutation->partition(); auto clustering = clustering_key_prefix::from_exploded(*s, {to_bytes("cl1"), to_bytes("cl2")}); @@ -1490,7 +1490,7 @@ SEASTAR_TEST_CASE(datafile_generation_39) { return write_memtable_to_sstable_for_test(*mtp, sst).then([&env, s, tmpdir_path] { return env.reusable_sst(s, tmpdir_path, 39).then([s] (auto sstp) { return do_with(make_dkey(s, "key1"), [sstp, s] (auto& key) { - auto rd = make_lw_shared(sstp->read_row_flat(s, key)); + auto rd = make_lw_shared(sstp->read_row_flat(s, no_reader_permit(), key)); return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, rd] (auto mutation) { auto& mp = mutation->partition(); auto& row = mp.clustered_row(*s, clustering_key::make_empty()); @@ -1586,7 +1586,7 @@ SEASTAR_TEST_CASE(datafile_generation_41) { return write_memtable_to_sstable_for_test(*mt, sst).then([&env, s, tomb, tmpdir_path] { return env.reusable_sst(s, tmpdir_path, 41).then([s, tomb] (auto sstp) mutable { return do_with(make_dkey(s, "key1"), [sstp, s, tomb] (auto& key) { - auto rd = make_lw_shared(sstp->read_row_flat(s, key)); + auto rd = make_lw_shared(sstp->read_row_flat(s, no_reader_permit(), key)); return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, tomb, rd] (auto mutation) { auto& mp = mutation->partition(); BOOST_REQUIRE(mp.clustered_rows().calculate_size() == 1); @@ -2546,7 +2546,7 @@ SEASTAR_TEST_CASE(sstable_rewrite) { void test_sliced_read_row_presence(shared_sstable sst, schema_ptr s, const query::partition_slice& ps, std::vector>> expected) { - auto reader = sst->as_mutation_source().make_reader(s, query::full_partition_range, ps); + auto reader = sst->as_mutation_source().make_reader(s, no_reader_permit(), query::full_partition_range, ps); partition_key::equality pk_eq(*s); clustering_key::equality ck_eq(*s); @@ -3698,7 +3698,7 @@ SEASTAR_TEST_CASE(test_repeated_tombstone_skipping) { .with_range(query::clustering_range::make_singular(ck2)) .with_range(query::clustering_range::make_singular(ck3)) .build(); - flat_mutation_reader rd = ms.make_reader(table.schema(), query::full_partition_range, slice); + flat_mutation_reader rd = ms.make_reader(table.schema(), no_reader_permit(), query::full_partition_range, slice); assert_that(std::move(rd)).has_monotonic_positions(); } } @@ -3739,6 +3739,7 @@ SEASTAR_TEST_CASE(test_skipping_using_index) { auto ms = as_mutation_source(sst); auto rd = ms.make_reader(table.schema(), + no_reader_permit(), query::full_partition_range, table.schema()->full_slice(), default_priority_class(), @@ -4260,7 +4261,7 @@ SEASTAR_TEST_CASE(test_summary_entry_spanning_more_keys_than_min_interval) { std::set merged; merged.insert(mutations.begin(), mutations.end()); - auto rd = assert_that(sst->as_mutation_source().make_reader(s, query::full_partition_range)); + auto rd = assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit(), query::full_partition_range)); auto keys_read = 0; for (auto&& m : merged) { keys_read++; @@ -4270,7 +4271,7 @@ SEASTAR_TEST_CASE(test_summary_entry_spanning_more_keys_than_min_interval) { BOOST_REQUIRE(keys_read == keys_written); auto r = dht::partition_range::make({mutations.back().decorated_key(), true}, {mutations.back().decorated_key(), true}); - assert_that(sst->as_mutation_source().make_reader(s, r)) + assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit(), r)) .produces(slice(mutations, r)) .produces_end_of_stream(); }); @@ -4532,7 +4533,7 @@ SEASTAR_TEST_CASE(test_old_format_non_compound_range_tombstone_is_read) { { auto slice = partition_slice_builder(*s).with_range(query::clustering_range::make_singular({ck})).build(); - assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice)) + assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit(), dht::partition_range::make_singular(dk), slice)) .produces(m) .produces_end_of_stream(); } @@ -4940,7 +4941,7 @@ SEASTAR_TEST_CASE(test_reads_cassandra_static_compact) { m.set_clustered_cell(clustering_key::make_empty(), *s->get_column_definition("c2"), atomic_cell::make_live(*utf8_type, 1551785032379079, utf8_type->decompose("cde"), {})); - assert_that(sst->as_mutation_source().make_reader(s)) + assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit())) .produces(m) .produces_end_of_stream(); }); @@ -5191,11 +5192,11 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) { compacting->insert(std::move(sst)); } auto reader = ::make_range_sstable_reader(s, + no_reader_permit(), compacting, query::full_partition_range, s->full_slice(), service::get_local_compaction_priority(), - no_resource_tracking(), nullptr, ::streamed_mutation::forwarding::no, ::mutation_reader::forwarding::no); diff --git a/test/boost/sstable_mutation_test.cc b/test/boost/sstable_mutation_test.cc index a1698a3208..52820900fc 100644 --- a/test/boost/sstable_mutation_test.cc +++ b/test/boost/sstable_mutation_test.cc @@ -55,7 +55,7 @@ SEASTAR_THREAD_TEST_CASE(nonexistent_key) { env.reusable_sst(uncompressed_schema(), uncompressed_dir(), 1).then([] (auto sstp) { return do_with(make_dkey(uncompressed_schema(), "invalid_key"), [sstp] (auto& key) { auto s = uncompressed_schema(); - auto rd = make_lw_shared(sstp->read_row_flat(s, key)); + auto rd = make_lw_shared(sstp->read_row_flat(s, no_reader_permit(), key)); return (*rd)(db::no_timeout).then([sstp, s, &key, rd] (auto mutation) { BOOST_REQUIRE(!mutation); return make_ready_future<>(); @@ -68,7 +68,7 @@ future<> test_no_clustered(sstables::test_env& env, bytes&& key, std::unordered_ return env.reusable_sst(uncompressed_schema(), uncompressed_dir(), 1).then([k = std::move(key), map = std::move(map)] (auto sstp) mutable { return do_with(make_dkey(uncompressed_schema(), std::move(k)), [sstp, map = std::move(map)] (auto& key) { auto s = uncompressed_schema(); - auto rd = make_lw_shared(sstp->read_row_flat(s, key)); + auto rd = make_lw_shared(sstp->read_row_flat(s, no_reader_permit(), key)); return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd, map = std::move(map)] (auto mutation) { BOOST_REQUIRE(mutation); auto& mp = mutation->partition(); @@ -143,7 +143,7 @@ future generate_clustered(sstables::test_env& env, bytes&& key) { return env.reusable_sst(complex_schema(), "test/resource/sstables/complex", Generation).then([k = std::move(key)] (auto sstp) mutable { return do_with(make_dkey(complex_schema(), std::move(k)), [sstp] (auto& key) { auto s = complex_schema(); - auto rd = make_lw_shared(sstp->read_row_flat(s, key)); + auto rd = make_lw_shared(sstp->read_row_flat(s, no_reader_permit(), key)); return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) { BOOST_REQUIRE(mutation); return std::move(*mutation); @@ -343,7 +343,7 @@ future<> test_range_reads(sstables::test_env& env, const dht::token& min, const auto stop = make_lw_shared(false); return do_with(dht::partition_range::make(dht::ring_position::starting_at(min), dht::ring_position::ending_at(max)), [&, sstp, s] (auto& pr) { - auto mutations = make_lw_shared(sstp->read_range_rows_flat(s, pr)); + auto mutations = make_lw_shared(sstp->read_range_rows_flat(s, no_reader_permit(), pr)); return do_until([stop] { return *stop; }, // Note: The data in the following lambda, including // "mutations", continues to live until after the last @@ -475,7 +475,7 @@ SEASTAR_TEST_CASE(test_sstable_can_write_and_read_range_tombstone) { sstables::sstable::format_types::big); write_memtable_to_sstable_for_test(*mt, sst).get(); sst->load().get(); - auto mr = sst->read_rows_flat(s); + auto mr = sst->read_rows_flat(s, no_reader_permit()); auto mut = read_mutation_from_flat_mutation_reader(mr, db::no_timeout).get0(); BOOST_REQUIRE(bool(mut)); auto& rts = mut->partition().row_tombstones(); @@ -496,7 +496,7 @@ SEASTAR_THREAD_TEST_CASE(compact_storage_sparse_read) { env.reusable_sst(compact_sparse_schema(), "test/resource/sstables/compact_sparse", 1).then([] (auto sstp) { return do_with(make_dkey(compact_sparse_schema(), "first_row"), [sstp] (auto& key) { auto s = compact_sparse_schema(); - auto rd = make_lw_shared(sstp->read_row_flat(s, key)); + auto rd = make_lw_shared(sstp->read_row_flat(s, no_reader_permit(), key)); return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) { BOOST_REQUIRE(mutation); auto& mp = mutation->partition(); @@ -515,7 +515,7 @@ SEASTAR_THREAD_TEST_CASE(compact_storage_simple_dense_read) { env.reusable_sst(compact_simple_dense_schema(), "test/resource/sstables/compact_simple_dense", 1).then([] (auto sstp) { return do_with(make_dkey(compact_simple_dense_schema(), "first_row"), [sstp] (auto& key) { auto s = compact_simple_dense_schema(); - auto rd = make_lw_shared(sstp->read_row_flat(s, key)); + auto rd = make_lw_shared(sstp->read_row_flat(s, no_reader_permit(), key)); return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) { auto& mp = mutation->partition(); @@ -536,7 +536,7 @@ SEASTAR_THREAD_TEST_CASE(compact_storage_dense_read) { env.reusable_sst(compact_dense_schema(), "test/resource/sstables/compact_dense", 1).then([] (auto sstp) { return do_with(make_dkey(compact_dense_schema(), "first_row"), [sstp] (auto& key) { auto s = compact_dense_schema(); - auto rd = make_lw_shared(sstp->read_row_flat(s, key)); + auto rd = make_lw_shared(sstp->read_row_flat(s, no_reader_permit(), key)); return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) { auto& mp = mutation->partition(); @@ -560,7 +560,7 @@ SEASTAR_THREAD_TEST_CASE(broken_ranges_collection) { sstables::test_env env; env.reusable_sst(peers_schema(), "test/resource/sstables/broken_ranges", 2).then([] (auto sstp) { auto s = peers_schema(); - auto reader = make_lw_shared(sstp->as_mutation_source().make_reader(s, query::full_partition_range)); + auto reader = make_lw_shared(sstp->as_mutation_source().make_reader(s, no_reader_permit(), query::full_partition_range)); return repeat([s, reader] { return read_mutation_from_flat_mutation_reader(*reader, db::no_timeout).then([s, reader] (mutation_opt mut) { auto key_equal = [s, &mut] (sstring ip) { @@ -628,7 +628,7 @@ SEASTAR_THREAD_TEST_CASE(tombstone_in_tombstone) { auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); }); ka_sst(tombstone_overlap_schema(), "test/resource/sstables/tombstone_overlap", 1).then([] (auto sstp) { auto s = tombstone_overlap_schema(); - return do_with(sstp->read_rows_flat(s), [sstp, s] (auto& reader) { + return do_with(sstp->read_rows_flat(s, no_reader_permit()), [sstp, s] (auto& reader) { return repeat([sstp, s, &reader] { return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([s] (mutation_opt mut) { if (!mut) { @@ -692,7 +692,7 @@ SEASTAR_THREAD_TEST_CASE(range_tombstone_reading) { auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); }); ka_sst(tombstone_overlap_schema(), "test/resource/sstables/tombstone_overlap", 4).then([] (auto sstp) { auto s = tombstone_overlap_schema(); - return do_with(sstp->read_rows_flat(s), [sstp, s] (auto& reader) { + return do_with(sstp->read_rows_flat(s, no_reader_permit()), [sstp, s] (auto& reader) { return repeat([sstp, s, &reader] { return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([s] (mutation_opt mut) { if (!mut) { @@ -770,7 +770,7 @@ SEASTAR_THREAD_TEST_CASE(tombstone_in_tombstone2) { auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); }); ka_sst(tombstone_overlap_schema2(), "test/resource/sstables/tombstone_overlap", 3).then([] (auto sstp) { auto s = tombstone_overlap_schema2(); - return do_with(sstp->read_rows_flat(s), [sstp, s] (auto& reader) { + return do_with(sstp->read_rows_flat(s, no_reader_permit()), [sstp, s] (auto& reader) { return repeat([sstp, s, &reader] { return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([s] (mutation_opt mut) { if (!mut) { @@ -850,7 +850,7 @@ static schema_ptr buffer_overflow_schema() { SEASTAR_THREAD_TEST_CASE(buffer_overflow) { auto s = buffer_overflow_schema(); auto sstp = ka_sst(s, "test/resource/sstables/buffer_overflow", 5).get0(); - auto r = sstp->read_rows_flat(s); + auto r = sstp->read_rows_flat(s, no_reader_permit()); auto pk1 = partition_key::from_exploded(*s, { int32_type->decompose(4) }); auto dk1 = dht::global_partitioner().decorate_key(*s, pk1); auto pk2 = partition_key::from_exploded(*s, { int32_type->decompose(3) }); @@ -910,7 +910,7 @@ SEASTAR_TEST_CASE(test_non_compound_table_row_is_not_marked_as_static) { sstables::sstable::format_types::big); write_memtable_to_sstable_for_test(*mt, sst).get(); sst->load().get(); - auto mr = sst->read_rows_flat(s); + auto mr = sst->read_rows_flat(s, no_reader_permit()); auto mut = read_mutation_from_flat_mutation_reader(mr, db::no_timeout).get0(); BOOST_REQUIRE(bool(mut)); } @@ -949,7 +949,7 @@ SEASTAR_TEST_CASE(test_has_partition_key) { dht::decorated_key dk(dht::global_partitioner().decorate_key(*s, k)); auto hk = sstables::sstable::make_hashed_key(*s, dk.key()); sst->load().get(); - auto mr = sst->read_rows_flat(s); + auto mr = sst->read_rows_flat(s, no_reader_permit()); auto res = sst->has_partition_key(hk, dk).get0(); BOOST_REQUIRE(bool(res)); @@ -1073,7 +1073,7 @@ SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_compound_dense) { { auto slice = partition_slice_builder(*s).with_range(query::clustering_range::make_starting_with({ck1})).build(); - assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice)) + assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit(), dht::partition_range::make_singular(dk), slice)) .produces(m) .produces_end_of_stream(); } @@ -1133,7 +1133,7 @@ SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_non_compound_dense) { { auto slice = partition_slice_builder(*s).with_range(query::clustering_range::make_starting_with({ck1})).build(); - assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice)) + assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit(), dht::partition_range::make_singular(dk), slice)) .produces(m) .produces_end_of_stream(); } @@ -1186,7 +1186,7 @@ SEASTAR_TEST_CASE(test_promoted_index_repeats_open_tombstones) { { auto slice = partition_slice_builder(*s).with_range(query::clustering_range::make_starting_with({ck})).build(); - assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice)) + assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit(), dht::partition_range::make_singular(dk), slice)) .produces(m) .produces_end_of_stream(); } @@ -1231,7 +1231,7 @@ SEASTAR_TEST_CASE(test_range_tombstones_are_correctly_seralized_for_non_compound { auto slice = partition_slice_builder(*s).build(); - assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice)) + assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit(), dht::partition_range::make_singular(dk), slice)) .produces(m) .produces_end_of_stream(); } @@ -1313,7 +1313,7 @@ SEASTAR_TEST_CASE(test_can_write_and_read_non_compound_range_tombstone_as_compou { auto slice = partition_slice_builder(*s).build(); - assert_that(sst->as_mutation_source().make_reader(s, dht::partition_range::make_singular(dk), slice)) + assert_that(sst->as_mutation_source().make_reader(s, no_reader_permit(), dht::partition_range::make_singular(dk), slice)) .produces(m) .produces_end_of_stream(); } @@ -1556,7 +1556,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_index_pages_do_not_cause_large_allocations) auto t0 = std::chrono::steady_clock::now(); auto large_allocs_before = memory::stats().large_allocations(); - auto sst_reader = sst->as_mutation_source().make_reader(s, pr); + auto sst_reader = sst->as_mutation_source().make_reader(s, no_reader_permit(), pr); mutation actual = *read_mutation_from_flat_mutation_reader(sst_reader, db::no_timeout).get0(); auto large_allocs_after = memory::stats().large_allocations(); auto duration = std::chrono::steady_clock::now() - t0; @@ -1604,14 +1604,14 @@ SEASTAR_THREAD_TEST_CASE(test_schema_changes) { } auto mr = assert_that(created_with_base_schema->as_mutation_source() - .make_reader(changed, dht::partition_range::make_open_ended_both_sides(), changed->full_slice())); + .make_reader(changed, no_reader_permit(), dht::partition_range::make_open_ended_both_sides(), changed->full_slice())); for (auto& m : changed_mutations) { mr.produces(m); } mr.produces_end_of_stream(); mr = assert_that(created_with_changed_schema->as_mutation_source() - .make_reader(changed, dht::partition_range::make_open_ended_both_sides(), changed->full_slice())); + .make_reader(changed, no_reader_permit(), dht::partition_range::make_open_ended_both_sides(), changed->full_slice())); for (auto& m : changed_mutations) { mr.produces(m); } diff --git a/test/boost/sstable_resharding_test.cc b/test/boost/sstable_resharding_test.cc index f3aa99e9ac..350ff571b9 100644 --- a/test/boost/sstable_resharding_test.cc +++ b/test/boost/sstable_resharding_test.cc @@ -132,7 +132,7 @@ void run_sstable_resharding_test() { auto shard = shards.front(); BOOST_REQUIRE(column_family_test::calculate_shard_from_sstable_generation(new_sst->generation()) == shard); - auto rd = assert_that(new_sst->as_mutation_source().make_reader(s)); + auto rd = assert_that(new_sst->as_mutation_source().make_reader(s, no_reader_permit())); BOOST_REQUIRE(muts[shard].size() == keys_per_shard); for (auto k : boost::irange(0u, keys_per_shard)) { rd.produces(muts[shard][k]); diff --git a/test/boost/sstable_test.cc b/test/boost/sstable_test.cc index 3f94ab79e0..29515624f5 100644 --- a/test/boost/sstable_test.cc +++ b/test/boost/sstable_test.cc @@ -339,7 +339,7 @@ public: int count_row_end = 0; test_row_consumer(int64_t t) - : row_consumer(no_resource_tracking(), tracing::trace_state_ptr() + : row_consumer(no_reader_permit(), tracing::trace_state_ptr() , default_priority_class()), desired_timestamp(t) { } @@ -460,7 +460,7 @@ public: int count_range_tombstone = 0; count_row_consumer() - : row_consumer(no_resource_tracking(), tracing::trace_state_ptr(), default_priority_class()) { + : row_consumer(no_reader_permit(), tracing::trace_state_ptr(), default_priority_class()) { } virtual proceed consume_row_start(sstables::key_view key, sstables::deletion_time deltime) override { @@ -838,7 +838,7 @@ SEASTAR_TEST_CASE(wrong_range) { return test_using_reusable_sst(uncompressed_schema(), "test/resource/sstables/wrongrange", 114, [] (auto sstp) { return do_with(make_dkey(uncompressed_schema(), "todata"), [sstp] (auto& key) { auto s = columns_schema(); - auto rd = make_lw_shared(sstp->read_row_flat(s, key)); + auto rd = make_lw_shared(sstp->read_row_flat(s, no_reader_permit(), key)); return read_mutation_from_flat_mutation_reader(*rd, db::no_timeout).then([sstp, s, &key, rd] (auto mutation) { return make_ready_future<>(); }); @@ -1017,7 +1017,7 @@ static future count_rows(sstable_ptr sstp, schema_ptr s, sstring key, sstri return seastar::async([sstp, s, key, ck1, ck2] () mutable { auto ps = make_partition_slice(*s, ck1, ck2); auto dkey = make_dkey(s, key.c_str()); - auto rd = sstp->read_row_flat(s, dkey, ps); + auto rd = sstp->read_row_flat(s, no_reader_permit(), dkey, ps); auto mfopt = rd(db::no_timeout).get0(); if (!mfopt) { return 0; @@ -1038,7 +1038,7 @@ static future count_rows(sstable_ptr sstp, schema_ptr s, sstring key, sstri static future count_rows(sstable_ptr sstp, schema_ptr s, sstring key) { return seastar::async([sstp, s, key] () mutable { auto dkey = make_dkey(s, key.c_str()); - auto rd = sstp->read_row_flat(s, dkey); + auto rd = sstp->read_row_flat(s, no_reader_permit(), dkey); auto mfopt = rd(db::no_timeout).get0(); if (!mfopt) { return 0; @@ -1060,7 +1060,7 @@ static future count_rows(sstable_ptr sstp, schema_ptr s, sstring key) { static future count_rows(sstable_ptr sstp, schema_ptr s, sstring ck1, sstring ck2) { return seastar::async([sstp, s, ck1, ck2] () mutable { auto ps = make_partition_slice(*s, ck1, ck2); - auto reader = sstp->read_range_rows_flat(s, query::full_partition_range, ps); + auto reader = sstp->read_range_rows_flat(s, no_reader_permit(), query::full_partition_range, ps); int nrows = 0; auto mfopt = reader(db::no_timeout).get0(); while (mfopt) { diff --git a/test/lib/mutation_source_test.cc b/test/lib/mutation_source_test.cc index dcc395521f..fb48ed0749 100644 --- a/test/lib/mutation_source_test.cc +++ b/test/lib/mutation_source_test.cc @@ -175,7 +175,7 @@ static void test_slicing_and_fast_forwarding(populate_fn_ex populate) { auto test_common = [&] (const query::partition_slice& slice) { BOOST_TEST_MESSAGE("Read whole partitions at once"); auto pranges_walker = partition_range_walker(pranges); - auto mr = ms.make_reader(s.schema(), pranges_walker.initial_range(), slice, + auto mr = ms.make_reader(s.schema(), no_reader_permit(), pranges_walker.initial_range(), slice, default_priority_class(), nullptr, streamed_mutation::forwarding::no, fwd_mr); auto actual = assert_that(std::move(mr)); for (auto& expected : mutations) { @@ -201,7 +201,7 @@ static void test_slicing_and_fast_forwarding(populate_fn_ex populate) { BOOST_TEST_MESSAGE("Read partitions with fast-forwarding to each individual row"); pranges_walker = partition_range_walker(pranges); - mr = ms.make_reader(s.schema(), pranges_walker.initial_range(), slice, + mr = ms.make_reader(s.schema(), no_reader_permit(), pranges_walker.initial_range(), slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes, fwd_mr); actual = assert_that(std::move(mr)); for (auto& expected : mutations) { @@ -237,14 +237,14 @@ static void test_slicing_and_fast_forwarding(populate_fn_ex populate) { test_common(slice); BOOST_TEST_MESSAGE("Test monotonic positions"); - auto mr = ms.make_reader(s.schema(), query::full_partition_range, slice, + auto mr = ms.make_reader(s.schema(), no_reader_permit(), query::full_partition_range, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::no, fwd_mr); assert_that(std::move(mr)).has_monotonic_positions(); if (range_size != 1) { BOOST_TEST_MESSAGE("Read partitions fast-forwarded to the range of interest"); auto pranges_walker = partition_range_walker(pranges); - mr = ms.make_reader(s.schema(), pranges_walker.initial_range(), slice, + mr = ms.make_reader(s.schema(), no_reader_permit(), pranges_walker.initial_range(), slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes, fwd_mr); auto actual = assert_that(std::move(mr)); for (auto& expected : mutations) { @@ -284,7 +284,7 @@ static void test_slicing_and_fast_forwarding(populate_fn_ex populate) { BOOST_TEST_MESSAGE("Read partitions with just static rows"); auto pranges_walker = partition_range_walker(pranges); - mr = ms.make_reader(s.schema(), pranges_walker.initial_range(), slice, + mr = ms.make_reader(s.schema(), no_reader_permit(), pranges_walker.initial_range(), slice, default_priority_class(), nullptr, streamed_mutation::forwarding::no, fwd_mr); auto actual = assert_that(std::move(mr)); for (auto& expected : mutations) { @@ -311,7 +311,7 @@ static void test_slicing_and_fast_forwarding(populate_fn_ex populate) { test_common(slice); BOOST_TEST_MESSAGE("Test monotonic positions"); - auto mr = ms.make_reader(s.schema(), query::full_partition_range, slice, + auto mr = ms.make_reader(s.schema(), no_reader_permit(), query::full_partition_range, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::no, fwd_mr); assert_that(std::move(mr)).has_monotonic_positions(); } @@ -383,10 +383,10 @@ static void test_streamed_mutation_forwarding_is_consistent_with_slicing(populat mutation_source ms = populate(m.schema(), {m}, gc_clock::now()); flat_mutation_reader sliced_reader = - ms.make_reader(m.schema(), prange, slice_with_ranges); + ms.make_reader(m.schema(), no_reader_permit(), prange, slice_with_ranges); flat_mutation_reader fwd_reader = - ms.make_reader(m.schema(), prange, full_slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes); + ms.make_reader(m.schema(), no_reader_permit(), prange, full_slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes); std::optional builder{}; struct consumer { @@ -474,6 +474,7 @@ static void test_streamed_mutation_forwarding_guarantees(populate_fn_ex populate auto new_stream = [&ms, s, &m] () -> flat_reader_assertions { BOOST_TEST_MESSAGE("Creating new streamed_mutation"); auto res = assert_that(ms.make_reader(s, + no_reader_permit(), query::full_partition_range, s->full_slice(), default_priority_class(), @@ -609,6 +610,7 @@ static void test_fast_forwarding_across_partitions_to_empty_range(populate_fn_ex auto pr = dht::partition_range::make({keys[0]}, {keys[1]}); auto rd = assert_that(ms.make_reader(s, + no_reader_permit(), pr, s->full_slice(), default_priority_class(), @@ -710,7 +712,7 @@ static void test_streamed_mutation_slicing_returns_only_relevant_tombstones(popu )) .build(); - auto rd = assert_that(ms.make_reader(s, pr, slice)); + auto rd = assert_that(ms.make_reader(s, no_reader_permit(), pr, slice)); rd.produces_partition_start(m.decorated_key()); rd.produces_row_with_key(keys[2]); @@ -729,7 +731,7 @@ static void test_streamed_mutation_slicing_returns_only_relevant_tombstones(popu )) .build(); - auto rd = assert_that(ms.make_reader(s, pr, slice)); + auto rd = assert_that(ms.make_reader(s, no_reader_permit(), pr, slice)); rd.produces_partition_start(m.decorated_key()) .produces_range_tombstone(rt3, slice.row_ranges(*s, m.key())) @@ -784,6 +786,7 @@ static void test_streamed_mutation_forwarding_across_range_tombstones(populate_f mutation_source ms = populate(s, std::vector({m}), gc_clock::now()); auto rd = assert_that(ms.make_reader(s, + no_reader_permit(), query::full_partition_range, s->full_slice(), default_priority_class(), @@ -867,7 +870,7 @@ static void test_range_queries(populate_fn_ex populate) { auto test_slice = [&] (dht::partition_range r) { BOOST_TEST_MESSAGE(format("Testing range {}", r)); - assert_that(ds.make_reader(s, r)) + assert_that(ds.make_reader(s, no_reader_permit(), r)) .produces(slice(partitions, r)) .produces_end_of_stream(); }; @@ -1008,7 +1011,7 @@ static void test_date_tiered_clustering_slicing(populate_fn_ex populate) { .with_range(ss.make_ckey_range(1, 2)) .build(); auto prange = dht::partition_range::make_singular(pkey); - assert_that(ms.make_reader(s, prange, slice)) + assert_that(ms.make_reader(s, no_reader_permit(), prange, slice)) .produces(m1, slice.row_ranges(*s, pkey.key())) .produces_end_of_stream(); } @@ -1018,7 +1021,7 @@ static void test_date_tiered_clustering_slicing(populate_fn_ex populate) { .with_range(query::clustering_range::make_singular(ss.make_ckey(0))) .build(); auto prange = dht::partition_range::make_singular(pkey); - assert_that(ms.make_reader(s, prange, slice)) + assert_that(ms.make_reader(s, no_reader_permit(), prange, slice)) .produces(m1) .produces_end_of_stream(); } @@ -1105,14 +1108,14 @@ static void test_clustering_slices(populate_fn_ex populate) { auto slice = partition_slice_builder(*s) .with_range(query::clustering_range::make_singular(make_ck(0))) .build(); - assert_that(ds.make_reader(s, pr, slice)) + assert_that(ds.make_reader(s, no_reader_permit(), pr, slice)) .produces_eos_or_empty_mutation(); } { auto slice = partition_slice_builder(*s) .build(); - auto rd = assert_that(ds.make_reader(s, pr, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes)); + auto rd = assert_that(ds.make_reader(s, no_reader_permit(), pr, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes)); rd.produces_partition_start(pk) .fast_forward_to(position_range(position_in_partition::for_key(ck1), position_in_partition::after_key(ck2))) .produces_row_with_key(ck1) @@ -1123,7 +1126,7 @@ static void test_clustering_slices(populate_fn_ex populate) { { auto slice = partition_slice_builder(*s) .build(); - auto rd = assert_that(ds.make_reader(s, pr, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes)); + auto rd = assert_that(ds.make_reader(s, no_reader_permit(), pr, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes)); rd.produces_partition_start(pk) .produces_end_of_stream() .fast_forward_to(position_range(position_in_partition::for_key(ck1), position_in_partition::after_key(ck2))) @@ -1135,7 +1138,7 @@ static void test_clustering_slices(populate_fn_ex populate) { auto slice = partition_slice_builder(*s) .with_range(query::clustering_range::make_singular(make_ck(1))) .build(); - assert_that(ds.make_reader(s, pr, slice)) + assert_that(ds.make_reader(s, no_reader_permit(), pr, slice)) .produces(row1 + row2 + row3 + row4 + row5 + del_1, slice.row_ranges(*s, pk.key())) .produces_end_of_stream(); } @@ -1143,7 +1146,7 @@ static void test_clustering_slices(populate_fn_ex populate) { auto slice = partition_slice_builder(*s) .with_range(query::clustering_range::make_singular(make_ck(2))) .build(); - assert_that(ds.make_reader(s, pr, slice)) + assert_that(ds.make_reader(s, no_reader_permit(), pr, slice)) .produces(row6 + row7 + del_1 + del_2, slice.row_ranges(*s, pk.key())) .produces_end_of_stream(); } @@ -1152,7 +1155,7 @@ static void test_clustering_slices(populate_fn_ex populate) { auto slice = partition_slice_builder(*s) .with_range(query::clustering_range::make_singular(make_ck(1, 2))) .build(); - assert_that(ds.make_reader(s, pr, slice)) + assert_that(ds.make_reader(s, no_reader_permit(), pr, slice)) .produces(row3 + row4 + del_1, slice.row_ranges(*s, pk.key())) .produces_end_of_stream(); } @@ -1161,7 +1164,7 @@ static void test_clustering_slices(populate_fn_ex populate) { auto slice = partition_slice_builder(*s) .with_range(query::clustering_range::make_singular(make_ck(3))) .build(); - assert_that(ds.make_reader(s, pr, slice)) + assert_that(ds.make_reader(s, no_reader_permit(), pr, slice)) .produces(row8 + del_3, slice.row_ranges(*s, pk.key())) .produces_end_of_stream(); } @@ -1169,12 +1172,12 @@ static void test_clustering_slices(populate_fn_ex populate) { // Test out-of-range partition keys { auto pr = dht::partition_range::make_singular(keys[0]); - assert_that(ds.make_reader(s, pr, s->full_slice())) + assert_that(ds.make_reader(s, no_reader_permit(), pr, s->full_slice())) .produces_eos_or_empty_mutation(); } { auto pr = dht::partition_range::make_singular(keys[2]); - assert_that(ds.make_reader(s, pr, s->full_slice())) + assert_that(ds.make_reader(s, no_reader_permit(), pr, s->full_slice())) .produces_eos_or_empty_mutation(); } } @@ -1195,7 +1198,7 @@ static void test_query_only_static_row(populate_fn_ex populate) { // fully populate cache { auto prange = dht::partition_range::make_ending_with(dht::ring_position(m1.decorated_key())); - assert_that(ms.make_reader(s.schema(), prange, s.schema()->full_slice())) + assert_that(ms.make_reader(s.schema(), no_reader_permit(), prange, s.schema()->full_slice())) .produces(m1) .produces_end_of_stream(); } @@ -1206,7 +1209,7 @@ static void test_query_only_static_row(populate_fn_ex populate) { .with_ranges({}) .build(); auto prange = dht::partition_range::make_ending_with(dht::ring_position(m1.decorated_key())); - assert_that(ms.make_reader(s.schema(), prange, slice)) + assert_that(ms.make_reader(s.schema(), no_reader_permit(), prange, slice)) .produces(m1, slice.row_ranges(*s.schema(), m1.key())) .produces_end_of_stream(); } @@ -1217,7 +1220,7 @@ static void test_query_only_static_row(populate_fn_ex populate) { .with_ranges({}) .build(); auto prange = dht::partition_range::make_singular(m1.decorated_key()); - assert_that(ms.make_reader(s.schema(), prange, slice)) + assert_that(ms.make_reader(s.schema(), no_reader_permit(), prange, slice)) .produces(m1, slice.row_ranges(*s.schema(), m1.key())) .produces_end_of_stream(); } @@ -1237,7 +1240,7 @@ static void test_query_no_clustering_ranges_no_static_columns(populate_fn_ex pop { auto prange = dht::partition_range::make_ending_with(dht::ring_position(m1.decorated_key())); - assert_that(ms.make_reader(s.schema(), prange, s.schema()->full_slice())) + assert_that(ms.make_reader(s.schema(), no_reader_permit(), prange, s.schema()->full_slice())) .produces(m1) .produces_end_of_stream(); } @@ -1248,7 +1251,7 @@ static void test_query_no_clustering_ranges_no_static_columns(populate_fn_ex pop .with_ranges({}) .build(); auto prange = dht::partition_range::make_ending_with(dht::ring_position(m1.decorated_key())); - assert_that(ms.make_reader(s.schema(), prange, slice)) + assert_that(ms.make_reader(s.schema(), no_reader_permit(), prange, slice)) .produces(m1, slice.row_ranges(*s.schema(), m1.key())) .produces_end_of_stream(); } @@ -1259,7 +1262,7 @@ static void test_query_no_clustering_ranges_no_static_columns(populate_fn_ex pop .with_ranges({}) .build(); auto prange = dht::partition_range::make_singular(m1.decorated_key()); - assert_that(ms.make_reader(s.schema(), prange, slice)) + assert_that(ms.make_reader(s.schema(), no_reader_permit(), prange, slice)) .produces(m1, slice.row_ranges(*s.schema(), m1.key())) .produces_end_of_stream(); } @@ -1275,6 +1278,7 @@ void test_streamed_mutation_forwarding_succeeds_with_no_data(populate_fn_ex popu auto source = populate(s.schema(), {m}, gc_clock::now()); assert_that(source.make_reader(s.schema(), + no_reader_permit(), query::full_partition_range, s.schema()->full_slice(), default_priority_class(), @@ -1323,7 +1327,7 @@ void test_slicing_with_overlapping_range_tombstones(populate_fn_ex populate) { { auto slice = partition_slice_builder(*s).with_range(range).build(); - auto rd = ds.make_reader(s, query::full_partition_range, slice); + auto rd = ds.make_reader(s, no_reader_permit(), query::full_partition_range, slice); auto prange = position_range(range); mutation result(m1.schema(), m1.decorated_key()); @@ -1341,7 +1345,7 @@ void test_slicing_with_overlapping_range_tombstones(populate_fn_ex populate) { // Check fast_forward_to() { - auto rd = ds.make_reader(s, query::full_partition_range, s->full_slice(), default_priority_class(), + auto rd = ds.make_reader(s, no_reader_permit(), query::full_partition_range, s->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::yes); auto prange = position_range(range); diff --git a/test/perf/perf_sstable.hh b/test/perf/perf_sstable.hh index e393cb4aa5..e0a800cb87 100644 --- a/test/perf/perf_sstable.hh +++ b/test/perf/perf_sstable.hh @@ -199,7 +199,7 @@ public: } future read_sequential_partitions(int idx) { - return do_with(_sst[0]->read_rows_flat(s), [this] (flat_mutation_reader& r) { + return do_with(_sst[0]->read_rows_flat(s, no_reader_permit()), [this] (flat_mutation_reader& r) { auto start = perf_sstable_test_env::now(); auto total = make_lw_shared(0); auto done = make_lw_shared(false);