From e43bc775d04f02df4effc613747cd5050cff0f3b Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 25 Jan 2024 12:07:06 +0800 Subject: [PATCH 1/2] repair: Extract flush hints code So it can be used by tablet repair as well. --- repair/repair.cc | 100 ++++++++++++++++++++++++++--------------------- 1 file changed, 55 insertions(+), 45 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index 71fbbb4759..e5dcdfc272 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -364,6 +364,60 @@ static future> get_hosts_participating_in_repair( co_return std::list(participating_hosts.begin(), participating_hosts.end()); } +static future flush_hints(repair_service& rs, repair_uniq_id id, replica::database& db, + sstring keyspace, std::vector cfs, + std::unordered_set ignore_nodes, + std::list participants) { + auto uuid = id.uuid(); + bool needs_flush_before_repair = false; + if (db.features().tombstone_gc_options) { + for (auto& table: cfs) { + if (const auto* cf = find_column_family_if_exists(db, keyspace, table)) { + auto s = cf->schema(); + const auto& options = s->tombstone_gc_options(); + if (options.mode() == tombstone_gc_mode::repair) { + needs_flush_before_repair = true; + } + } + } + } + + bool hints_batchlog_flushed = false; + if (needs_flush_before_repair) { + auto waiting_nodes = db.get_token_metadata().get_all_ips(); + std::erase_if(waiting_nodes, [&] (const auto& addr) { + return ignore_nodes.contains(addr); + }); + auto hints_timeout = std::chrono::seconds(300); + auto batchlog_timeout = std::chrono::seconds(300); + repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout}; + + try { + co_await parallel_for_each(waiting_nodes, [&rs, uuid, &req, &participants] (gms::inet_address node) -> future<> { + rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started", + uuid, node, participants); + try { + auto& ms = rs.get_messaging(); + auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req); + (void)resp; // nothing to do with response yet + } catch (...) { + rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}", + uuid, node, participants, std::current_exception()); + throw; + } + }); + hints_batchlog_flushed = true; + } catch (...) { + rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair", + uuid, participants); + } + } else { + rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog to nodes={}", uuid, participants); + } + co_return hints_batchlog_flushed; +} + + float node_ops_metrics::repair_finished_percentage() { return _module->report_progress(); } @@ -1255,20 +1309,6 @@ future<> repair::user_requested_repair_task_impl::run() { auto uuid = node_ops_id{id.uuid().uuid()}; auto start_time = std::chrono::steady_clock::now(); - bool needs_flush_before_repair = false; - if (db.features().tombstone_gc_options) { - for (auto& table: cfs) { - if (const auto* cf = find_column_family_if_exists(db, keyspace, table)) { - auto s = cf->schema(); - const auto& options = s->tombstone_gc_options(); - if (options.mode() == tombstone_gc_mode::repair) { - needs_flush_before_repair = true; - } - } - } - } - - bool hints_batchlog_flushed = false; std::list participants; if (_small_table_optimization) { auto normal_nodes = germs->get().get_token_metadata().get_all_ips(); @@ -1276,37 +1316,7 @@ future<> repair::user_requested_repair_task_impl::run() { } else { participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, data_centers, hosts, ignore_nodes).get(); } - if (needs_flush_before_repair) { - auto waiting_nodes = db.get_token_metadata().get_all_ips(); - std::erase_if(waiting_nodes, [&] (const auto& addr) { - return ignore_nodes.contains(addr); - }); - auto hints_timeout = std::chrono::seconds(300); - auto batchlog_timeout = std::chrono::seconds(300); - repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout}; - - try { - parallel_for_each(waiting_nodes, [&rs, uuid, &req, &participants] (gms::inet_address node) -> future<> { - rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started", - uuid, node, participants); - try { - auto& ms = rs.get_messaging(); - auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req); - (void)resp; // nothing to do with response yet - } catch (...) { - rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}", - uuid, node, participants, std::current_exception()); - throw; - } - }).get(); - hints_batchlog_flushed = true; - } catch (...) { - rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair", - uuid, participants); - } - } else { - rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog to nodes={}", uuid, participants); - } + bool hints_batchlog_flushed = flush_hints(rs, id, db, keyspace, cfs, ignore_nodes, participants).get0(); std::vector> repair_results; repair_results.reserve(smp::count); From 796044be1c38ac810d4e881a7a54b5c611d059d0 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 30 Jan 2024 09:48:24 +0800 Subject: [PATCH 2/2] repair: Update repair history for tablet repair This patch wires up tombstone_gc repair with tablet repair. The flush hints logic from the vnode table repair is reused. The way to mark the finish of the repair is also adjusted for tablet repair because it only has one shard per tablet token range instead of smp::count shards. Fixes: #17046 Tests: test_tablet_repair_history --- repair/repair.cc | 7 +++- repair/row_level.cc | 15 +++++--- repair/row_level.hh | 2 +- .../test_tablets.py | 38 +++++++++++++++++++ 4 files changed, 54 insertions(+), 8 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index e5dcdfc272..744ac79701 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -2134,7 +2134,7 @@ future<> repair::tablet_repair_task_impl::run() { auto start_time = std::chrono::steady_clock::now(); auto parent_data = get_repair_uniq_id().task_info; std::atomic idx{1}; - rs.container().invoke_on_all([&idx, id, metas = _metas, parent_data, reason = _reason] (repair_service& rs) -> future<> { + rs.container().invoke_on_all([&idx, id, metas = _metas, parent_data, reason = _reason, tables = _tables] (repair_service& rs) -> future<> { for (auto& m : metas) { if (m.master_shard_id != this_shard_id()) { continue; @@ -2162,7 +2162,10 @@ future<> repair::tablet_repair_task_impl::run() { auto data_centers = std::vector(); auto hosts = std::vector(); auto ignore_nodes = std::unordered_set(); - bool hints_batchlog_flushed = false; + auto my_address = erm->get_topology().my_address(); + auto participants = std::list(m.neighbors.all.begin(), m.neighbors.all.end()); + participants.push_front(my_address); + bool hints_batchlog_flushed = co_await flush_hints(rs, id, rs._db.local(), m.keyspace_name, tables, ignore_nodes, participants); bool small_table_optimization = false; auto ranges_parallelism = std::nullopt; diff --git a/repair/row_level.cc b/repair/row_level.cc index 157f95572a..9dc469a020 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2595,6 +2595,8 @@ class row_level_repair { gc_clock::time_point _start_time; + bool _is_tablet; + public: row_level_repair(repair::shard_repair_task_impl& shard_task, sstring cf_name, @@ -2609,7 +2611,9 @@ public: , _all_live_peer_nodes(sort_peer_nodes(all_live_peer_nodes)) , _small_table_optimization(small_table_optimization) , _seed(get_random_seed()) - , _start_time(gc_clock::now()) { + , _start_time(gc_clock::now()) + , _is_tablet(_shard_task.db.local().find_column_family(_table_id).uses_tablets()) + { repair_neighbors r_neighbors = _shard_task.get_repair_neighbors(_range); auto& map = r_neighbors.shard_map; for (auto& n : _all_live_peer_nodes) { @@ -2919,7 +2923,7 @@ private: co_return; } repair_service& rs = _shard_task.rs; - std::optional repair_time_opt = co_await rs.update_history(_shard_task.global_repair_id.uuid(), _table_id, _range, _start_time); + std::optional repair_time_opt = co_await rs.update_history(_shard_task.global_repair_id.uuid(), _table_id, _range, _start_time, _is_tablet); if (!repair_time_opt) { co_return; } @@ -3195,15 +3199,16 @@ static shard_id repair_id_to_shard(tasks::task_id& repair_id) { } future> -repair_service::update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time) { +repair_service::update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time, bool is_tablet) { auto shard = repair_id_to_shard(repair_id); - return container().invoke_on(shard, [repair_id, table_id, range, repair_time] (repair_service& rs) mutable -> future> { + return container().invoke_on(shard, [repair_id, table_id, range, repair_time, is_tablet] (repair_service& rs) mutable -> future> { repair_history& rh = rs._finished_ranges_history[repair_id]; if (rh.repair_time > repair_time) { rh.repair_time = repair_time; } auto finished_shards = ++(rh.finished_ranges[table_id][range]); - if (finished_shards == smp::count) { + // Tablet repair runs only on one shard + if (finished_shards == smp::count || is_tablet) { // All shards have finished repair the range. Send an rpc to ask peers to update system.repair_history table rlogger.debug("repair[{}]: Finished range {} for table {} on all shards, updating system.repair_history table, finished_shards={}", repair_id, range, table_id, finished_shards); diff --git a/repair/row_level.hh b/repair/row_level.hh index 45e85c92db..fcef13a1b0 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -135,7 +135,7 @@ public: // stop them abruptly). future<> shutdown(); - future> update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time); + future> update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time, bool is_tablet); future<> cleanup_history(tasks::task_id repair_id); future<> load_history(); diff --git a/test/topology_experimental_raft/test_tablets.py b/test/topology_experimental_raft/test_tablets.py index e0ff779d5c..6a1163d7f9 100644 --- a/test/topology_experimental_raft/test_tablets.py +++ b/test/topology_experimental_raft/test_tablets.py @@ -477,6 +477,44 @@ async def test_tablet_missing_data_repair(manager: ManagerClient): await manager.rolling_restart(servers, with_down=check_with_down) +@pytest.mark.repair +@pytest.mark.asyncio +async def test_tablet_repair_history(manager: ManagerClient): + logger.info("Bootstrapping cluster") + servers = [await manager.server_add(), await manager.server_add(), await manager.server_add()] + + rf = 3 + tablets = 8 + + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {{'class': 'NetworkTopologyStrategy', " + "'replication_factor': {}}} AND tablets = {{'initial': {}}};".format(rf, tablets)) + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int) WITH tombstone_gc = {'mode':'repair'};") + + logger.info("Populating table") + + keys = range(256) + await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys]) + + hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + logging.info(f'Got hosts={hosts}'); + + await repair_on_node(manager, servers[0], servers) + + async def check_repair_history(): + all_rows = [] + for host in hosts: + logging.info(f'Query hosts={host}'); + rows = await cql.run_async("SELECT * from system.repair_history", host=host) + all_rows += rows + for row in all_rows: + logging.info(f"Got repair_history_entry={row}") + assert len(all_rows) == rf * tablets + + await check_repair_history() + + await cql.run_async("DROP KEYSPACE test;") + @pytest.mark.asyncio async def test_tablet_cleanup(manager: ManagerClient): cmdline = ['--smp=2', '--commitlog-sync=batch']