treewide: accept list of sstables in "restore" API

before this change, we enumerate the sstables tracked by the
system.sstables table, and restore them when serving
requests to "storage_service/restore" API. this works fine with
"storage_service/backup" API. but this "restore" API cannot be
used as a drop-in replacement of the rclone based API currently
used by scylla-manager.

in order to fill the gap, in this change:

* add the "prefix" parameter for specifying the shared prefix of
  sstables
* add the "sstables" parameter for specifying the list of TOC
  components of sstables
* remove the "snapshot" parameter, as we don't encode the prefix
  on scylla's end anymore.
* make the "table" parameter mandatory.

Fixes scylladb/scylladb#20461
Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
This commit is contained in:
Kefu Chai
2024-10-01 23:18:25 +08:00
parent 17181c2eca
commit 787ea4b1d4
14 changed files with 180 additions and 49 deletions

View File

@@ -847,13 +847,25 @@
"paramType":"query"
},
{
"name":"snapshot",
"description":"Name of a snapshot to copy SSTables from",
"name":"prefix",
"description":"The prefix of the object keys for the backed-up SSTables",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"in": "body",
"name": "sstables",
"description": "The list of the object keys of the TOC component of the SSTables to be restored",
"required":true,
"schema" :{
"type": "array",
"items": {
"type": "string"
}
}
},
{
"name":"keyspace",
"description":"Name of a keyspace to copy SSTables to",
@@ -865,7 +877,7 @@
{
"name":"table",
"description":"Name of a table to copy SSTables to",
"required":false,
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"

View File

@@ -54,6 +54,7 @@
#include "locator/abstract_replication_strategy.hh"
#include "sstables_loader.hh"
#include "db/view/view_builder.hh"
#include "utils/rjson.hh"
#include "utils/user_provided_param.hh"
using namespace seastar::httpd;
@@ -496,13 +497,19 @@ void set_sstables_loader(http_context& ctx, routes& r, sharded<sstables_loader>&
auto keyspace = req->get_query_param("keyspace");
auto table = req->get_query_param("table");
auto bucket = req->get_query_param("bucket");
auto snapshot_name = req->get_query_param("snapshot");
if (table.empty()) {
// TODO: If missing, should restore all tables
throw httpd::bad_param_exception("The table name must be specified");
}
auto prefix = req->get_query_param("prefix");
auto task_id = co_await sst_loader.local().download_new_sstables(keyspace, table, endpoint, bucket, snapshot_name);
// TODO: the http_server backing the API does not use content streaming
// should use it for better performance
rjson::value parsed = rjson::parse(req->content);
if (!parsed.IsArray()) {
throw httpd::bad_param_exception("malformed sstables in body");
}
std::vector<sstring> sstables;
for (const rjson::value& element : parsed.GetArray()) {
sstables.emplace_back(rjson::to_string_view(element));
}
auto task_id = co_await sst_loader.local().download_new_sstables(keyspace, table, prefix, std::move(sstables), endpoint, bucket);
co_return json::json_return_type(fmt::to_string(task_id));
});

View File

@@ -333,6 +333,41 @@ bool storage_options::can_update_to(const storage_options& new_options) {
return value == new_options.value;
}
storage_options storage_options::append_to_s3_prefix(const sstring& s) const {
// Return a copy of these options with `s` appended to the S3 prefix.
//
// when restoring from object storage, the API of /storage_service/restore
// provides:
// 1. a shared prefix
// 2. a list of sstables, each of which has its own partial path
//
// for example, assuming we have the following API call:
// - shared prefix: /bucket/ks/cf
// - sstables
//   - 3123/me-3gdq_0bki_2cvk01yl83nj0tp5gh-big-TOC.txt
//   - 3123/me-3gdq_0bki_2edkg2vx4xtksugjj5-big-TOC.txt
//   - 3245/me-3gdq_0bki_2cvk02wubgncy8qd41-big-TOC.txt
//
// note, this example shows three sstables from two different snapshot backups.
//
// we assume all sstables' locations share the same base prefix (storage_options::s3::prefix).
// however, sstables in different backups have different prefixes. to handle this, we compose
// a per-sstable prefix by concatenating the shared prefix and the "parent directory" of the
// sstable's location. the resulting structure looks like:
//
// sstables:
// - prefix: /bucket/ks/cf/3123
//   desc: me-3gdq_0bki_2cvk01yl83nj0tp5gh-big
// - prefix: /bucket/ks/cf/3123
//   desc: me-3gdq_0bki_2edkg2vx4xtksugjj5-big
// - prefix: /bucket/ks/cf/3245
//   desc: me-3gdq_0bki_2cvk02wubgncy8qd41-big
// only object-storage (s3) options carry a prefix, so local storage is not allowed here
SCYLLA_ASSERT(!is_local_type());
storage_options ret = *this;
s3 s3_options = std::get<s3>(value);
s3_options.prefix += s;
ret.value = std::move(s3_options);
return ret;
}
no_such_keyspace::no_such_keyspace(std::string_view ks_name)
: runtime_error{fmt::format("Can't find a keyspace {}", ks_name)}
{

View File

@@ -47,6 +47,8 @@ struct storage_options {
bool can_update_to(const storage_options& new_options);
static value_type from_map(std::string_view type, std::map<sstring, sstring> values);
storage_options append_to_s3_prefix(const sstring& s) const;
};
inline storage_options make_local_options(std::filesystem::path dir) {

View File

@@ -234,14 +234,16 @@ distributed_loader::get_sstables_from_upload_dir(distributed<replica::database>&
}
future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
distributed_loader::get_sstables_from_object_store(distributed<replica::database>& db, sstring ks, sstring cf, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg) {
return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, prefix] (auto& global_table, auto& directory) {
distributed_loader::get_sstables_from_object_store(distributed<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg) {
return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, prefix, sstables=std::move(sstables)] (auto& global_table, auto& directory) {
return directory.start(global_table.as_sharded_parameter(),
sharded_parameter([bucket, endpoint, prefix] {
data_dictionary::storage_options opts;
opts.value = data_dictionary::storage_options::s3{bucket, endpoint, prefix};
return make_lw_shared<const data_dictionary::storage_options>(std::move(opts));
}), &error_handler_gen_for_upload_dir);
}),
sstables,
&error_handler_gen_for_upload_dir);
});
}

