diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json index c743a03a19..c7926e5761 100644 --- a/api/api-doc/storage_service.json +++ b/api/api-doc/storage_service.json @@ -847,13 +847,25 @@ "paramType":"query" }, { - "name":"snapshot", - "description":"Name of a snapshot to copy SSTables from", + "name":"prefix", + "description":"The prefix of the object keys for the backed-up SSTables", "required":true, "allowMultiple":false, "type":"string", "paramType":"query" }, + { + "in": "body", + "name": "sstables", + "description": "The list of the object keys of the TOC component of the SSTables to be restored", + "required":true, + "schema" :{ + "type": "array", + "items": { + "type": "string" + } + } + }, { "name":"keyspace", "description":"Name of a keyspace to copy SSTables to", @@ -865,7 +877,7 @@ { "name":"table", "description":"Name of a table to copy SSTables to", - "required":false, + "required":true, "allowMultiple":false, "type":"string", "paramType":"query" diff --git a/api/storage_service.cc b/api/storage_service.cc index c4c4c31a76..924dcb3f5d 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -54,6 +54,7 @@ #include "locator/abstract_replication_strategy.hh" #include "sstables_loader.hh" #include "db/view/view_builder.hh" +#include "utils/rjson.hh" #include "utils/user_provided_param.hh" using namespace seastar::httpd; @@ -496,13 +497,19 @@ void set_sstables_loader(http_context& ctx, routes& r, sharded& auto keyspace = req->get_query_param("keyspace"); auto table = req->get_query_param("table"); auto bucket = req->get_query_param("bucket"); - auto snapshot_name = req->get_query_param("snapshot"); - if (table.empty()) { - // TODO: If missing, should restore all tables - throw httpd::bad_param_exception("The table name must be specified"); - } + auto prefix = req->get_query_param("prefix"); - auto task_id = co_await sst_loader.local().download_new_sstables(keyspace, table, endpoint, bucket, snapshot_name); + // TODO: 
the http_server backing the API does not use content streaming + // should use it for better performance + rjson::value parsed = rjson::parse(req->content); + if (!parsed.IsArray()) { + throw httpd::bad_param_exception("malformed sstables in body"); + } + std::vector sstables; + for (const rjson::value& element : parsed.GetArray()) { + sstables.emplace_back(rjson::to_string_view(element)); + } + auto task_id = co_await sst_loader.local().download_new_sstables(keyspace, table, prefix, std::move(sstables), endpoint, bucket); + co_return json::json_return_type(fmt::to_string(task_id)); }); diff --git a/data_dictionary/data_dictionary.cc b/data_dictionary/data_dictionary.cc index 79dfa79ebc..be80396ea9 100644 --- a/data_dictionary/data_dictionary.cc +++ b/data_dictionary/data_dictionary.cc @@ -333,6 +333,41 @@ bool storage_options::can_update_to(const storage_options& new_options) { return value == new_options.value; } +storage_options storage_options::append_to_s3_prefix(const sstring& s) const { + // when restoring from object storage, the API of /storage_service/restore + // provides: + // 1. a shared prefix + // 2. a list of sstables, each of which has its own partial path + // + // for example, assuming we have the following API call: + // - shared prefix: /bucket/ks/cf + // - sstables + // - 3123/me-3gdq_0bki_2cvk01yl83nj0tp5gh-big-TOC.txt + // - 3123/me-3gdq_0bki_2edkg2vx4xtksugjj5-big-TOC.txt + // - 3245/me-3gdq_0bki_2cvk02wubgncy8qd41-big-TOC.txt + // + // note, this example shows three sstables from two different snapshot backups. + // + // we assume all sstables' locations share the same base prefix (storage_options::s3::prefix). + // however, sstables in different backups have different prefixes. to handle this, we compose + // a per-sstable prefix by concatenating the shared prefix and the "parent directory" of the + // sstable's location. 
the resulting structure looks like: + // + // sstables: + // - prefix: /bucket/ks/cf/3123 + // desc: me-3gdq_0bki_2cvk01yl83nj0tp5gh-big + // - prefix: /bucket/ks/cf/3123 + // desc: me-3gdq_0bki_2edkg2vx4xtksugjj5-big + // - prefix: /bucket/ks/cf/3245 + // desc: me-3gdq_0bki_2cvk02wubgncy8qd41-big + SCYLLA_ASSERT(!is_local_type()); + storage_options ret = *this; + s3 s3_options = std::get(value); + s3_options.prefix += s; + ret.value = std::move(s3_options); + return ret; +} + no_such_keyspace::no_such_keyspace(std::string_view ks_name) : runtime_error{fmt::format("Can't find a keyspace {}", ks_name)} { diff --git a/data_dictionary/storage_options.hh b/data_dictionary/storage_options.hh index 460c55da62..5917c21450 100644 --- a/data_dictionary/storage_options.hh +++ b/data_dictionary/storage_options.hh @@ -47,6 +47,8 @@ struct storage_options { bool can_update_to(const storage_options& new_options); static value_type from_map(std::string_view type, std::map values); + + storage_options append_to_s3_prefix(const sstring& s) const; }; inline storage_options make_local_options(std::filesystem::path dir) { diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 3fc006391b..09982c5e8d 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -234,14 +234,16 @@ distributed_loader::get_sstables_from_upload_dir(distributed& } future>>> -distributed_loader::get_sstables_from_object_store(distributed& db, sstring ks, sstring cf, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg) { - return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, prefix] (auto& global_table, auto& directory) { +distributed_loader::get_sstables_from_object_store(distributed& db, sstring ks, sstring cf, std::vector sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg) { + return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, prefix, sstables=std::move(sstables)] (auto& 
global_table, auto& directory) { return directory.start(global_table.as_sharded_parameter(), sharded_parameter([bucket, endpoint, prefix] { data_dictionary::storage_options opts; opts.value = data_dictionary::storage_options::s3{bucket, endpoint, prefix}; return make_lw_shared(std::move(opts)); - }), &error_handler_gen_for_upload_dir); + }), + sstables, + &error_handler_gen_for_upload_dir); }); } diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh index 1fb91c95e5..3a4559ac4d 100644 --- a/replica/distributed_loader.hh +++ b/replica/distributed_loader.hh @@ -89,7 +89,7 @@ public: static future>>> get_sstables_from_upload_dir(distributed& db, sstring ks, sstring cf, sstables::sstable_open_config cfg); static future>>> - get_sstables_from_object_store(distributed& db, sstring ks, sstring cf, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg); + get_sstables_from_object_store(distributed& db, sstring ks, sstring cf, std::vector sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg); static future<> process_upload_dir(distributed& db, sharded& vb, sstring ks_name, sstring cf_name); }; diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index 7eaf403e2a..204707550a 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -61,6 +61,12 @@ sstable_directory::sstables_registry_components_lister::sstables_registry_compon { } +sstable_directory::restore_components_lister::restore_components_lister(const data_dictionary::storage_options::value_type& options, + std::vector toc_filenames) + : _toc_filenames(std::move(toc_filenames)) +{ +} + std::unique_ptr sstable_directory::make_components_lister() { return std::visit(overloaded_functor { @@ -110,6 +116,23 @@ sstable_directory::sstable_directory(replica::table& table, ) {} +sstable_directory::sstable_directory(replica::table& table, + lw_shared_ptr storage_opts, + std::vector sstables, 
+ io_error_handler_gen error_handler_gen) + : _manager(table.get_sstables_manager()) + , _schema(table.schema()) + , _storage_opts(std::move(storage_opts)) + , _state(sstable_state::upload) + , _error_handler_gen(error_handler_gen) + , _storage(make_storage(_manager, *_storage_opts, _state)) + , _lister(std::make_unique(_storage_opts->value, + std::move(sstables))) + , _sharder_ptr(std::make_unique(table.shared_from_this())) + , _sharder(*_sharder_ptr) + , _unshared_remote_sstables(smp::count) +{} + sstable_directory::sstable_directory(sstables_manager& manager, schema_ptr schema, const dht::sharder& sharder, @@ -293,6 +316,10 @@ future<> sstable_directory::sstables_registry_components_lister::prepare(sstable } } +future<> sstable_directory::restore_components_lister::prepare(sstable_directory& dir, process_flags flags, storage& st) { + return make_ready_future(); +} + future<> sstable_directory::process_sstable_dir(process_flags flags) { return _lister->process(*this, flags); } @@ -406,6 +433,19 @@ future<> sstable_directory::sstables_registry_components_lister::process(sstable }); } +future<> sstable_directory::restore_components_lister::process(sstable_directory& directory, process_flags flags) { + co_await coroutine::parallel_for_each(_toc_filenames, [flags, &directory] (sstring toc_filename) -> future<> { + std::filesystem::path sst_path{toc_filename}; + entry_descriptor desc = sstables::parse_path(sst_path, "", ""); + if (!sstable_generation_generator::maybe_owned_by_this_shard(desc.generation)) { + co_return; + } + dirlog.debug("Processing {} entry from {}", desc.generation, toc_filename); + co_await directory.process_descriptor( + std::move(desc), flags, + [&directory, prefix=sst_path.parent_path().native()] { + return directory._storage_opts->append_to_s3_prefix(prefix); + }); }); } @@ -425,6 +465,10 @@ future<> sstable_directory::sstables_registry_components_lister::commit() { return make_ready_future<>(); } +future<> 
sstable_directory::restore_components_lister::commit() { + return make_ready_future<>(); +} + future<> sstable_directory::sstables_registry_components_lister::garbage_collect(storage& st) { std::set gens_to_remove; co_await _sstables_registry.sstables_registry_list(_location, coroutine::lambda([&st, &gens_to_remove] (sstring status, sstable_state state, entry_descriptor desc) -> future<> { diff --git a/sstables/sstable_directory.hh b/sstables/sstable_directory.hh index 2b36daf284..5b2e64588a 100644 --- a/sstables/sstable_directory.hh +++ b/sstables/sstable_directory.hh @@ -143,6 +143,15 @@ public: virtual future<> prepare(sstable_directory&, process_flags, storage&) override; }; + class restore_components_lister final : public components_lister { + std::vector _toc_filenames; + public: + restore_components_lister(const data_dictionary::storage_options::value_type& options, std::vector toc_filenames); + virtual future<> process(sstable_directory& directory, process_flags flags) override; + virtual future<> commit() override; + virtual future<> prepare(sstable_directory&, process_flags, storage&) override; + }; + private: // prevents an object that respects a phaser (usually a table) from disappearing in the middle of the operation. 
@@ -206,6 +215,10 @@ public: sstable_directory(replica::table& table, lw_shared_ptr storage_opts, io_error_handler_gen error_handler_gen); + sstable_directory(replica::table& table, + lw_shared_ptr storage_opts, + std::vector sstables, + io_error_handler_gen error_handler_gen); sstable_directory(sstables_manager& manager, schema_ptr schema, const dht::sharder& sharder, diff --git a/sstables_loader.cc b/sstables_loader.cc index 1cc3a227ff..89eb324cf4 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -454,6 +454,8 @@ class sstables_loader::download_task_impl : public tasks::task_manager::task::im sstring _bucket; sstring _ks; sstring _cf; + sstring _prefix; + std::vector _sstables; sstring _snapshot_name; protected: @@ -462,14 +464,15 @@ protected: public: download_task_impl(tasks::task_manager::module_ptr module, sharded& loader, sstring endpoint, sstring bucket, - sstring ks, sstring cf, sstring snapshot) noexcept + sstring ks, sstring cf, sstring prefix, std::vector sstables) noexcept : tasks::task_manager::task::impl(module, tasks::task_id::create_random_id(), 0, "node", ks, "", "", tasks::task_id::create_null_id()) , _loader(loader) , _endpoint(std::move(endpoint)) , _bucket(std::move(bucket)) , _ks(std::move(ks)) , _cf(std::move(cf)) - , _snapshot_name(std::move(snapshot)) + , _prefix(std::move(prefix)) + , _sstables(std::move(sstables)) {} virtual std::string type() const override { @@ -487,10 +490,9 @@ future<> sstables_loader::download_task_impl::run() { sstables::sstable_open_config cfg { .load_bloom_filter = false, }; - auto prefix = format("{}/{}", _cf, _snapshot_name); - llog.debug("Loading sstables from {}({}/{})", _endpoint, _bucket, prefix); - auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _endpoint, _bucket, prefix, cfg); - llog.debug("Streaming sstables from {}({}/{})", _endpoint, _bucket, prefix); + llog.debug("Loading sstables from {}({}/{})", 
_endpoint, _bucket, _prefix); + auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, _bucket, _prefix, cfg); + llog.debug("Streaming sstables from {}({}/{})", _endpoint, _bucket, _prefix); co_await _loader.invoke_on_all([this, &sstables_on_shards, table_id] (sstables_loader& loader) mutable -> future<> { co_await loader.load_and_stream(_ks, _cf, table_id, std::move(sstables_on_shards[this_shard_id()]), false); }); @@ -517,11 +519,12 @@ future<> sstables_loader::stop() { } future sstables_loader::download_new_sstables(sstring ks_name, sstring cf_name, - sstring endpoint, sstring bucket, sstring snapshot) { + sstring prefix, std::vector sstables, + sstring endpoint, sstring bucket) { if (!_storage_manager.is_known_endpoint(endpoint)) { throw std::invalid_argument(format("endpoint {} not found", endpoint)); } - llog.info("Restore sstables from {}({}) to {}", endpoint, snapshot, ks_name); - auto task = co_await _task_manager_module->make_and_start_task({}, container(), std::move(endpoint), std::move(bucket), std::move(ks_name), std::move(cf_name), std::move(snapshot)); + llog.info("Restore sstables from {}({}) to {}", endpoint, prefix, ks_name); + auto task = co_await _task_manager_module->make_and_start_task({}, container(), std::move(endpoint), std::move(bucket), std::move(ks_name), std::move(cf_name), std::move(prefix), std::move(sstables)); co_return task->id(); } diff --git a/sstables_loader.hh b/sstables_loader.hh index 402977345d..0d5de4cb7c 100644 --- a/sstables_loader.hh +++ b/sstables_loader.hh @@ -8,6 +8,7 @@ #pragma once +#include #include #include "schema/schema_fwd.hh" #include "sstables/shared_sstable.hh" @@ -88,7 +89,8 @@ public: * Download new SSTables not currently tracked by the system from object store */ future download_new_sstables(sstring ks_name, sstring cf_name, - sstring endpoint, sstring bucket, sstring snapshot); + sstring prefix, 
std::vector sstables, + sstring endpoint, sstring bucket); class download_task_impl; }; diff --git a/test/nodetool/test_restore.py b/test/nodetool/test_restore.py index 1173c4ea22..4b7889194b 100644 --- a/test/nodetool/test_restore.py +++ b/test/nodetool/test_restore.py @@ -8,26 +8,21 @@ import pytest from test.nodetool.rest_api_mock import expected_request -@pytest.mark.parametrize("table", - ["cf", - pytest.param("", - marks=pytest.mark.xfail( - reason="full keyspace restore not implemented yet"))]) @pytest.mark.parametrize("nowait,task_state,task_error", [(False, "failed", "error"), (False, "done", ""), (True, "", "")]) -def test_restore(nodetool, scylla_only, table, nowait, task_state, task_error): +def test_restore(nodetool, scylla_only, nowait, task_state, task_error): endpoint = "s3.us-east-2.amazonaws.com" bucket = "bucket-foo" keyspace = "ks" + table = "cf" + prefix = "foo/bar" - snapshot = "ss" params = {"endpoint": endpoint, "bucket": bucket, - "snapshot": snapshot, - "keyspace": keyspace} - if table: - params["table"] = table + "prefix": prefix, + "keyspace": keyspace, + "table": table} task_id = "2c4a3e5f" start_time = "2024-08-08T14:29:25Z" @@ -48,25 +43,29 @@ def test_restore(nodetool, scylla_only, table, nowait, task_state, task_error): "progress_completed": 1.0, "children_ids": [] } + # just generate filenames of TOC components of random sstables + sstables = [f"me-{id}-big-TOC.txt" for id in range(12)] expected_requests = [ expected_request( "POST", "/storage_service/restore", params, + sstables, response=task_id) ] args = ["restore", "--endpoint", endpoint, "--bucket", bucket, - "--snapshot", snapshot, - "--keyspace", keyspace] - if table: - args.extend(["--table", table]) + "--prefix", prefix, + "--keyspace", keyspace, + "--table", table] if nowait: args.append("--nowait") + args.extend(sstables) res = nodetool(*args, expected_requests=expected_requests) assert task_id in res.stdout else: + args.extend(sstables) # wait for the completion of 
backup task expected_requests.append( expected_request( diff --git a/test/object_store/test_backup.py b/test/object_store/test_backup.py index 5b6092c2a4..9d6faa7e25 100644 --- a/test/object_store/test_backup.py +++ b/test/object_store/test_backup.py @@ -153,6 +153,8 @@ async def test_simple_backup_and_restore(manager: ManagerClient, s3_server): orig_res = cql.execute(f"SELECT * FROM {ks}.{cf}") orig_rows = { x.name: x.value for x in orig_res } + toc_names = [entry.name for entry in list_sstables() if entry.name.endswith('TOC.txt')] + prefix = f'{cf}/{snap_name}' tid = await manager.api.backup(server.ip_addr, ks, cf, snap_name, s3_server.address, s3_server.bucket_name, prefix) status = await manager.api.wait_task(server.ip_addr, tid) @@ -166,7 +168,7 @@ async def test_simple_backup_and_restore(manager: ManagerClient, s3_server): assert not res print(f'Try to restore') - tid = await manager.api.restore(server.ip_addr, ks, cf, snap_name, s3_server.address, s3_server.bucket_name) + tid = await manager.api.restore(server.ip_addr, ks, cf, s3_server.address, s3_server.bucket_name, prefix, toc_names) status = await manager.api.wait_task(server.ip_addr, tid) assert (status is not None) and (status['state'] == 'done') print(f'Check that sstables came back') diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py index a06edf0763..d6cad3d958 100644 --- a/test/pylib/rest_client.py +++ b/test/pylib/rest_client.py @@ -317,14 +317,14 @@ class ScyllaRESTAPIClient(): "snapshot": tag} return await self.client.post_json(f"/storage_service/backup", host=node_ip, params=params) - async def restore(self, node_ip: str, ks: str, cf: str, tag: str, dest: str, bucket: str) -> str: + async def restore(self, node_ip: str, ks: str, cf: str, dest: str, bucket: str, prefix: str, sstables: list[str]) -> str: """Restore keyspace:table from backup""" params = {"keyspace": ks, "table": cf, "endpoint": dest, "bucket": bucket, - "snapshot": tag} - return await 
self.client.post_json(f"/storage_service/restore", host=node_ip, params=params) + "prefix": prefix} + return await self.client.post_json(f"/storage_service/restore", host=node_ip, params=params, json=sstables) async def take_snapshot(self, node_ip: str, ks: str, tag: str) -> None: """Take keyspace snapshot""" diff --git a/tools/scylla-nodetool.cc b/tools/scylla-nodetool.cc index e1fb785807..515bf9b3ca 100644 --- a/tools/scylla-nodetool.cc +++ b/tools/scylla-nodetool.cc @@ -1507,16 +1507,24 @@ void repair_operation(scylla_rest_client& client, const bpo::variables_map& vm) void restore_operation(scylla_rest_client& client, const bpo::variables_map& vm) { std::unordered_map params; - for (auto required_param : {"endpoint", "bucket", "snapshot", "keyspace"}) { + for (auto required_param : {"endpoint", "bucket", "prefix", "keyspace", "table"}) { if (!vm.count(required_param)) { throw std::invalid_argument(fmt::format("missing required parameter: {}", required_param)); } params[required_param] = vm[required_param].as(); } - if (vm.count("table")) { - params["table"] = vm["table"].as(); - } - const auto restore_res = client.post("/storage_service/restore", std::move(params)); + sstring sstables_body = std::invoke([&vm] { + std::stringstream output; + rjson::streaming_writer writer(output); + writer.StartArray(); + for (auto& toc_fn : vm["sstables"].as>()) { + writer.String(toc_fn); + } + writer.EndArray(); + return make_sstring(output.view()); + }); + const auto restore_res = client.post("/storage_service/restore", std::move(params), + request_body{"application/json", std::move(sstables_body)}); const auto task_id = rjson::to_string_view(restore_res); if (vm.count("nowait")) { fmt::print(R"(The task id of this operation is {} @@ -3889,12 +3897,14 @@ For more information, see: {}" { typed_option("endpoint", "ID of the configured object storage endpoint to copy SSTables from"), typed_option("bucket", "Name of the bucket to copy SSTables from"), - typed_option("snapshot", 
"Name of a snapshot to copy sstables from"), + typed_option("prefix", "The shared prefix of the object keys for the backuped SSTables"), typed_option("keyspace", "Name of a keyspace to copy SSTables to"), typed_option("table", "Name of a table to copy SSTables to"), typed_option<>("nowait", "Don't wait on the restore process"), }, - + { + typed_option>("sstables", "The object keys of the TOC component of the SSTables to be restored", -1), + }, }, restore_operation },