Files
scylladb/test/boost/sstable_compressor_factory_test.cc
Michał Chojnowski 7c6e84e2ec test/boost/sstable_compressor_factory_test: fix thread-unsafe usage of Boost.Test
It turns out that Boost assertions are thread-unsafe
(they can't be used from multiple threads concurrently).
This causes the test to fail with cryptic log corruptions sometimes.
Fix that by switching to thread-safe checks.

Fixes scylladb/scylladb#24982

Closes scylladb/scylladb#26472
2025-10-12 17:16:51 +03:00

139 lines
6.6 KiB
C++

/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#undef SEASTAR_TESTING_MAIN
#include <fmt/ranges.h>
#include <seastar/util/defer.hh>
#include <seastar/testing/thread_test_case.hh>
#include "sstables/sstable_compressor_factory.hh"
#include "test/lib/log.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/test_utils.hh"
BOOST_AUTO_TEST_SUITE(sstable_compressor_factory_test)
// Exercises the compressor factory under a single shard-to-NUMA-node mapping.
//
// `shard_to_numa_mapping[i]` is the NUMA node assigned to shard `i`;
// its size must equal smp::count.
//
// 1. Create a random message.
// 2. Set this random message as the recommended dict.
// 3. On all shards, create compressors.
// 4. Check that they are using the recommended dict (i.e. that the original message compresses perfectly).
// 5. Check that the used dictionaries are owned by shards on the same NUMA node.
// 6. Check that the number of dictionary copies is equal to number of NUMA nodes.
// 7. Repeat this a few times for both lz4 and zstd.
void test_one_numa_topology(std::span<unsigned> shard_to_numa_mapping) {
    testlog.info("Testing NUMA topology {}", shard_to_numa_mapping);

    // Create a compressor factory.
    tests::require(shard_to_numa_mapping.size() == smp::count);
    auto config = default_sstable_compressor_factory::config{
        .numa_config = std::vector(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()),
    };
    sharded<default_sstable_compressor_factory> sstable_compressor_factory;
    sstable_compressor_factory.start(std::cref(config)).get();
    auto stop_compressor_factory = defer([&sstable_compressor_factory] { sstable_compressor_factory.stop().get(); });

    // The factory keeps recommended dicts (i.e. dicts for writing) per table ID.
    auto table = table_id::create_random_id();

    // The number of distinct NUMA nodes in the tested topology does not change
    // between retries, so compute it once.
    const auto numa_node_count = std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size();

    // Retry a few times just to check that it works more than once.
    for (int retry = 0; retry < 3; ++retry) {
        // Generate a random (and hence incompressible without a dict) message.
        auto message = tests::random::get_sstring(4096);
        auto dict_view = std::as_bytes(std::span(message));

        // Set the message as the dict to make the message perfectly compressible.
        sstable_compressor_factory.local().set_recommended_dict(table, dict_view).get();

        // We'll put the owners here to check that the number of owners matches the number of NUMA nodes.
        // Each shard writes only to its own slot (indexed by this_shard_id()).
        // NOTE(review): these record the NUMA node of the owner, not the owner shard id,
        // so the count checks below look implied by the per-shard equality checks — confirm
        // whether owner shard ids were meant to be recorded instead.
        std::vector<unsigned> compressor_numa_nodes(smp::count);
        std::vector<unsigned> decompressor_numa_nodes(smp::count);

        // Try for both algorithms, just in case there are some differences in how dictionary
        // distribution over shards is implemented between them.
        for (const auto algo : {compressor::algorithm::lz4_with_dicts, compressor::algorithm::zstd_with_dicts}) {
            sstable_compressor_factory.invoke_on_all(coroutine::lambda([&] (default_sstable_compressor_factory& local) -> seastar::future<> {
                // Validate that the dictionaries work as intended,
                // and check that their owner is as expected.
                //
                // This body runs on every shard, so only the tests::require* helpers are used here.
                auto params = compression_parameters(algo);
                auto compressor = co_await local.make_compressor_for_writing_for_tests(params, table);
                auto decompressor = co_await local.make_compressor_for_reading_for_tests(params, dict_view);

                auto our_numa_node = shard_to_numa_mapping[this_shard_id()];
                auto compressor_numa_node = shard_to_numa_mapping[compressor->get_dict_owner_for_test().value()];
                auto decompressor_numa_node = shard_to_numa_mapping[decompressor->get_dict_owner_for_test().value()];

                // Check that the dictionary used by this shard lies on the same NUMA node.
                // This is important to avoid cross-node memory accesses on the hot path.
                tests::require_equal(our_numa_node, compressor_numa_node);
                tests::require_equal(our_numa_node, decompressor_numa_node);

                compressor_numa_nodes[this_shard_id()] = compressor_numa_node;
                // Record the decompressor's own node (previously the compressor's node
                // was stored here by mistake, so the decompressor was never tracked).
                decompressor_numa_nodes[this_shard_id()] = decompressor_numa_node;

                auto output_max_size = compressor->compress_max_size(message.size());
                auto compressed = std::vector<char>(output_max_size);
                auto compressed_size = compressor->compress(
                    reinterpret_cast<const char*>(message.data()), message.size(),
                    reinterpret_cast<char*>(compressed.data()), compressed.size());
                tests::require_greater_equal(compressed_size, 0);
                compressed.resize(compressed_size);

                // Validate that the recommended dict was actually used.
                tests::require_less(compressed.size(), message.size() / 10);

                auto decompressed = std::vector<char>(message.size());
                auto decompressed_size = decompressor->uncompress(
                    reinterpret_cast<const char*>(compressed.data()), compressed.size(),
                    reinterpret_cast<char*>(decompressed.data()), decompressed.size());
                tests::require_greater_equal(decompressed_size, 0);
                decompressed.resize(decompressed_size);

                // Validate that the roundtrip through compressor and decompressor
                // resulted in the original message.
                tests::require(std::equal(message.begin(), message.end(), decompressed.begin(), decompressed.end()));
            })).get();
        }

        // Check that the number of owners (and hence, copies) is equal to the number
        // of NUMA nodes.
        // This isn't that important, but we don't want to duplicate dictionaries
        // within a NUMA node unnecessarily.
        tests::require_equal(
            std::set(compressor_numa_nodes.begin(), compressor_numa_nodes.end()).size(),
            numa_node_count
        );
        tests::require_equal(
            std::set(decompressor_numa_nodes.begin(), decompressor_numa_nodes.end()).size(),
            numa_node_count
        );
    }
}
// Runs the NUMA-awareness scenario against three synthetic topologies,
// each described by a function mapping a shard index to its NUMA node.
SEASTAR_THREAD_TEST_CASE(test_numa_awareness) {
    using node_assignment = unsigned (*)(unsigned shard);

    // Tested in this order:
    //  - every shard on NUMA node 0,
    //  - shards alternating between nodes 0 and 1,
    //  - every shard on its own node.
    const node_assignment topologies[] = {
        [] (unsigned) { return 0u; },
        [] (unsigned shard) { return shard % 2; },
        [] (unsigned shard) { return shard; },
    };

    for (const auto assign_node : topologies) {
        // Build a fresh mapping with one entry per shard.
        std::vector<unsigned> mapping(smp::count);
        for (unsigned shard = 0; shard < mapping.size(); ++shard) {
            mapping[shard] = assign_node(shard);
        }
        test_one_numa_topology(mapping);
    }
}
BOOST_AUTO_TEST_SUITE_END()