db: snapshot_ctl: add deletion of expired snapshots

Add a task running on shard 0 that deletes expired snapshots.
Deletion is scheduled whenever an automatic snapshot is taken with a ttl.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2026-02-19 17:11:53 +02:00
parent 9e72079402
commit 4234cd8e16
7 changed files with 132 additions and 2 deletions

View File

@@ -13,6 +13,7 @@
#include <algorithm>
#include <stdexcept>
#include <seastar/core/coroutine.hh>
#include <seastar/core/on_internal_error.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/switch_to.hh>
#include <seastar/coroutine/parallel_for_each.hh>
@@ -26,6 +27,8 @@
#include "sstables/sstables_manager.hh"
#include "service/storage_proxy.hh"
using namespace std::chrono_literals;
logging::logger snap_log("snapshots");
namespace db {
@@ -39,6 +42,10 @@ snapshot_ctl::snapshot_ctl(sharded<replica::database>& db, sharded<service::stor
, _storage_manager(sstm)
{
tm.register_module("snapshot", _task_manager_module);
// FIXME: scan existing snapshots on disk and schedule expiration for those with ttl.
if (this_shard_id() == 0) {
_delete_expired_snapshots = delete_expired_snapshots();
}
}
future<> snapshot_ctl::stop() {
@@ -53,6 +60,9 @@ future<> snapshot_ctl::disable_all_operations() {
}
co_await _ops.close();
}
// Wake up the expiration task and wait for it to finish.
_expiration_cond.signal();
co_await std::exchange(_delete_expired_snapshots, make_ready_future<>());
}
future<> snapshot_ctl::check_snapshot_not_exist(sstring ks_name, sstring name, std::optional<std::vector<sstring>> filter) {
@@ -183,10 +193,14 @@ future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vect
t = resolve_table_name(ks_name, t);
}
co_await check_snapshot_not_exist(ks_name, tag, tables);
snap_log.debug("take_snapshot: tag={} keyspace={} tables={}: skip_flush={} created_at={} expires_at={}",
tag, ks_name, fmt::join(tables, ","),
opts.skip_flush, opts.created_at, opts.expires_at.value_or(gc_clock::time_point::min()));
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), opts);
}
future<> snapshot_ctl::clear_snapshot(sstring tag, std::vector<sstring> keyspace_names, sstring cf_name) {
snap_log.debug("clear_snapshot: tag={} keyspaces={} table={}", tag, fmt::join(keyspace_names, ","), cf_name);
co_return co_await run_snapshot_modify_operation([this, tag = std::move(tag), keyspace_names = std::move(keyspace_names), cf_name = std::move(cf_name)] (this auto) -> future<> {
// clear_snapshot enumerates keyspace_names and uses cf_name as a
// filter in each. When cf_name needs resolution (e.g. logical index
@@ -299,4 +313,57 @@ future<int64_t> snapshot_ctl::true_snapshots_size(sstring ks, sstring cf) {
}));
}
// Background fiber that deletes snapshots once their expiration time has
// passed. It is started by the constructor on shard 0 only.
//
// `_expiration_queue` is maintained as a min-heap keyed on `expires_at`
// (see schedule_expiration()), so its front element is always the next
// snapshot to expire. The fiber sleeps on `_expiration_cond` and is woken by
// schedule_expiration(), by the local expiration timer and by
// disable_all_operations(). It returns once the `_ops` gate is closed.
future<> snapshot_ctl::delete_expired_snapshots() {
    auto delete_expired_snapshot = [this](expiration_info info) {
        snap_log.info("Deleting expired snapshot {} of table {}.{}", info.tag, info.ks_name, info.table_name);
        // Awaited indirectly using the `_ops` gate
        //
        // The closure is taken by value (`this auto`), like in clear_snapshot(),
        // so `info` is owned by the coroutine frame for as long as the deletion
        // is in flight and the error handler may safely refer to it.
        (void)run_snapshot_modify_operation([this, info = std::move(info)] (this auto) -> future<> {
            co_await _db.local().clear_snapshot(info.tag, {info.ks_name}, info.table_name).handle_exception([&info] (std::exception_ptr ep) {
                // Best effort: a failed deletion is logged and otherwise ignored.
                snap_log.warn("Failed to delete expired snapshot {} of table {}.{}: {}: Ignored", info.tag, info.ks_name, info.table_name, ep);
            });
        });
    };
    // Wakes the fiber up when the earliest queued snapshot is due.
    auto expiration_timer = timer([this] {
        snap_log.debug("Expiration timer fired: queued={}", _expiration_queue.size());
        _expiration_cond.signal();
    });
    while (!_ops.is_closed()) {
        if (!_expiration_queue.empty()) {
            auto now = gc_clock::now();
            if (_expiration_queue.front().expires_at <= now) {
                // FIXME: do not delete expired snapshots during backup
                // pop_heap moves the earliest entry to the back of the vector,
                // from where it can be moved out without copying its strings.
                std::ranges::pop_heap(_expiration_queue, std::greater{}, &expiration_info::expires_at);
                auto info = std::move(_expiration_queue.back());
                _expiration_queue.pop_back();
                delete_expired_snapshot(std::move(info));
                // Re-check the queue right away: more entries may be due.
                continue;
            } else {
                auto wait_duration = _expiration_queue.front().expires_at - now;
                snap_log.debug("Expiration waiting for {}: queued={}", wait_duration, _expiration_queue.size());
                expiration_timer.rearm(timer<>::clock::now() + wait_duration);
            }
        } else {
            snap_log.debug("Expiration waiting indefinitely: queue is empty");
        }
        co_await _expiration_cond.wait();
    }
}
// Queue the snapshot `tag` of table `ks_name`.`table_name` for deletion at
// time `when` and wake up the expiration fiber.
//
// Must be called on shard 0; calling it on any other shard is reported as an
// internal error. Requests arriving after the `_ops` gate has been closed are
// silently dropped.
void snapshot_ctl::schedule_expiration(gc_clock::time_point when, sstring ks_name, sstring table_name, sstring tag) {
    if (this_shard_id() != 0) {
        on_internal_error(snap_log, "schedule_expiration must be called on shard 0");
    }
    if (_ops.is_closed()) {
        return;
    }

    snap_log.info("Scheduling expiration of snapshot {} of table {}.{} at {}", tag, ks_name, table_name, when);

    expiration_info entry{
        .expires_at = when,
        .ks_name = std::move(ks_name),
        .table_name = std::move(table_name),
        .tag = std::move(tag)
    };
    // Append, then sift up so that the queue stays a min-heap on `expires_at`.
    _expiration_queue.push_back(std::move(entry));
    std::ranges::push_heap(_expiration_queue, std::greater{}, &expiration_info::expires_at);

    // The new entry may be due earlier than whatever the fiber is waiting for.
    _expiration_cond.signal();
}
} // namespace db

