From e43bc775d04f02df4effc613747cd5050cff0f3b Mon Sep 17 00:00:00 2001
From: Asias He
Date: Thu, 25 Jan 2024 12:07:06 +0800
Subject: [PATCH] repair: Extract flush hints code

So it can be used by tablet repair as well.
---
// Reviewer notes (text between the "---" line and the diffstat is not part of the commit):
// - flush_hints() is the hints/batchlog flush step moved out of
//   repair::user_requested_repair_task_impl::run(); run() now calls it and takes the
//   result with .get0().
// - It sends repair_flush_hints_batchlog only when db.features().tombstone_gc_options is set
//   and at least one table in cfs has tombstone GC mode "repair"; otherwise it logs that the
//   step was skipped and returns false.
// - Targets are every address from db.get_token_metadata().get_all_ips() minus ignore_nodes,
//   dispatched through parallel_for_each with 300 s hints and batchlog timeouts.
// - Failures are logged and swallowed: the function then returns false and repair continues.
// - Notable differences from the removed lines: `co_await parallel_for_each(...)` replaces
//   `parallel_for_each(...).get()`, and `uuid` is now id.uuid() rather than
//   node_ops_id{id.uuid().uuid()}.
// - NOTE(review): template arguments (e.g. on `future`, `std::vector`, `std::list`) and the
//   author's e-mail look stripped in this copy, and indentation was reconstructed - confirm
//   against the upstream commit before applying.

 repair/repair.cc | 100 ++++++++++++++++++++++++++---------------------
 1 file changed, 55 insertions(+), 45 deletions(-)

diff --git a/repair/repair.cc b/repair/repair.cc
index 71fbbb4759..e5dcdfc272 100644
--- a/repair/repair.cc
+++ b/repair/repair.cc
@@ -364,6 +364,60 @@ static future> get_hosts_participating_in_repair(
     co_return std::list(participating_hosts.begin(), participating_hosts.end());
 }
 
+static future flush_hints(repair_service& rs, repair_uniq_id id, replica::database& db,
+        sstring keyspace, std::vector cfs,
+        std::unordered_set ignore_nodes,
+        std::list participants) {
+    auto uuid = id.uuid(); // used only to tag the log messages below
+    bool needs_flush_before_repair = false;
+    if (db.features().tombstone_gc_options) {
+        for (auto& table: cfs) {
+            if (const auto* cf = find_column_family_if_exists(db, keyspace, table)) {
+                auto s = cf->schema();
+                const auto& options = s->tombstone_gc_options();
+                if (options.mode() == tombstone_gc_mode::repair) { // one repair-mode table is enough to require the flush
+                    needs_flush_before_repair = true;
+                }
+            }
+        }
+    }
+
+    bool hints_batchlog_flushed = false; // stays false when the flush is skipped or fails
+    if (needs_flush_before_repair) {
+        auto waiting_nodes = db.get_token_metadata().get_all_ips();
+        std::erase_if(waiting_nodes, [&] (const auto& addr) { // do not contact nodes listed in ignore_nodes
+            return ignore_nodes.contains(addr);
+        });
+        auto hints_timeout = std::chrono::seconds(300);
+        auto batchlog_timeout = std::chrono::seconds(300);
+        repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout};
+
+        try {
+            co_await parallel_for_each(waiting_nodes, [&rs, uuid, &req, &participants] (gms::inet_address node) -> future<> {
+                rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started",
+                        uuid, node, participants);
+                try {
+                    auto& ms = rs.get_messaging();
+                    auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req);
+                    (void)resp; // nothing to do with response yet
+                } catch (...) {
+                    rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}",
+                            uuid, node, participants, std::current_exception());
+                    throw; // log per node, then let the failure propagate to the outer handler
+                }
+            });
+            hints_batchlog_flushed = true; // reached only if the co_await above did not throw
+        } catch (...) { // best effort: the failure is logged and not rethrown
+            rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair",
+                    uuid, participants);
+        }
+    } else {
+        rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog to nodes={}", uuid, participants);
+    }
+    co_return hints_batchlog_flushed;
+}
+
+
 float node_ops_metrics::repair_finished_percentage() {
     return _module->report_progress();
 }
@@ -1255,20 +1309,6 @@ future<> repair::user_requested_repair_task_impl::run() {
         auto uuid = node_ops_id{id.uuid().uuid()};
         auto start_time = std::chrono::steady_clock::now();
 
-        bool needs_flush_before_repair = false;
-        if (db.features().tombstone_gc_options) {
-            for (auto& table: cfs) {
-                if (const auto* cf = find_column_family_if_exists(db, keyspace, table)) {
-                    auto s = cf->schema();
-                    const auto& options = s->tombstone_gc_options();
-                    if (options.mode() == tombstone_gc_mode::repair) {
-                        needs_flush_before_repair = true;
-                    }
-                }
-            }
-        }
-
-        bool hints_batchlog_flushed = false;
         std::list participants;
         if (_small_table_optimization) {
             auto normal_nodes = germs->get().get_token_metadata().get_all_ips();
@@ -1276,37 +1316,7 @@ future<> repair::user_requested_repair_task_impl::run() {
         } else {
             participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, data_centers, hosts, ignore_nodes).get();
         }
-        if (needs_flush_before_repair) {
-            auto waiting_nodes = db.get_token_metadata().get_all_ips();
-            std::erase_if(waiting_nodes, [&] (const auto& addr) {
-                return ignore_nodes.contains(addr);
-            });
-            auto hints_timeout = std::chrono::seconds(300);
-            auto batchlog_timeout = std::chrono::seconds(300);
-            repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout};
-
-            try {
-                parallel_for_each(waiting_nodes, [&rs, uuid, &req, &participants] (gms::inet_address node) -> future<> {
-                    rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started",
-                            uuid, node, participants);
-                    try {
-                        auto& ms = rs.get_messaging();
-                        auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req);
-                        (void)resp; // nothing to do with response yet
-                    } catch (...) {
-                        rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}",
-                                uuid, node, participants, std::current_exception());
-                        throw;
-                    }
-                }).get();
-                hints_batchlog_flushed = true;
-            } catch (...) {
-                rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair",
-                        uuid, participants);
-            }
-        } else {
-            rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog to nodes={}", uuid, participants);
-        }
+        bool hints_batchlog_flushed = flush_hints(rs, id, db, keyspace, cfs, ignore_nodes, participants).get0();
 
         std::vector> repair_results;
         repair_results.reserve(smp::count);