From fe024cecdc84462ea9be67fdc888ec85a3c83ff2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 15 Apr 2020 15:34:19 +0300 Subject: [PATCH] row_cache: pass a valid permit to underlying read All reader are soon going to require a valid permit, so make sure we have a valid permit which we can pass to the underlying reader when creating it. This means `row_cache::make_reader()` now also requires a permit to be passed to it. --- read_context.hh | 4 + row_cache.cc | 5 +- row_cache.hh | 5 +- table.cc | 2 +- test/boost/cache_flat_mutation_reader_test.cc | 3 +- test/boost/row_cache_test.cc | 246 +++++++++--------- test/perf/perf_row_cache_update.cc | 2 +- test/unit/row_cache_alloc_stress_test.cc | 7 +- test/unit/row_cache_stress_test.cc | 2 +- 9 files changed, 142 insertions(+), 134 deletions(-) diff --git a/read_context.hh b/read_context.hh index 5cfb6cbb18..a6eb8a1c13 100644 --- a/read_context.hh +++ b/read_context.hh @@ -120,6 +120,7 @@ public: class read_context final : public enable_lw_shared_from_this { row_cache& _cache; schema_ptr _schema; + reader_permit _permit; const dht::partition_range& _range; const query::partition_slice& _slice; const io_priority_class& _pc; @@ -144,6 +145,7 @@ class read_context final : public enable_lw_shared_from_this { public: read_context(row_cache& cache, schema_ptr schema, + reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, @@ -151,6 +153,7 @@ public: mutation_reader::forwarding fwd_mr) : _cache(cache) , _schema(std::move(schema)) + , _permit(std::move(permit)) , _range(range) , _slice(slice) , _pc(pc) @@ -176,6 +179,7 @@ public: read_context(const read_context&) = delete; row_cache& cache() { return _cache; } const schema_ptr& schema() const { return _schema; } + reader_permit permit() const { return _permit; } const dht::partition_range& range() const { return _range; } const query::partition_slice& slice() const { return _slice; } const io_priority_class& pc() const { return _pc; } diff --git a/row_cache.cc b/row_cache.cc index e07bf359d7..15287a5fa7 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -47,7 +47,7 @@ using namespace cache; flat_mutation_reader row_cache::create_underlying_reader(read_context& ctx, mutation_source& src, const dht::partition_range& pr) { ctx.on_underlying_created(); - return src.make_reader(_schema, no_reader_permit(), pr, ctx.slice(), ctx.pc(), ctx.trace_state(), streamed_mutation::forwarding::yes); + return src.make_reader(_schema, ctx.permit(), pr, ctx.slice(), ctx.pc(), ctx.trace_state(), streamed_mutation::forwarding::yes); } static thread_local mutation_application_stats dummy_app_stats; @@ -737,6 +737,7 @@ row_cache::make_scanning_reader(const dht::partition_range& range, lw_shared_ptr flat_mutation_reader row_cache::make_reader(schema_ptr s, + reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, @@ -744,7 +745,7 @@ row_cache::make_reader(schema_ptr s, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - auto ctx = make_lw_shared(*this, s, range, slice, pc, trace_state, fwd_mr); + auto ctx = make_lw_shared(*this, s, std::move(permit), range, slice, pc, trace_state, fwd_mr); if (!ctx->is_range_query() && !fwd_mr) { auto mr = _read_section(_tracker.region(), [&] { diff --git a/row_cache.hh b/row_cache.hh index 3dd90fac4e..1a69da3d97 100644 --- a/row_cache.hh +++ b/row_cache.hh @@ -490,6 +490,7 @@ public: // as long as the reader is used. // The range must not wrap around. flat_mutation_reader make_reader(schema_ptr, + reader_permit permit, const dht::partition_range&, const query::partition_slice&, const io_priority_class& = default_priority_class(), @@ -497,9 +498,9 @@ public: streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no); - flat_mutation_reader make_reader(schema_ptr s, const dht::partition_range& range = query::full_partition_range) { + flat_mutation_reader make_reader(schema_ptr s, reader_permit permit, const dht::partition_range& range = query::full_partition_range) { auto& full_slice = s->full_slice(); - return make_reader(std::move(s), range, full_slice); + return make_reader(std::move(s), std::move(permit), range, full_slice); } const stats& stats() const { return _stats; } diff --git a/table.cc b/table.cc index d1b5e48164..6a22080650 100644 --- a/table.cc +++ b/table.cc @@ -472,7 +472,7 @@ table::make_reader(schema_ptr s, } if (cache_enabled() && !slice.options.contains(query::partition_slice::option::bypass_cache)) { - readers.emplace_back(_cache.make_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr)); + readers.emplace_back(_cache.make_reader(s, permit, range, slice, pc, std::move(trace_state), fwd, fwd_mr)); } else { readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr)); } diff --git a/test/boost/cache_flat_mutation_reader_test.cc b/test/boost/cache_flat_mutation_reader_test.cc index 01d1842ad9..25e02ad367 100644 --- a/test/boost/cache_flat_mutation_reader_test.cc +++ b/test/boost/cache_flat_mutation_reader_test.cc @@ -36,6 +36,7 @@ #include "test/lib/memtable_snapshot_source.hh" #include "test/lib/mutation_assertions.hh" #include "test/lib/flat_mutation_reader_assertions.hh" +#include "test/lib/reader_permit.hh" #include @@ -230,7 +231,7 @@ void test_slice_single_version(mutation& underlying, try { auto range = dht::partition_range::make_singular(DK); - auto reader = cache.make_reader(SCHEMA, range, slice); + auto reader = cache.make_reader(SCHEMA, tests::make_permit(), range, slice); check_produces_only(DK, std::move(reader), expected_sm_fragments, slice.row_ranges(*SCHEMA, DK.key())); diff --git a/test/boost/row_cache_test.cc b/test/boost/row_cache_test.cc index cc03ef721a..3db65c9f50 100644 --- a/test/boost/row_cache_test.cc +++ b/test/boost/row_cache_test.cc @@ -125,7 +125,7 @@ snapshot_source snapshot_source_from_snapshot(mutation_source src) { bool has_key(row_cache& cache, const dht::decorated_key& key) { auto range = dht::partition_range::make_singular(key); - auto reader = cache.make_reader(cache.schema(), range); + auto reader = cache.make_reader(cache.schema(), tests::make_permit(), range); auto mo = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0(); if (!bool(mo)) { return false; @@ -143,7 +143,7 @@ void verify_does_not_have(row_cache& cache, const dht::decorated_key& key) { void verify_has(row_cache& cache, const mutation& m) { auto range = dht::partition_range::make_singular(m.decorated_key()); - auto reader = cache.make_reader(cache.schema(), range); + auto reader = cache.make_reader(cache.schema(), tests::make_permit(), range); assert_that(std::move(reader)).next_mutation().is_equal_to(m); } @@ -155,7 +155,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying) { cache_tracker tracker; row_cache cache(s, snapshot_source_from_snapshot(make_source_with(m)), tracker); - assert_that(cache.make_reader(s, query::full_partition_range)) + assert_that(cache.make_reader(s, tests::make_permit(), query::full_partition_range)) .produces(m) .produces_end_of_stream(); }); @@ -169,13 +169,13 @@ SEASTAR_TEST_CASE(test_cache_works_after_clearing) { cache_tracker tracker; row_cache cache(s, snapshot_source_from_snapshot(make_source_with(m)), tracker); - assert_that(cache.make_reader(s, query::full_partition_range)) + assert_that(cache.make_reader(s, tests::make_permit(), query::full_partition_range)) .produces(m) .produces_end_of_stream(); tracker.clear(); - assert_that(cache.make_reader(s, query::full_partition_range)) + assert_that(cache.make_reader(s, tests::make_permit(), query::full_partition_range)) .produces(m) .produces_end_of_stream(); }); @@ -221,10 +221,10 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_empty_full_range) return make_counting_reader(make_empty_flat_reader(s), secondary_calls_count); })), tracker); - assert_that(cache.make_reader(s, query::full_partition_range)) + assert_that(cache.make_reader(s, tests::make_permit(), query::full_partition_range)) .produces_end_of_stream(); BOOST_REQUIRE_EQUAL(secondary_calls_count, 1); - assert_that(cache.make_reader(s, query::full_partition_range)) + assert_that(cache.make_reader(s, tests::make_permit(), query::full_partition_range)) .produces_end_of_stream(); BOOST_REQUIRE_EQUAL(secondary_calls_count, 1); }); @@ -252,10 +252,10 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_empty_single_part return make_counting_reader(make_empty_flat_reader(s), secondary_calls_count); })), tracker); auto range = make_single_partition_range(s, 100); - assert_that(cache.make_reader(s, range)) + assert_that(cache.make_reader(s, tests::make_permit(), range)) .produces_end_of_stream(); BOOST_REQUIRE_EQUAL(secondary_calls_count, 1); - assert_that(cache.make_reader(s, range)) + assert_that(cache.make_reader(s, tests::make_permit(), range)) .produces_eos_or_empty_mutation(); BOOST_REQUIRE_EQUAL(secondary_calls_count, 1); }); @@ -277,13 +277,13 @@ SEASTAR_TEST_CASE(test_cache_uses_continuity_info_for_single_partition_query) { return make_counting_reader(make_empty_flat_reader(s), secondary_calls_count); })), tracker); - assert_that(cache.make_reader(s, query::full_partition_range)) + assert_that(cache.make_reader(s, tests::make_permit(), query::full_partition_range)) .produces_end_of_stream(); BOOST_REQUIRE_EQUAL(secondary_calls_count, 1); auto range = make_single_partition_range(s, 100); - assert_that(cache.make_reader(s, range)) + assert_that(cache.make_reader(s, tests::make_permit(), range)) .produces_end_of_stream(); BOOST_REQUIRE_EQUAL(secondary_calls_count, 1); }); @@ -311,11 +311,11 @@ void test_cache_delegates_to_underlying_only_once_with_single_partition(schema_p } })), tracker); - assert_that(cache.make_reader(s, range)) + assert_that(cache.make_reader(s, tests::make_permit(), range)) .produces(m) .produces_end_of_stream(); BOOST_REQUIRE_EQUAL(secondary_calls_count, calls_to_secondary); - assert_that(cache.make_reader(s, range)) + assert_that(cache.make_reader(s, tests::make_permit(), range)) .produces(m) .produces_end_of_stream(); BOOST_REQUIRE_EQUAL(secondary_calls_count, calls_to_secondary); @@ -410,7 +410,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation auto cache = make_cache(s, secondary_calls_count); return mutation_source([cache] (schema_ptr s, reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace, streamed_mutation::forwarding fwd) { - return cache->make_reader(s, range, slice, pc, std::move(trace), std::move(fwd)); + return cache->make_reader(s, tests::make_permit(), range, slice, pc, std::move(trace), std::move(fwd)); }); }; @@ -535,7 +535,7 @@ SEASTAR_TEST_CASE(test_cache_delegates_to_underlying_only_once_multiple_mutation auto cache = make_cache(s, secondary_calls_count); auto ds = mutation_source([cache] (schema_ptr s, reader_permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace, streamed_mutation::forwarding fwd) { - return cache->make_reader(s, range, slice, pc, std::move(trace), std::move(fwd)); + return cache->make_reader(s, tests::make_permit(), range, slice, pc, std::move(trace), std::move(fwd)); }); test(ds, query::full_partition_range, partitions.size() + 1); @@ -582,26 +582,26 @@ SEASTAR_TEST_CASE(test_query_of_incomplete_range_goes_to_underlying) { auto key2_range = get_partition_range(mutations[2]); // Populate cache for first key - assert_that(cache.make_reader(s, key0_range)) + assert_that(cache.make_reader(s, tests::make_permit(), key0_range)) .produces(mutations[0]) .produces_end_of_stream(); // Populate cache for last key - assert_that(cache.make_reader(s, key2_range)) + assert_that(cache.make_reader(s, tests::make_permit(), key2_range)) .produces(mutations[2]) .produces_end_of_stream(); // Test single-key queries - assert_that(cache.make_reader(s, key0_range)) + assert_that(cache.make_reader(s, tests::make_permit(), key0_range)) .produces(mutations[0]) .produces_end_of_stream(); - assert_that(cache.make_reader(s, key2_range)) + assert_that(cache.make_reader(s, tests::make_permit(), key2_range)) .produces(mutations[2]) .produces_end_of_stream(); // Test range query - assert_that(cache.make_reader(s, query::full_partition_range)) + assert_that(cache.make_reader(s, tests::make_permit(), query::full_partition_range)) .produces(mutations[0]) .produces(mutations[1]) .produces(mutations[2]) @@ -633,15 +633,15 @@ SEASTAR_TEST_CASE(test_single_key_queries_after_population_in_reverse_order) { auto key2_range = get_partition_range(mutations[2]); for (int i = 0; i < 2; ++i) { - assert_that(cache.make_reader(s, key2_range)) + assert_that(cache.make_reader(s, tests::make_permit(), key2_range)) .produces(mutations[2]) .produces_end_of_stream(); - assert_that(cache.make_reader(s, key1_range)) + assert_that(cache.make_reader(s, tests::make_permit(), key1_range)) .produces(mutations[1]) .produces_end_of_stream(); - assert_that(cache.make_reader(s, key0_range)) + assert_that(cache.make_reader(s, tests::make_permit(), key0_range)) .produces(mutations[0]) .produces_end_of_stream(); } @@ -678,7 +678,7 @@ SEASTAR_TEST_CASE(test_partition_range_population_with_concurrent_memtable_flush { auto pr = dht::partition_range::make_singular(query::ring_position(mutations[1].decorated_key())); - assert_that(cache.make_reader(s, pr)) + assert_that(cache.make_reader(s, tests::make_permit(), pr)) .produces(mutations[1]) .produces_end_of_stream(); } @@ -686,7 +686,7 @@ SEASTAR_TEST_CASE(test_partition_range_population_with_concurrent_memtable_flush { auto pr = dht::partition_range::make_ending_with( {query::ring_position(mutations[2].decorated_key()), true}); - assert_that(cache.make_reader(s, pr)) + assert_that(cache.make_reader(s, tests::make_permit(), pr)) .produces(mutations[0]) .produces(mutations[1]) .produces(mutations[2]) @@ -696,7 +696,7 @@ SEASTAR_TEST_CASE(test_partition_range_population_with_concurrent_memtable_flush cache.invalidate([]{}).get(); { - assert_that(cache.make_reader(s, query::full_partition_range)) + assert_that(cache.make_reader(s, tests::make_permit(), query::full_partition_range)) .produces(mutations[0]) .produces(mutations[1]) .produces(mutations[2]) @@ -728,7 +728,7 @@ SEASTAR_TEST_CASE(test_row_cache_conforms_to_mutation_source) { tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return cache->make_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr); + return cache->make_reader(s, tests::make_permit(), range, slice, pc, std::move(trace_state), fwd, fwd_mr); }); }); }); @@ -761,7 +761,7 @@ SEASTAR_TEST_CASE(test_reading_from_random_partial_partition) { cache.populate(m1); // m1 is supposed to have random continuity and populate() should preserve it - auto rd1 = cache.make_reader(gen.schema()); + auto rd1 = cache.make_reader(gen.schema(), tests::make_permit()); rd1.fill_buffer(db::no_timeout).get(); // Merge m2 into cache @@ -769,7 +769,7 @@ SEASTAR_TEST_CASE(test_reading_from_random_partial_partition) { mt->apply(m2); cache.update([&] { underlying.apply(m2); }, *mt).get(); - auto rd2 = cache.make_reader(gen.schema()); + auto rd2 = cache.make_reader(gen.schema(), tests::make_permit()); rd2.fill_buffer(db::no_timeout).get(); assert_that(std::move(rd1)).next_mutation().is_equal_to(m1); @@ -830,7 +830,7 @@ SEASTAR_TEST_CASE(test_random_partition_population) { row_cache cache(gen.schema(), snapshot_source([&] { return underlying(); }), tracker); - assert_that(cache.make_reader(gen.schema())) + assert_that(cache.make_reader(gen.schema(), tests::make_permit())) .produces(m1) .produces_end_of_stream(); @@ -839,7 +839,7 @@ SEASTAR_TEST_CASE(test_random_partition_population) { }).get(); auto pr = dht::partition_range::make_singular(m2.decorated_key()); - assert_that(cache.make_reader(gen.schema(), pr)) + assert_that(cache.make_reader(gen.schema(), tests::make_permit(), pr)) .produces(m1 + m2) .produces_end_of_stream(); }); @@ -865,7 +865,7 @@ SEASTAR_TEST_CASE(test_eviction) { for (auto&& key : keys) { auto pr = dht::partition_range::make_singular(key); - auto rd = cache.make_reader(s, pr); + auto rd = cache.make_reader(s, tests::make_permit(), pr); rd.set_max_buffer_size(1); rd.fill_buffer(db::no_timeout).get(); } @@ -903,7 +903,7 @@ SEASTAR_TEST_CASE(test_eviction_from_invalidated) { std::shuffle(keys.begin(), keys.end(), std::default_random_engine(random())); for (auto&& key : keys) { - cache.make_reader(s, dht::partition_range::make_singular(key)); + cache.make_reader(s, tests::make_permit(), dht::partition_range::make_singular(key)); } cache.invalidate([] {}).get(); @@ -934,7 +934,7 @@ SEASTAR_TEST_CASE(test_eviction_after_schema_change) { { auto pr = dht::partition_range::make_singular(m.decorated_key()); - auto rd = cache.make_reader(s2, pr); + auto rd = cache.make_reader(s2, tests::make_permit(), pr); rd.set_max_buffer_size(1); rd.fill_buffer(db::no_timeout).get(); } @@ -1008,7 +1008,7 @@ SEASTAR_TEST_CASE(test_single_partition_update) { .with_range(query::clustering_range::make_ending_with(ck1)) .with_range(query::clustering_range::make_starting_with(ck4)) .build(); - auto reader = cache.make_reader(s, range, slice); + auto reader = cache.make_reader(s, tests::make_permit(), range, slice); test_sliced_read_row_presence(std::move(reader), s, {1, 4, 7}); } @@ -1021,7 +1021,7 @@ SEASTAR_TEST_CASE(test_single_partition_update) { }, *mt).get(); { - auto reader = cache.make_reader(s, range); + auto reader = cache.make_reader(s, tests::make_permit(), range); test_sliced_read_row_presence(std::move(reader), s, {1, 2, 3, 4, 7}); } @@ -1170,7 +1170,7 @@ SEASTAR_TEST_CASE(test_update_failure) { memory_hog.clear(); auto has_only = [&] (const partitions_type& partitions) { - auto reader = cache.make_reader(s, query::full_partition_range); + auto reader = cache.make_reader(s, tests::make_permit(), query::full_partition_range); for (int i = 0; i < partition_count; i++) { auto mopt = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0(); if (!mopt) { @@ -1296,13 +1296,13 @@ SEASTAR_TEST_CASE(test_continuity_flag_and_invalidate_race) { // Bring ring[2]and ring[3] to cache. auto range = dht::partition_range::make_starting_with({ ring[2].ring_position(), true }); - assert_that(cache.make_reader(s, range)) + assert_that(cache.make_reader(s, tests::make_permit(), range)) .produces(ring[2]) .produces(ring[3]) .produces_end_of_stream(); // Start reader with full range. - auto rd = assert_that(cache.make_reader(s, query::full_partition_range)); + auto rd = assert_that(cache.make_reader(s, tests::make_permit(), query::full_partition_range)); rd.produces(ring[0]); // Invalidate ring[2] and ring[3] @@ -1315,7 +1315,7 @@ SEASTAR_TEST_CASE(test_continuity_flag_and_invalidate_race) { .produces_end_of_stream(); // Start another reader with full range. - rd = assert_that(cache.make_reader(s, query::full_partition_range)); + rd = assert_that(cache.make_reader(s, tests::make_permit(), query::full_partition_range)); rd.produces(ring[0]) .produces(ring[1]) .produces(ring[2]); @@ -1327,7 +1327,7 @@ SEASTAR_TEST_CASE(test_continuity_flag_and_invalidate_race) { .produces_end_of_stream(); // Start yet another reader with full range. - assert_that(cache.make_reader(s, query::full_partition_range)) + assert_that(cache.make_reader(s, tests::make_permit(), query::full_partition_range)) .produces(ring[0]) .produces(ring[1]) .produces(ring[2]) @@ -1364,11 +1364,11 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) { thr.block(); auto m0_range = dht::partition_range::make_singular(ring[0].ring_position()); - auto rd1 = cache.make_reader(s, m0_range); + auto rd1 = cache.make_reader(s, tests::make_permit(), m0_range); rd1.set_max_buffer_size(1); auto rd1_fill_buffer = rd1.fill_buffer(db::no_timeout); - auto rd2 = cache.make_reader(s); + auto rd2 = cache.make_reader(s, tests::make_permit()); rd2.set_max_buffer_size(1); auto rd2_fill_buffer = rd2.fill_buffer(db::no_timeout); @@ -1379,7 +1379,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) { mt2_copy->apply(*mt2, tests::make_permit()).get(); auto update_future = cache.update([&] { memtables.apply(mt2_copy); }, *mt2); - auto rd3 = cache.make_reader(s); + auto rd3 = cache.make_reader(s, tests::make_permit()); // rd2, which is in progress, should not prevent forward progress of update() thr.unblock(); @@ -1406,7 +1406,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) { .produces_end_of_stream(); // Reads started after flush should see new data - assert_that(cache.make_reader(s)) + assert_that(cache.make_reader(s, tests::make_permit())) .produces(ring2[0]) .produces(ring2[1]) .produces(ring2[2]) @@ -1499,7 +1499,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_clear_race) { thr.block(); - auto rd1 = cache.make_reader(s); + auto rd1 = cache.make_reader(s, tests::make_permit()); rd1.set_max_buffer_size(1); auto rd1_fill_buffer = rd1.fill_buffer(db::no_timeout); @@ -1510,7 +1510,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_clear_race) { memtables.apply(mt2); }); - auto rd2 = cache.make_reader(s); + auto rd2 = cache.make_reader(s, tests::make_permit()); // rd1, which is in progress, should not prevent forward progress of clear() thr.unblock(); @@ -1535,7 +1535,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_clear_race) { .produces_end_of_stream(); // Reads started after clear should see new data - assert_that(cache.make_reader(s)) + assert_that(cache.make_reader(s, tests::make_permit())) .produces(ring2[0]) .produces(ring2[1]) .produces(ring2[2]) @@ -1560,10 +1560,10 @@ SEASTAR_TEST_CASE(test_mvcc) { auto pk = m1.key(); cache.populate(m1); - auto rd1 = cache.make_reader(s); + auto rd1 = cache.make_reader(s, tests::make_permit()); rd1.fill_buffer(db::no_timeout).get(); - auto rd2 = cache.make_reader(s); + auto rd2 = cache.make_reader(s, tests::make_permit()); rd2.fill_buffer(db::no_timeout).get(); auto mt1 = make_lw_shared(s); @@ -1582,13 +1582,13 @@ SEASTAR_TEST_CASE(test_mvcc) { mt1_copy->apply(*mt1, tests::make_permit()).get(); cache.update([&] { underlying.apply(mt1_copy); }, *mt1).get(); - auto rd3 = cache.make_reader(s); + auto rd3 = cache.make_reader(s, tests::make_permit()); rd3.fill_buffer(db::no_timeout).get(); - auto rd4 = cache.make_reader(s); + auto rd4 = cache.make_reader(s, tests::make_permit()); rd4.fill_buffer(db::no_timeout).get(); - auto rd5 = cache.make_reader(s); + auto rd5 = cache.make_reader(s, tests::make_permit()); rd5.fill_buffer(db::no_timeout).get(); assert_that(std::move(rd3)).has_monotonic_positions(); @@ -1657,21 +1657,21 @@ SEASTAR_TEST_CASE(test_slicing_mutation_reader) { auto run_tests = [&] (auto& ps, std::deque expected) { cache.invalidate([] {}).get0(); - auto reader = cache.make_reader(s, query::full_partition_range, ps); + auto reader = cache.make_reader(s, tests::make_permit(), query::full_partition_range, ps); test_sliced_read_row_presence(std::move(reader), s, expected); - reader = cache.make_reader(s, query::full_partition_range, ps); + reader = cache.make_reader(s, tests::make_permit(), query::full_partition_range, ps); test_sliced_read_row_presence(std::move(reader), s, expected); auto dk = dht::decorate_key(*s, pk); auto singular_range = dht::partition_range::make_singular(dk); - reader = cache.make_reader(s, singular_range, ps); + reader = cache.make_reader(s, tests::make_permit(), singular_range, ps); test_sliced_read_row_presence(std::move(reader), s, expected); cache.invalidate([] {}).get0(); - reader = cache.make_reader(s, singular_range, ps); + reader = cache.make_reader(s, tests::make_permit(), singular_range, ps); test_sliced_read_row_presence(std::move(reader), s, expected); }; @@ -1755,7 +1755,7 @@ SEASTAR_TEST_CASE(test_lru) { } auto pr = dht::partition_range::make_ending_with(dht::ring_position(partitions[2].decorated_key())); - auto rd = cache.make_reader(s, pr); + auto rd = cache.make_reader(s, tests::make_permit(), pr); assert_that(std::move(rd)) .produces(partitions[0]) .produces(partitions[1]) @@ -1765,7 +1765,7 @@ SEASTAR_TEST_CASE(test_lru) { evict_one_partition(tracker); pr = dht::partition_range::make_ending_with(dht::ring_position(partitions[4].decorated_key())); - rd = cache.make_reader(s, pr); + rd = cache.make_reader(s, tests::make_permit(), pr); assert_that(std::move(rd)) .produces(partitions[0]) .produces(partitions[1]) @@ -1774,14 +1774,14 @@ SEASTAR_TEST_CASE(test_lru) { .produces_end_of_stream(); pr = dht::partition_range::make_singular(dht::ring_position(partitions[5].decorated_key())); - rd = cache.make_reader(s, pr); + rd = cache.make_reader(s, tests::make_permit(), pr); assert_that(std::move(rd)) .produces(partitions[5]) .produces_end_of_stream(); evict_one_partition(tracker); - rd = cache.make_reader(s); + rd = cache.make_reader(s, tests::make_permit()); assert_that(std::move(rd)) .produces(partitions[0]) .produces(partitions[1]) @@ -1817,7 +1817,7 @@ SEASTAR_TEST_CASE(test_update_invalidating) { row_cache cache(s.schema(), snapshot_source([&] { return underlying(); }), tracker); - assert_that(cache.make_reader(s.schema())) + assert_that(cache.make_reader(s.schema(), tests::make_permit())) .produces(m1) .produces(m2) .produces_end_of_stream(); @@ -1836,7 +1836,7 @@ SEASTAR_TEST_CASE(test_update_invalidating) { mt_copy->apply(*mt, tests::make_permit()).get(); cache.update_invalidating([&] { underlying.apply(mt_copy); }, *mt).get(); - assert_that(cache.make_reader(s.schema())) + assert_that(cache.make_reader(s.schema(), tests::make_permit())) .produces(m5) .produces(m1 + m3) .produces(m4) @@ -1880,7 +1880,7 @@ SEASTAR_TEST_CASE(test_scan_with_partial_partitions) { .with_range(query::clustering_range::make_ending_with(s.make_ckey(1))) .build(); auto prange = dht::partition_range::make_ending_with(dht::ring_position(m1.decorated_key())); - assert_that(cache.make_reader(s.schema(), prange, slice)) + assert_that(cache.make_reader(s.schema(), tests::make_permit(), prange, slice)) .produces(m1, slice.row_ranges(*s.schema(), m1.key())) .produces_end_of_stream(); } @@ -1891,20 +1891,20 @@ SEASTAR_TEST_CASE(test_scan_with_partial_partitions) { .with_range(query::clustering_range::make_ending_with(s.make_ckey(1))) .build(); auto prange = dht::partition_range::make_singular(m3.decorated_key()); - assert_that(cache.make_reader(s.schema(), prange, slice)) + assert_that(cache.make_reader(s.schema(), tests::make_permit(), prange, slice)) .produces(m3, slice.row_ranges(*s.schema(), m3.key())) .produces_end_of_stream(); } // full scan - assert_that(cache.make_reader(s.schema())) + assert_that(cache.make_reader(s.schema(), tests::make_permit())) .produces(m1) .produces(m2) .produces(m3) .produces_end_of_stream(); // full scan after full scan - assert_that(cache.make_reader(s.schema())) + assert_that(cache.make_reader(s.schema(), tests::make_permit())) .produces(m1) .produces(m2) .produces(m3) @@ -1935,23 +1935,23 @@ SEASTAR_TEST_CASE(test_cache_populates_partition_tombstone) { // singular range case { auto prange = dht::partition_range::make_singular(dht::ring_position(m1.decorated_key())); - assert_that(cache.make_reader(s.schema(), prange)) + assert_that(cache.make_reader(s.schema(), tests::make_permit(), prange)) .produces(m1) .produces_end_of_stream(); - assert_that(cache.make_reader(s.schema(), prange)) // over populated + assert_that(cache.make_reader(s.schema(), tests::make_permit(), prange)) // over populated .produces(m1) .produces_end_of_stream(); } // range scan case { - assert_that(cache.make_reader(s.schema())) + assert_that(cache.make_reader(s.schema(), tests::make_permit())) .produces(m1) .produces(m2) .produces_end_of_stream(); - assert_that(cache.make_reader(s.schema())) // over populated + assert_that(cache.make_reader(s.schema(), tests::make_permit())) // over populated .produces(m1) .produces(m2) .produces_end_of_stream(); @@ -1995,7 +1995,7 @@ SEASTAR_TEST_CASE(test_tombstone_merging_in_partial_partition) { .with_range(query::clustering_range::make_singular(s.make_ckey(4))) .build(); - assert_that(cache.make_reader(s.schema(), pr, slice)) + assert_that(cache.make_reader(s.schema(), tests::make_permit(), pr, slice)) .produces(m1 + m2, slice.row_ranges(*s.schema(), pk.key())) .produces_end_of_stream(); } @@ -2005,11 +2005,11 @@ SEASTAR_TEST_CASE(test_tombstone_merging_in_partial_partition) { .with_range(query::clustering_range::make_starting_with(s.make_ckey(4))) .build(); - assert_that(cache.make_reader(s.schema(), pr, slice)) + assert_that(cache.make_reader(s.schema(), tests::make_permit(), pr, slice)) .produces(m1 + m2, slice.row_ranges(*s.schema(), pk.key())) .produces_end_of_stream(); - assert_that(cache.make_reader(s.schema(), pr, slice)).has_monotonic_positions(); + assert_that(cache.make_reader(s.schema(), tests::make_permit(), pr, slice)).has_monotonic_positions(); } }); } @@ -2023,7 +2023,7 @@ static void populate_range(row_cache& cache, const query::clustering_range& r = query::full_clustering_range) { auto slice = partition_slice_builder(*cache.schema()).with_range(r).build(); - auto rd = cache.make_reader(cache.schema(), pr, slice); + auto rd = cache.make_reader(cache.schema(), tests::make_permit(), pr, slice); consume_all(rd); } @@ -2063,7 +2063,7 @@ SEASTAR_TEST_CASE(test_readers_get_all_data_after_eviction) { }; auto make_reader = [&] (const query::partition_slice& slice) { - auto rd = cache.make_reader(s, query::full_partition_range, slice); + auto rd = cache.make_reader(s, tests::make_permit(), query::full_partition_range, slice); rd.set_max_buffer_size(1); rd.fill_buffer(db::no_timeout).get(); return assert_that(std::move(rd)); @@ -2113,7 +2113,7 @@ SEASTAR_TEST_CASE(test_single_tombstone_with_small_buffer) { populate_range(cache); - auto rd = cache.make_reader(s.schema(), pr); + auto rd = cache.make_reader(s.schema(), tests::make_permit(), pr); rd.set_max_buffer_size(1); assert_that(std::move(rd)).produces_partition_start(pk) @@ -2145,7 +2145,7 @@ SEASTAR_TEST_CASE(test_tombstone_and_row_with_small_buffer) { populate_range(cache); - auto rd = cache.make_reader(s.schema(), pr); + auto rd = cache.make_reader(s.schema(), tests::make_permit(), pr); rd.set_max_buffer_size(1); assert_that(std::move(rd)).produces_partition_start(pk) @@ -2189,7 +2189,7 @@ SEASTAR_TEST_CASE(test_tombstones_are_not_missed_when_range_is_invalidated) { row_cache cache(s.schema(), snapshot_source([&] { return underlying(); }), tracker); auto make_reader = [&] (const query::partition_slice& slice) { - auto rd = cache.make_reader(s.schema(), pr, slice); + auto rd = cache.make_reader(s.schema(), tests::make_permit(), pr, slice); rd.set_max_buffer_size(1); rd.fill_buffer(db::no_timeout).get(); return assert_that(std::move(rd)); @@ -2294,7 +2294,7 @@ SEASTAR_TEST_CASE(test_exception_safety_of_update_from_memtable) { }), tracker); auto make_reader = [&] (const dht::partition_range& pr) { - auto rd = cache.make_reader(s.schema(), pr); + auto rd = cache.make_reader(s.schema(), tests::make_permit(), pr); rd.set_max_buffer_size(1); rd.fill_buffer(db::no_timeout).get(); return rd; @@ -2328,7 +2328,7 @@ SEASTAR_TEST_CASE(test_exception_safety_of_update_from_memtable) { injector.cancel(); - assert_that(cache.make_reader(cache.schema())) + assert_that(cache.make_reader(cache.schema(), tests::make_permit())) .produces(muts2) .produces_end_of_stream(); @@ -2339,7 +2339,7 @@ SEASTAR_TEST_CASE(test_exception_safety_of_update_from_memtable) { .produces_end_of_stream(); } catch (const std::bad_alloc&) { // expected - assert_that(cache.make_reader(cache.schema())) + assert_that(cache.make_reader(cache.schema(), tests::make_permit())) .produces(orig) .produces_end_of_stream(); @@ -2374,14 +2374,14 @@ SEASTAR_TEST_CASE(test_exception_safety_of_reads) { while (true) { try { injector.fail_after(i++); - auto rd = cache.make_reader(s, query::full_partition_range, slice); + auto rd = cache.make_reader(s, tests::make_permit(), query::full_partition_range, slice); auto got_opt = read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0(); BOOST_REQUIRE(got_opt); BOOST_REQUIRE(!read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0()); injector.cancel(); assert_that(*got_opt).is_equal_to(mut, ranges); - assert_that(cache.make_reader(s, query::full_partition_range, slice)) + assert_that(cache.make_reader(s, tests::make_permit(), query::full_partition_range, slice)) .produces(mut, ranges); if (!injector.failed()) { @@ -2397,7 +2397,7 @@ SEASTAR_TEST_CASE(test_exception_safety_of_reads) { auto slice = partition_slice_builder(*s).with_ranges(gen.make_random_ranges(3)).build(); auto&& ranges = slice.row_ranges(*s, mut.key()); injector.fail_after(0); - assert_that(cache.make_reader(s, query::full_partition_range, slice)) + assert_that(cache.make_reader(s, tests::make_permit(), query::full_partition_range, slice)) .produces(mut, ranges); injector.cancel(); }; @@ -2450,7 +2450,7 @@ SEASTAR_TEST_CASE(test_exception_safety_of_transitioning_from_underlying_read_to populate_range(cache, pr, s.make_ckey_range(6, 10)); injector.fail_after(i++); - auto rd = cache.make_reader(s.schema(), pr, slice); + auto rd = cache.make_reader(s.schema(), tests::make_permit(), pr, slice); auto got_opt = read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0(); BOOST_REQUIRE(got_opt); auto mfopt = rd(db::no_timeout).get0(); @@ -2497,7 +2497,7 @@ SEASTAR_TEST_CASE(test_exception_safety_of_partition_scan) { populate_range(cache, dht::partition_range::make({pkeys[3]}, {pkeys[5]})); injector.fail_after(i++); - assert_that(cache.make_reader(s.schema())) + assert_that(cache.make_reader(s.schema(), tests::make_permit())) .produces(muts) .produces_end_of_stream(); injector.cancel(); @@ -2525,7 +2525,7 @@ SEASTAR_TEST_CASE(test_concurrent_population_before_latest_version_iterator) { row_cache cache(s.schema(), snapshot_source([&] { return underlying(); }), tracker); auto make_reader = [&] (const query::partition_slice& slice) { - auto rd = cache.make_reader(s.schema(), pr, slice); + auto rd = cache.make_reader(s.schema(), tests::make_permit(), pr, slice); rd.set_max_buffer_size(1); rd.fill_buffer(db::no_timeout).get(); return assert_that(std::move(rd)); @@ -2625,10 +2625,10 @@ SEASTAR_TEST_CASE(test_concurrent_populating_partition_range_reads) { populate_range(cache, dht::partition_range::make_singular({keys[6]})); // FIXME: When readers have buffering across partitions, limit buffering to 1 - auto rd1 = assert_that(cache.make_reader(s.schema(), range1)); + auto rd1 = assert_that(cache.make_reader(s.schema(), tests::make_permit(), range1)); rd1.produces(muts[0]); - auto rd2 = assert_that(cache.make_reader(s.schema(), range2)); + auto rd2 = assert_that(cache.make_reader(s.schema(), tests::make_permit(), range2)); rd2.produces(muts[4]); rd1.produces(muts[1]); @@ -2687,7 +2687,7 @@ SEASTAR_TEST_CASE(test_random_row_population) { row_cache cache(s.schema(), snapshot_source([&] { return underlying(); }), tracker); auto make_reader = [&] (const query::partition_slice* slice = nullptr) { - auto rd = cache.make_reader(s.schema(), pr, slice ? *slice : s.schema()->full_slice()); + auto rd = cache.make_reader(s.schema(), tests::make_permit(), pr, slice ? *slice : s.schema()->full_slice()); rd.set_max_buffer_size(1); rd.fill_buffer(db::no_timeout).get(); return rd; @@ -2802,7 +2802,7 @@ SEASTAR_TEST_CASE(test_continuity_is_populated_when_read_overlaps_with_older_ver }; auto make_reader = [&] { - auto rd = cache.make_reader(s.schema(), pr); + auto rd = cache.make_reader(s.schema(), tests::make_permit(), pr); rd.set_max_buffer_size(1); rd.fill_buffer(db::no_timeout).get(); return rd; @@ -2837,7 +2837,7 @@ SEASTAR_TEST_CASE(test_continuity_is_populated_when_read_overlaps_with_older_ver populate_range(cache, pr, query::full_clustering_range); check_continuous(cache, pr, query::full_clustering_range); - assert_that(cache.make_reader(s.schema(), pr)) + assert_that(cache.make_reader(s.schema(), tests::make_permit(), pr)) .produces(m1 + m2) .produces_end_of_stream(); } @@ -2856,7 +2856,7 @@ SEASTAR_TEST_CASE(test_continuity_is_populated_when_read_overlaps_with_older_ver populate_range(cache, pr, query::full_clustering_range); check_continuous(cache, pr, query::full_clustering_range); - assert_that(cache.make_reader(s.schema(), pr)) + assert_that(cache.make_reader(s.schema(), tests::make_permit(), pr)) .produces(m1 + m2 + m3) .produces_end_of_stream(); } @@ -2874,7 +2874,7 @@ SEASTAR_TEST_CASE(test_continuity_is_populated_when_read_overlaps_with_older_ver populate_range(cache, pr, query::full_clustering_range); check_continuous(cache, pr, query::full_clustering_range); - assert_that(cache.make_reader(s.schema(), pr)) + assert_that(cache.make_reader(s.schema(), tests::make_permit(), pr)) .produces_compacted(m1 + m2 + m3 + m4, gc_clock::now()) .produces_end_of_stream(); } @@ -2930,7 +2930,7 @@ SEASTAR_TEST_CASE(test_continuity_population_with_multicolumn_clustering_key) { }; auto make_reader = [&] (const query::partition_slice* slice = nullptr) { - auto rd = cache.make_reader(s, pr, slice ? *slice : s->full_slice()); + auto rd = cache.make_reader(s, tests::make_permit(), pr, slice ? *slice : s->full_slice()); rd.set_max_buffer_size(1); rd.fill_buffer(db::no_timeout).get(); return rd; @@ -2957,14 +2957,14 @@ SEASTAR_TEST_CASE(test_continuity_population_with_multicolumn_clustering_key) { .produces_partition_end() .produces_end_of_stream(); - assert_that(cache.make_reader(s, pr)) + assert_that(cache.make_reader(s, tests::make_permit(), pr)) .produces_compacted(m1 + m2, gc_clock::now()) .produces_end_of_stream(); auto slice34 = partition_slice_builder(*s) .with_range(range_3_4) .build(); - assert_that(cache.make_reader(s, pr, slice34)) + assert_that(cache.make_reader(s, tests::make_permit(), pr, slice34)) .produces_compacted(m34, gc_clock::now()) .produces_end_of_stream(); } @@ -3009,7 +3009,7 @@ SEASTAR_TEST_CASE(test_continuity_is_populated_for_single_row_reads) { populate_range(cache, pr, query::clustering_range::make_singular(s.make_ckey(7))); check_continuous(cache, pr, query::clustering_range::make_singular(s.make_ckey(7))); - assert_that(cache.make_reader(s.schema())) + assert_that(cache.make_reader(s.schema(), tests::make_permit())) .produces_compacted(m1, gc_clock::now()) .produces_end_of_stream(); }); @@ -3037,7 +3037,7 @@ SEASTAR_TEST_CASE(test_concurrent_setting_of_continuity_on_read_upper_bound) { row_cache cache(s.schema(), snapshot_source([&] { return underlying(); }), tracker); auto make_rd = [&] (const query::partition_slice* slice = nullptr) { - auto rd = cache.make_reader(s.schema(), pr, slice ? *slice : s.schema()->full_slice()); + auto rd = cache.make_reader(s.schema(), tests::make_permit(), pr, slice ? *slice : s.schema()->full_slice()); rd.set_max_buffer_size(1); rd.fill_buffer(db::no_timeout).get(); return rd; @@ -3071,7 +3071,7 @@ SEASTAR_TEST_CASE(test_concurrent_setting_of_continuity_on_read_upper_bound) { // FIXME: [1, 2] will not be continuous due to concurrent population. // check_continuous(cache, pr, s.make_ckey_range(0, 4)); - assert_that(cache.make_reader(s.schema(), pr)) + assert_that(cache.make_reader(s.schema(), tests::make_permit(), pr)) .produces(m1 + m2) .produces_end_of_stream(); } @@ -3101,7 +3101,7 @@ SEASTAR_TEST_CASE(test_tombstone_merging_of_overlapping_tombstones_in_many_versi row_cache cache(s.schema(), snapshot_source([&] { return underlying(); }), tracker); auto make_reader = [&] { - auto rd = cache.make_reader(s.schema()); + auto rd = cache.make_reader(s.schema(), tests::make_permit()); rd.set_max_buffer_size(1); rd.fill_buffer(db::no_timeout).get(); return rd; @@ -3114,7 +3114,7 @@ SEASTAR_TEST_CASE(test_tombstone_merging_of_overlapping_tombstones_in_many_versi apply(cache, underlying, m2); - assert_that(cache.make_reader(s.schema())) + assert_that(cache.make_reader(s.schema(), tests::make_permit())) .produces(m1 + m2) .produces_end_of_stream(); }); @@ -3139,7 +3139,7 @@ SEASTAR_TEST_CASE(test_concurrent_reads_and_eviction) { auto pr = dht::partition_range::make_singular(m0.decorated_key()); auto make_reader = [&] (const query::partition_slice& slice) { - auto rd = cache.make_reader(s, pr, slice); + auto rd = cache.make_reader(s, tests::make_permit(), pr, slice); rd.set_max_buffer_size(3); rd.fill_buffer(db::no_timeout).get(); return rd; @@ -3222,7 +3222,7 @@ SEASTAR_TEST_CASE(test_concurrent_reads_and_eviction) { done = true; readers.get(); - assert_that(cache.make_reader(s)) + assert_that(cache.make_reader(s, tests::make_permit())) .produces(versions.back()); }); } @@ -3250,7 +3250,7 @@ SEASTAR_TEST_CASE(test_alter_then_preempted_update_then_memtable_read) { auto pr = dht::partition_range::make_singular(m.decorated_key()); // Populate the cache so that update has an entry to update. - assert_that(cache.make_reader(s, pr)).produces(m); + assert_that(cache.make_reader(s, tests::make_permit(), pr)).produces(m); auto mt2 = make_lw_shared(s); mt2->apply(m2); @@ -3274,7 +3274,7 @@ SEASTAR_TEST_CASE(test_alter_then_preempted_update_then_memtable_read) { auto mt2_reader = mt2->make_flat_reader(s, tests::make_permit(), pr, s->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no); - auto cache_reader = cache.make_reader(s, pr, s->full_slice(), default_priority_class(), + auto cache_reader = cache.make_reader(s, tests::make_permit(), pr, s->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no); assert_that(std::move(mt2_reader)).produces(m2); @@ -3283,7 +3283,7 @@ SEASTAR_TEST_CASE(test_alter_then_preempted_update_then_memtable_read) { wait_for_update.cancel(); update_f.get(); - assert_that(cache.make_reader(s)).produces(m + m2); + assert_that(cache.make_reader(s, tests::make_permit())).produces(m + m2); }); } @@ -3322,13 +3322,13 @@ SEASTAR_TEST_CASE(test_cache_update_and_eviction_preserves_monotonicity_of_memta assert_that(std::move(mt_rd1)) .produces(m1); - auto c_rd1 = cache.make_reader(s); + auto c_rd1 = cache.make_reader(s, tests::make_permit()); c_rd1.set_max_buffer_size(1); c_rd1.fill_buffer(db::no_timeout).get(); apply(cache, underlying, m2); - auto c_rd2 = cache.make_reader(s); + auto c_rd2 = cache.make_reader(s, tests::make_permit()); c_rd2.set_max_buffer_size(1); c_rd2.fill_buffer(db::no_timeout).get(); @@ -3354,7 +3354,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) { row_cache cache(s, snapshot_source([&] { return underlying(); }), tracker); { - auto rd = cache.make_reader(s); + auto rd = cache.make_reader(s, tests::make_permit()); rd(db::no_timeout).get0()->as_partition_start(); clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(!row.cells().cell_hash_for(0)); @@ -3363,14 +3363,14 @@ SEASTAR_TEST_CASE(test_hash_is_cached) { { auto slice = s->full_slice(); slice.options.set(); - auto rd = cache.make_reader(s, query::full_partition_range, slice); + auto rd = cache.make_reader(s, tests::make_permit(), query::full_partition_range, slice); rd(db::no_timeout).get0()->as_partition_start(); clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(row.cells().cell_hash_for(0)); } { - auto rd = cache.make_reader(s); + auto rd = cache.make_reader(s, tests::make_permit()); rd(db::no_timeout).get0()->as_partition_start(); clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(row.cells().cell_hash_for(0)); @@ -3381,7 +3381,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) { cache.update([&] { }, *mt).get(); { - auto rd = cache.make_reader(s); + auto rd = cache.make_reader(s, tests::make_permit()); rd(db::no_timeout).get0()->as_partition_start(); clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(!row.cells().cell_hash_for(0)); @@ -3390,14 +3390,14 @@ SEASTAR_TEST_CASE(test_hash_is_cached) { { auto slice = s->full_slice(); slice.options.set(); - auto rd = cache.make_reader(s, query::full_partition_range, slice); + auto rd = cache.make_reader(s, tests::make_permit(), query::full_partition_range, slice); rd(db::no_timeout).get0()->as_partition_start(); clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(row.cells().cell_hash_for(0)); } { - auto rd = cache.make_reader(s); + auto rd = cache.make_reader(s, tests::make_permit()); rd(db::no_timeout).get0()->as_partition_start(); clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row()); BOOST_REQUIRE(row.cells().cell_hash_for(0)); @@ -3423,7 +3423,7 @@ SEASTAR_TEST_CASE(test_random_population_with_many_versions) { row_cache cache(s, snapshot_source([&] { return underlying(); }), tracker); auto make_reader = [&] () { - auto rd = cache.make_reader(s, query::full_partition_range, s->full_slice()); + auto rd = cache.make_reader(s, tests::make_permit(), query::full_partition_range, s->full_slice()); rd.set_max_buffer_size(1); rd.fill_buffer(db::no_timeout).get(); return assert_that(std::move(rd)); @@ -3491,7 +3491,7 @@ SEASTAR_TEST_CASE(test_static_row_is_kept_alive_by_reads_with_no_clustering_rang auto slice = partition_slice_builder(*s) .with_ranges({}) .build(); - assert_that(cache.make_reader(s, dht::partition_range::make_singular(keys[0]), slice)) + assert_that(cache.make_reader(s, tests::make_permit(), dht::partition_range::make_singular(keys[0]), slice)) .produces(m1); } @@ -3530,14 +3530,14 @@ SEASTAR_TEST_CASE(test_eviction_after_old_snapshot_touches_overriden_rows_keeps_ populate_range(cache); auto pr1 = dht::partition_range::make_singular(pk); - auto rd1 = cache.make_reader(s, pr1); + auto rd1 = cache.make_reader(s, tests::make_permit(), pr1); rd1.set_max_buffer_size(1); rd1.fill_buffer(db::no_timeout).get(); apply(cache, underlying, m2); auto pr2 = dht::partition_range::make_singular(pk); - auto rd2 = cache.make_reader(s, pr2); + auto rd2 = cache.make_reader(s, tests::make_permit(), pr2); rd2.set_max_buffer_size(1); auto rd1_a = assert_that(std::move(rd1)); @@ -3571,13 +3571,13 @@ SEASTAR_TEST_CASE(test_eviction_after_old_snapshot_touches_overriden_rows_keeps_ populate_range(cache, pr, query::clustering_range::make_singular(table.make_ckey(1))); populate_range(cache, pr, query::clustering_range::make_singular(table.make_ckey(2))); - auto rd1 = cache.make_reader(s, pr); + auto rd1 = cache.make_reader(s, tests::make_permit(), pr); rd1.set_max_buffer_size(1); rd1.fill_buffer(db::no_timeout).get(); apply(cache, underlying, m2); - auto rd2 = cache.make_reader(s, pr); + auto rd2 = cache.make_reader(s, tests::make_permit(), pr); rd2.set_max_buffer_size(1); auto rd1_a = assert_that(std::move(rd1)); @@ -3615,7 +3615,7 @@ SEASTAR_TEST_CASE(test_reading_progress_with_small_buffer_and_invalidation) { populate_range(cache, pkr, s.make_ckey_range(4, 7)); populate_range(cache, pkr, s.make_ckey_range(3, 7)); - auto rd3 = cache.make_reader(s.schema(), pkr); + auto rd3 = cache.make_reader(s.schema(), tests::make_permit(), pkr); rd3.set_max_buffer_size(1); while (!rd3.is_end_of_stream()) { diff --git a/test/perf/perf_row_cache_update.cc b/test/perf/perf_row_cache_update.cc index 6e53c7c2f4..e4d37fad12 100644 --- a/test/perf/perf_row_cache_update.cc +++ b/test/perf/perf_row_cache_update.cc @@ -153,7 +153,7 @@ void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen) { // Create a reader which tests the case of memtable snapshots // going away after memtable was merged to cache. auto rd = std::make_unique( - make_combined_reader(s, cache.make_reader(s), mt->make_flat_reader(s, tests::make_permit()))); + make_combined_reader(s, cache.make_reader(s, tests::make_permit()), mt->make_flat_reader(s, tests::make_permit()))); rd->set_max_buffer_size(1); rd->fill_buffer(db::no_timeout).get(); diff --git a/test/unit/row_cache_alloc_stress_test.cc b/test/unit/row_cache_alloc_stress_test.cc index bbbc78512c..f362a5e7ea 100644 --- a/test/unit/row_cache_alloc_stress_test.cc +++ b/test/unit/row_cache_alloc_stress_test.cc @@ -30,6 +30,7 @@ #include "log.hh" #include "schema_builder.hh" #include "memtable.hh" +#include "test/lib/reader_permit.hh" static partition_key new_key(schema_ptr s) { @@ -186,7 +187,7 @@ int main(int argc, char** argv) { // Verify that all mutations from memtable went through for (auto&& key : keys) { auto range = dht::partition_range::make_singular(key); - auto reader = cache.make_reader(s, range); + auto reader = cache.make_reader(s, tests::make_permit(), range); auto mo = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0(); assert(mo); assert(mo->partition().live_row_count(*s) == @@ -203,7 +204,7 @@ int main(int argc, char** argv) { for (auto&& key : keys) { auto range = dht::partition_range::make_singular(key); - auto reader = cache.make_reader(s, range); + auto reader = cache.make_reader(s, tests::make_permit(), range); auto mfopt = reader(db::no_timeout).get0(); assert(mfopt); assert(mfopt->is_partition_start()); @@ -241,7 +242,7 @@ int main(int argc, char** argv) { } try { - auto reader = cache.make_reader(s, range); + auto reader = cache.make_reader(s, tests::make_permit(), range); assert(!reader(db::no_timeout).get0()); auto evicted_from_cache = logalloc::segment_size + large_cell_size; // GCC's -fallocation-dce can remove dead calls to new and malloc, so diff --git a/test/unit/row_cache_stress_test.cc b/test/unit/row_cache_stress_test.cc index c056be02b0..81d0a90299 100644 --- a/test/unit/row_cache_stress_test.cc +++ b/test/unit/row_cache_stress_test.cc @@ -153,7 +153,7 @@ struct table { } rd.push_back(mt->make_flat_reader(s.schema(), tests::make_permit(), r->pr, r->slice, default_priority_class(), nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no)); - rd.push_back(cache.make_reader(s.schema(), r->pr, r->slice, default_priority_class(), nullptr, + rd.push_back(cache.make_reader(s.schema(), tests::make_permit(), r->pr, r->slice, default_priority_class(), nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no)); r->rd = make_combined_reader(s.schema(), std::move(rd), streamed_mutation::forwarding::no, mutation_reader::forwarding::no); return r;