From 5545289bfae0508fd72b655ddd2fef6c5dbe60a2 Mon Sep 17 00:00:00 2001
From: Asias He
Date: Mon, 27 Jan 2025 17:46:41 +0800
Subject: [PATCH] repair: Introduce Host and DC filter support

Currently, the tablet repair scheduler repairs all replicas of a
tablet; it does not support selecting specific hosts or DCs. This is
sufficient for most cases, but in production users might still want
to limit a repair to certain hosts or DCs.

#21985 added the preparatory work: the config options for the
selection. This patch adds the host and DC selection support itself.

Fixes #22417
---
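A usage note, kept below the "---" marker so it stays out of the
commit message: the patch adds two optional, combinable query
parameters to the tablet repair REST API, each taking a
comma-separated list. A hypothetical invocation limiting a repair to
two hosts in dc1 could look as follows; the endpoint path, port, and
the pre-existing parameters are assumptions here (elided as
<placeholders>), only hosts_filter and dcs_filter are introduced by
this patch:

    curl -X POST "http://<node>:10000/storage_service/tablets/repair?<existing-params>&hosts_filter=<host_id1>,<host_id2>&dcs_filter=dc1"

Omitting both parameters keeps the old behavior of repairing all
replicas; a hosts_filter entry that is not a valid UUID is rejected
with bad_param_exception.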
 api/api-doc/storage_service.json | 16 +++++++++
 api/storage_service.cc           | 16 ++++++++-
 locator/tablets.cc               | 56 ++++++++++++++++++++++++++++----
 locator/tablets.hh               | 12 ++++---
 repair/repair.cc                 | 36 ++++++++++++++++++--
 replica/tablets.cc               |  8 ++---
 service/storage_service.cc       | 10 +++---
 service/storage_service.hh       |  2 +-
 service/topology_coordinator.cc  | 33 +++++++++++++++++--
 9 files changed, 164 insertions(+), 25 deletions(-)

diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json
index dda9d39ce4..1c2c6bee9c 100644
--- a/api/api-doc/storage_service.json
+++ b/api/api-doc/storage_service.json
@@ -2873,6 +2873,22 @@
                 "type":"string",
                 "paramType":"query"
             },
+            {
+                "name":"hosts_filter",
+                "description":"Repair replicas listed in the comma-separated host_id list.",
+                "required":false,
+                "allowMultiple":false,
+                "type":"string",
+                "paramType":"query"
+            },
+            {
+                "name":"dcs_filter",
+                "description":"Repair replicas listed in the comma-separated DC list.",
+                "required":false,
+                "allowMultiple":false,
+                "type":"string",
+                "paramType":"query"
+            },
             {
                 "name":"await_completion",
                 "description":"Set true to wait for the repair to complete. Set false to skip waiting for the repair to complete. When the option is not provided, it defaults to false.",
diff --git a/api/storage_service.cc b/api/storage_service.cc
index f37a6b717b..902a558104 100644
--- a/api/storage_service.cc
+++ b/api/storage_service.cc
@@ -1702,8 +1702,22 @@ rest_repair_tablet(http_context& ctx, sharded<service::storage_service>& ss, std
     } else {
         tokens_variant = tokens;
     }
+    auto hosts = req->get_query_param("hosts_filter");
+    auto dcs = req->get_query_param("dcs_filter");
 
-    auto res = co_await ss.local().add_repair_tablet_request(table_id, tokens_variant, await_completion);
+    std::unordered_set<locator::host_id> hosts_filter;
+    if (!hosts.empty()) {
+        std::string delim = ",";
+        hosts_filter = std::ranges::views::split(hosts, delim) | std::views::transform([](auto&& h) {
+            try {
+                return locator::host_id(utils::UUID(std::string_view{h}));
+            } catch (...) {
+                throw httpd::bad_param_exception(fmt::format("Wrong host_id format {}", h));
+            }
+        }) | std::ranges::to<std::unordered_set<locator::host_id>>();
+    }
+    auto dcs_filter = locator::tablet_task_info::deserialize_repair_dcs_filter(dcs);
+    auto res = co_await ss.local().add_repair_tablet_request(table_id, tokens_variant, hosts_filter, dcs_filter, await_completion);
     co_return json::json_return_type(res);
 }
 
diff --git a/locator/tablets.cc b/locator/tablets.cc
index b39dd65dbf..a18199307a 100644
--- a/locator/tablets.cc
+++ b/locator/tablets.cc
@@ -1124,8 +1124,8 @@ auto fmt::formatter<locator::tablet_task_info>::format(const locator::tablet_tas
         {"request_time", fmt::to_string(db_clock::to_time_t(info.request_time))},
         {"sched_nr", fmt::to_string(info.sched_nr)},
         {"sched_time", fmt::to_string(db_clock::to_time_t(info.sched_time))},
-        {"repair_hosts_filter", info.repair_hosts_filter},
-        {"repair_dcs_filter", info.repair_dcs_filter},
+        {"repair_hosts_filter", locator::tablet_task_info::serialize_repair_hosts_filter(info.repair_hosts_filter)},
+        {"repair_dcs_filter", locator::tablet_task_info::serialize_repair_dcs_filter(info.repair_dcs_filter)},
     };
     return fmt::format_to(ctx.out(), "{}", rjson::print(rjson::from_string_map(ret)));
 };
@@ -1138,16 +1138,60 @@ bool locator::tablet_task_info::is_user_repair_request() const {
     return request_type == locator::tablet_task_type::user_repair;
 }
 
-locator::tablet_task_info locator::tablet_task_info::make_auto_repair_request() {
+locator::tablet_task_info locator::tablet_task_info::make_auto_repair_request(std::unordered_set<locator::host_id> hosts_filter, std::unordered_set<sstring> dcs_filter) {
     long sched_nr = 0;
     auto tablet_task_id = locator::tablet_task_id(utils::UUID_gen::get_time_UUID());
-    return locator::tablet_task_info{locator::tablet_task_type::auto_repair, tablet_task_id, db_clock::now(), sched_nr, db_clock::time_point()};
+    return locator::tablet_task_info{locator::tablet_task_type::auto_repair, tablet_task_id, db_clock::now(), sched_nr, db_clock::time_point(), hosts_filter, dcs_filter};
 }
 
-locator::tablet_task_info locator::tablet_task_info::make_user_repair_request() {
+locator::tablet_task_info locator::tablet_task_info::make_user_repair_request(std::unordered_set<locator::host_id> hosts_filter, std::unordered_set<sstring> dcs_filter) {
     long sched_nr = 0;
     auto tablet_task_id = locator::tablet_task_id(utils::UUID_gen::get_time_UUID());
-    return locator::tablet_task_info{locator::tablet_task_type::user_repair, tablet_task_id, db_clock::now(), sched_nr, db_clock::time_point()};
+    return locator::tablet_task_info{locator::tablet_task_type::user_repair, tablet_task_id, db_clock::now(), sched_nr, db_clock::time_point(), hosts_filter, dcs_filter};
+}
+
+sstring locator::tablet_task_info::serialize_repair_hosts_filter(std::unordered_set<locator::host_id> filter) {
+    sstring res = "";
+    bool first = true;
+    for (const auto& host : filter) {
+        if (!std::exchange(first, false)) {
+            res += ",";
+        }
+        res += host.to_sstring();
+    }
+    return res;
+}
+
+sstring locator::tablet_task_info::serialize_repair_dcs_filter(std::unordered_set<sstring> filter) {
+    sstring res = "";
+    bool first = true;
+    for (const auto& dc : filter) {
+        if (!std::exchange(first, false)) {
+            res += ",";
+        }
+        res += dc;
+    }
+    return res;
+}
+
+std::unordered_set<locator::host_id> locator::tablet_task_info::deserialize_repair_hosts_filter(sstring filter) {
+    if (filter.empty()) {
+        return {};
+    }
+    sstring delim = ",";
+    return std::ranges::views::split(filter, delim) | std::views::transform([](auto&& h) {
+        return locator::host_id(utils::UUID(std::string_view{h}));
+    }) | std::ranges::to<std::unordered_set<locator::host_id>>();
+}
+
+std::unordered_set<sstring> locator::tablet_task_info::deserialize_repair_dcs_filter(sstring filter) {
+    if (filter.empty()) {
+        return {};
+    }
+    sstring delim = ",";
+    return std::ranges::views::split(filter, delim) | std::views::transform([](auto&& h) {
+        return sstring{std::string_view{h}};
+    }) | std::ranges::to<std::unordered_set<sstring>>();
+}
 
 locator::tablet_task_info locator::tablet_task_info::make_migration_request() {
diff --git a/locator/tablets.hh b/locator/tablets.hh
index a7ef2b77fe..a52f1de2af 100644
--- a/locator/tablets.hh
+++ b/locator/tablets.hh
@@ -162,17 +162,21 @@ struct tablet_task_info {
     db_clock::time_point request_time;
     int64_t sched_nr = 0;
     db_clock::time_point sched_time;
-    sstring repair_hosts_filter;
-    sstring repair_dcs_filter;
+    std::unordered_set<host_id> repair_hosts_filter;
+    std::unordered_set<sstring> repair_dcs_filter;
     bool operator==(const tablet_task_info&) const = default;
     bool is_valid() const;
     bool is_user_repair_request() const;
-    static tablet_task_info make_user_repair_request();
-    static tablet_task_info make_auto_repair_request();
+    static tablet_task_info make_user_repair_request(std::unordered_set<host_id> hosts_filter = {}, std::unordered_set<sstring> dcs_filter = {});
+    static tablet_task_info make_auto_repair_request(std::unordered_set<host_id> hosts_filter = {}, std::unordered_set<sstring> dcs_filter = {});
     static tablet_task_info make_migration_request();
     static tablet_task_info make_intranode_migration_request();
     static tablet_task_info make_split_request();
     static tablet_task_info make_merge_request();
+    static sstring serialize_repair_hosts_filter(std::unordered_set<host_id> filter);
+    static sstring serialize_repair_dcs_filter(std::unordered_set<sstring> filter);
+    static std::unordered_set<host_id> deserialize_repair_hosts_filter(sstring filter);
+    static std::unordered_set<sstring> deserialize_repair_dcs_filter(sstring filter);
 };
 
 /// Stores information about a single tablet.
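As an aside, both filters round-trip through a plain comma-separated
string; this is also the form persisted by replica/tablets.cc further
down. Below is a minimal standalone sketch of that encoding, with
std::string standing in for seastar::sstring and plain strings for
host IDs and DC names; it models, but is not, the helpers added above:

    // filter_codec_sketch.cc - build with: g++ -std=c++23 filter_codec_sketch.cc
    #include <iostream>
    #include <ranges>
    #include <string>
    #include <string_view>
    #include <unordered_set>
    #include <utility>

    // Join the set with "," - the empty set encodes to "" ("no filter").
    std::string serialize_filter(const std::unordered_set<std::string>& filter) {
        std::string res;
        bool first = true;
        for (const auto& item : filter) {
            if (!std::exchange(first, false)) {
                res += ",";
            }
            res += item;
        }
        return res;
    }

    // Split on "," - the empty string decodes back to the empty set.
    std::unordered_set<std::string> deserialize_filter(std::string_view filter) {
        if (filter.empty()) {
            return {};
        }
        return std::views::split(filter, ',')
             | std::views::transform([](auto&& part) { return std::string(std::string_view(part)); })
             | std::ranges::to<std::unordered_set<std::string>>();
    }

    int main() {
        std::unordered_set<std::string> dcs{"dc1", "dc2"};
        auto encoded = serialize_filter(dcs);   // "dc1,dc2" or "dc2,dc1": set order is unspecified
        std::cout << encoded << " round-trips: " << std::boolalpha
                  << (deserialize_filter(encoded) == dcs) << "\n";
    }

Encoding the empty set as the empty string is what lets an unset
filter keep meaning "repair all replicas" after a round trip through
the system tables.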
diff --git a/repair/repair.cc b/repair/repair.cc
index 9350fbd68d..7a7428336b 100644
--- a/repair/repair.cc
+++ b/repair/repair.cc
@@ -2446,10 +2446,35 @@ future<gc_clock::time_point> repair_service::repair_tablet(gms::gossip_address_m
     auto replicas = info.replicas;
     std::vector<locator::host_id> nodes;
     std::vector<shard_id> shards;
-    shard_id master_shard_id;
+    std::optional<shard_id> master_shard_id;
+    auto& topology = guard.get_token_metadata()->get_topology();
+    auto hosts_filter = info.repair_task_info.repair_hosts_filter;
+    auto dcs_filter = info.repair_task_info.repair_dcs_filter;
     for (auto& r : replicas) {
         auto shard = r.shard;
         if (r.host != myhostid) {
+            if (!hosts_filter.empty()) {
+                auto dc = topology.get_datacenter(r.host);
+                if (!hosts_filter.contains(r.host)) {
+                    rlogger.debug("repair[{}]: Check node={} from dc={} hosts_filter={} dcs_filter={} skipped",
+                        id.uuid(), r.host, dc, hosts_filter, dcs_filter);
+                    continue;
+                } else {
+                    rlogger.debug("repair[{}]: Check node={} from dc={} hosts_filter={} dcs_filter={} ok",
+                        id.uuid(), r.host, dc, hosts_filter, dcs_filter);
+                }
+            }
+            if (!dcs_filter.empty()) {
+                auto dc = topology.get_datacenter(r.host);
+                if (!dcs_filter.contains(dc)) {
+                    rlogger.debug("repair[{}]: Check node={} from dc={} hosts_filter={} dcs_filter={} skipped",
+                        id.uuid(), r.host, dc, hosts_filter, dcs_filter);
+                    continue;
+                } else {
+                    rlogger.debug("repair[{}]: Check node={} from dc={} hosts_filter={} dcs_filter={} ok",
+                        id.uuid(), r.host, dc, hosts_filter, dcs_filter);
+                }
+            }
             nodes.push_back(r.host);
             shards.push_back(shard);
         } else {
@@ -2457,10 +2482,17 @@ future<gc_clock::time_point> repair_service::repair_tablet(gms::gossip_address_m
         }
     }
 
+    if (nodes.empty() || !master_shard_id) {
+        rlogger.info("repair[{}]: Skipped tablet repair for table={}.{} range={} replicas={} global_tablet_id={} hosts_filter={} dcs_filter={}",
+            id.uuid(), keyspace_name, table_name, range, replicas, gid, hosts_filter, dcs_filter);
+        auto flush_time = gc_clock::time_point();
+        co_return flush_time;
+    }
+
     std::vector<tablet_repair_task_meta> task_metas;
     auto ranges_parallelism = std::nullopt;
     auto start = std::chrono::steady_clock::now();
-    task_metas.push_back(tablet_repair_task_meta{keyspace_name, table_name, table_id, master_shard_id, range, repair_neighbors(nodes, shards), replicas});
+    task_metas.push_back(tablet_repair_task_meta{keyspace_name, table_name, table_id, *master_shard_id, range, repair_neighbors(nodes, shards), replicas});
     auto task_impl_ptr = seastar::make_shared<repair::tablet_repair_task_impl>(_repair_module, id, keyspace_name, global_tablet_repair_task_info.id, table_names, streaming::stream_reason::repair, std::move(task_metas), ranges_parallelism);
     task_impl_ptr->sched_by_scheduler = true;
     auto task = co_await _repair_module->make_task(task_impl_ptr, global_tablet_repair_task_info);
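The two filters compose as a conjunction: a remote replica is
repaired only if it passes both the host check and the DC check, and
an empty filter imposes no restriction. A standalone model of that
selection rule, with simplified stand-in types rather than the actual
repair_service code:

    // replica_filter_sketch.cc - the host/DC selection rule in isolation.
    #include <iostream>
    #include <string>
    #include <unordered_set>

    using host_id = std::string;  // stand-in for locator::host_id

    // Mirrors the checks in repair_tablet() above: an empty filter
    // matches everything; otherwise the host (or DC) must be listed.
    bool replica_selected(const host_id& host, const std::string& dc,
                          const std::unordered_set<host_id>& hosts_filter,
                          const std::unordered_set<std::string>& dcs_filter) {
        if (!hosts_filter.empty() && !hosts_filter.contains(host)) {
            return false;
        }
        if (!dcs_filter.empty() && !dcs_filter.contains(dc)) {
            return false;
        }
        return true;
    }

    int main() {
        std::unordered_set<host_id> hosts{"h1"};
        std::unordered_set<std::string> dcs{"dc1"};
        // h1/dc1 passes both filters; h1/dc2 fails the DC filter.
        std::cout << std::boolalpha
                  << replica_selected("h1", "dc1", hosts, dcs) << " "
                  << replica_selected("h1", "dc2", hosts, dcs) << "\n";  // true false
    }

When every remote replica is filtered out, repair_tablet() above
skips the tablet entirely instead of running a repair with no
neighbors.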
diff --git a/replica/tablets.cc b/replica/tablets.cc
index 2f7ef42e83..1b6b813df9 100644
--- a/replica/tablets.cc
+++ b/replica/tablets.cc
@@ -100,8 +100,8 @@ data_value tablet_task_info_to_data_value(const locator::tablet_task_info& info)
         data_value(info.request_time),
         data_value(info.sched_nr),
         data_value(info.sched_time),
-        data_value(info.repair_hosts_filter),
-        data_value(info.repair_dcs_filter),
+        data_value(locator::tablet_task_info::serialize_repair_hosts_filter(info.repair_hosts_filter)),
+        data_value(locator::tablet_task_info::serialize_repair_dcs_filter(info.repair_dcs_filter)),
     });
     return result;
 };
@@ -331,8 +331,8 @@ locator::tablet_task_info tablet_task_info_from_cell(const data_value& v) {
         value_cast<db_clock::time_point>(dv[2]),
         value_cast<int64_t>(dv[3]),
         value_cast<db_clock::time_point>(dv[4]),
-        value_cast<sstring>(dv[5]),
-        value_cast<sstring>(dv[6]),
+        locator::tablet_task_info::deserialize_repair_hosts_filter(value_cast<sstring>(dv[5])),
+        locator::tablet_task_info::deserialize_repair_dcs_filter(value_cast<sstring>(dv[6])),
     };
     return result;
 }
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 9beec5f139..8bc5c4434d 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -6273,13 +6273,14 @@ future<> storage_service::exec_tablet_update(service::group0_guard guard, st
 
 // Repair the tablets that contain the tokens and wait for the repair to finish.
 // This is used to run a manual repair requested by the user from the RESTful API.
-future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_request(table_id table, std::variant<std::vector<dht::token>, all_tokens_tag> tokens_variant, bool await_completion) {
+future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_request(table_id table, std::variant<std::vector<dht::token>, all_tokens_tag> tokens_variant,
+        std::unordered_set<locator::host_id> hosts_filter, std::unordered_set<sstring> dcs_filter, bool await_completion) {
     auto holder = _async_gate.hold();
 
     if (this_shard_id() != 0) {
         // group0 is only set on shard 0.
         co_return co_await container().invoke_on(0, [&] (auto& ss) {
-            return ss.add_repair_tablet_request(table, std::move(tokens_variant), await_completion);
+            return ss.add_repair_tablet_request(table, std::move(tokens_variant), std::move(hosts_filter), std::move(dcs_filter), await_completion);
         });
     }
@@ -6293,11 +6294,12 @@ future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_
         throw std::runtime_error("The TABLET_REPAIR_SCHEDULER feature is not enabled on the cluster yet");
     }
 
-    auto repair_task_info = locator::tablet_task_info::make_user_repair_request();
+    auto repair_task_info = locator::tablet_task_info::make_user_repair_request(hosts_filter, dcs_filter);
     auto res = std::unordered_map<sstring, sstring>{{sstring("tablet_task_id"), repair_task_info.tablet_task_id.to_sstring()}};
 
     auto start = std::chrono::steady_clock::now();
-    slogger.info("Starting tablet repair by API request table_id={} tokens={} all_tokens={} tablet_task_id={}", table, tokens, all_tokens, repair_task_info.tablet_task_id);
+    slogger.info("Starting tablet repair by API request table_id={} tokens={} all_tokens={} tablet_task_id={} hosts_filter={} dcs_filter={}",
+        table, tokens, all_tokens, repair_task_info.tablet_task_id, hosts_filter, dcs_filter);
 
     while (true) {
         auto guard = co_await get_guard_for_tablet_update();
diff --git a/service/storage_service.hh b/service/storage_service.hh
index 056722be1c..95ccaee394 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -933,7 +933,7 @@ private:
     future<> exec_tablet_update(service::group0_guard guard, std::vector<canonical_mutation> updates, sstring reason);
 public:
     struct all_tokens_tag {};
-    future<std::unordered_map<sstring, sstring>> add_repair_tablet_request(table_id table, std::variant<std::vector<dht::token>, all_tokens_tag> tokens_variant, bool await_completion);
+    future<std::unordered_map<sstring, sstring>> add_repair_tablet_request(table_id table, std::variant<std::vector<dht::token>, all_tokens_tag> tokens_variant, std::unordered_set<locator::host_id> hosts_filter, std::unordered_set<sstring> dcs_filter, bool await_completion);
     future<> del_repair_tablet_request(table_id table, locator::tablet_task_id);
     future<> move_tablet(table_id, dht::token, locator::tablet_replica src, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
     future<> add_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc
index 794a63753d..9fc5f23474 100644
--- a/service/topology_coordinator.cc
+++ b/service/topology_coordinator.cc
@@ -1455,9 +1455,32 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
         co_return;
     }
     auto sched_time = tinfo.repair_task_info.sched_time;
-    auto primary = tmap.get_primary_replica(gid.tablet);
-    auto dst = primary.host;
     auto tablet = gid;
+    auto hosts_filter = tinfo.repair_task_info.repair_hosts_filter;
+    auto dcs_filter = tinfo.repair_task_info.repair_dcs_filter;
+    const auto& topo = _db.get_token_metadata().get_topology();
+    std::optional<locator::host_id> dst_opt;
+    if (hosts_filter.empty() && dcs_filter.empty()) {
+        auto primary = tmap.get_primary_replica(gid.tablet);
+        dst_opt = primary.host;
+    } else {
+        for (auto& replica : tinfo.replicas) {
+            if (!hosts_filter.empty() && !hosts_filter.contains(replica.host)) {
+                continue;
+            }
+            auto dc = topo.get_datacenter(replica.host);
+            if (!dcs_filter.empty() && !dcs_filter.contains(dc)) {
+                continue;
+            }
+            dst_opt = replica.host;
+            break;
+        }
+    }
+    if (!dst_opt) {
+        throw std::runtime_error(fmt::format("Could not find host to perform repair tablet={} replica={} hosts_filter={} dcs_filter={}",
+            tablet, tinfo.replicas, hosts_filter, dcs_filter));
+    }
+    auto dst = dst_opt.value();
     rtlogger.info("Initiating tablet repair host={} tablet={}", dst, gid);
     auto res = co_await ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging, dst, _as, raft::server_id(dst.uuid()), gid);
@@ -1469,12 +1492,16 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
     })) {
         auto& tinfo = tmap.get_tablet_info(gid.tablet);
         bool valid = tinfo.repair_task_info.is_valid();
+        auto hosts_filter = tinfo.repair_task_info.repair_hosts_filter;
+        auto dcs_filter = tinfo.repair_task_info.repair_dcs_filter;
+        bool is_filter_off = hosts_filter.empty() && dcs_filter.empty();
         rtlogger.debug("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::end_repair);
         auto update = get_mutation_builder()
             .set_stage(last_token, locator::tablet_transition_stage::end_repair)
             .del_repair_task_info(last_token)
             .del_session(last_token);
-        if (valid) {
+        // Skip updating the repair time in case a hosts filter or DCs filter is set.
+        if (valid && is_filter_off) {
             auto sched_time = tinfo.repair_task_info.sched_time;
             auto time = tablet_state.repair_time;
             rtlogger.debug("Set tablet repair time sched_time={} return_time={} set_time={}",