diff --git a/repair/repair.cc b/repair/repair.cc index 3cabba2301..704b67f0ee 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -688,7 +688,7 @@ future<> shard_repair_task_impl::repair_range(const dht::token_range& range, ::t if (_ri->dropped_tables.contains(cf)) { return make_ready_future<>(); } - return repair_cf_range_row_level(*_ri, cf, table_id, range, neighbors).handle_exception_type([this, cf] (replica::no_such_column_family&) mutable { + return repair_cf_range_row_level(*this, cf, table_id, range, neighbors).handle_exception_type([this, cf] (replica::no_such_column_family&) mutable { _ri->dropped_tables.insert(cf); return make_ready_future<>(); }).handle_exception([this] (std::exception_ptr ep) mutable { diff --git a/repair/repair_task.hh b/repair/repair_task.hh index 8ad2e9b69f..3091b41a07 100644 --- a/repair/repair_task.hh +++ b/repair/repair_task.hh @@ -89,6 +89,9 @@ public: , _ex(std::move(ex)) {} + lw_shared_ptr get_repair_info() const noexcept { + return _ri; + } void check_failed_ranges(); void abort_repair_info() noexcept; void check_in_abort(); diff --git a/repair/row_level.cc b/repair/row_level.cc index e3fed91e87..dbd9663dde 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2369,7 +2369,7 @@ static void add_to_repair_meta_for_followers(repair_meta& rm) { } class row_level_repair { - repair_info& _ri; + shard_repair_task_impl& _shard_task; sstring _cf_name; table_id _table_id; dht::token_range _range; @@ -2418,17 +2418,17 @@ class row_level_repair { gc_clock::time_point _start_time; public: - row_level_repair(repair_info& ri, + row_level_repair(shard_repair_task_impl& shard_task, sstring cf_name, table_id table_id, dht::token_range range, std::vector all_live_peer_nodes) - : _ri(ri) + : _shard_task(shard_task) , _cf_name(std::move(cf_name)) , _table_id(std::move(table_id)) , _range(std::move(range)) , _all_live_peer_nodes(sort_peer_nodes(all_live_peer_nodes)) - , _cf(_ri.db.local().find_column_family(_table_id)) + , _cf(_shard_task.get_repair_info()->db.local().find_column_family(_table_id)) , _seed(get_random_seed()) , _start_time(gc_clock::now()) { } @@ -2443,7 +2443,7 @@ private: inet_address_vector_replica_set sort_peer_nodes(const std::vector& nodes) { auto myip = utils::fb_utilities::get_broadcast_address(); inet_address_vector_replica_set sorted_nodes(nodes.begin(), nodes.end()); - _ri.db.local().get_token_metadata().get_topology().sort_by_proximity(myip, sorted_nodes); + _shard_task.get_repair_info()->db.local().get_token_metadata().get_topology().sort_by_proximity(myip, sorted_nodes); return sorted_nodes; } @@ -2454,8 +2454,8 @@ private: // Step A: Negotiate sync boundary to use op_status negotiate_sync_boundary(repair_meta& master) { - _ri.check_in_shutdown(); - _ri.check_in_abort(); + _shard_task.check_in_shutdown(); + _shard_task.check_in_abort(); _sync_boundaries.clear(); _combined_hashes.clear(); _zero_rows = false; @@ -2514,8 +2514,8 @@ private: // Step B: Get missing rows from peer nodes so that local node contains all the rows op_status get_missing_rows_from_follower_nodes(repair_meta& master) { - _ri.check_in_shutdown(); - _ri.check_in_abort(); + _shard_task.check_in_shutdown(); + _shard_task.check_in_abort(); // `combined_hashes` contains the combined hashes for the // `_working_row_buf`. Like `_row_buf`, `_working_row_buf` contains // rows which are within the (_last_sync_boundary, _current_sync_boundary] @@ -2646,8 +2646,8 @@ private: void send_missing_rows_to_follower_nodes(repair_meta& master) { // At this time, repair master contains all the rows between (_last_sync_boundary, _current_sync_boundary] // So we can figure out which rows peer node are missing and send the missing rows to them - _ri.check_in_shutdown(); - _ri.check_in_abort(); + _shard_task.check_in_shutdown(); + _shard_task.check_in_abort(); repair_hash_set local_row_hash_sets = master.working_row_hashes().get0(); auto sz = _all_live_peer_nodes.size(); std::vector set_diffs(sz); @@ -2678,37 +2678,37 @@ private: // Update system.repair_history table future<> update_system_repair_table() { // Update repair_history table only if it is a reguar repair. - if (_ri.reason != streaming::stream_reason::repair) { + if (_shard_task.get_repair_info()->reason != streaming::stream_reason::repair) { co_return; } // Update repair_history table only if all replicas have been repaired size_t repaired_replicas = _all_live_peer_nodes.size() + 1; - if (_ri.total_rf != repaired_replicas){ + if (_shard_task.get_repair_info()->total_rf != repaired_replicas){ rlogger.debug("repair[{}]: Skipped to update system.repair_history total_rf={}, repaired_replicas={}, local={}, peers={}", - _ri.id.uuid(), _ri.total_rf, repaired_replicas, utils::fb_utilities::get_broadcast_address(), _all_live_peer_nodes); + _shard_task.get_repair_info()->id.uuid(), _shard_task.get_repair_info()->total_rf, repaired_replicas, utils::fb_utilities::get_broadcast_address(), _all_live_peer_nodes); co_return; } // Update repair_history table only if both hints and batchlog have been flushed. - if (!_ri.hints_batchlog_flushed()) { + if (!_shard_task.get_repair_info()->hints_batchlog_flushed()) { co_return; } - repair_service& rs = _ri.rs; - std::optional repair_time_opt = co_await rs.update_history(_ri.id.uuid(), _table_id, _range, _start_time); + repair_service& rs = _shard_task.get_repair_info()->rs; + std::optional repair_time_opt = co_await rs.update_history(_shard_task.get_repair_info()->id.uuid(), _table_id, _range, _start_time); if (!repair_time_opt) { co_return; } auto repair_time = repair_time_opt.value(); - repair_update_system_table_request req{_ri.id.uuid(), _table_id, _ri.keyspace, _cf_name, _range, repair_time}; + repair_update_system_table_request req{_shard_task.get_repair_info()->id.uuid(), _table_id, _shard_task.get_repair_info()->keyspace, _cf_name, _range, repair_time}; auto all_nodes = _all_live_peer_nodes; all_nodes.push_back(utils::fb_utilities::get_broadcast_address()); co_await coroutine::parallel_for_each(all_nodes, [this, req] (gms::inet_address node) -> future<> { try { - auto& ms = _ri.messaging.local(); + auto& ms = _shard_task.get_repair_info()->messaging.local(); repair_update_system_table_response resp = co_await ser::partition_checksum_rpc_verbs::send_repair_update_system_table(&ms, netw::messaging_service::msg_addr(node), req); (void)resp; // nothing to do with the response yet - rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _ri.id.uuid(), node); + rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _shard_task.get_repair_info()->id.uuid(), node); } catch (...) { - rlogger.warn("repair[{}]: Failed to update system.repair_history table of node {}: {}", _ri.id.uuid(), node, std::current_exception()); + rlogger.warn("repair[{}]: Failed to update system.repair_history table of node {}: {}", _shard_task.get_repair_info()->id.uuid(), node, std::current_exception()); } }); co_return; @@ -2717,33 +2717,33 @@ private: public: future<> run() { return seastar::async([this] { - _ri.check_in_shutdown(); - _ri.check_in_abort(); - auto repair_meta_id = _ri.rs.get_next_repair_meta_id().get0(); - auto algorithm = get_common_diff_detect_algorithm(_ri.messaging.local(), _all_live_peer_nodes); + _shard_task.check_in_shutdown(); + _shard_task.check_in_abort(); + auto repair_meta_id = _shard_task.get_repair_info()->rs.get_next_repair_meta_id().get0(); + auto algorithm = get_common_diff_detect_algorithm(_shard_task.get_repair_info()->messaging.local(), _all_live_peer_nodes); auto max_row_buf_size = get_max_row_buf_size(algorithm); auto master_node_shard_config = shard_config { this_shard_id(), - _ri.sharder.shard_count(), - _ri.sharder.sharding_ignore_msb() + _shard_task.get_repair_info()->sharder.shard_count(), + _shard_task.get_repair_info()->sharder.sharding_ignore_msb() }; auto s = _cf.schema(); auto schema_version = s->version(); bool table_dropped = false; - auto& mem_sem = _ri.rs.memory_sem(); - auto max = _ri.rs.max_repair_memory(); + auto& mem_sem = _shard_task.get_repair_info()->rs.memory_sem(); + auto max = _shard_task.get_repair_info()->rs.max_repair_memory(); auto wanted = (_all_live_peer_nodes.size() + 1) * repair_module::max_repair_memory_per_range; wanted = std::min(max, wanted); rlogger.trace("repair[{}]: Started to get memory budget, wanted={}, available={}, max_repair_memory={}", - _ri.id.uuid(), wanted, mem_sem.current(), max); + _shard_task.get_repair_info()->id.uuid(), wanted, mem_sem.current(), max); auto mem_permit = seastar::get_units(mem_sem, wanted).get0(); rlogger.trace("repair[{}]: Finished to get memory budget, wanted={}, available={}, max_repair_memory={}", - _ri.id.uuid(), wanted, mem_sem.current(), max); + _shard_task.get_repair_info()->id.uuid(), wanted, mem_sem.current(), max); - auto permit = _ri.db.local().obtain_reader_permit(_cf, "repair-meta", db::no_timeout).get0(); + auto permit = _shard_task.get_repair_info()->db.local().obtain_reader_permit(_cf, "repair-meta", db::no_timeout).get0(); - repair_meta master(_ri.rs, + repair_meta master(_shard_task.get_repair_info()->rs, _cf, s, std::move(permit), @@ -2753,7 +2753,7 @@ public: _seed, repair_master::yes, repair_meta_id, - _ri.reason, + _shard_task.get_repair_info()->reason, std::move(master_node_shard_config), _all_live_peer_nodes, _all_live_peer_nodes.size(), @@ -2765,7 +2765,7 @@ public: }); rlogger.debug(">>> Started Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_size={}", - master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _ri.keyspace, _cf_name, schema_version, _range, _seed, max_row_buf_size); + master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _shard_task.get_repair_info()->keyspace, _cf_name, schema_version, _range, _seed, max_row_buf_size); std::vector nodes_to_stop; @@ -2774,7 +2774,7 @@ public: parallel_for_each(master.all_nodes(), [&, this] (repair_node_state& ns) { const auto& node = ns.node; ns.state = repair_state::row_level_start_started; - return master.repair_row_level_start(node, _ri.keyspace, _cf_name, _range, schema_version, _ri.reason).then([&] () { + return master.repair_row_level_start(node, _shard_task.get_repair_info()->keyspace, _cf_name, _range, schema_version, _shard_task.get_repair_info()->reason).then([&] () { ns.state = repair_state::row_level_start_finished; nodes_to_stop.push_back(node); ns.state = repair_state::get_estimated_partitions_started; @@ -2811,43 +2811,43 @@ public: } catch (replica::no_such_column_family& e) { table_dropped = true; rlogger.warn("repair[{}]: shard={}, keyspace={}, cf={}, range={}, got error in row level repair: {}", - _ri.id.uuid(), this_shard_id(), _ri.keyspace, _cf_name, _range, e); + _shard_task.get_repair_info()->id.uuid(), this_shard_id(), _shard_task.get_repair_info()->keyspace, _cf_name, _range, e); _failed = true; } catch (std::exception& e) { rlogger.warn("repair[{}]: shard={}, keyspace={}, cf={}, range={}, got error in row level repair: {}", - _ri.id.uuid(), this_shard_id(), _ri.keyspace, _cf_name, _range, e); + _shard_task.get_repair_info()->id.uuid(), this_shard_id(), _shard_task.get_repair_info()->keyspace, _cf_name, _range, e); // In case the repair process fail, we need to call repair_row_level_stop to clean up repair followers _failed = true; } parallel_for_each(nodes_to_stop, [&] (const gms::inet_address& node) { master.set_repair_state(repair_state::row_level_stop_started, node); - return master.repair_row_level_stop(node, _ri.keyspace, _cf_name, _range).then([node, &master] { + return master.repair_row_level_stop(node, _shard_task.get_repair_info()->keyspace, _cf_name, _range).then([node, &master] { master.set_repair_state(repair_state::row_level_stop_finished, node); }); }).get(); - _ri.update_statistics(master.stats()); + _shard_task.update_statistics(master.stats()); if (_failed) { if (table_dropped) { - throw replica::no_such_column_family(_ri.keyspace, _cf_name); + throw replica::no_such_column_family(_shard_task.get_repair_info()->keyspace, _cf_name); } else { - throw std::runtime_error(format("Failed to repair for keyspace={}, cf={}, range={}", _ri.keyspace, _cf_name, _range)); + throw std::runtime_error(format("Failed to repair for keyspace={}, cf={}, range={}", _shard_task.get_repair_info()->keyspace, _cf_name, _range)); } } else { update_system_repair_table().get(); } rlogger.debug("<<< Finished Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, range={}, tx_hashes_nr={}, rx_hashes_nr={}, tx_row_nr={}, rx_row_nr={}, row_from_disk_bytes={}, row_from_disk_nr={}", - master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _ri.keyspace, _cf_name, _range, master.stats().tx_hashes_nr, master.stats().rx_hashes_nr, master.stats().tx_row_nr, master.stats().rx_row_nr, master.stats().row_from_disk_bytes, master.stats().row_from_disk_nr); + master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _shard_task.get_repair_info()->keyspace, _cf_name, _range, master.stats().tx_hashes_nr, master.stats().rx_hashes_nr, master.stats().tx_row_nr, master.stats().rx_row_nr, master.stats().row_from_disk_bytes, master.stats().row_from_disk_nr); }); } }; -future<> repair_cf_range_row_level(repair_info& ri, +future<> repair_cf_range_row_level(shard_repair_task_impl& shard_task, sstring cf_name, table_id table_id, dht::token_range range, const std::vector& all_peer_nodes) { - return seastar::futurize_invoke([&ri, cf_name = std::move(cf_name), table_id = std::move(table_id), range = std::move(range), &all_peer_nodes] () mutable { - auto repair = row_level_repair(ri, std::move(cf_name), std::move(table_id), std::move(range), all_peer_nodes); + return seastar::futurize_invoke([&shard_task, cf_name = std::move(cf_name), table_id = std::move(table_id), range = std::move(range), &all_peer_nodes] () mutable { + auto repair = row_level_repair(shard_task, std::move(cf_name), std::move(table_id), std::move(range), all_peer_nodes); return do_with(std::move(repair), [] (row_level_repair& repair) { return repair.run(); }); diff --git a/repair/row_level.hh b/repair/row_level.hh index 7a0f559a14..9905174185 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -240,7 +240,7 @@ class repair_row; class repair_hasher; class repair_writer; -future<> repair_cf_range_row_level(repair_info& ri, +future<> repair_cf_range_row_level(shard_repair_task_impl& shard_task, sstring cf_name, table_id table_id, dht::token_range range, const std::vector& all_peer_nodes); future> to_repair_rows_list(repair_rows_on_wire rows,