diff --git a/db/snapshot-ctl.cc b/db/snapshot-ctl.cc index 3fc85bf908..f5907f730a 100644 --- a/db/snapshot-ctl.cc +++ b/db/snapshot-ctl.cc @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -26,6 +27,8 @@ #include "sstables/sstables_manager.hh" #include "service/storage_proxy.hh" +using namespace std::chrono_literals; + logging::logger snap_log("snapshots"); namespace db { @@ -39,6 +42,10 @@ snapshot_ctl::snapshot_ctl(sharded& db, sharded snapshot_ctl::stop() { @@ -53,6 +60,9 @@ future<> snapshot_ctl::disable_all_operations() { } co_await _ops.close(); } + // Wake up the expiration task and await for it to finish. + _expiration_cond.signal(); + co_await std::exchange(_delete_expired_snapshots, make_ready_future<>()); } future<> snapshot_ctl::check_snapshot_not_exist(sstring ks_name, sstring name, std::optional> filter) { @@ -183,10 +193,14 @@ future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vect t = resolve_table_name(ks_name, t); } co_await check_snapshot_not_exist(ks_name, tag, tables); + snap_log.debug("take_snapshot: tag={} keyspace={} tables={}: skip_flush={} created_at={} expires_at={}", + tag, ks_name, fmt::join(tables, ","), + opts.skip_flush, opts.created_at, opts.expires_at.value_or(gc_clock::time_point::min())); co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), opts); } future<> snapshot_ctl::clear_snapshot(sstring tag, std::vector keyspace_names, sstring cf_name) { + snap_log.debug("clear_snapshot: tag={} keyspaces={} table={}", tag, fmt::join(keyspace_names, ","), cf_name); co_return co_await run_snapshot_modify_operation([this, tag = std::move(tag), keyspace_names = std::move(keyspace_names), cf_name = std::move(cf_name)] (this auto) -> future<> { // clear_snapshot enumerates keyspace_names and uses cf_name as a // filter in each. When cf_name needs resolution (e.g. logical index @@ -299,4 +313,57 @@ future snapshot_ctl::true_snapshots_size(sstring ks, sstring cf) { })); } +future<> snapshot_ctl::delete_expired_snapshots() { + auto delete_expired_snapshot = [this](expiration_info info) { + snap_log.info("Deleting expired snapshot {} of table {}.{}", info.tag, info.ks_name, info.table_name); + // Awaited indirectly using the `_ops` gate + (void)run_snapshot_modify_operation([this, info = std::move(info)]() mutable { + return _db.local().clear_snapshot(info.tag, {info.ks_name}, info.table_name).handle_exception([info = std::move(info)](std::exception_ptr ep) { + snap_log.warn("Failed to delete expired snapshot {} of table {}.{}: {}: Ignored", info.tag, info.ks_name, info.table_name, ep); + }); + }); + }; + auto expiration_timer = timer([this] { + snap_log.debug("Expiration timer fired: queued={}", _expiration_queue.size()); + _expiration_cond.signal(); + }); + while (!_ops.is_closed()) { + if (!_expiration_queue.empty()) { + auto now = gc_clock::now(); + if (_expiration_queue.front().expires_at <= now) { + // FIXME: do not delete expired snapshots during backup + auto info = _expiration_queue.front(); + std::ranges::pop_heap(_expiration_queue, std::greater{}, &expiration_info::expires_at); + _expiration_queue.resize(_expiration_queue.size() - 1); + delete_expired_snapshot(std::move(info)); + continue; + } else { + auto wait_duration = _expiration_queue.front().expires_at - now; + snap_log.debug("Expiration waiting for {}: queued={}", wait_duration, _expiration_queue.size()); + expiration_timer.rearm(timer<>::clock::now() + wait_duration); + } + } else { + snap_log.debug("Expiration waiting indefinitely: queue is empty"); + } + co_await _expiration_cond.wait(); + } } + +void snapshot_ctl::schedule_expiration(gc_clock::time_point when, sstring ks_name, sstring table_name, sstring tag) { + if (this_shard_id() != 0) { + on_internal_error(snap_log, "schedule_expiration must be called on shard 0"); + } + if (!_ops.is_closed()) { + snap_log.info("Scheduling expiration of snapshot {} of table {}.{} at {}", tag, ks_name, table_name, when); + _expiration_queue.emplace_back(expiration_info{ + .expires_at = when, + .ks_name = std::move(ks_name), + .table_name = std::move(table_name), + .tag = std::move(tag) + }); + std::ranges::push_heap(_expiration_queue, std::greater{}, &expiration_info::expires_at); + _expiration_cond.signal(); + } +} + +} // namespace db diff --git a/db/snapshot-ctl.hh b/db/snapshot-ctl.hh index 1f22a20b8a..b0030bfc06 100644 --- a/db/snapshot-ctl.hh +++ b/db/snapshot-ctl.hh @@ -20,6 +20,7 @@ #include "tasks/task_manager.hh" #include #include +#include using namespace seastar; @@ -122,6 +123,9 @@ public: future true_snapshots_size(sstring ks, sstring cf); future<> disable_all_operations(); + + // Must be called on shard 0 + void schedule_expiration(gc_clock::time_point when, sstring ks_name, sstring table_name, sstring tag); private: config _config; sharded& _db; @@ -130,6 +134,16 @@ private: seastar::named_gate _ops; shared_ptr _task_manager_module; sstables::storage_manager& _storage_manager; + condition_variable _expiration_cond; + + struct expiration_info { + gc_clock::time_point expires_at; + sstring ks_name; + sstring table_name; + sstring tag; + }; + std::vector _expiration_queue; + future<> _delete_expired_snapshots = make_ready_future<>(); future<> check_snapshot_not_exist(sstring ks_name, sstring name, std::optional> filter = {}); @@ -155,6 +169,8 @@ private: future<> do_take_snapshot(sstring tag, std::vector keyspace_names, snapshot_options opts = {} ); future<> do_take_column_family_snapshot(sstring ks_name, std::vector tables, sstring tag, snapshot_options opts = {}); future<> do_take_cluster_column_family_snapshot(std::vector ks_names, std::vector tables, sstring tag, snapshot_options opts = {}); + + future<> delete_expired_snapshots(); }; } diff --git a/main.cc b/main.cc index d56c154f5c..1a1b9c10c3 100644 --- a/main.cc +++ b/main.cc @@ -2264,7 +2264,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl .backup_sched_group = dbcfg.backup_scheduling_group, }; snapshot_ctl.start(std::ref(db), std::ref(proxy), std::ref(task_manager), std::ref(sstm), snap_cfg).get(); - auto stop_snapshot_ctl = defer_verbose_shutdown("snapshots", [&snapshot_ctl] { + db.local().plug_snapshot_ctl(snapshot_ctl.local()); + auto stop_snapshot_ctl = defer_verbose_shutdown("snapshots", [&snapshot_ctl, &db] { + db.local().unplug_snapshot_ctl(); snapshot_ctl.stop().get(); }); auto backup_throughput_update = io_throughput_updater("backup", dbcfg.backup_scheduling_group, cfg->backup_io_throughput_mb_per_sec); diff --git a/replica/database.cc b/replica/database.cc index f772f4c58c..0ab367d0c9 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -3610,6 +3610,14 @@ void database::unplug_view_update_generator() noexcept { _view_update_generator = nullptr; } +void database::plug_snapshot_ctl(db::snapshot_ctl& snapshot_ctl) noexcept { + _snapshot_ctl = &snapshot_ctl; +} + +void database::unplug_snapshot_ctl() noexcept { + _snapshot_ctl = nullptr; +} + } // namespace replica mutation_reader make_multishard_streaming_reader(sharded& db, diff --git a/replica/database.hh b/replica/database.hh index c76e88ab19..8093c39312 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1774,6 +1774,8 @@ private: utils::disk_space_monitor::subscription _out_of_space_subscription; + db::snapshot_ctl* _snapshot_ctl = nullptr; + public: data_dictionary::database as_data_dictionary() const; db::commitlog* commitlog_for(const schema_ptr& schema); @@ -1800,6 +1802,9 @@ public: void plug_view_update_generator(db::view::view_update_generator& generator) noexcept; void unplug_view_update_generator() noexcept; + void plug_snapshot_ctl(db::snapshot_ctl& snapshot_ctl) noexcept; + void unplug_snapshot_ctl() noexcept; + private: future<> flush_non_system_column_families(); future<> flush_system_column_families(); @@ -2100,6 +2105,10 @@ public: static future<> snapshot_tables_on_all_shards(sharded& sharded_db, std::string_view ks_name, std::vector table_names, sstring tag, db::snapshot_options opts); static future<> snapshot_keyspace_on_all_shards(sharded& sharded_db, std::string_view ks_name, sstring tag, db::snapshot_options opts); + db::snapshot_ctl* get_snapshot_ctl_ptr() { + return _snapshot_ctl; + } + public: bool update_column_family(schema_ptr s); private: diff --git a/replica/table.cc b/replica/table.cc index 3b14f34650..3a8165b79d 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -4355,7 +4355,7 @@ future<> database::snapshot_table_on_all_shards(sharded& sharded_db, c } } } - co_await write_manifest(topology, *writer, std::move(sstable_sets), std::move(tablets), name, std::move(opts), s, + co_await write_manifest(topology, *writer, std::move(sstable_sets), std::move(tablets), name, opts, s, tablet_count, tablet_layout).handle_exception([&] (std::exception_ptr ptr) { tlogger.error("Failed to seal snapshot in {}: {}.", name, ptr); ex = std::move(ptr); @@ -4365,6 +4365,15 @@ future<> database::snapshot_table_on_all_shards(sharded& sharded_db, c } co_await writer->sync(); + + if (opts.expires_at) { + tlogger.info("snapshot {}: scheduled to expire at {}", name, opts.expires_at.value()); + co_await sharded_db.invoke_on(0, [when = *opts.expires_at, ks_name = s->ks_name(), table_name = s->cf_name(), name] (database& db) { + if (auto snap_ctl_ptr = db.get_snapshot_ctl_ptr()) { + snap_ctl_ptr->schedule_expiration(when, ks_name, table_name, name); + } + }); + } }); } diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index 80724a408b..718747a54d 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -14,6 +14,7 @@ #include #include #include +#include #include #undef SEASTAR_TESTING_MAIN @@ -781,6 +782,14 @@ SEASTAR_THREAD_TEST_CASE(test_auto_snapshot_ttl) { std::string table_name = "test"; size_t num_keys = 100; do_with_some_data_in_thread({table_name}, [&] (cql_test_env& e) { + sharded sc; + sc.start(std::ref(e.db()), std::ref(e.get_storage_proxy()), std::ref(e.get_task_manager()), std::ref(e.get_sstorage_manager()), db::snapshot_ctl::config{}).get(); + auto stop_sc = deferred_stop(sc); + e.local_db().plug_snapshot_ctl(sc.local()); + auto unplug_sc = deferred_action([&] () noexcept { + e.local_db().unplug_snapshot_ctl(); + }); + auto min_time = gc_clock::now(); take_snapshot(e, ks_name, table_name).get(); @@ -814,6 +823,16 @@ SEASTAR_THREAD_TEST_CASE(test_auto_snapshot_ttl) { const auto& topology = e.local_db().get_token_metadata().get_topology(); validate_manifest(topology, snapshot_dir, in_snapshot_dir, min_time, tablets_enabled, ttl).get(); + + // Wait for snapshot garbage collection after expiry, polling for directory removal + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds((ttl + 10) * 1s); + sleep(ttl * 1s).get(); + while (file_exists(snapshot_dir.native()).get()) { + if (std::chrono::steady_clock::now() > deadline) { + BOOST_FAIL(fmt::format("Snapshot directory {} still exists after waiting for TTL expiry", snapshot_dir)); + } + sleep(100ms).get(); + } }, create_mvs, db_cfg_ptr, num_keys).get(); }