diff --git a/repair/repair.cc b/repair/repair.cc index 71fbbb4759..744ac79701 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -364,6 +364,60 @@ static future> get_hosts_participating_in_repair( co_return std::list(participating_hosts.begin(), participating_hosts.end()); } +static future flush_hints(repair_service& rs, repair_uniq_id id, replica::database& db, + sstring keyspace, std::vector cfs, + std::unordered_set ignore_nodes, + std::list participants) { + auto uuid = id.uuid(); + bool needs_flush_before_repair = false; + if (db.features().tombstone_gc_options) { + for (auto& table: cfs) { + if (const auto* cf = find_column_family_if_exists(db, keyspace, table)) { + auto s = cf->schema(); + const auto& options = s->tombstone_gc_options(); + if (options.mode() == tombstone_gc_mode::repair) { + needs_flush_before_repair = true; + } + } + } + } + + bool hints_batchlog_flushed = false; + if (needs_flush_before_repair) { + auto waiting_nodes = db.get_token_metadata().get_all_ips(); + std::erase_if(waiting_nodes, [&] (const auto& addr) { + return ignore_nodes.contains(addr); + }); + auto hints_timeout = std::chrono::seconds(300); + auto batchlog_timeout = std::chrono::seconds(300); + repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout}; + + try { + co_await parallel_for_each(waiting_nodes, [&rs, uuid, &req, &participants] (gms::inet_address node) -> future<> { + rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started", + uuid, node, participants); + try { + auto& ms = rs.get_messaging(); + auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req); + (void)resp; // nothing to do with response yet + } catch (...) { + rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}", + uuid, node, participants, std::current_exception()); + throw; + } + }); + hints_batchlog_flushed = true; + } catch (...) { + rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair", + uuid, participants); + } + } else { + rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog to nodes={}", uuid, participants); + } + co_return hints_batchlog_flushed; +} + + float node_ops_metrics::repair_finished_percentage() { return _module->report_progress(); } @@ -1255,20 +1309,6 @@ future<> repair::user_requested_repair_task_impl::run() { auto uuid = node_ops_id{id.uuid().uuid()}; auto start_time = std::chrono::steady_clock::now(); - bool needs_flush_before_repair = false; - if (db.features().tombstone_gc_options) { - for (auto& table: cfs) { - if (const auto* cf = find_column_family_if_exists(db, keyspace, table)) { - auto s = cf->schema(); - const auto& options = s->tombstone_gc_options(); - if (options.mode() == tombstone_gc_mode::repair) { - needs_flush_before_repair = true; - } - } - } - } - - bool hints_batchlog_flushed = false; std::list participants; if (_small_table_optimization) { auto normal_nodes = germs->get().get_token_metadata().get_all_ips(); @@ -1276,37 +1316,7 @@ future<> repair::user_requested_repair_task_impl::run() { } else { participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, data_centers, hosts, ignore_nodes).get(); } - if (needs_flush_before_repair) { - auto waiting_nodes = db.get_token_metadata().get_all_ips(); - std::erase_if(waiting_nodes, [&] (const auto& addr) { - return ignore_nodes.contains(addr); - }); - auto hints_timeout = std::chrono::seconds(300); - auto batchlog_timeout = std::chrono::seconds(300); - repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout}; - - try { - parallel_for_each(waiting_nodes, [&rs, uuid, &req, &participants] (gms::inet_address node) -> future<> { - rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started", - uuid, node, participants); - try { - auto& ms = rs.get_messaging(); - auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req); - (void)resp; // nothing to do with response yet - } catch (...) { - rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}", - uuid, node, participants, std::current_exception()); - throw; - } - }).get(); - hints_batchlog_flushed = true; - } catch (...) { - rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair", - uuid, participants); - } - } else { - rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog to nodes={}", uuid, participants); - } + bool hints_batchlog_flushed = flush_hints(rs, id, db, keyspace, cfs, ignore_nodes, participants).get0(); std::vector> repair_results; repair_results.reserve(smp::count); @@ -2124,7 +2134,7 @@ future<> repair::tablet_repair_task_impl::run() { auto start_time = std::chrono::steady_clock::now(); auto parent_data = get_repair_uniq_id().task_info; std::atomic idx{1}; - rs.container().invoke_on_all([&idx, id, metas = _metas, parent_data, reason = _reason] (repair_service& rs) -> future<> { + rs.container().invoke_on_all([&idx, id, metas = _metas, parent_data, reason = _reason, tables = _tables] (repair_service& rs) -> future<> { for (auto& m : metas) { if (m.master_shard_id != this_shard_id()) { continue; @@ -2152,7 +2162,10 @@ future<> repair::tablet_repair_task_impl::run() { auto data_centers = std::vector(); auto hosts = std::vector(); auto ignore_nodes = std::unordered_set(); - bool hints_batchlog_flushed = false; + auto my_address = erm->get_topology().my_address(); + auto participants = std::list(m.neighbors.all.begin(), m.neighbors.all.end()); + participants.push_front(my_address); + bool hints_batchlog_flushed = co_await flush_hints(rs, id, rs._db.local(), m.keyspace_name, tables, ignore_nodes, participants); bool small_table_optimization = false; auto ranges_parallelism = std::nullopt; diff --git a/repair/row_level.cc b/repair/row_level.cc index 157f95572a..9dc469a020 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2595,6 +2595,8 @@ class row_level_repair { gc_clock::time_point _start_time; + bool _is_tablet; + public: row_level_repair(repair::shard_repair_task_impl& shard_task, sstring cf_name, @@ -2609,7 +2611,9 @@ public: , _all_live_peer_nodes(sort_peer_nodes(all_live_peer_nodes)) , _small_table_optimization(small_table_optimization) , _seed(get_random_seed()) - , _start_time(gc_clock::now()) { + , _start_time(gc_clock::now()) + , _is_tablet(_shard_task.db.local().find_column_family(_table_id).uses_tablets()) + { repair_neighbors r_neighbors = _shard_task.get_repair_neighbors(_range); auto& map = r_neighbors.shard_map; for (auto& n : _all_live_peer_nodes) { @@ -2919,7 +2923,7 @@ private: co_return; } repair_service& rs = _shard_task.rs; - std::optional repair_time_opt = co_await rs.update_history(_shard_task.global_repair_id.uuid(), _table_id, _range, _start_time); + std::optional repair_time_opt = co_await rs.update_history(_shard_task.global_repair_id.uuid(), _table_id, _range, _start_time, _is_tablet); if (!repair_time_opt) { co_return; } @@ -3195,15 +3199,16 @@ static shard_id repair_id_to_shard(tasks::task_id& repair_id) { } future> -repair_service::update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time) { +repair_service::update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time, bool is_tablet) { auto shard = repair_id_to_shard(repair_id); - return container().invoke_on(shard, [repair_id, table_id, range, repair_time] (repair_service& rs) mutable -> future> { + return container().invoke_on(shard, [repair_id, table_id, range, repair_time, is_tablet] (repair_service& rs) mutable -> future> { repair_history& rh = rs._finished_ranges_history[repair_id]; if (rh.repair_time > repair_time) { rh.repair_time = repair_time; } auto finished_shards = ++(rh.finished_ranges[table_id][range]); - if (finished_shards == smp::count) { + // Tablet repair runs only on one shard + if (finished_shards == smp::count || is_tablet) { // All shards have finished repair the range. Send an rpc to ask peers to update system.repair_history table rlogger.debug("repair[{}]: Finished range {} for table {} on all shards, updating system.repair_history table, finished_shards={}", repair_id, range, table_id, finished_shards); diff --git a/repair/row_level.hh b/repair/row_level.hh index 45e85c92db..fcef13a1b0 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -135,7 +135,7 @@ public: // stop them abruptly). future<> shutdown(); - future> update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time); + future> update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time, bool is_tablet); future<> cleanup_history(tasks::task_id repair_id); future<> load_history(); diff --git a/test/topology_experimental_raft/test_tablets.py b/test/topology_experimental_raft/test_tablets.py index e0ff779d5c..6a1163d7f9 100644 --- a/test/topology_experimental_raft/test_tablets.py +++ b/test/topology_experimental_raft/test_tablets.py @@ -477,6 +477,44 @@ async def test_tablet_missing_data_repair(manager: ManagerClient): await manager.rolling_restart(servers, with_down=check_with_down) +@pytest.mark.repair +@pytest.mark.asyncio +async def test_tablet_repair_history(manager: ManagerClient): + logger.info("Bootstrapping cluster") + servers = [await manager.server_add(), await manager.server_add(), await manager.server_add()] + + rf = 3 + tablets = 8 + + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {{'class': 'NetworkTopologyStrategy', " + "'replication_factor': {}}} AND tablets = {{'initial': {}}};".format(rf, tablets)) + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int) WITH tombstone_gc = {'mode':'repair'};") + + logger.info("Populating table") + + keys = range(256) + await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys]) + + hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + logging.info(f'Got hosts={hosts}'); + + await repair_on_node(manager, servers[0], servers) + + async def check_repair_history(): + all_rows = [] + for host in hosts: + logging.info(f'Query hosts={host}'); + rows = await cql.run_async("SELECT * from system.repair_history", host=host) + all_rows += rows + for row in all_rows: + logging.info(f"Got repair_history_entry={row}") + assert len(all_rows) == rf * tablets + + await check_repair_history() + + await cql.run_async("DROP KEYSPACE test;") + @pytest.mark.asyncio async def test_tablet_cleanup(manager: ManagerClient): cmdline = ['--smp=2', '--commitlog-sync=batch']