repair: Reduce hints and batchlog flush
The hints and batchlog flush requests are issued to all nodes for each repair request when tombstone_gc repair mode is used. The number of such flush requests is high when all nodes in the cluster run repair. It has been observed that it takes a long time, up to 15s, for a repair request to finish such a flush request. To reduce the overhead of the flush, each node caches the flush and only executes the real flush when the cache time has passed. It is safe to do so because the real flush_time is returned. Repair uses the smallest flush_time returned from peers as the repair time. The nice thing about the cache on the receiver side is that all senders can hit the cache. It is better than caching on the sender side. A slightly smaller flush_time compared to the real flush time will be used, with the benefit of significantly fewer hints and batchlog flushes. The trade-off looks reasonable. Tests: 2 nodes, with 1s batchlog delay: Before: Repair nr_repairs=20 cache_time_in_ms=0 total_repair_duration=40.04245328903198 After: Repair nr_repairs=20 cache_time_in_ms=5000 total_repair_duration=1.252073049545288 Fixes #20259
This commit is contained in:
@@ -979,6 +979,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
" This can reduce the amount of data repair has to process.")
|
||||
, repair_partition_count_estimation_ratio(this, "repair_partition_count_estimation_ratio", liveness::LiveUpdate, value_status::Used, 0.1,
|
||||
"Specify the fraction of partitions written by repair out of the total partitions. The value is currently only used for bloom filter estimation. Value is between 0 and 1.")
|
||||
, repair_hints_batchlog_flush_cache_time_in_ms(this, "repair_hints_batchlog_flush_cache_time_in_ms", liveness::LiveUpdate, value_status::Used, 60 * 1000, "The repair hints and batchlog flush request cache time. Setting 0 disables the flush cache. The cache reduces the number of hints and batchlog flushes during repair when tombstone_gc is set to repair mode. When the cache is on, a slightly smaller repair time will be used with the benefits of dropped hints and batchlog flushes.")
|
||||
, ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.")
|
||||
, shadow_round_ms(this, "shadow_round_ms", value_status::Used, 300 * 1000, "The maximum gossip shadow round time. Can be used to reduce the gossip feature check time during node boot up.")
|
||||
, fd_max_interval_ms(this, "fd_max_interval_ms", value_status::Used, 2 * 1000, "The maximum failure_detector interval time in milliseconds. Interval larger than the maximum will be ignored. Larger cluster may need to increase the default.")
|
||||
|
||||
@@ -341,6 +341,7 @@ public:
|
||||
named_value<bool> enable_compacting_data_for_streaming_and_repair;
|
||||
named_value<bool> enable_tombstone_gc_for_streaming_and_repair;
|
||||
named_value<double> repair_partition_count_estimation_ratio;
|
||||
named_value<uint32_t> repair_hints_batchlog_flush_cache_time_in_ms;
|
||||
named_value<uint32_t> ring_delay_ms;
|
||||
named_value<uint32_t> shadow_round_ms;
|
||||
named_value<uint32_t> fd_max_interval_ms;
|
||||
|
||||
@@ -145,6 +145,7 @@ struct repair_flush_hints_batchlog_request {
|
||||
};
|
||||
|
||||
struct repair_flush_hints_batchlog_response {
|
||||
gc_clock::time_point flush_time [[version 6.2]];
|
||||
};
|
||||
|
||||
verb [[with_client_info]] repair_update_system_table (repair_update_system_table_request req [[ref]]) -> repair_update_system_table_response;
|
||||
|
||||
@@ -370,10 +370,11 @@ static future<std::list<gms::inet_address>> get_hosts_participating_in_repair(
|
||||
co_return std::list<gms::inet_address>(participating_hosts.begin(), participating_hosts.end());
|
||||
}
|
||||
|
||||
static future<bool> flush_hints(repair_service& rs, repair_uniq_id id, replica::database& db,
|
||||
|
||||
future<std::tuple<bool, gc_clock::time_point>> repair_service::flush_hints(repair_uniq_id id,
|
||||
sstring keyspace, std::vector<sstring> cfs,
|
||||
std::unordered_set<gms::inet_address> ignore_nodes,
|
||||
std::list<gms::inet_address> participants) {
|
||||
std::unordered_set<gms::inet_address> ignore_nodes, std::list<gms::inet_address> participants) {
|
||||
auto& db = get_db().local();
|
||||
auto uuid = id.uuid();
|
||||
bool needs_flush_before_repair = false;
|
||||
if (db.features().tombstone_gc_options) {
|
||||
@@ -388,6 +389,7 @@ static future<bool> flush_hints(repair_service& rs, repair_uniq_id id, replica::
|
||||
}
|
||||
}
|
||||
|
||||
gc_clock::time_point flush_time;
|
||||
bool hints_batchlog_flushed = false;
|
||||
if (needs_flush_before_repair) {
|
||||
auto waiting_nodes = db.get_token_metadata().get_topology().get_all_ips();
|
||||
@@ -397,22 +399,35 @@ static future<bool> flush_hints(repair_service& rs, repair_uniq_id id, replica::
|
||||
auto hints_timeout = std::chrono::seconds(300);
|
||||
auto batchlog_timeout = std::chrono::seconds(300);
|
||||
repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout};
|
||||
|
||||
auto start_time = gc_clock::now();
|
||||
std::vector<gc_clock::time_point> times;
|
||||
try {
|
||||
co_await parallel_for_each(waiting_nodes, [&rs, uuid, &req, &participants] (gms::inet_address node) -> future<> {
|
||||
co_await parallel_for_each(waiting_nodes, [this, uuid, start_time, ×, &req, &participants] (gms::inet_address node) -> future<> {
|
||||
rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started",
|
||||
uuid, node, participants);
|
||||
try {
|
||||
auto& ms = rs.get_messaging();
|
||||
auto& ms = get_messaging();
|
||||
auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req);
|
||||
(void)resp; // nothing to do with response yet
|
||||
if (resp.flush_time == gc_clock::time_point()) {
|
||||
// This means the node does not support sending flush_time back. Use the time when the flush is requested for flush_time.
|
||||
rlogger.debug("repair[{}]: Got empty flush_time from node={}. Please upgrade the node={}.", uuid, node, node);
|
||||
times.push_back(start_time);
|
||||
} else {
|
||||
times.push_back(resp.flush_time);
|
||||
}
|
||||
} catch (...) {
|
||||
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}",
|
||||
uuid, node, participants, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
});
|
||||
if (!times.empty()) {
|
||||
auto it = std::min_element(times.begin(), times.end());
|
||||
flush_time = *it;
|
||||
}
|
||||
hints_batchlog_flushed = true;
|
||||
auto duration = std::chrono::duration<float>(gc_clock::now() - start_time);
|
||||
rlogger.info("repair[{}]: Finished repair_flush_hints_batchlog flush_times={} flush_time={} flush_duration={}", uuid, times, flush_time, duration);
|
||||
} catch (...) {
|
||||
rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair",
|
||||
uuid, participants);
|
||||
@@ -420,7 +435,7 @@ static future<bool> flush_hints(repair_service& rs, repair_uniq_id id, replica::
|
||||
} else {
|
||||
rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog to nodes={}", uuid, participants);
|
||||
}
|
||||
co_return hints_batchlog_flushed;
|
||||
co_return std::make_tuple(hints_batchlog_flushed, flush_time);
|
||||
}
|
||||
|
||||
|
||||
@@ -616,7 +631,8 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu
|
||||
streaming::stream_reason reason_,
|
||||
bool hints_batchlog_flushed,
|
||||
bool small_table_optimization,
|
||||
std::optional<int> ranges_parallelism)
|
||||
std::optional<int> ranges_parallelism,
|
||||
gc_clock::time_point flush_time)
|
||||
: repair_task_impl(module, id, 0, "shard", keyspace, "", "", parent_id_.uuid(), reason_)
|
||||
, rs(repair)
|
||||
, db(repair.get_db())
|
||||
@@ -635,6 +651,7 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu
|
||||
, _hints_batchlog_flushed(std::move(hints_batchlog_flushed))
|
||||
, _small_table_optimization(small_table_optimization)
|
||||
, _user_ranges_parallelism(ranges_parallelism ? std::optional<semaphore>(semaphore(*ranges_parallelism)) : std::nullopt)
|
||||
, _flush_time(flush_time)
|
||||
{
|
||||
rlogger.debug("repair[{}]: Setting user_ranges_parallelism to {}", global_repair_id.uuid(),
|
||||
_user_ranges_parallelism ? std::to_string(_user_ranges_parallelism->available_units()) : "unlimited");
|
||||
@@ -740,7 +757,7 @@ future<> repair::shard_repair_task_impl::repair_range(const dht::token_range& ra
|
||||
}
|
||||
try {
|
||||
auto dropped = co_await with_table_drop_silenced(db.local(), mm, table.id, [&] (const table_id& uuid) {
|
||||
return repair_cf_range_row_level(*this, table.name, table.id, range, neighbors, _small_table_optimization);
|
||||
return repair_cf_range_row_level(*this, table.name, table.id, range, neighbors, _small_table_optimization, _flush_time);
|
||||
});
|
||||
if (dropped) {
|
||||
dropped_tables.insert(table.name);
|
||||
@@ -1337,7 +1354,7 @@ future<> repair::user_requested_repair_task_impl::run() {
|
||||
} else {
|
||||
participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, data_centers, hosts, ignore_nodes).get();
|
||||
}
|
||||
bool hints_batchlog_flushed = flush_hints(rs, id, db, keyspace, cfs, ignore_nodes, participants).get();
|
||||
auto [hints_batchlog_flushed, flush_time] = rs.flush_hints(id, keyspace, cfs, ignore_nodes, participants).get();
|
||||
|
||||
std::vector<future<>> repair_results;
|
||||
repair_results.reserve(smp::count);
|
||||
@@ -1387,12 +1404,12 @@ future<> repair::user_requested_repair_task_impl::run() {
|
||||
auto ranges_parallelism = _ranges_parallelism;
|
||||
bool small_table_optimization = _small_table_optimization;
|
||||
for (auto shard : std::views::iota(0u, smp::count)) {
|
||||
auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed, ranges_parallelism, small_table_optimization,
|
||||
auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed, flush_time, ranges_parallelism, small_table_optimization,
|
||||
data_centers, hosts, ignore_nodes, parent_data = get_repair_uniq_id().task_info, germs] (repair_service& local_repair) mutable -> future<> {
|
||||
local_repair.get_metrics().repair_total_ranges_sum += ranges.size();
|
||||
auto task = co_await local_repair._repair_module->make_and_start_task<repair::shard_repair_task_impl>(parent_data, tasks::task_id::create_random_id(), keyspace,
|
||||
local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, hints_batchlog_flushed, small_table_optimization, ranges_parallelism);
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, hints_batchlog_flushed, small_table_optimization, ranges_parallelism, flush_time);
|
||||
co_await task->done();
|
||||
});
|
||||
repair_results.push_back(std::move(f));
|
||||
@@ -1512,9 +1529,10 @@ future<> repair::data_sync_repair_task_impl::run() {
|
||||
bool hints_batchlog_flushed = false;
|
||||
bool small_table_optimization = false;
|
||||
auto ranges_parallelism = std::nullopt;
|
||||
auto flush_time = gc_clock::time_point();
|
||||
auto task_impl_ptr = seastar::make_shared<repair::shard_repair_task_impl>(local_repair._repair_module, tasks::task_id::create_random_id(), keyspace,
|
||||
local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, hints_batchlog_flushed, small_table_optimization, ranges_parallelism);
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, hints_batchlog_flushed, small_table_optimization, ranges_parallelism, flush_time);
|
||||
task_impl_ptr->neighbors = std::move(neighbors);
|
||||
auto task = co_await local_repair._repair_module->make_task(std::move(task_impl_ptr), parent_data);
|
||||
task->start();
|
||||
@@ -2463,12 +2481,12 @@ future<> repair::tablet_repair_task_impl::run() {
|
||||
auto my_address = erm->get_topology().my_address();
|
||||
auto participants = std::list<gms::inet_address>(m.neighbors.all.begin(), m.neighbors.all.end());
|
||||
participants.push_front(my_address);
|
||||
bool hints_batchlog_flushed = co_await flush_hints(rs, id, rs._db.local(), m.keyspace_name, tables, ignore_nodes, participants);
|
||||
auto [hints_batchlog_flushed, flush_time] = co_await rs.flush_hints(id, m.keyspace_name, tables, ignore_nodes, participants);
|
||||
bool small_table_optimization = false;
|
||||
|
||||
auto task_impl_ptr = seastar::make_shared<repair::shard_repair_task_impl>(rs._repair_module, tasks::task_id::create_random_id(),
|
||||
m.keyspace_name, rs, erm, std::move(ranges), std::move(table_ids), id, std::move(data_centers), std::move(hosts),
|
||||
std::move(ignore_nodes), reason, hints_batchlog_flushed, small_table_optimization, ranges_parallelism);
|
||||
std::move(ignore_nodes), reason, hints_batchlog_flushed, small_table_optimization, ranges_parallelism, flush_time);
|
||||
task_impl_ptr->neighbors = std::move(neighbors);
|
||||
auto task = co_await rs._repair_module->make_task(std::move(task_impl_ptr), parent_data);
|
||||
task->start();
|
||||
|
||||
@@ -265,6 +265,7 @@ struct repair_flush_hints_batchlog_request {
|
||||
};
|
||||
|
||||
struct repair_flush_hints_batchlog_response {
|
||||
gc_clock::time_point flush_time;
|
||||
};
|
||||
|
||||
struct tablet_repair_task_meta {
|
||||
|
||||
@@ -2243,36 +2243,86 @@ future<repair_update_system_table_response> repair_service::repair_update_system
|
||||
}
|
||||
|
||||
future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_batchlog_handler(gms::inet_address from, repair_flush_hints_batchlog_request req) {
|
||||
rlogger.info("repair[{}]: Started to process repair_flush_hints_batchlog_request from node={}, target_nodes={}, hints_timeout={}s, batchlog_timeout={}s",
|
||||
req.repair_uuid, from, req.target_nodes, req.hints_timeout.count(), req.batchlog_timeout.count());
|
||||
std::vector<gms::inet_address> target_nodes(req.target_nodes.begin(), req.target_nodes.end());
|
||||
db::hints::sync_point sync_point = co_await _sp.local().create_hint_sync_point(std::move(target_nodes));
|
||||
lowres_clock::time_point deadline = lowres_clock::now() + req.hints_timeout;
|
||||
try {
|
||||
bool bm_throw = utils::get_local_injector().enter("repair_flush_hints_batchlog_handler_bm_uninitialized");
|
||||
if (!_bm.local_is_initialized() || bm_throw) {
|
||||
throw std::runtime_error("Backlog manager isn't initialized");
|
||||
}
|
||||
co_await coroutine::all(
|
||||
[this, &from, &req, &sync_point, &deadline] () -> future<> {
|
||||
rlogger.info("repair[{}]: Started to flush hints for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
|
||||
co_await _sp.local().wait_for_hint_sync_point(std::move(sync_point), deadline);
|
||||
rlogger.info("repair[{}]: Finished to flush hints for repair_flush_hints_batchlog_request from node={}, target_hosts={}", req.repair_uuid, from, req.target_nodes);
|
||||
co_return;
|
||||
},
|
||||
[this, &from, &req] () -> future<> {
|
||||
rlogger.info("repair[{}]: Started to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
|
||||
co_await _bm.local().do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no);
|
||||
rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
|
||||
}
|
||||
);
|
||||
} catch (...) {
|
||||
rlogger.warn("repair[{}]: Failed to process repair_flush_hints_batchlog_request from node={}, target_hosts={}, {}",
|
||||
req.repair_uuid, from, req.target_nodes, std::current_exception());
|
||||
throw;
|
||||
if (this_shard_id() != 0) {
|
||||
co_return co_await container().invoke_on(0, [&] (auto& rs) {
|
||||
return rs.repair_flush_hints_batchlog_handler(from, std::move(req));
|
||||
});
|
||||
}
|
||||
rlogger.info("repair[{}]: Finished to process repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
|
||||
co_return repair_flush_hints_batchlog_response();
|
||||
rlogger.info("repair[{}]: Started to process repair_flush_hints_batchlog_request from node={} target_nodes={} hints_timeout={}s batchlog_timeout={}s",
|
||||
req.repair_uuid, from, req.target_nodes, req.hints_timeout.count(), req.batchlog_timeout.count());
|
||||
auto permit = co_await seastar::get_units(_flush_hints_batchlog_sem, 1);
|
||||
bool updated = false;
|
||||
auto now = gc_clock::now();
|
||||
auto cache_time = std::chrono::milliseconds(get_db().local().get_config().repair_hints_batchlog_flush_cache_time_in_ms());
|
||||
auto cache_disabled = cache_time == std::chrono::milliseconds(0);
|
||||
auto flush_time = now;
|
||||
if (cache_disabled || (now - _flush_hints_batchlog_time > cache_time)) {
|
||||
// Empty targets means all nodes
|
||||
std::vector<gms::inet_address> target_nodes;
|
||||
db::hints::sync_point sync_point = co_await _sp.local().create_hint_sync_point(std::move(target_nodes));
|
||||
lowres_clock::time_point deadline = lowres_clock::now() + req.hints_timeout;
|
||||
try {
|
||||
bool bm_throw = utils::get_local_injector().enter("repair_flush_hints_batchlog_handler_bm_uninitialized");
|
||||
if (!_bm.local_is_initialized() || bm_throw) {
|
||||
throw std::runtime_error("Backlog manager isn't initialized");
|
||||
}
|
||||
co_await coroutine::all(
|
||||
[this, &from, &req, &sync_point, &deadline] () -> future<> {
|
||||
rlogger.info("repair[{}]: Started to flush hints for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
|
||||
co_await _sp.local().wait_for_hint_sync_point(std::move(sync_point), deadline);
|
||||
rlogger.info("repair[{}]: Finished to flush hints for repair_flush_hints_batchlog_request from node={}, target_hosts={}", req.repair_uuid, from, req.target_nodes);
|
||||
co_return;
|
||||
},
|
||||
[this, now, cache_disabled, &flush_time, &cache_time, &from, &req] () -> future<> {
|
||||
rlogger.info("repair[{}]: Started to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes);
|
||||
auto last_replay = _bm.local().get_last_replay();
|
||||
bool issue_flush = false;
|
||||
if (cache_disabled) {
|
||||
issue_flush = true;
|
||||
flush_time = now;
|
||||
} else {
|
||||
if (now < last_replay) {
|
||||
flush_time = now;
|
||||
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "skip_flush_use_now", fmt::to_string(flush_time));
|
||||
} else if (now - last_replay < cache_time) {
|
||||
// Skip the replay request since last_replay is already
|
||||
// updated since last _flush_hints_batchlog_time
|
||||
// update. It is fine to use last_replay for the hint
|
||||
// flush time because last_replay (batchlog replay
|
||||
// time) is smaller than now (hint flush time).
|
||||
flush_time = last_replay;
|
||||
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "skip_flush_use_last_replay", fmt::to_string(flush_time));
|
||||
} else {
|
||||
// Issue the replay so the last_replay will be updated
|
||||
// to bigger than now after the call.
|
||||
issue_flush = true;
|
||||
flush_time = now;
|
||||
}
|
||||
}
|
||||
if (issue_flush) {
|
||||
co_await _bm.local().do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no);
|
||||
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "issue_flush", fmt::to_string(flush_time));
|
||||
}
|
||||
rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}, flushed={}", req.repair_uuid, from, req.target_nodes, issue_flush);
|
||||
}
|
||||
);
|
||||
} catch (...) {
|
||||
rlogger.warn("repair[{}]: Failed to process repair_flush_hints_batchlog_request from node={} target_hosts={}: {}",
|
||||
req.repair_uuid, from, req.target_nodes, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
co_await container().invoke_on_all([flush_time] (repair_service& rs) {
|
||||
rs._flush_hints_batchlog_time = flush_time;
|
||||
});
|
||||
updated = true;
|
||||
} else {
|
||||
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "skip_flush", fmt::to_string(flush_time));
|
||||
}
|
||||
auto duration = std::chrono::duration<float>(gc_clock::now() - now);
|
||||
rlogger.info("repair[{}]: Finished to process repair_flush_hints_batchlog_request from node={} target_nodes={} updated={} flush_hints_batchlog_time={} flush_cache_time={} flush_duration={}",
|
||||
req.repair_uuid, from, req.target_nodes, updated, _flush_hints_batchlog_time, cache_time, duration);
|
||||
repair_flush_hints_batchlog_response resp{ .flush_time = _flush_hints_batchlog_time };
|
||||
co_return resp;
|
||||
}
|
||||
|
||||
future<> repair_service::init_ms_handlers() {
|
||||
@@ -2567,7 +2617,8 @@ public:
|
||||
table_id table_id,
|
||||
dht::token_range range,
|
||||
std::vector<gms::inet_address> all_live_peer_nodes,
|
||||
bool small_table_optimization)
|
||||
bool small_table_optimization,
|
||||
gc_clock::time_point start_time)
|
||||
: _shard_task(shard_task)
|
||||
, _cf_name(std::move(cf_name))
|
||||
, _table_id(std::move(table_id))
|
||||
@@ -2575,7 +2626,7 @@ public:
|
||||
, _all_live_peer_nodes(sort_peer_nodes(all_live_peer_nodes))
|
||||
, _small_table_optimization(small_table_optimization)
|
||||
, _seed(get_random_seed())
|
||||
, _start_time(gc_clock::now())
|
||||
, _start_time(start_time)
|
||||
, _is_tablet(_shard_task.db.local().find_column_family(_table_id).uses_tablets())
|
||||
{
|
||||
repair_neighbors r_neighbors = _shard_task.get_repair_neighbors(_range);
|
||||
@@ -3075,8 +3126,9 @@ public:
|
||||
|
||||
future<> repair_cf_range_row_level(repair::shard_repair_task_impl& shard_task,
|
||||
sstring cf_name, table_id table_id, dht::token_range range,
|
||||
const std::vector<gms::inet_address>& all_peer_nodes, bool small_table_optimization) {
|
||||
auto repair = row_level_repair(shard_task, std::move(cf_name), std::move(table_id), std::move(range), all_peer_nodes, small_table_optimization);
|
||||
const std::vector<gms::inet_address>& all_peer_nodes, bool small_table_optimization, gc_clock::time_point flush_time) {
|
||||
auto start_time = flush_time;
|
||||
auto repair = row_level_repair(shard_task, std::move(cf_name), std::move(table_id), std::move(range), all_peer_nodes, small_table_optimization, start_time);
|
||||
co_return co_await repair.run();
|
||||
}
|
||||
|
||||
|
||||
@@ -120,6 +120,12 @@ class repair_service : public seastar::peering_sharded_service<repair_service> {
|
||||
future<> init_ms_handlers();
|
||||
future<> uninit_ms_handlers();
|
||||
|
||||
seastar::semaphore _flush_hints_batchlog_sem{1};
|
||||
gc_clock::time_point _flush_hints_batchlog_time;
|
||||
future<std::tuple<bool, gc_clock::time_point>> flush_hints(repair_uniq_id id,
|
||||
sstring keyspace, std::vector<sstring> cfs,
|
||||
std::unordered_set<gms::inet_address> ignore_nodes, std::list<gms::inet_address> participants);
|
||||
|
||||
public:
|
||||
repair_service(sharded<service::topology_state_machine>& tsm,
|
||||
distributed<gms::gossiper>& gossiper,
|
||||
@@ -265,7 +271,7 @@ class repair_writer;
|
||||
|
||||
future<> repair_cf_range_row_level(repair::shard_repair_task_impl& shard_task,
|
||||
sstring cf_name, table_id table_id, dht::token_range range,
|
||||
const std::vector<gms::inet_address>& all_peer_nodes, bool small_table_optimization);
|
||||
const std::vector<gms::inet_address>& all_peer_nodes, bool small_table_optimization, gc_clock::time_point flush_time);
|
||||
future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows,
|
||||
schema_ptr s, uint64_t seed, repair_master is_master,
|
||||
reader_permit permit, repair_hasher hasher);
|
||||
|
||||
@@ -158,6 +158,7 @@ private:
|
||||
std::optional<sstring> _failed_because;
|
||||
std::optional<semaphore> _user_ranges_parallelism;
|
||||
uint64_t _ranges_complete = 0;
|
||||
gc_clock::time_point _flush_time;
|
||||
public:
|
||||
shard_repair_task_impl(tasks::task_manager::module_ptr module,
|
||||
tasks::task_id id,
|
||||
@@ -173,7 +174,8 @@ public:
|
||||
streaming::stream_reason reason_,
|
||||
bool hints_batchlog_flushed,
|
||||
bool small_table_optimization,
|
||||
std::optional<int> ranges_parallelism);
|
||||
std::optional<int> ranges_parallelism,
|
||||
gc_clock::time_point flush_time);
|
||||
void check_failed_ranges();
|
||||
void check_in_abort_or_shutdown();
|
||||
repair_neighbors get_repair_neighbors(const dht::token_range& range);
|
||||
|
||||
Reference in New Issue
Block a user