Add system_distributed_keyspace.snapshot_sstables

This patch adds the snapshot_sstables table with the following
schema:
```cql
CREATE TABLE system_distributed.snapshot_sstables (
    snapshot_name text,
    "keyspace" text, "table" text,
    datacenter text, rack text,
    sstable_id uuid,
    first_token bigint, last_token bigint,
    toc_name text, prefix text,
    PRIMARY KEY ((snapshot_name, "keyspace", "table", datacenter, rack), first_token, sstable_id));
```
The table will be populated by the coordinator node during the restore
phase (and later on during the backup phase to accommodate live-restore).
The content of this table is meant to be consumed by the restore worker nodes
which will use this data to filter SSTables and download them file by file.

Fixes SCYLLADB-263

Signed-off-by: Robert Bindar <robert.bindar@scylladb.com>
This commit is contained in:
Robert Bindar
2026-01-29 15:31:51 +02:00
committed by Pavel Emelyanov
parent 84517fbf98
commit daa09abc2a
7 changed files with 257 additions and 1 deletions

View File

@@ -560,6 +560,7 @@ scylla_tests = set([
'test/boost/crc_test',
'test/boost/dict_trainer_test',
'test/boost/dirty_memory_manager_test',
'test/boost/tablet_aware_restore_test',
'test/boost/double_decker_test',
'test/boost/duration_test',
'test/boost/dynamic_bitset_test',

View File

@@ -169,6 +169,36 @@ schema_ptr service_levels() {
return schema;
}
// Schema accessor for system_distributed.snapshot_sstables.
// One row per backed-up SSTable. The partition key pins a single
// (snapshot, keyspace, table, datacenter, rack) combination; within a
// partition rows cluster by (first_token, sstable_id), so readers can
// range-scan on first_token. The schema object is built once per shard
// and cached (thread_local).
schema_ptr snapshot_sstables() {
static thread_local auto schema = [] {
// Deterministic table id derived from keyspace/table name, so every
// node computes the same id for this table.
auto id = generate_legacy_id(system_distributed_keyspace::NAME, system_distributed_keyspace::SNAPSHOT_SSTABLES);
return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::SNAPSHOT_SSTABLES, std::make_optional(id))
// Name of the snapshot
.with_column("snapshot_name", utf8_type, column_kind::partition_key)
// Keyspace where the snapshot was taken
.with_column("keyspace", utf8_type, column_kind::partition_key)
// Table within the keyspace
.with_column("table", utf8_type, column_kind::partition_key)
// Datacenter where this SSTable is located
.with_column("datacenter", utf8_type, column_kind::partition_key)
// Rack where this SSTable is located
.with_column("rack", utf8_type, column_kind::partition_key)
// First token in the token range covered by this SSTable
.with_column("first_token", long_type, column_kind::clustering_key)
// Unique identifier for the SSTable (UUID)
.with_column("sstable_id", uuid_type, column_kind::clustering_key)
// Last token in the token range covered by this SSTable (regular column:
// queries filter on first_token only)
.with_column("last_token", long_type)
// TOC filename of the SSTable
.with_column("toc_name", utf8_type)
// Prefix path in object storage where the SSTable was backed up
.with_column("prefix", utf8_type)
.with_hash_version()
.build();
}();
return schema;
}
// This is the set of tables which this node ensures to exist in the cluster.
// It does that by announcing the creation of these schemas on initialization
// of the `system_distributed_keyspace` service (see `start()`), unless it first
@@ -186,11 +216,12 @@ static std::vector<schema_ptr> ensured_tables() {
cdc_desc(),
cdc_timestamps(),
service_levels(),
snapshot_sstables(),
};
}
std::vector<schema_ptr> system_distributed_keyspace::all_distributed_tables() {
return {view_build_status(), cdc_desc(), cdc_timestamps(), service_levels()};
return {view_build_status(), cdc_desc(), cdc_timestamps(), service_levels(), snapshot_sstables()};
}
std::vector<schema_ptr> system_distributed_keyspace::all_everywhere_tables() {
@@ -691,4 +722,62 @@ future<> system_distributed_keyspace::drop_service_level(sstring service_level_n
return _qp.execute_internal(prepared_query, db::consistency_level::ONE, internal_distributed_query_state(), {service_level_name}, cql3::query_processor::cache_internal::no).discard_result();
}
// Inserts one SSTable metadata row into system_distributed.snapshot_sstables.
// Called by the restore coordinator to publish per-SSTable data extracted
// from snapshot manifests, for consumption by worker nodes.
// "keyspace" and "table" are quoted in the CQL text because they are
// reserved keywords. Returns a future that resolves once the write
// completes at consistency level `cl`.
future<> system_distributed_keyspace::insert_snapshot_sstable(sstring snapshot_name, sstring ks, sstring table, sstring dc, sstring rack, sstables::sstable_id sstable_id, dht::token first_token, dht::token last_token, sstring toc_name, sstring prefix, db::consistency_level cl) {
// Rows carry a 3-day TTL so stale restore metadata expires on its own.
static constexpr uint64_t ttl_seconds = std::chrono::seconds(std::chrono::days(3)).count();
// The TTL is baked into the statement text (it is a constant); everything
// else is bound as a marker so the statement stays cacheable.
static const sstring query = format("INSERT INTO {}.{} (snapshot_name, \"keyspace\", \"table\", datacenter, rack, first_token, sstable_id, last_token, toc_name, prefix) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL {}", NAME, SNAPSHOT_SSTABLES, ttl_seconds);
return _qp.execute_internal(
query,
cl,
internal_distributed_query_state(),
// Tokens are stored as their signed 64-bit representation (bigint columns).
{ std::move(snapshot_name), std::move(ks), std::move(table), std::move(dc), std::move(rack),
dht::token::to_int64(first_token), sstable_id.uuid(), dht::token::to_int64(last_token), std::move(toc_name), std::move(prefix) },
cql3::query_processor::cache_internal::yes).discard_result();
}
// Reads back the SSTable entries of one (snapshot, keyspace, table, dc, rack)
// partition from system_distributed.snapshot_sstables. If start_token and/or
// end_token are given, restricts to rows whose first_token is in the
// inclusive range [start_token, end_token] (first_token is the clustering
// key, so this is a slice, not a filter). Results are paged 1000 rows at a
// time and accumulated into a chunked_vector.
future<utils::chunked_vector<snapshot_sstable_entry>>
system_distributed_keyspace::get_snapshot_sstables(sstring snapshot_name, sstring ks, sstring table, sstring dc, sstring rack, db::consistency_level cl, std::optional<dht::token> start_token, std::optional<dht::token> end_token) const {
utils::chunked_vector<snapshot_sstable_entry> sstables;
// "keyspace" and "table" are quoted because they are reserved CQL keywords.
static const sstring base_query = format("SELECT toc_name, prefix, sstable_id, first_token, last_token FROM {}.{}"
" WHERE snapshot_name = ? AND \"keyspace\" = ? AND \"table\" = ? AND datacenter = ? AND rack = ?", NAME, SNAPSHOT_SSTABLES);
// Per-row callback: decode one result row into a snapshot_sstable_entry.
// Always returns stop_iteration::no — we consume the full result set.
auto read_row = [&] (const cql3::untyped_result_set_row& row) {
sstables.emplace_back(sstables::sstable_id(row.get_as<utils::UUID>("sstable_id")), dht::token::from_int64(row.get_as<int64_t>("first_token")), dht::token::from_int64(row.get_as<int64_t>("last_token")), row.get_as<sstring>("toc_name"), row.get_as<sstring>("prefix"));
return make_ready_future<stop_iteration>(stop_iteration::no);
};
// Four branches rather than one dynamically-built query: the bound-value
// list is a braced initializer, so each shape of the WHERE clause needs its
// own call with a matching literal argument list.
if (start_token && end_token) {
co_await _qp.query_internal(
base_query + " AND first_token >= ? AND first_token <= ?",
cl,
{ snapshot_name, ks, table, dc, rack, dht::token::to_int64(*start_token), dht::token::to_int64(*end_token) },
1000,
read_row);
} else if (start_token) {
co_await _qp.query_internal(
base_query + " AND first_token >= ?",
cl,
{ snapshot_name, ks, table, dc, rack, dht::token::to_int64(*start_token) },
1000,
read_row);
} else if (end_token) {
co_await _qp.query_internal(
base_query + " AND first_token <= ?",
cl,
{ snapshot_name, ks, table, dc, rack, dht::token::to_int64(*end_token) },
1000,
read_row);
} else {
co_await _qp.query_internal(
base_query,
cl,
{ snapshot_name, ks, table, dc, rack },
1000,
read_row);
}
co_return sstables;
}
}

