Merge 'repair: Update repair history for tablet repair' from Asias He

This patch wires up tombstone_gc repair with tablet repair. The flush
hints logic from the vnode table repair is reused. The way to mark the
finish of the repair is also adjusted for tablet repair because it only
has one shard per tablet token range instead of smp::count shards.

Fixes: #17046
Tests: test_tablet_repair_history

Closes scylladb/scylladb#17047

* github.com:scylladb/scylladb:
  repair: Update repair history for tablet repair
  repair: Extract flush hints code
This commit is contained in:
Avi Kivity
2024-02-18 17:51:16 +02:00
4 changed files with 109 additions and 53 deletions

View File

@@ -364,6 +364,60 @@ static future<std::list<gms::inet_address>> get_hosts_participating_in_repair(
co_return std::list<gms::inet_address>(participating_hosts.begin(), participating_hosts.end());
}
static future<bool> flush_hints(repair_service& rs, repair_uniq_id id, replica::database& db,
sstring keyspace, std::vector<sstring> cfs,
std::unordered_set<gms::inet_address> ignore_nodes,
std::list<gms::inet_address> participants) {
auto uuid = id.uuid();
bool needs_flush_before_repair = false;
if (db.features().tombstone_gc_options) {
for (auto& table: cfs) {
if (const auto* cf = find_column_family_if_exists(db, keyspace, table)) {
auto s = cf->schema();
const auto& options = s->tombstone_gc_options();
if (options.mode() == tombstone_gc_mode::repair) {
needs_flush_before_repair = true;
}
}
}
}
bool hints_batchlog_flushed = false;
if (needs_flush_before_repair) {
auto waiting_nodes = db.get_token_metadata().get_all_ips();
std::erase_if(waiting_nodes, [&] (const auto& addr) {
return ignore_nodes.contains(addr);
});
auto hints_timeout = std::chrono::seconds(300);
auto batchlog_timeout = std::chrono::seconds(300);
repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout};
try {
co_await parallel_for_each(waiting_nodes, [&rs, uuid, &req, &participants] (gms::inet_address node) -> future<> {
rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started",
uuid, node, participants);
try {
auto& ms = rs.get_messaging();
auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req);
(void)resp; // nothing to do with response yet
} catch (...) {
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}",
uuid, node, participants, std::current_exception());
throw;
}
});
hints_batchlog_flushed = true;
} catch (...) {
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair",
uuid, participants);
}
} else {
rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog to nodes={}", uuid, participants);
}
co_return hints_batchlog_flushed;
}
float node_ops_metrics::repair_finished_percentage() {
return _module->report_progress();
}
@@ -1255,20 +1309,6 @@ future<> repair::user_requested_repair_task_impl::run() {
auto uuid = node_ops_id{id.uuid().uuid()};
auto start_time = std::chrono::steady_clock::now();
bool needs_flush_before_repair = false;
if (db.features().tombstone_gc_options) {
for (auto& table: cfs) {
if (const auto* cf = find_column_family_if_exists(db, keyspace, table)) {
auto s = cf->schema();
const auto& options = s->tombstone_gc_options();
if (options.mode() == tombstone_gc_mode::repair) {
needs_flush_before_repair = true;
}
}
}
}
bool hints_batchlog_flushed = false;
std::list<gms::inet_address> participants;
if (_small_table_optimization) {
auto normal_nodes = germs->get().get_token_metadata().get_all_ips();
@@ -1276,37 +1316,7 @@ future<> repair::user_requested_repair_task_impl::run() {
} else {
participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, data_centers, hosts, ignore_nodes).get();
}
if (needs_flush_before_repair) {
auto waiting_nodes = db.get_token_metadata().get_all_ips();
std::erase_if(waiting_nodes, [&] (const auto& addr) {
return ignore_nodes.contains(addr);
});
auto hints_timeout = std::chrono::seconds(300);
auto batchlog_timeout = std::chrono::seconds(300);
repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout};
try {
parallel_for_each(waiting_nodes, [&rs, uuid, &req, &participants] (gms::inet_address node) -> future<> {
rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started",
uuid, node, participants);
try {
auto& ms = rs.get_messaging();
auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req);
(void)resp; // nothing to do with response yet
} catch (...) {
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}",
uuid, node, participants, std::current_exception());
throw;
}
}).get();
hints_batchlog_flushed = true;
} catch (...) {
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair",
uuid, participants);
}
} else {
rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog to nodes={}", uuid, participants);
}
bool hints_batchlog_flushed = flush_hints(rs, id, db, keyspace, cfs, ignore_nodes, participants).get0();
std::vector<future<>> repair_results;
repair_results.reserve(smp::count);
@@ -2124,7 +2134,7 @@ future<> repair::tablet_repair_task_impl::run() {
auto start_time = std::chrono::steady_clock::now();
auto parent_data = get_repair_uniq_id().task_info;
std::atomic<int> idx{1};
rs.container().invoke_on_all([&idx, id, metas = _metas, parent_data, reason = _reason] (repair_service& rs) -> future<> {
rs.container().invoke_on_all([&idx, id, metas = _metas, parent_data, reason = _reason, tables = _tables] (repair_service& rs) -> future<> {
for (auto& m : metas) {
if (m.master_shard_id != this_shard_id()) {
continue;
@@ -2152,7 +2162,10 @@ future<> repair::tablet_repair_task_impl::run() {
auto data_centers = std::vector<sstring>();
auto hosts = std::vector<sstring>();
auto ignore_nodes = std::unordered_set<gms::inet_address>();
bool hints_batchlog_flushed = false;
auto my_address = erm->get_topology().my_address();
auto participants = std::list<gms::inet_address>(m.neighbors.all.begin(), m.neighbors.all.end());
participants.push_front(my_address);
bool hints_batchlog_flushed = co_await flush_hints(rs, id, rs._db.local(), m.keyspace_name, tables, ignore_nodes, participants);
bool small_table_optimization = false;
auto ranges_parallelism = std::nullopt;

View File

@@ -2595,6 +2595,8 @@ class row_level_repair {
gc_clock::time_point _start_time;
bool _is_tablet;
public:
row_level_repair(repair::shard_repair_task_impl& shard_task,
sstring cf_name,
@@ -2609,7 +2611,9 @@ public:
, _all_live_peer_nodes(sort_peer_nodes(all_live_peer_nodes))
, _small_table_optimization(small_table_optimization)
, _seed(get_random_seed())
, _start_time(gc_clock::now()) {
, _start_time(gc_clock::now())
, _is_tablet(_shard_task.db.local().find_column_family(_table_id).uses_tablets())
{
repair_neighbors r_neighbors = _shard_task.get_repair_neighbors(_range);
auto& map = r_neighbors.shard_map;
for (auto& n : _all_live_peer_nodes) {
@@ -2919,7 +2923,7 @@ private:
co_return;
}
repair_service& rs = _shard_task.rs;
std::optional<gc_clock::time_point> repair_time_opt = co_await rs.update_history(_shard_task.global_repair_id.uuid(), _table_id, _range, _start_time);
std::optional<gc_clock::time_point> repair_time_opt = co_await rs.update_history(_shard_task.global_repair_id.uuid(), _table_id, _range, _start_time, _is_tablet);
if (!repair_time_opt) {
co_return;
}
@@ -3195,15 +3199,16 @@ static shard_id repair_id_to_shard(tasks::task_id& repair_id) {
}
future<std::optional<gc_clock::time_point>>
repair_service::update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time) {
repair_service::update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time, bool is_tablet) {
auto shard = repair_id_to_shard(repair_id);
return container().invoke_on(shard, [repair_id, table_id, range, repair_time] (repair_service& rs) mutable -> future<std::optional<gc_clock::time_point>> {
return container().invoke_on(shard, [repair_id, table_id, range, repair_time, is_tablet] (repair_service& rs) mutable -> future<std::optional<gc_clock::time_point>> {
repair_history& rh = rs._finished_ranges_history[repair_id];
if (rh.repair_time > repair_time) {
rh.repair_time = repair_time;
}
auto finished_shards = ++(rh.finished_ranges[table_id][range]);
if (finished_shards == smp::count) {
// Tablet repair runs only on one shard
if (finished_shards == smp::count || is_tablet) {
// All shards have finished repair the range. Send an rpc to ask peers to update system.repair_history table
rlogger.debug("repair[{}]: Finished range {} for table {} on all shards, updating system.repair_history table, finished_shards={}",
repair_id, range, table_id, finished_shards);

View File

@@ -135,7 +135,7 @@ public:
// stop them abruptly).
future<> shutdown();
future<std::optional<gc_clock::time_point>> update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time);
future<std::optional<gc_clock::time_point>> update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time, bool is_tablet);
future<> cleanup_history(tasks::task_id repair_id);
future<> load_history();

View File

@@ -477,6 +477,44 @@ async def test_tablet_missing_data_repair(manager: ManagerClient):
await manager.rolling_restart(servers, with_down=check_with_down)
@pytest.mark.repair
@pytest.mark.asyncio
async def test_tablet_repair_history(manager: ManagerClient):
logger.info("Bootstrapping cluster")
servers = [await manager.server_add(), await manager.server_add(), await manager.server_add()]
rf = 3
tablets = 8
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {{'class': 'NetworkTopologyStrategy', "
"'replication_factor': {}}} AND tablets = {{'initial': {}}};".format(rf, tablets))
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int) WITH tombstone_gc = {'mode':'repair'};")
logger.info("Populating table")
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys])
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
logging.info(f'Got hosts={hosts}');
await repair_on_node(manager, servers[0], servers)
async def check_repair_history():
all_rows = []
for host in hosts:
logging.info(f'Query hosts={host}');
rows = await cql.run_async("SELECT * from system.repair_history", host=host)
all_rows += rows
for row in all_rows:
logging.info(f"Got repair_history_entry={row}")
assert len(all_rows) == rf * tablets
await check_repair_history()
await cql.run_async("DROP KEYSPACE test;")
@pytest.mark.asyncio
async def test_tablet_cleanup(manager: ManagerClient):
cmdline = ['--smp=2', '--commitlog-sync=batch']