View File

@@ -89,7 +89,7 @@ public:
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
get_sstables_from_upload_dir(distributed<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg);
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
get_sstables_from_object_store(distributed<replica::database>& db, sstring ks, sstring cf, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg);
get_sstables_from_object_store(distributed<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg);
static future<> process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks_name, sstring cf_name);
};

View File

@@ -61,6 +61,12 @@ sstable_directory::sstables_registry_components_lister::sstables_registry_compon
{
}
// Construct a lister over an explicit list of TOC component file names.
// `options` is currently unused — presumably kept for signature symmetry with
// the other listers; only the TOC names are retained.
sstable_directory::restore_components_lister::restore_components_lister(const data_dictionary::storage_options::value_type& options,
std::vector<sstring> toc_filenames)
: _toc_filenames(std::move(toc_filenames))
{
}
std::unique_ptr<sstable_directory::components_lister>
sstable_directory::make_components_lister() {
return std::visit(overloaded_functor {
@@ -110,6 +116,23 @@ sstable_directory::sstable_directory(replica::table& table,
)
{}
// Construct a directory view over an explicit list of sstables to restore
// (identified by their TOC component names), backed by the given storage
// options, rather than by scanning a filesystem directory.
sstable_directory::sstable_directory(replica::table& table,
lw_shared_ptr<const data_dictionary::storage_options> storage_opts,
std::vector<sstring> sstables,
io_error_handler_gen error_handler_gen)
: _manager(table.get_sstables_manager())
, _schema(table.schema())
, _storage_opts(std::move(storage_opts))
// restored sstables are handled in the "upload" state
, _state(sstable_state::upload)
, _error_handler_gen(error_handler_gen)
, _storage(make_storage(_manager, *_storage_opts, _state))
, _lister(std::make_unique<sstable_directory::restore_components_lister>(_storage_opts->value,
std::move(sstables)))
, _sharder_ptr(std::make_unique<dht::auto_refreshing_sharder>(table.shared_from_this()))
, _sharder(*_sharder_ptr)
, _unshared_remote_sstables(smp::count)
{}
sstable_directory::sstable_directory(sstables_manager& manager,
schema_ptr schema,
const dht::sharder& sharder,
@@ -293,6 +316,10 @@ future<> sstable_directory::sstables_registry_components_lister::prepare(sstable
}
}
// No-op: the sstable set is supplied explicitly, so there is nothing to
// scan or stage before processing.
future<> sstable_directory::restore_components_lister::prepare(sstable_directory& dir, process_flags flags, storage& st) {
return make_ready_future();
}
future<> sstable_directory::process_sstable_dir(process_flags flags) {
return _lister->process(*this, flags);
}
@@ -406,6 +433,19 @@ future<> sstable_directory::sstables_registry_components_lister::process(sstable
});
}
// Process each explicitly-listed TOC component: parse the file name into an
// entry descriptor, skip generations this shard does not own, and hand the
// rest to the directory, rebasing the storage prefix onto the sstable's
// parent path (each sstable may come from a different backup snapshot).
future<> sstable_directory::restore_components_lister::process(sstable_directory& directory, process_flags flags) {
    co_await coroutine::parallel_for_each(_toc_filenames, [flags, &directory] (sstring toc_filename) -> future<> {
        std::filesystem::path sst_path{toc_filename};
        // keyspace/table names are not encoded in the object key, hence the empty strings
        entry_descriptor desc = sstables::parse_path(sst_path, "", "");
        if (!sstable_generation_generator::maybe_owned_by_this_shard(desc.generation)) {
            // another shard owns this generation
            co_return;
        }
        dirlog.debug("Processing {} entry from {}", desc.generation, toc_filename);
        co_await directory.process_descriptor(
            std::move(desc), flags,
            [&directory, prefix=sst_path.parent_path().native()] {
                // compose the per-sstable prefix: shared prefix + "parent directory"
                return directory._storage_opts->append_to_s3_prefix(prefix);
            });
    });
}
@@ -425,6 +465,10 @@ future<> sstable_directory::sstables_registry_components_lister::commit() {
return make_ready_future<>();
}
// No-op: this lister has no persistent state to commit after processing.
future<> sstable_directory::restore_components_lister::commit() {
return make_ready_future<>();
}
future<> sstable_directory::sstables_registry_components_lister::garbage_collect(storage& st) {
std::set<generation_type> gens_to_remove;
co_await _sstables_registry.sstables_registry_list(_location, coroutine::lambda([&st, &gens_to_remove] (sstring status, sstable_state state, entry_descriptor desc) -> future<> {

View File

@@ -143,6 +143,15 @@ public:
virtual future<> prepare(sstable_directory&, process_flags, storage&) override;
};
// Lister that processes an explicit list of TOC component names (as supplied
// by the /storage_service/restore API) instead of enumerating a directory or
// the sstables registry.
class restore_components_lister final : public components_lister {
std::vector<sstring> _toc_filenames;
public:
// `options` is currently unused by this lister
restore_components_lister(const data_dictionary::storage_options::value_type& options, std::vector<sstring> toc_filenames);
virtual future<> process(sstable_directory& directory, process_flags flags) override;
virtual future<> commit() override;
virtual future<> prepare(sstable_directory&, process_flags, storage&) override;
};
private:
// prevents an object that respects a phaser (usually a table) from disappearing in the middle of the operation.
@@ -206,6 +215,10 @@ public:
sstable_directory(replica::table& table,
lw_shared_ptr<const data_dictionary::storage_options> storage_opts,
io_error_handler_gen error_handler_gen);
sstable_directory(replica::table& table,
lw_shared_ptr<const data_dictionary::storage_options> storage_opts,
std::vector<sstring> sstables,
io_error_handler_gen error_handler_gen);
sstable_directory(sstables_manager& manager,
schema_ptr schema,
const dht::sharder& sharder,

View File

@@ -454,6 +454,8 @@ class sstables_loader::download_task_impl : public tasks::task_manager::task::im
sstring _bucket;
sstring _ks;
sstring _cf;
sstring _prefix;
std::vector<sstring> _sstables;
sstring _snapshot_name;
protected:
@@ -462,14 +464,15 @@ protected:
public:
download_task_impl(tasks::task_manager::module_ptr module, sharded<sstables_loader>& loader,
sstring endpoint, sstring bucket,
sstring ks, sstring cf, sstring snapshot) noexcept
sstring ks, sstring cf, sstring prefix, std::vector<sstring> sstables) noexcept
: tasks::task_manager::task::impl(module, tasks::task_id::create_random_id(), 0, "node", ks, "", "", tasks::task_id::create_null_id())
, _loader(loader)
, _endpoint(std::move(endpoint))
, _bucket(std::move(bucket))
, _ks(std::move(ks))
, _cf(std::move(cf))
, _snapshot_name(std::move(snapshot))
, _prefix(std::move(prefix))
, _sstables(std::move(sstables))
{}
virtual std::string type() const override {
@@ -487,10 +490,9 @@ future<> sstables_loader::download_task_impl::run() {
sstables::sstable_open_config cfg {
.load_bloom_filter = false,
};
auto prefix = format("{}/{}", _cf, _snapshot_name);
llog.debug("Loading sstables from {}({}/{})", _endpoint, _bucket, prefix);
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _endpoint, _bucket, prefix, cfg);
llog.debug("Streaming sstables from {}({}/{})", _endpoint, _bucket, prefix);
llog.debug("Loading sstables from {}({}/{})", _endpoint, _bucket, _prefix);
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, _bucket, _prefix, cfg);
llog.debug("Streaming sstables from {}({}/{})", _endpoint, _bucket, _prefix);
co_await _loader.invoke_on_all([this, &sstables_on_shards, table_id] (sstables_loader& loader) mutable -> future<> {
co_await loader.load_and_stream(_ks, _cf, table_id, std::move(sstables_on_shards[this_shard_id()]), false);
});
@@ -517,11 +519,12 @@ future<> sstables_loader::stop() {
}
future<tasks::task_id> sstables_loader::download_new_sstables(sstring ks_name, sstring cf_name,
sstring endpoint, sstring bucket, sstring snapshot) {
sstring prefix, std::vector<sstring> sstables,
sstring endpoint, sstring bucket) {
if (!_storage_manager.is_known_endpoint(endpoint)) {
throw std::invalid_argument(format("endpoint {} not found", endpoint));
}
llog.info("Restore sstables from {}({}) to {}", endpoint, snapshot, ks_name);
auto task = co_await _task_manager_module->make_and_start_task<download_task_impl>({}, container(), std::move(endpoint), std::move(bucket), std::move(ks_name), std::move(cf_name), std::move(snapshot));
llog.info("Restore sstables from {}({}) to {}", endpoint, prefix, ks_name);
auto task = co_await _task_manager_module->make_and_start_task<download_task_impl>({}, container(), std::move(endpoint), std::move(bucket), std::move(ks_name), std::move(cf_name), std::move(prefix), std::move(sstables));
co_return task->id();
}

View File

@@ -8,6 +8,7 @@
#pragma once
#include <vector>
#include <seastar/core/sharded.hh>
#include "schema/schema_fwd.hh"
#include "sstables/shared_sstable.hh"
@@ -88,7 +89,8 @@ public:
* Download new SSTables not currently tracked by the system from object store
*/
future<tasks::task_id> download_new_sstables(sstring ks_name, sstring cf_name,
sstring endpoint, sstring bucket, sstring snapshot);
sstring prefix, std::vector<sstring> sstables,
sstring endpoint, sstring bucket);
class download_task_impl;
};

View File

@@ -8,26 +8,21 @@ import pytest
from test.nodetool.rest_api_mock import expected_request
@pytest.mark.parametrize("table",
["cf",
pytest.param("",
marks=pytest.mark.xfail(
reason="full keyspace restore not implemented yet"))])
@pytest.mark.parametrize("nowait,task_state,task_error", [(False, "failed", "error"),
(False, "done", ""),
(True, "", "")])
def test_restore(nodetool, scylla_only, table, nowait, task_state, task_error):
def test_restore(nodetool, scylla_only, nowait, task_state, task_error):
endpoint = "s3.us-east-2.amazonaws.com"
bucket = "bucket-foo"
keyspace = "ks"
table = "cf"
prefix = "foo/bar"
snapshot = "ss"
params = {"endpoint": endpoint,
"bucket": bucket,
"snapshot": snapshot,
"keyspace": keyspace}
if table:
params["table"] = table
"prefix": prefix,
"keyspace": keyspace,
"table": table}
task_id = "2c4a3e5f"
start_time = "2024-08-08T14:29:25Z"
@@ -48,25 +43,29 @@ def test_restore(nodetool, scylla_only, table, nowait, task_state, task_error):
"progress_completed": 1.0,
"children_ids": []
}
# just generate filenames of TOC components of random sstables
sstables = [f"me-{id}-big-TOC.txt" for id in range(12)]
expected_requests = [
expected_request(
"POST",
"/storage_service/restore",
params,
sstables,
response=task_id)
]
args = ["restore",
"--endpoint", endpoint,
"--bucket", bucket,
"--snapshot", snapshot,
"--keyspace", keyspace]
if table:
args.extend(["--table", table])
"--prefix", prefix,
"--keyspace", keyspace,
"--table", table]
if nowait:
args.append("--nowait")
args.extend(sstables)
res = nodetool(*args, expected_requests=expected_requests)
assert task_id in res.stdout
else:
args.extend(sstables)
# wait for the completion of backup task
expected_requests.append(
expected_request(

View File

@@ -153,6 +153,8 @@ async def test_simple_backup_and_restore(manager: ManagerClient, s3_server):
orig_res = cql.execute(f"SELECT * FROM {ks}.{cf}")
orig_rows = { x.name: x.value for x in orig_res }
toc_names = [entry.name for entry in list_sstables() if entry.name.endswith('TOC.txt')]
prefix = f'{cf}/{snap_name}'
tid = await manager.api.backup(server.ip_addr, ks, cf, snap_name, s3_server.address, s3_server.bucket_name, prefix)
status = await manager.api.wait_task(server.ip_addr, tid)
@@ -166,7 +168,7 @@ async def test_simple_backup_and_restore(manager: ManagerClient, s3_server):
assert not res
print(f'Try to restore')
tid = await manager.api.restore(server.ip_addr, ks, cf, snap_name, s3_server.address, s3_server.bucket_name)
tid = await manager.api.restore(server.ip_addr, ks, cf, s3_server.address, s3_server.bucket_name, prefix, toc_names)
status = await manager.api.wait_task(server.ip_addr, tid)
assert (status is not None) and (status['state'] == 'done')
print(f'Check that sstables came back')

View File

@@ -317,14 +317,14 @@ class ScyllaRESTAPIClient():
"snapshot": tag}
return await self.client.post_json(f"/storage_service/backup", host=node_ip, params=params)
async def restore(self, node_ip: str, ks: str, cf: str, tag: str, dest: str, bucket: str) -> str:
async def restore(self, node_ip: str, ks: str, cf: str, dest: str, bucket: str, prefix: str, sstables: list[str]) -> str:
"""Restore keyspace:table from backup"""
params = {"keyspace": ks,
"table": cf,
"endpoint": dest,
"bucket": bucket,
"snapshot": tag}
return await self.client.post_json(f"/storage_service/restore", host=node_ip, params=params)
"prefix": prefix}
return await self.client.post_json(f"/storage_service/restore", host=node_ip, params=params, json=sstables)
async def take_snapshot(self, node_ip: str, ks: str, tag: str) -> None:
"""Take keyspace snapshot"""

