diff --git a/CMakeLists.txt b/CMakeLists.txt index 107e38cf2f..1bb41a58c6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -299,6 +299,7 @@ target_sources(scylla-main serializer.cc service/direct_failure_detector/failure_detector.cc sstables_loader.cc + sstables_loader_helpers.cc table_helper.cc tasks/task_handler.cc tasks/task_manager.cc diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json index 6f4633ecc8..e5d35cf920 100644 --- a/api/api-doc/storage_service.json +++ b/api/api-doc/storage_service.json @@ -974,6 +974,54 @@ } ] }, + { + "path":"/storage_service/tablets/restore", + "operations":[ + { + "method":"POST", + "summary":"Starts copying SSTables from a designated bucket in object storage to a specified keyspace", + "type":"string", + "nickname":"tablet_aware_restore", + "produces":[ + "application/json" + ], + "parameters":[ + { + "name":"keyspace", + "description":"Name of a keyspace to copy SSTables to", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"query" + }, + { + "name":"table", + "description":"Name of a table to copy SSTables to", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"query" + }, + { + "name":"snapshot", + "description":"Name of the snapshot to restore from", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"query" + }, + { + "name":"backup_location", + "description":"JSON array of backup location objects. Each object must contain: 'datacenter' (string), 'endpoint' (string), 'bucket' (string), and 'manifests' (array of strings). 
Currently, the array must contain exactly one entry.", + "required":true, + "allowMultiple":false, + "type":"array", + "paramType":"body" + } + ] + } + ] + }, { "path":"/storage_service/keyspace_compaction/{keyspace}", "operations":[ diff --git a/api/storage_service.cc b/api/storage_service.cc index e14e06996e..294a13c2c3 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -527,11 +527,52 @@ void set_sstables_loader(http_context& ctx, routes& r, sharded& co_return json::json_return_type(fmt::to_string(task_id)); }); + ss::tablet_aware_restore.set(r, [&ctx, &sst_loader](std::unique_ptr req) -> future { + std::string keyspace = req->get_query_param("keyspace"); + std::string table = req->get_query_param("table"); + std::string snapshot = req->get_query_param("snapshot"); + + rjson::chunked_content content = co_await util::read_entire_stream(*req->content_stream); + rjson::value parsed = rjson::parse(std::move(content)); + if (!parsed.IsArray()) { + throw httpd::bad_param_exception("backup locations (in body) must be a JSON array"); + } + + const auto& locations = parsed.GetArray(); + if (locations.Size() != 1) { + throw httpd::bad_param_exception("backup locations array (in body) must contain exactly one entry"); + } + + const auto& location = locations[0]; + if (!location.IsObject()) { + throw httpd::bad_param_exception("backup location (in body) must be a JSON object"); + } + + auto endpoint = rjson::to_string_view(location["endpoint"]); + auto bucket = rjson::to_string_view(location["bucket"]); + auto dc = rjson::to_string_view(location["datacenter"]); + + if (!location.HasMember("manifests") || !location["manifests"].IsArray()) { + throw httpd::bad_param_exception("backup location entry must have 'manifests' array"); + } + + auto manifests = location["manifests"].GetArray() | + std::views::transform([] (const auto& m) { return sstring(rjson::to_string_view(m)); }) | + std::ranges::to>(); + + apilog.info("Tablet restore for {}:{} called. 
Parameters: snapshot={} datacenter={} endpoint={} bucket={} manifests_count={}", + keyspace, table, snapshot, dc, endpoint, bucket, manifests.size()); + + auto table_id = validate_table(ctx.db.local(), keyspace, table); + auto task_id = co_await sst_loader.local().restore_tablets(table_id, keyspace, table, snapshot, sstring(endpoint), sstring(bucket), std::move(manifests)); + co_return json::json_return_type(fmt::to_string(task_id)); + }); } void unset_sstables_loader(http_context& ctx, routes& r) { ss::load_new_ss_tables.unset(r); ss::start_restore.unset(r); + ss::tablet_aware_restore.unset(r); } void set_view_builder(http_context& ctx, routes& r, sharded& vb, sharded& g) { diff --git a/configure.py b/configure.py index 8f23fb688b..6dbed29d7a 100755 --- a/configure.py +++ b/configure.py @@ -560,6 +560,7 @@ scylla_tests = set([ 'test/boost/crc_test', 'test/boost/dict_trainer_test', 'test/boost/dirty_memory_manager_test', + 'test/boost/tablet_aware_restore_test', 'test/boost/double_decker_test', 'test/boost/duration_test', 'test/boost/dynamic_bitset_test', @@ -1325,6 +1326,7 @@ scylla_core = (['message/messaging_service.cc', 'ent/ldap/ldap_connection.cc', 'reader_concurrency_semaphore.cc', 'sstables_loader.cc', + 'sstables_loader_helpers.cc', 'utils/utf8.cc', 'utils/ascii.cc', 'utils/like_matcher.cc', @@ -1464,6 +1466,7 @@ idls = ['idl/gossip_digest.idl.hh', 'idl/frozen_mutation.idl.hh', 'idl/reconcilable_result.idl.hh', 'idl/streaming.idl.hh', + 'idl/sstables_loader.idl.hh', 'idl/paging_state.idl.hh', 'idl/frozen_schema.idl.hh', 'idl/repair.idl.hh', diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index b922720569..9c871298f9 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -169,6 +169,38 @@ schema_ptr service_levels() { return schema; } +schema_ptr snapshot_sstables() { + static thread_local auto schema = [] { + auto id = generate_legacy_id(system_distributed_keyspace::NAME, 
system_distributed_keyspace::SNAPSHOT_SSTABLES); + return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::SNAPSHOT_SSTABLES, std::make_optional(id)) + // Name of the snapshot + .with_column("snapshot_name", utf8_type, column_kind::partition_key) + // Keyspace where the snapshot was taken + .with_column("keyspace", utf8_type, column_kind::partition_key) + // Table within the keyspace + .with_column("table", utf8_type, column_kind::partition_key) + // Datacenter where this SSTable is located + .with_column("datacenter", utf8_type, column_kind::partition_key) + // Rack where this SSTable is located + .with_column("rack", utf8_type, column_kind::partition_key) + // First token in the token range covered by this SSTable + .with_column("first_token", long_type, column_kind::clustering_key) + // Unique identifier for the SSTable (UUID) + .with_column("sstable_id", uuid_type, column_kind::clustering_key) + // Last token in the token range covered by this SSTable + .with_column("last_token", long_type) + // TOC filename of the SSTable + .with_column("toc_name", utf8_type) + // Prefix path in object storage where the SSTable was backed up + .with_column("prefix", utf8_type) + // Flag if the SSTable was downloaded already + .with_column("downloaded", boolean_type) + .with_hash_version() + .build(); + }(); + return schema; +} + // This is the set of tables which this node ensures to exist in the cluster. 
// It does that by announcing the creation of these schemas on initialization // of the `system_distributed_keyspace` service (see `start()`), unless it first @@ -186,11 +218,12 @@ static std::vector ensured_tables() { cdc_desc(), cdc_timestamps(), service_levels(), + snapshot_sstables(), }; } std::vector system_distributed_keyspace::all_distributed_tables() { - return {view_build_status(), cdc_desc(), cdc_timestamps(), service_levels()}; + return {view_build_status(), cdc_desc(), cdc_timestamps(), service_levels(), snapshot_sstables()}; } std::vector system_distributed_keyspace::all_everywhere_tables() { @@ -691,4 +724,81 @@ future<> system_distributed_keyspace::drop_service_level(sstring service_level_n return _qp.execute_internal(prepared_query, db::consistency_level::ONE, internal_distributed_query_state(), {service_level_name}, cql3::query_processor::cache_internal::no).discard_result(); } +future<> system_distributed_keyspace::insert_snapshot_sstable(sstring snapshot_name, sstring ks, sstring table, sstring dc, sstring rack, sstables::sstable_id sstable_id, dht::token first_token, dht::token last_token, sstring toc_name, sstring prefix, is_downloaded downloaded, db::consistency_level cl) { + static const sstring query = format("INSERT INTO {}.{} (snapshot_name, \"keyspace\", \"table\", datacenter, rack, first_token, sstable_id, last_token, toc_name, prefix, downloaded) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL {}", NAME, SNAPSHOT_SSTABLES, SNAPSHOT_SSTABLES_TTL_SECONDS); + + return _qp.execute_internal( + query, + cl, + internal_distributed_query_state(), + { std::move(snapshot_name), std::move(ks), std::move(table), std::move(dc), std::move(rack), + dht::token::to_int64(first_token), sstable_id.uuid(), dht::token::to_int64(last_token), std::move(toc_name), std::move(prefix), downloaded == is_downloaded::yes ? 
true : false }, + cql3::query_processor::cache_internal::yes).discard_result(); } + +future> +system_distributed_keyspace::get_snapshot_sstables(sstring snapshot_name, sstring ks, sstring table, sstring dc, sstring rack, db::consistency_level cl, std::optional start_token, std::optional end_token) const { + utils::chunked_vector sstables; + + static const sstring base_query = format("SELECT toc_name, prefix, sstable_id, first_token, last_token, downloaded FROM {}.{}" + " WHERE snapshot_name = ? AND \"keyspace\" = ? AND \"table\" = ? AND datacenter = ? AND rack = ?", NAME, SNAPSHOT_SSTABLES); + + auto read_row = [&] (const cql3::untyped_result_set_row& row) { + sstables.emplace_back(sstables::sstable_id(row.get_as("sstable_id")), dht::token::from_int64(row.get_as("first_token")), dht::token::from_int64(row.get_as("last_token")), row.get_as("toc_name"), row.get_as("prefix"), is_downloaded(row.get_as("downloaded"))); + return make_ready_future(stop_iteration::no); + }; + + if (start_token && end_token) { + co_await _qp.query_internal( + base_query + " AND first_token >= ? 
AND first_token <= ?", + cl, + { snapshot_name, ks, table, dc, rack, dht::token::to_int64(*start_token), dht::token::to_int64(*end_token) }, + 1000, + read_row); + } else if (start_token) { + co_await _qp.query_internal( + base_query + " AND first_token >= ?", + cl, + { snapshot_name, ks, table, dc, rack, dht::token::to_int64(*start_token) }, + 1000, + read_row); + } else if (end_token) { + co_await _qp.query_internal( + base_query + " AND first_token <= ?", + cl, + { snapshot_name, ks, table, dc, rack, dht::token::to_int64(*end_token) }, + 1000, + read_row); + } else { + co_await _qp.query_internal( + base_query, + cl, + { snapshot_name, ks, table, dc, rack }, + 1000, + read_row); + } + + co_return sstables; +} + +future<> system_distributed_keyspace::update_sstable_download_status(sstring snapshot_name, + sstring ks, + sstring table, + sstring dc, + sstring rack, + sstables::sstable_id sstable_id, + dht::token start_token, + is_downloaded downloaded) const { + static const sstring update_query = format("UPDATE {}.{} USING TTL {} SET downloaded = ? WHERE snapshot_name = ? AND \"keyspace\" = ? AND \"table\" = ? AND " + "datacenter = ? AND rack = ? AND first_token = ? AND sstable_id = ?", + NAME, + SNAPSHOT_SSTABLES, + SNAPSHOT_SSTABLES_TTL_SECONDS); + co_await _qp.execute_internal(update_query, + consistency_level::ONE, + internal_distributed_query_state(), + {downloaded == is_downloaded::yes ? 
true : false, snapshot_name, ks, table, dc, rack, dht::token::to_int64(start_token), sstable_id.uuid()}, + cql3::query_processor::cache_internal::no); +} + +} // namespace db diff --git a/db/system_distributed_keyspace.hh b/db/system_distributed_keyspace.hh index 9b94985ca9..17e188e468 100644 --- a/db/system_distributed_keyspace.hh +++ b/db/system_distributed_keyspace.hh @@ -11,12 +11,18 @@ #include "schema/schema_fwd.hh" #include "service/qos/qos_common.hh" #include "utils/UUID.hh" +#include "utils/chunked_vector.hh" #include "cdc/generation_id.hh" +#include "db/consistency_level_type.hh" #include "locator/host_id.hh" +#include "dht/token.hh" +#include "sstables/types.hh" #include #include +#include +#include #include namespace cql3 { @@ -34,8 +40,20 @@ namespace service { class migration_manager; } + namespace db { +using is_downloaded = bool_class; + +struct snapshot_sstable_entry { + sstables::sstable_id sstable_id; + dht::token first_token; + dht::token last_token; + sstring toc_name; + sstring prefix; + is_downloaded downloaded{is_downloaded::no}; +}; + class system_distributed_keyspace { public: static constexpr auto NAME = "system_distributed"; @@ -62,6 +80,12 @@ public: * in the old table also appear in the new table, if necessary. */ static constexpr auto CDC_DESC_V1 = "cdc_streams_descriptions"; + /* This table is used by the backup and restore code to store per-sstable metadata. + * The data the coordinator node puts in this table comes from the snapshot manifests. */ + static constexpr auto SNAPSHOT_SSTABLES = "snapshot_sstables"; + + static constexpr uint64_t SNAPSHOT_SSTABLES_TTL_SECONDS = std::chrono::seconds(std::chrono::days(3)).count(); + /* Information required to modify/query some system_distributed tables, passed from the caller. */ struct context { /* How many different token owners (endpoints) are there in the token ring? 
*/ @@ -110,6 +134,26 @@ public: future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const; future<> drop_service_level(sstring service_level_name) const; + /* Inserts a single SSTable entry for a given snapshot, keyspace, table, datacenter, + * and rack. The row is written with a fixed TTL (`SNAPSHOT_SSTABLES_TTL_SECONDS`). Uses consistency + * level `EACH_QUORUM` by default.*/ + future<> insert_snapshot_sstable(sstring snapshot_name, sstring ks, sstring table, sstring dc, sstring rack, sstables::sstable_id sstable_id, dht::token first_token, dht::token last_token, sstring toc_name, sstring prefix, is_downloaded downloaded, db::consistency_level cl = db::consistency_level::EACH_QUORUM); + + /* Retrieves all SSTable entries for a given snapshot, keyspace, table, datacenter, and rack. + * If `start_token` and `end_token` are provided, only entries whose `first_token` is in the range [`start_token`, `end_token`] will be returned. + * Returns a vector of `snapshot_sstable_entry` structs containing `sstable_id`, `first_token`, `last_token`, + * `toc_name`, `prefix`, and `downloaded`. Uses consistency level `LOCAL_QUORUM` by default. 
*/ + future> get_snapshot_sstables(sstring snapshot_name, sstring ks, sstring table, sstring dc, sstring rack, db::consistency_level cl = db::consistency_level::LOCAL_QUORUM, std::optional start_token = std::nullopt, std::optional end_token = std::nullopt) const; + + future<> update_sstable_download_status(sstring snapshot_name, + sstring ks, + sstring table, + sstring dc, + sstring rack, + sstables::sstable_id sstable_id, + dht::token start_token, + is_downloaded downloaded) const; + private: future<> create_tables(std::vector tables); }; diff --git a/docs/dev/object_storage.md b/docs/dev/object_storage.md index 03547e9a29..f369c460de 100644 --- a/docs/dev/object_storage.md +++ b/docs/dev/object_storage.md @@ -167,6 +167,11 @@ All tables in a keyspace are uploaded, the destination object names will look li or `gs://bucket/some/prefix/to/store/data/.../sstable` + +# System tables +There are a few system tables that object storage related code needs to touch in order to operate. +* [system_distributed.snapshot_sstables](snapshot_sstables.md) - Used during restore by worker nodes to get the list of SSTables that need to be downloaded from object storage and restored locally. +* [system.sstables](system_keyspace.md#systemsstables) - Used to keep track of SSTables on object storage when a keyspace is created with object storage storage_options. + # Manipulating S3 data This section intends to give an overview of where, when and how we store data in S3 and provide a quick set of commands diff --git a/docs/dev/snapshot_sstables.md b/docs/dev/snapshot_sstables.md new file mode 100644 index 0000000000..cceb661ae0 --- /dev/null +++ b/docs/dev/snapshot_sstables.md @@ -0,0 +1,52 @@ +# system\_distributed.snapshot\_sstables + +## Purpose + +This table is used during tablet-aware restore to exchange per-SSTable metadata between +the coordinator and worker nodes. 
When the restore process starts, the coordinator node +populates this table with information about each SSTable extracted from the snapshot +manifests. Worker nodes then read from this table to determine which SSTables need to +be downloaded from object storage and restored locally. + +Rows are inserted with a TTL so that stale restore metadata is automatically cleaned up. + +## Schema + +~~~ +CREATE TABLE system_distributed.snapshot_sstables ( + snapshot_name text, + "keyspace" text, + "table" text, + datacenter text, + rack text, + first_token bigint, + sstable_id uuid, + last_token bigint, + toc_name text, + prefix text, + downloaded boolean, + PRIMARY KEY ((snapshot_name, "keyspace", "table", datacenter, rack), first_token, sstable_id) +) +~~~ + +Column descriptions (`downloaded` is a boolean flag recording whether the SSTable has already been fetched by a worker): + +| Column | Type | Description | +|--------|------|-------------| +| `snapshot_name` | text (partition key) | Name of the snapshot | +| `keyspace` | text (partition key) | Keyspace the snapshot was taken from | +| `table` | text (partition key) | Table within the keyspace | +| `datacenter` | text (partition key) | Datacenter where the SSTable is located | +| `rack` | text (partition key) | Rack where the SSTable is located | +| `first_token` | bigint (clustering key) | First token in the token range covered by this SSTable | +| `sstable_id` | uuid (clustering key) | Unique identifier for the SSTable | +| `last_token` | bigint | Last token in the token range covered by this SSTable | +| `toc_name` | text | TOC filename of the SSTable (e.g. 
`me-3gdq_0bki_2cvk01yl83nj0tp5gh-big-TOC.txt`) | +| `prefix` | text | Prefix path in object storage where the SSTable was backed up | + +## APIs + +The following C++ APIs are provided in `db::system_distributed_keyspace`: + +- insert\_snapshot\_sstable + +- get\_snapshot\_sstables diff --git a/idl/CMakeLists.txt b/idl/CMakeLists.txt index 518ec32835..b64fc00f3e 100644 --- a/idl/CMakeLists.txt +++ b/idl/CMakeLists.txt @@ -53,6 +53,7 @@ set(idl_headers group0.idl.hh hinted_handoff.idl.hh sstables.idl.hh + sstables_loader.idl.hh storage_proxy.idl.hh storage_service.idl.hh strong_consistency/state_machine.idl.hh diff --git a/idl/sstables_loader.idl.hh b/idl/sstables_loader.idl.hh new file mode 100644 index 0000000000..83cd353487 --- /dev/null +++ b/idl/sstables_loader.idl.hh @@ -0,0 +1,12 @@ +/* + * Copyright 2026-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 + */ + +class restore_result { +}; + +verb [[]] restore_tablet (raft::server_id dst_id, locator::global_tablet_id gid) -> restore_result; diff --git a/locator/tablets.cc b/locator/tablets.cc index b45ae2e72f..7889c7dc0e 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -90,6 +90,8 @@ write_replica_set_selector get_selector_for_writes(tablet_transition_stage stage return write_replica_set_selector::previous; case tablet_transition_stage::end_migration: return write_replica_set_selector::next; + case tablet_transition_stage::restore: + return write_replica_set_selector::previous; } on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast(stage))); } @@ -123,6 +125,8 @@ read_replica_set_selector get_selector_for_reads(tablet_transition_stage stage) return read_replica_set_selector::previous; case tablet_transition_stage::end_migration: return read_replica_set_selector::next; + case tablet_transition_stage::restore: + return read_replica_set_selector::previous; } on_internal_error(tablet_logger, format("Invalid tablet 
transition stage: {}", static_cast(stage))); } @@ -131,12 +135,14 @@ tablet_transition_info::tablet_transition_info(tablet_transition_stage stage, tablet_transition_kind transition, tablet_replica_set next, std::optional pending_replica, - service::session_id session_id) + service::session_id session_id, + std::optional restore_cfg) : stage(stage) , transition(transition) , next(std::move(next)) , pending_replica(std::move(pending_replica)) , session_id(session_id) + , restore_cfg(std::move(restore_cfg)) , writes(get_selector_for_writes(stage)) , reads(get_selector_for_reads(stage)) { } @@ -186,12 +192,20 @@ tablet_migration_streaming_info get_migration_streaming_info(const locator::topo return result; } - case tablet_transition_kind::repair: + case tablet_transition_kind::repair: { auto s = std::unordered_set(tinfo.replicas.begin(), tinfo.replicas.end()); result.stream_weight = locator::tablet_migration_stream_weight_repair; result.read_from = s; result.written_to = std::move(s); return result; + } + case tablet_transition_kind::restore: { + auto s = std::unordered_set(tinfo.replicas.begin(), tinfo.replicas.end()); + result.stream_weight = locator::tablet_migration_stream_weight_restore; + result.read_from = s; + result.written_to = std::move(s); + return result; + } } on_internal_error(tablet_logger, format("Invalid tablet transition kind: {}", static_cast(trinfo.transition))); } @@ -847,6 +861,7 @@ static const std::unordered_map tablet_transit {tablet_transition_stage::cleanup_target, "cleanup_target"}, {tablet_transition_stage::revert_migration, "revert_migration"}, {tablet_transition_stage::end_migration, "end_migration"}, + {tablet_transition_stage::restore, "restore"}, }; static const std::unordered_map tablet_transition_stage_from_name = std::invoke([] { @@ -880,6 +895,7 @@ static const std::unordered_map tablet_transiti {tablet_transition_kind::rebuild, "rebuild"}, {tablet_transition_kind::rebuild_v2, "rebuild_v2"}, {tablet_transition_kind::repair, 
"repair"}, + {tablet_transition_kind::restore, "restore"}, }; static const std::unordered_map tablet_transition_kind_from_name = std::invoke([] { @@ -1126,6 +1142,8 @@ std::optional load_stats::get_tablet_size_in_transition(host_id host, } case tablet_transition_kind::intranode_migration: [[fallthrough]]; + case tablet_transition_kind::restore: + [[fallthrough]]; case tablet_transition_kind::repair: break; } diff --git a/locator/tablets.hh b/locator/tablets.hh index 333d746c44..13cced73b1 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -260,6 +260,13 @@ struct tablet_task_info { static std::unordered_set deserialize_repair_dcs_filter(sstring filter); }; +struct restore_config { + sstring snapshot_name; + sstring endpoint; + sstring bucket; + bool operator==(const restore_config&) const = default; +}; + /// Stores information about a single tablet. struct tablet_info { tablet_replica_set replicas; @@ -315,6 +322,7 @@ enum class tablet_transition_stage { end_migration, repair, end_repair, + restore, }; enum class tablet_transition_kind { @@ -337,6 +345,9 @@ enum class tablet_transition_kind { // Repair the tablet replicas repair, + + // Download sstables for tablet + restore, }; tablet_transition_kind choose_rebuild_transition_kind(const gms::feature_service& features); @@ -360,6 +371,7 @@ struct tablet_transition_info { tablet_replica_set next; std::optional pending_replica; // Optimization (next - tablet_info::replicas) service::session_id session_id; + std::optional restore_cfg; write_replica_set_selector writes; read_replica_set_selector reads; @@ -367,7 +379,8 @@ struct tablet_transition_info { tablet_transition_kind kind, tablet_replica_set next, std::optional pending_replica, - service::session_id session_id = {}); + service::session_id session_id = {}, + std::optional rcfg = std::nullopt); bool operator==(const tablet_transition_info&) const = default; }; @@ -398,6 +411,7 @@ tablet_transition_info migration_to_transition_info(const tablet_info&, 
const ta /// Describes streaming required for a given tablet transition. constexpr int tablet_migration_stream_weight_default = 1; constexpr int tablet_migration_stream_weight_repair = 2; +constexpr int tablet_migration_stream_weight_restore = 2; struct tablet_migration_streaming_info { std::unordered_set read_from; std::unordered_set written_to; diff --git a/main.cc b/main.cc index 6455be59ad..a52b9eb036 100644 --- a/main.cc +++ b/main.cc @@ -2242,7 +2242,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl }); checkpoint(stop_signal, "starting sstables loader"); - sst_loader.start(std::ref(db), std::ref(ss), std::ref(messaging), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(sstm), dbcfg.streaming_scheduling_group).get(); + sst_loader.start(std::ref(db), std::ref(ss), std::ref(messaging), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(sstm), std::ref(sys_dist_ks), dbcfg.streaming_scheduling_group).get(); auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] { sst_loader.stop().get(); }); diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 20ccb921a9..947c8d5df1 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -24,6 +24,7 @@ #include "service/storage_service.hh" #include "service/qos/service_level_controller.hh" #include "streaming/prepare_message.hh" +#include "sstables_loader.hh" #include "gms/gossip_digest_syn.hh" #include "gms/gossip_digest_ack.hh" #include "gms/gossip_digest_ack2.hh" @@ -139,6 +140,7 @@ #include "idl/tasks.dist.impl.hh" #include "idl/forward_cql.dist.impl.hh" #include "gms/feature_service.hh" +#include "idl/sstables_loader.dist.impl.hh" namespace netw { @@ -734,6 +736,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::TABLE_LOAD_STATS: case messaging_verb::WORK_ON_VIEW_BUILDING_TASKS: case 
messaging_verb::SNAPSHOT_WITH_TABLETS: + case messaging_verb::RESTORE_TABLET: return 1; case messaging_verb::CLIENT_ID: case messaging_verb::MUTATION: diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 658d7b74e3..2c19947d03 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -214,7 +214,8 @@ enum class messaging_verb : int32_t { RAFT_READ_BARRIER = 85, FORWARD_CQL_EXECUTE = 86, FORWARD_CQL_PREPARE = 87, - LAST = 88, + RESTORE_TABLET = 88, + LAST = 89, }; } // namespace netw diff --git a/replica/table.cc b/replica/table.cc index 19ee9956c1..1cce24ee30 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -3251,7 +3251,8 @@ bool has_size_on_leaving (locator::tablet_transition_stage stage) { case locator::tablet_transition_stage::revert_migration: [[fallthrough]]; case locator::tablet_transition_stage::rebuild_repair: [[fallthrough]]; case locator::tablet_transition_stage::repair: [[fallthrough]]; - case locator::tablet_transition_stage::end_repair: + case locator::tablet_transition_stage::end_repair: [[fallthrough]]; + case locator::tablet_transition_stage::restore: return true; case locator::tablet_transition_stage::cleanup: [[fallthrough]]; case locator::tablet_transition_stage::end_migration: @@ -3274,7 +3275,8 @@ bool has_size_on_pending (locator::tablet_transition_stage stage) { case locator::tablet_transition_stage::cleanup: [[fallthrough]]; case locator::tablet_transition_stage::end_migration: [[fallthrough]]; case locator::tablet_transition_stage::repair: [[fallthrough]]; - case locator::tablet_transition_stage::end_repair: + case locator::tablet_transition_stage::end_repair: [[fallthrough]]; + case locator::tablet_transition_stage::restore: return true; } } diff --git a/replica/tablet_mutation_builder.hh b/replica/tablet_mutation_builder.hh index 698fa83378..1d618f2c03 100644 --- a/replica/tablet_mutation_builder.hh +++ b/replica/tablet_mutation_builder.hh @@ -50,6 +50,9 @@ public: 
tablet_mutation_builder& set_resize_task_info(locator::tablet_task_info info, const gms::feature_service& features); tablet_mutation_builder& del_resize_task_info(const gms::feature_service& features); tablet_mutation_builder& set_base_table(table_id base_table); + tablet_mutation_builder& set_restore_config(dht::token last_token, locator::restore_config rcfg); + tablet_mutation_builder& del_restore_config(dht::token last_token); + mutation build() { return std::move(_m); diff --git a/replica/tablets.cc b/replica/tablets.cc index c8b707e724..aa7f1ddb14 100644 --- a/replica/tablets.cc +++ b/replica/tablets.cc @@ -40,6 +40,9 @@ static thread_local auto tablet_task_info_type = user_type_impl::get_instance( static thread_local auto replica_type = tuple_type_impl::get_instance({uuid_type, int32_type}); static thread_local auto replica_set_type = list_type_impl::get_instance(replica_type, false); static thread_local auto tablet_info_type = tuple_type_impl::get_instance({long_type, long_type, replica_set_type}); +static thread_local auto restore_config_type = user_type_impl::get_instance( + "system", "restore_config", {"snapshot_name", "endpoint", "bucket"}, + {utf8_type, utf8_type, utf8_type}, false); data_type get_replica_set_type() { return replica_set_type; @@ -52,6 +55,7 @@ data_type get_tablet_info_type() { void tablet_add_repair_scheduler_user_types(const sstring& ks, replica::database& db) { db.find_keyspace(ks).add_user_type(repair_scheduler_config_type); db.find_keyspace(ks).add_user_type(tablet_task_info_type); + db.find_keyspace(ks).add_user_type(restore_config_type); } static bool strongly_consistent_tables_enabled = false; @@ -87,7 +91,8 @@ schema_ptr make_tablets_schema() { .with_column("repair_incremental_mode", utf8_type) .with_column("migration_task_info", tablet_task_info_type) .with_column("resize_task_info", tablet_task_info_type, column_kind::static_column) - .with_column("base_table", uuid_type, column_kind::static_column); + 
.with_column("base_table", uuid_type, column_kind::static_column) + .with_column("restore_config", restore_config_type); if (strongly_consistent_tables_enabled) { builder @@ -221,6 +226,15 @@ data_value tablet_task_info_to_data_value(const locator::tablet_task_info& info) return result; }; +data_value restore_config_to_data_value(const locator::restore_config& cfg) { + data_value result = make_user_value(restore_config_type, { + data_value(cfg.snapshot_name), + data_value(cfg.endpoint), + data_value(cfg.bucket), + }); + return result; +}; + data_value repair_scheduler_config_to_data_value(const locator::repair_scheduler_config& config) { data_value result = make_user_value(repair_scheduler_config_type, { data_value(config.auto_repair_enabled), @@ -444,6 +458,12 @@ tablet_mutation_builder::set_repair_task_info(dht::token last_token, locator::ta return *this; } +tablet_mutation_builder& +tablet_mutation_builder::set_restore_config(dht::token last_token, locator::restore_config rcfg) { + _m.set_clustered_cell(get_ck(last_token), "restore_config", restore_config_to_data_value(rcfg), _ts); + return *this; +} + tablet_mutation_builder& tablet_mutation_builder::del_repair_task_info(dht::token last_token, const gms::feature_service& features) { auto col = _s->get_column_definition("repair_task_info"); @@ -455,6 +475,13 @@ tablet_mutation_builder::del_repair_task_info(dht::token last_token, const gms:: return *this; } +tablet_mutation_builder& +tablet_mutation_builder::del_restore_config(dht::token last_token) { + auto col = _s->get_column_definition("restore_config"); + _m.set_clustered_cell(get_ck(last_token), *col, atomic_cell::make_dead(_ts, gc_clock::now())); + return *this; +} + tablet_mutation_builder& tablet_mutation_builder::set_migration_task_info(dht::token last_token, locator::tablet_task_info migration_task_info, const gms::feature_service& features) { if (features.tablet_migration_virtual_task) { @@ -545,6 +572,22 @@ locator::tablet_task_info 
deserialize_tablet_task_info(cql3::untyped_result_set_ tablet_task_info_type->deserialize_value(raw_value)); } +locator::restore_config restore_config_from_cell(const data_value& v) { + std::vector dv = value_cast(v); + auto result = locator::restore_config{ + value_cast(dv[0]), + value_cast(dv[1]), + value_cast(dv[2]), + }; + return result; +} + +static +locator::restore_config deserialize_restore_config(cql3::untyped_result_set_row::view_type raw_value) { + return restore_config_from_cell( + restore_config_type->deserialize_value(raw_value)); +} + locator::repair_scheduler_config repair_scheduler_config_from_cell(const data_value& v) { std::vector dv = value_cast(v); auto result = locator::repair_scheduler_config{ @@ -746,6 +789,11 @@ tablet_id process_one_row(replica::database* db, table_id table, tablet_map& map } } + std::optional restore_cfg; + if (row.has("restore_config")) { + restore_cfg = deserialize_restore_config(row.get_view("restore_config")); + } + locator::tablet_task_info migration_task_info; if (row.has("migration_task_info")) { migration_task_info = deserialize_tablet_task_info(row.get_view("migration_task_info")); @@ -769,7 +817,7 @@ tablet_id process_one_row(replica::database* db, table_id table, tablet_map& map session_id = service::session_id(row.get_as("session")); } map.set_tablet_transition_info(tid, tablet_transition_info{stage, transition, - std::move(new_tablet_replicas), pending_replica, session_id}); + std::move(new_tablet_replicas), pending_replica, session_id, std::move(restore_cfg)}); } tablet_logger.debug("Set sstables_repaired_at={} table={} tablet={}", sstables_repaired_at, table, tid); diff --git a/service/storage_service.cc b/service/storage_service.cc index d759194c25..ddc547f764 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5561,6 +5561,71 @@ future<> storage_service::del_tablet_replica(table_id table, dht::token token, l }); } +future<> storage_service::restore_tablets(table_id table, sstring 
snap_name, sstring endpoint, sstring bucket) { + auto holder = _async_gate.hold(); + + if (this_shard_id() != 0) { + // group0 is only set on shard 0. + co_return co_await container().invoke_on(0, [&] (auto& ss) { + return ss.restore_tablets(table, snap_name, endpoint, bucket); + }); + } + + // Holding tm around transit_tablet() can lead to deadlock, if state machine is busy + // with something which executes a barrier. The barrier will wait for tm to die, and + // transit_tablet() will wait for the barrier to finish. + // Due to that, we first collect tablet boundaries, then prepare and submit transition + // mutations. Since this code is called with equal min:max tokens set for the table, + // the tablet map cannot split and merge and, thus, the static vector of tokens should + // map to correct tablet boundaries throughout the whole operation + utils::chunked_vector> tablets; + { + const auto tm = get_token_metadata_ptr(); + const auto& tmap = tm->tablets().get_tablet_map(table); + co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) { + auto last_token = tmap.get_last_token(tid); + tablets.push_back(std::make_pair(tid, last_token)); + return make_ready_future<>(); + }); + } + + auto wait_one_transition = [this] (locator::global_tablet_id gid) { + return _topology_state_machine.event.wait([this, gid] { + auto& tmap = get_token_metadata().tablets().get_tablet_map(gid.table); + return !tmap.get_tablet_transition_info(gid.tablet); + }); + }; + + std::vector> wait; + co_await coroutine::parallel_for_each(tablets, [&] (const auto& tablet) -> future<> { + auto [ tid, last_token ] = tablet; + auto gid = locator::global_tablet_id{table, tid}; + while (true) { + auto success = co_await try_transit_tablet(table, last_token, [&] (const locator::tablet_map& tmap, api::timestamp_type write_timestamp) { + utils::chunked_vector updates; + updates.emplace_back(tablet_mutation_builder_for_base_table(write_timestamp, table) + 
.set_stage(last_token, locator::tablet_transition_stage::restore) + .set_new_replicas(last_token, tmap.get_tablet_info(tid).replicas) + .set_restore_config(last_token, locator::restore_config{ snap_name, endpoint, bucket }) + .set_transition(last_token, locator::tablet_transition_kind::restore) + .build()); + + sstring reason = format("Restoring tablet {}", gid); + return std::make_tuple(std::move(updates), std::move(reason)); + }); + if (success) { + wait.emplace_back(wait_one_transition(gid)); + break; + } + slogger.debug("Tablet is in transition, waiting"); + co_await wait_one_transition(gid); + } + }); + + co_await when_all_succeed(wait.begin(), wait.end()).discard_result(); + slogger.info("Restoring {} finished", table); +} + future storage_service::load_stats_for_tablet_based_tables() { auto holder = _async_gate.hold(); @@ -5643,6 +5708,21 @@ future storage_service::load_stats_for_tablet_based_tables( } future<> storage_service::transit_tablet(table_id table, dht::token token, noncopyable_function, sstring>(const locator::tablet_map&, api::timestamp_type)> prepare_mutations) { + auto success = co_await try_transit_tablet(table, token, std::move(prepare_mutations)); + if (!success) { + auto& tmap = get_token_metadata().tablets().get_tablet_map(table); + auto tid = tmap.get_tablet_id(token); + throw std::runtime_error(fmt::format("Tablet {} is in transition", locator::global_tablet_id{table, tid})); + } + + // Wait for transition to finish. 
+ co_await _topology_state_machine.event.when([&] { + auto& tmap = get_token_metadata().tablets().get_tablet_map(table); + return !tmap.get_tablet_transition_info(tmap.get_tablet_id(token)); + }); +} + +future storage_service::try_transit_tablet(table_id table, dht::token token, noncopyable_function, sstring>(const locator::tablet_map&, api::timestamp_type)> prepare_mutations) { while (true) { auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{}); bool topology_busy; @@ -5662,7 +5742,7 @@ future<> storage_service::transit_tablet(table_id table, dht::token token, nonco auto& tmap = get_token_metadata().tablets().get_tablet_map(table); auto tid = tmap.get_tablet_id(token); if (tmap.get_tablet_transition_info(tid)) { - throw std::runtime_error(fmt::format("Tablet {} is in transition", locator::global_tablet_id{table, tid})); + co_return false; } auto [ updates, reason ] = prepare_mutations(tmap, guard.write_timestamp()); @@ -5692,11 +5772,7 @@ future<> storage_service::transit_tablet(table_id table, dht::token token, nonco } } - // Wait for transition to finish. 
- co_await _topology_state_machine.event.when([&] { - auto& tmap = get_token_metadata().tablets().get_tablet_map(table); - return !tmap.get_tablet_transition_info(tmap.get_tablet_id(token)); - }); + co_return true; } future<> storage_service::set_tablet_balancing_enabled(bool enabled) { @@ -6109,6 +6185,15 @@ node_state storage_service::get_node_state(locator::host_id id) { return p->second.state; } +void storage_service::check_raft_rpc(raft::server_id dst_id) { + if (!_group0 || !_group0->joined_group0()) { + throw std::runtime_error("The node did not join group 0 yet"); + } + if (_group0->load_my_id() != dst_id) { + throw raft_destination_id_not_correct(_group0->load_my_id(), dst_id); + } +} + void storage_service::init_messaging_service() { ser::node_ops_rpc_verbs::register_node_ops_cmd(&_messaging.local(), [this] (const rpc::client_info& cinfo, node_ops_cmd_request req) { auto coordinator = cinfo.retrieve_auxiliary("baddr"); @@ -6120,17 +6205,6 @@ void storage_service::init_messaging_service() { return ss.node_ops_cmd_handler(coordinator, coordinator_host_id, std::move(req)); }); }); - auto handle_raft_rpc = [this] (raft::server_id dst_id, auto handler) { - return container().invoke_on(0, [dst_id, handler = std::move(handler)] (auto& ss) mutable { - if (!ss._group0 || !ss._group0->joined_group0()) { - throw std::runtime_error("The node did not join group 0 yet"); - } - if (ss._group0->load_my_id() != dst_id) { - throw raft_destination_id_not_correct(ss._group0->load_my_id(), dst_id); - } - return handler(ss); - }); - }; ser::streaming_rpc_verbs::register_tablet_stream_files(&_messaging.local(), [this] (const rpc::client_info& cinfo, streaming::stream_files_request req) -> future { streaming::stream_files_response resp; @@ -6142,13 +6216,13 @@ void storage_service::init_messaging_service() { std::plus()); co_return resp; }); - ser::storage_service_rpc_verbs::register_raft_topology_cmd(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, raft::term_t 
term, uint64_t cmd_index, raft_topology_cmd cmd) { + ser::storage_service_rpc_verbs::register_raft_topology_cmd(&_messaging.local(), [this] (raft::server_id dst_id, raft::term_t term, uint64_t cmd_index, raft_topology_cmd cmd) { return handle_raft_rpc(dst_id, [cmd = std::move(cmd), term, cmd_index] (auto& ss) { check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "raft_topology_cmd"); return ss.raft_topology_cmd_handler(term, cmd_index, cmd); }); }); - ser::storage_service_rpc_verbs::register_raft_pull_snapshot(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, raft_snapshot_pull_params params) { + ser::storage_service_rpc_verbs::register_raft_pull_snapshot(&_messaging.local(), [this] (raft::server_id dst_id, raft_snapshot_pull_params params) { return handle_raft_rpc(dst_id, [params = std::move(params)] (storage_service& ss) -> future { check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "raft_pull_snapshot"); utils::chunked_vector mutations; @@ -6243,28 +6317,28 @@ void storage_service::init_messaging_service() { }; }); }); - ser::storage_service_rpc_verbs::register_tablet_stream_data(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, locator::global_tablet_id tablet) { + ser::storage_service_rpc_verbs::register_tablet_stream_data(&_messaging.local(), [this] (raft::server_id dst_id, locator::global_tablet_id tablet) { return handle_raft_rpc(dst_id, [tablet] (auto& ss) { return ss.stream_tablet(tablet); }); }); - ser::storage_service_rpc_verbs::register_tablet_repair(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, locator::global_tablet_id tablet, rpc::optional session_id) { + ser::storage_service_rpc_verbs::register_tablet_repair(&_messaging.local(), [this] (raft::server_id dst_id, locator::global_tablet_id tablet, rpc::optional session_id) { return handle_raft_rpc(dst_id, [tablet, session_id = session_id.value_or(service::session_id::create_null_id())] (auto& ss) -> future { auto res = 
co_await ss.repair_tablet(tablet, session_id); co_return res; }); }); - ser::storage_service_rpc_verbs::register_tablet_cleanup(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, locator::global_tablet_id tablet) { + ser::storage_service_rpc_verbs::register_tablet_cleanup(&_messaging.local(), [this] (raft::server_id dst_id, locator::global_tablet_id tablet) { return handle_raft_rpc(dst_id, [tablet] (auto& ss) { return ss.cleanup_tablet(tablet); }); }); - ser::storage_service_rpc_verbs::register_table_load_stats(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id) { + ser::storage_service_rpc_verbs::register_table_load_stats(&_messaging.local(), [this] (raft::server_id dst_id) { return handle_raft_rpc(dst_id, [] (auto& ss) mutable { return ss.load_stats_for_tablet_based_tables(); }); }); - ser::storage_service_rpc_verbs::register_table_load_stats_v1(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id) { + ser::storage_service_rpc_verbs::register_table_load_stats_v1(&_messaging.local(), [this] (raft::server_id dst_id) { return handle_raft_rpc(dst_id, [] (auto& ss) mutable { return ss.load_stats_for_tablet_based_tables().then([] (auto stats) { return locator::load_stats_v1{ .tables = std::move(stats.tables) }; @@ -6285,7 +6359,7 @@ void storage_service::init_messaging_service() { ser::storage_service_rpc_verbs::register_sample_sstables(&_messaging.local(), [this] (table_id table, uint64_t chunk_size, uint64_t n_chunks) -> future>> { return _db.local().sample_data_files(table, chunk_size, n_chunks); }); - ser::join_node_rpc_verbs::register_join_node_request(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_request_params params) { + ser::join_node_rpc_verbs::register_join_node_request(&_messaging.local(), [this] (raft::server_id dst_id, service::join_node_request_params params) { return handle_raft_rpc(dst_id, [params = std::move(params)] (auto& ss) mutable { 
check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_request"); return ss.join_node_request_handler(std::move(params)); @@ -6301,7 +6375,7 @@ void storage_service::init_messaging_service() { co_return co_await ss.join_node_response_handler(std::move(params)); }); }); - ser::join_node_rpc_verbs::register_join_node_query(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_query_params) { + ser::join_node_rpc_verbs::register_join_node_query(&_messaging.local(), [this] (raft::server_id dst_id, service::join_node_query_params) { return handle_raft_rpc(dst_id, [] (auto& ss) -> future { check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_query"); auto result = join_node_query_result{ diff --git a/service/storage_service.hh b/service/storage_service.hh index 8f70697d4e..e46f1efc70 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -230,9 +230,6 @@ private: shared_ptr _global_topology_requests_module; shared_ptr _vnodes_to_tablets_migration_module; gms::gossip_address_map& _address_map; - future do_tablet_operation(locator::global_tablet_id tablet, - sstring op_name, - std::function(locator::tablet_metadata_guard&)> op); future repair_tablet(locator::global_tablet_id, service::session_id); future<> stream_tablet(locator::global_tablet_id); // Clones storage of leaving tablet into pending one. 
Done in the context of intra-node migration, @@ -244,7 +241,20 @@ private: future<> process_tablet_split_candidate(table_id) noexcept; void register_tablet_split_candidate(table_id) noexcept; future<> run_tablet_split_monitor(); + void check_raft_rpc(raft::server_id dst); public: + future do_tablet_operation(locator::global_tablet_id tablet, + sstring op_name, + std::function(locator::tablet_metadata_guard&)> op); + + template + auto handle_raft_rpc(raft::server_id dst_id, Func&& handler) { + return container().invoke_on(0, [dst_id, handler = std::forward(handler)] (auto& ss) mutable { + ss.check_raft_rpc(dst_id); + return handler(ss); + }); + }; + storage_service(abort_source& as, sharded& db, gms::gossiper& gossiper, sharded&, @@ -939,6 +949,7 @@ private: future<> _upgrade_to_topology_coordinator_fiber = make_ready_future<>(); future<> transit_tablet(table_id, dht::token, noncopyable_function, sstring>(const locator::tablet_map& tmap, api::timestamp_type)> prepare_mutations); + future try_transit_tablet(table_id, dht::token, noncopyable_function, sstring>(const locator::tablet_map& tmap, api::timestamp_type)> prepare_mutations); future get_guard_for_tablet_update(); future exec_tablet_update(service::group0_guard guard, utils::chunked_vector updates, sstring reason); public: @@ -948,6 +959,7 @@ public: future<> move_tablet(table_id, dht::token, locator::tablet_replica src, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no); future<> add_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no); future<> del_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no); + future<> restore_tablets(table_id, sstring snap_name, sstring endpoint, sstring bucket); future<> set_tablet_balancing_enabled(bool); future<> await_topology_quiesced(); diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 
13f0e9347c..5bd1443a2c 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -977,6 +977,8 @@ private: return true; case tablet_transition_stage::repair: return true; + case tablet_transition_stage::restore: + return false; case tablet_transition_stage::end_repair: return false; case tablet_transition_stage::write_both_read_new: diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index ec0fb9c2e6..75e610c60c 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -63,6 +63,7 @@ #include "utils/stall_free.hh" #include "utils/to_string.hh" #include "service/endpoint_lifecycle_subscriber.hh" +#include "sstables_loader.hh" #include "idl/join_node.dist.hh" #include "idl/storage_service.dist.hh" @@ -72,6 +73,7 @@ #include "utils/updateable_value.hh" #include "repair/repair.hh" #include "idl/repair.dist.hh" +#include "idl/sstables_loader.dist.hh" #include "service/topology_coordinator.hh" @@ -1472,6 +1474,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber background_action_holder cleanup; background_action_holder repair; background_action_holder repair_update_compaction_ctrl; + background_action_holder restore; std::unordered_map barriers; // Record the repair_time returned by the repair_tablet rpc call db_clock::time_point repair_time; @@ -2165,6 +2168,33 @@ class topology_coordinator : public endpoint_lifecycle_subscriber } } break; + case locator::tablet_transition_stage::restore: { + if (!trinfo.restore_cfg.has_value()) { + on_internal_error(rtlogger, format("Cannot handle restore transition without config for tablet {}", gid)); + } + if (action_failed(tablet_state.restore)) { + rtlogger.debug("Clearing restore transition for {} due to error", gid); + updates.emplace_back(get_mutation_builder().del_transition(last_token).del_restore_config(last_token).build()); + break; + } + if (advance_in_background(gid, tablet_state.restore, "restore", [this, gid, &tmap] () -> 
future<> { + auto& tinfo = tmap.get_tablet_info(gid.tablet); + auto replicas = tinfo.replicas; + + rtlogger.info("Restoring tablet={} on {}", gid, replicas); + co_await coroutine::parallel_for_each(replicas, [this, gid] (locator::tablet_replica r) -> future<> { + auto dst = raft::server_id(r.host.uuid()); + if (!is_excluded(dst)) { + co_await ser::sstables_loader_rpc_verbs::send_restore_tablet(&_messaging, r.host, dst, gid); + rtlogger.debug("Tablet {} restored on {}", gid, r.host); + } + }); + })) { + rtlogger.debug("Clearing restore transition for {}", gid); + updates.emplace_back(get_mutation_builder().del_transition(last_token).del_restore_config(last_token).build()); + } + } + break; case locator::tablet_transition_stage::end_repair: { if (do_barrier()) { if (tablet_state.session_id.uuid().is_null()) { @@ -2345,6 +2375,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber break; case locator::tablet_transition_kind::repair: [[fallthrough]]; + case locator::tablet_transition_kind::restore: + [[fallthrough]]; case locator::tablet_transition_kind::intranode_migration: break; } diff --git a/sstables_loader.cc b/sstables_loader.cc index 19848f951b..e30624f6da 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -32,6 +32,13 @@ #include "message/messaging_service.hh" #include "service/storage_service.hh" #include "utils/error_injection.hh" +#include "sstables_loader_helpers.hh" +#include "db/system_distributed_keyspace.hh" +#include "idl/sstables_loader.dist.hh" + +#include "sstables/object_storage_client.hh" +#include "utils/rjson.hh" +#include "db/system_distributed_keyspace.hh" #include #include @@ -205,11 +212,6 @@ private: return result; } - struct minimal_sst_info { - sstables::generation_type _generation; - sstables::sstable_version_types _version; - sstables::sstable_format_types _format; - }; using sst_classification_info = std::vector>; future<> attach_sstable(shard_id from_shard, const sstring& ks, const sstring& cf, const 
minimal_sst_info& min_info) const { @@ -218,7 +220,7 @@ private: auto& table = db.find_column_family(ks, cf); auto& sst_manager = table.get_sstables_manager(); auto sst = sst_manager.make_sstable( - table.schema(), table.get_storage_options(), min_info._generation, sstables::sstable_state::normal, min_info._version, min_info._format); + table.schema(), table.get_storage_options(), min_info.generation, sstables::sstable_state::normal, min_info.version, min_info.format); sst->set_sstable_level(0); auto units = co_await sst_manager.dir_semaphore().get_units(1); sstables::sstable_open_config cfg { @@ -254,97 +256,10 @@ private: } future download_fully_contained_sstables(std::vector sstables) const { - constexpr auto foptions = file_open_options{.extent_allocation_size_hint = 32_MiB, .sloppy_size = true}; - constexpr auto stream_options = file_output_stream_options{.buffer_size = 128_KiB, .write_behind = 10}; sst_classification_info downloaded_sstables(smp::count); for (const auto& sstable : sstables) { - auto components = sstable->all_components(); - - // Move the TOC to the front to be processed first since `sstables::create_stream_sink` takes care - // of creating behind the scene TemporaryTOC instead of usual one. This assures that in case of failure - // this partially created SSTable will be cleaned up properly at some point. - auto toc_it = std::ranges::find_if(components, [](const auto& component) { return component.first == component_type::TOC; }); - if (toc_it != components.begin()) { - swap(*toc_it, components.front()); - } - - // Ensure the Scylla component is processed second. - // - // The sstable_sink->output() call for each component may invoke load_metadata() - // and save_metadata(), but these functions only operate correctly if the Scylla - // component file already exists on disk. If the Scylla component is written first, - // load_metadata()/save_metadata() become no-ops, leaving the original Scylla - // component (with outdated metadata) untouched. 
- // - // By placing the Scylla component second, we guarantee that: - // 1) The first component (TOC) is written and the Scylla component file already - // exists on disk when subsequent output() calls happen. - // 2) Later output() calls will overwrite the Scylla component with the correct, - // updated metadata. - // - // In short: Scylla must be written second so that all following output() calls - // can properly update its metadata instead of silently skipping it. - auto scylla_it = std::ranges::find_if(components, [](const auto& component) { return component.first == component_type::Scylla; }); - if (scylla_it != std::next(components.begin())) { - swap(*scylla_it, *std::next(components.begin())); - } - - auto gen = _table.get_sstable_generation_generator()(); - auto files = co_await sstable->readable_file_for_all_components(); - for (auto it = components.cbegin(); it != components.cend(); ++it) { - try { - auto descriptor = sstable->get_descriptor(it->first); - auto sstable_sink = sstables::create_stream_sink( - _table.schema(), - _table.get_sstables_manager(), - _table.get_storage_options(), - sstables::sstable_state::normal, - sstables::sstable::component_basename( - _table.schema()->ks_name(), _table.schema()->cf_name(), descriptor.version, gen, descriptor.format, it->first), - sstables::sstable_stream_sink_cfg{.last_component = std::next(it) == components.cend(), - .leave_unsealed = true}); - auto out = co_await sstable_sink->output(foptions, stream_options); - - input_stream src(co_await [this, &it, sstable, f = files.at(it->first)]() -> future> { - const auto fis_options = file_input_stream_options{.buffer_size = 128_KiB, .read_ahead = 2}; - - if (it->first != sstables::component_type::Data) { - co_return input_stream( - co_await sstable->get_storage().make_source(*sstable, it->first, f, 0, std::numeric_limits::max(), fis_options)); - } - auto permit = co_await _db.local().obtain_reader_permit(_table, "download_fully_contained_sstables", 
db::no_timeout, {}); - co_return co_await ( - sstable->get_compression() - ? sstable->data_stream(0, sstable->ondisk_data_size(), std::move(permit), nullptr, nullptr, sstables::sstable::raw_stream::yes) - : sstable->data_stream(0, sstable->data_size(), std::move(permit), nullptr, nullptr, sstables::sstable::raw_stream::no)); - }()); - - std::exception_ptr eptr; - try { - co_await seastar::copy(src, out); - } catch (...) { - eptr = std::current_exception(); - llog.info("Error downloading SSTable component {}. Reason: {}", it->first, eptr); - } - co_await src.close(); - co_await out.close(); - if (eptr) { - co_await sstable_sink->abort(); - std::rethrow_exception(eptr); - } - if (auto sst = co_await sstable_sink->close()) { - const auto& shards = sstable->get_shards_for_this_sstable(); - if (shards.size() != 1) { - on_internal_error(llog, "Fully-contained sstable must belong to one shard only"); - } - llog.debug("SSTable shards {}", fmt::join(shards, ", ")); - downloaded_sstables[shards.front()].emplace_back(gen, descriptor.version, descriptor.format); - } - } catch (...) { - llog.info("Error downloading SSTable component {}. 
Reason: {}", it->first, std::current_exception()); - throw; - } - } + auto min_info = co_await download_sstable(_db.local(), _table, sstable, llog); + downloaded_sstables[min_info.shard].emplace_back(min_info); } co_return downloaded_sstables; } @@ -506,26 +421,9 @@ future> tablet_sstable_streamer::get_ssta auto reversed_sstables = sstables | std::views::reverse; for (auto& [tablet_range, sstables_fully_contained, sstables_partially_contained] : tablets_sstables) { - for (const auto& sst : reversed_sstables) { - auto sst_first = sst->get_first_decorated_key().token(); - auto sst_last = sst->get_last_decorated_key().token(); - - // SSTable entirely after tablet -> no further SSTables (larger keys) can overlap - if (tablet_range.after(sst_first, dht::token_comparator{})) { - break; - } - // SSTable entirely before tablet -> skip and continue scanning later (larger keys) - if (tablet_range.before(sst_last, dht::token_comparator{})) { - continue; - } - - if (tablet_range.contains(dht::token_range{sst_first, sst_last}, dht::token_comparator{})) { - sstables_fully_contained.push_back(sst); - } else { - sstables_partially_contained.push_back(sst); - } - co_await coroutine::maybe_yield(); - } + auto [fully, partially] = co_await get_sstables_for_tablet(reversed_sstables, tablet_range, [](const auto& sst) { return sst->get_first_decorated_key().token(); }, [](const auto& sst) { return sst->get_last_decorated_key().token(); }); + sstables_fully_contained = std::move(fully); + sstables_partially_contained = std::move(partially); } co_return std::move(tablets_sstables); } @@ -969,6 +867,7 @@ sstables_loader::sstables_loader(sharded& db, sharded& vbw, tasks::task_manager& tm, sstables::storage_manager& sstm, + db::system_distributed_keyspace& sys_dist_ks, seastar::scheduling_group sg) : _db(db) , _ss(ss) @@ -977,12 +876,24 @@ sstables_loader::sstables_loader(sharded& db, , _view_building_worker(vbw) , _task_manager_module(make_shared(tm)) , _storage_manager(sstm) + , 
_sys_dist_ks(sys_dist_ks) , _sched_group(std::move(sg)) { tm.register_module("sstables_loader", _task_manager_module); + ser::sstables_loader_rpc_verbs::register_restore_tablet(&_messaging, [this] (raft::server_id dst_id, locator::global_tablet_id gid) -> future { + return _ss.local().handle_raft_rpc(dst_id, [&sl = container(), gid] (auto& ss) { + return ss.do_tablet_operation(gid, "Restore", [&sl, gid] (locator::tablet_metadata_guard& guard) -> future { + co_await sl.local().download_tablet_sstables(gid, guard); + co_return service::tablet_operation_empty_result{}; + }).then([] (auto res) { + return make_ready_future(); + }); + }); + }); } future<> sstables_loader::stop() { + co_await ser::sstables_loader_rpc_verbs::unregister(&_messaging), co_await _task_manager_module->stop(); } @@ -998,7 +909,291 @@ future sstables_loader::download_new_sstables(sstring ks_name, s std::move(prefix), std::move(sstables), scope, primary_replica_only(primary_replica)); co_return task->id(); } + +future sstables_loader::attach_sstable(table_id tid, const minimal_sst_info& min_info) const { + auto& db = _db.local(); + auto& table = db.find_column_family(tid); + llog.debug("Adding downloaded SSTables to the table {} on shard {}", table.schema()->cf_name(), this_shard_id()); + auto& sst_manager = table.get_sstables_manager(); + auto sst = sst_manager.make_sstable( + table.schema(), table.get_storage_options(), min_info.generation, sstables::sstable_state::normal, min_info.version, min_info.format); + sst->set_sstable_level(0); + auto erm = table.get_effective_replication_map(); + sstables::sstable_open_config cfg { + .unsealed_sstable = true, + .ignore_component_digest_mismatch = db.get_config().ignore_component_digest_mismatch(), + }; + co_await sst->load(erm->get_sharder(*table.schema()), cfg); + if (!sst->sstable_identifier()) { + on_internal_error(llog, "sstable identifier is required for tablet restore"); + } + co_await table.add_new_sstable_and_update_cache(sst, [&sst_manager, 
sst] (sstables::shared_sstable loading_sst) -> future<> { + if (loading_sst == sst) { + auto writer_cfg = sst_manager.configure_writer(loading_sst->get_origin()); + co_await loading_sst->seal_sstable(writer_cfg.backup); + } + }); + co_return sst; +} + +future<> sstables_loader::download_tablet_sstables(locator::global_tablet_id tid, locator::tablet_metadata_guard& guard) { + auto& tmap = guard.get_tablet_map(); + + auto* trinfo = tmap.get_tablet_transition_info(tid.tablet); + if (!trinfo) { + throw std::runtime_error(fmt::format("No transition info for tablet {}", tid)); + } + if (!trinfo->restore_cfg) { + throw std::runtime_error(format("No restore config for tablet {} restore transition", tid)); + } + + locator::restore_config restore_cfg = *trinfo->restore_cfg; + llog.info("Downloading sstables for tablet {} from {}@{}/{}", tid, restore_cfg.snapshot_name, restore_cfg.endpoint, restore_cfg.bucket); + + auto s = _db.local().find_schema(tid.table); + auto tablet_range = tmap.get_token_range(tid.tablet); + const auto& topo = guard.get_token_metadata()->get_topology(); + auto keyspace_name = s->ks_name(); + auto table_name = s->cf_name(); + auto datacenter = topo.get_datacenter(); + auto rack = topo.get_rack(); + + auto sst_infos = co_await _sys_dist_ks.get_snapshot_sstables(restore_cfg.snapshot_name, keyspace_name, table_name, datacenter, rack, + db::consistency_level::LOCAL_QUORUM, tablet_range.start().transform([] (auto& v) { return v.value(); }), tablet_range.end().transform([] (auto& v) { return v.value(); })); + llog.debug("{} SSTables found for tablet {}", sst_infos.size(), tid); + if (sst_infos.empty()) { + throw std::runtime_error(format("No SSTables found in system_distributed.snapshot_sstables for {}", restore_cfg.snapshot_name)); + } + + auto [ fully, partially ] = co_await get_sstables_for_tablet(sst_infos, tablet_range, [] (const auto& si) { return si.first_token; }, [] (const auto& si) { return si.last_token; }); + if (!partially.empty()) { + 
llog.debug("Sstable {} is partially contained", partially.front().sstable_id); + throw std::logic_error("sstables_partially_contained"); + } + llog.debug("{} SSTables filtered by range {} for tablet {}", fully.size(), tablet_range, tid); + co_await utils::get_local_injector().inject("pause_tablet_restore", utils::wait_for_message(60s)); + + if (fully.empty()) { + // It can happen that a tablet exists and contains no data. Just skip it + co_return; + } + + std::unordered_map> toc_names_by_prefix; + for (const auto& e : fully) { + toc_names_by_prefix[e.prefix].emplace_back(e.toc_name); + } + + auto ep_type = _storage_manager.get_endpoint_type(restore_cfg.endpoint); + sstables::sstable_open_config cfg { + .load_bloom_filter = false, + }; + + using sstables_col = std::vector; + using prefix_sstables = std::vector; + + auto sstables_on_shards = co_await map_reduce(toc_names_by_prefix, [&] (auto ent) { + return replica::distributed_loader::get_sstables_from_object_store(_db, s->ks_name(), s->cf_name(), + std::move(ent.second), restore_cfg.endpoint, ep_type, restore_cfg.bucket, std::move(ent.first), cfg, [&] { return nullptr; }).then_unpack([] (table_id, auto sstables) { + return make_ready_future>(std::move(sstables)); + }); + }, std::vector(smp::count), [&] (std::vector a, std::vector b) { + // We can't move individual elements of b[i], because these + // are lw_shared_ptr-s collected on another shard. So we move + // the whole sstables_col here so that subsequent code will + // walk over it and move the pointers where it wants on proper + // shard. 
+ for (unsigned i = 0; i < smp::count; i++) { + a[i].push_back(std::move(b[i])); + } + return a; + }); + + auto downloaded_ssts = co_await container().map_reduce0( + [tid, &sstables_on_shards](auto& loader) -> future>> { + sstables_col sst_chunk; + for (auto& psst : sstables_on_shards[this_shard_id()]) { + for (auto&& sst : psst) { + sst_chunk.push_back(std::move(sst)); + } + } + std::vector> local_min_infos(smp::count); + co_await max_concurrent_for_each(sst_chunk, 16, [&loader, tid, &local_min_infos](const auto& sst) -> future<> { + auto& table = loader._db.local().find_column_family(tid.table); + auto min_info = co_await download_sstable(loader._db.local(), table, sst, llog); + local_min_infos[min_info.shard].emplace_back(std::move(min_info)); + }); + co_return local_min_infos; + }, + std::vector>(smp::count), + [](auto init, auto&& item) -> std::vector> { + for (std::size_t i = 0; i < item.size(); ++i) { + init[i].append_range(std::move(item[i])); + } + return init; + }); + + co_await container().invoke_on_all([tid, &downloaded_ssts, snap_name = restore_cfg.snapshot_name, keyspace_name, table_name, datacenter, rack] (auto& loader) -> future<> { + auto shard_ssts = std::move(downloaded_ssts[this_shard_id()]); + co_await max_concurrent_for_each(shard_ssts, 16, [&loader, tid, snap_name, keyspace_name, table_name, datacenter, rack](const auto& min_info) -> future<> { + sstables::shared_sstable attached_sst = co_await loader.attach_sstable(tid.table, min_info); + co_await loader._sys_dist_ks.update_sstable_download_status(snap_name, + keyspace_name, + table_name, + datacenter, + rack, + *attached_sst->sstable_identifier(), + attached_sst->get_first_decorated_key().token(), + db::is_downloaded::yes); + }); + }); +} + future> get_sstables_for_tablets_for_tests(const std::vector& sstables, std::vector&& tablets_ranges) { return tablet_sstable_streamer::get_sstables_for_tablets(sstables, std::move(tablets_ranges)); } + +static future process_manifest(input_stream& is, 
sstring keyspace, sstring table, + const sstring& expected_snapshot_name, + const sstring& manifest_prefix, db::system_distributed_keyspace& sys_dist_ks, + db::consistency_level cl) { + // Read the entire JSON content + rjson::chunked_content content = co_await util::read_entire_stream(is); + + rjson::value parsed = rjson::parse(std::move(content)); + + // Basic validation that tablet_count is a power of 2, as expected by our restore process + size_t tablet_count = parsed["table"]["tablet_count"].GetUint64(); + if (!std::has_single_bit(tablet_count)) { + on_internal_error(llog, fmt::format("Invalid tablet_count {} in manifest {}, expected a power of 2", tablet_count, manifest_prefix)); + } + + // Extract the necessary fields from the manifest + // Expected JSON structure documented in docs/dev/object_storage.md + auto snapshot_name = rjson::to_sstring(parsed["snapshot"]["name"]); + if (snapshot_name != expected_snapshot_name) { + throw std::runtime_error(fmt::format("Manifest {} belongs to snapshot '{}', expected '{}'", + manifest_prefix, snapshot_name, expected_snapshot_name)); + } + if (keyspace.empty()) { + keyspace = rjson::to_sstring(parsed["table"]["keyspace_name"]); + } + if (table.empty()) { + table = rjson::to_sstring(parsed["table"]["table_name"]); + } + auto datacenter = rjson::to_sstring(parsed["node"]["datacenter"]); + auto rack = rjson::to_sstring(parsed["node"]["rack"]); + + // Process each sstable entry in the manifest + // FIXME: cleanup of the snapshot-related rows is needed in case anything throws in here. 
+ auto sstables = rjson::find(parsed, "sstables"); + if (!sstables) { + co_return tablet_count; + } + if (!sstables->IsArray()) { + throw std::runtime_error("Malformed manifest, 'sstables' is not array"); + } + + for (auto& sstable_entry : sstables->GetArray()) { + auto id = rjson::to_sstable_id(sstable_entry["id"]); + auto first_token = rjson::to_token(sstable_entry["first_token"]); + auto last_token = rjson::to_token(sstable_entry["last_token"]); + auto toc_name = rjson::to_sstring(sstable_entry["toc_name"]); + auto prefix = sstring(std::filesystem::path(manifest_prefix).parent_path().string()); + // Insert the snapshot sstable metadata into system_distributed.snapshot_sstables with a TTL of 3 days, that should be enough + // for any snapshot restore operation to complete, and after that the metadata will be automatically cleaned up from the table + co_await sys_dist_ks.insert_snapshot_sstable(snapshot_name, keyspace, table, datacenter, rack, id, first_token, last_token, + toc_name, prefix, db::is_downloaded::no, cl); + } + + co_return tablet_count; +} + +future populate_snapshot_sstables_from_manifests(sstables::storage_manager& sm, db::system_distributed_keyspace& sys_dist_ks, sstring keyspace, sstring table, sstring endpoint, sstring bucket, sstring expected_snapshot_name, utils::chunked_vector manifest_prefixes, db::consistency_level cl) { + // Download manifests in parallel and populate system_distributed.snapshot_sstables + // with the content extracted from each manifest + auto client = sm.get_endpoint_client(endpoint); + + // tablet_count to be returned by this function, we also validate that all manifests passed contain the same tablet count + std::optional tablet_count; + + co_await seastar::max_concurrent_for_each(manifest_prefixes, 16, [&] (const sstring& manifest_prefix) { + // Download the manifest JSON file + sstables::object_name name(bucket, manifest_prefix); + auto source = client->make_download_source(name); + return 
seastar::with_closeable(input_stream(std::move(source)), [&] (input_stream& is) { + return process_manifest(is, keyspace, table, expected_snapshot_name, manifest_prefix, sys_dist_ks, cl).then([&](size_t count) { + if (!tablet_count) { + tablet_count = count; + } else if (*tablet_count != count) { + throw std::runtime_error(fmt::format("Inconsistent tablet_count values in manifest {}: expected {}, got {}", manifest_prefix, *tablet_count, count)); + } + }); + }); + }); + + co_return *tablet_count; +} + +class sstables_loader::tablet_restore_task_impl : public tasks::task_manager::task::impl { + sharded& _loader; + table_id _tid; + sstring _snap_name; + sstring _endpoint; + sstring _bucket; + +public: + tablet_restore_task_impl(tasks::task_manager::module_ptr module, sharded& loader, sstring ks, + table_id tid, sstring snap_name, sstring endpoint, sstring bucket) noexcept + : tasks::task_manager::task::impl(module, tasks::task_id::create_random_id(), 0, "node", ks, "", "", tasks::task_id::create_null_id()) + , _loader(loader) + , _tid(std::move(tid)) + , _snap_name(std::move(snap_name)) + , _endpoint(std::move(endpoint)) + , _bucket(std::move(bucket)) + { + _status.progress_units = "batches"; + } + + virtual std::string type() const override { + return "restore_tablets"; + } + + virtual tasks::is_internal is_internal() const noexcept override { + return tasks::is_internal::no; + } + + virtual tasks::is_user_task is_user_task() const noexcept override { + return tasks::is_user_task::yes; + } + + tasks::is_abortable is_abortable() const noexcept override { + return tasks::is_abortable::no; + } + +protected: + virtual future<> run() override { + auto& loader = _loader.local(); + co_await loader._ss.local().restore_tablets(_tid, _snap_name, _endpoint, _bucket); + + auto& db = loader._db.local(); + auto s = db.find_schema(_tid); + const auto& topo = db.get_token_metadata().get_topology(); + auto dc = topo.get_datacenter(); + + for (const auto& rack : 
topo.get_datacenter_racks().at(dc) | std::views::keys) { + auto result = co_await loader._sys_dist_ks.get_snapshot_sstables(_snap_name, s->ks_name(), s->cf_name(), dc, rack); + auto it = std::find_if(result.begin(), result.end(), [] (const auto& ent) { return !ent.downloaded; }); + if (it != result.end()) { + llog.warn("Some replicas failed to download SSTables for {}:{} from {}", s->ks_name(), s->cf_name(), _snap_name); + throw std::runtime_error(format("Failed to download {}", it->toc_name)); + } + } + } +}; + +future sstables_loader::restore_tablets(table_id tid, sstring keyspace, sstring table, sstring snap_name, sstring endpoint, sstring bucket, utils::chunked_vector manifests) { + co_await populate_snapshot_sstables_from_manifests(_storage_manager, _sys_dist_ks, keyspace, table, endpoint, bucket, snap_name, std::move(manifests)); + auto task = co_await _task_manager_module->make_and_start_task({}, container(), keyspace, tid, std::move(snap_name), std::move(endpoint), std::move(bucket)); + co_return task->id(); +} diff --git a/sstables_loader.hh b/sstables_loader.hh index e005cc52ea..987818ad25 100644 --- a/sstables_loader.hh +++ b/sstables_loader.hh @@ -15,6 +15,8 @@ #include "schema/schema_fwd.hh" #include "sstables/shared_sstable.hh" #include "tasks/task_manager.hh" +#include "db/consistency_level_type.hh" +#include "locator/tablets.hh" using namespace seastar; @@ -22,10 +24,15 @@ namespace replica { class database; } +struct minimal_sst_info; +struct restore_result { +}; + namespace sstables { class storage_manager; } namespace netw { class messaging_service; } namespace db { +class system_distributed_keyspace; namespace view { class view_builder; class view_building_worker; @@ -36,6 +43,7 @@ class storage_service; } namespace locator { class effective_replication_map; +class tablet_metadata_guard; } struct stream_progress { @@ -80,6 +88,7 @@ private: sharded& _view_building_worker; shared_ptr _task_manager_module; sstables::storage_manager& 
_storage_manager; + db::system_distributed_keyspace& _sys_dist_ks; seastar::scheduling_group _sched_group; // Note that this is obviously only valid for the current shard. Users of @@ -96,6 +105,9 @@ private: shared_ptr progress); future> await_topology_quiesced_and_get_erm(table_id table_id); + future<> download_tablet_sstables(locator::global_tablet_id tid, locator::tablet_metadata_guard&); + future attach_sstable(table_id tid, const minimal_sst_info& min_info) const; + public: sstables_loader(sharded& db, sharded& ss, @@ -104,6 +116,7 @@ public: sharded& vbw, tasks::task_manager& tm, sstables::storage_manager& sstm, + db::system_distributed_keyspace& sys_dist_ks, seastar::scheduling_group sg); future<> stop(); @@ -134,7 +147,10 @@ public: sstring prefix, std::vector sstables, sstring endpoint, sstring bucket, stream_scope scope, bool primary_replica); + future restore_tablets(table_id, sstring keyspace, sstring table, sstring snap_name, sstring endpoint, sstring bucket, utils::chunked_vector manifests); + class download_task_impl; + class tablet_restore_task_impl; }; template <> @@ -169,3 +185,5 @@ struct tablet_sstable_collection { // Another prerequisite is that the sstables' token ranges are sorted by its `start` in descending order. 
future> get_sstables_for_tablets_for_tests(const std::vector& sstables, std::vector&& tablets_ranges); + +future populate_snapshot_sstables_from_manifests(sstables::storage_manager& sm, db::system_distributed_keyspace& sys_dist_ks, sstring keyspace, sstring table, sstring endpoint, sstring bucket, sstring expected_snapshot_name, utils::chunked_vector manifest_prefixes, db::consistency_level cl = db::consistency_level::EACH_QUORUM); diff --git a/sstables_loader_helpers.cc b/sstables_loader_helpers.cc new file mode 100644 index 0000000000..9aa97e4004 --- /dev/null +++ b/sstables_loader_helpers.cc @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2026-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 + */ + +#include "sstables_loader_helpers.hh" + +#include +#include +#include +#include "replica/database.hh" +#include "sstables/shared_sstable.hh" +#include "sstables/sstables.hh" +#include "utils/error_injection.hh" + +future download_sstable(replica::database& db, replica::table& table, sstables::shared_sstable sstable, logging::logger& logger) { + constexpr auto foptions = file_open_options{.extent_allocation_size_hint = 32_MiB, .sloppy_size = true}; + constexpr auto stream_options = file_output_stream_options{.buffer_size = 128_KiB, .write_behind = 10}; + auto components = sstable->all_components(); + + utils::get_local_injector().inject("fail_download_sstable", [] { throw std::runtime_error("Failing sstable download"); }); + + // Move the TOC to the front to be processed first since `sstables::create_stream_sink` takes care + // of creating behind the scene TemporaryTOC instead of usual one. This assures that in case of failure + // this partially created SSTable will be cleaned up properly at some point. 
+ auto toc_it = std::ranges::find_if(components, [](const auto& component) { return component.first == component_type::TOC; }); + if (toc_it != components.begin()) { + swap(*toc_it, components.front()); + } + + // Ensure the Scylla component is processed second. + // + // The sstable_sink->output() call for each component may invoke load_metadata() + // and save_metadata(), but these functions only operate correctly if the Scylla + // component file already exists on disk. If the Scylla component is written first, + // load_metadata()/save_metadata() become no-ops, leaving the original Scylla + // component (with outdated metadata) untouched. + // + // By placing the Scylla component second, we guarantee that: + // 1) The first component (TOC) is written and the Scylla component file already + // exists on disk when subsequent output() calls happen. + // 2) Later output() calls will overwrite the Scylla component with the correct, + // updated metadata. + // + // In short: Scylla must be written second so that all following output() calls + // can properly update its metadata instead of silently skipping it. 
+ auto scylla_it = std::ranges::find_if(components, [](const auto& component) { return component.first == component_type::Scylla; }); + if (scylla_it != std::next(components.begin())) { + swap(*scylla_it, *std::next(components.begin())); + } + + auto gen = table.get_sstable_generation_generator()(); + auto files = co_await sstable->readable_file_for_all_components(); + for (auto it = components.cbegin(); it != components.cend(); ++it) { + try { + auto descriptor = sstable->get_descriptor(it->first); + auto sstable_sink = + sstables::create_stream_sink(table.schema(), + table.get_sstables_manager(), + table.get_storage_options(), + sstables::sstable_state::normal, + sstables::sstable::component_basename( + table.schema()->ks_name(), table.schema()->cf_name(), descriptor.version, gen, descriptor.format, it->first), + sstables::sstable_stream_sink_cfg{.last_component = std::next(it) == components.cend(), .leave_unsealed = true}); + auto out = co_await sstable_sink->output(foptions, stream_options); + + input_stream src(co_await [&it, sstable, f = files.at(it->first), &db, &table]() -> future> { + const auto fis_options = file_input_stream_options{.buffer_size = 128_KiB, .read_ahead = 2}; + + if (it->first != sstables::component_type::Data) { + co_return input_stream( + co_await sstable->get_storage().make_source(*sstable, it->first, f, 0, std::numeric_limits::max(), fis_options)); + } + auto permit = co_await db.obtain_reader_permit(table, "download_fully_contained_sstables", db::no_timeout, {}); + co_return co_await ( + sstable->get_compression() + ? sstable->data_stream(0, sstable->ondisk_data_size(), std::move(permit), nullptr, nullptr, sstables::sstable::raw_stream::yes) + : sstable->data_stream(0, sstable->data_size(), std::move(permit), nullptr, nullptr, sstables::sstable::raw_stream::no)); + }()); + + std::exception_ptr eptr; + try { + co_await seastar::copy(src, out); + } catch (...) 
{ + eptr = std::current_exception(); + logger.info("Error downloading SSTable component {}. Reason: {}", it->first, eptr); + } + co_await src.close(); + co_await out.close(); + if (eptr) { + co_await sstable_sink->abort(); + std::rethrow_exception(eptr); + } + if (auto sst = co_await sstable_sink->close()) { + const auto& shards = sstable->get_shards_for_this_sstable(); + if (shards.size() != 1) { + on_internal_error(logger, "Fully-contained sstable must belong to one shard only"); + } + logger.debug("SSTable shards {}", fmt::join(shards, ", ")); + co_return minimal_sst_info{shards.front(), gen, descriptor.version, descriptor.format}; + } + } catch (...) { + logger.info("Error downloading SSTable component {}. Reason: {}", it->first, std::current_exception()); + throw; + } + } + throw std::logic_error("SSTable must have at least one component"); +} diff --git a/sstables_loader_helpers.hh b/sstables_loader_helpers.hh new file mode 100644 index 0000000000..fb5bae14df --- /dev/null +++ b/sstables_loader_helpers.hh @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2026-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 + */ + +#pragma once + +#include +#include +#include "dht/i_partitioner_fwd.hh" +#include "dht/token.hh" +#include "sstables/generation_type.hh" +#include "sstables/shared_sstable.hh" +#include "sstables/version.hh" +#include "utils/log.hh" + +#include + +namespace replica { +class table; +class database; +} // namespace replica + +struct minimal_sst_info { + shard_id shard; + sstables::generation_type generation; + sstables::sstable_version_types version; + sstables::sstable_format_types format; +}; + +future download_sstable(replica::database& db, replica::table& table, sstables::shared_sstable sstable, logging::logger& logger); + +template > +seastar::future, std::vector>> +get_sstables_for_tablet(Range&& ranges, const dht::token_range& token_range, auto&& get_first, auto&& get_last) { + std::vector fully_contained; 
+ std::vector partially_contained; + for (const auto& range : ranges) { + auto first_token = get_first(range); + auto last_token = get_last(range); + + // Range entirely after token range -> no further ranges (larger keys) can overlap + if (token_range.after(first_token, dht::token_comparator{})) { + break; + } + // Range entirely before token range -> skip and continue scanning later (larger keys) + if (token_range.before(last_token, dht::token_comparator{})) { + continue; + } + + if (token_range.contains(dht::token_range{first_token, last_token}, dht::token_comparator{})) { + fully_contained.push_back(range); + } else { + partially_contained.push_back(range); + } + co_await coroutine::maybe_yield(); + } + co_return std::make_tuple(std::move(fully_contained), std::move(partially_contained)); +} diff --git a/test/boost/CMakeLists.txt b/test/boost/CMakeLists.txt index 4f0fa203ec..8424c1834f 100644 --- a/test/boost/CMakeLists.txt +++ b/test/boost/CMakeLists.txt @@ -314,6 +314,8 @@ add_scylla_test(symmetric_key_test KIND SEASTAR LIBRARIES encryption) +add_scylla_test(tablet_aware_restore_test + KIND SEASTAR) add_scylla_test(combined_tests KIND SEASTAR diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index b14d4330a3..43baa4d6ec 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -56,6 +56,7 @@ #include "db/system_keyspace.hh" #include "db/view/view_builder.hh" #include "replica/mutation_dump.hh" +#include "test/boost/database_test.hh" using namespace std::chrono_literals; using namespace sstables; @@ -519,77 +520,12 @@ SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_pending_delete) { }, test_config).get(); } -// Snapshot tests and their helpers -// \param func: function to be called back, in a seastar thread. 
-future<> do_with_some_data_in_thread(std::vector cf_names, std::function func, bool create_mvs = false, shared_ptr db_cfg_ptr = {}, size_t num_keys = 2) { - return seastar::async([cf_names = std::move(cf_names), func = std::move(func), create_mvs, db_cfg_ptr = std::move(db_cfg_ptr), num_keys] () mutable { - lw_shared_ptr tmpdir_for_data; - if (!db_cfg_ptr) { - tmpdir_for_data = make_lw_shared(); - db_cfg_ptr = make_shared(); - db_cfg_ptr->data_file_directories(std::vector({ tmpdir_for_data->path().string() })); - } - do_with_cql_env_thread([cf_names = std::move(cf_names), func = std::move(func), create_mvs, num_keys] (cql_test_env& e) { - for (const auto& cf_name : cf_names) { - e.create_table([&cf_name] (std::string_view ks_name) { - return *schema_builder(ks_name, cf_name) - .with_column("p1", utf8_type, column_kind::partition_key) - .with_column("c1", int32_type, column_kind::clustering_key) - .with_column("c2", int32_type, column_kind::clustering_key) - .with_column("r1", int32_type) - .build(); - }).get(); - auto stmt = e.prepare(fmt::format("insert into {} (p1, c1, c2, r1) values (?, ?, ?, ?)", cf_name)).get(); - auto make_key = [] (int64_t k) { - std::string s = fmt::format("key{}", k); - return cql3::raw_value::make_value(utf8_type->decompose(s)); - }; - auto make_val = [] (int64_t x) { - return cql3::raw_value::make_value(int32_type->decompose(int32_t{x})); - }; - for (size_t i = 0; i < num_keys; ++i) { - auto key = tests::random::get_int(1, 1000000); - e.execute_prepared(stmt, {make_key(key), make_val(key), make_val(key + 1), make_val(key + 2)}).get(); - e.execute_prepared(stmt, {make_key(key), make_val(key + 1), make_val(key + 1), make_val(key + 2)}).get(); - e.execute_prepared(stmt, {make_key(key), make_val(key + 2), make_val(key + 1), make_val(key + 2)}).get(); - } - - if (create_mvs) { - auto f1 = e.local_view_builder().wait_until_built("ks", seastar::format("view_{}", cf_name)); - e.execute_cql(seastar::format("create materialized view view_{0} as 
select * from {0} where p1 is not null and c1 is not null and c2 is " - "not null primary key (p1, c1, c2)", - cf_name)) - .get(); - f1.get(); - - auto f2 = e.local_view_builder().wait_until_built("ks", "index_cf_index"); - e.execute_cql(seastar::format("CREATE INDEX index_{0} ON {0} (r1);", cf_name)).get(); - f2.get(); - } - } - - func(e); - }, db_cfg_ptr).get(); - }); -} - future<> do_with_some_data(std::vector cf_names, std::function (cql_test_env&)> func, bool create_mvs = false, shared_ptr db_cfg_ptr = {}) { co_await do_with_some_data_in_thread(cf_names, [&] (cql_test_env& e) { func(e).get(); }, create_mvs, db_cfg_ptr); } -future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test", db::snapshot_options opts = {}) { - try { - auto uuid = e.db().local().find_uuid(ks_name, cf_name); - co_await replica::database::snapshot_table_on_all_shards(e.db(), uuid, snapshot_name, opts); - } catch (...) { - testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={}: {}", - ks_name, cf_name, snapshot_name, opts.skip_flush, std::current_exception()); - throw; - } -} - future> collect_files(fs::path path) { std::set ret; directory_lister lister(path, lister::dir_entry_types::of()); diff --git a/test/boost/database_test.hh b/test/boost/database_test.hh new file mode 100644 index 0000000000..76c7a1dc14 --- /dev/null +++ b/test/boost/database_test.hh @@ -0,0 +1,84 @@ + +/* + * Copyright (C) 2026-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 + */ + +#pragma once + +#include "db/snapshot-ctl.hh" +#include "db/config.hh" +#include "test/lib/cql_test_env.hh" +#include "test/lib/random_utils.hh" +#include "test/lib/tmpdir.hh" +#include "test/lib/log.hh" +#include "db/view/view_builder.hh" + + +// Snapshot tests and their helpers +// \param func: function to be called back, in a seastar thread. 
+future<> do_with_some_data_in_thread(std::vector cf_names, std::function func, bool create_mvs = false, shared_ptr db_cfg_ptr = {}, size_t num_keys = 2) { + return seastar::async([cf_names = std::move(cf_names), func = std::move(func), create_mvs, db_cfg_ptr = std::move(db_cfg_ptr), num_keys] () mutable { + lw_shared_ptr tmpdir_for_data; + if (!db_cfg_ptr) { + tmpdir_for_data = make_lw_shared(); + db_cfg_ptr = make_shared(); + db_cfg_ptr->data_file_directories(std::vector({ tmpdir_for_data->path().string() })); + } + do_with_cql_env_thread([cf_names = std::move(cf_names), func = std::move(func), create_mvs, num_keys] (cql_test_env& e) { + for (const auto& cf_name : cf_names) { + e.create_table([&cf_name] (std::string_view ks_name) { + return *schema_builder(ks_name, cf_name) + .with_column("p1", utf8_type, column_kind::partition_key) + .with_column("c1", int32_type, column_kind::clustering_key) + .with_column("c2", int32_type, column_kind::clustering_key) + .with_column("r1", int32_type) + .build(); + }).get(); + auto stmt = e.prepare(fmt::format("insert into {} (p1, c1, c2, r1) values (?, ?, ?, ?)", cf_name)).get(); + auto make_key = [] (int64_t k) { + std::string s = fmt::format("key{}", k); + return cql3::raw_value::make_value(utf8_type->decompose(s)); + }; + auto make_val = [] (int64_t x) { + return cql3::raw_value::make_value(int32_type->decompose(int32_t{x})); + }; + for (size_t i = 0; i < num_keys; ++i) { + auto key = tests::random::get_int(1, 1000000); + e.execute_prepared(stmt, {make_key(key), make_val(key), make_val(key + 1), make_val(key + 2)}).get(); + e.execute_prepared(stmt, {make_key(key), make_val(key + 1), make_val(key + 1), make_val(key + 2)}).get(); + e.execute_prepared(stmt, {make_key(key), make_val(key + 2), make_val(key + 1), make_val(key + 2)}).get(); + } + + if (create_mvs) { + auto f1 = e.local_view_builder().wait_until_built("ks", seastar::format("view_{}", cf_name)); + e.execute_cql(seastar::format("create materialized view view_{0} as 
select * from {0} where p1 is not null and c1 is not null and c2 is " + "not null primary key (p1, c1, c2)", + cf_name)) + .get(); + f1.get(); + + auto f2 = e.local_view_builder().wait_until_built("ks", "index_cf_index"); + e.execute_cql(seastar::format("CREATE INDEX index_{0} ON {0} (r1);", cf_name)).get(); + f2.get(); + } + } + + func(e); + }, db_cfg_ptr).get(); + }); +} + +future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test", db::snapshot_options opts = {}) { + try { + auto uuid = e.db().local().find_uuid(ks_name, cf_name); + co_await replica::database::snapshot_table_on_all_shards(e.db(), uuid, snapshot_name, opts); + } catch (...) { + testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={}: {}", + ks_name, cf_name, snapshot_name, opts.skip_flush, std::current_exception()); + throw; + } +} diff --git a/test/boost/tablet_aware_restore_test.cc b/test/boost/tablet_aware_restore_test.cc new file mode 100644 index 0000000000..8449a835d3 --- /dev/null +++ b/test/boost/tablet_aware_restore_test.cc @@ -0,0 +1,156 @@ +/* + * Copyright (C) 2026-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 + */ + + + +#include "test/lib/cql_test_env.hh" +#include "utils/assert.hh" +#include +#include +#include + +#include +#include +#include + +#include "db/config.hh" +#include "db/consistency_level_type.hh" +#include "db/system_distributed_keyspace.hh" +#include "sstables/object_storage_client.hh" +#include "sstables/storage.hh" +#include "sstables_loader.hh" +#include "replica/database_fwd.hh" +#include "tasks/task_handler.hh" +#include "db/system_distributed_keyspace.hh" +#include "service/storage_proxy.hh" + +#include "test/lib/test_utils.hh" +#include "test/lib/random_utils.hh" +#include "test/lib/sstable_test_env.hh" +#include "test/boost/database_test.hh" + + +SEASTAR_TEST_CASE(test_snapshot_manifests_table_api_works, 
*boost::unit_test::precondition(tests::has_scylla_test_env)) { + auto db_cfg_ptr = make_shared(); + + return do_with_cql_env([] (cql_test_env& env) -> future<> { + auto snapshot_name = "snapshot"; + auto ks = "ks"; + auto table = "cf"; + auto dc = "dc1"; + auto rack = "r1"; + auto sstable_id = utils::UUID_gen::get_time_UUID(); + auto last_token = dht::token::from_int64(100); + auto toc_name = "me-3gdq_0bki_2cvk01yl83nj0tp5gh-big-TOC.txt"; + auto prefix = "some/prefix"; + auto num_iter = 5; + + for (int i = num_iter - 1; i >= 0; --i) { + // insert some test data into snapshot_sstables table + auto first_token = dht::token::from_int64(i); + co_await env.get_system_distributed_keyspace().local().insert_snapshot_sstable(snapshot_name, ks, table, dc, rack, sstables::sstable_id(sstable_id), first_token, last_token, toc_name, prefix, db::is_downloaded::no, db::consistency_level::ONE); + } + // read it back and check if it is correct + auto sstables = co_await env.get_system_distributed_keyspace().local().get_snapshot_sstables(snapshot_name, ks, table, dc, rack, db::consistency_level::ONE); + + BOOST_CHECK_EQUAL(sstables.size(), num_iter); + + for (int i = 0; i < num_iter; ++i) { + const auto& sstable = sstables[i]; + BOOST_CHECK_EQUAL(sstable.toc_name, toc_name); + BOOST_CHECK_EQUAL(sstable.prefix, prefix); + BOOST_CHECK_EQUAL(sstable.first_token, dht::token::from_int64(i)); + BOOST_CHECK_EQUAL(sstable.last_token, last_token); + BOOST_CHECK_EQUAL(sstable.sstable_id.uuid(), sstable_id); + } + + // test token range filtering: matching range should return the sstable + auto filtered = co_await env.get_system_distributed_keyspace().local().get_snapshot_sstables( + snapshot_name, ks, table, dc, rack, db::consistency_level::ONE, + dht::token::from_int64(-10), dht::token::from_int64(num_iter + 1)); + BOOST_CHECK_EQUAL(filtered.size(), num_iter); + + // test token range filtering: the interval is inclusive, if start and end are equal to first_token, it should return one sstable + 
filtered = co_await env.get_system_distributed_keyspace().local().get_snapshot_sstables( + snapshot_name, ks, table, dc, rack, db::consistency_level::ONE, + dht::token::from_int64(0), dht::token::from_int64(0)); + BOOST_CHECK_EQUAL(filtered.size(), 1); + + // test token range filtering: non-matching range should return nothing + auto empty = co_await env.get_system_distributed_keyspace().local().get_snapshot_sstables( + snapshot_name, ks, table, dc, rack, db::consistency_level::ONE, + dht::token::from_int64(num_iter + 10), dht::token::from_int64(num_iter + 20)); + BOOST_CHECK_EQUAL(empty.size(), 0); + }, db_cfg_ptr); +} + +using namespace sstables; + +future<> backup(cql_test_env& env, sstring endpoint, sstring bucket) { + sharded ctl; + co_await ctl.start(std::ref(env.db()), std::ref(env.get_storage_proxy()), std::ref(env.get_task_manager()), std::ref(env.get_sstorage_manager()), db::snapshot_ctl::config{}); + auto prefix = "/backup"; + + auto task_id = co_await ctl.local().start_backup(endpoint, bucket, prefix, "ks", "cf", "snapshot", false); + auto task = tasks::task_handler{env.get_task_manager().local(), task_id}; + auto status = co_await task.wait_for_task(30s); + BOOST_REQUIRE(status.state == tasks::task_manager::task_state::done); + + co_await ctl.stop(); +} + +future<> check_snapshot_sstables(cql_test_env& env) { + auto& topology = env.get_storage_proxy().local().get_token_metadata_ptr()->get_topology(); + auto dc = topology.get_datacenter(); + auto rack = topology.get_rack(); + auto sstables = co_await env.get_system_distributed_keyspace().local().get_snapshot_sstables("snapshot", "ks", "cf", dc, rack, db::consistency_level::ONE); + + // Check that the sstables in system_distributed.snapshot_sstables match the sstables in the snapshot directory on disk + auto& cf = env.local_db().find_column_family("ks", "cf"); + auto tabledir = tests::table_dir(cf); + auto snapshot_dir = tabledir / sstables::snapshots_dir / "snapshot"; + std::set expected_sstables; + 
directory_lister lister(snapshot_dir, lister::dir_entry_types::of()); + while (auto de = co_await lister.get()) { + if (!de->name.ends_with("-TOC.txt")) { + continue; + } + expected_sstables.insert(de->name); + } + + BOOST_CHECK_EQUAL(sstables.size(), expected_sstables.size()); + + for (const auto& sstable : sstables) { + BOOST_CHECK(expected_sstables.contains(sstable.toc_name)); + } +} + +SEASTAR_TEST_CASE(test_populate_snapshot_sstables_from_manifests, *boost::unit_test::precondition(tests::has_scylla_test_env)) { + using namespace sstables; + + auto db_cfg_ptr = make_shared(); + db_cfg_ptr->tablets_mode_for_new_keyspaces(db::tablets_mode_t::mode::enabled); + db_cfg_ptr->experimental_features({db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS}); + auto storage_options = make_test_object_storage_options("S3"); + db_cfg_ptr->object_storage_endpoints(make_storage_options_config(storage_options)); + + return do_with_some_data_in_thread({"cf"}, [storage_options = std::move(storage_options)] (cql_test_env& env) { + take_snapshot(env, "ks", "cf", "snapshot").get(); + + auto ep = storage_options.to_map()["endpoint"]; + auto bucket = storage_options.to_map()["bucket"]; + backup(env, ep, bucket).get(); + + BOOST_REQUIRE_THROW(populate_snapshot_sstables_from_manifests(env.get_sstorage_manager().local(), env.get_system_distributed_keyspace().local(), "ks", "cf", ep, bucket, "unexpected_snapshot", {"/backup/manifest.json"}, db::consistency_level::ONE).get(), std::runtime_error);; + + // populate system_distributed.snapshot_sstables with the content of the snapshot manifest + populate_snapshot_sstables_from_manifests(env.get_sstorage_manager().local(), env.get_system_distributed_keyspace().local(), "ks", "cf", ep, bucket, "snapshot", {"/backup/manifest.json"}, db::consistency_level::ONE).get(); + + check_snapshot_sstables(env).get(); + }, false, db_cfg_ptr, 10); +} diff --git a/test/cluster/object_store/test_backup.py b/test/cluster/object_store/test_backup.py 
index 4af5fbb91c..83d1b32077 100644 --- a/test/cluster/object_store/test_backup.py +++ b/test/cluster/object_store/test_backup.py @@ -7,6 +7,7 @@ import asyncio import subprocess import tempfile import itertools +import aiohttp import pytest import time @@ -16,6 +17,7 @@ from test.pylib.manager_client import ManagerClient, ServerInfo from test.cluster.util import wait_for_cql_and_get_hosts, get_replication, new_test_keyspace from test.pylib.rest_client import read_barrier from test.pylib.util import unique_name, wait_all +from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas from cassandra.cluster import ConsistencyLevel from collections import defaultdict from test.pylib.util import wait_for @@ -736,6 +738,176 @@ async def do_test_streaming_scopes(build_mode: str, manager: ManagerClient, topo if restored_min_tablet_count == original_min_tablet_count: await check_streaming_directions(logger, servers, topology, host_ids, scope, pro, log_marks) + +@pytest.mark.asyncio +@pytest.mark.parametrize("topology", [ + topo(rf = 1, nodes = 3, racks = 1, dcs = 1), + topo(rf = 2, nodes = 2, racks = 2, dcs = 1), + ]) +async def test_restore_tablets(build_mode: str, manager: ManagerClient, object_storage, topology): + '''Check that restoring of a cluster using tablet-aware restore works''' + + servers, host_ids = await create_cluster(topology, manager, logger, object_storage) + + await manager.disable_tablet_balancing() + cql = manager.get_cql() + + num_keys = 10 + tablet_count=5 + tablet_count_for_restore=8 # should be tablet_count rounded up to the power of two + + async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {topology.rf}}}") as ks: + await cql.run_async(create_schema(ks, 'test', min_tablet_count=tablet_count)) + insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, value) VALUES (?, ?)") + insert_stmt.consistency_level = ConsistencyLevel.ALL + await 
asyncio.gather(*(cql.run_async(insert_stmt, (str(i), i)) for i in range(num_keys))) + snap_name, sstables = await take_snapshot(ks, servers, manager, logger) + await asyncio.gather(*(do_backup(s, snap_name, f'{s.server_id}/{snap_name}', ks, 'test', object_storage, manager, logger) for s in servers)) + + async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {topology.rf}}}") as ks: + await cql.run_async(f"CREATE TABLE {ks}.test ( pk text primary key, value int ) WITH tablets = {{'min_tablet_count': {tablet_count_for_restore}, 'max_tablet_count': {tablet_count_for_restore}}};") + + logger.info(f'Restore cluster via {servers[1].ip_addr}') + manifests = [ f'{s.server_id}/{snap_name}/manifest.json' for s in servers ] + tid = await manager.api.restore_tablets(servers[1].ip_addr, ks, 'test', snap_name, servers[0].datacenter, object_storage.address, object_storage.bucket_name, manifests) + status = await manager.api.wait_task(servers[1].ip_addr, tid) + assert (status is not None) and (status['state'] == 'done') + + await check_mutation_replicas(cql, manager, servers, range(num_keys), topology, logger, ks, 'test') + + +@pytest.mark.asyncio +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_restore_tablets_vs_migration(build_mode: str, manager: ManagerClient, object_storage): + '''Check that restore handles tablets migrating around''' + + topology = topo(rf = 1, nodes = 2, racks = 1, dcs = 1) + servers, host_ids = await create_cluster(topology, manager, logger, object_storage) + + await manager.disable_tablet_balancing() + cql = manager.get_cql() + + num_keys = 10 + tablet_count=4 + tablet_count_for_restore=4 + + async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {topology.rf}}}") as ks: + await cql.run_async(create_schema(ks, 'test', min_tablet_count=tablet_count)) + insert_stmt 
= cql.prepare(f"INSERT INTO {ks}.test (pk, value) VALUES (?, ?)") + insert_stmt.consistency_level = ConsistencyLevel.ALL + await asyncio.gather(*(cql.run_async(insert_stmt, (str(i), i)) for i in range(num_keys))) + snap_name, sstables = await take_snapshot(ks, servers, manager, logger) + await asyncio.gather(*(do_backup(s, snap_name, f'{s.server_id}/{snap_name}', ks, 'test', object_storage, manager, logger) for s in servers)) + + async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {topology.rf}}}") as ks: + await cql.run_async(f"CREATE TABLE {ks}.test ( pk text primary key, value int ) WITH tablets = {{'min_tablet_count': {tablet_count_for_restore}, 'max_tablet_count': {tablet_count_for_restore}}};") + + s0_host_id = await manager.get_host_id(servers[0].server_id) + s1_host_id = await manager.get_host_id(servers[1].server_id) + tablet = (await get_all_tablet_replicas(manager, servers[0], ks, 'test'))[0] + current = tablet.replicas[0] + target = s1_host_id if current[0] == s0_host_id else s0_host_id + await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "block_tablet_streaming", False, parameters={'keyspace': ks, 'table': 'test'}) for s in servers]) + migration_task = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, "test", current[0], current[1], target, 0, tablet.last_token)) + + logger.info(f'Restore cluster via {servers[1].ip_addr}') + manifests = [ f'{s.server_id}/{snap_name}/manifest.json' for s in servers ] + tid = await manager.api.restore_tablets(servers[1].ip_addr, ks, 'test', snap_name, servers[0].datacenter, object_storage.address, object_storage.bucket_name, manifests) + + await asyncio.gather(*[manager.api.message_injection(s.ip_addr, f"block_tablet_streaming") for s in servers]) + + status = await manager.api.wait_task(servers[1].ip_addr, tid) + assert (status is not None) and (status['state'] == 'done') + + await migration_task + await 
check_mutation_replicas(cql, manager, servers, range(num_keys), topology, logger, ks, 'test') + + +@pytest.mark.asyncio +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_restore_tablets_download_failure(build_mode: str, manager: ManagerClient, object_storage): + '''Check that failure to download an sstable propagates back to API''' + + topology = topo(rf = 1, nodes = 2, racks = 1, dcs = 1) + servers, host_ids = await create_cluster(topology, manager, logger, object_storage) + + await manager.disable_tablet_balancing() + cql = manager.get_cql() + + num_keys = 12 + tablet_count=4 + + async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {topology.rf}}}") as ks: + await cql.run_async(create_schema(ks, 'test', min_tablet_count=tablet_count)) + insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, value) VALUES (?, ?)") + insert_stmt.consistency_level = ConsistencyLevel.ALL + await asyncio.gather(*(cql.run_async(insert_stmt, (str(i), i)) for i in range(num_keys))) + snap_name, sstables = await take_snapshot(ks, servers, manager, logger) + await asyncio.gather(*(do_backup(s, snap_name, f'{s.server_id}/{snap_name}', ks, 'test', object_storage, manager, logger) for s in servers)) + + await manager.api.enable_injection(servers[1].ip_addr, "fail_download_sstable", one_shot=True) + async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {topology.rf}}}") as ks: + await cql.run_async(f"CREATE TABLE {ks}.test ( pk text primary key, value int ) WITH tablets = {{'min_tablet_count': {tablet_count}, 'max_tablet_count': {tablet_count}}};") + + logger.info(f'Restore cluster via {servers[0].ip_addr}') + manifests = [ f'{s.server_id}/{snap_name}/manifest.json' for s in servers ] + tid = await manager.api.restore_tablets(servers[0].ip_addr, ks, 'test', snap_name, servers[0].datacenter, 
object_storage.address, object_storage.bucket_name, manifests) + status = await manager.api.wait_task(servers[0].ip_addr, tid) + assert 'state' in status and status['state'] == 'failed' + assert 'error' in status and 'Failed to download' in status['error'] + + +@pytest.mark.asyncio +@pytest.mark.parametrize("target", ['coordinator', 'replica', 'api']) +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_restore_tablets_node_loss_resiliency(build_mode: str, manager: ManagerClient, object_storage, target): + '''Check how restore handler node loss in the middle of operation''' + + topology = topo(rf = 2, nodes = 4, racks = 2, dcs = 1) + servers, host_ids = await create_cluster(topology, manager, logger, object_storage) + log = await manager.server_open_log(servers[0].server_id) + await log.wait_for("raft_topology - start topology coordinator fiber", timeout=10) + + await manager.disable_tablet_balancing() + cql = manager.get_cql() + + num_keys = 24 + tablet_count=8 + + async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {topology.rf}}}") as ks: + await cql.run_async(create_schema(ks, 'test', min_tablet_count=tablet_count)) + insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, value) VALUES (?, ?)") + insert_stmt.consistency_level = ConsistencyLevel.ALL + await asyncio.gather(*(cql.run_async(insert_stmt, (str(i), i)) for i in range(num_keys))) + snap_name, sstables = await take_snapshot(ks, servers, manager, logger) + await asyncio.gather(*(do_backup(s, snap_name, f'{s.server_id}/{snap_name}', ks, 'test', object_storage, manager, logger) for s in servers)) + + async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {topology.rf}}}") as ks: + await cql.run_async(f"CREATE TABLE {ks}.test ( pk text primary key, value int ) WITH tablets = {{'min_tablet_count': {tablet_count}, 
'max_tablet_count': {tablet_count}}};") + + await manager.api.enable_injection(servers[2].ip_addr, "pause_tablet_restore", one_shot=True) + log = await manager.server_open_log(servers[2].server_id) + mark = await log.mark() + + manifests = [ f'{s.server_id}/{snap_name}/manifest.json' for s in servers ] + tid = await manager.api.restore_tablets(servers[1].ip_addr, ks, 'test', snap_name, servers[0].datacenter, object_storage.address, object_storage.bucket_name, manifests) + await log.wait_for("pause_tablet_restore: waiting for message", from_mark=mark) + + if target == 'api': + await manager.server_stop(servers[1].server_id) + with pytest.raises(aiohttp.client_exceptions.ClientConnectorError): + await manager.api.wait_task(servers[1].ip_addr, tid) + else: + if target == 'coordinator': + await manager.server_stop(servers[0].server_id) + await manager.api.message_injection(servers[2].ip_addr, "pause_tablet_restore") + elif target == 'replica': + await manager.server_stop(servers[2].server_id) + + status = await manager.api.wait_task(servers[1].ip_addr, tid) + assert status['state'] == 'failed' + + @pytest.mark.asyncio async def test_restore_with_non_existing_sstable(manager: ManagerClient, object_storage): '''Check that restore task fails well when given a non-existing sstable''' diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 970ba8da4e..8eda2c5af3 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -402,6 +402,10 @@ public: return _sys_ks; } + virtual sharded& get_system_distributed_keyspace() override { + return _sys_dist_ks; + } + virtual sharded& get_tablet_allocator() override { return _tablet_allocator; } diff --git a/test/lib/cql_test_env.hh b/test/lib/cql_test_env.hh index a31355f9fe..cf6791b63a 100644 --- a/test/lib/cql_test_env.hh +++ b/test/lib/cql_test_env.hh @@ -189,6 +189,8 @@ public: virtual sharded& get_system_keyspace() = 0; + virtual sharded& get_system_distributed_keyspace() = 0; + virtual sharded& 
get_tablet_allocator() = 0; virtual sharded& get_storage_proxy() = 0; diff --git a/test/lib/test_services.hh b/test/lib/test_services.hh index 074c161137..9410d4bca2 100644 --- a/test/lib/test_services.hh +++ b/test/lib/test_services.hh @@ -69,3 +69,9 @@ struct table_for_tests { void set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept; void set_repair_sstable_classifier(replica::repair_classifier_func repair_sstable_classifier); }; + +namespace sstables { + +std::vector make_storage_options_config(const data_dictionary::storage_options& so); + +} diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py index d0a6e8166e..4eac804c29 100644 --- a/test/pylib/rest_client.py +++ b/test/pylib/rest_client.py @@ -411,6 +411,23 @@ class ScyllaRESTAPIClient: params['scope'] = scope return await self.client.post_json(f"/storage_service/restore", host=node_ip, params=params, json=sstables) + async def restore_tablets(self, node_ip: str, ks: str, cf: str, snap: str, datacenter: str, endpoint: str, bucket: str, manifests) -> str: + """Restore tablets from a backup location""" + params = { + "keyspace": ks, + "table": cf, + "snapshot": snap + } + backup_location = [ + { + "datacenter": datacenter, + "endpoint": endpoint, + "bucket": bucket, + "manifests": manifests + } + ] + return await self.client.post_json(f"/storage_service/tablets/restore", host=node_ip, params=params, json=backup_location) + async def take_snapshot(self, node_ip: str, ks: str, tag: str, tables: list[str] = None) -> None: """Take keyspace snapshot""" params = { 'kn': ks, 'tag': tag } diff --git a/utils/rjson.hh b/utils/rjson.hh index 2cd43130f2..6fbd193ce5 100644 --- a/utils/rjson.hh +++ b/utils/rjson.hh @@ -70,6 +70,10 @@ public: #include #include #include +#include "utils/UUID.hh" +#include "dht/token.hh" +#include "sstables/types.hh" + #include "seastarx.hh" namespace rjson { @@ -249,7 +253,14 @@ inline sstring to_sstring(const rjson::value& str) { inline std::string to_string(const 
rjson::value& str) { return std::string(str.GetString(), str.GetStringLength()); } - +// Helper for conversion to dht::token +inline dht::token to_token(const rjson::value& v) { + return dht::token::from_int64(v.GetInt64()); +} +// Helper for conversion to sstables::sstable_id +inline sstables::sstable_id to_sstable_id(const rjson::value& v) { + return sstables::sstable_id(utils::UUID(rjson::to_string_view(v))); +} // Returns a pointer to JSON member if it exists, nullptr otherwise rjson::value* find(rjson::value& value, std::string_view name); const rjson::value* find(const rjson::value& value, std::string_view name);