repair: Introduce Host and DC filter support

Currently, the tablet repair scheduler repairs all replicas of a tablet.
It does not support selecting specific hosts or DCs. Repairing all
replicas should be enough for most cases. However, users might still
want to limit the repair to certain hosts or DCs in production. #21985
added the preparation work for the config options for this selection.
This patch adds support for host and DC selection.

Fixes #22417
This commit is contained in:
Asias He
2025-01-27 17:46:41 +08:00
committed by Aleksandra Martyniuk
parent 4c75701756
commit 5545289bfa
9 changed files with 164 additions and 25 deletions

View File

@@ -2873,6 +2873,22 @@
"type":"string",
"paramType":"query"
},
{
"name":"hosts_filter",
"description":"Repair replicas listed in the comma-separated host_id list.",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"dcs_filter",
"description":"Repair replicas listed in the comma-separated DC list.",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"await_completion",
"description":"Set true to wait for the repair to complete. Set false to skip waiting for the repair to complete. When the option is not provided, it defaults to false.",

View File

@@ -1702,8 +1702,22 @@ rest_repair_tablet(http_context& ctx, sharded<service::storage_service>& ss, std
} else {
tokens_variant = tokens;
}
auto hosts = req->get_query_param("hosts_filter");
auto dcs = req->get_query_param("dcs_filter");
auto res = co_await ss.local().add_repair_tablet_request(table_id, tokens_variant, await_completion);
std::unordered_set<locator::host_id> hosts_filter;
if (!hosts.empty()) {
std::string delim = ",";
hosts_filter = std::ranges::views::split(hosts, delim) | std::views::transform([](auto&& h) {
try {
return locator::host_id(utils::UUID(std::string_view{h}));
} catch (...) {
throw httpd::bad_param_exception(fmt::format("Wrong host_id format {}", h));
}
}) | std::ranges::to<std::unordered_set>();
}
auto dcs_filter = locator::tablet_task_info::deserialize_repair_dcs_filter(dcs);
auto res = co_await ss.local().add_repair_tablet_request(table_id, tokens_variant, hosts_filter, dcs_filter, await_completion);
co_return json::json_return_type(res);
}

View File

