replica: Extend table::discard_sstables() to operate on all compaction groups

discard_sstables() runs on context of truncate, which is a table-wide
operation today, and will remain so with multiple static groups.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2022-12-15 17:06:57 -03:00
parent 24c3687c3f
commit 36e11eb2a5

View File

@@ -1709,30 +1709,30 @@ future<db::replay_position> table::discard_sstables(db_clock::time_point truncat
struct pruner {
column_family& cf;
compaction_group& cg;
db::replay_position rp;
struct removed_sstable {
compaction_group& cg;
sstables::shared_sstable sst;
replica::enable_backlog_tracker enable_backlog_tracker;
};
std::vector<removed_sstable> remove;
pruner(column_family& cf, compaction_group& cg)
: cf(cf), cg(cg) {}
pruner(column_family& cf)
: cf(cf) {}
void prune(db_clock::time_point truncated_at) {
void prune(compaction_group& cg, db_clock::time_point truncated_at) {
auto gc_trunc = to_gc_clock(truncated_at);
auto pruned = make_lw_shared<sstables::sstable_set>(cf._compaction_strategy.make_sstable_set(cf._schema));
auto maintenance_pruned = cf.make_maintenance_sstable_set();
auto prune = [this, &gc_trunc] (lw_shared_ptr<sstables::sstable_set>& pruned,
auto prune = [this, &cg, &gc_trunc] (lw_shared_ptr<sstables::sstable_set>& pruned,
const lw_shared_ptr<sstables::sstable_set>& pruning,
replica::enable_backlog_tracker enable_backlog_tracker) mutable {
pruning->for_each_sstable([&] (const sstables::shared_sstable& p) mutable {
if (p->max_data_age() <= gc_trunc) {
rp = std::max(p->get_stats_metadata().position, rp);
remove.emplace_back(removed_sstable{p, enable_backlog_tracker});
remove.emplace_back(removed_sstable{cg, p, enable_backlog_tracker});
return;
}
pruned->insert(p);
@@ -1743,18 +1743,20 @@ future<db::replay_position> table::discard_sstables(db_clock::time_point truncat
cg.set_main_sstables(std::move(pruned));
cg.set_maintenance_sstables(std::move(maintenance_pruned));
cf.refresh_compound_sstable_set();
}
};
auto p = make_lw_shared<pruner>(*this, *_compaction_group);
co_await _cache.invalidate(row_cache::external_updater([p, truncated_at] {
p->prune(truncated_at);
auto p = make_lw_shared<pruner>(*this);
co_await _cache.invalidate(row_cache::external_updater([this, p, truncated_at] {
for (const compaction_group_ptr& cg : compaction_groups()) {
p->prune(*cg, truncated_at);
}
refresh_compound_sstable_set();
tlogger.debug("cleaning out row cache");
}));
rebuild_statistics();
co_await coroutine::parallel_for_each(p->remove, [this, p] (pruner::removed_sstable& r) {
if (r.enable_backlog_tracker) {
remove_sstable_from_backlog_tracker(p->cg.get_backlog_tracker(), r.sst);
remove_sstable_from_backlog_tracker(r.cg.get_backlog_tracker(), r.sst);
}
return sstables::sstable_directory::delete_atomically({r.sst});
});