sstables_loader: add function for parsing backup manifests

This change adds functionality for parsing backup manifests
and populating system_distributed.snapshot_sstables with
the content of the manifests.
This change is useful for tablet-aware restore. The function
introduced here will be called by the coordinator node
when restore starts to populate the snapshot_sstables table
with the data that workers need to execute the restore process.

Signed-off-by: Robert Bindar <robert.bindar@scylladb.com>
Co-authored-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Robert Bindar
2026-02-09 10:41:21 +02:00
committed by Pavel Emelyanov
parent f0e8d6c9dd
commit c97232bb7b
3 changed files with 167 additions and 0 deletions

View File

@@ -33,6 +33,10 @@
#include "service/storage_service.hh"
#include "utils/error_injection.hh"
#include "sstables/object_storage_client.hh"
#include "utils/rjson.hh"
#include "db/system_distributed_keyspace.hh"
#include <cfloat>
#include <algorithm>
@@ -1020,3 +1024,84 @@ future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_test
std::vector<dht::token_range>&& tablets_ranges) {
return tablet_sstable_streamer::get_sstables_for_tablets(sstables, std::move(tablets_ranges));
}
static future<size_t> process_manifest(input_stream<char>& is, const sstring& expected_snapshot_name,
const sstring& manifest_prefix, db::system_distributed_keyspace& sys_dist_ks,
db::consistency_level cl) {
// Read the entire JSON content
rjson::chunked_content content = co_await util::read_entire_stream(is);
rjson::value parsed = rjson::parse(std::move(content));
// Basic validation that tablet_count is a power of 2, as expected by our restore process
size_t tablet_count = parsed["table"]["tablet_count"].GetUint64();
if (!std::has_single_bit(tablet_count)) {
on_internal_error(llog, fmt::format("Invalid tablet_count {} in manifest {}, expected a power of 2", tablet_count, manifest_prefix));
}
// Extract the necessary fields from the manifest
// Expected JSON structure documented in docs/dev/object_storage.md
auto snapshot_name = rjson::to_sstring(parsed["snapshot"]["name"]);
if (snapshot_name != expected_snapshot_name) {
throw std::runtime_error(fmt::format("Manifest {} belongs to snapshot '{}', expected '{}'",
manifest_prefix, snapshot_name, expected_snapshot_name));
}
auto keyspace = rjson::to_sstring(parsed["table"]["keyspace_name"]);
auto table = rjson::to_sstring(parsed["table"]["table_name"]);
auto datacenter = rjson::to_sstring(parsed["node"]["datacenter"]);
auto rack = rjson::to_sstring(parsed["node"]["rack"]);
// Process each sstable entry in the manifest
// FIXME: cleanup of the snapshot-related rows is needed in case anything throws in here.
auto sstables = rjson::find(parsed, "sstables");
if (!sstables) {
co_return tablet_count;
}
if (!sstables->IsArray()) {
throw std::runtime_error("Malformed manifest, 'sstables' is not array");
}
for (auto& sstable_entry : sstables->GetArray()) {
auto id = rjson::to_sstable_id(sstable_entry["id"]);
auto first_token = rjson::to_token(sstable_entry["first_token"]);
auto last_token = rjson::to_token(sstable_entry["last_token"]);
auto toc_name = rjson::to_sstring(sstable_entry["toc_name"]);
auto prefix = sstring(std::filesystem::path(manifest_prefix).parent_path().string());
// Insert the snapshot sstable metadata into system_distributed.snapshot_sstables with a TTL of 3 days, that should be enough
// for any snapshot restore operation to complete, and after that the metadata will be automatically cleaned up from the table
co_await sys_dist_ks.insert_snapshot_sstable(snapshot_name, keyspace, table, datacenter, rack, id, first_token, last_token,
toc_name, prefix, cl);
}
co_return tablet_count;
}
future<size_t> populate_snapshot_sstables_from_manifests(sstables::storage_manager& sm, db::system_distributed_keyspace& sys_dist_ks, sstring endpoint, sstring bucket, sstring expected_snapshot_name, utils::chunked_vector<sstring> manifest_prefixes, db::consistency_level cl) {
if (manifest_prefixes.empty()) {
throw std::invalid_argument("manifest prefixes list must not be empty");
}
// Download manifests in parallel and populate system_distributed.snapshot_sstables
// with the content extracted from each manifest
auto client = sm.get_endpoint_client(endpoint);
// tablet_count to be returned by this function, we also validate that all manifests passed contain the same tablet count
std::optional<size_t> tablet_count;
co_await seastar::max_concurrent_for_each(manifest_prefixes, 16, [&] (const sstring& manifest_prefix) {
// Download the manifest JSON file
sstables::object_name name(bucket, manifest_prefix);
auto source = client->make_download_source(name);
return seastar::with_closeable(input_stream<char>(std::move(source)), [&] (input_stream<char>& is) {
return process_manifest(is, expected_snapshot_name, manifest_prefix, sys_dist_ks, cl).then([&](size_t count) {
if (!tablet_count) {
tablet_count = count;
} else if (*tablet_count != count) {
throw std::runtime_error(fmt::format("Inconsistent tablet_count values in manifest {}: expected {}, got {}", manifest_prefix, *tablet_count, count));
}
});
});
});
co_return *tablet_count;
}

View File