View File

@@ -20,6 +20,7 @@
#include "tasks/task_manager.hh"
#include <seastar/core/gate.hh>
#include <seastar/core/rwlock.hh>
#include <seastar/core/condition-variable.hh>
using namespace seastar;
@@ -122,6 +123,9 @@ public:
future<int64_t> true_snapshots_size(sstring ks, sstring cf);
future<> disable_all_operations();
// Must be called on shard 0
void schedule_expiration(gc_clock::time_point when, sstring ks_name, sstring table_name, sstring tag);
private:
config _config;
sharded<replica::database>& _db;
@@ -130,6 +134,16 @@ private:
seastar::named_gate _ops;
shared_ptr<snapshot::task_manager_module> _task_manager_module;
sstables::storage_manager& _storage_manager;
condition_variable _expiration_cond;
// A snapshot queued for automatic deletion, as stored in `_expiration_queue`.
struct expiration_info {
// Point in time at which the snapshot becomes eligible for deletion.
gc_clock::time_point expires_at;
// Keyspace of the snapshotted table.
sstring ks_name;
// Name of the snapshotted table.
sstring table_name;
// Snapshot tag to clear.
sstring tag;
};
std::vector<expiration_info> _expiration_queue;
future<> _delete_expired_snapshots = make_ready_future<>();
future<> check_snapshot_not_exist(sstring ks_name, sstring name, std::optional<std::vector<sstring>> filter = {});
@@ -155,6 +169,8 @@ private:
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {} );
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
future<> do_take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
future<> delete_expired_snapshots();
};
}

View File

@@ -2264,7 +2264,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
.backup_sched_group = dbcfg.backup_scheduling_group,
};
snapshot_ctl.start(std::ref(db), std::ref(proxy), std::ref(task_manager), std::ref(sstm), snap_cfg).get();
auto stop_snapshot_ctl = defer_verbose_shutdown("snapshots", [&snapshot_ctl] {
db.local().plug_snapshot_ctl(snapshot_ctl.local());
auto stop_snapshot_ctl = defer_verbose_shutdown("snapshots", [&snapshot_ctl, &db] {
db.local().unplug_snapshot_ctl();
snapshot_ctl.stop().get();
});
auto backup_throughput_update = io_throughput_updater("backup", dbcfg.backup_scheduling_group, cfg->backup_io_throughput_mb_per_sec);

View File

@@ -3610,6 +3610,14 @@ void database::unplug_view_update_generator() noexcept {
_view_update_generator = nullptr;
}
// Register `snapshot_ctl` with this database instance. Only its address is
// stored, so the caller must keep the object alive until
// unplug_snapshot_ctl() is called.
void database::plug_snapshot_ctl(db::snapshot_ctl& snapshot_ctl) noexcept {
_snapshot_ctl = &snapshot_ctl;
}
// Forget the previously plugged snapshot_ctl, if any, by resetting the
// stored pointer to nullptr.
void database::unplug_snapshot_ctl() noexcept {
_snapshot_ctl = nullptr;
}
} // namespace replica
mutation_reader make_multishard_streaming_reader(sharded<replica::database>& db,

View File

@@ -1774,6 +1774,8 @@ private:
utils::disk_space_monitor::subscription _out_of_space_subscription;
db::snapshot_ctl* _snapshot_ctl = nullptr;
public:
data_dictionary::database as_data_dictionary() const;
db::commitlog* commitlog_for(const schema_ptr& schema);
@@ -1800,6 +1802,9 @@ public:
void plug_view_update_generator(db::view::view_update_generator& generator) noexcept;
void unplug_view_update_generator() noexcept;
void plug_snapshot_ctl(db::snapshot_ctl& snapshot_ctl) noexcept;
void unplug_snapshot_ctl() noexcept;
private:
future<> flush_non_system_column_families();
future<> flush_system_column_families();
@@ -2100,6 +2105,10 @@ public:
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_options opts);
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, db::snapshot_options opts);
// Returns the snapshot_ctl plugged into this database instance, or nullptr
// when none is currently plugged. Callers must check for nullptr.
db::snapshot_ctl* get_snapshot_ctl_ptr() {
return _snapshot_ctl;
}
public:
bool update_column_family(schema_ptr s);
private:

View File

@@ -4355,7 +4355,7 @@ future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, c
}
}
}
co_await write_manifest(topology, *writer, std::move(sstable_sets), std::move(tablets), name, std::move(opts), s,
co_await write_manifest(topology, *writer, std::move(sstable_sets), std::move(tablets), name, opts, s,
tablet_count, tablet_layout).handle_exception([&] (std::exception_ptr ptr) {
tlogger.error("Failed to seal snapshot in {}: {}.", name, ptr);
ex = std::move(ptr);
@@ -4365,6 +4365,15 @@ future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, c
}
co_await writer->sync();
if (opts.expires_at) {
tlogger.info("snapshot {}: scheduled to expire at {}", name, opts.expires_at.value());
co_await sharded_db.invoke_on(0, [when = *opts.expires_at, ks_name = s->ks_name(), table_name = s->cf_name(), name] (database& db) {
if (auto snap_ctl_ptr = db.get_snapshot_ctl_ptr()) {
snap_ctl_ptr->schedule_expiration(when, ks_name, table_name, name);
}
});
}
});
}

View File

@@ -14,6 +14,7 @@
#include <seastar/core/thread.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/bitops.hh>
#include <seastar/core/sleep.hh>
#include <seastar/util/file.hh>
#undef SEASTAR_TESTING_MAIN
@@ -781,6 +782,14 @@ SEASTAR_THREAD_TEST_CASE(test_auto_snapshot_ttl) {
std::string table_name = "test";
size_t num_keys = 100;
do_with_some_data_in_thread({table_name}, [&] (cql_test_env& e) {
sharded<db::snapshot_ctl> sc;
sc.start(std::ref(e.db()), std::ref(e.get_storage_proxy()), std::ref(e.get_task_manager()), std::ref(e.get_sstorage_manager()), db::snapshot_ctl::config{}).get();
auto stop_sc = deferred_stop(sc);
e.local_db().plug_snapshot_ctl(sc.local());
auto unplug_sc = deferred_action([&] () noexcept {
e.local_db().unplug_snapshot_ctl();
});
auto min_time = gc_clock::now();
take_snapshot(e, ks_name, table_name).get();
@@ -814,6 +823,16 @@ SEASTAR_THREAD_TEST_CASE(test_auto_snapshot_ttl) {
const auto& topology = e.local_db().get_token_metadata().get_topology();
validate_manifest(topology, snapshot_dir, in_snapshot_dir, min_time, tablets_enabled, ttl).get();
// Wait for snapshot garbage collection after expiry, polling for directory removal
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds((ttl + 10) * 1s);
sleep(ttl * 1s).get();
while (file_exists(snapshot_dir.native()).get()) {
if (std::chrono::steady_clock::now() > deadline) {
BOOST_FAIL(fmt::format("Snapshot directory {} still exists after waiting for TTL expiry", snapshot_dir));
}
sleep(100ms).get();
}
}, create_mvs, db_cfg_ptr, num_keys).get();
}