diff --git a/sstables_loader.cc b/sstables_loader.cc index 63902ca889..510147018a 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -33,6 +33,10 @@ #include "service/storage_service.hh" #include "utils/error_injection.hh" +#include "sstables/object_storage_client.hh" +#include "utils/rjson.hh" +#include "db/system_distributed_keyspace.hh" + #include #include @@ -1020,3 +1024,84 @@ future> get_sstables_for_tablets_for_test std::vector&& tablets_ranges) { return tablet_sstable_streamer::get_sstables_for_tablets(sstables, std::move(tablets_ranges)); } + +static future process_manifest(input_stream& is, const sstring& expected_snapshot_name, + const sstring& manifest_prefix, db::system_distributed_keyspace& sys_dist_ks, + db::consistency_level cl) { + // Read the entire JSON content + rjson::chunked_content content = co_await util::read_entire_stream(is); + + rjson::value parsed = rjson::parse(std::move(content)); + + // Basic validation that tablet_count is a power of 2, as expected by our restore process + size_t tablet_count = parsed["table"]["tablet_count"].GetUint64(); + if (!std::has_single_bit(tablet_count)) { + on_internal_error(llog, fmt::format("Invalid tablet_count {} in manifest {}, expected a power of 2", tablet_count, manifest_prefix)); + } + + // Extract the necessary fields from the manifest + // Expected JSON structure documented in docs/dev/object_storage.md + auto snapshot_name = rjson::to_sstring(parsed["snapshot"]["name"]); + if (snapshot_name != expected_snapshot_name) { + throw std::runtime_error(fmt::format("Manifest {} belongs to snapshot '{}', expected '{}'", + manifest_prefix, snapshot_name, expected_snapshot_name)); + } + auto keyspace = rjson::to_sstring(parsed["table"]["keyspace_name"]); + auto table = rjson::to_sstring(parsed["table"]["table_name"]); + auto datacenter = rjson::to_sstring(parsed["node"]["datacenter"]); + auto rack = rjson::to_sstring(parsed["node"]["rack"]); + + // Process each sstable entry in the manifest + // FIXME: cleanup of the snapshot-related rows is needed in case anything throws in here. + auto sstables = rjson::find(parsed, "sstables"); + if (!sstables) { + co_return tablet_count; + } + if (!sstables->IsArray()) { + throw std::runtime_error("Malformed manifest, 'sstables' is not array"); + } + + for (auto& sstable_entry : sstables->GetArray()) { + auto id = rjson::to_sstable_id(sstable_entry["id"]); + auto first_token = rjson::to_token(sstable_entry["first_token"]); + auto last_token = rjson::to_token(sstable_entry["last_token"]); + auto toc_name = rjson::to_sstring(sstable_entry["toc_name"]); + auto prefix = sstring(std::filesystem::path(manifest_prefix).parent_path().string()); + // Insert the snapshot sstable metadata into system_distributed.snapshot_sstables with a TTL of 3 days, that should be enough + // for any snapshot restore operation to complete, and after that the metadata will be automatically cleaned up from the table + co_await sys_dist_ks.insert_snapshot_sstable(snapshot_name, keyspace, table, datacenter, rack, id, first_token, last_token, + toc_name, prefix, cl); + } + + co_return tablet_count; +} + +future populate_snapshot_sstables_from_manifests(sstables::storage_manager& sm, db::system_distributed_keyspace& sys_dist_ks, sstring endpoint, sstring bucket, sstring expected_snapshot_name, utils::chunked_vector manifest_prefixes, db::consistency_level cl) { + if (manifest_prefixes.empty()) { + throw std::invalid_argument("manifest prefixes list must not be empty"); + } + + // Download manifests in parallel and populate system_distributed.snapshot_sstables + // with the content extracted from each manifest + auto client = sm.get_endpoint_client(endpoint); + + // tablet_count to be returned by this function, we also validate that all manifests passed contain the same tablet count + std::optional tablet_count; + + co_await seastar::max_concurrent_for_each(manifest_prefixes, 16, [&] (const sstring& manifest_prefix) { + // Download the manifest JSON file + sstables::object_name name(bucket, manifest_prefix); + auto source = client->make_download_source(name); + return seastar::with_closeable(input_stream(std::move(source)), [&] (input_stream& is) { + return process_manifest(is, expected_snapshot_name, manifest_prefix, sys_dist_ks, cl).then([&](size_t count) { + if (!tablet_count) { + tablet_count = count; + } else if (*tablet_count != count) { + throw std::runtime_error(fmt::format("Inconsistent tablet_count values in manifest {}: expected {}, got {}", manifest_prefix, *tablet_count, count)); + } + }); + }); + }); + + co_return *tablet_count; +} diff --git a/sstables_loader.hh b/sstables_loader.hh index c96c839d89..7173f116ee 100644 --- a/sstables_loader.hh +++ b/sstables_loader.hh @@ -15,6 +15,8 @@ #include "schema/schema_fwd.hh" #include "sstables/shared_sstable.hh" #include "tasks/task_manager.hh" +#include "db/consistency_level_type.hh" + using namespace seastar; @@ -172,3 +174,5 @@ struct tablet_sstable_collection { // Another prerequisite is that the sstables' token ranges are sorted by its `start` in descending order. future> get_sstables_for_tablets_for_tests(const std::vector& sstables, std::vector&& tablets_ranges); + +future populate_snapshot_sstables_from_manifests(sstables::storage_manager& sm, db::system_distributed_keyspace& sys_dist_ks, sstring endpoint, sstring bucket, sstring expected_snapshot_name, utils::chunked_vector manifest_prefixes, db::consistency_level cl = db::consistency_level::EACH_QUORUM); diff --git a/test/boost/tablet_aware_restore_test.cc b/test/boost/tablet_aware_restore_test.cc index c8d76b2075..af490f254e 100644 --- a/test/boost/tablet_aware_restore_test.cc +++ b/test/boost/tablet_aware_restore_test.cc @@ -11,6 +11,8 @@ #include "test/lib/cql_test_env.hh" #include "utils/assert.hh" #include +#include +#include #include #include @@ -19,8 +21,18 @@ #include "db/config.hh" #include "db/consistency_level_type.hh" #include "db/system_distributed_keyspace.hh" +#include "sstables/object_storage_client.hh" +#include "sstables/storage.hh" +#include "sstables_loader.hh" +#include "replica/database_fwd.hh" +#include "tasks/task_handler.hh" +#include "db/system_distributed_keyspace.hh" +#include "service/storage_proxy.hh" #include "test/lib/test_utils.hh" +#include "test/lib/random_utils.hh" +#include "test/lib/sstable_test_env.hh" +#include "test/boost/database_test.hh" SEASTAR_TEST_CASE(test_snapshot_manifests_table_api_works, *boost::unit_test::precondition(tests::has_scylla_test_env)) { @@ -76,3 +88,69 @@ SEASTAR_TEST_CASE(test_snapshot_manifests_table_api_works, *boost::unit_test::pr BOOST_CHECK_EQUAL(empty.size(), 0); }, db_cfg_ptr); } + +using namespace sstables; + +future<> backup(cql_test_env& env, sstring endpoint, sstring bucket) { + sharded ctl; + co_await ctl.start(std::ref(env.db()), std::ref(env.get_storage_proxy()), std::ref(env.get_task_manager()), std::ref(env.get_sstorage_manager()), db::snapshot_ctl::config{}); + auto prefix = "/backup"; + + auto task_id = co_await ctl.local().start_backup(endpoint, bucket, prefix, "ks", "cf", "snapshot", false); + auto task = tasks::task_handler{env.get_task_manager().local(), task_id}; + auto status = co_await task.wait_for_task(30s); + BOOST_REQUIRE(status.state == tasks::task_manager::task_state::done); + + co_await ctl.stop(); +} + +future<> check_snapshot_sstables(cql_test_env& env) { + auto& topology = env.get_storage_proxy().local().get_token_metadata_ptr()->get_topology(); + auto dc = topology.get_datacenter(); + auto rack = topology.get_rack(); + auto sstables = co_await env.get_system_distributed_keyspace().local().get_snapshot_sstables("snapshot", "ks", "cf", dc, rack, db::consistency_level::ONE); + + // Check that the sstables in system_distributed.snapshot_sstables match the sstables in the snapshot directory on disk + auto& cf = env.local_db().find_column_family("ks", "cf"); + auto tabledir = tests::table_dir(cf); + auto snapshot_dir = tabledir / sstables::snapshots_dir / "snapshot"; + std::set expected_sstables; + directory_lister lister(snapshot_dir, lister::dir_entry_types::of()); + while (auto de = co_await lister.get()) { + if (!de->name.ends_with("-TOC.txt")) { + continue; + } + expected_sstables.insert(de->name); + } + + BOOST_CHECK_EQUAL(sstables.size(), expected_sstables.size()); + + for (const auto& sstable : sstables) { + BOOST_CHECK(expected_sstables.contains(sstable.toc_name)); + } +} + +SEASTAR_TEST_CASE(test_populate_snapshot_sstables_from_manifests, *boost::unit_test::precondition(tests::has_scylla_test_env)) { + using namespace sstables; + + auto db_cfg_ptr = make_shared(); + db_cfg_ptr->tablets_mode_for_new_keyspaces(db::tablets_mode_t::mode::enabled); + db_cfg_ptr->experimental_features({db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS}); + auto storage_options = make_test_object_storage_options("S3"); + db_cfg_ptr->object_storage_endpoints(make_storage_options_config(storage_options)); + + return do_with_some_data_in_thread({"cf"}, [storage_options = std::move(storage_options)] (cql_test_env& env) { + take_snapshot(env, "ks", "cf", "snapshot").get(); + + auto ep = storage_options.to_map()["endpoint"]; + auto bucket = storage_options.to_map()["bucket"]; + backup(env, ep, bucket).get(); + + BOOST_REQUIRE_THROW(populate_snapshot_sstables_from_manifests(env.get_sstorage_manager().local(), env.get_system_distributed_keyspace().local(), ep, bucket, "unexpected_snapshot", {"/backup/manifest.json"}, db::consistency_level::ONE).get(), std::runtime_error);; + + // populate system_distributed.snapshot_sstables with the content of the snapshot manifest + populate_snapshot_sstables_from_manifests(env.get_sstorage_manager().local(), env.get_system_distributed_keyspace().local(), ep, bucket, "snapshot", {"/backup/manifest.json"}, db::consistency_level::ONE).get(); + + check_snapshot_sstables(env).get(); + }, false, db_cfg_ptr, 10); +}