@@ -1124,8 +1124,8 @@ auto fmt::formatter<locator::tablet_task_info>::format(const locator::tablet_tas
{"request_time", fmt::to_string(db_clock::to_time_t(info.request_time))},
{"sched_nr", fmt::to_string(info.sched_nr)},
{"sched_time", fmt::to_string(db_clock::to_time_t(info.sched_time))},
{"repair_hosts_filter", info.repair_hosts_filter},
{"repair_dcs_filter", info.repair_dcs_filter},
{"repair_hosts_filter", locator::tablet_task_info::serialize_repair_hosts_filter(info.repair_hosts_filter)},
{"repair_dcs_filter", locator::tablet_task_info::serialize_repair_dcs_filter(info.repair_dcs_filter)},
};
return fmt::format_to(ctx.out(), "{}", rjson::print(rjson::from_string_map(ret)));
};
@@ -1138,16 +1138,60 @@ bool locator::tablet_task_info::is_user_repair_request() const {
return request_type == locator::tablet_task_type::user_repair;
}
locator::tablet_task_info locator::tablet_task_info::make_auto_repair_request() {
locator::tablet_task_info locator::tablet_task_info::make_auto_repair_request(std::unordered_set<locator::host_id> hosts_filter, std::unordered_set<sstring> dcs_filter) {
long sched_nr = 0;
auto tablet_task_id = locator::tablet_task_id(utils::UUID_gen::get_time_UUID());
return locator::tablet_task_info{locator::tablet_task_type::auto_repair, tablet_task_id, db_clock::now(), sched_nr, db_clock::time_point()};
return locator::tablet_task_info{locator::tablet_task_type::auto_repair, tablet_task_id, db_clock::now(), sched_nr, db_clock::time_point(), hosts_filter, dcs_filter};
}
locator::tablet_task_info locator::tablet_task_info::make_user_repair_request() {
locator::tablet_task_info locator::tablet_task_info::make_user_repair_request(std::unordered_set<locator::host_id> hosts_filter, std::unordered_set<sstring> dcs_filter) {
long sched_nr = 0;
auto tablet_task_id = locator::tablet_task_id(utils::UUID_gen::get_time_UUID());
return locator::tablet_task_info{locator::tablet_task_type::user_repair, tablet_task_id, db_clock::now(), sched_nr, db_clock::time_point()};
return locator::tablet_task_info{locator::tablet_task_type::user_repair, tablet_task_id, db_clock::now(), sched_nr, db_clock::time_point(), hosts_filter, dcs_filter};
}
// Serializes the hosts filter into a comma-separated list of host IDs.
// An empty filter serializes to the empty string, which deserializes
// back to an empty set (i.e. "no filter").
sstring locator::tablet_task_info::serialize_repair_hosts_filter(std::unordered_set<locator::host_id> filter) {
    sstring out;
    auto it = filter.begin();
    if (it != filter.end()) {
        out = it->to_sstring();
        for (++it; it != filter.end(); ++it) {
            out += ",";
            out += it->to_sstring();
        }
    }
    return out;
}
// Serializes the DCs filter into a comma-separated list of DC names.
// An empty filter serializes to the empty string, which deserializes
// back to an empty set (i.e. "no filter").
sstring locator::tablet_task_info::serialize_repair_dcs_filter(std::unordered_set<sstring> filter) {
    sstring out;
    auto it = filter.begin();
    if (it != filter.end()) {
        out = *it;
        for (++it; it != filter.end(); ++it) {
            out += ",";
            out += *it;
        }
    }
    return out;
}
// Parses a comma-separated list of host IDs (as produced by
// serialize_repair_hosts_filter) back into a set. An empty string
// yields an empty set. A malformed UUID propagates the exception
// thrown by the utils::UUID constructor.
std::unordered_set<locator::host_id> locator::tablet_task_info::deserialize_repair_hosts_filter(sstring filter) {
    std::unordered_set<locator::host_id> hosts;
    if (filter.empty()) {
        return hosts;
    }
    for (auto&& part : std::ranges::views::split(filter, ',')) {
        hosts.insert(locator::host_id(utils::UUID(std::string_view{part})));
    }
    return hosts;
}
// Parses a comma-separated list of DC names (as produced by
// serialize_repair_dcs_filter) back into a set. An empty string
// yields an empty set.
std::unordered_set<sstring> locator::tablet_task_info::deserialize_repair_dcs_filter(sstring filter) {
    std::unordered_set<sstring> dcs;
    if (filter.empty()) {
        return dcs;
    }
    for (auto&& part : std::ranges::views::split(filter, ',')) {
        dcs.insert(sstring{std::string_view{part}});
    }
    return dcs;
}
locator::tablet_task_info locator::tablet_task_info::make_migration_request() {

View File

@@ -162,17 +162,21 @@ struct tablet_task_info {
db_clock::time_point request_time;
int64_t sched_nr = 0;
db_clock::time_point sched_time;
sstring repair_hosts_filter;
sstring repair_dcs_filter;
std::unordered_set<locator::host_id> repair_hosts_filter;
std::unordered_set<sstring> repair_dcs_filter;
bool operator==(const tablet_task_info&) const = default;
bool is_valid() const;
bool is_user_repair_request() const;
static tablet_task_info make_user_repair_request();
static tablet_task_info make_auto_repair_request();
static tablet_task_info make_user_repair_request(std::unordered_set<locator::host_id> hosts_filter = {}, std::unordered_set<sstring> dcs_filter = {});
static tablet_task_info make_auto_repair_request(std::unordered_set<locator::host_id> hosts_filter = {}, std::unordered_set<sstring> dcs_filter = {});
static tablet_task_info make_migration_request();
static tablet_task_info make_intranode_migration_request();
static tablet_task_info make_split_request();
static tablet_task_info make_merge_request();
static sstring serialize_repair_hosts_filter(std::unordered_set<locator::host_id> filter);
static sstring serialize_repair_dcs_filter(std::unordered_set<sstring> filter);
static std::unordered_set<locator::host_id> deserialize_repair_hosts_filter(sstring filter);
static std::unordered_set<sstring> deserialize_repair_dcs_filter(sstring filter);
};
/// Stores information about a single tablet.

View File

@@ -2446,10 +2446,35 @@ future<gc_clock::time_point> repair_service::repair_tablet(gms::gossip_address_m
auto replicas = info.replicas;
std::vector<locator::host_id> nodes;
std::vector<shard_id> shards;
shard_id master_shard_id;
std::optional<shard_id> master_shard_id;
auto& topology = guard.get_token_metadata()->get_topology();
auto hosts_filter = info.repair_task_info.repair_hosts_filter;
auto dcs_filter = info.repair_task_info.repair_dcs_filter;
for (auto& r : replicas) {
auto shard = r.shard;
if (r.host != myhostid) {
if (!hosts_filter.empty()) {
auto dc = topology.get_datacenter(r.host);
if (!hosts_filter.contains(r.host)) {
rlogger.debug("repair[{}]: Check node={} from dc={} hosts_filter={} dcs_filter={} skipped",
id.uuid(), r.host, dc, hosts_filter, dcs_filter);
continue;
} else {
rlogger.debug("repair[{}]: Check node={} from dc={} hosts_filter={} dcs_filter={} ok",
id.uuid(), r.host, dc, hosts_filter, dcs_filter);
}
}
if (!dcs_filter.empty()) {
auto dc = topology.get_datacenter(r.host);
if (!dcs_filter.contains(dc)) {
rlogger.debug("repair[{}]: Check node={} from dc={} hosts_filter={} dcs_filter={} skipped",
id.uuid(), r.host, dc, hosts_filter, dcs_filter);
continue;
} else {
rlogger.debug("repair[{}]: Check node={} from dc={} hosts_filter={} dcs_filter={} ok",
id.uuid(), r.host, dc, hosts_filter, dcs_filter);
}
}
nodes.push_back(r.host);
shards.push_back(shard);
} else {
@@ -2457,10 +2482,17 @@ future<gc_clock::time_point> repair_service::repair_tablet(gms::gossip_address_m
}
}
if (nodes.empty() || !master_shard_id) {
rlogger.info("repair[{}]: Skipped tablet repair for table={}.{} range={} replicas={} global_tablet_id={} hosts_filter={} dcs_filter={}",
id.uuid(), keyspace_name, table_name, range, replicas, gid, hosts_filter, dcs_filter);
auto flush_time = gc_clock::time_point();
co_return flush_time;
}
std::vector<tablet_repair_task_meta> task_metas;
auto ranges_parallelism = std::nullopt;
auto start = std::chrono::steady_clock::now();
task_metas.push_back(tablet_repair_task_meta{keyspace_name, table_name, table_id, master_shard_id, range, repair_neighbors(nodes, shards), replicas});
task_metas.push_back(tablet_repair_task_meta{keyspace_name, table_name, table_id, *master_shard_id, range, repair_neighbors(nodes, shards), replicas});
auto task_impl_ptr = seastar::make_shared<repair::tablet_repair_task_impl>(_repair_module, id, keyspace_name, global_tablet_repair_task_info.id, table_names, streaming::stream_reason::repair, std::move(task_metas), ranges_parallelism);
task_impl_ptr->sched_by_scheduler = true;
auto task = co_await _repair_module->make_task(task_impl_ptr, global_tablet_repair_task_info);

View File

@@ -100,8 +100,8 @@ data_value tablet_task_info_to_data_value(const locator::tablet_task_info& info)
data_value(info.request_time),
data_value(info.sched_nr),
data_value(info.sched_time),
data_value(info.repair_hosts_filter),
data_value(info.repair_dcs_filter),
data_value(locator::tablet_task_info::serialize_repair_hosts_filter(info.repair_hosts_filter)),
data_value(locator::tablet_task_info::serialize_repair_dcs_filter(info.repair_dcs_filter)),
});
return result;
};
@@ -331,8 +331,8 @@ locator::tablet_task_info tablet_task_info_from_cell(const data_value& v) {
value_cast<db_clock::time_point>(dv[2]),
value_cast<int64_t>(dv[3]),
value_cast<db_clock::time_point>(dv[4]),
value_cast<sstring>(dv[5]),
value_cast<sstring>(dv[6]),
locator::tablet_task_info::deserialize_repair_hosts_filter(value_cast<sstring>(dv[5])),
locator::tablet_task_info::deserialize_repair_dcs_filter(value_cast<sstring>(dv[6])),
};
return result;
}

View File

@@ -6273,13 +6273,14 @@ future<bool> storage_service::exec_tablet_update(service::group0_guard guard, st
// Repair the tablets contain the tokens and wait for the repair to finish
// This is used to run a manual repair requested by user from the restful API.
future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_request(table_id table, std::variant<utils::chunked_vector<dht::token>, all_tokens_tag> tokens_variant, bool await_completion) {
future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_request(table_id table, std::variant<utils::chunked_vector<dht::token>, all_tokens_tag> tokens_variant,
std::unordered_set<locator::host_id> hosts_filter, std::unordered_set<sstring> dcs_filter, bool await_completion) {
auto holder = _async_gate.hold();
if (this_shard_id() != 0) {
// group0 is only set on shard 0.
co_return co_await container().invoke_on(0, [&] (auto& ss) {
return ss.add_repair_tablet_request(table, std::move(tokens_variant), await_completion);
return ss.add_repair_tablet_request(table, std::move(tokens_variant), std::move(hosts_filter), std::move(dcs_filter), await_completion);
});
}
@@ -6293,11 +6294,12 @@ future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_
throw std::runtime_error("The TABLET_REPAIR_SCHEDULER feature is not enabled on the cluster yet");
}
auto repair_task_info = locator::tablet_task_info::make_user_repair_request();
auto repair_task_info = locator::tablet_task_info::make_user_repair_request(hosts_filter, dcs_filter);
auto res = std::unordered_map<sstring, sstring>{{sstring("tablet_task_id"), repair_task_info.tablet_task_id.to_sstring()}};
auto start = std::chrono::steady_clock::now();
slogger.info("Starting tablet repair by API request table_id={} tokens={} all_tokens={} tablet_task_id={}", table, tokens, all_tokens, repair_task_info.tablet_task_id);
slogger.info("Starting tablet repair by API request table_id={} tokens={} all_tokens={} tablet_task_id={} hosts_filter={} dcs_filter={}",
table, tokens, all_tokens, repair_task_info.tablet_task_id, hosts_filter, dcs_filter);
while (true) {
auto guard = co_await get_guard_for_tablet_update();

View File

@@ -933,7 +933,7 @@ private:
future<bool> exec_tablet_update(service::group0_guard guard, std::vector<canonical_mutation> updates, sstring reason);
public:
struct all_tokens_tag {};
future<std::unordered_map<sstring, sstring>> add_repair_tablet_request(table_id table, std::variant<utils::chunked_vector<dht::token>, all_tokens_tag> tokens_variant, bool await_completion);
future<std::unordered_map<sstring, sstring>> add_repair_tablet_request(table_id table, std::variant<utils::chunked_vector<dht::token>, all_tokens_tag> tokens_variant, std::unordered_set<locator::host_id> hosts_filter, std::unordered_set<sstring> dcs_filter, bool await_completion);
future<> del_repair_tablet_request(table_id table, locator::tablet_task_id);
future<> move_tablet(table_id, dht::token, locator::tablet_replica src, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
future<> add_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);

View File

@@ -1455,9 +1455,32 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_return;
}
auto sched_time = tinfo.repair_task_info.sched_time;
auto primary = tmap.get_primary_replica(gid.tablet);
auto dst = primary.host;
auto tablet = gid;
auto hosts_filter = tinfo.repair_task_info.repair_hosts_filter;
auto dcs_filter = tinfo.repair_task_info.repair_dcs_filter;
const auto& topo = _db.get_token_metadata().get_topology();
std::optional<locator::host_id> dst_opt;
if (hosts_filter.empty() && dcs_filter.empty()) {
auto primary = tmap.get_primary_replica(gid.tablet);
dst_opt = primary.host;
} else {
for (auto& replica : tinfo.replicas) {
if (!hosts_filter.empty() && !hosts_filter.contains(replica.host)) {
continue;
}
auto dc = topo.get_datacenter(replica.host);
if (!dcs_filter.empty() && !dcs_filter.contains(dc)) {
continue;
}
dst_opt = replica.host;
break;
}
}
if (!dst_opt) {
throw std::runtime_error(fmt::format("Could not find host to perform repair tablet={} replica={} hosts_filter={} dcs_filter={}",
tablet, tinfo.replicas, hosts_filter, dcs_filter));
}
auto dst = dst_opt.value();
rtlogger.info("Initiating tablet repair host={} tablet={}", dst, gid);
auto res = co_await ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging,
dst, _as, raft::server_id(dst.uuid()), gid);
@@ -1469,12 +1492,16 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
})) {
auto& tinfo = tmap.get_tablet_info(gid.tablet);
bool valid = tinfo.repair_task_info.is_valid();
auto hosts_filter = tinfo.repair_task_info.repair_hosts_filter;
auto dcs_filter = tinfo.repair_task_info.repair_dcs_filter;
bool is_filter_off = hosts_filter.empty() && dcs_filter.empty();
rtlogger.debug("Will set tablet {} stage to {}", gid, locator::tablet_transition_stage::end_repair);
auto update = get_mutation_builder()
.set_stage(last_token, locator::tablet_transition_stage::end_repair)
.del_repair_task_info(last_token)
.del_session(last_token);
if (valid) {
// Skip update repair time in case hosts filter or dcs filter is set.
if (valid && is_filter_off) {
auto sched_time = tinfo.repair_task_info.sched_time;
auto time = tablet_state.repair_time;
rtlogger.debug("Set tablet repair time sched_time={} return_time={} set_time={}",