mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-28 20:27:03 +00:00
tests: Add test exploiting flush while scanning issue
This commit is contained in:
@@ -20,12 +20,15 @@
|
||||
#include "query-result-set.hh"
|
||||
#include "query-result-reader.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "tmpdir.hh"
|
||||
|
||||
#include "tests/test-utils.hh"
|
||||
#include "tests/mutation_assertions.hh"
|
||||
#include "tests/mutation_reader_assertions.hh"
|
||||
#include "tests/result_set_assertions.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
static sstring some_keyspace("ks");
|
||||
static sstring some_column_family("cf");
|
||||
|
||||
@@ -286,6 +289,84 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("pk", bytes_type, column_kind::partition_key)
|
||||
.with_column("v", bytes_type)
|
||||
.build();
|
||||
|
||||
auto dir = make_lw_shared<tmpdir>();
|
||||
|
||||
column_family::config cfg;
|
||||
cfg.datadir = { dir->path };
|
||||
cfg.enable_disk_reads = true;
|
||||
cfg.enable_disk_writes = true;
|
||||
cfg.enable_cache = true;
|
||||
|
||||
return with_column_family(s, cfg, [s](column_family& cf) {
|
||||
return seastar::async([s, &cf] {
|
||||
// populate
|
||||
auto new_key = [&] {
|
||||
static thread_local int next = 0;
|
||||
return dht::global_partitioner().decorate_key(*s,
|
||||
partition_key::from_single_value(*s, to_bytes(sprint("key%d", next++))));
|
||||
};
|
||||
auto make_mutation = [&] {
|
||||
mutation m(new_key(), s);
|
||||
m.set_clustered_cell(clustering_key::make_empty(*s), "v", to_bytes("value"), 1);
|
||||
return m;
|
||||
};
|
||||
|
||||
std::vector<mutation> mutations;
|
||||
for (int i = 0; i < 1000; ++i) {
|
||||
auto m = make_mutation();
|
||||
cf.apply(m);
|
||||
mutations.emplace_back(std::move(m));
|
||||
}
|
||||
|
||||
std::sort(mutations.begin(), mutations.end(), mutation_decorated_key_less_comparator());
|
||||
|
||||
// Flush will happen in the middle of reading for this scanner
|
||||
auto assert_that_scanner1 = assert_that(cf.make_reader(query::full_partition_range));
|
||||
|
||||
// Flush will happen before it is invoked
|
||||
auto assert_that_scanner2 = assert_that(cf.make_reader(query::full_partition_range));
|
||||
|
||||
// Flush will happen after all data was read, but before EOS was consumed
|
||||
auto assert_that_scanner3 = assert_that(cf.make_reader(query::full_partition_range));
|
||||
|
||||
assert_that_scanner1.produces(mutations[0]);
|
||||
assert_that_scanner1.produces(mutations[1]);
|
||||
|
||||
for (unsigned i = 0; i < mutations.size(); ++i) {
|
||||
assert_that_scanner3.produces(mutations[i]);
|
||||
}
|
||||
|
||||
memtable& m = cf.active_memtable(); // held by scanners
|
||||
|
||||
auto flushed = cf.flush();
|
||||
|
||||
while (!m.is_flushed()) {
|
||||
sleep(10ms).get();
|
||||
}
|
||||
|
||||
for (unsigned i = 2; i < mutations.size(); ++i) {
|
||||
assert_that_scanner1.produces(mutations[i]);
|
||||
}
|
||||
assert_that_scanner1.produces_end_of_stream();
|
||||
|
||||
for (unsigned i = 0; i < mutations.size(); ++i) {
|
||||
assert_that_scanner2.produces(mutations[i]);
|
||||
}
|
||||
assert_that_scanner2.produces_end_of_stream();
|
||||
|
||||
assert_that_scanner3.produces_end_of_stream();
|
||||
|
||||
flushed.get();
|
||||
});
|
||||
}).then([dir] {});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
|
||||
auto s = make_lw_shared(schema({}, some_keyspace, some_column_family,
|
||||
{{"p1", int32_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type));
|
||||
|
||||
Reference in New Issue
Block a user