test: sstable_compaction_test: compact: complete conversion to async thread

We already use test_env::do_with_async in this function
but we didn't take full advantage of it to simplify the
implementation.

Do that before further changes are made.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2023-03-11 11:08:21 +02:00
parent 237c844901
commit d503eb75f1

View File

@@ -211,11 +211,11 @@ SEASTAR_TEST_CASE(compact) {
table_for_tests cf(env.manager(), s);
auto close_cf = deferred_stop(cf);
open_sstables(env, s, "test/resource/sstables/compaction", {1,2,3}).then([&env, s, cf, generation] (auto sstables) mutable {
auto sstables = open_sstables(env, s, "test/resource/sstables/compaction", {1,2,3}).get();
auto new_sstable = [&env, gen = make_lw_shared<unsigned>(generation), s] {
return env.make_sstable(s, (*gen)++);
};
return compact_sstables(sstables::compaction_descriptor(std::move(sstables), default_priority_class()), cf, new_sstable).then([&env, s, cf] (auto) {
compact_sstables(sstables::compaction_descriptor(std::move(sstables), default_priority_class()), cf, new_sstable).get();
// Verify that the compacted sstable has the right content. We expect to see:
// name | age | height
// -------+-----+--------
@@ -223,9 +223,13 @@ SEASTAR_TEST_CASE(compact) {
// tom | 20 | 180
// john | 20 | deleted
// nadav - deleted partition
return env.reusable_sst(s, generation).then([&env, s] (shared_sstable sst) {
auto reader = make_lw_shared<flat_mutation_reader_v2>(sstable_reader(sst, s, env.make_reader_permit())); // reader holds sst and s alive.
return read_mutation_from_flat_mutation_reader(*reader).then([reader, s] (mutation_opt m) {
auto sst = env.reusable_sst(s, generation).get();
auto reader = sstable_reader(sst, s, env.make_reader_permit());
auto close_reader = deferred_close(reader);
auto verify_mutation = [&] (std::function<void(mutation_opt)> verify) {
std::invoke(verify, read_mutation_from_flat_mutation_reader(reader).get());
};
verify_mutation([&] (mutation_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("jerry")))));
BOOST_REQUIRE(!m->partition().partition_tombstone());
@@ -238,8 +242,8 @@ SEASTAR_TEST_CASE(compact) {
auto& cdef2 = *s->get_column_definition("height");
BOOST_REQUIRE(cells.cell_at(cdef1.id).as_atomic_cell(cdef1).value() == managed_bytes({0,0,0,40}));
BOOST_REQUIRE(cells.cell_at(cdef2.id).as_atomic_cell(cdef2).value() == managed_bytes({0,0,0,(int8_t)170}));
return read_mutation_from_flat_mutation_reader(*reader);
}).then([reader, s] (mutation_opt m) {
});
verify_mutation([&] (mutation_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("tom")))));
BOOST_REQUIRE(!m->partition().partition_tombstone());
@@ -252,8 +256,8 @@ SEASTAR_TEST_CASE(compact) {
auto& cdef2 = *s->get_column_definition("height");
BOOST_REQUIRE(cells.cell_at(cdef1.id).as_atomic_cell(cdef1).value() == managed_bytes({0,0,0,20}));
BOOST_REQUIRE(cells.cell_at(cdef2.id).as_atomic_cell(cdef2).value() == managed_bytes({0,0,0,(int8_t)180}));
return read_mutation_from_flat_mutation_reader(*reader);
}).then([reader, s] (mutation_opt m) {
});
verify_mutation([&] (mutation_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("john")))));
BOOST_REQUIRE(!m->partition().partition_tombstone());
@@ -266,22 +270,17 @@ SEASTAR_TEST_CASE(compact) {
auto& cdef2 = *s->get_column_definition("height");
BOOST_REQUIRE(cells.cell_at(cdef1.id).as_atomic_cell(cdef1).value() == managed_bytes({0,0,0,20}));
BOOST_REQUIRE(cells.find_cell(cdef2.id) == nullptr);
return read_mutation_from_flat_mutation_reader(*reader);
}).then([reader, s] (mutation_opt m) {
});
verify_mutation([&] (mutation_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("nadav")))));
BOOST_REQUIRE(m->partition().partition_tombstone());
auto rows = m->partition().clustered_rows();
BOOST_REQUIRE(rows.calculate_size() == 0);
return read_mutation_from_flat_mutation_reader(*reader);
}).then([reader] (mutation_opt m) {
BOOST_REQUIRE(!m);
}).finally([reader] {
return reader->close();
});
});
});
}).get();
verify_mutation([&] (mutation_opt m) {
BOOST_REQUIRE(!m);
});
});
// verify that the compacted sstable look like