repair: Extract flush hints code

So it can be used by tablet repair as well.
This commit is contained in:
Asias He
2024-01-25 12:07:06 +08:00
parent e132ffdb60
commit e43bc775d0

View File

@@ -364,6 +364,60 @@ static future<std::list<gms::inet_address>> get_hosts_participating_in_repair(
co_return std::list<gms::inet_address>(participating_hosts.begin(), participating_hosts.end());
}
static future<bool> flush_hints(repair_service& rs, repair_uniq_id id, replica::database& db,
sstring keyspace, std::vector<sstring> cfs,
std::unordered_set<gms::inet_address> ignore_nodes,
std::list<gms::inet_address> participants) {
auto uuid = id.uuid();
bool needs_flush_before_repair = false;
if (db.features().tombstone_gc_options) {
for (auto& table: cfs) {
if (const auto* cf = find_column_family_if_exists(db, keyspace, table)) {
auto s = cf->schema();
const auto& options = s->tombstone_gc_options();
if (options.mode() == tombstone_gc_mode::repair) {
needs_flush_before_repair = true;
}
}
}
}
bool hints_batchlog_flushed = false;
if (needs_flush_before_repair) {
auto waiting_nodes = db.get_token_metadata().get_all_ips();
std::erase_if(waiting_nodes, [&] (const auto& addr) {
return ignore_nodes.contains(addr);
});
auto hints_timeout = std::chrono::seconds(300);
auto batchlog_timeout = std::chrono::seconds(300);
repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout};
try {
co_await parallel_for_each(waiting_nodes, [&rs, uuid, &req, &participants] (gms::inet_address node) -> future<> {
rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started",
uuid, node, participants);
try {
auto& ms = rs.get_messaging();
auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req);
(void)resp; // nothing to do with response yet
} catch (...) {
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}",
uuid, node, participants, std::current_exception());
throw;
}
});
hints_batchlog_flushed = true;
} catch (...) {
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair",
uuid, participants);
}
} else {
rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog to nodes={}", uuid, participants);
}
co_return hints_batchlog_flushed;
}
float node_ops_metrics::repair_finished_percentage() {
return _module->report_progress();
}
@@ -1255,20 +1309,6 @@ future<> repair::user_requested_repair_task_impl::run() {
auto uuid = node_ops_id{id.uuid().uuid()};
auto start_time = std::chrono::steady_clock::now();
bool needs_flush_before_repair = false;
if (db.features().tombstone_gc_options) {
for (auto& table: cfs) {
if (const auto* cf = find_column_family_if_exists(db, keyspace, table)) {
auto s = cf->schema();
const auto& options = s->tombstone_gc_options();
if (options.mode() == tombstone_gc_mode::repair) {
needs_flush_before_repair = true;
}
}
}
}
bool hints_batchlog_flushed = false;
std::list<gms::inet_address> participants;
if (_small_table_optimization) {
auto normal_nodes = germs->get().get_token_metadata().get_all_ips();
@@ -1276,37 +1316,7 @@ future<> repair::user_requested_repair_task_impl::run() {
} else {
participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, data_centers, hosts, ignore_nodes).get();
}
if (needs_flush_before_repair) {
auto waiting_nodes = db.get_token_metadata().get_all_ips();
std::erase_if(waiting_nodes, [&] (const auto& addr) {
return ignore_nodes.contains(addr);
});
auto hints_timeout = std::chrono::seconds(300);
auto batchlog_timeout = std::chrono::seconds(300);
repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout};
try {
parallel_for_each(waiting_nodes, [&rs, uuid, &req, &participants] (gms::inet_address node) -> future<> {
rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started",
uuid, node, participants);
try {
auto& ms = rs.get_messaging();
auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req);
(void)resp; // nothing to do with response yet
} catch (...) {
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}",
uuid, node, participants, std::current_exception());
throw;
}
}).get();
hints_batchlog_flushed = true;
} catch (...) {
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair",
uuid, participants);
}
} else {
rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog to nodes={}", uuid, participants);
}
bool hints_batchlog_flushed = flush_hints(rs, id, db, keyspace, cfs, ignore_nodes, participants).get0();
std::vector<future<>> repair_results;
repair_results.reserve(smp::count);