From b2c2eb6cd20329402cf28c8b864abbddb2f5e504 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 9 Sep 2015 09:15:45 +0200 Subject: [PATCH] tests: Add test exploiting flush while scanning issue --- tests/mutation_test.cc | 81 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/tests/mutation_test.cc b/tests/mutation_test.cc index 1cd6f7a188..ef666ad090 100644 --- a/tests/mutation_test.cc +++ b/tests/mutation_test.cc @@ -20,12 +20,15 @@ #include "query-result-set.hh" #include "query-result-reader.hh" #include "partition_slice_builder.hh" +#include "tmpdir.hh" #include "tests/test-utils.hh" #include "tests/mutation_assertions.hh" #include "tests/mutation_reader_assertions.hh" #include "tests/result_set_assertions.hh" +using namespace std::chrono_literals; + static sstring some_keyspace("ks"); static sstring some_column_family("cf"); @@ -286,6 +289,84 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) { }); } +SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) { + auto s = schema_builder("ks", "cf") + .with_column("pk", bytes_type, column_kind::partition_key) + .with_column("v", bytes_type) + .build(); + + auto dir = make_lw_shared(); + + column_family::config cfg; + cfg.datadir = { dir->path }; + cfg.enable_disk_reads = true; + cfg.enable_disk_writes = true; + cfg.enable_cache = true; + + return with_column_family(s, cfg, [s](column_family& cf) { + return seastar::async([s, &cf] { + // populate + auto new_key = [&] { + static thread_local int next = 0; + return dht::global_partitioner().decorate_key(*s, + partition_key::from_single_value(*s, to_bytes(sprint("key%d", next++)))); + }; + auto make_mutation = [&] { + mutation m(new_key(), s); + m.set_clustered_cell(clustering_key::make_empty(*s), "v", to_bytes("value"), 1); + return m; + }; + + std::vector mutations; + for (int i = 0; i < 1000; ++i) { + auto m = make_mutation(); + cf.apply(m); + mutations.emplace_back(std::move(m)); + } + + std::sort(mutations.begin(), mutations.end(), mutation_decorated_key_less_comparator()); + + // Flush will happen in the middle of reading for this scanner + auto assert_that_scanner1 = assert_that(cf.make_reader(query::full_partition_range)); + + // Flush will happen before it is invoked + auto assert_that_scanner2 = assert_that(cf.make_reader(query::full_partition_range)); + + // Flush will happen after all data was read, but before EOS was consumed + auto assert_that_scanner3 = assert_that(cf.make_reader(query::full_partition_range)); + + assert_that_scanner1.produces(mutations[0]); + assert_that_scanner1.produces(mutations[1]); + + for (unsigned i = 0; i < mutations.size(); ++i) { + assert_that_scanner3.produces(mutations[i]); + } + + memtable& m = cf.active_memtable(); // held by scanners + + auto flushed = cf.flush(); + + while (!m.is_flushed()) { + sleep(10ms).get(); + } + + for (unsigned i = 2; i < mutations.size(); ++i) { + assert_that_scanner1.produces(mutations[i]); + } + assert_that_scanner1.produces_end_of_stream(); + + for (unsigned i = 0; i < mutations.size(); ++i) { + assert_that_scanner2.produces(mutations[i]); + } + assert_that_scanner2.produces_end_of_stream(); + + assert_that_scanner3.produces_end_of_stream(); + + flushed.get(); + }); + }).then([dir] {}); +} + SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) { auto s = make_lw_shared(schema({}, some_keyspace, some_column_family, {{"p1", int32_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type));