View File

@@ -1507,16 +1507,24 @@ void repair_operation(scylla_rest_client& client, const bpo::variables_map& vm)
void restore_operation(scylla_rest_client& client, const bpo::variables_map& vm) {
std::unordered_map<sstring, sstring> params;
for (auto required_param : {"endpoint", "bucket", "snapshot", "keyspace"}) {
for (auto required_param : {"endpoint", "bucket", "prefix", "keyspace", "table"}) {
if (!vm.count(required_param)) {
throw std::invalid_argument(fmt::format("missing required parameter: {}", required_param));
}
params[required_param] = vm[required_param].as<sstring>();
}
if (vm.count("table")) {
params["table"] = vm["table"].as<sstring>();
}
const auto restore_res = client.post("/storage_service/restore", std::move(params));
sstring sstables_body = std::invoke([&vm] {
std::stringstream output;
rjson::streaming_writer writer(output);
writer.StartArray();
for (auto& toc_fn : vm["sstables"].as<std::vector<sstring>>()) {
writer.String(toc_fn);
}
writer.EndArray();
return make_sstring(output.view());
});
const auto restore_res = client.post("/storage_service/restore", std::move(params),
request_body{"application/json", std::move(sstables_body)});
const auto task_id = rjson::to_string_view(restore_res);
if (vm.count("nowait")) {
fmt::print(R"(The task id of this operation is {}
@@ -3889,12 +3897,14 @@ For more information, see: {}"
{
typed_option<sstring>("endpoint", "ID of the configured object storage endpoint to copy SSTables from"),
typed_option<sstring>("bucket", "Name of the bucket to copy SSTables from"),
typed_option<sstring>("snapshot", "Name of a snapshot to copy sstables from"),
typed_option<sstring>("prefix", "The shared prefix of the object keys for the backed-up SSTables"),
typed_option<sstring>("keyspace", "Name of a keyspace to copy SSTables to"),
typed_option<sstring>("table", "Name of a table to copy SSTables to"),
typed_option<>("nowait", "Don't wait on the restore process"),
},
{
typed_option<std::vector<sstring>>("sstables", "The object keys of the TOC component of the SSTables to be restored", -1),
},
},
restore_operation
},