diff --git a/repair/repair.cc b/repair/repair.cc index 95b2df2328..13715cd24b 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -635,7 +635,7 @@ size_t repair::shard_repair_task_impl::ranges_size() { // Repair a single local range, multiple column families. // Comparable to RepairSession in Origin -future<> repair::shard_repair_task_impl::repair_range(const dht::token_range& range, ::table_id table_id) { +future<> repair::shard_repair_task_impl::repair_range(const dht::token_range& range, table_info table) { check_in_abort_or_shutdown(); ranges_index++; repair_neighbors r_neighbors = get_repair_neighbors(range); @@ -650,7 +650,7 @@ future<> repair::shard_repair_task_impl::repair_range(const dht::token_range& ra nodes_down.insert(node); auto status = format("failed: mandatory neighbor={} is not alive", node); rlogger.error("repair[{}]: Repair {} out of {} ranges, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}", - global_repair_id.uuid(), ranges_index, ranges_size(), _status.keyspace, table_names(), range, neighbors, live_neighbors, status); + global_repair_id.uuid(), ranges_index, ranges_size(), _status.keyspace, table.name, range, neighbors, live_neighbors, status); // If the task is aborted, its state will change to failed. One can wait for this with task_manager::task::done(). (void)abort(); co_await coroutine::return_exception(std::runtime_error(format("Repair mandatory neighbor={} is not alive, keyspace={}, mandatory_neighbors={}", @@ -667,7 +667,7 @@ future<> repair::shard_repair_task_impl::repair_range(const dht::token_range& ra } auto status = live_neighbors.empty() ? "skipped_no_live_peers" : "partial"; rlogger.warn("repair[{}]: Repair {} out of {} ranges, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}", - global_repair_id.uuid(), ranges_index, ranges_size(), _status.keyspace, table_names(), range, neighbors, live_neighbors, status); + global_repair_id.uuid(), ranges_index, ranges_size(), _status.keyspace, table.name, range, neighbors, live_neighbors, status); if (live_neighbors.empty()) { co_return; } @@ -676,15 +676,15 @@ future<> repair::shard_repair_task_impl::repair_range(const dht::token_range& ra if (neighbors.empty()) { auto status = "skipped_no_followers"; rlogger.warn("repair[{}]: Repair {} out of {} ranges, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}", - global_repair_id.uuid(), ranges_index, ranges_size(), _status.keyspace, table_names(), range, neighbors, live_neighbors, status); + global_repair_id.uuid(), ranges_index, ranges_size(), _status.keyspace, table.name, range, neighbors, live_neighbors, status); co_return; } rlogger.debug("repair[{}]: Repair {} out of {} ranges, keyspace={}, table={}, range={}, peers={}, live_peers={}", - global_repair_id.uuid(), ranges_index, ranges_size(), _status.keyspace, table_names(), range, neighbors, live_neighbors); + global_repair_id.uuid(), ranges_index, ranges_size(), _status.keyspace, table.name, range, neighbors, live_neighbors); co_await mm.sync_schema(db.local(), neighbors); sstring cf; try { - cf = db.local().find_column_family(table_id).schema()->cf_name(); + cf = db.local().find_column_family(table.id).schema()->cf_name(); } catch (replica::no_such_column_family&) { co_return; } @@ -693,7 +693,7 @@ future<> repair::shard_repair_task_impl::repair_range(const dht::token_range& ra co_return; } try { - co_await repair_cf_range_row_level(*this, cf, table_id, range, neighbors); + co_await repair_cf_range_row_level(*this, cf, table.id, range, neighbors); } catch (replica::no_such_column_family&) { dropped_tables.insert(cf); } catch (...) { @@ -952,17 +952,19 @@ future<> repair::shard_repair_task_impl::do_repair_ranges() { // Repair tables in the keyspace one after another assert(table_names().size() == table_ids.size()); for (size_t idx = 0; idx < table_ids.size(); idx++) { - auto table_id = table_ids[idx]; - auto table_name = table_names()[idx]; + table_info table_info{ + .name = table_names()[idx], + .id = table_ids[idx], + }; // repair all the ranges in limited parallelism rlogger.info("repair[{}]: Started to repair {} out of {} tables in keyspace={}, table={}, table_id={}, repair_reason={}", - global_repair_id.uuid(), idx + 1, table_ids.size(), _status.keyspace, table_name, table_id, _reason); - co_await coroutine::parallel_for_each(ranges, [this, table_id] (auto&& range) -> future<> { + global_repair_id.uuid(), idx + 1, table_ids.size(), _status.keyspace, table_info.name, table_info.id, _reason); + co_await coroutine::parallel_for_each(ranges, [this, table_info] (auto&& range) -> future<> { // Get the system range parallelism auto permit = co_await seastar::get_units(rs.get_repair_module().range_parallelism_semaphore(), 1); // Get the range parallelism specified by user auto user_permit = _user_ranges_parallelism ? co_await seastar::get_units(*_user_ranges_parallelism, 1) : semaphore_units<>(); - co_await repair_range(range, table_id); + co_await repair_range(range, table_info); if (_reason == streaming::stream_reason::bootstrap) { rs.get_metrics().bootstrap_finished_ranges++; } else if (_reason == streaming::stream_reason::replace) { @@ -989,7 +991,7 @@ future<> repair::shard_repair_task_impl::do_repair_ranges() { if (_reason != streaming::stream_reason::repair) { try { - auto& table = db.local().find_column_family(table_id); + auto& table = db.local().find_column_family(table_info.id); rlogger.debug("repair[{}]: Trigger off-strategy compaction for keyspace={}, table={}", global_repair_id.uuid(), table.schema()->ks_name(), table.schema()->cf_name()); table.trigger_offstrategy_compaction(); diff --git a/repair/task_manager_module.hh b/repair/task_manager_module.hh index 831de94f4d..8cfe709754 100644 --- a/repair/task_manager_module.hh +++ b/repair/task_manager_module.hh @@ -161,7 +161,7 @@ public: return _hints_batchlog_flushed; } - future<> repair_range(const dht::token_range& range, table_id); + future<> repair_range(const dht::token_range& range, table_info table); size_t ranges_size(); protected: