repair: keep a reference to shard_repair_task_impl in row_level_repair

As a part of replacing repair_info with shard_repair_task_impl,
instead of a reference to repair_info, row_level_repair keeps
a reference to shard_repair_task_impl.
This commit is contained in:
Aleksandra Martyniuk
2022-11-13 16:12:25 +01:00
parent 9b664570f0
commit 55c01a1beb
4 changed files with 51 additions and 48 deletions

View File

@@ -688,7 +688,7 @@ future<> shard_repair_task_impl::repair_range(const dht::token_range& range, ::t
if (_ri->dropped_tables.contains(cf)) {
return make_ready_future<>();
}
return repair_cf_range_row_level(*_ri, cf, table_id, range, neighbors).handle_exception_type([this, cf] (replica::no_such_column_family&) mutable {
return repair_cf_range_row_level(*this, cf, table_id, range, neighbors).handle_exception_type([this, cf] (replica::no_such_column_family&) mutable {
_ri->dropped_tables.insert(cf);
return make_ready_future<>();
}).handle_exception([this] (std::exception_ptr ep) mutable {

View File

@@ -89,6 +89,9 @@ public:
, _ex(std::move(ex))
{}
// Accessor for the repair_info handle stored in _ri.
// The lw_shared_ptr is returned by value, so every call hands back a
// fresh copy of the pointer rather than a reference to the member.
lw_shared_ptr<repair_info> get_repair_info() const noexcept {
return _ri;
}
void check_failed_ranges();
void abort_repair_info() noexcept;
void check_in_abort();

View File

@@ -2369,7 +2369,7 @@ static void add_to_repair_meta_for_followers(repair_meta& rm) {
}
class row_level_repair {
repair_info& _ri;
shard_repair_task_impl& _shard_task;
sstring _cf_name;
table_id _table_id;
dht::token_range _range;
@@ -2418,17 +2418,17 @@ class row_level_repair {
gc_clock::time_point _start_time;
public:
row_level_repair(repair_info& ri,
row_level_repair(shard_repair_task_impl& shard_task,
sstring cf_name,
table_id table_id,
dht::token_range range,
std::vector<gms::inet_address> all_live_peer_nodes)
: _ri(ri)
: _shard_task(shard_task)
, _cf_name(std::move(cf_name))
, _table_id(std::move(table_id))
, _range(std::move(range))
, _all_live_peer_nodes(sort_peer_nodes(all_live_peer_nodes))
, _cf(_ri.db.local().find_column_family(_table_id))
, _cf(_shard_task.get_repair_info()->db.local().find_column_family(_table_id))
, _seed(get_random_seed())
, _start_time(gc_clock::now()) {
}
@@ -2443,7 +2443,7 @@ private:
inet_address_vector_replica_set sort_peer_nodes(const std::vector<gms::inet_address>& nodes) {
auto myip = utils::fb_utilities::get_broadcast_address();
inet_address_vector_replica_set sorted_nodes(nodes.begin(), nodes.end());
_ri.db.local().get_token_metadata().get_topology().sort_by_proximity(myip, sorted_nodes);
_shard_task.get_repair_info()->db.local().get_token_metadata().get_topology().sort_by_proximity(myip, sorted_nodes);
return sorted_nodes;
}
@@ -2454,8 +2454,8 @@ private:
// Step A: Negotiate sync boundary to use
op_status negotiate_sync_boundary(repair_meta& master) {
_ri.check_in_shutdown();
_ri.check_in_abort();
_shard_task.check_in_shutdown();
_shard_task.check_in_abort();
_sync_boundaries.clear();
_combined_hashes.clear();
_zero_rows = false;
@@ -2514,8 +2514,8 @@ private:
// Step B: Get missing rows from peer nodes so that local node contains all the rows
op_status get_missing_rows_from_follower_nodes(repair_meta& master) {
_ri.check_in_shutdown();
_ri.check_in_abort();
_shard_task.check_in_shutdown();
_shard_task.check_in_abort();
// `combined_hashes` contains the combined hashes for the
// `_working_row_buf`. Like `_row_buf`, `_working_row_buf` contains
// rows which are within the (_last_sync_boundary, _current_sync_boundary]
@@ -2646,8 +2646,8 @@ private:
void send_missing_rows_to_follower_nodes(repair_meta& master) {
// At this point, the repair master contains all the rows in (_last_sync_boundary, _current_sync_boundary],
// so we can figure out which rows each peer node is missing and send those rows to it.
_ri.check_in_shutdown();
_ri.check_in_abort();
_shard_task.check_in_shutdown();
_shard_task.check_in_abort();
repair_hash_set local_row_hash_sets = master.working_row_hashes().get0();
auto sz = _all_live_peer_nodes.size();
std::vector<repair_hash_set> set_diffs(sz);
@@ -2678,37 +2678,37 @@ private:
// Update system.repair_history table
future<> update_system_repair_table() {
// Update repair_history table only if it is a regular repair.
if (_ri.reason != streaming::stream_reason::repair) {
if (_shard_task.get_repair_info()->reason != streaming::stream_reason::repair) {
co_return;
}
// Update repair_history table only if all replicas have been repaired
size_t repaired_replicas = _all_live_peer_nodes.size() + 1;
if (_ri.total_rf != repaired_replicas){
if (_shard_task.get_repair_info()->total_rf != repaired_replicas){
rlogger.debug("repair[{}]: Skipped to update system.repair_history total_rf={}, repaired_replicas={}, local={}, peers={}",
_ri.id.uuid(), _ri.total_rf, repaired_replicas, utils::fb_utilities::get_broadcast_address(), _all_live_peer_nodes);
_shard_task.get_repair_info()->id.uuid(), _shard_task.get_repair_info()->total_rf, repaired_replicas, utils::fb_utilities::get_broadcast_address(), _all_live_peer_nodes);
co_return;
}
// Update repair_history table only if both hints and batchlog have been flushed.
if (!_ri.hints_batchlog_flushed()) {
if (!_shard_task.get_repair_info()->hints_batchlog_flushed()) {
co_return;
}
repair_service& rs = _ri.rs;
std::optional<gc_clock::time_point> repair_time_opt = co_await rs.update_history(_ri.id.uuid(), _table_id, _range, _start_time);
repair_service& rs = _shard_task.get_repair_info()->rs;
std::optional<gc_clock::time_point> repair_time_opt = co_await rs.update_history(_shard_task.get_repair_info()->id.uuid(), _table_id, _range, _start_time);
if (!repair_time_opt) {
co_return;
}
auto repair_time = repair_time_opt.value();
repair_update_system_table_request req{_ri.id.uuid(), _table_id, _ri.keyspace, _cf_name, _range, repair_time};
repair_update_system_table_request req{_shard_task.get_repair_info()->id.uuid(), _table_id, _shard_task.get_repair_info()->keyspace, _cf_name, _range, repair_time};
auto all_nodes = _all_live_peer_nodes;
all_nodes.push_back(utils::fb_utilities::get_broadcast_address());
co_await coroutine::parallel_for_each(all_nodes, [this, req] (gms::inet_address node) -> future<> {
try {
auto& ms = _ri.messaging.local();
auto& ms = _shard_task.get_repair_info()->messaging.local();
repair_update_system_table_response resp = co_await ser::partition_checksum_rpc_verbs::send_repair_update_system_table(&ms, netw::messaging_service::msg_addr(node), req);
(void)resp; // nothing to do with the response yet
rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _ri.id.uuid(), node);
rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _shard_task.get_repair_info()->id.uuid(), node);
} catch (...) {
rlogger.warn("repair[{}]: Failed to update system.repair_history table of node {}: {}", _ri.id.uuid(), node, std::current_exception());
rlogger.warn("repair[{}]: Failed to update system.repair_history table of node {}: {}", _shard_task.get_repair_info()->id.uuid(), node, std::current_exception());
}
});
co_return;
@@ -2717,33 +2717,33 @@ private:
public:
future<> run() {
return seastar::async([this] {
_ri.check_in_shutdown();
_ri.check_in_abort();
auto repair_meta_id = _ri.rs.get_next_repair_meta_id().get0();
auto algorithm = get_common_diff_detect_algorithm(_ri.messaging.local(), _all_live_peer_nodes);
_shard_task.check_in_shutdown();
_shard_task.check_in_abort();
auto repair_meta_id = _shard_task.get_repair_info()->rs.get_next_repair_meta_id().get0();
auto algorithm = get_common_diff_detect_algorithm(_shard_task.get_repair_info()->messaging.local(), _all_live_peer_nodes);
auto max_row_buf_size = get_max_row_buf_size(algorithm);
auto master_node_shard_config = shard_config {
this_shard_id(),
_ri.sharder.shard_count(),
_ri.sharder.sharding_ignore_msb()
_shard_task.get_repair_info()->sharder.shard_count(),
_shard_task.get_repair_info()->sharder.sharding_ignore_msb()
};
auto s = _cf.schema();
auto schema_version = s->version();
bool table_dropped = false;
auto& mem_sem = _ri.rs.memory_sem();
auto max = _ri.rs.max_repair_memory();
auto& mem_sem = _shard_task.get_repair_info()->rs.memory_sem();
auto max = _shard_task.get_repair_info()->rs.max_repair_memory();
auto wanted = (_all_live_peer_nodes.size() + 1) * repair_module::max_repair_memory_per_range;
wanted = std::min(max, wanted);
rlogger.trace("repair[{}]: Started to get memory budget, wanted={}, available={}, max_repair_memory={}",
_ri.id.uuid(), wanted, mem_sem.current(), max);
_shard_task.get_repair_info()->id.uuid(), wanted, mem_sem.current(), max);
auto mem_permit = seastar::get_units(mem_sem, wanted).get0();
rlogger.trace("repair[{}]: Finished to get memory budget, wanted={}, available={}, max_repair_memory={}",
_ri.id.uuid(), wanted, mem_sem.current(), max);
_shard_task.get_repair_info()->id.uuid(), wanted, mem_sem.current(), max);
auto permit = _ri.db.local().obtain_reader_permit(_cf, "repair-meta", db::no_timeout).get0();
auto permit = _shard_task.get_repair_info()->db.local().obtain_reader_permit(_cf, "repair-meta", db::no_timeout).get0();
repair_meta master(_ri.rs,
repair_meta master(_shard_task.get_repair_info()->rs,
_cf,
s,
std::move(permit),
@@ -2753,7 +2753,7 @@ public:
_seed,
repair_master::yes,
repair_meta_id,
_ri.reason,
_shard_task.get_repair_info()->reason,
std::move(master_node_shard_config),
_all_live_peer_nodes,
_all_live_peer_nodes.size(),
@@ -2765,7 +2765,7 @@ public:
});
rlogger.debug(">>> Started Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_size={}",
master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _ri.keyspace, _cf_name, schema_version, _range, _seed, max_row_buf_size);
master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _shard_task.get_repair_info()->keyspace, _cf_name, schema_version, _range, _seed, max_row_buf_size);
std::vector<gms::inet_address> nodes_to_stop;
@@ -2774,7 +2774,7 @@ public:
parallel_for_each(master.all_nodes(), [&, this] (repair_node_state& ns) {
const auto& node = ns.node;
ns.state = repair_state::row_level_start_started;
return master.repair_row_level_start(node, _ri.keyspace, _cf_name, _range, schema_version, _ri.reason).then([&] () {
return master.repair_row_level_start(node, _shard_task.get_repair_info()->keyspace, _cf_name, _range, schema_version, _shard_task.get_repair_info()->reason).then([&] () {
ns.state = repair_state::row_level_start_finished;
nodes_to_stop.push_back(node);
ns.state = repair_state::get_estimated_partitions_started;
@@ -2811,43 +2811,43 @@ public:
} catch (replica::no_such_column_family& e) {
table_dropped = true;
rlogger.warn("repair[{}]: shard={}, keyspace={}, cf={}, range={}, got error in row level repair: {}",
_ri.id.uuid(), this_shard_id(), _ri.keyspace, _cf_name, _range, e);
_shard_task.get_repair_info()->id.uuid(), this_shard_id(), _shard_task.get_repair_info()->keyspace, _cf_name, _range, e);
_failed = true;
} catch (std::exception& e) {
rlogger.warn("repair[{}]: shard={}, keyspace={}, cf={}, range={}, got error in row level repair: {}",
_ri.id.uuid(), this_shard_id(), _ri.keyspace, _cf_name, _range, e);
_shard_task.get_repair_info()->id.uuid(), this_shard_id(), _shard_task.get_repair_info()->keyspace, _cf_name, _range, e);
// In case the repair process fails, we need to call repair_row_level_stop to clean up repair followers
_failed = true;
}
parallel_for_each(nodes_to_stop, [&] (const gms::inet_address& node) {
master.set_repair_state(repair_state::row_level_stop_started, node);
return master.repair_row_level_stop(node, _ri.keyspace, _cf_name, _range).then([node, &master] {
return master.repair_row_level_stop(node, _shard_task.get_repair_info()->keyspace, _cf_name, _range).then([node, &master] {
master.set_repair_state(repair_state::row_level_stop_finished, node);
});
}).get();
_ri.update_statistics(master.stats());
_shard_task.update_statistics(master.stats());
if (_failed) {
if (table_dropped) {
throw replica::no_such_column_family(_ri.keyspace, _cf_name);
throw replica::no_such_column_family(_shard_task.get_repair_info()->keyspace, _cf_name);
} else {
throw std::runtime_error(format("Failed to repair for keyspace={}, cf={}, range={}", _ri.keyspace, _cf_name, _range));
throw std::runtime_error(format("Failed to repair for keyspace={}, cf={}, range={}", _shard_task.get_repair_info()->keyspace, _cf_name, _range));
}
} else {
update_system_repair_table().get();
}
rlogger.debug("<<< Finished Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, range={}, tx_hashes_nr={}, rx_hashes_nr={}, tx_row_nr={}, rx_row_nr={}, row_from_disk_bytes={}, row_from_disk_nr={}",
master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _ri.keyspace, _cf_name, _range, master.stats().tx_hashes_nr, master.stats().rx_hashes_nr, master.stats().tx_row_nr, master.stats().rx_row_nr, master.stats().row_from_disk_bytes, master.stats().row_from_disk_nr);
master.myip(), _all_live_peer_nodes, master.repair_meta_id(), _shard_task.get_repair_info()->keyspace, _cf_name, _range, master.stats().tx_hashes_nr, master.stats().rx_hashes_nr, master.stats().tx_row_nr, master.stats().rx_row_nr, master.stats().row_from_disk_bytes, master.stats().row_from_disk_nr);
});
}
};
future<> repair_cf_range_row_level(repair_info& ri,
future<> repair_cf_range_row_level(shard_repair_task_impl& shard_task,
sstring cf_name, table_id table_id, dht::token_range range,
const std::vector<gms::inet_address>& all_peer_nodes) {
return seastar::futurize_invoke([&ri, cf_name = std::move(cf_name), table_id = std::move(table_id), range = std::move(range), &all_peer_nodes] () mutable {
auto repair = row_level_repair(ri, std::move(cf_name), std::move(table_id), std::move(range), all_peer_nodes);
return seastar::futurize_invoke([&shard_task, cf_name = std::move(cf_name), table_id = std::move(table_id), range = std::move(range), &all_peer_nodes] () mutable {
auto repair = row_level_repair(shard_task, std::move(cf_name), std::move(table_id), std::move(range), all_peer_nodes);
return do_with(std::move(repair), [] (row_level_repair& repair) {
return repair.run();
});

View File

@@ -240,7 +240,7 @@ class repair_row;
class repair_hasher;
class repair_writer;
future<> repair_cf_range_row_level(repair_info& ri,
future<> repair_cf_range_row_level(shard_repair_task_impl& shard_task,
sstring cf_name, table_id table_id, dht::token_range range,
const std::vector<gms::inet_address>& all_peer_nodes);
future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows,