diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc index fd3f4f2209..7efc5dd321 100644 --- a/tests/sstable_datafile_test.cc +++ b/tests/sstable_datafile_test.cc @@ -5270,3 +5270,136 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) { } }); } + +/* Make sure data is not ressurrected. + sstable 1 with key A and key B and key C + sstable 2 with expired (GC'able) tombstone for key A + + use max_sstable_size = 1; + + so key A and expired tombstone for key A are compacted away. + key B is written into a new sstable, and sstable 2 is removed. + + Need to stop compaction at this point!!! + + Result: sstable 1 is alive in the table, whereas sstable 2 is gone. + + if key A can be read from table, data was ressurrected. + */ +SEASTAR_TEST_CASE(incremental_compaction_data_resurrection_test) { + return test_env::do_with_async([] (test_env& env) { + storage_service_for_tests ssft; + cell_locker_stats cl_stats; + + // In a column family with gc_grace_seconds set to 0, check that a tombstone + // is purged after compaction. + auto builder = schema_builder("tests", "incremental_compaction_data_resurrection_test") + .with_column("id", utf8_type, column_kind::partition_key) + .with_column("value", int32_type); + builder.set_gc_grace_seconds(0); + auto s = builder.build(); + + auto tmp = tmpdir(); + auto sst_gen = [&env, s, &tmp, gen = make_lw_shared(1)] () mutable { + return env.make_sstable(s, tmp.path().string(), (*gen)++, la, big); + }; + + auto next_timestamp = [] { + static thread_local api::timestamp_type next = 1; + return next++; + }; + + auto make_insert = [&] (partition_key key) { + mutation m(s, key); + m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), next_timestamp()); + return m; + }; + + auto make_delete = [&] (partition_key key) { + mutation m(s, key); + tombstone tomb(next_timestamp(), gc_clock::now()); + m.partition().apply(tomb); + return m; + }; + + auto tokens = token_generation_for_current_shard(3); + auto alpha = partition_key::from_exploded(*s, {to_bytes(tokens[0].first)}); + auto beta = partition_key::from_exploded(*s, {to_bytes(tokens[1].first)}); + auto gamma = partition_key::from_exploded(*s, {to_bytes(tokens[2].first)}); + + auto ttl = 5; + + auto mut1 = make_insert(alpha); + auto mut2 = make_insert(beta); + auto mut3 = make_insert(gamma); + auto mut1_deletion = make_delete(alpha); + + auto non_expired_sst = make_sstable_containing(sst_gen, {mut1, mut2, mut3}); + auto expired_sst = make_sstable_containing(sst_gen, {mut1_deletion}); + // make ssts belong to same run for compaction to enable incremental approach + utils::UUID run_id = utils::make_random_uuid(); + sstables::test(non_expired_sst).set_run_identifier(run_id); + sstables::test(expired_sst).set_run_identifier(run_id); + + std::vector sstables = { + non_expired_sst, + expired_sst, + }; + + // make mut1_deletion gc'able. + forward_jump_clocks(std::chrono::seconds(ttl)); + + auto cm = make_lw_shared(); + column_family::config cfg = column_family_test_config(); + cfg.datadir = tmp.path().string(); + cfg.enable_disk_writes = false; + cfg.enable_commitlog = false; + cfg.enable_cache = true; + cfg.enable_incremental_backups = false; + reader_concurrency_semaphore sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{}); + cfg.read_concurrency_semaphore = &sem; + auto tracker = make_lw_shared(); + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), *cm, cl_stats, *tracker); + cf->mark_ready_for_writes(); + cf->start(); + cf->set_compaction_strategy(sstables::compaction_strategy_type::null); + + auto is_partition_dead = [&s, &cf] (partition_key& pkey) { + column_family::const_mutation_partition_ptr mp = cf->find_partition_slow(s, pkey).get0(); + return mp && bool(mp->partition_tombstone()); + }; + + cf->add_sstable_and_update_cache(non_expired_sst).get(); + BOOST_REQUIRE(!is_partition_dead(alpha)); + cf->add_sstable_and_update_cache(expired_sst).get(); + BOOST_REQUIRE(is_partition_dead(alpha)); + + auto replacer = [&] (std::vector old_sstables, std::vector new_sstables) { + + // expired_sst is exhausted, and new sstable is written with mut 2. + BOOST_REQUIRE(old_sstables.size() == 1); + BOOST_REQUIRE(old_sstables.front() == expired_sst); + BOOST_REQUIRE(new_sstables.size() == 1); + assert_that(sstable_reader(new_sstables.front(), s)) + .produces(mut2) + .produces_end_of_stream(); + column_family_test(cf).rebuild_sstable_list(new_sstables, old_sstables); + // force compaction failure after sstable containing expired tombstone is removed from set. + throw std::runtime_error("forcing compaction failure on early replacement"); + }; + + bool swallowed = false; + try { + // The goal is to have one sstable generated for each mutation to trigger the issue. + auto max_sstable_size = 0; + auto result = sstables::compact_sstables(sstables::compaction_descriptor(sstables, 0, max_sstable_size), *cf, sst_gen, replacer).get0().new_sstables; + BOOST_REQUIRE_EQUAL(2, result.size()); + } catch (...) { + // swallow exception + swallowed = true; + } + BOOST_REQUIRE(swallowed); + // check there's no data resurrection + BOOST_REQUIRE(is_partition_dead(alpha)); + }); +} diff --git a/tests/sstable_test.hh b/tests/sstable_test.hh index 59dcf40f16..a759187e19 100644 --- a/tests/sstable_test.hh +++ b/tests/sstable_test.hh @@ -158,6 +158,10 @@ public: _sst->_data_file_write_time = wtime; } + void set_run_identifier(utils::UUID identifier) { + _sst->_run_identifier = identifier; + } + future<> store() { _sst->_recognized_components.erase(component_type::Index); _sst->_recognized_components.erase(component_type::Data);