mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-04 22:13:19 +00:00
Merge 'Add basic support for snapshot ttl to auto_snapshot and api' from Benny Halevy
The snapshot TTL is applied to the snapshot manifest.json as the `expires_at` attribute. It will be used in the future by an external service like scylla-manager or siren to manage the snapshot life cycle. In Scylla, it is used just to garbage collect orphaned snapshots that were not backed up and cleaned up in time. A garbage collector thread was added to snapshot_ctl that cleans up the snapshot when it expires. The series adds 2 paths setting the snapshot ttl: - db/config: add auto_snapshot_ttl - api, nodetool: add snapshot ttl option The new functionality in Scylla is comparable to the corresponding features in Cassandra (comparison based on https://github.com/scylladb/scylladb/issues/13409): 1) Cassandra added in release 4.1 the auto_snapshot_ttl option which is described in [cassandra.apache.org/doc/latest/cassandra/configuration/cass_yaml_file.html#auto_snapshot_ttl](https://cassandra.apache.org/doc/latest/cassandra/configuration/cass_yaml_file.html#auto_snapshot_ttl) as: > Adds a time-to-live (TTL) to auto snapshots generated by table truncation or drop (when enabled). After the TTL is elapsed, the snapshot is automatically cleared. The behavior is now the same in Scylla > By default, auto snapshots do not have TTL In scylla, existing clusters will have no auto_snapshot_ttl, however new clusters installed with the updated scylla.conf will have a default auto_snapshot_ttl of 10 days (864000 seconds) > Accepted units: d (days), h (hours) or m (minutes) The configuration option is always in seconds, no support for unit suffix. TTL values passed to the api directly or via and nodetool can be optionally followed by 's' for seconds (the default), 'm' for minutes, 'h' for hours, or 'd' for days. > [issues.apache.org/jira/browse/CASSANDRA-16789](https://issues.apache.org/jira/browse/CASSANDRA-16789), commitad24942481- add a thread that every minute checks to see if there are TTLed snapshots to be deleted, and also add support in nodetool. In scylla, the background thread wakes up if there are scheduled expirations. Clearing of expired snapshots on restart is not implemented yet. > This is for automatically-created snapshots. Additionally, Cassandra added the ability to set a ttl on manually created snapshots by the nodetool snapshot command - by adding a --ttl ...option to that command. The equivalent functionality is to pass a --ttl option when taking a snapshot. There is no support to set a TTL on an existing snapshot (nor there is a plan to do so). > `nodetool listsnapshots` was also updated to list the snapshots' TTLs. See [issues.apache.org/jira/browse/CASSANDRA-16789](https://issues.apache.org/jira/browse/CASSANDRA-16789), commitad24942481. TODO, see https://scylladb.atlassian.net/browse/SCYLLADB-1078 Fixes SCYLLADB-190 Fixes SCYLLADB-191 Fixes SCYLLADB-787 Fixes SCYLLADB-789 * New feature, no backport required Closes scylladb/scylladb#28759 * github.com:scylladb/scylladb: db: snapshot-ctl: add cancel_expiration api, nodetool: add snapshot ttl option test/cqlpy/test_virtual_tables: add verfication of snapshot directory test/cqlpy/test_virtual_tables: consistenly pass a set of expected tables to verify_snapshots db: snapshot_ctl: add deletion of expired snapshots database: apply auto_snapshot_ttl db/config: add auto_snapshot_ttl db/config: make auto_snapshot live-updateable
This commit is contained in:
@@ -24,6 +24,8 @@
|
||||
#include <functional>
|
||||
#include <iterator>
|
||||
#include <chrono>
|
||||
#include <boost/regex.hpp>
|
||||
#include <string>
|
||||
#include <boost/algorithm/string/trim_all.hpp>
|
||||
#include <boost/algorithm/string/case_conv.hpp>
|
||||
#include <boost/functional/hash.hpp>
|
||||
@@ -145,6 +147,37 @@ std::pair<sstring, std::vector<table_info>> parse_table_infos(const http_context
|
||||
return std::make_pair(std::move(keyspace), std::move(tis));
|
||||
}
|
||||
|
||||
std::optional<std::chrono::seconds> validate_ttl(const std::string& value) {
|
||||
if (value.empty()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
// Match an integer, optional whitespace, and an optional single-character suffix
|
||||
static const boost::regex re(R"((\d+)\s*([smhd])?)", boost::regex_constants::icase);
|
||||
boost::smatch match;
|
||||
if (!boost::regex_match(value, match, re)) {
|
||||
throw bad_param_exception(fmt::format("TTL value '{}' is not valid, expected a non-negative integer with an optional suffix [smhd]", value));
|
||||
}
|
||||
|
||||
int res;
|
||||
try {
|
||||
res = std::stoi(match[1].str());
|
||||
} catch (...) {
|
||||
throw bad_param_exception(fmt::format("Parsing TTL value '{}' failed: {}", value, std::current_exception()));
|
||||
}
|
||||
|
||||
auto suffix = match[2].str();
|
||||
auto c = suffix.empty() ? 's' : std::tolower(suffix[0]);
|
||||
switch (c) {
|
||||
case 's': return std::chrono::seconds(res);
|
||||
case 'm': return std::chrono::minutes(res);
|
||||
case 'h': return std::chrono::hours(res);
|
||||
case 'd': return std::chrono::days(res);
|
||||
}
|
||||
|
||||
std::unreachable();
|
||||
}
|
||||
|
||||
static ss::token_range token_range_endpoints_to_json(const dht::token_range_endpoints& d) {
|
||||
ss::token_range r;
|
||||
r.start_token = d._start_token;
|
||||
@@ -2132,6 +2165,10 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
|
||||
db::snapshot_options opts = {
|
||||
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
|
||||
};
|
||||
auto ttl = validate_ttl(req->get_query_param("ttl"));
|
||||
if (ttl && *ttl > 0s) {
|
||||
opts.expires_at = opts.created_at + std::chrono::seconds(*ttl);
|
||||
}
|
||||
|
||||
std::vector<sstring> keynames = split(req->get_query_param("kn"), ",");
|
||||
try {
|
||||
|
||||
@@ -66,6 +66,13 @@ struct scrub_info {
|
||||
|
||||
scrub_info parse_scrub_options(const http_context& ctx, std::unique_ptr<http::request> req);
|
||||
|
||||
// TTL is given as a number with an optional a suffix of:
|
||||
// s - seconds (default)
|
||||
// m - minutes
|
||||
// h - hours
|
||||
// d - days
|
||||
std::optional<std::chrono::seconds> validate_ttl(const std::string& value);
|
||||
|
||||
void set_storage_service(http_context& ctx, httpd::routes& r, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>&, service::raft_group0_client&);
|
||||
void unset_storage_service(http_context& ctx, httpd::routes& r);
|
||||
void set_sstables_loader(http_context& ctx, httpd::routes& r, sharded<sstables_loader>& sst_loader);
|
||||
|
||||
@@ -372,7 +372,16 @@ commitlog_total_space_in_mb: -1
|
||||
# or dropping of column families. The STRONGLY advised default of true
|
||||
# should be used to provide data safety. If you set this flag to false, you will
|
||||
# lose data on truncation or drop.
|
||||
# auto_snapshot: true
|
||||
#
|
||||
# `auto_snapshot_ttl` specifies the time-to-live (TTL) for automatic snapshots in seconds.
|
||||
# A value of 0 means snapshots are kept indefinitely.
|
||||
# The auto-snapshot will be deleted automatically when it expires.
|
||||
# It is a good practice to set `auto_snapshot_ttl` to a reasonable time allowing backup
|
||||
# of the auto-snapshot, but to have a safety net that will automatically clean up stale snapshots
|
||||
# to reclaim their disk space.
|
||||
#
|
||||
auto_snapshot: true
|
||||
auto_snapshot_ttl: 864000
|
||||
|
||||
# When executing a scan, within or across a partition, we need to keep the
|
||||
# tombstones seen in memory so we can return them to the coordinator, which
|
||||
|
||||
@@ -1053,8 +1053,10 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
/**
|
||||
* @Group Advanced automatic backup setting
|
||||
*/
|
||||
, auto_snapshot(this, "auto_snapshot", value_status::Used, true,
|
||||
, auto_snapshot(this, "auto_snapshot", liveness::LiveUpdate, value_status::Used, true,
|
||||
"Enable or disable whether a snapshot is taken of the data before keyspace truncation or dropping of tables. To prevent data loss, using the default setting is strongly advised. If you set to false, you will lose data on truncation or drop.")
|
||||
, auto_snapshot_ttl(this, "auto_snapshot_ttl", liveness::LiveUpdate, value_status::Used, 0,
|
||||
"The time-to-live (TTL) for automatic snapshots in seconds. A value of 0 means snapshots are kept indefinitely.")
|
||||
/**
|
||||
* @Group Key caches and global row properties
|
||||
* @GroupDescription When creating or modifying tables, you enable or disable the key cache (partition key cache) or row cache for that table by setting the caching parameter. Other row and key cache tuning and configuration options are set at the global (node) level. Cassandra uses these settings to automatically distribute memory for each table on the node based on the overall workload and specific table usage. You can also configure the save periods for these caches globally.
|
||||
|
||||
@@ -301,6 +301,7 @@ public:
|
||||
named_value<sstring> partitioner;
|
||||
named_value<uint16_t> storage_port;
|
||||
named_value<bool> auto_snapshot;
|
||||
named_value<uint32_t> auto_snapshot_ttl;
|
||||
named_value<uint32_t> key_cache_keys_to_save;
|
||||
named_value<uint32_t> key_cache_save_period;
|
||||
named_value<uint32_t> key_cache_size_in_mb;
|
||||
|
||||
@@ -12,7 +12,10 @@
|
||||
|
||||
#include <algorithm>
|
||||
#include <stdexcept>
|
||||
#include <unordered_set>
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <seastar/coroutine/switch_to.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
@@ -26,6 +29,8 @@
|
||||
#include "sstables/sstables_manager.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
logging::logger snap_log("snapshots");
|
||||
|
||||
namespace db {
|
||||
@@ -39,6 +44,10 @@ snapshot_ctl::snapshot_ctl(sharded<replica::database>& db, sharded<service::stor
|
||||
, _storage_manager(sstm)
|
||||
{
|
||||
tm.register_module("snapshot", _task_manager_module);
|
||||
// FIXME: scan existing snapshots on disk and schedule expiration for those with ttl.
|
||||
if (this_shard_id() == 0) {
|
||||
_delete_expired_snapshots = delete_expired_snapshots();
|
||||
}
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::stop() {
|
||||
@@ -53,6 +62,9 @@ future<> snapshot_ctl::disable_all_operations() {
|
||||
}
|
||||
co_await _ops.close();
|
||||
}
|
||||
// Wake up the expiration task and await for it to finish.
|
||||
_expiration_cond.signal();
|
||||
co_await std::exchange(_delete_expired_snapshots, make_ready_future<>());
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::check_snapshot_not_exist(sstring ks_name, sstring name, std::optional<std::vector<sstring>> filter) {
|
||||
@@ -183,10 +195,17 @@ future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vect
|
||||
t = resolve_table_name(ks_name, t);
|
||||
}
|
||||
co_await check_snapshot_not_exist(ks_name, tag, tables);
|
||||
snap_log.debug("take_snapshot: tag={} keyspace={} tables={}: skip_flush={} created_at={} expires_at={}",
|
||||
tag, ks_name, fmt::join(tables, ","),
|
||||
opts.skip_flush, opts.created_at, opts.expires_at.value_or(gc_clock::time_point::min()));
|
||||
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), opts);
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::clear_snapshot(sstring tag, std::vector<sstring> keyspace_names, sstring cf_name) {
|
||||
snap_log.debug("clear_snapshot: tag={} keyspaces={} table={}", tag, fmt::join(keyspace_names, ","), cf_name);
|
||||
co_await container().invoke_on(0, [&] (auto& sc) {
|
||||
return sc.cancel_expiration(tag, keyspace_names, cf_name);
|
||||
});
|
||||
co_return co_await run_snapshot_modify_operation([this, tag = std::move(tag), keyspace_names = std::move(keyspace_names), cf_name = std::move(cf_name)] (this auto) -> future<> {
|
||||
// clear_snapshot enumerates keyspace_names and uses cf_name as a
|
||||
// filter in each. When cf_name needs resolution (e.g. logical index
|
||||
@@ -262,6 +281,7 @@ future<tasks::task_id> snapshot_ctl::start_backup(sstring endpoint, sstring buck
|
||||
if (!storage_options.is_local_type()) {
|
||||
throw std::invalid_argument("not able to backup a non-local table");
|
||||
}
|
||||
cancel_expiration(snapshot_name, {keyspace}, table);
|
||||
auto& local_storage_options = std::get<data_dictionary::storage_options::local>(storage_options.value);
|
||||
//
|
||||
// The keyspace data directories and their snapshots are arranged as follows:
|
||||
@@ -299,4 +319,79 @@ future<int64_t> snapshot_ctl::true_snapshots_size(sstring ks, sstring cf) {
|
||||
}));
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::delete_expired_snapshots() {
|
||||
auto delete_expired_snapshot = [this](expiration_info info) {
|
||||
snap_log.info("Deleting expired snapshot {} of table {}.{}", info.tag, info.ks_name, info.table_name);
|
||||
// Awaited indirectly using the `_ops` gate
|
||||
(void)run_snapshot_modify_operation([this, info = std::move(info)]() mutable {
|
||||
return _db.local().clear_snapshot(info.tag, {info.ks_name}, info.table_name).handle_exception([info = std::move(info)](std::exception_ptr ep) {
|
||||
snap_log.warn("Failed to delete expired snapshot {} of table {}.{}: {}: Ignored", info.tag, info.ks_name, info.table_name, ep);
|
||||
});
|
||||
});
|
||||
};
|
||||
auto expiration_timer = timer([this] {
|
||||
snap_log.debug("Expiration timer fired: queued={}", _expiration_queue.size());
|
||||
_expiration_cond.signal();
|
||||
});
|
||||
while (!_ops.is_closed()) {
|
||||
if (!_expiration_queue.empty()) {
|
||||
auto now = gc_clock::now();
|
||||
if (_expiration_queue.front().expires_at <= now) {
|
||||
// FIXME: do not delete expired snapshots during backup
|
||||
auto info = _expiration_queue.front();
|
||||
std::ranges::pop_heap(_expiration_queue, std::greater{}, &expiration_info::expires_at);
|
||||
_expiration_queue.resize(_expiration_queue.size() - 1);
|
||||
delete_expired_snapshot(std::move(info));
|
||||
continue;
|
||||
} else {
|
||||
auto wait_duration = _expiration_queue.front().expires_at - now;
|
||||
snap_log.debug("Expiration waiting for {}: queued={}", wait_duration, _expiration_queue.size());
|
||||
expiration_timer.rearm(timer<>::clock::now() + wait_duration);
|
||||
}
|
||||
} else {
|
||||
snap_log.debug("Expiration waiting indefinitely: queue is empty");
|
||||
}
|
||||
co_await _expiration_cond.wait();
|
||||
}
|
||||
}
|
||||
|
||||
void snapshot_ctl::schedule_expiration(gc_clock::time_point when, sstring ks_name, sstring table_name, sstring tag) {
|
||||
if (this_shard_id() != 0) {
|
||||
on_internal_error(snap_log, "schedule_expiration must be called on shard 0");
|
||||
}
|
||||
if (!_ops.is_closed()) {
|
||||
snap_log.info("Scheduling expiration of snapshot {} of table {}.{} at {}", tag, ks_name, table_name, when);
|
||||
_expiration_queue.emplace_back(expiration_info{
|
||||
.expires_at = when,
|
||||
.ks_name = std::move(ks_name),
|
||||
.table_name = std::move(table_name),
|
||||
.tag = std::move(tag)
|
||||
});
|
||||
std::ranges::push_heap(_expiration_queue, std::greater{}, &expiration_info::expires_at);
|
||||
_expiration_cond.signal();
|
||||
}
|
||||
}
|
||||
|
||||
void snapshot_ctl::cancel_expiration(sstring tag, std::vector<sstring> ks_names, sstring table_name) {
|
||||
if (this_shard_id() != 0) {
|
||||
on_internal_error(snap_log, "cancel_expiration must be called on shard 0");
|
||||
}
|
||||
snap_log.debug("Cancel expiration of snapshots with tag='{}' in keyspaces={} table={}", tag, fmt::join(ks_names, ","), table_name);
|
||||
std::unordered_set<sstring> keyspaces;
|
||||
std::ranges::move(ks_names, std::inserter(keyspaces, keyspaces.end()));
|
||||
_expiration_queue.erase(std::remove_if(_expiration_queue.begin(), _expiration_queue.end(), [&] (const expiration_info& info) {
|
||||
if (!tag.empty() && info.tag != tag) {
|
||||
return false;
|
||||
}
|
||||
if (!keyspaces.empty() && !keyspaces.contains(info.ks_name)) {
|
||||
return false;
|
||||
}
|
||||
if (!table_name.empty() && info.table_name != table_name) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}), _expiration_queue.end());
|
||||
std::ranges::make_heap(_expiration_queue, std::greater{}, &expiration_info::expires_at);
|
||||
}
|
||||
|
||||
} // namespace db
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
#include "tasks/task_manager.hh"
|
||||
#include <seastar/core/gate.hh>
|
||||
#include <seastar/core/rwlock.hh>
|
||||
#include <seastar/core/condition-variable.hh>
|
||||
|
||||
using namespace seastar;
|
||||
|
||||
@@ -122,6 +123,15 @@ public:
|
||||
future<int64_t> true_snapshots_size(sstring ks, sstring cf);
|
||||
|
||||
future<> disable_all_operations();
|
||||
|
||||
// Must be called on shard 0
|
||||
void schedule_expiration(gc_clock::time_point when, sstring ks_name, sstring table_name, sstring tag);
|
||||
|
||||
// For canceling expiration, ks_name or table_name can be empty
|
||||
// And then all snapshots with the given tag (or all, if `tag` is empty) are erased from the expiration queue
|
||||
// within the given scope.
|
||||
// Must be called on shard 0
|
||||
void cancel_expiration(sstring tag, std::vector<sstring> ks_names = {}, sstring table_name = "");
|
||||
private:
|
||||
config _config;
|
||||
sharded<replica::database>& _db;
|
||||
@@ -130,6 +140,16 @@ private:
|
||||
seastar::named_gate _ops;
|
||||
shared_ptr<snapshot::task_manager_module> _task_manager_module;
|
||||
sstables::storage_manager& _storage_manager;
|
||||
condition_variable _expiration_cond;
|
||||
|
||||
struct expiration_info {
|
||||
gc_clock::time_point expires_at;
|
||||
sstring ks_name;
|
||||
sstring table_name;
|
||||
sstring tag;
|
||||
};
|
||||
std::vector<expiration_info> _expiration_queue;
|
||||
future<> _delete_expired_snapshots = make_ready_future<>();
|
||||
|
||||
future<> check_snapshot_not_exist(sstring ks_name, sstring name, std::optional<std::vector<sstring>> filter = {});
|
||||
|
||||
@@ -155,6 +175,8 @@ private:
|
||||
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {} );
|
||||
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
|
||||
future<> do_take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
|
||||
|
||||
future<> delete_expired_snapshots();
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -35,6 +35,12 @@ Apart from *planned backup* procedure described above, and as a safeguard from *
|
||||
|
||||
The default setting for the ``auto_snapshot`` flag in ``/etc/scylla/scylla.yaml`` file is ``true``. It is **not** recommended to set it to ``false``, unless there is a good backup and recovery strategy in place.
|
||||
|
||||
The automatically created snapshots remain on local storage until they are backed-up using the ``move_files`` option or they are explicitly deleted.
|
||||
Otherwise, stale snapshots might cause a node to run out of storage space by holding up to the SSTables in the snapshot after they are deleted from the table (by compaction, tablet-migration, TRUNCATE, DROP TABLE, and so on).
|
||||
The ``auto_snapshot_ttl`` configuration option can be used to automatically delete stale snapshots.
|
||||
|
||||
The default setting for the ``auto_snapshot_ttl`` option in ``/etc/scylla/scylla.yaml`` file is ``864000`` seconds (10 days). It is recommended to set it to a reasonable time allowing backup of the auto-snapshot, yet have a safety net that will automatically clean up stale snapshots to reclaim their disk space.
|
||||
|
||||
Snapshot Creation
|
||||
-----------------
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ SYNOPSIS
|
||||
[(-u <username> | --username <username>)] snapshot
|
||||
[(-cf <table> | --column-family <table> | --table <table>)]
|
||||
[(-kc <kclist> | --kc.list <kclist>)]
|
||||
[(-sf | --skip-flush)] [(-t <tag> | --tag <tag>)] [--] [<keyspaces...>]
|
||||
[(-sf | --skip-flush)] [(-t <tag> | --tag <tag>)] [--ttl <ttl>] [--] [<keyspaces...>]
|
||||
|
||||
OPTIONS
|
||||
.......
|
||||
@@ -38,6 +38,10 @@ Parameter Descriptio
|
||||
-sf / --skip-flush Do not flush memtables before snapshotting (snapshot will not contain unflushed data)
|
||||
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
|
||||
-t <tag> / --tag <tag> The name of the snapshot
|
||||
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
|
||||
--ttl <ttl> The time-to-live for the snapshot, optionally followed by:
|
||||
's' for seconds (the default), 'm' for minutes, 'h' for hours, or 'd' for days.
|
||||
Missing TTL, or 0, means no TTL
|
||||
==================================================================== =====================================================================================
|
||||
|
||||
``--`` This option can be used to separate command-line options from the list of argument, (useful when arguments might be mistaken for command-line options.
|
||||
|
||||
4
main.cc
4
main.cc
@@ -2255,7 +2255,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
.backup_sched_group = dbcfg.backup_scheduling_group,
|
||||
};
|
||||
snapshot_ctl.start(std::ref(db), std::ref(proxy), std::ref(task_manager), std::ref(sstm), snap_cfg).get();
|
||||
auto stop_snapshot_ctl = defer_verbose_shutdown("snapshots", [&snapshot_ctl] {
|
||||
db.local().plug_snapshot_ctl(snapshot_ctl.local());
|
||||
auto stop_snapshot_ctl = defer_verbose_shutdown("snapshots", [&snapshot_ctl, &db] {
|
||||
db.local().unplug_snapshot_ctl();
|
||||
snapshot_ctl.stop().get();
|
||||
});
|
||||
auto backup_throughput_update = io_throughput_updater("backup", dbcfg.backup_scheduling_group, cfg->backup_io_throughput_mb_per_sec);
|
||||
|
||||
@@ -3157,7 +3157,15 @@ future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, s
|
||||
auto truncated_at = truncated_at_opt.value_or(db_clock::now());
|
||||
auto name = snapshot_name_opt.value_or(
|
||||
format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name()));
|
||||
co_await snapshot_table_on_all_shards(sharded_db, table_shards, name, db::snapshot_options{});
|
||||
auto ttl = sharded_db.local().get_config().auto_snapshot_ttl();
|
||||
db::snapshot_options opts;
|
||||
if (ttl) {
|
||||
// Add one second to compensate for created_at being truncated
|
||||
// to second resolution, ensuring the snapshot lives for at least
|
||||
// auto_snapshot_ttl seconds.
|
||||
opts.expires_at = opts.created_at + (ttl + 1) * 1s;
|
||||
}
|
||||
co_await snapshot_table_on_all_shards(sharded_db, table_shards, name, opts);
|
||||
}
|
||||
|
||||
co_await sharded_db.invoke_on_all([&] (database& db) {
|
||||
@@ -3607,6 +3615,14 @@ void database::unplug_view_update_generator() noexcept {
|
||||
_view_update_generator = nullptr;
|
||||
}
|
||||
|
||||
void database::plug_snapshot_ctl(db::snapshot_ctl& snapshot_ctl) noexcept {
|
||||
_snapshot_ctl = &snapshot_ctl;
|
||||
}
|
||||
|
||||
void database::unplug_snapshot_ctl() noexcept {
|
||||
_snapshot_ctl = nullptr;
|
||||
}
|
||||
|
||||
} // namespace replica
|
||||
|
||||
mutation_reader make_multishard_streaming_reader(sharded<replica::database>& db,
|
||||
|
||||
@@ -1779,6 +1779,8 @@ private:
|
||||
|
||||
utils::disk_space_monitor::subscription _out_of_space_subscription;
|
||||
|
||||
db::snapshot_ctl* _snapshot_ctl = nullptr;
|
||||
|
||||
public:
|
||||
data_dictionary::database as_data_dictionary() const;
|
||||
db::commitlog* commitlog_for(const schema_ptr& schema);
|
||||
@@ -1806,6 +1808,9 @@ public:
|
||||
void plug_view_update_generator(db::view::view_update_generator& generator) noexcept;
|
||||
void unplug_view_update_generator() noexcept;
|
||||
|
||||
void plug_snapshot_ctl(db::snapshot_ctl& snapshot_ctl) noexcept;
|
||||
void unplug_snapshot_ctl() noexcept;
|
||||
|
||||
private:
|
||||
future<> flush_non_system_column_families();
|
||||
future<> flush_system_column_families();
|
||||
@@ -2106,6 +2111,10 @@ public:
|
||||
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_options opts);
|
||||
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, db::snapshot_options opts);
|
||||
|
||||
db::snapshot_ctl* get_snapshot_ctl_ptr() {
|
||||
return _snapshot_ctl;
|
||||
}
|
||||
|
||||
public:
|
||||
bool update_column_family(schema_ptr s);
|
||||
private:
|
||||
|
||||
@@ -4355,7 +4355,7 @@ future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, c
|
||||
}
|
||||
}
|
||||
}
|
||||
co_await write_manifest(topology, *writer, std::move(sstable_sets), std::move(tablets), name, std::move(opts), s,
|
||||
co_await write_manifest(topology, *writer, std::move(sstable_sets), std::move(tablets), name, opts, s,
|
||||
tablet_count, tablet_layout).handle_exception([&] (std::exception_ptr ptr) {
|
||||
tlogger.error("Failed to seal snapshot in {}: {}.", name, ptr);
|
||||
ex = std::move(ptr);
|
||||
@@ -4365,6 +4365,15 @@ future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, c
|
||||
}
|
||||
|
||||
co_await writer->sync();
|
||||
|
||||
if (opts.expires_at) {
|
||||
tlogger.info("snapshot {}: scheduled to expire at {}", name, opts.expires_at.value());
|
||||
co_await sharded_db.invoke_on(0, [when = *opts.expires_at, ks_name = s->ks_name(), table_name = s->cf_name(), name] (database& db) {
|
||||
if (auto snap_ctl_ptr = db.get_snapshot_ctl_ptr()) {
|
||||
snap_ctl_ptr->schedule_expiration(when, ks_name, table_name, name);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/bitops.hh>
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include <seastar/util/file.hh>
|
||||
|
||||
#undef SEASTAR_TESTING_MAIN
|
||||
@@ -549,7 +550,7 @@ static std::set<sstring> collect_sstables(const std::set<sstring>& all_files, co
|
||||
}
|
||||
|
||||
// Validate that the manifest.json lists exactly the SSTables present in the snapshot directory
|
||||
static future<> validate_manifest(const locator::topology& topology, const fs::path& snapshot_dir, const std::set<sstring>& in_snapshot_dir, gc_clock::time_point min_time, bool tablets_enabled) {
|
||||
static future<> validate_manifest(const locator::topology& topology, const fs::path& snapshot_dir, const std::set<sstring>& in_snapshot_dir, gc_clock::time_point min_time, bool tablets_enabled, std::optional<int> ttl = std::nullopt) {
|
||||
sstring suffix = "-TOC.txt";
|
||||
auto sstables_in_snapshot = collect_sstables(in_snapshot_dir, suffix);
|
||||
std::set<sstring> sstables_in_manifest;
|
||||
@@ -606,7 +607,15 @@ static future<> validate_manifest(const locator::topology& topology, const fs::p
|
||||
BOOST_REQUIRE(created_at_seconds > 0);
|
||||
auto& expires_at = manifest_snapshot["expires_at"];
|
||||
BOOST_REQUIRE(expires_at.IsNumber());
|
||||
BOOST_REQUIRE_GE(expires_at.GetInt64(), created_at_seconds);
|
||||
auto expires_at_seconds = expires_at.GetInt64();
|
||||
if (ttl) {
|
||||
BOOST_REQUIRE_GE(expires_at_seconds, created_at_seconds + *ttl);
|
||||
BOOST_REQUIRE_LE(expires_at_seconds, created_at_seconds + *ttl + 1);
|
||||
} else {
|
||||
BOOST_REQUIRE_GE(expires_at.GetInt64(), created_at_seconds);
|
||||
}
|
||||
} else if (ttl) {
|
||||
BOOST_FAIL("manifest should have expires_at when ttl is set");
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(manifest_json.HasMember("table"));
|
||||
@@ -757,6 +766,76 @@ SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_auto_snapshot_ttl) {
|
||||
bool tablets_enabled = true;
|
||||
bool create_mvs = false;
|
||||
int ttl = 1;
|
||||
#ifdef SCYLLA_BUILD_MODE_DEBUG
|
||||
ttl = 3;
|
||||
#endif
|
||||
|
||||
auto db_cfg_ptr = make_shared<db::config>();
|
||||
db_cfg_ptr->tablets_mode_for_new_keyspaces(tablets_enabled ? db::tablets_mode_t::mode::enabled : db::tablets_mode_t::mode::disabled);
|
||||
db_cfg_ptr->auto_snapshot(true);
|
||||
db_cfg_ptr->auto_snapshot_ttl(ttl);
|
||||
std::string ks_name = "ks";
|
||||
std::string table_name = "test";
|
||||
size_t num_keys = 100;
|
||||
do_with_some_data_in_thread({table_name}, [&] (cql_test_env& e) {
|
||||
sharded<db::snapshot_ctl> sc;
|
||||
sc.start(std::ref(e.db()), std::ref(e.get_storage_proxy()), std::ref(e.get_task_manager()), std::ref(e.get_sstorage_manager()), db::snapshot_ctl::config{}).get();
|
||||
auto stop_sc = deferred_stop(sc);
|
||||
e.local_db().plug_snapshot_ctl(sc.local());
|
||||
auto unplug_sc = deferred_action([&] () noexcept {
|
||||
e.local_db().unplug_snapshot_ctl();
|
||||
});
|
||||
|
||||
auto min_time = gc_clock::now();
|
||||
take_snapshot(e, ks_name, table_name).get();
|
||||
|
||||
auto& cf = e.local_db().find_column_family(ks_name, table_name);
|
||||
auto table_directory = table_dir(cf);
|
||||
|
||||
auto in_table_dir = collect_files(table_directory).get();
|
||||
// snapshot triggered a flush and wrote the data down.
|
||||
BOOST_REQUIRE_GE(in_table_dir.size(), 9);
|
||||
|
||||
testlog.debug("Dropping table {}.{}", ks_name, table_name);
|
||||
replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name, true).get();
|
||||
|
||||
fs::path snapshot_dir;
|
||||
auto snapshot_base_dir = table_directory / sstables::snapshots_dir;
|
||||
directory_lister lister(snapshot_base_dir, lister::dir_entry_types::of<directory_entry_type::directory>());
|
||||
while (auto de = lister.get().get()) {
|
||||
if (de->name.starts_with("pre-drop")) {
|
||||
BOOST_REQUIRE(snapshot_dir.empty()); // only one pre-drop snapshot should be present
|
||||
testlog.debug("Found auto-snapshot directory: {}", snapshot_base_dir / de->name);
|
||||
snapshot_dir = snapshot_base_dir / de->name;
|
||||
}
|
||||
}
|
||||
|
||||
auto in_snapshot_dir = collect_files(snapshot_dir).get();
|
||||
|
||||
in_table_dir.insert("manifest.json");
|
||||
in_table_dir.insert("schema.cql");
|
||||
// all files were copied and manifest was generated
|
||||
BOOST_REQUIRE_EQUAL(in_table_dir, in_snapshot_dir);
|
||||
|
||||
const auto& topology = e.local_db().get_token_metadata().get_topology();
|
||||
validate_manifest(topology, snapshot_dir, in_snapshot_dir, min_time, tablets_enabled, ttl).get();
|
||||
|
||||
// Wait for snapshot garbage collection after expiry, polling for directory removal
|
||||
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds((ttl + 10) * 1s);
|
||||
sleep(ttl * 1s).get();
|
||||
while (file_exists(snapshot_dir.native()).get()) {
|
||||
if (std::chrono::steady_clock::now() > deadline) {
|
||||
BOOST_FAIL(fmt::format("Snapshot directory {} still exists after waiting for TTL expiry", snapshot_dir));
|
||||
}
|
||||
sleep(100ms).get();
|
||||
}
|
||||
}, create_mvs, db_cfg_ptr, num_keys).get();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(snapshot_list_okay) {
|
||||
return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
|
||||
auto& cf = e.local_db().find_column_family("ks", "cf");
|
||||
|
||||
@@ -21,6 +21,9 @@ import subprocess
|
||||
import shutil
|
||||
import pytest
|
||||
import re
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
# For a "cql" object connected to one node, find the REST API URL
|
||||
# with the same node and port 10000.
|
||||
@@ -54,14 +57,17 @@ def nodetool_cmd():
|
||||
if nodetool_cmd.cmd:
|
||||
return nodetool_cmd.cmd
|
||||
if not nodetool_cmd.failed:
|
||||
nodetool_cmd.conf = os.getenv('NODETOOL') or 'nodetool'
|
||||
nodetool_cmd.cmd = shutil.which(nodetool_cmd.conf)
|
||||
nodetool_cmd.conf = os.getenv('NODETOOL').split() or ['nodetool']
|
||||
nodetool_cmd.cmd = shutil.which(nodetool_cmd.conf[0])
|
||||
if nodetool_cmd.cmd is None:
|
||||
nodetool_cmd.failed = True
|
||||
elif len(nodetool_cmd.conf) > 1:
|
||||
nodetool_cmd.args = nodetool_cmd.conf[1:]
|
||||
if nodetool_cmd.failed:
|
||||
pytest.fail(f"Error: Can't find {nodetool_cmd.conf}. Please set the NODETOOL environment variable to the path of the nodetool utility.", pytrace=False)
|
||||
return nodetool_cmd.cmd
|
||||
nodetool_cmd.cmd = None
|
||||
nodetool_cmd.args = []
|
||||
nodetool_cmd.failed = False
|
||||
nodetool_cmd.conf = False
|
||||
|
||||
@@ -71,7 +77,10 @@ def run_nodetool(cql, *args, **subprocess_kwargs):
|
||||
# TODO: We may need to change this function or its callers to add proper
|
||||
# support for testing on multi-node clusters.
|
||||
host = cql.cluster.contact_points[0]
|
||||
return subprocess.run([nodetool_cmd(), '-h', host, *args], **subprocess_kwargs)
|
||||
cmd = [nodetool_cmd()]
|
||||
cmd.extend(nodetool_cmd.args)
|
||||
cmd.extend(['-h', host, *args])
|
||||
return subprocess.run(cmd, **subprocess_kwargs)
|
||||
|
||||
def flush(cql, table):
|
||||
ks, cf = table.split('.')
|
||||
@@ -115,14 +124,19 @@ def compact_keyspace(cql, ks, flush_memtables=True):
|
||||
args.extend([ks])
|
||||
run_nodetool(cql, "compact", *args)
|
||||
|
||||
def take_snapshot(cql, table, tag, skip_flush):
|
||||
def take_snapshot(cql, table, tag, skip_flush = None, ttl = None):
|
||||
ks, cf = table.split('.')
|
||||
if has_rest_api(cql):
|
||||
requests.post(f'{rest_api_url(cql)}/storage_service/snapshots/', params={'kn': ks, 'cf' : cf, 'tag': tag, 'sf': skip_flush})
|
||||
params = {'kn': ks, 'cf' : cf, 'tag': tag, 'sf': skip_flush}
|
||||
if ttl is not None:
|
||||
params['ttl'] = str(ttl)
|
||||
requests.post(f'{rest_api_url(cql)}/storage_service/snapshots/', params=params)
|
||||
else:
|
||||
args = ['--tag', tag, '--table', cf]
|
||||
if skip_flush:
|
||||
args.append('--skip-flush')
|
||||
if ttl is not None:
|
||||
args.extend(['--ttl', str(ttl)])
|
||||
args.append(ks)
|
||||
run_nodetool(cql, "snapshot", *args)
|
||||
|
||||
@@ -138,6 +152,54 @@ def del_snapshot(cql, tag:str, keyspaces:list[str] = []):
|
||||
args.extend(keyspaces)
|
||||
run_nodetool(cql, "clearsnapshot", *args)
|
||||
|
||||
def list_snapshots(cql):
|
||||
if has_rest_api(cql):
|
||||
api_res = requests.get(f'{rest_api_url(cql)}/storage_service/snapshots/')
|
||||
api_res.raise_for_status()
|
||||
return api_res.json()
|
||||
else:
|
||||
nodetool_res = run_nodetool(cql, "listsnapshots", capture_output=True, text=True)
|
||||
if nodetool_res.returncode != 0:
|
||||
raise RuntimeError(f"nodetool listsnapshots failed: {nodetool_res.stderr}")
|
||||
|
||||
def with_units(value, units) -> int:
|
||||
mult = 1
|
||||
if units:
|
||||
if units.upper()[0] == 'B':
|
||||
mult = 1
|
||||
elif units.upper()[0] == 'K':
|
||||
mult = 1024
|
||||
elif units.upper()[0] == 'M':
|
||||
mult = 1024*1024
|
||||
elif units.upper()[0] == 'G':
|
||||
mult = 1024*1024*1024
|
||||
elif units.upper()[0] == 'T':
|
||||
mult = 1024*1024*1024*1024
|
||||
else:
|
||||
raise RuntimeError(f"nodetool listsnapshots has unrecognized {units=} for {value=}")
|
||||
return int(float(value) * mult)
|
||||
|
||||
def ts_fromisoformat(value):
|
||||
return datetime.fromisoformat(value).timestamp()
|
||||
|
||||
res = defaultdict(list)
|
||||
for line in nodetool_res.stdout.splitlines():
|
||||
stripped = line.strip()
|
||||
if not stripped or stripped == "There are no snapshots":
|
||||
continue
|
||||
parts = stripped.split()
|
||||
if parts[0] == "Snapshot" or parts[0] == "Total":
|
||||
continue
|
||||
res[parts[0]].append({
|
||||
'ks': parts[1],
|
||||
'cf': parts[2],
|
||||
'live': with_units(parts[3], parts[4]) if len(parts) > 4 else 0,
|
||||
'total': with_units(parts[5], parts[6]) if len(parts) > 6 else 0,
|
||||
'created_at': ts_fromisoformat(parts[7]) if len(parts) > 7 else None,
|
||||
'expires_at': ts_fromisoformat(parts[8]) if len(parts) > 8 else None,
|
||||
})
|
||||
return [{'key': k, 'value': v} for k, v in res.items()]
|
||||
|
||||
def refreshsizeestimates(cql):
|
||||
if has_rest_api(cql):
|
||||
# The "nodetool refreshsizeestimates" is not available, or needed, in Scylla
|
||||
|
||||
45
test/cqlpy/test_snapshot.py
Normal file
45
test/cqlpy/test_snapshot.py
Normal file
@@ -0,0 +1,45 @@
|
||||
# Copyright 2021-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
|
||||
import time
|
||||
|
||||
from . import nodetool
|
||||
from .util import new_test_table
|
||||
|
||||
|
||||
|
||||
def has_snapshot(snapshots, tag):
|
||||
for s in snapshots:
|
||||
if s['key'] == tag:
|
||||
return s['value']
|
||||
return None
|
||||
|
||||
def test_snapshot_ttl(cql, test_keyspace):
|
||||
with new_test_table(cql, test_keyspace, 'k int PRIMARY KEY') as table:
|
||||
write = cql.prepare(f"INSERT INTO {table} (k) VALUES (?)")
|
||||
for i in range(10):
|
||||
cql.execute(write, [i])
|
||||
nodetool.flush(cql, table)
|
||||
|
||||
tag = "test_snapshot"
|
||||
ttl = 60
|
||||
|
||||
# Take a snapshot of the table
|
||||
created_at = time.time()
|
||||
nodetool.take_snapshot(cql, table, tag, ttl=f"{ttl}s")
|
||||
expires_at = time.time() + ttl + 1
|
||||
|
||||
# Verify that the snapshot exists
|
||||
snapshots = nodetool.list_snapshots(cql)
|
||||
print(f"Snapshots for {table}: {snapshots}")
|
||||
infos = has_snapshot(snapshots, tag)
|
||||
assert len(infos) == 1, f"Expected to find {tag} in {snapshots=}, found {len(infos)} infos"
|
||||
# Currently, Cassandra returns the snapshot creation and expiration times in nodetool listsnapshots.
|
||||
# Scylla does not support that yet
|
||||
info = infos[0]
|
||||
if info.get("created_at"):
|
||||
created_at = float(info["created_at"])
|
||||
assert info.get("expires_at")
|
||||
expires_at = float(info["expires_at"])
|
||||
assert expires_at - created_at == ttl
|
||||
@@ -3,42 +3,79 @@
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
|
||||
from typing import Tuple
|
||||
|
||||
import pytest
|
||||
from . import nodetool
|
||||
from . import util
|
||||
import json
|
||||
import glob
|
||||
import os
|
||||
import time
|
||||
|
||||
from collections import defaultdict
|
||||
|
||||
from test.pylib.skip_types import skip_env
|
||||
|
||||
def verify_snapshots(cql, expected_snapshots: dict[str, set[str]]):
|
||||
def verify_snapshots(cql, expected_snapshots: dict[str, set[str]], scylla_data_dir, expiry: int = None):
|
||||
results = list(cql.execute(f"SELECT keyspace_name, table_name, snapshot_name, live, total FROM system.snapshots"))
|
||||
for res in results:
|
||||
if res.snapshot_name in expected_snapshots:
|
||||
t = f"{res.keyspace_name}.{res.table_name}"
|
||||
assert t in expected_snapshots[res.snapshot_name], f"Unexpected snapshot {t}: snapshot_name={res.snapshot_name}: expected_snapshots={expected_snapshots}"
|
||||
expected_snapshots[res.snapshot_name].remove(t)
|
||||
for _, expected_tables in expected_snapshots.items():
|
||||
assert not expected_tables, f"Not all expected snapshots were listed: expected_snapshots={expected_snapshots}"
|
||||
verify_snapshot_dir(scylla_data_dir, res.keyspace_name, res.table_name, res.snapshot_name)
|
||||
|
||||
def test_snapshots_table(scylla_only, cql, test_keyspace):
|
||||
now = time.time()
|
||||
if not expiry or int(now) < int(expiry):
|
||||
for _, expected_tables in expected_snapshots.items():
|
||||
assert not expected_tables, f"Not all expected snapshots were listed: expected_snapshots={expected_snapshots} {now=} {expiry=}"
|
||||
|
||||
def snapshot_dir_exists(scylla_data_dir, keyspace_name, table_name, snapshot_name) -> Tuple[str, bool]:
|
||||
path = os.path.join(scylla_data_dir, keyspace_name, f"{table_name}-*")
|
||||
table_dir = glob.glob(path)
|
||||
assert len(table_dir) == 1, f"Expected single table directory for '{path}', got {table_dir}"
|
||||
snapshot_dir = os.path.join(table_dir[0], "snapshots", snapshot_name)
|
||||
return snapshot_dir, os.path.exists(snapshot_dir)
|
||||
|
||||
def verify_snapshot_dir(scylla_data_dir, keyspace_name, table_name, snapshot_name, expected: bool = True):
|
||||
snapshot_dir, exists = snapshot_dir_exists(scylla_data_dir, keyspace_name, table_name, snapshot_name)
|
||||
if expected:
|
||||
assert exists, f"Snapshots directory '{snapshot_dir}' does not exist"
|
||||
else:
|
||||
assert not exists, f"Snapshots directory '{snapshot_dir}' still exists"
|
||||
|
||||
def test_snapshots_table(scylla_only, cql, test_keyspace, scylla_data_dir):
|
||||
test_tag = util.unique_name()
|
||||
with util.new_test_table(cql, test_keyspace, 'pk int PRIMARY KEY, v int') as table:
|
||||
cql.execute(f"INSERT INTO {table} (pk, v) VALUES (0, 0)")
|
||||
nodetool.take_snapshot(cql, table, test_tag, False)
|
||||
verify_snapshots(cql, {test_tag: [table]})
|
||||
nodetool.del_snapshot(cql, test_tag)
|
||||
verify_snapshots(cql, {test_tag: {table}}, scylla_data_dir)
|
||||
nodetool.del_snapshot(cql, test_tag)
|
||||
keyspace_name, table_name = table.split('.')
|
||||
verify_snapshot_dir(scylla_data_dir, keyspace_name, table_name, test_tag, False)
|
||||
|
||||
def test_snapshots_dropped_table(scylla_only, cql, test_keyspace):
|
||||
@pytest.mark.parametrize("ttl", [0, 5])
|
||||
def test_snapshots_dropped_table(scylla_only, cql, test_keyspace, scylla_data_dir, ttl):
|
||||
cql.execute(f"UPDATE system.config SET value = '{ttl}' WHERE name = 'auto_snapshot_ttl'")
|
||||
test_tag = util.unique_name()
|
||||
with util.new_test_table(cql, test_keyspace, 'pk int PRIMARY KEY, v int') as table:
|
||||
cql.execute(f"INSERT INTO {table} (pk, v) VALUES (0, 0)")
|
||||
nodetool.take_snapshot(cql, table, test_tag, False)
|
||||
verify_snapshots(cql, {test_tag: [table]})
|
||||
nodetool.del_snapshot(cql, test_tag)
|
||||
expiry = int(time.time() + ttl) if ttl else None
|
||||
nodetool.take_snapshot(cql, table, test_tag, False, ttl)
|
||||
verify_snapshots(cql, {test_tag: {table}}, scylla_data_dir, expiry)
|
||||
keyspace_name, table_name = table.split('.')
|
||||
if ttl:
|
||||
deadline = expiry + 10
|
||||
time.sleep(ttl)
|
||||
while snapshot_dir_exists(scylla_data_dir, keyspace_name, table_name, test_tag)[1] and time.time() < deadline:
|
||||
time.sleep(1)
|
||||
else:
|
||||
# For ttl == 0, explicitly delete the snapshot and verify removal.
|
||||
nodetool.del_snapshot(cql, test_tag)
|
||||
verify_snapshot_dir(scylla_data_dir, keyspace_name, table_name, test_tag, False)
|
||||
|
||||
def test_snapshots_multiple_keyspaces(scylla_only, cql):
|
||||
def test_snapshots_multiple_keyspaces(scylla_only, cql, scylla_data_dir):
|
||||
expected_snapshots = defaultdict(set)
|
||||
ks_opts = "WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}"
|
||||
test_tags = [util.unique_name(), util.unique_name(), util.unique_name()]
|
||||
@@ -59,7 +96,7 @@ def test_snapshots_multiple_keyspaces(scylla_only, cql):
|
||||
nodetool.take_snapshot(cql, table2, test_tags[2], False)
|
||||
expected_snapshots[test_tags[2]].add(table2)
|
||||
|
||||
verify_snapshots(cql, expected_snapshots)
|
||||
verify_snapshots(cql, expected_snapshots, scylla_data_dir)
|
||||
for t in test_tags:
|
||||
nodetool.del_snapshot(cql, t)
|
||||
|
||||
|
||||
@@ -99,7 +99,7 @@ def test_listsnapshots_no_snapshots(nodetool, request):
|
||||
assert res.stdout == "Snapshot Details: \nThere are no snapshots\n"
|
||||
|
||||
|
||||
def check_snapshot_out(res, tag, ktlist, skip_flush):
|
||||
def check_snapshot_out(res, tag, ktlist, skip_flush, ttl):
|
||||
"""Check that the output of nodetool snapshot contains the expected messages"""
|
||||
|
||||
if len(ktlist) == 0:
|
||||
@@ -110,7 +110,7 @@ def check_snapshot_out(res, tag, ktlist, skip_flush):
|
||||
pattern = re.compile("Requested creating snapshot\\(s\\)"
|
||||
f" for \\[{keyspaces}\\]"
|
||||
f" with snapshot name \\[(.+)\\]"
|
||||
f" and options \\{{skipFlush={str(skip_flush).lower()}\\}}")
|
||||
f" and options \\{{skipFlush={str(skip_flush).lower()}, ttl={ttl}\\}}")
|
||||
|
||||
print(res)
|
||||
print(pattern)
|
||||
@@ -138,15 +138,15 @@ def test_snapshot_keyspace(nodetool):
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, "ks1", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1"})
|
||||
params={"tag": tag, "sf": "false", "ttl": "0", "kn": "ks1"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1"], False)
|
||||
check_snapshot_out(res.stdout, tag, ["ks1"], False, "0")
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, "ks1", "ks2", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1,ks2"})
|
||||
params={"tag": tag, "sf": "false", "ttl": "0", "kn": "ks1,ks2"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1", "ks2"], False)
|
||||
check_snapshot_out(res.stdout, tag, ["ks1", "ks2"], False, "0")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("option_name", ("-cf", "--column-family", "--table"))
|
||||
@@ -155,15 +155,15 @@ def test_snapshot_keyspace_with_table(nodetool, option_name):
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, "ks1", option_name, "tbl", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl"})
|
||||
params={"tag": tag, "sf": "false", "ttl": "0", "kn": "ks1", "cf": "tbl"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1"], False)
|
||||
check_snapshot_out(res.stdout, tag, ["ks1"], False, "0")
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, "ks1", option_name, "tbl1,tbl2", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl1,tbl2"})
|
||||
params={"tag": tag, "sf": "false", "ttl": "0", "kn": "ks1", "cf": "tbl1,tbl2"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1"], False)
|
||||
check_snapshot_out(res.stdout, tag, ["ks1"], False, "0")
|
||||
|
||||
|
||||
class kn_param(NamedTuple):
|
||||
@@ -186,14 +186,14 @@ class kn_param(NamedTuple):
|
||||
def test_snapshot_keyspace_table_single_arg(nodetool, param, scylla_only):
|
||||
tag = "my_snapshot"
|
||||
|
||||
req_params = {"tag": tag, "sf": "false", "kn": param.kn}
|
||||
req_params = {"tag": tag, "sf": "false", "ttl": "0", "kn": param.kn}
|
||||
if param.cf:
|
||||
req_params["cf"] = param.cf
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, *param.args, expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots", params=req_params)
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, param.snapshot_keyspaces, False)
|
||||
check_snapshot_out(res.stdout, tag, param.snapshot_keyspaces, False, "0")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("option_name", ("-kt", "--kt-list", "-kc", "--kc.list"))
|
||||
@@ -202,21 +202,21 @@ def test_snapshot_ktlist(nodetool, option_name):
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, option_name, "ks1.tbl1", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl1"})
|
||||
params={"tag": tag, "sf": "false", "ttl": "0", "kn": "ks1", "cf": "tbl1"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1.tbl1"], False)
|
||||
check_snapshot_out(res.stdout, tag, ["ks1.tbl1"], False, "0")
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, option_name, "ks1.tbl1,ks2.tbl2", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1.tbl1,ks2.tbl2"})
|
||||
params={"tag": tag, "sf": "false", "ttl": "0", "kn": "ks1.tbl1,ks2.tbl2"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1.tbl1", "ks2.tbl2"], False)
|
||||
check_snapshot_out(res.stdout, tag, ["ks1.tbl1", "ks2.tbl2"], False, "0")
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, option_name, "ks1,ks2", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1,ks2"})
|
||||
params={"tag": tag, "sf": "false", "ttl": "0", "kn": "ks1,ks2"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1" ,"ks2"], False)
|
||||
check_snapshot_out(res.stdout, tag, ["ks1" ,"ks2"], False, "0")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("tag", [None, "my_snapshot_tag"])
|
||||
@@ -228,52 +228,58 @@ def test_snapshot_ktlist(nodetool, option_name):
|
||||
{"ks": ["ks1"], "tbl": ["tbl1", "tbl2"]},
|
||||
{"ks": ["ks1", "ks2"], "tbl": []},
|
||||
])
|
||||
@pytest.mark.parametrize("skip_flush", [False, True])
|
||||
def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush):
|
||||
cmd = ["snapshot"]
|
||||
params = {}
|
||||
def test_snapshot_options_matrix(nodetool, tag, ktlist):
|
||||
for skip_flush in [False, True]:
|
||||
for ttl in ["1", "1s", "1S", "1m", "1M", "1h", "1H", "1d", "1D"]:
|
||||
cmd = ["snapshot"]
|
||||
params = {}
|
||||
|
||||
if tag is None:
|
||||
tag = int(time.time() * 1000)
|
||||
params["tag"] = approximate_value(value=tag, delta=99999).to_json()
|
||||
else:
|
||||
cmd += ["--tag", tag]
|
||||
params["tag"] = tag
|
||||
if tag is None:
|
||||
cur_tag = int(time.time() * 1000)
|
||||
params["tag"] = approximate_value(value=cur_tag, delta=99999).to_json()
|
||||
else:
|
||||
cur_tag = tag
|
||||
cmd += ["--tag", cur_tag]
|
||||
params["tag"] = cur_tag
|
||||
|
||||
if skip_flush:
|
||||
cmd.append("--skip-flush")
|
||||
if skip_flush:
|
||||
cmd.append("--skip-flush")
|
||||
|
||||
params["sf"] = str(skip_flush).lower()
|
||||
if ttl:
|
||||
cmd += ["--ttl", ttl]
|
||||
|
||||
if ktlist:
|
||||
if "tbl" in ktlist:
|
||||
if len(ktlist["tbl"]) > 0:
|
||||
keyspaces = ktlist["ks"]
|
||||
cmd += ["--table", ",".join(ktlist["tbl"])]
|
||||
cmd += keyspaces
|
||||
params["kn"] = keyspaces[0]
|
||||
params["cf"] = ",".join(ktlist["tbl"])
|
||||
params["sf"] = str(skip_flush).lower()
|
||||
params["ttl"] = ttl
|
||||
|
||||
if ktlist:
|
||||
if "tbl" in ktlist:
|
||||
if len(ktlist["tbl"]) > 0:
|
||||
keyspaces = ktlist["ks"]
|
||||
cmd += ["--table", ",".join(ktlist["tbl"])]
|
||||
cmd += keyspaces
|
||||
params["kn"] = keyspaces[0]
|
||||
params["cf"] = ",".join(ktlist["tbl"])
|
||||
else:
|
||||
keyspaces = ktlist["ks"]
|
||||
cmd += keyspaces
|
||||
params["kn"] = ",".join(keyspaces)
|
||||
else:
|
||||
keyspaces = ktlist["ks"]
|
||||
cmd += keyspaces
|
||||
params["kn"] = ",".join(keyspaces)
|
||||
cmd += ["-kt", ",".join(keyspaces)]
|
||||
if len(keyspaces) == 1:
|
||||
ks, tbl = keyspaces[0].split(".")
|
||||
params["kn"] = ks
|
||||
params["cf"] = tbl
|
||||
elif len(keyspaces) > 1:
|
||||
params["kn"] = ",".join(keyspaces)
|
||||
else:
|
||||
keyspaces = ktlist["ks"]
|
||||
cmd += ["-kt", ",".join(keyspaces)]
|
||||
if len(keyspaces) == 1:
|
||||
ks, tbl = keyspaces[0].split(".")
|
||||
params["kn"] = ks
|
||||
params["cf"] = tbl
|
||||
elif len(keyspaces) > 1:
|
||||
params["kn"] = ",".join(keyspaces)
|
||||
else:
|
||||
keyspaces = []
|
||||
keyspaces = []
|
||||
|
||||
res = nodetool(*cmd, expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots", params=params)
|
||||
])
|
||||
res = nodetool(*cmd, expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots", params=params)
|
||||
])
|
||||
|
||||
check_snapshot_out(res.stdout, tag, keyspaces, skip_flush)
|
||||
check_snapshot_out(res.stdout, cur_tag, keyspaces, skip_flush, ttl)
|
||||
|
||||
|
||||
def test_snapshot_multiple_keyspace_with_table(nodetool):
|
||||
|
||||
@@ -433,11 +433,13 @@ class ScyllaRESTAPIClient:
|
||||
]
|
||||
return await self.client.post_json(f"/storage_service/tablets/restore", host=node_ip, params=params, json=backup_location)
|
||||
|
||||
async def take_snapshot(self, node_ip: str, ks: str, tag: str, tables: list[str] = None) -> None:
|
||||
async def take_snapshot(self, node_ip: str, ks: str, tag: str, tables: list[str] = None, ttl: Optional[str] = None) -> None:
|
||||
"""Take keyspace snapshot"""
|
||||
params = { 'kn': ks, 'tag': tag }
|
||||
if tables:
|
||||
params['cf'] = ','.join(tables)
|
||||
if ttl is not None:
|
||||
params['ttl'] = ttl
|
||||
await self.client.post(f"/storage_service/snapshots", host=node_ip, params=params)
|
||||
|
||||
async def take_cluster_snapshot(self, node_ip: str, ks: str, tag: str, tables: list[str] = None) -> None:
|
||||
|
||||
@@ -2390,16 +2390,23 @@ void snapshot_operation(scylla_rest_client& client, const bpo::variables_map& vm
|
||||
params["sf"] = "false";
|
||||
}
|
||||
|
||||
if (vm.contains("ttl")) {
|
||||
params["ttl"] = vm["ttl"].as<sstring>();
|
||||
} else {
|
||||
params["ttl"] = "0";
|
||||
}
|
||||
|
||||
client.post("/storage_service/snapshots", params);
|
||||
|
||||
if (kn_msg.empty()) {
|
||||
kn_msg = params["kn"];
|
||||
}
|
||||
|
||||
fmt::print(std::cout, "Requested creating snapshot(s) for [{}] with snapshot name [{}] and options {{skipFlush={}}}\n",
|
||||
fmt::print(std::cout, "Requested creating snapshot(s) for [{}] with snapshot name [{}] and options {{skipFlush={}, ttl={}}}\n",
|
||||
kn_msg,
|
||||
params["tag"],
|
||||
params["sf"]);
|
||||
params["sf"],
|
||||
params["ttl"]);
|
||||
fmt::print(std::cout, "Snapshot directory: {}\n", params["tag"]);
|
||||
}
|
||||
|
||||
@@ -4830,6 +4837,7 @@ For more information, see: {}
|
||||
typed_option<sstring>("keyspace-table-list", "The keyspace.table pair(s) to snapshot, multiple ones can be joined with ','"),
|
||||
typed_option<sstring>("tag,t", "The name of the snapshot"),
|
||||
typed_option<>("skip-flush", "Do not flush memtables before snapshotting (snapshot will not contain unflushed data)"),
|
||||
typed_option<sstring>("ttl", "The TTL for the snapshot, optionally followed by 's' for seconds (the default), 'm' for minutes, 'h' for hours, or 'd' for days. Missing TTL, or 0, means no TTL"),
|
||||
},
|
||||
{
|
||||
typed_option<std::vector<sstring>>("keyspaces", "The keyspaces to snapshot", -1),
|
||||
|
||||
Reference in New Issue
Block a user