mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-31 12:06:44 +00:00
tests: Check that partition is not resurrected on compaction failure
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -5270,3 +5270,136 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/* Make sure data is not ressurrected.
|
||||
sstable 1 with key A and key B and key C
|
||||
sstable 2 with expired (GC'able) tombstone for key A
|
||||
|
||||
use max_sstable_size = 1;
|
||||
|
||||
so key A and expired tombstone for key A are compacted away.
|
||||
key B is written into a new sstable, and sstable 2 is removed.
|
||||
|
||||
Need to stop compaction at this point!!!
|
||||
|
||||
Result: sstable 1 is alive in the table, whereas sstable 2 is gone.
|
||||
|
||||
if key A can be read from table, data was ressurrected.
|
||||
*/
|
||||
SEASTAR_TEST_CASE(incremental_compaction_data_resurrection_test) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
storage_service_for_tests ssft;
|
||||
cell_locker_stats cl_stats;
|
||||
|
||||
// In a column family with gc_grace_seconds set to 0, check that a tombstone
|
||||
// is purged after compaction.
|
||||
auto builder = schema_builder("tests", "incremental_compaction_data_resurrection_test")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("value", int32_type);
|
||||
builder.set_gc_grace_seconds(0);
|
||||
auto s = builder.build();
|
||||
|
||||
auto tmp = tmpdir();
|
||||
auto sst_gen = [&env, s, &tmp, gen = make_lw_shared<unsigned>(1)] () mutable {
|
||||
return env.make_sstable(s, tmp.path().string(), (*gen)++, la, big);
|
||||
};
|
||||
|
||||
auto next_timestamp = [] {
|
||||
static thread_local api::timestamp_type next = 1;
|
||||
return next++;
|
||||
};
|
||||
|
||||
auto make_insert = [&] (partition_key key) {
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), next_timestamp());
|
||||
return m;
|
||||
};
|
||||
|
||||
auto make_delete = [&] (partition_key key) {
|
||||
mutation m(s, key);
|
||||
tombstone tomb(next_timestamp(), gc_clock::now());
|
||||
m.partition().apply(tomb);
|
||||
return m;
|
||||
};
|
||||
|
||||
auto tokens = token_generation_for_current_shard(3);
|
||||
auto alpha = partition_key::from_exploded(*s, {to_bytes(tokens[0].first)});
|
||||
auto beta = partition_key::from_exploded(*s, {to_bytes(tokens[1].first)});
|
||||
auto gamma = partition_key::from_exploded(*s, {to_bytes(tokens[2].first)});
|
||||
|
||||
auto ttl = 5;
|
||||
|
||||
auto mut1 = make_insert(alpha);
|
||||
auto mut2 = make_insert(beta);
|
||||
auto mut3 = make_insert(gamma);
|
||||
auto mut1_deletion = make_delete(alpha);
|
||||
|
||||
auto non_expired_sst = make_sstable_containing(sst_gen, {mut1, mut2, mut3});
|
||||
auto expired_sst = make_sstable_containing(sst_gen, {mut1_deletion});
|
||||
// make ssts belong to same run for compaction to enable incremental approach
|
||||
utils::UUID run_id = utils::make_random_uuid();
|
||||
sstables::test(non_expired_sst).set_run_identifier(run_id);
|
||||
sstables::test(expired_sst).set_run_identifier(run_id);
|
||||
|
||||
std::vector<shared_sstable> sstables = {
|
||||
non_expired_sst,
|
||||
expired_sst,
|
||||
};
|
||||
|
||||
// make mut1_deletion gc'able.
|
||||
forward_jump_clocks(std::chrono::seconds(ttl));
|
||||
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
column_family::config cfg = column_family_test_config();
|
||||
cfg.datadir = tmp.path().string();
|
||||
cfg.enable_disk_writes = false;
|
||||
cfg.enable_commitlog = false;
|
||||
cfg.enable_cache = true;
|
||||
cfg.enable_incremental_backups = false;
|
||||
reader_concurrency_semaphore sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{});
|
||||
cfg.read_concurrency_semaphore = &sem;
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm, cl_stats, *tracker);
|
||||
cf->mark_ready_for_writes();
|
||||
cf->start();
|
||||
cf->set_compaction_strategy(sstables::compaction_strategy_type::null);
|
||||
|
||||
auto is_partition_dead = [&s, &cf] (partition_key& pkey) {
|
||||
column_family::const_mutation_partition_ptr mp = cf->find_partition_slow(s, pkey).get0();
|
||||
return mp && bool(mp->partition_tombstone());
|
||||
};
|
||||
|
||||
cf->add_sstable_and_update_cache(non_expired_sst).get();
|
||||
BOOST_REQUIRE(!is_partition_dead(alpha));
|
||||
cf->add_sstable_and_update_cache(expired_sst).get();
|
||||
BOOST_REQUIRE(is_partition_dead(alpha));
|
||||
|
||||
auto replacer = [&] (std::vector<sstables::shared_sstable> old_sstables, std::vector<sstables::shared_sstable> new_sstables) {
|
||||
|
||||
// expired_sst is exhausted, and new sstable is written with mut 2.
|
||||
BOOST_REQUIRE(old_sstables.size() == 1);
|
||||
BOOST_REQUIRE(old_sstables.front() == expired_sst);
|
||||
BOOST_REQUIRE(new_sstables.size() == 1);
|
||||
assert_that(sstable_reader(new_sstables.front(), s))
|
||||
.produces(mut2)
|
||||
.produces_end_of_stream();
|
||||
column_family_test(cf).rebuild_sstable_list(new_sstables, old_sstables);
|
||||
// force compaction failure after sstable containing expired tombstone is removed from set.
|
||||
throw std::runtime_error("forcing compaction failure on early replacement");
|
||||
};
|
||||
|
||||
bool swallowed = false;
|
||||
try {
|
||||
// The goal is to have one sstable generated for each mutation to trigger the issue.
|
||||
auto max_sstable_size = 0;
|
||||
auto result = sstables::compact_sstables(sstables::compaction_descriptor(sstables, 0, max_sstable_size), *cf, sst_gen, replacer).get0().new_sstables;
|
||||
BOOST_REQUIRE_EQUAL(2, result.size());
|
||||
} catch (...) {
|
||||
// swallow exception
|
||||
swallowed = true;
|
||||
}
|
||||
BOOST_REQUIRE(swallowed);
|
||||
// check there's no data resurrection
|
||||
BOOST_REQUIRE(is_partition_dead(alpha));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -158,6 +158,10 @@ public:
|
||||
_sst->_data_file_write_time = wtime;
|
||||
}
|
||||
|
||||
void set_run_identifier(utils::UUID identifier) {
|
||||
_sst->_run_identifier = identifier;
|
||||
}
|
||||
|
||||
future<> store() {
|
||||
_sst->_recognized_components.erase(component_type::Index);
|
||||
_sst->_recognized_components.erase(component_type::Data);
|
||||
|
||||
Reference in New Issue
Block a user