mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
test: parallelize SSTable creation in run_incremental_compaction_test
Pre-extract mutation pairs and use parallel_for_each with make_sstable_containing_async to create SSTables concurrently instead of sequentially. The post-creation loop still runs serially to collect token ranges and generations that depend on SSTable order.
This commit is contained in:
@@ -6828,24 +6828,32 @@ static future<> run_incremental_compaction_test(sstables::offstrategy offstrateg
|
||||
|
||||
std::unordered_set<sstables::generation_type> gens; // input sstable generations
|
||||
run_id run_identifier = run_id::create_random_id();
|
||||
|
||||
// Pre-extract mutation pairs for parallel SSTable creation
|
||||
std::vector<std::pair<mutation, mutation>> mutation_pairs;
|
||||
mutation_pairs.reserve(sstables_nr);
|
||||
auto merged_it = merged.begin();
|
||||
for (unsigned i = 0; i < sstables_nr; i++) {
|
||||
auto mut1 = std::move(*merged_it);
|
||||
merged_it++;
|
||||
auto mut2 = std::move(*merged_it);
|
||||
merged_it++;
|
||||
auto sst = make_sstable_containing(sst_gen, {
|
||||
std::move(mut1),
|
||||
std::move(mut2)
|
||||
}).get();
|
||||
sstables::test(sst).set_run_identifier(run_identifier); // in order to produce multi-fragment run.
|
||||
sst->set_sstable_level(offstrategy ? 0 : 1);
|
||||
mutation_pairs.emplace_back(std::move(mut1), std::move(mut2));
|
||||
}
|
||||
|
||||
// every sstable will be eligible for cleanup, by having both an owned and unowned token.
|
||||
owned_token_ranges.push_back(dht::token_range::make_singular(sst->get_last_decorated_key().token()));
|
||||
ssts.resize(sstables_nr);
|
||||
parallel_for_each(std::views::iota(size_t(0), sstables_nr), [&](size_t i) -> future<> {
|
||||
ssts[i] = co_await make_sstable_containing(sst_gen, {
|
||||
std::move(mutation_pairs[i].first),
|
||||
std::move(mutation_pairs[i].second)
|
||||
});
|
||||
sstables::test(ssts[i]).set_run_identifier(run_identifier);
|
||||
ssts[i]->set_sstable_level(offstrategy ? 0 : 1);
|
||||
}).get();
|
||||
|
||||
gens.insert(sst->generation());
|
||||
ssts.push_back(std::move(sst));
|
||||
for (unsigned i = 0; i < sstables_nr; i++) {
|
||||
owned_token_ranges.push_back(dht::token_range::make_singular(ssts[i]->get_last_decorated_key().token()));
|
||||
gens.insert(ssts[i]->generation());
|
||||
}
|
||||
|
||||
size_t last_input_sstable_count = sstables_nr;
|
||||
|
||||
Reference in New Issue
Block a user