treewide: accept list of sstables in "restore" API
before this change, we enumerate the sstables tracked by the system.sstables table, and restore them when serving requests to "storage_service/restore" API. this works fine with "storage_service/backup" API. but this "restore" API cannot be used as a drop-in replacement of the rclone based API currently used by scylla-manager. in order to fill the gap, in this change: * add the "prefix" parameter for specifying the shared prefix of sstables * add the "sstables" parameter for specifying the list of TOC components of sstables * remove the "snapshot" parameter, as we don't encode the prefix on scylla's end anymore. * make the "table" parameter mandatory. Fixes scylladb/scylladb#20461 Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
This commit is contained in:
@@ -847,13 +847,25 @@
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"snapshot",
|
||||
"description":"Name of a snapshot to copy SSTables from",
|
||||
"name":"prefix",
|
||||
"description":"The prefix of the object keys for the backuped SSTables",
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"in": "body",
|
||||
"name": "sstables",
|
||||
"description": "The list of the object keys of the TOC component of the SSTables to be restored",
|
||||
"required":true,
|
||||
"schema" :{
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name":"keyspace",
|
||||
"description":"Name of a keyspace to copy SSTables to",
|
||||
@@ -865,7 +877,7 @@
|
||||
{
|
||||
"name":"table",
|
||||
"description":"Name of a table to copy SSTables to",
|
||||
"required":false,
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
|
||||
@@ -54,6 +54,7 @@
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "sstables_loader.hh"
|
||||
#include "db/view/view_builder.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include "utils/user_provided_param.hh"
|
||||
|
||||
using namespace seastar::httpd;
|
||||
@@ -496,13 +497,19 @@ void set_sstables_loader(http_context& ctx, routes& r, sharded<sstables_loader>&
|
||||
auto keyspace = req->get_query_param("keyspace");
|
||||
auto table = req->get_query_param("table");
|
||||
auto bucket = req->get_query_param("bucket");
|
||||
auto snapshot_name = req->get_query_param("snapshot");
|
||||
if (table.empty()) {
|
||||
// TODO: If missing, should restore all tables
|
||||
throw httpd::bad_param_exception("The table name must be specified");
|
||||
}
|
||||
auto prefix = req->get_query_param("prefix");
|
||||
|
||||
auto task_id = co_await sst_loader.local().download_new_sstables(keyspace, table, endpoint, bucket, snapshot_name);
|
||||
// TODO: the http_server backing the API does not use content streaming
|
||||
// should use it for better performance
|
||||
rjson::value parsed = rjson::parse(req->content);
|
||||
if (!parsed.IsArray()) {
|
||||
throw httpd::bad_param_exception("mulformatted sstables in body");
|
||||
}
|
||||
std::vector<sstring> sstables;
|
||||
for (const rjson::value& element : parsed.GetArray()) {
|
||||
sstables.emplace_back(rjson::to_string_view(element));
|
||||
}
|
||||
auto task_id = co_await sst_loader.local().download_new_sstables(keyspace, table, prefix, std::move(sstables), endpoint, bucket);
|
||||
co_return json::json_return_type(fmt::to_string(task_id));
|
||||
});
|
||||
|
||||
|
||||
@@ -333,6 +333,41 @@ bool storage_options::can_update_to(const storage_options& new_options) {
|
||||
return value == new_options.value;
|
||||
}
|
||||
|
||||
storage_options storage_options::append_to_s3_prefix(const sstring& s) const {
|
||||
// when restoring from object storage, the API of /storage_service/restore
|
||||
// provides:
|
||||
// 1. a shared prefix
|
||||
// 2. a list of sstables, each of which has its own partial path
|
||||
//
|
||||
// for example, assuming we have following API call:
|
||||
// - shared prefix: /bucket/ks/cf
|
||||
// - sstables
|
||||
// - 3123/me-3gdq_0bki_2cvk01yl83nj0tp5gh-big-TOC.txt
|
||||
// - 3123/me-3gdq_0bki_2edkg2vx4xtksugjj5-big-TOC.txt
|
||||
// - 3245/me-3gdq_0bki_2cvk02wubgncy8qd41-big-TOC.txt
|
||||
//
|
||||
// note, this example shows three sstables from two different snapshot backups.
|
||||
//
|
||||
// we assume all sstables' locations share the same base prefix (storage_options::s3::prefix).
|
||||
// however, sstable in different backups have different prefixes. to handle this, we compose
|
||||
// a per-sstable prefix by concatenating the shared prefix and the "parent directory" of the
|
||||
// sstable's location. the resulting structure looks like:
|
||||
//
|
||||
// sstables:
|
||||
// - prefix: /bucket/ks/cf/3123
|
||||
// desc: me-3gdq_0bki_2cvk01yl83nj0tp5gh-big
|
||||
// - prefix: /bucket/ks/cf/3123
|
||||
// desc: me-3gdq_0bki_2edkg2vx4xtksugjj5-big
|
||||
// - prefix: /bucket/ks/cf/3145
|
||||
// desc: me-3gdq_0bki_2cvk02wubgncy8qd41-big
|
||||
SCYLLA_ASSERT(!is_local_type());
|
||||
storage_options ret = *this;
|
||||
s3 s3_options = std::get<s3>(value);
|
||||
s3_options.prefix += s;
|
||||
ret.value = std::move(s3_options);
|
||||
return ret;
|
||||
}
|
||||
|
||||
no_such_keyspace::no_such_keyspace(std::string_view ks_name)
|
||||
: runtime_error{fmt::format("Can't find a keyspace {}", ks_name)}
|
||||
{
|
||||
|
||||
@@ -47,6 +47,8 @@ struct storage_options {
|
||||
bool can_update_to(const storage_options& new_options);
|
||||
|
||||
static value_type from_map(std::string_view type, std::map<sstring, sstring> values);
|
||||
|
||||
storage_options append_to_s3_prefix(const sstring& s) const;
|
||||
};
|
||||
|
||||
inline storage_options make_local_options(std::filesystem::path dir) {
|
||||
|
||||
@@ -234,14 +234,16 @@ distributed_loader::get_sstables_from_upload_dir(distributed<replica::database>&
|
||||
}
|
||||
|
||||
future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
|
||||
distributed_loader::get_sstables_from_object_store(distributed<replica::database>& db, sstring ks, sstring cf, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg) {
|
||||
return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, prefix] (auto& global_table, auto& directory) {
|
||||
distributed_loader::get_sstables_from_object_store(distributed<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg) {
|
||||
return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, prefix, sstables=std::move(sstables)] (auto& global_table, auto& directory) {
|
||||
return directory.start(global_table.as_sharded_parameter(),
|
||||
sharded_parameter([bucket, endpoint, prefix] {
|
||||
data_dictionary::storage_options opts;
|
||||
opts.value = data_dictionary::storage_options::s3{bucket, endpoint, prefix};
|
||||
return make_lw_shared<const data_dictionary::storage_options>(std::move(opts));
|
||||
}), &error_handler_gen_for_upload_dir);
|
||||
}),
|
||||
sstables,
|
||||
&error_handler_gen_for_upload_dir);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -89,7 +89,7 @@ public:
|
||||
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
|
||||
get_sstables_from_upload_dir(distributed<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg);
|
||||
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
|
||||
get_sstables_from_object_store(distributed<replica::database>& db, sstring ks, sstring cf, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg);
|
||||
get_sstables_from_object_store(distributed<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg);
|
||||
static future<> process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks_name, sstring cf_name);
|
||||
};
|
||||
|
||||
|
||||
@@ -61,6 +61,12 @@ sstable_directory::sstables_registry_components_lister::sstables_registry_compon
|
||||
{
|
||||
}
|
||||
|
||||
sstable_directory::restore_components_lister::restore_components_lister(const data_dictionary::storage_options::value_type& options,
|
||||
std::vector<sstring> toc_filenames)
|
||||
: _toc_filenames(std::move(toc_filenames))
|
||||
{
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_directory::components_lister>
|
||||
sstable_directory::make_components_lister() {
|
||||
return std::visit(overloaded_functor {
|
||||
@@ -110,6 +116,23 @@ sstable_directory::sstable_directory(replica::table& table,
|
||||
)
|
||||
{}
|
||||
|
||||
sstable_directory::sstable_directory(replica::table& table,
|
||||
lw_shared_ptr<const data_dictionary::storage_options> storage_opts,
|
||||
std::vector<sstring> sstables,
|
||||
io_error_handler_gen error_handler_gen)
|
||||
: _manager(table.get_sstables_manager())
|
||||
, _schema(table.schema())
|
||||
, _storage_opts(std::move(storage_opts))
|
||||
, _state(sstable_state::upload)
|
||||
, _error_handler_gen(error_handler_gen)
|
||||
, _storage(make_storage(_manager, *_storage_opts, _state))
|
||||
, _lister(std::make_unique<sstable_directory::restore_components_lister>(_storage_opts->value,
|
||||
std::move(sstables)))
|
||||
, _sharder_ptr(std::make_unique<dht::auto_refreshing_sharder>(table.shared_from_this()))
|
||||
, _sharder(*_sharder_ptr)
|
||||
, _unshared_remote_sstables(smp::count)
|
||||
{}
|
||||
|
||||
sstable_directory::sstable_directory(sstables_manager& manager,
|
||||
schema_ptr schema,
|
||||
const dht::sharder& sharder,
|
||||
@@ -293,6 +316,10 @@ future<> sstable_directory::sstables_registry_components_lister::prepare(sstable
|
||||
}
|
||||
}
|
||||
|
||||
future<> sstable_directory::restore_components_lister::prepare(sstable_directory& dir, process_flags flags, storage& st) {
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<> sstable_directory::process_sstable_dir(process_flags flags) {
|
||||
return _lister->process(*this, flags);
|
||||
}
|
||||
@@ -406,6 +433,19 @@ future<> sstable_directory::sstables_registry_components_lister::process(sstable
|
||||
});
|
||||
}
|
||||
|
||||
future<> sstable_directory::restore_components_lister::process(sstable_directory& directory, process_flags flags) {
|
||||
co_await coroutine::parallel_for_each(_toc_filenames, [flags, &directory] (sstring toc_filename) -> future<> {
|
||||
std::filesystem::path sst_path{toc_filename};
|
||||
entry_descriptor desc = sstables::parse_path(sst_path, "", "");
|
||||
if (!sstable_generation_generator::maybe_owned_by_this_shard(desc.generation)) {
|
||||
co_return;
|
||||
}
|
||||
dirlog.debug("Processing {} entry from {}", desc.generation, toc_filename);
|
||||
co_await directory.process_descriptor(
|
||||
std::move(desc), flags,
|
||||
[&directory, prefix=sst_path.parent_path().native()] {
|
||||
return directory._storage_opts->append_to_s3_prefix(prefix);;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -425,6 +465,10 @@ future<> sstable_directory::sstables_registry_components_lister::commit() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> sstable_directory::restore_components_lister::commit() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> sstable_directory::sstables_registry_components_lister::garbage_collect(storage& st) {
|
||||
std::set<generation_type> gens_to_remove;
|
||||
co_await _sstables_registry.sstables_registry_list(_location, coroutine::lambda([&st, &gens_to_remove] (sstring status, sstable_state state, entry_descriptor desc) -> future<> {
|
||||
|
||||
@@ -143,6 +143,15 @@ public:
|
||||
virtual future<> prepare(sstable_directory&, process_flags, storage&) override;
|
||||
};
|
||||
|
||||
class restore_components_lister final : public components_lister {
|
||||
std::vector<sstring> _toc_filenames;
|
||||
public:
|
||||
restore_components_lister(const data_dictionary::storage_options::value_type& options, std::vector<sstring> toc_filenames);
|
||||
virtual future<> process(sstable_directory& directory, process_flags flags) override;
|
||||
virtual future<> commit() override;
|
||||
virtual future<> prepare(sstable_directory&, process_flags, storage&) override;
|
||||
};
|
||||
|
||||
private:
|
||||
|
||||
// prevents an object that respects a phaser (usually a table) from disappearing in the middle of the operation.
|
||||
@@ -206,6 +215,10 @@ public:
|
||||
sstable_directory(replica::table& table,
|
||||
lw_shared_ptr<const data_dictionary::storage_options> storage_opts,
|
||||
io_error_handler_gen error_handler_gen);
|
||||
sstable_directory(replica::table& table,
|
||||
lw_shared_ptr<const data_dictionary::storage_options> storage_opts,
|
||||
std::vector<sstring> sstables,
|
||||
io_error_handler_gen error_handler_gen);
|
||||
sstable_directory(sstables_manager& manager,
|
||||
schema_ptr schema,
|
||||
const dht::sharder& sharder,
|
||||
|
||||
@@ -454,6 +454,8 @@ class sstables_loader::download_task_impl : public tasks::task_manager::task::im
|
||||
sstring _bucket;
|
||||
sstring _ks;
|
||||
sstring _cf;
|
||||
sstring _prefix;
|
||||
std::vector<sstring> _sstables;
|
||||
sstring _snapshot_name;
|
||||
|
||||
protected:
|
||||
@@ -462,14 +464,15 @@ protected:
|
||||
public:
|
||||
download_task_impl(tasks::task_manager::module_ptr module, sharded<sstables_loader>& loader,
|
||||
sstring endpoint, sstring bucket,
|
||||
sstring ks, sstring cf, sstring snapshot) noexcept
|
||||
sstring ks, sstring cf, sstring prefix, std::vector<sstring> sstables) noexcept
|
||||
: tasks::task_manager::task::impl(module, tasks::task_id::create_random_id(), 0, "node", ks, "", "", tasks::task_id::create_null_id())
|
||||
, _loader(loader)
|
||||
, _endpoint(std::move(endpoint))
|
||||
, _bucket(std::move(bucket))
|
||||
, _ks(std::move(ks))
|
||||
, _cf(std::move(cf))
|
||||
, _snapshot_name(std::move(snapshot))
|
||||
, _prefix(std::move(prefix))
|
||||
, _sstables(std::move(sstables))
|
||||
{}
|
||||
|
||||
virtual std::string type() const override {
|
||||
@@ -487,10 +490,9 @@ future<> sstables_loader::download_task_impl::run() {
|
||||
sstables::sstable_open_config cfg {
|
||||
.load_bloom_filter = false,
|
||||
};
|
||||
auto prefix = format("{}/{}", _cf, _snapshot_name);
|
||||
llog.debug("Loading sstables from {}({}/{})", _endpoint, _bucket, prefix);
|
||||
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _endpoint, _bucket, prefix, cfg);
|
||||
llog.debug("Streaming sstables from {}({}/{})", _endpoint, _bucket, prefix);
|
||||
llog.debug("Loading sstables from {}({}/{})", _endpoint, _bucket, _prefix);
|
||||
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, _bucket, _prefix, cfg);
|
||||
llog.debug("Streaming sstables from {}({}/{})", _endpoint, _bucket, _prefix);
|
||||
co_await _loader.invoke_on_all([this, &sstables_on_shards, table_id] (sstables_loader& loader) mutable -> future<> {
|
||||
co_await loader.load_and_stream(_ks, _cf, table_id, std::move(sstables_on_shards[this_shard_id()]), false);
|
||||
});
|
||||
@@ -517,11 +519,12 @@ future<> sstables_loader::stop() {
|
||||
}
|
||||
|
||||
future<tasks::task_id> sstables_loader::download_new_sstables(sstring ks_name, sstring cf_name,
|
||||
sstring endpoint, sstring bucket, sstring snapshot) {
|
||||
sstring prefix, std::vector<sstring> sstables,
|
||||
sstring endpoint, sstring bucket) {
|
||||
if (!_storage_manager.is_known_endpoint(endpoint)) {
|
||||
throw std::invalid_argument(format("endpoint {} not found", endpoint));
|
||||
}
|
||||
llog.info("Restore sstables from {}({}) to {}", endpoint, snapshot, ks_name);
|
||||
auto task = co_await _task_manager_module->make_and_start_task<download_task_impl>({}, container(), std::move(endpoint), std::move(bucket), std::move(ks_name), std::move(cf_name), std::move(snapshot));
|
||||
llog.info("Restore sstables from {}({}) to {}", endpoint, prefix, ks_name);
|
||||
auto task = co_await _task_manager_module->make_and_start_task<download_task_impl>({}, container(), std::move(endpoint), std::move(bucket), std::move(ks_name), std::move(cf_name), std::move(prefix), std::move(sstables));
|
||||
co_return task->id();
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "sstables/shared_sstable.hh"
|
||||
@@ -88,7 +89,8 @@ public:
|
||||
* Download new SSTables not currently tracked by the system from object store
|
||||
*/
|
||||
future<tasks::task_id> download_new_sstables(sstring ks_name, sstring cf_name,
|
||||
sstring endpoint, sstring bucket, sstring snapshot);
|
||||
sstring prefix, std::vector<sstring> sstables,
|
||||
sstring endpoint, sstring bucket);
|
||||
|
||||
class download_task_impl;
|
||||
};
|
||||
|
||||
@@ -8,26 +8,21 @@ import pytest
|
||||
|
||||
from test.nodetool.rest_api_mock import expected_request
|
||||
|
||||
@pytest.mark.parametrize("table",
|
||||
["cf",
|
||||
pytest.param("",
|
||||
marks=pytest.mark.xfail(
|
||||
reason="full keyspace restore not implemented yet"))])
|
||||
@pytest.mark.parametrize("nowait,task_state,task_error", [(False, "failed", "error"),
|
||||
(False, "done", ""),
|
||||
(True, "", "")])
|
||||
def test_restore(nodetool, scylla_only, table, nowait, task_state, task_error):
|
||||
def test_restore(nodetool, scylla_only, nowait, task_state, task_error):
|
||||
endpoint = "s3.us-east-2.amazonaws.com"
|
||||
bucket = "bucket-foo"
|
||||
keyspace = "ks"
|
||||
table = "cf"
|
||||
prefix = "foo/bar"
|
||||
|
||||
snapshot = "ss"
|
||||
params = {"endpoint": endpoint,
|
||||
"bucket": bucket,
|
||||
"snapshot": snapshot,
|
||||
"keyspace": keyspace}
|
||||
if table:
|
||||
params["table"] = table
|
||||
"prefix": prefix,
|
||||
"keyspace": keyspace,
|
||||
"table": table}
|
||||
|
||||
task_id = "2c4a3e5f"
|
||||
start_time = "2024-08-08T14:29:25Z"
|
||||
@@ -48,25 +43,29 @@ def test_restore(nodetool, scylla_only, table, nowait, task_state, task_error):
|
||||
"progress_completed": 1.0,
|
||||
"children_ids": []
|
||||
}
|
||||
# just generate filenames of TOC components of random sstables
|
||||
sstables = [f"me-{id}-big-TOC.txt" for id in range(12)]
|
||||
expected_requests = [
|
||||
expected_request(
|
||||
"POST",
|
||||
"/storage_service/restore",
|
||||
params,
|
||||
sstables,
|
||||
response=task_id)
|
||||
]
|
||||
args = ["restore",
|
||||
"--endpoint", endpoint,
|
||||
"--bucket", bucket,
|
||||
"--snapshot", snapshot,
|
||||
"--keyspace", keyspace]
|
||||
if table:
|
||||
args.extend(["--table", table])
|
||||
"--prefix", prefix,
|
||||
"--keyspace", keyspace,
|
||||
"--table", table]
|
||||
if nowait:
|
||||
args.append("--nowait")
|
||||
args.extend(sstables)
|
||||
res = nodetool(*args, expected_requests=expected_requests)
|
||||
assert task_id in res.stdout
|
||||
else:
|
||||
args.extend(sstables)
|
||||
# wait for the completion of backup task
|
||||
expected_requests.append(
|
||||
expected_request(
|
||||
|
||||
@@ -153,6 +153,8 @@ async def test_simple_backup_and_restore(manager: ManagerClient, s3_server):
|
||||
orig_res = cql.execute(f"SELECT * FROM {ks}.{cf}")
|
||||
orig_rows = { x.name: x.value for x in orig_res }
|
||||
|
||||
toc_names = [entry.name for entry in list_sstables() if entry.name.endswith('TOC.txt')]
|
||||
|
||||
prefix = f'{cf}/{snap_name}'
|
||||
tid = await manager.api.backup(server.ip_addr, ks, cf, snap_name, s3_server.address, s3_server.bucket_name, prefix)
|
||||
status = await manager.api.wait_task(server.ip_addr, tid)
|
||||
@@ -166,7 +168,7 @@ async def test_simple_backup_and_restore(manager: ManagerClient, s3_server):
|
||||
assert not res
|
||||
|
||||
print(f'Try to restore')
|
||||
tid = await manager.api.restore(server.ip_addr, ks, cf, snap_name, s3_server.address, s3_server.bucket_name)
|
||||
tid = await manager.api.restore(server.ip_addr, ks, cf, s3_server.address, s3_server.bucket_name, prefix, toc_names)
|
||||
status = await manager.api.wait_task(server.ip_addr, tid)
|
||||
assert (status is not None) and (status['state'] == 'done')
|
||||
print(f'Check that sstables came back')
|
||||
|
||||
@@ -317,14 +317,14 @@ class ScyllaRESTAPIClient():
|
||||
"snapshot": tag}
|
||||
return await self.client.post_json(f"/storage_service/backup", host=node_ip, params=params)
|
||||
|
||||
async def restore(self, node_ip: str, ks: str, cf: str, tag: str, dest: str, bucket: str) -> str:
|
||||
async def restore(self, node_ip: str, ks: str, cf: str, dest: str, bucket: str, prefix: str, sstables: list[str]) -> str:
|
||||
"""Restore keyspace:table from backup"""
|
||||
params = {"keyspace": ks,
|
||||
"table": cf,
|
||||
"endpoint": dest,
|
||||
"bucket": bucket,
|
||||
"snapshot": tag}
|
||||
return await self.client.post_json(f"/storage_service/restore", host=node_ip, params=params)
|
||||
"prefix": prefix}
|
||||
return await self.client.post_json(f"/storage_service/restore", host=node_ip, params=params, json=sstables)
|
||||
|
||||
async def take_snapshot(self, node_ip: str, ks: str, tag: str) -> None:
|
||||
"""Take keyspace snapshot"""
|
||||
|
||||
@@ -1507,16 +1507,24 @@ void repair_operation(scylla_rest_client& client, const bpo::variables_map& vm)
|
||||
|
||||
void restore_operation(scylla_rest_client& client, const bpo::variables_map& vm) {
|
||||
std::unordered_map<sstring, sstring> params;
|
||||
for (auto required_param : {"endpoint", "bucket", "snapshot", "keyspace"}) {
|
||||
for (auto required_param : {"endpoint", "bucket", "prefix", "keyspace", "table"}) {
|
||||
if (!vm.count(required_param)) {
|
||||
throw std::invalid_argument(fmt::format("missing required parameter: {}", required_param));
|
||||
}
|
||||
params[required_param] = vm[required_param].as<sstring>();
|
||||
}
|
||||
if (vm.count("table")) {
|
||||
params["table"] = vm["table"].as<sstring>();
|
||||
}
|
||||
const auto restore_res = client.post("/storage_service/restore", std::move(params));
|
||||
sstring sstables_body = std::invoke([&vm] {
|
||||
std::stringstream output;
|
||||
rjson::streaming_writer writer(output);
|
||||
writer.StartArray();
|
||||
for (auto& toc_fn : vm["sstables"].as<std::vector<sstring>>()) {
|
||||
writer.String(toc_fn);
|
||||
}
|
||||
writer.EndArray();
|
||||
return make_sstring(output.view());
|
||||
});
|
||||
const auto restore_res = client.post("/storage_service/restore", std::move(params),
|
||||
request_body{"application/json", std::move(sstables_body)});
|
||||
const auto task_id = rjson::to_string_view(restore_res);
|
||||
if (vm.count("nowait")) {
|
||||
fmt::print(R"(The task id of this operation is {}
|
||||
@@ -3889,12 +3897,14 @@ For more information, see: {}"
|
||||
{
|
||||
typed_option<sstring>("endpoint", "ID of the configured object storage endpoint to copy SSTables from"),
|
||||
typed_option<sstring>("bucket", "Name of the bucket to copy SSTables from"),
|
||||
typed_option<sstring>("snapshot", "Name of a snapshot to copy sstables from"),
|
||||
typed_option<sstring>("prefix", "The shared prefix of the object keys for the backuped SSTables"),
|
||||
typed_option<sstring>("keyspace", "Name of a keyspace to copy SSTables to"),
|
||||
typed_option<sstring>("table", "Name of a table to copy SSTables to"),
|
||||
typed_option<>("nowait", "Don't wait on the restore process"),
|
||||
},
|
||||
|
||||
{
|
||||
typed_option<std::vector<sstring>>("sstables", "The object keys of the TOC component of the SSTables to be restored", -1),
|
||||
},
|
||||
},
|
||||
restore_operation
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user