From daa09abc2a75716f25415bc45f11b5ef9c7af4e0 Mon Sep 17 00:00:00 2001 From: Robert Bindar Date: Thu, 29 Jan 2026 15:31:51 +0200 Subject: [PATCH] Add system_distributed_keyspace.snapshot_sstables This patch adds the snapshot_sstables table with the following schema: ```cql CREATE TABLE system_distributed.snapshot_sstables ( snapshot_name text, "keyspace" text, "table" text, datacenter text, rack text, first_token bigint, sstable_id uuid, last_token bigint, toc_name text, prefix text, PRIMARY KEY ((snapshot_name, "keyspace", "table", datacenter, rack), first_token, sstable_id)); ``` The table will be populated by the coordinator node during the restore phase (and later on during the backup phase to accommodate live-restore). The content of this table is meant to be consumed by the restore worker nodes which will use this data to filter sstables and download them file by file. Fixes SCYLLADB-263 Signed-off-by: Robert Bindar --- configure.py | 1 + db/system_distributed_keyspace.cc | 91 ++++++++++++++++++++++++- db/system_distributed_keyspace.hh | 29 ++++++++ docs/dev/object_storage.md | 5 ++ docs/dev/snapshot_sstables.md | 52 ++++++++++++++ test/boost/CMakeLists.txt | 2 + test/boost/tablet_aware_restore_test.cc | 78 +++++++++++++++++++++ 7 files changed, 257 insertions(+), 1 deletion(-) create mode 100644 docs/dev/snapshot_sstables.md create mode 100644 test/boost/tablet_aware_restore_test.cc diff --git a/configure.py b/configure.py index 23b1822388..300b699a79 100755 --- a/configure.py +++ b/configure.py @@ -560,6 +560,7 @@ scylla_tests = set([ 'test/boost/crc_test', 'test/boost/dict_trainer_test', 'test/boost/dirty_memory_manager_test', + 'test/boost/tablet_aware_restore_test', 'test/boost/double_decker_test', 'test/boost/duration_test', 'test/boost/dynamic_bitset_test', diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index b922720569..f2b53df54d 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -169,6 +169,36 @@ schema_ptr 
service_levels() { return schema; } +schema_ptr snapshot_sstables() { + static thread_local auto schema = [] { + auto id = generate_legacy_id(system_distributed_keyspace::NAME, system_distributed_keyspace::SNAPSHOT_SSTABLES); + return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::SNAPSHOT_SSTABLES, std::make_optional(id)) + // Name of the snapshot + .with_column("snapshot_name", utf8_type, column_kind::partition_key) + // Keyspace where the snapshot was taken + .with_column("keyspace", utf8_type, column_kind::partition_key) + // Table within the keyspace + .with_column("table", utf8_type, column_kind::partition_key) + // Datacenter where this SSTable is located + .with_column("datacenter", utf8_type, column_kind::partition_key) + // Rack where this SSTable is located + .with_column("rack", utf8_type, column_kind::partition_key) + // First token in the token range covered by this SSTable + .with_column("first_token", long_type, column_kind::clustering_key) + // Unique identifier for the SSTable (UUID) + .with_column("sstable_id", uuid_type, column_kind::clustering_key) + // Last token in the token range covered by this SSTable + .with_column("last_token", long_type) + // TOC filename of the SSTable + .with_column("toc_name", utf8_type) + // Prefix path in object storage where the SSTable was backed up + .with_column("prefix", utf8_type) + .with_hash_version() + .build(); + }(); + return schema; +} + // This is the set of tables which this node ensures to exist in the cluster. 
// It does that by announcing the creation of these schemas on initialization // of the `system_distributed_keyspace` service (see `start()`), unless it first @@ -186,11 +216,12 @@ static std::vector ensured_tables() { cdc_desc(), cdc_timestamps(), service_levels(), + snapshot_sstables(), }; } std::vector system_distributed_keyspace::all_distributed_tables() { - return {view_build_status(), cdc_desc(), cdc_timestamps(), service_levels()}; + return {view_build_status(), cdc_desc(), cdc_timestamps(), service_levels(), snapshot_sstables()}; } std::vector system_distributed_keyspace::all_everywhere_tables() { @@ -691,4 +722,62 @@ future<> system_distributed_keyspace::drop_service_level(sstring service_level_n return _qp.execute_internal(prepared_query, db::consistency_level::ONE, internal_distributed_query_state(), {service_level_name}, cql3::query_processor::cache_internal::no).discard_result(); } +future<> system_distributed_keyspace::insert_snapshot_sstable(sstring snapshot_name, sstring ks, sstring table, sstring dc, sstring rack, sstables::sstable_id sstable_id, dht::token first_token, dht::token last_token, sstring toc_name, sstring prefix, db::consistency_level cl) { + static constexpr uint64_t ttl_seconds = std::chrono::seconds(std::chrono::days(3)).count(); + static const sstring query = format("INSERT INTO {}.{} (snapshot_name, \"keyspace\", \"table\", datacenter, rack, first_token, sstable_id, last_token, toc_name, prefix) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
USING TTL {}", NAME, SNAPSHOT_SSTABLES, ttl_seconds); + + return _qp.execute_internal( + query, + cl, + internal_distributed_query_state(), + { std::move(snapshot_name), std::move(ks), std::move(table), std::move(dc), std::move(rack), + dht::token::to_int64(first_token), sstable_id.uuid(), dht::token::to_int64(last_token), std::move(toc_name), std::move(prefix) }, + cql3::query_processor::cache_internal::yes).discard_result(); +} + +future> +system_distributed_keyspace::get_snapshot_sstables(sstring snapshot_name, sstring ks, sstring table, sstring dc, sstring rack, db::consistency_level cl, std::optional start_token, std::optional end_token) const { + utils::chunked_vector sstables; + + static const sstring base_query = format("SELECT toc_name, prefix, sstable_id, first_token, last_token FROM {}.{}" + " WHERE snapshot_name = ? AND \"keyspace\" = ? AND \"table\" = ? AND datacenter = ? AND rack = ?", NAME, SNAPSHOT_SSTABLES); + + auto read_row = [&] (const cql3::untyped_result_set_row& row) { + sstables.emplace_back(sstables::sstable_id(row.get_as("sstable_id")), dht::token::from_int64(row.get_as("first_token")), dht::token::from_int64(row.get_as("last_token")), row.get_as("toc_name"), row.get_as("prefix")); + return make_ready_future(stop_iteration::no); + }; + + if (start_token && end_token) { + co_await _qp.query_internal( + base_query + " AND first_token >= ? 
AND first_token <= ?", + cl, + { snapshot_name, ks, table, dc, rack, dht::token::to_int64(*start_token), dht::token::to_int64(*end_token) }, + 1000, + read_row); + } else if (start_token) { + co_await _qp.query_internal( + base_query + " AND first_token >= ?", + cl, + { snapshot_name, ks, table, dc, rack, dht::token::to_int64(*start_token) }, + 1000, + read_row); + } else if (end_token) { + co_await _qp.query_internal( + base_query + " AND first_token <= ?", + cl, + { snapshot_name, ks, table, dc, rack, dht::token::to_int64(*end_token) }, + 1000, + read_row); + } else { + co_await _qp.query_internal( + base_query, + cl, + { snapshot_name, ks, table, dc, rack }, + 1000, + read_row); + } + + co_return sstables; +} + } diff --git a/db/system_distributed_keyspace.hh b/db/system_distributed_keyspace.hh index 1e3a3ab805..87c057bb66 100644 --- a/db/system_distributed_keyspace.hh +++ b/db/system_distributed_keyspace.hh @@ -11,12 +11,17 @@ #include "schema/schema_fwd.hh" #include "service/qos/qos_common.hh" #include "utils/UUID.hh" +#include "utils/chunked_vector.hh" #include "cdc/generation_id.hh" +#include "db/consistency_level_type.hh" #include "locator/host_id.hh" +#include "dht/token.hh" +#include "sstables/types.hh" #include #include +#include #include namespace cql3 { @@ -34,8 +39,17 @@ namespace service { class migration_manager; } + namespace db { +struct snapshot_sstable_entry { + sstables::sstable_id sstable_id; + dht::token first_token; + dht::token last_token; + sstring toc_name; + sstring prefix; +}; + class system_distributed_keyspace { public: static constexpr auto NAME = "system_distributed"; @@ -62,6 +76,10 @@ public: * in the old table also appear in the new table, if necessary. */ static constexpr auto CDC_DESC_V1 = "cdc_streams_descriptions"; + /* This table is used by the backup and restore code to store per-sstable metadata. + * The data the coordinator node puts in this table comes from the snapshot manifests. 
*/ + static constexpr auto SNAPSHOT_SSTABLES = "snapshot_sstables"; + /* Information required to modify/query some system_distributed tables, passed from the caller. */ struct context { /* How many different token owners (endpoints) are there in the token ring? */ @@ -102,6 +120,17 @@ public: future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const; future<> drop_service_level(sstring service_level_name) const; + /* Inserts a single SSTable entry for a given snapshot, keyspace, table, datacenter, + * and rack. The row is written with a fixed TTL of 3 days so that stale restore metadata + * expires automatically. Uses consistency level `EACH_QUORUM` by default. */ + future<> insert_snapshot_sstable(sstring snapshot_name, sstring ks, sstring table, sstring dc, sstring rack, sstables::sstable_id sstable_id, dht::token first_token, dht::token last_token, sstring toc_name, sstring prefix, db::consistency_level cl = db::consistency_level::EACH_QUORUM); + + /* Retrieves all SSTable entries for a given snapshot, keyspace, table, datacenter, and rack. + * If `start_token` and/or `end_token` are provided, only entries whose `first_token` falls within the inclusive bound(s) will be returned. + * Returns a vector of `snapshot_sstable_entry` structs containing `sstable_id`, `first_token`, `last_token`, + * `toc_name`, and `prefix`. Uses consistency level `LOCAL_QUORUM` by default. 
*/ + future> get_snapshot_sstables(sstring snapshot_name, sstring ks, sstring table, sstring dc, sstring rack, db::consistency_level cl = db::consistency_level::LOCAL_QUORUM, std::optional start_token = std::nullopt, std::optional end_token = std::nullopt) const; + private: future<> create_tables(std::vector tables); }; diff --git a/docs/dev/object_storage.md b/docs/dev/object_storage.md index 441bc88b54..c9e31cbf4a 100644 --- a/docs/dev/object_storage.md +++ b/docs/dev/object_storage.md @@ -167,6 +167,11 @@ All tables in a keyspace are uploaded, the destination object names will look li or `gs://bucket/some/prefix/to/store/data/.../sstable` +# System tables +There are a few system tables that object storage related code needs to touch in order to operate. +* [system_distributed.snapshot_sstables](snapshot_sstables.md) - Used during restore by worker nodes to get the list of SSTables that need to be downloaded from object storage and restored locally. +* [system.sstables](system_keyspace.md#systemsstables) - Used to keep track of SSTables on object storage when a keyspace is created with object storage storage_options. + # Manipulating S3 data This section intends to give an overview of where, when and how we store data in S3 and provide a quick set of commands diff --git a/docs/dev/snapshot_sstables.md b/docs/dev/snapshot_sstables.md new file mode 100644 index 0000000000..cceb661ae0 --- /dev/null +++ b/docs/dev/snapshot_sstables.md @@ -0,0 +1,52 @@ +# system\_distributed.snapshot\_sstables + +## Purpose + +This table is used during tablet-aware restore to exchange per-SSTable metadata between +the coordinator and worker nodes. When the restore process starts, the coordinator node +populates this table with information about each SSTable extracted from the snapshot +manifests. Worker nodes then read from this table to determine which SSTables need to +be downloaded from object storage and restored locally. 
+ +Rows are inserted with a TTL so that stale restore metadata is automatically cleaned up. + +## Schema + +~~~ +CREATE TABLE system_distributed.snapshot_sstables ( + snapshot_name text, + "keyspace" text, + "table" text, + datacenter text, + rack text, + first_token bigint, + sstable_id uuid, + last_token bigint, + toc_name text, + prefix text, + PRIMARY KEY ((snapshot_name, "keyspace", "table", datacenter, rack), first_token, sstable_id) +) +~~~ + +Column descriptions: + +| Column | Type | Description | +|--------|------|-------------| +| `snapshot_name` | text (partition key) | Name of the snapshot | +| `keyspace` | text (partition key) | Keyspace the snapshot was taken from | +| `table` | text (partition key) | Table within the keyspace | +| `datacenter` | text (partition key) | Datacenter where the SSTable is located | +| `rack` | text (partition key) | Rack where the SSTable is located | +| `first_token` | bigint (clustering key) | First token in the token range covered by this SSTable | +| `sstable_id` | uuid (clustering key) | Unique identifier for the SSTable | +| `last_token` | bigint | Last token in the token range covered by this SSTable | +| `toc_name` | text | TOC filename of the SSTable (e.g. 
`me-3gdq_0bki_2cvk01yl83nj0tp5gh-big-TOC.txt`) | +| `prefix` | text | Prefix path in object storage where the SSTable was backed up | + +## APIs + +The following C++ APIs are provided in `db::system_distributed_keyspace`: + +- insert\_snapshot\_sstable + +- get\_snapshot\_sstables diff --git a/test/boost/CMakeLists.txt b/test/boost/CMakeLists.txt index 5a63607873..d2df039288 100644 --- a/test/boost/CMakeLists.txt +++ b/test/boost/CMakeLists.txt @@ -314,6 +314,8 @@ add_scylla_test(symmetric_key_test KIND SEASTAR LIBRARIES encryption) +add_scylla_test(tablet_aware_restore_test + KIND SEASTAR) add_scylla_test(combined_tests KIND SEASTAR diff --git a/test/boost/tablet_aware_restore_test.cc b/test/boost/tablet_aware_restore_test.cc new file mode 100644 index 0000000000..d335144c73 --- /dev/null +++ b/test/boost/tablet_aware_restore_test.cc @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2026-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + + + +#include "test/lib/cql_test_env.hh" +#include "utils/assert.hh" +#include + +#include +#include +#include + +#include "db/config.hh" +#include "db/consistency_level_type.hh" +#include "db/system_distributed_keyspace.hh" + +#include "test/lib/test_utils.hh" + + +SEASTAR_TEST_CASE(test_snapshot_manifests_table_api_works, *boost::unit_test::precondition(tests::has_scylla_test_env)) { + auto db_cfg_ptr = make_shared(); + + return do_with_cql_env([] (cql_test_env& env) -> future<> { + auto snapshot_name = "snapshot"; + auto ks = "ks"; + auto table = "cf"; + auto dc = "dc1"; + auto rack = "r1"; + auto sstable_id = utils::UUID_gen::get_time_UUID(); + auto last_token = dht::token::from_int64(100); + auto toc_name = "me-3gdq_0bki_2cvk01yl83nj0tp5gh-big-TOC.txt"; + auto prefix = "some/prefix"; + auto num_iter = 5; + + for (int i = num_iter - 1; i >= 0; --i) { + // insert some test data into snapshot_sstables table + auto first_token = dht::token::from_int64(i); + co_await 
env.get_system_distributed_keyspace().local().insert_snapshot_sstable(snapshot_name, ks, table, dc, rack, sstables::sstable_id(sstable_id), first_token, last_token, toc_name, prefix, db::consistency_level::ONE); + } + // read it back and check if it is correct + auto sstables = co_await env.get_system_distributed_keyspace().local().get_snapshot_sstables(snapshot_name, ks, table, dc, rack, db::consistency_level::ONE); + + BOOST_CHECK_EQUAL(sstables.size(), num_iter); + + for (int i = 0; i < num_iter; ++i) { + const auto& sstable = sstables[i]; + BOOST_CHECK_EQUAL(sstable.toc_name, toc_name); + BOOST_CHECK_EQUAL(sstable.prefix, prefix); + BOOST_CHECK_EQUAL(sstable.first_token, dht::token::from_int64(i)); + BOOST_CHECK_EQUAL(sstable.last_token, last_token); + BOOST_CHECK_EQUAL(sstable.sstable_id.uuid(), sstable_id); + } + + // test token range filtering: matching range should return the sstable + auto filtered = co_await env.get_system_distributed_keyspace().local().get_snapshot_sstables( + snapshot_name, ks, table, dc, rack, db::consistency_level::ONE, + dht::token::from_int64(-10), dht::token::from_int64(num_iter + 1)); + BOOST_CHECK_EQUAL(filtered.size(), num_iter); + + // test token range filtering: the interval is inclusive, if start and end are equal to first_token, it should return one sstable + filtered = co_await env.get_system_distributed_keyspace().local().get_snapshot_sstables( + snapshot_name, ks, table, dc, rack, db::consistency_level::ONE, + dht::token::from_int64(0), dht::token::from_int64(0)); + BOOST_CHECK_EQUAL(filtered.size(), 1); + + // test token range filtering: non-matching range should return nothing + auto empty = co_await env.get_system_distributed_keyspace().local().get_snapshot_sstables( + snapshot_name, ks, table, dc, rack, db::consistency_level::ONE, + dht::token::from_int64(num_iter + 10), dht::token::from_int64(num_iter + 20)); + BOOST_CHECK_EQUAL(empty.size(), 0); + }, db_cfg_ptr); +}