@@ -15,6 +15,8 @@
#include "schema/schema_fwd.hh"
#include "sstables/shared_sstable.hh"
#include "tasks/task_manager.hh"
#include "db/consistency_level_type.hh"
using namespace seastar;
@@ -172,3 +174,5 @@ struct tablet_sstable_collection {
// Another prerequisite is that the sstables' token ranges are sorted by its `start` in descending order.
future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges);
future<size_t> populate_snapshot_sstables_from_manifests(sstables::storage_manager& sm, db::system_distributed_keyspace& sys_dist_ks, sstring endpoint, sstring bucket, sstring expected_snapshot_name, utils::chunked_vector<sstring> manifest_prefixes, db::consistency_level cl = db::consistency_level::EACH_QUORUM);

View File

@@ -11,6 +11,8 @@
#include "test/lib/cql_test_env.hh"
#include "utils/assert.hh"
#include <seastar/core/sstring.hh>
#include <fmt/ranges.h>
#include <fmt/format.h>
#include <seastar/core/future.hh>
#include <seastar/testing/test_case.hh>
@@ -19,8 +21,18 @@
#include "db/config.hh"
#include "db/consistency_level_type.hh"
#include "db/system_distributed_keyspace.hh"
#include "sstables/object_storage_client.hh"
#include "sstables/storage.hh"
#include "sstables_loader.hh"
#include "replica/database_fwd.hh"
#include "tasks/task_handler.hh"
#include "db/system_distributed_keyspace.hh"
#include "service/storage_proxy.hh"
#include "test/lib/test_utils.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/sstable_test_env.hh"
#include "test/boost/database_test.hh"
SEASTAR_TEST_CASE(test_snapshot_manifests_table_api_works, *boost::unit_test::precondition(tests::has_scylla_test_env)) {
@@ -76,3 +88,69 @@ SEASTAR_TEST_CASE(test_snapshot_manifests_table_api_works, *boost::unit_test::pr
BOOST_CHECK_EQUAL(empty.size(), 0);
}, db_cfg_ptr);
}
using namespace sstables;
future<> backup(cql_test_env& env, sstring endpoint, sstring bucket) {
sharded<db::snapshot_ctl> ctl;
co_await ctl.start(std::ref(env.db()), std::ref(env.get_storage_proxy()), std::ref(env.get_task_manager()), std::ref(env.get_sstorage_manager()), db::snapshot_ctl::config{});
auto prefix = "/backup";
auto task_id = co_await ctl.local().start_backup(endpoint, bucket, prefix, "ks", "cf", "snapshot", false);
auto task = tasks::task_handler{env.get_task_manager().local(), task_id};
auto status = co_await task.wait_for_task(30s);
BOOST_REQUIRE(status.state == tasks::task_manager::task_state::done);
co_await ctl.stop();
}
future<> check_snapshot_sstables(cql_test_env& env) {
auto& topology = env.get_storage_proxy().local().get_token_metadata_ptr()->get_topology();
auto dc = topology.get_datacenter();
auto rack = topology.get_rack();
auto sstables = co_await env.get_system_distributed_keyspace().local().get_snapshot_sstables("snapshot", "ks", "cf", dc, rack, db::consistency_level::ONE);
// Check that the sstables in system_distributed.snapshot_sstables match the sstables in the snapshot directory on disk
auto& cf = env.local_db().find_column_family("ks", "cf");
auto tabledir = tests::table_dir(cf);
auto snapshot_dir = tabledir / sstables::snapshots_dir / "snapshot";
std::set<sstring> expected_sstables;
directory_lister lister(snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>());
while (auto de = co_await lister.get()) {
if (!de->name.ends_with("-TOC.txt")) {
continue;
}
expected_sstables.insert(de->name);
}
BOOST_CHECK_EQUAL(sstables.size(), expected_sstables.size());
for (const auto& sstable : sstables) {
BOOST_CHECK(expected_sstables.contains(sstable.toc_name));
}
}
SEASTAR_TEST_CASE(test_populate_snapshot_sstables_from_manifests, *boost::unit_test::precondition(tests::has_scylla_test_env)) {
using namespace sstables;
auto db_cfg_ptr = make_shared<db::config>();
db_cfg_ptr->tablets_mode_for_new_keyspaces(db::tablets_mode_t::mode::enabled);
db_cfg_ptr->experimental_features({db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS});
auto storage_options = make_test_object_storage_options("S3");
db_cfg_ptr->object_storage_endpoints(make_storage_options_config(storage_options));
return do_with_some_data_in_thread({"cf"}, [storage_options = std::move(storage_options)] (cql_test_env& env) {
take_snapshot(env, "ks", "cf", "snapshot").get();
auto ep = storage_options.to_map()["endpoint"];
auto bucket = storage_options.to_map()["bucket"];
backup(env, ep, bucket).get();
BOOST_REQUIRE_THROW(populate_snapshot_sstables_from_manifests(env.get_sstorage_manager().local(), env.get_system_distributed_keyspace().local(), ep, bucket, "unexpected_snapshot", {"/backup/manifest.json"}, db::consistency_level::ONE).get(), std::runtime_error);;
// populate system_distributed.snapshot_sstables with the content of the snapshot manifest
populate_snapshot_sstables_from_manifests(env.get_sstorage_manager().local(), env.get_system_distributed_keyspace().local(), ep, bucket, "snapshot", {"/backup/manifest.json"}, db::consistency_level::ONE).get();
check_snapshot_sstables(env).get();
}, false, db_cfg_ptr, 10);
}