tablets: Implement tablet-aware cluster-wide restore

This patch adds

- Changes in sstables_loader::restore_tablets() method

It populates the system_distributed_keyspace.snapshot_sstables table
with the information read from the manifest

- Implementation of tablet_restore_task_impl::run() method

It emplaces a bunch of tablet migrations with "restore" kind

- Topology coordinator handling of tablet_transition_stage::restore

When seen, the coordinator calls RESTORE_TABLET RPC against all tablet
replicas

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2026-02-20 13:02:42 +03:00
parent 39ae59da9c
commit 17384d42e3
7 changed files with 123 additions and 2 deletions

View File

@@ -90,6 +90,8 @@ write_replica_set_selector get_selector_for_writes(tablet_transition_stage stage
return write_replica_set_selector::previous;
case tablet_transition_stage::end_migration:
return write_replica_set_selector::next;
case tablet_transition_stage::restore:
return write_replica_set_selector::previous;
}
on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast<int>(stage)));
}
@@ -123,6 +125,8 @@ read_replica_set_selector get_selector_for_reads(tablet_transition_stage stage)
return read_replica_set_selector::previous;
case tablet_transition_stage::end_migration:
return read_replica_set_selector::next;
case tablet_transition_stage::restore:
return read_replica_set_selector::previous;
}
on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast<int>(stage)));
}
@@ -195,6 +199,13 @@ tablet_migration_streaming_info get_migration_streaming_info(const locator::topo
result.written_to = std::move(s);
return result;
}
case tablet_transition_kind::restore: {
auto s = std::unordered_set<tablet_replica>(tinfo.replicas.begin(), tinfo.replicas.end());
result.stream_weight = locator::tablet_migration_stream_weight_restore;
result.read_from = s;
result.written_to = std::move(s);
return result;
}
}
on_internal_error(tablet_logger, format("Invalid tablet transition kind: {}", static_cast<int>(trinfo.transition)));
}
@@ -850,6 +861,7 @@ static const std::unordered_map<tablet_transition_stage, sstring> tablet_transit
{tablet_transition_stage::cleanup_target, "cleanup_target"},
{tablet_transition_stage::revert_migration, "revert_migration"},
{tablet_transition_stage::end_migration, "end_migration"},
{tablet_transition_stage::restore, "restore"},
};
static const std::unordered_map<sstring, tablet_transition_stage> tablet_transition_stage_from_name = std::invoke([] {
@@ -883,6 +895,7 @@ static const std::unordered_map<tablet_transition_kind, sstring> tablet_transiti
{tablet_transition_kind::rebuild, "rebuild"},
{tablet_transition_kind::rebuild_v2, "rebuild_v2"},
{tablet_transition_kind::repair, "repair"},
{tablet_transition_kind::restore, "restore"},
};
static const std::unordered_map<sstring, tablet_transition_kind> tablet_transition_kind_from_name = std::invoke([] {
@@ -1129,6 +1142,8 @@ std::optional<uint64_t> load_stats::get_tablet_size_in_transition(host_id host,
}
case tablet_transition_kind::intranode_migration:
[[fallthrough]];
case tablet_transition_kind::restore:
[[fallthrough]];
case tablet_transition_kind::repair:
break;
}

View File

@@ -330,6 +330,7 @@ enum class tablet_transition_stage {
end_migration,
repair,
end_repair,
restore,
};
enum class tablet_transition_kind {
@@ -352,6 +353,9 @@ enum class tablet_transition_kind {
// Repair the tablet replicas
repair,
// Download sstables for tablet
restore,
};
tablet_transition_kind choose_rebuild_transition_kind(const gms::feature_service& features);
@@ -415,6 +419,7 @@ tablet_transition_info migration_to_transition_info(const tablet_info&, const ta
/// Describes streaming required for a given tablet transition.
constexpr int tablet_migration_stream_weight_default = 1;
constexpr int tablet_migration_stream_weight_repair = 2;
constexpr int tablet_migration_stream_weight_restore = 2;
struct tablet_migration_streaming_info {
std::unordered_set<tablet_replica> read_from;
std::unordered_set<tablet_replica> written_to;

View File

@@ -5637,6 +5637,71 @@ future<> storage_service::del_tablet_replica(table_id table, dht::token token, l
});
}
future<> storage_service::restore_tablets(table_id table, sstring snap_name, sstring endpoint, sstring bucket) {
auto holder = _async_gate.hold();
if (this_shard_id() != 0) {
// group0 is only set on shard 0.
co_return co_await container().invoke_on(0, [&] (auto& ss) {
return ss.restore_tablets(table, snap_name, endpoint, bucket);
});
}
// Holding tm around transit_tablet() can lead to deadlock, if state machine is busy
// with something which executes a barrier. The barrier will wait for tm to die, and
// transit_tablet() will wait for the barrier to finish.
// Due to that, we first collect tablet boundaries, then prepare and submit transition
// mutations. Since this code is called with equal min:max tokens set for the table,
// the tablet map cannot split and merge and, thus, the static vector of tokens should
// map to correct tablet boundaries throughout the whole operation
utils::chunked_vector<std::pair<locator::tablet_id, dht::token>> tablets;
{
const auto tm = get_token_metadata_ptr();
const auto& tmap = tm->tablets().get_tablet_map(table);
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) {
auto last_token = tmap.get_last_token(tid);
tablets.push_back(std::make_pair(tid, last_token));
return make_ready_future<>();
});
}
auto wait_one_transition = [this] (locator::global_tablet_id gid) {
return _topology_state_machine.event.wait([this, gid] {
auto& tmap = get_token_metadata().tablets().get_tablet_map(gid.table);
return !tmap.get_tablet_transition_info(gid.tablet);
});
};
std::vector<future<>> wait;
co_await coroutine::parallel_for_each(tablets, [&] (const auto& tablet) -> future<> {
auto [ tid, last_token ] = tablet;
auto gid = locator::global_tablet_id{table, tid};
while (true) {
auto success = co_await try_transit_tablet(table, last_token, [&] (const locator::tablet_map& tmap, api::timestamp_type write_timestamp) {
utils::chunked_vector<canonical_mutation> updates;
updates.emplace_back(tablet_mutation_builder_for_base_table(write_timestamp, table)
.set_stage(last_token, locator::tablet_transition_stage::restore)
.set_new_replicas(last_token, tmap.get_tablet_info(tid).replicas)
.set_restore_config(last_token, locator::restore_config{ snap_name, endpoint, bucket })
.set_transition(last_token, locator::tablet_transition_kind::restore)
.build());
sstring reason = format("Restoring tablet {}", gid);
return std::make_tuple(std::move(updates), std::move(reason));
});
if (success) {
wait.emplace_back(wait_one_transition(gid));
break;
}
slogger.debug("Tablet is in transition, waiting");
co_await wait_one_transition(gid);
}
});
co_await when_all_succeed(wait.begin(), wait.end()).discard_result();
slogger.info("Restoring {} finished", table);
}
future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables() {
auto holder = _async_gate.hold();

View File

@@ -971,6 +971,7 @@ public:
future<> move_tablet(table_id, dht::token, locator::tablet_replica src, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
future<> add_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
future<> del_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
future<> restore_tablets(table_id, sstring snap_name, sstring endpoint, sstring bucket);
future<> set_tablet_balancing_enabled(bool);
future<> await_topology_quiesced();

View File

@@ -1014,6 +1014,8 @@ private:
return true;
case tablet_transition_stage::repair:
return true;
case tablet_transition_stage::restore:
return false;
case tablet_transition_stage::end_repair:
return false;
case tablet_transition_stage::write_both_read_new:

View File

@@ -63,6 +63,7 @@
#include "utils/stall_free.hh"
#include "utils/to_string.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "sstables_loader.hh"
#include "idl/join_node.dist.hh"
#include "idl/storage_service.dist.hh"
@@ -72,6 +73,7 @@
#include "utils/updateable_value.hh"
#include "repair/repair.hh"
#include "idl/repair.dist.hh"
#include "idl/sstables_loader.dist.hh"
#include "service/topology_coordinator.hh"
@@ -1556,6 +1558,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
background_action_holder cleanup;
background_action_holder repair;
background_action_holder repair_update_compaction_ctrl;
background_action_holder restore;
std::unordered_map<locator::tablet_transition_stage, background_action_holder> barriers;
// Record the repair_time returned by the repair_tablet rpc call
db_clock::time_point repair_time;
@@ -2328,6 +2331,33 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
}
}
break;
case locator::tablet_transition_stage::restore: {
if (!trinfo.restore_cfg.has_value()) {
on_internal_error(rtlogger, format("Cannot handle restore transition without config for tablet {}", gid));
}
if (action_failed(tablet_state.restore)) {
rtlogger.debug("Clearing restore transition for {} due to error", gid);
updates.emplace_back(get_mutation_builder().del_transition(last_token).del_restore_config(last_token).build());
break;
}
if (advance_in_background(gid, tablet_state.restore, "restore", [this, gid, &tmap] () -> future<> {
auto& tinfo = tmap.get_tablet_info(gid.tablet);
auto replicas = tinfo.replicas;
rtlogger.info("Restoring tablet={} on {}", gid, replicas);
co_await coroutine::parallel_for_each(replicas, [this, gid] (locator::tablet_replica r) -> future<> {
auto dst = raft::server_id(r.host.uuid());
if (!is_excluded(dst)) {
co_await ser::sstables_loader_rpc_verbs::send_restore_tablet(&_messaging, r.host, dst, gid);
rtlogger.debug("Tablet {} restored on {}", gid, r.host);
}
});
})) {
rtlogger.debug("Clearing restore transition for {}", gid);
updates.emplace_back(get_mutation_builder().del_transition(last_token).del_restore_config(last_token).build());
}
}
break;
case locator::tablet_transition_stage::end_repair: {
if (do_barrier()) {
if (tablet_state.session_id.uuid().is_null()) {
@@ -2514,6 +2544,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
break;
case locator::tablet_transition_kind::repair:
[[fallthrough]];
case locator::tablet_transition_kind::restore:
[[fallthrough]];
case locator::tablet_transition_kind::intranode_migration:
break;
}

View File

@@ -1172,12 +1172,13 @@ public:
protected:
virtual future<> run() override {
(void)_loader; (void)_tid; (void)_snap_name; (void)_endpoint; (void)_bucket;
co_return; // To be implemented
auto& loader = _loader.local();
co_await loader._ss.local().restore_tablets(_tid, _snap_name, _endpoint, _bucket);
}
};
future<tasks::task_id> sstables_loader::restore_tablets(table_id tid, sstring keyspace, sstring table, sstring snap_name, sstring endpoint, sstring bucket, utils::chunked_vector<sstring> manifests) {
co_await populate_snapshot_sstables_from_manifests(_storage_manager, _sys_dist_ks, keyspace, table, endpoint, bucket, snap_name, std::move(manifests));
auto task = co_await _task_manager_module->make_and_start_task<tablet_restore_task_impl>({}, container(), keyspace, tid, std::move(snap_name), std::move(endpoint), std::move(bucket));
co_return task->id();
}