diff --git a/configure.py b/configure.py index 2804312709..9dce6a4e83 100755 --- a/configure.py +++ b/configure.py @@ -503,6 +503,7 @@ scylla_tests = set([ 'test/boost/crc_test', 'test/boost/data_listeners_test', 'test/boost/database_test', + 'test/boost/commitlog_cleanup_test', 'test/boost/dirty_memory_manager_test', 'test/boost/duration_test', 'test/boost/dynamic_bitset_test', diff --git a/test/boost/commitlog_cleanup_test.cc b/test/boost/commitlog_cleanup_test.cc new file mode 100644 index 0000000000..1abf15b74c --- /dev/null +++ b/test/boost/commitlog_cleanup_test.cc @@ -0,0 +1,218 @@ +/* + * Copyright (C) 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#include "test/lib/scylla_test_case.hh" +#include "test/lib/cql_test_env.hh" +#include "db/commitlog/commitlog_replayer.hh" +#include "db/commitlog/commitlog.hh" + +// Test that `canonical_token_range(tr)` contains the same tokens as `tr`. +SEASTAR_TEST_CASE(test_canonical_token_range) { + const int64_t arbitrary_token = -42; + // Define some "interesting tokens": + // an arbitrary integer, integers which are offset by 1 from the arbitrary one, + // minimum and maximum finite tokens, infinite tokens. + // + // The idea is that combinations of these should be enough to cover all logic. + std::vector interesting_tokens = { + dht::minimum_token(), + dht::first_token(), + dht::token::from_int64(arbitrary_token - 1), + dht::token::from_int64(arbitrary_token), + dht::token::from_int64(arbitrary_token + 1), + dht::token::from_int64(std::numeric_limits::max()), + dht::maximum_token(), + }; + // Define interesting bounds: + // inclusive/exclusive bound for each interesting token, + // and empty (infinite) bound. 
+ std::vector> interesting_bounds = { + std::nullopt, + }; + for (const auto& t : interesting_tokens) { + interesting_bounds.push_back(dht::token_range::bound{t, false}); + interesting_bounds.push_back(dht::token_range::bound{t, true}); + } + // For every interesting range `tr` (every valid pair of interesting bounds), + // check that `tr` is semantically equal to `canonical_token_range(tr)`, + // by testing that it contains the same subset of finite interesting tokens. + for (const auto& a : interesting_bounds) { + for (const auto& b : interesting_bounds) { + auto wrapping_orig = wrapping_interval(a, b); + if (wrapping_orig.is_wrap_around(dht::token_comparator())) { + continue; + } + auto orig = dht::token_range(wrapping_orig); + auto canon = db::system_keyspace::canonical_token_range(orig); + for (auto t : { + std::numeric_limits::min() + 1, + arbitrary_token - 1, + arbitrary_token, + arbitrary_token + 1, + std::numeric_limits::max(), + }) { + bool in_tr = orig.contains(dht::token::from_int64(t), dht::token_comparator()); + bool in_canon = t > canon.first && t <= canon.second; + BOOST_REQUIRE_EQUAL(in_tr, in_canon); + } + } + } + return make_ready_future<>(); +} + +// Test basic functionality of system_keyspace::save_commitlog_cleanup_record() +// and system_keyspace::get_commitlog_cleanup_records(). +SEASTAR_TEST_CASE(test_commitlog_cleanup_records) { + return do_with_cql_env_thread([](cql_test_env& e) { + auto tableid = table_id(utils::UUID_gen::get_time_UUID()); + + auto insert_record = [&] (table_id tid, std::pair lo, std::pair hi, db::replay_position rp) { + auto tr = dht::token_range::make({dht::token::from_int64(lo.first), lo.second}, {dht::token::from_int64(hi.first), hi.second}); + e.get_system_keyspace().local().save_commitlog_cleanup_record(tid, tr, rp).get(); + }; + + // Insert an interval. + insert_record(tableid, {0, true}, {1, true}, {42, 1337, 1}); + // Overlap it with a range with higher replay position. 
+ insert_record(tableid, {1, true}, {2, true}, {42, 1337, 2}); + // Overlap both with a range with lower replay position. + insert_record(tableid, {-1, true}, {3, true}, {42, 1337, 0}); + // Insert an empty range, it shouldn't affect the result at all. + insert_record(tableid, {0, false}, {1, false}, {42, 1337, 3}); + // Insert a record for a different table. + insert_record(table_id(utils::UUID_gen::get_time_UUID()), {0, false}, {1, false}, {42, 1337, 5}); + // Insert a record for a different shard. + insert_record(tableid, {0, false}, {1, false}, {1, 1337, 5}); + + // Read the records. + auto map = e.get_system_keyspace().local().get_commitlog_cleanup_records().get(); + BOOST_REQUIRE_EQUAL(map.size(), 3); // 3 combinations + auto local_map = map.find({tableid, 42}); + BOOST_REQUIRE(local_map != map.end()); + + auto get_rp = [&] (int64_t t) -> std::optional { + return local_map->second.get(t); + }; + // Check that the resulting map is as we expect. + BOOST_REQUIRE(get_rp(-2) == std::nullopt); + BOOST_REQUIRE(get_rp(-1) == db::replay_position(42, 1337, 0)); + BOOST_REQUIRE(get_rp(0) == db::replay_position(42, 1337, 1)); + BOOST_REQUIRE(get_rp(1) == db::replay_position(42, 1337, 2)); + BOOST_REQUIRE(get_rp(2) == db::replay_position(42, 1337, 2)); + BOOST_REQUIRE(get_rp(3) == db::replay_position(42, 1337, 0)); + BOOST_REQUIRE(get_rp(4) == std::nullopt); + }); +} + +// Test that commitlog doesn't resurrect data after table cleanup. +SEASTAR_TEST_CASE(test_commitlog_cleanups) { + auto cfg = cql_test_config(); + cfg.db_config->auto_snapshot.set(false); + cfg.db_config->commitlog_sync.set("batch"); + cfg.db_config->experimental_features.set({db::experimental_features_t::feature::TABLETS}); + cfg.initial_tablets = 1; + + return do_with_cql_env_thread([](cql_test_env& e) { + // Create a table. 
+ e.execute_cql("create table ks.cf (pk int, ck int, primary key (pk, ck))").get(); + auto get_num_rows = [&] { + auto res = e.execute_cql("select * from ks.cf;").get(); + auto rows = dynamic_pointer_cast(res); + BOOST_REQUIRE(rows); + return rows->rs().result_set().size(); + }; + + // Insert a row into the table. + e.execute_cql("insert into ks.cf (pk,ck) values (0, 0)").get(); + BOOST_REQUIRE_EQUAL(get_num_rows(), 1); + + // Cleanup the tablet. + e.db().invoke_on_all([&] (replica::database& db) { + return db.find_column_family("ks", "cf").cleanup_tablet(db, e.get_system_keyspace().local(), locator::tablet_id(0)); + }).get(); + BOOST_REQUIRE_EQUAL(get_num_rows(), 0); + + // Insert more rows into the table. + e.execute_cql("insert into ks.cf (pk,ck) values (0, 1)").get(); + e.execute_cql("insert into ks.cf (pk,ck) values (0, 2)").get(); + BOOST_REQUIRE_EQUAL(get_num_rows(), 2); + + // Drop memtables. + e.db().invoke_on_all([&] (replica::database& db) -> future<> { + return db.find_column_family("ks", "cf").clear(); + }).get(); + BOOST_REQUIRE_EQUAL(get_num_rows(), 0); + + // Commitlog replay should resurrect the 2 dropped rows, + // but shouldn't resurrect the 1 cleaned row. + e.db().invoke_on_all([&] (replica::database& db) -> future<> { + auto cl = db.commitlog(); + auto rp = co_await db::commitlog_replayer::create_replayer(e.db(), e.get_system_keyspace()); + auto paths = co_await cl->list_existing_segments(); + co_await rp.recover(paths, db::commitlog::descriptor::FILENAME_PREFIX); + }).get(); + BOOST_REQUIRE_EQUAL(get_num_rows(), 2); + }, cfg); +} + +// Test that commitlog cleanup records are deleted when they become irrelevant. 
+SEASTAR_TEST_CASE(test_commitlog_cleanup_record_gc) { + BOOST_REQUIRE_EQUAL(smp::count, 1); + auto cfg = cql_test_config(); + cfg.db_config->auto_snapshot.set(false); + cfg.db_config->commitlog_sync.set("batch"); + cfg.db_config->experimental_features.set({db::experimental_features_t::feature::TABLETS}); + cfg.initial_tablets = 1; + + return do_with_cql_env_thread([](cql_test_env& e) { + e.execute_cql("create table ks.cf1 (pk int, ck int, primary key (pk, ck))").get(); + e.execute_cql("create table ks.cf2 (pk int, ck int, primary key (pk, ck))").get(); + + auto insert_mutation = [&] (std::string cf) { + e.execute_cql(fmt::format("insert into ks.{} (pk,ck) values (0, 0)", cf)).get(); + }; + auto cleanup_tablet = [&] (std::string cf) { + auto& db = e.local_db(); + db.find_column_family("ks", cf).cleanup_tablet(db, e.get_system_keyspace().local(), locator::tablet_id(0)).get(); + }; + auto get_num_records = [&] { + auto res = e.execute_cql("select * from system.commitlog_cleanups;").get(); + auto rows = dynamic_pointer_cast(res); + BOOST_REQUIRE(rows); + return rows->rs().result_set().size(); + }; + auto step_cf = [&] (std::string cf) { + e.local_db().commitlog()->force_new_active_segment().get(); + e.local_db().commitlog()->wait_for_pending_deletes().get(); + insert_mutation(cf); + cleanup_tablet(cf); + }; + + // Insert a mutation to cf1 to pin a commitlog segment. + insert_mutation("cf1"); + + // Run some insertions and cleanups on cf2. + // Commitlog is pinned by cf1, so all cleanup records are relevant, and they + // keep accumulating. + step_cf("cf2"); + BOOST_REQUIRE_EQUAL(get_num_records(), 1); + step_cf("cf2"); + BOOST_REQUIRE_EQUAL(get_num_records(), 2); + step_cf("cf2"); + BOOST_REQUIRE_EQUAL(get_num_records(), 3); + + // Flush all tables and wait for the released commitlog segments to disappear. 
+ e.local_db().flush_all_tables().get(); + e.local_db().commitlog()->wait_for_pending_deletes().get(); + + // Since the old cleanup records refer to commitlog segments which are now gone, + // the next cleanup should delete them, leaving only a single new cleanup entry. + step_cf("cf2"); + BOOST_REQUIRE_EQUAL(get_num_records(), 1); + }, cfg); +} diff --git a/test/boost/suite.yaml b/test/boost/suite.yaml index 81a7e5b530..927d5ebf2c 100644 --- a/test/boost/suite.yaml +++ b/test/boost/suite.yaml @@ -42,5 +42,7 @@ custom_args: - '-c2 -m3G' cache_algorithm_test: - '-c1 -m256M' + commitlog_cleanup_test: + - '-c1 -m2G' run_in_debug: - logalloc_standard_allocator_segment_pool_backend_test