mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
test: optimize twcs_reshape_with_disjoint_set for remote storage
Parallelize SSTable creation across all sub-tests using parallel_for_each and reduce the SSTable count from 256 to 64 for S3/GCS variants. Re-enable the S3 test variant that was previously disabled due to taking 4+ minutes. With parallel creation and reduced count, the test now completes in a reasonable time.
This commit is contained in:
@@ -5288,8 +5288,7 @@ SEASTAR_FIXTURE_TEST_CASE(test_offstrategy_sstable_compaction_gcs, gcs_fixture,
|
||||
test_env_config{.storage = make_test_object_storage_options("GS")});
|
||||
}
|
||||
|
||||
void twcs_reshape_with_disjoint_set_fn(test_env& env) {
|
||||
static constexpr unsigned disjoint_sstable_count = 256;
|
||||
void twcs_reshape_with_disjoint_set_fn(test_env& env, unsigned disjoint_sstable_count = 256) {
|
||||
auto builder = schema_builder("tests", "twcs_reshape_test")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("cl", ::timestamp_type, column_kind::clustering_key)
|
||||
@@ -5346,27 +5345,23 @@ void twcs_reshape_with_disjoint_set_fn(test_env& env) {
|
||||
auto sst_gen = env.make_sst_factory(s);
|
||||
|
||||
{
|
||||
// create set of 256 disjoint ssts that belong to the same time window and expect that twcs reshape allows them all to be compacted at once
|
||||
// create set of disjoint ssts that belong to the same time window and expect that twcs reshape allows them all to be compacted at once
|
||||
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
sstables.reserve(disjoint_sstable_count);
|
||||
for (unsigned i = 0; i < disjoint_sstable_count; i++) {
|
||||
auto sst = make_sstable_containing(sst_gen, {make_row(i, std::chrono::hours(1))}).get();
|
||||
sstables.push_back(std::move(sst));
|
||||
}
|
||||
std::vector<sstables::shared_sstable> sstables(disjoint_sstable_count);
|
||||
parallel_for_each(std::views::iota(0u, disjoint_sstable_count), [&](unsigned i) -> future<> {
|
||||
sstables[i] = co_await make_sstable_containing(sst_gen, {make_row(i, std::chrono::hours(1))});
|
||||
}).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(get_reshaping_job(cs, sstables, s, compaction::reshape_mode::strict).sstables.size(), disjoint_sstable_count);
|
||||
}
|
||||
|
||||
{
|
||||
// create set of 256 disjoint ssts that belong to different windows and expect that twcs reshape allows them all to be compacted at once
|
||||
// create set of disjoint ssts that belong to different windows and expect that twcs reshape allows them all to be compacted at once
|
||||
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
sstables.reserve(disjoint_sstable_count);
|
||||
for (unsigned i = 0; i < disjoint_sstable_count; i++) {
|
||||
auto sst = make_sstable_containing(sst_gen, {make_row(i, std::chrono::hours(i))}).get();
|
||||
sstables.push_back(std::move(sst));
|
||||
}
|
||||
std::vector<sstables::shared_sstable> sstables(disjoint_sstable_count);
|
||||
parallel_for_each(std::views::iota(0u, disjoint_sstable_count), [&](unsigned i) -> future<> {
|
||||
sstables[i] = co_await make_sstable_containing(sst_gen, {make_row(i, std::chrono::hours(i))});
|
||||
}).get();
|
||||
|
||||
auto reshaping_count = get_reshaping_job(cs, sstables, s, compaction::reshape_mode::strict).sstables.size();
|
||||
BOOST_REQUIRE_GE(reshaping_count, disjoint_sstable_count - min_threshold + 1);
|
||||
@@ -5374,30 +5369,25 @@ void twcs_reshape_with_disjoint_set_fn(test_env& env) {
|
||||
}
|
||||
|
||||
{
|
||||
// create set of 256 disjoint ssts that belong to different windows with none over the threshold and expect that twcs reshape selects none of them
|
||||
// create set of disjoint ssts that belong to different windows with none over the threshold and expect that twcs reshape selects none of them
|
||||
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
sstables.reserve(disjoint_sstable_count);
|
||||
for (unsigned i = 0; i < disjoint_sstable_count; i++) {
|
||||
auto sst = make_sstable_containing(sst_gen, {make_row(i, std::chrono::hours(24*i))}).get();
|
||||
sstables.push_back(std::move(sst));
|
||||
i++;
|
||||
sst = make_sstable_containing(sst_gen, {make_row(i, std::chrono::hours(24*i + 1))}).get();
|
||||
sstables.push_back(std::move(sst));
|
||||
}
|
||||
std::vector<sstables::shared_sstable> sstables(disjoint_sstable_count);
|
||||
parallel_for_each(std::views::iota(0u, disjoint_sstable_count / 2), [&](unsigned pair_idx) -> future<> {
|
||||
unsigned i = pair_idx * 2;
|
||||
sstables[i] = co_await make_sstable_containing(sst_gen, {make_row(i, std::chrono::hours(24*i))});
|
||||
sstables[i+1] = co_await make_sstable_containing(sst_gen, {make_row(i+1, std::chrono::hours(24*(i+1) + 1))});
|
||||
}).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(get_reshaping_job(cs, sstables, s, compaction::reshape_mode::strict).sstables.size(), 0);
|
||||
}
|
||||
|
||||
{
|
||||
// create set of 256 overlapping ssts that belong to the same time window and expect that twcs reshape allows only 32 to be compacted at once
|
||||
// create set of overlapping ssts that belong to the same time window and expect that twcs reshape allows only 32 to be compacted at once
|
||||
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
sstables.reserve(disjoint_sstable_count);
|
||||
for (unsigned i = 0; i < disjoint_sstable_count; i++) {
|
||||
auto sst = make_sstable_containing(sst_gen, {make_row(0, std::chrono::hours(1))}).get();
|
||||
sstables.push_back(std::move(sst));
|
||||
}
|
||||
std::vector<sstables::shared_sstable> sstables(disjoint_sstable_count);
|
||||
parallel_for_each(std::views::iota(0u, disjoint_sstable_count), [&](unsigned i) -> future<> {
|
||||
sstables[i] = co_await make_sstable_containing(sst_gen, {make_row(0, std::chrono::hours(1))});
|
||||
}).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(get_reshaping_job(cs, sstables, s, compaction::reshape_mode::strict).sstables.size(), uint64_t(s->max_compaction_threshold()));
|
||||
}
|
||||
@@ -5416,21 +5406,21 @@ void twcs_reshape_with_disjoint_set_fn(test_env& env) {
|
||||
|
||||
std::unordered_set<sstables::generation_type> generations_for_small_files;
|
||||
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
sstables.reserve(64);
|
||||
std::vector<sstables::shared_sstable> sstables(64);
|
||||
|
||||
// Track which indices are small files
|
||||
parallel_for_each(std::views::iota(0u, 64u), [&](unsigned i) -> future<> {
|
||||
if (i % 2 == 0) {
|
||||
sstables[i] = co_await make_sstable_containing(sst_gen, mutations_for_small_files);
|
||||
} else {
|
||||
sstables[i] = co_await make_sstable_containing(sst_gen, mutations_for_big_files);
|
||||
}
|
||||
}).get();
|
||||
|
||||
for (unsigned i = 0; i < 64; i++) {
|
||||
sstables::shared_sstable sst;
|
||||
//
|
||||
// intermix big and small files, to make sure STCS logic is really applied to favor similar-sized reshape jobs.
|
||||
//
|
||||
if (i % 2 == 0) {
|
||||
sst = make_sstable_containing(sst_gen, mutations_for_small_files).get();
|
||||
generations_for_small_files.insert(sst->generation());
|
||||
} else {
|
||||
sst = make_sstable_containing(sst_gen, mutations_for_big_files).get();
|
||||
generations_for_small_files.insert(sstables[i]->generation());
|
||||
}
|
||||
sstables.push_back(std::move(sst));
|
||||
}
|
||||
|
||||
auto check_mode_correctness = [&] (compaction::reshape_mode mode) {
|
||||
@@ -5448,19 +5438,17 @@ void twcs_reshape_with_disjoint_set_fn(test_env& env) {
|
||||
}
|
||||
|
||||
{
|
||||
// create set of 256 disjoint ssts that spans multiple windows (essentially what happens in off-strategy during node op)
|
||||
// create set of disjoint ssts that spans multiple windows (essentially what happens in off-strategy during node op)
|
||||
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
sstables.reserve(disjoint_sstable_count);
|
||||
for (auto i = 0U; i < disjoint_sstable_count; i++) {
|
||||
std::vector<sstables::shared_sstable> sstables(disjoint_sstable_count);
|
||||
parallel_for_each(std::views::iota(0u, disjoint_sstable_count), [&](unsigned i) -> future<> {
|
||||
utils::chunked_vector<mutation> muts;
|
||||
muts.reserve(5);
|
||||
for (auto j = 0; j < 5; j++) {
|
||||
muts.push_back(make_row(i, std::chrono::hours(j * 8)));
|
||||
}
|
||||
auto sst = make_sstable_containing(sst_gen, std::move(muts)).get();
|
||||
sstables.push_back(std::move(sst));
|
||||
}
|
||||
sstables[i] = co_await make_sstable_containing(sst_gen, std::move(muts));
|
||||
}).get();
|
||||
|
||||
auto job_size = [] (auto&& sst_range) {
|
||||
return std::ranges::fold_left(sst_range | std::views::transform(std::mem_fn(&sstable::bytes_on_disk)), uint64_t(0), std::plus{});
|
||||
@@ -5492,18 +5480,12 @@ SEASTAR_TEST_CASE(twcs_reshape_with_disjoint_set_test) {
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(twcs_reshape_with_disjoint_set_s3_test, *boost::unit_test::precondition(tests::has_scylla_test_env)) {
|
||||
// TODO: Deeper investigation needed to figure out why it takes 4+ minutes to run on S3 storage, while it runs in seconds on local storage. For now,
|
||||
// skipping the test for S3.
|
||||
testlog.info("cleanup_during_offstrategy_incremental_compaction_test_s3 is not supported for S3 storage yet, skipping test");
|
||||
return make_ready_future();
|
||||
#if 0
|
||||
return test_env::do_with_async([](test_env& env) { twcs_reshape_with_disjoint_set_fn(env); },
|
||||
return test_env::do_with_async([](test_env& env) { twcs_reshape_with_disjoint_set_fn(env, 64); },
|
||||
test_env_config{.storage = make_test_object_storage_options("S3")});
|
||||
#endif
|
||||
}
|
||||
|
||||
SEASTAR_FIXTURE_TEST_CASE(twcs_reshape_with_disjoint_set_gcs_test, gcs_fixture, *tests::check_run_test_decorator("ENABLE_GCP_STORAGE_TEST", true)) {
|
||||
return test_env::do_with_async([](test_env& env) { twcs_reshape_with_disjoint_set_fn(env); },
|
||||
return test_env::do_with_async([](test_env& env) { twcs_reshape_with_disjoint_set_fn(env, 64); },
|
||||
test_env_config{.storage = make_test_object_storage_options("GS")});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user