diff --git a/locator/tablets.cc b/locator/tablets.cc index 793d1d8c69..7889c7dc0e 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -90,6 +90,8 @@ write_replica_set_selector get_selector_for_writes(tablet_transition_stage stage return write_replica_set_selector::previous; case tablet_transition_stage::end_migration: return write_replica_set_selector::next; + case tablet_transition_stage::restore: + return write_replica_set_selector::previous; } on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast(stage))); } @@ -123,6 +125,8 @@ read_replica_set_selector get_selector_for_reads(tablet_transition_stage stage) return read_replica_set_selector::previous; case tablet_transition_stage::end_migration: return read_replica_set_selector::next; + case tablet_transition_stage::restore: + return read_replica_set_selector::previous; } on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast(stage))); } @@ -195,6 +199,13 @@ tablet_migration_streaming_info get_migration_streaming_info(const locator::topo result.written_to = std::move(s); return result; } + case tablet_transition_kind::restore: { + auto s = std::unordered_set(tinfo.replicas.begin(), tinfo.replicas.end()); + result.stream_weight = locator::tablet_migration_stream_weight_restore; + result.read_from = s; + result.written_to = std::move(s); + return result; + } } on_internal_error(tablet_logger, format("Invalid tablet transition kind: {}", static_cast(trinfo.transition))); } @@ -850,6 +861,7 @@ static const std::unordered_map tablet_transit {tablet_transition_stage::cleanup_target, "cleanup_target"}, {tablet_transition_stage::revert_migration, "revert_migration"}, {tablet_transition_stage::end_migration, "end_migration"}, + {tablet_transition_stage::restore, "restore"}, }; static const std::unordered_map tablet_transition_stage_from_name = std::invoke([] { @@ -883,6 +895,7 @@ static const std::unordered_map tablet_transiti {tablet_transition_kind::rebuild, "rebuild"}, {tablet_transition_kind::rebuild_v2, "rebuild_v2"}, {tablet_transition_kind::repair, "repair"}, + {tablet_transition_kind::restore, "restore"}, }; static const std::unordered_map tablet_transition_kind_from_name = std::invoke([] { @@ -1129,6 +1142,8 @@ std::optional load_stats::get_tablet_size_in_transition(host_id host, } case tablet_transition_kind::intranode_migration: [[fallthrough]]; + case tablet_transition_kind::restore: + [[fallthrough]]; case tablet_transition_kind::repair: break; } diff --git a/locator/tablets.hh b/locator/tablets.hh index a9244c22da..763414415b 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -330,6 +330,7 @@ enum class tablet_transition_stage { end_migration, repair, end_repair, + restore, }; enum class tablet_transition_kind { @@ -352,6 +353,9 @@ enum class tablet_transition_kind { // Repair the tablet replicas repair, + + // Download sstables for tablet + restore, }; tablet_transition_kind choose_rebuild_transition_kind(const gms::feature_service& features); @@ -415,6 +419,7 @@ tablet_transition_info migration_to_transition_info(const tablet_info&, const ta /// Describes streaming required for a given tablet transition. constexpr int tablet_migration_stream_weight_default = 1; constexpr int tablet_migration_stream_weight_repair = 2; +constexpr int tablet_migration_stream_weight_restore = 2; struct tablet_migration_streaming_info { std::unordered_set read_from; std::unordered_set written_to; diff --git a/service/storage_service.cc b/service/storage_service.cc index aecfe9b57e..69fc4338cc 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5637,6 +5637,71 @@ future<> storage_service::del_tablet_replica(table_id table, dht::token token, l }); } +future<> storage_service::restore_tablets(table_id table, sstring snap_name, sstring endpoint, sstring bucket) { + auto holder = _async_gate.hold(); + + if (this_shard_id() != 0) { + // group0 is only set on shard 0. + co_return co_await container().invoke_on(0, [&] (auto& ss) { + return ss.restore_tablets(table, snap_name, endpoint, bucket); + }); + } + + // Holding tm around transit_tablet() can lead to deadlock, if state machine is busy + // with something which executes a barrier. The barrier will wait for tm to die, and + // transit_tablet() will wait for the barrier to finish. + // Due to that, we first collect tablet boundaries, then prepare and submit transition + // mutations. Since this code is called with equal min:max tokens set for the table, + // the tablet map cannot split and merge and, thus, the static vector of tokens should + // map to correct tablet boundaries throughout the whole operation + utils::chunked_vector> tablets; + { + const auto tm = get_token_metadata_ptr(); + const auto& tmap = tm->tablets().get_tablet_map(table); + co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) { + auto last_token = tmap.get_last_token(tid); + tablets.push_back(std::make_pair(tid, last_token)); + return make_ready_future<>(); + }); + } + + auto wait_one_transition = [this] (locator::global_tablet_id gid) { + return _topology_state_machine.event.wait([this, gid] { + auto& tmap = get_token_metadata().tablets().get_tablet_map(gid.table); + return !tmap.get_tablet_transition_info(gid.tablet); + }); + }; + + std::vector> wait; + co_await coroutine::parallel_for_each(tablets, [&] (const auto& tablet) -> future<> { + auto [ tid, last_token ] = tablet; + auto gid = locator::global_tablet_id{table, tid}; + while (true) { + auto success = co_await try_transit_tablet(table, last_token, [&] (const locator::tablet_map& tmap, api::timestamp_type write_timestamp) { + utils::chunked_vector updates; + updates.emplace_back(tablet_mutation_builder_for_base_table(write_timestamp, table) + .set_stage(last_token, locator::tablet_transition_stage::restore) + .set_new_replicas(last_token, tmap.get_tablet_info(tid).replicas) + .set_restore_config(last_token, locator::restore_config{ snap_name, endpoint, bucket }) + .set_transition(last_token, locator::tablet_transition_kind::restore) + .build()); + + sstring reason = format("Restoring tablet {}", gid); + return std::make_tuple(std::move(updates), std::move(reason)); + }); + if (success) { + wait.emplace_back(wait_one_transition(gid)); + break; + } + slogger.debug("Tablet is in transition, waiting"); + co_await wait_one_transition(gid); + } + }); + + co_await when_all_succeed(wait.begin(), wait.end()).discard_result(); + slogger.info("Restoring {} finished", table); +} + future storage_service::load_stats_for_tablet_based_tables() { auto holder = _async_gate.hold(); diff --git a/service/storage_service.hh b/service/storage_service.hh index 70764941ea..3729a8c3cc 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -971,6 +971,7 @@ public: future<> move_tablet(table_id, dht::token, locator::tablet_replica src, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no); future<> add_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no); future<> del_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no); + future<> restore_tablets(table_id, sstring snap_name, sstring endpoint, sstring bucket); future<> set_tablet_balancing_enabled(bool); future<> await_topology_quiesced(); diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index fb1ad7c04c..61764b8f2d 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -1014,6 +1014,8 @@ private: return true; case tablet_transition_stage::repair: return true; + case tablet_transition_stage::restore: + return false; case tablet_transition_stage::end_repair: return false; case tablet_transition_stage::write_both_read_new: diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 458e603eb9..4698de92ef 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -63,6 +63,7 @@ #include "utils/stall_free.hh" #include "utils/to_string.hh" #include "service/endpoint_lifecycle_subscriber.hh" +#include "sstables_loader.hh" #include "idl/join_node.dist.hh" #include "idl/storage_service.dist.hh" @@ -72,6 +73,7 @@ #include "utils/updateable_value.hh" #include "repair/repair.hh" #include "idl/repair.dist.hh" +#include "idl/sstables_loader.dist.hh" #include "service/topology_coordinator.hh" @@ -1556,6 +1558,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber background_action_holder cleanup; background_action_holder repair; background_action_holder repair_update_compaction_ctrl; + background_action_holder restore; std::unordered_map barriers; // Record the repair_time returned by the repair_tablet rpc call db_clock::time_point repair_time; @@ -2328,6 +2331,33 @@ class topology_coordinator : public endpoint_lifecycle_subscriber } } break; + case locator::tablet_transition_stage::restore: { + if (!trinfo.restore_cfg.has_value()) { + on_internal_error(rtlogger, format("Cannot handle restore transition without config for tablet {}", gid)); + } + if (action_failed(tablet_state.restore)) { + rtlogger.debug("Clearing restore transition for {} due to error", gid); + updates.emplace_back(get_mutation_builder().del_transition(last_token).del_restore_config(last_token).build()); + break; + } + if (advance_in_background(gid, tablet_state.restore, "restore", [this, gid, &tmap] () -> future<> { + auto& tinfo = tmap.get_tablet_info(gid.tablet); + auto replicas = tinfo.replicas; + + rtlogger.info("Restoring tablet={} on {}", gid, replicas); + co_await coroutine::parallel_for_each(replicas, [this, gid] (locator::tablet_replica r) -> future<> { + auto dst = raft::server_id(r.host.uuid()); + if (!is_excluded(dst)) { + co_await ser::sstables_loader_rpc_verbs::send_restore_tablet(&_messaging, r.host, dst, gid); + rtlogger.debug("Tablet {} restored on {}", gid, r.host); + } + }); + })) { + rtlogger.debug("Clearing restore transition for {}", gid); + updates.emplace_back(get_mutation_builder().del_transition(last_token).del_restore_config(last_token).build()); + } + } + break; case locator::tablet_transition_stage::end_repair: { if (do_barrier()) { if (tablet_state.session_id.uuid().is_null()) { @@ -2514,6 +2544,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber break; case locator::tablet_transition_kind::repair: [[fallthrough]]; + case locator::tablet_transition_kind::restore: + [[fallthrough]]; case locator::tablet_transition_kind::intranode_migration: break; } diff --git a/sstables_loader.cc b/sstables_loader.cc index f98bccae4d..d05523788c 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -1172,12 +1172,13 @@ public: protected: virtual future<> run() override { - (void)_loader; (void)_tid; (void)_snap_name; (void)_endpoint; (void)_bucket; - co_return; // To be implemented + auto& loader = _loader.local(); + co_await loader._ss.local().restore_tablets(_tid, _snap_name, _endpoint, _bucket); } }; future sstables_loader::restore_tablets(table_id tid, sstring keyspace, sstring table, sstring snap_name, sstring endpoint, sstring bucket, utils::chunked_vector manifests) { + co_await populate_snapshot_sstables_from_manifests(_storage_manager, _sys_dist_ks, keyspace, table, endpoint, bucket, snap_name, std::move(manifests)); auto task = co_await _task_manager_module->make_and_start_task({}, container(), keyspace, tid, std::move(snap_name), std::move(endpoint), std::move(bucket)); co_return task->id(); }