View File

@@ -11,12 +11,17 @@
#include "schema/schema_fwd.hh"
#include "service/qos/qos_common.hh"
#include "utils/UUID.hh"
#include "utils/chunked_vector.hh"
#include "cdc/generation_id.hh"
#include "db/consistency_level_type.hh"
#include "locator/host_id.hh"
#include "dht/token.hh"
#include "sstables/types.hh"
#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>
#include <optional>
#include <unordered_map>
namespace cql3 {
@@ -34,8 +39,17 @@ namespace service {
class migration_manager;
}
namespace db {
// One row of system_distributed.snapshot_sstables, as returned by
// system_distributed_keyspace::get_snapshot_sstables().
struct snapshot_sstable_entry {
// Unique identifier of the SSTable.
sstables::sstable_id sstable_id;
// First token in the token range covered by this SSTable.
dht::token first_token;
// Last token in the token range covered by this SSTable.
dht::token last_token;
// TOC filename of the SSTable.
sstring toc_name;
// Prefix path in object storage where the SSTable was backed up.
sstring prefix;
};
class system_distributed_keyspace {
public:
static constexpr auto NAME = "system_distributed";
@@ -62,6 +76,10 @@ public:
* in the old table also appear in the new table, if necessary. */
static constexpr auto CDC_DESC_V1 = "cdc_streams_descriptions";
/* This table is used by the backup and restore code to store per-sstable metadata.
* The data the coordinator node puts in this table comes from the snapshot manifests. */
static constexpr auto SNAPSHOT_SSTABLES = "snapshot_sstables";
/* Information required to modify/query some system_distributed tables, passed from the caller. */
struct context {
/* How many different token owners (endpoints) are there in the token ring? */
@@ -102,6 +120,17 @@ public:
future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const;
future<> drop_service_level(sstring service_level_name) const;
/* Inserts a single SSTable entry for a given snapshot, keyspace, table, datacenter,
* and rack. The row is written with the specified TTL (in seconds). Uses consistency
* level `EACH_QUORUM` by default.*/
future<> insert_snapshot_sstable(sstring snapshot_name, sstring ks, sstring table, sstring dc, sstring rack, sstables::sstable_id sstable_id, dht::token first_token, dht::token last_token, sstring toc_name, sstring prefix, db::consistency_level cl = db::consistency_level::EACH_QUORUM);
/* Retrieves all SSTable entries for a given snapshot, keyspace, table, datacenter, and rack.
* If `start_token` and `end_token` are provided, only entries whose `first_token` is in the range [`start_token`, `end_token`] will be returned.
* Returns a vector of `snapshot_sstable_entry` structs containing `sstable_id`, `first_token`, `last_token`,
* `toc_name`, and `prefix`. Uses consistency level `LOCAL_QUORUM` by default. */
future<utils::chunked_vector<snapshot_sstable_entry>> get_snapshot_sstables(sstring snapshot_name, sstring ks, sstring table, sstring dc, sstring rack, db::consistency_level cl = db::consistency_level::LOCAL_QUORUM, std::optional<dht::token> start_token = std::nullopt, std::optional<dht::token> end_token = std::nullopt) const;
private:
future<> create_tables(std::vector<schema_ptr> tables);
};

View File

@@ -167,6 +167,11 @@ All tables in a keyspace are uploaded, the destination object names will look li
or
`gs://bucket/some/prefix/to/store/data/.../sstable`
# System tables
There are a few system tables that object storage related code needs to touch in order to operate.
* [system_distributed.snapshot_sstables](docs/dev/snapshot_sstables.md) - Used during restore by worker nodes to get the list of SSTables that need to be downloaded from object storage and restored locally.
* [system.sstables](docs/dev/system_keyspace.md#systemsstables) - Used to keep track of SSTables on object storage when a keyspace is created with object storage storage_options.
# Manipulating S3 data
This section intends to give an overview of where, when and how we store data in S3 and provide a quick set of commands

View File

@@ -0,0 +1,52 @@
# system\_distributed.snapshot\_sstables
## Purpose
This table is used during tablet-aware restore to exchange per-SSTable metadata between
the coordinator and worker nodes. When the restore process starts, the coordinator node
populates this table with information about each SSTable extracted from the snapshot
manifests. Worker nodes then read from this table to determine which SSTables need to
be downloaded from object storage and restored locally.
Rows are inserted with a TTL so that stale restore metadata is automatically cleaned up.
## Schema
~~~
CREATE TABLE system_distributed.snapshot_sstables (
snapshot_name text,
"keyspace" text,
"table" text,
datacenter text,
rack text,
first_token bigint,
sstable_id uuid,
last_token bigint,
toc_name text,
prefix text,
PRIMARY KEY ((snapshot_name, "keyspace", "table", datacenter, rack), first_token, sstable_id)
)
~~~
Column descriptions:
| Column | Type | Description |
|--------|------|-------------|
| `snapshot_name` | text (partition key) | Name of the snapshot |
| `keyspace` | text (partition key) | Keyspace the snapshot was taken from |
| `table` | text (partition key) | Table within the keyspace |
| `datacenter` | text (partition key) | Datacenter where the SSTable is located |
| `rack` | text (partition key) | Rack where the SSTable is located |
| `first_token` | bigint (clustering key) | First token in the token range covered by this SSTable |
| `sstable_id` | uuid (clustering key) | Unique identifier for the SSTable |
| `last_token` | bigint | Last token in the token range covered by this SSTable |
| `toc_name` | text | TOC filename of the SSTable (e.g. `me-3gdq_0bki_2cvk01yl83nj0tp5gh-big-TOC.txt`) |
| `prefix` | text | Prefix path in object storage where the SSTable was backed up |
## APIs
The following C++ APIs are provided in `db::system_distributed_keyspace`:
- insert\_snapshot\_sstable
- get\_snapshot\_sstables

View File

@@ -314,6 +314,8 @@ add_scylla_test(symmetric_key_test
KIND SEASTAR
LIBRARIES
encryption)
add_scylla_test(tablet_aware_restore_test
KIND SEASTAR)
add_scylla_test(combined_tests
KIND SEASTAR

View File

@@ -0,0 +1,78 @@
/*
* Copyright (C) 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "test/lib/cql_test_env.hh"
#include "utils/assert.hh"
#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include <seastar/testing/test_case.hh>
#include <seastar/testing/test_fixture.hh>
#include "db/config.hh"
#include "db/consistency_level_type.hh"
#include "db/system_distributed_keyspace.hh"
#include "test/lib/test_utils.hh"
// Smoke test for the snapshot_sstables distributed-table API: writes a few
// rows via insert_snapshot_sstable() and reads them back through
// get_snapshot_sstables(), both unfiltered and with token-range filters.
SEASTAR_TEST_CASE(test_snapshot_manifests_table_api_works, *boost::unit_test::precondition(tests::has_scylla_test_env)) {
    auto db_cfg_ptr = make_shared<db::config>();
    return do_with_cql_env([] (cql_test_env& env) -> future<> {
        const auto snap = "snapshot";
        const auto ks_name = "ks";
        const auto cf_name = "cf";
        const auto dc_name = "dc1";
        const auto rack_name = "r1";
        const auto ssid = utils::UUID_gen::get_time_UUID();
        const auto upper = dht::token::from_int64(100);
        const auto toc = "me-3gdq_0bki_2cvk01yl83nj0tp5gh-big-TOC.txt";
        const auto obj_prefix = "some/prefix";
        const int rows = 5;
        auto& sdks = env.get_system_distributed_keyspace().local();
        // Populate the table; iterate downwards (4 .. 0) so the read-back
        // below also exercises the ascending clustering order on first_token.
        for (int i = rows; i-- > 0; ) {
            co_await sdks.insert_snapshot_sstable(snap, ks_name, cf_name, dc_name, rack_name,
                sstables::sstable_id(ssid), dht::token::from_int64(i), upper, toc, obj_prefix,
                db::consistency_level::ONE);
        }
        // Unfiltered read: every inserted row comes back, ordered by first_token.
        auto entries = co_await sdks.get_snapshot_sstables(snap, ks_name, cf_name, dc_name, rack_name, db::consistency_level::ONE);
        BOOST_CHECK_EQUAL(entries.size(), rows);
        for (int i = 0; i < rows; ++i) {
            const auto& e = entries[i];
            BOOST_CHECK_EQUAL(e.toc_name, toc);
            BOOST_CHECK_EQUAL(e.prefix, obj_prefix);
            BOOST_CHECK_EQUAL(e.first_token, dht::token::from_int64(i));
            BOOST_CHECK_EQUAL(e.last_token, upper);
            BOOST_CHECK_EQUAL(e.sstable_id.uuid(), ssid);
        }
        // A range covering all inserted first_tokens returns everything.
        auto filtered = co_await sdks.get_snapshot_sstables(
            snap, ks_name, cf_name, dc_name, rack_name, db::consistency_level::ONE,
            dht::token::from_int64(-10), dht::token::from_int64(rows + 1));
        BOOST_CHECK_EQUAL(filtered.size(), rows);
        // The range is inclusive on both ends: [0, 0] matches exactly one row.
        filtered = co_await sdks.get_snapshot_sstables(
            snap, ks_name, cf_name, dc_name, rack_name, db::consistency_level::ONE,
            dht::token::from_int64(0), dht::token::from_int64(0));
        BOOST_CHECK_EQUAL(filtered.size(), 1);
        // A range beyond every inserted first_token matches nothing.
        auto none = co_await sdks.get_snapshot_sstables(
            snap, ks_name, cf_name, dc_name, rack_name, db::consistency_level::ONE,
            dht::token::from_int64(rows + 10), dht::token::from_int64(rows + 20));
        BOOST_CHECK_EQUAL(none.size(), 0);
    }, db_cfg_ptr);
}