test: boost: add commitlog_cleanup_test

Adds a test for the commitlog cleanup functionality added
earlier in the series.
This commit is contained in:
Michał Chojnowski
2024-01-23 20:58:30 +01:00
parent a246bb39ef
commit 94cdfcaa94
3 changed files with 221 additions and 0 deletions

View File

@@ -503,6 +503,7 @@ scylla_tests = set([
'test/boost/crc_test',
'test/boost/data_listeners_test',
'test/boost/database_test',
'test/boost/commitlog_cleanup_test',
'test/boost/dirty_memory_manager_test',
'test/boost/duration_test',
'test/boost/dynamic_bitset_test',

View File

@@ -0,0 +1,218 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "test/lib/scylla_test_case.hh"
#include "test/lib/cql_test_env.hh"
#include "db/commitlog/commitlog_replayer.hh"
#include "db/commitlog/commitlog.hh"
// Test that `canonical_token_range(tr)` contains the same tokens as `tr`.
SEASTAR_TEST_CASE(test_canonical_token_range) {
const int64_t arbitrary_token = -42;
// Define some "interesting tokens":
// an arbitrary integer, integers which are offset by 1 from the arbitrary one,
// minimum and maximum finite tokens, infinite tokens.
//
// The idea is that combinations of these should be enough to cover all logic.
std::vector<dht::token> interesting_tokens = {
dht::minimum_token(),
dht::first_token(),
dht::token::from_int64(arbitrary_token - 1),
dht::token::from_int64(arbitrary_token),
dht::token::from_int64(arbitrary_token + 1),
dht::token::from_int64(std::numeric_limits<int64_t>::max()),
dht::maximum_token(),
};
// Define interesting bounds:
// inclusive/exclusive bound for each interesting token,
// and empty (infinite) bound.
std::vector<std::optional<dht::token_range::bound>> interesting_bounds = {
std::nullopt,
};
for (const auto& t : interesting_tokens) {
interesting_bounds.push_back(dht::token_range::bound{t, false});
interesting_bounds.push_back(dht::token_range::bound{t, true});
}
// For every interesting range `tr` (every valid pair of interesting bounds),
// check that `tr` it is semantically equal to `canonical_token_range(tr)`,
// by testing that it contains the same subset of finite interesting tokens.
for (const auto& a : interesting_bounds) {
for (const auto& b : interesting_bounds) {
auto wrapping_orig = wrapping_interval<dht::token>(a, b);
if (wrapping_orig.is_wrap_around(dht::token_comparator())) {
continue;
}
auto orig = dht::token_range(wrapping_orig);
auto canon = db::system_keyspace::canonical_token_range(orig);
for (auto t : {
std::numeric_limits<int64_t>::min() + 1,
arbitrary_token - 1,
arbitrary_token,
arbitrary_token + 1,
std::numeric_limits<int64_t>::max(),
}) {
bool in_tr = orig.contains(dht::token::from_int64(t), dht::token_comparator());
bool in_canon = t > canon.first && t <= canon.second;
BOOST_REQUIRE_EQUAL(in_tr, in_canon);
}
}
}
return make_ready_future<>();
}
// Test basic functionality of system_keyspace::save_range_cleanup_record()
// and system_keyspace::get_range_cleanup_records().
SEASTAR_TEST_CASE(test_commitlog_cleanup_records) {
return do_with_cql_env_thread([](cql_test_env& e) {
auto tableid = table_id(utils::UUID_gen::get_time_UUID());
auto insert_record = [&] (table_id tid, std::pair<int64_t, bool> lo, std::pair<int64_t, bool> hi, db::replay_position rp) {
auto tr = dht::token_range::make({dht::token::from_int64(lo.first), lo.second}, {dht::token::from_int64(hi.first), hi.second});
e.get_system_keyspace().local().save_commitlog_cleanup_record(tid, tr, rp).get();
};
// Insert an interval.
insert_record(tableid, {0, true}, {1, true}, {42, 1337, 1});
// Overlap it with a range with higher replay position.
insert_record(tableid, {1, true}, {2, true}, {42, 1337, 2});
// Overlap both with a range with lower replay position.
insert_record(tableid, {-1, true}, {3, true}, {42, 1337, 0});
// Insert an empty range, it shouldn't affect the result at all.
insert_record(tableid, {0, false}, {1, false}, {42, 1337, 3});
// Insert a record for a different table.
insert_record(table_id(utils::UUID_gen::get_time_UUID()), {0, false}, {1, false}, {42, 1337, 5});
// Insert a record for a different shard.
insert_record(tableid, {0, false}, {1, false}, {1, 1337, 5});
// Read the records.
auto map = e.get_system_keyspace().local().get_commitlog_cleanup_records().get();
BOOST_REQUIRE_EQUAL(map.size(), 3); // 3 <shard, table> combinations
auto local_map = map.find({tableid, 42});
BOOST_REQUIRE(local_map != map.end());
auto get_rp = [&] (int64_t t) -> std::optional<db::replay_position> {
return local_map->second.get(t);
};
// Check that the resulting map is as we expect.
BOOST_REQUIRE(get_rp(-2) == std::nullopt);
BOOST_REQUIRE(get_rp(-1) == db::replay_position(42, 1337, 0));
BOOST_REQUIRE(get_rp(0) == db::replay_position(42, 1337, 1));
BOOST_REQUIRE(get_rp(1) == db::replay_position(42, 1337, 2));
BOOST_REQUIRE(get_rp(2) == db::replay_position(42, 1337, 2));
BOOST_REQUIRE(get_rp(3) == db::replay_position(42, 1337, 0));
BOOST_REQUIRE(get_rp(4) == std::nullopt);
});
}
// Test that commitlog doesn't resurrect data after table cleanup.
SEASTAR_TEST_CASE(test_commitlog_cleanups) {
auto cfg = cql_test_config();
cfg.db_config->auto_snapshot.set(false);
cfg.db_config->commitlog_sync.set("batch");
cfg.db_config->experimental_features.set({db::experimental_features_t::feature::TABLETS});
cfg.initial_tablets = 1;
return do_with_cql_env_thread([](cql_test_env& e) {
// Create a table.
e.execute_cql("create table ks.cf (pk int, ck int, primary key (pk, ck))").get();
auto get_num_rows = [&] {
auto res = e.execute_cql("select * from ks.cf;").get();
auto rows = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(res);
BOOST_REQUIRE(rows);
return rows->rs().result_set().size();
};
// Insert a row into the table.
e.execute_cql("insert into ks.cf (pk,ck) values (0, 0)").get();
BOOST_REQUIRE_EQUAL(get_num_rows(), 1);
// Cleanup the tablet.
e.db().invoke_on_all([&] (replica::database& db) {
return db.find_column_family("ks", "cf").cleanup_tablet(db, e.get_system_keyspace().local(), locator::tablet_id(0));
}).get();
BOOST_REQUIRE_EQUAL(get_num_rows(), 0);
// Insert more rows into the table.
e.execute_cql("insert into ks.cf (pk,ck) values (0, 1)").get();
e.execute_cql("insert into ks.cf (pk,ck) values (0, 2)").get();
BOOST_REQUIRE_EQUAL(get_num_rows(), 2);
// Drop memtables.
e.db().invoke_on_all([&] (replica::database& db) -> future<> {
return db.find_column_family("ks", "cf").clear();
}).get();
BOOST_REQUIRE_EQUAL(get_num_rows(), 0);
// Commitlog replay should resurrect the 2 dropped rows,
// but shouldn't resurrect the 1 cleaned row.
e.db().invoke_on_all([&] (replica::database& db) -> future<> {
auto cl = db.commitlog();
auto rp = co_await db::commitlog_replayer::create_replayer(e.db(), e.get_system_keyspace());
auto paths = co_await cl->list_existing_segments();
co_await rp.recover(paths, db::commitlog::descriptor::FILENAME_PREFIX);
}).get();
BOOST_REQUIRE_EQUAL(get_num_rows(), 2);
}, cfg);
}
// Test that commitlog cleanup records are deleted when they become irrelevant.
SEASTAR_TEST_CASE(test_commitlog_cleanup_record_gc) {
BOOST_REQUIRE_EQUAL(smp::count, 1);
auto cfg = cql_test_config();
cfg.db_config->auto_snapshot.set(false);
cfg.db_config->commitlog_sync.set("batch");
cfg.db_config->experimental_features.set({db::experimental_features_t::feature::TABLETS});
cfg.initial_tablets = 1;
return do_with_cql_env_thread([](cql_test_env& e) {
e.execute_cql("create table ks.cf1 (pk int, ck int, primary key (pk, ck))").get();
e.execute_cql("create table ks.cf2 (pk int, ck int, primary key (pk, ck))").get();
auto insert_mutation = [&] (std::string cf) {
e.execute_cql(fmt::format("insert into ks.{} (pk,ck) values (0, 0)", cf)).get();
};
auto cleanup_tablet = [&] (std::string cf) {
auto& db = e.local_db();
db.find_column_family("ks", cf).cleanup_tablet(db, e.get_system_keyspace().local(), locator::tablet_id(0)).get();
};
auto get_num_records = [&] {
auto res = e.execute_cql("select * from system.commitlog_cleanups;").get();
auto rows = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(res);
BOOST_REQUIRE(rows);
return rows->rs().result_set().size();
};
auto step_cf = [&] (std::string cf) {
e.local_db().commitlog()->force_new_active_segment().get();
e.local_db().commitlog()->wait_for_pending_deletes().get();
insert_mutation(cf);
cleanup_tablet(cf);
};
// Insert a mutation to cf1 to pin a commitlog segment.
insert_mutation("cf1");
// Run some insertions and cleanups on cf2.
// Commitlog is pinned by cf1, so all cleanup records are relevant, and they
// keep accumulating.
step_cf("cf2");
BOOST_REQUIRE_EQUAL(get_num_records(), 1);
step_cf("cf2");
BOOST_REQUIRE_EQUAL(get_num_records(), 2);
step_cf("cf2");
BOOST_REQUIRE_EQUAL(get_num_records(), 3);
// Flush all tables and wait for the released commitlog segments to disappear.
e.local_db().flush_all_tables().get();
e.local_db().commitlog()->wait_for_pending_deletes().get();
// Since the old cleanup records refer to commitlog segments which are now gone,
// the next cleanup should delete them, leaving only a single new cleanup entry.
step_cf("cf2");
BOOST_REQUIRE_EQUAL(get_num_records(), 1);
}, cfg);
}

View File

@@ -42,5 +42,7 @@ custom_args:
- '-c2 -m3G'
cache_algorithm_test:
- '-c1 -m256M'
commitlog_cleanup_test:
- '-c1 -m2G'
run_in_debug:
- logalloc_standard_allocator_segment_pool_backend_test