test: row_cache_test: close readers

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2021-02-03 16:35:56 +02:00
parent f9ae50483f
commit db66a39b3e

View File

@@ -869,6 +869,7 @@ SEASTAR_TEST_CASE(test_eviction) {
for (auto&& key : keys) {
auto pr = dht::partition_range::make_singular(key);
auto rd = cache.make_reader(s, tests::make_permit(), pr);
auto close_rd = deferred_close(rd);
rd.set_max_buffer_size(1);
rd.fill_buffer(db::no_timeout).get();
}
@@ -906,7 +907,7 @@ SEASTAR_TEST_CASE(test_eviction_from_invalidated) {
std::shuffle(keys.begin(), keys.end(), random);
for (auto&& key : keys) {
cache.make_reader(s, tests::make_permit(), dht::partition_range::make_singular(key));
cache.make_reader(s, tests::make_permit(), dht::partition_range::make_singular(key)).close().get();
}
cache.invalidate(row_cache::external_updater([] {})).get();
@@ -950,6 +951,7 @@ SEASTAR_TEST_CASE(test_eviction_after_schema_change) {
{
auto pr = dht::partition_range::make_singular(m.decorated_key());
auto rd = cache.make_reader(s2, tests::make_permit(), pr);
auto close_rd = deferred_close(rd);
rd.set_max_buffer_size(1);
rd.fill_buffer(db::no_timeout).get();
}
@@ -966,6 +968,7 @@ SEASTAR_TEST_CASE(test_eviction_after_schema_change) {
void test_sliced_read_row_presence(flat_mutation_reader reader, schema_ptr s, std::deque<int> expected)
{
auto close_reader = deferred_close(reader);
clustering_key::equality ck_eq(*s);
auto mfopt = reader(db::no_timeout).get0();
@@ -1588,7 +1591,11 @@ SEASTAR_TEST_CASE(test_mvcc) {
auto m12 = m1 + m2;
flat_mutation_reader_opt mt1_reader_opt;
auto close_mt1_reader = defer([&mt1_reader_opt] { mt1_reader_opt->close().get(); });
auto close_mt1_reader = defer([&mt1_reader_opt] {
if (mt1_reader_opt) {
mt1_reader_opt->close().get();
}
});
if (with_active_memtable_reader) {
mt1_reader_opt = mt1->make_flat_reader(s, tests::make_permit());
mt1_reader_opt->set_max_buffer_size(1);
@@ -2041,6 +2048,7 @@ static void populate_range(row_cache& cache,
{
auto slice = partition_slice_builder(*cache.schema()).with_range(r).build();
auto rd = cache.make_reader(cache.schema(), tests::make_permit(), pr, slice);
auto close_rd = deferred_close(rd);
consume_all(rd);
}
@@ -2319,6 +2327,11 @@ SEASTAR_TEST_CASE(test_exception_safety_of_update_from_memtable) {
populate_range(cache, population_range);
auto rd1_v1 = assert_that(make_reader(population_range));
flat_mutation_reader_opt snap;
auto close_snap = defer([&snap] {
if (snap) {
snap->close().get();
}
});
auto d = defer([&] {
memory::scoped_critical_alloc_section dfg;
@@ -2699,6 +2712,17 @@ SEASTAR_TEST_CASE(test_random_row_population) {
std::unique_ptr<query::partition_slice> slice;
flat_mutation_reader reader;
mutation result;
read() = delete;
read(std::unique_ptr<query::partition_slice> slice_, flat_mutation_reader reader_, mutation result_) noexcept
: slice(std::move(slice_))
, reader(std::move(reader_))
, result(std::move(result_))
{ }
read(read&& o) = default;
~read() {
reader.close().get();
}
};
std::vector<read> readers;
@@ -2709,18 +2733,18 @@ SEASTAR_TEST_CASE(test_random_row_population) {
}
while (!readers.empty()) {
auto i = readers.begin();
while (i != readers.end()) {
std::vector<read> remaining_readers;
for (auto i = readers.begin(); i != readers.end(); i++) {
auto mfo = i->reader(db::no_timeout).get0();
if (!mfo) {
auto&& ranges = i->slice->row_ranges(*s.schema(), pk.key());
assert_that(i->result).is_equal_to(m1, ranges);
i = readers.erase(i);
} else {
i->result.apply(*mfo);
++i;
remaining_readers.emplace_back(std::move(*i));
}
}
readers = std::move(remaining_readers);
}
check_continuous(cache, pr, query::clustering_range::make({s.make_ckey(0)}, {s.make_ckey(9)}));
@@ -2798,6 +2822,7 @@ SEASTAR_TEST_CASE(test_continuity_is_populated_when_read_overlaps_with_older_ver
{
auto rd1 = make_reader(); // to keep the old version around
auto close_rd1 = deferred_close(rd1);
populate_range(cache, pr, query::clustering_range::make({s.make_ckey(2)}, {s.make_ckey(4)}));
@@ -2838,6 +2863,7 @@ SEASTAR_TEST_CASE(test_continuity_is_populated_when_read_overlaps_with_older_ver
populate_range(cache, pr, s.make_ckey_range(8, 8));
auto rd1 = make_reader(); // to keep the old version around
auto close_rd1 = deferred_close(rd1);
apply(m3);
@@ -2856,6 +2882,7 @@ SEASTAR_TEST_CASE(test_continuity_is_populated_when_read_overlaps_with_older_ver
populate_range(cache, pr, query::clustering_range::make_singular(s.make_ckey(7)));
auto rd1 = make_reader(); // to keep the old version around
auto close_rd1 = deferred_close(rd1);
apply(m4);
@@ -2933,6 +2960,7 @@ SEASTAR_TEST_CASE(test_continuity_population_with_multicolumn_clustering_key) {
.with_range(query::clustering_range::make_singular(ck2))
.build();
auto rd1 = make_reader(&slice1);
auto close_rd1 = deferred_close(rd1);
apply(m2);
@@ -3033,6 +3061,7 @@ SEASTAR_TEST_CASE(test_concurrent_setting_of_continuity_on_read_upper_bound) {
{
auto rd1 = make_rd(); // to keep the old version around
auto close_rd1 = deferred_close(rd1);
populate_range(cache, pr, s.make_ckey_range(0, 0));
populate_range(cache, pr, s.make_ckey_range(3, 3));
@@ -3099,6 +3128,7 @@ SEASTAR_TEST_CASE(test_tombstone_merging_of_overlapping_tombstones_in_many_versi
populate_range(cache, pr, s.make_ckey_range(0, 3));
auto rd1 = make_reader();
auto close_rd1 = deferred_close(rd1);
apply(cache, underlying, m2);
@@ -3344,6 +3374,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
{
auto rd = cache.make_reader(s, tests::make_permit());
auto close_rd = deferred_close(rd);
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
@@ -3353,6 +3384,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
auto slice = s->full_slice();
slice.options.set<query::partition_slice::option::with_digest>();
auto rd = cache.make_reader(s, tests::make_permit(), query::full_partition_range, slice);
auto close_rd = deferred_close(rd);
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(row.cells().cell_hash_for(0));
@@ -3360,6 +3392,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
{
auto rd = cache.make_reader(s, tests::make_permit());
auto close_rd = deferred_close(rd);
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(row.cells().cell_hash_for(0));
@@ -3371,6 +3404,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
{
auto rd = cache.make_reader(s, tests::make_permit());
auto close_rd = deferred_close(rd);
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
@@ -3380,6 +3414,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
auto slice = s->full_slice();
slice.options.set<query::partition_slice::option::with_digest>();
auto rd = cache.make_reader(s, tests::make_permit(), query::full_partition_range, slice);
auto close_rd = deferred_close(rd);
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(row.cells().cell_hash_for(0));
@@ -3387,6 +3422,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
{
auto rd = cache.make_reader(s, tests::make_permit());
auto close_rd = deferred_close(rd);
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(row.cells().cell_hash_for(0));
@@ -3605,6 +3641,7 @@ SEASTAR_TEST_CASE(test_reading_progress_with_small_buffer_and_invalidation) {
populate_range(cache, pkr, s.make_ckey_range(3, 7));
auto rd3 = cache.make_reader(s.schema(), tests::make_permit(), pkr);
auto close_rd3 = deferred_close(rd3);
rd3.set_max_buffer_size(1);
while (!rd3.is_end_of_stream()) {