scylladb/repair/row_level.cc
Asias He 4f77dd058d repair: Add tablet repair progress report support
This patch adds tablet repair progress report support so that the user
can use the /task_manager/task_status API to query the progress.

In order to support this, a new system table is introduced to record the
user-request-related info, i.e., the start and the end of the request.

The progress stays accurate even when a tablet split or merge happens in
the middle of the request, since the tokens of the tablet are recorded
when the request starts and when the repair of each tablet finishes. The
repair of an original tablet is considered finished when the finished
ranges cover the original tablet's token ranges.

After this patch, the /task_manager/task_status API will report correct
progress_total and progress_completed.
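
As an illustration (node address and task id below are placeholders, and 10000 is
assumed to be the default REST API port), the progress of a tablet repair request
can be polled with:

    curl -s http://<node>:10000/task_manager/task_status/<task_id>

where the returned status now carries meaningful progress_total and
progress_completed values for the request.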

Fixes #22564
Fixes #26896

Closes scylladb/scylladb#27679
2026-01-08 21:55:18 +02:00


/*
* Copyright (C) 2018-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <exception>
#include <fmt/ranges.h>
#include <seastar/util/defer.hh>
#include "dht/auto_refreshing_sharder.hh"
#include "db/view/view_building_worker.hh"
#include "gms/endpoint_state.hh"
#include "repair/repair.hh"
#include "message/messaging_service.hh"
#include "repair/task_manager_module.hh"
#include <seastar/coroutine/exception.hh>
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "mutation/mutation_fragment.hh"
#include "mutation_writer/multishard_writer.hh"
#include "dht/i_partitioner.hh"
#include "dht/sharder.hh"
#include "utils/assert.hh"
#include "utils/xx_hasher.hh"
#include "utils/UUID.hh"
#include "replica/database.hh"
#include <seastar/util/bool_class.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <list>
#include <vector>
#include <algorithm>
#include <random>
#include <optional>
#include <boost/intrusive/list.hpp>
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "gms/gossiper.hh"
#include "repair/row_level.hh"
#include "utils/stall_free.hh"
#include "utils/to_string.hh"
#include "service/migration_manager.hh"
#include "streaming/consumer.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/all.hh>
#include <seastar/coroutine/as_future.hh>
#include "db/config.hh"
#include "db/system_keyspace.hh"
#include "service/storage_proxy.hh"
#include "db/batchlog_manager.hh"
#include "idl/repair.dist.hh"
#include "readers/empty.hh"
#include "readers/evictable.hh"
#include "readers/queue.hh"
#include "readers/filtering.hh"
#include "readers/mutation_fragment_v1_stream.hh"
#include "repair/hash.hh"
#include "repair/decorated_key_with_hash.hh"
#include "repair/row.hh"
#include "repair/writer.hh"
#include "repair/reader.hh"
#include "repair/incremental.hh"
#include "compaction/compaction_manager.hh"
#include "utils/xx_hasher.hh"
#include "utils/error_injection.hh"
#include "locator/tablets.hh"
#include "gms/feature_service.hh"
extern logging::logger rlogger;
static bool inject_rpc_stream_error = false;
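// Maps the caller-provided cpu ids to the local shard on which the repair RPC
// handler should run: an explicitly specified destination shard is used as-is,
// otherwise the source cpu id is folded onto a local shard by modulo.
// Illustrative values: src_cpu_id=5 with smp::count=4 and an unspecified
// dst_cpu_id resolve to shard 1.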
static shard_id get_dst_shard_id(uint32_t src_cpu_id, const rpc::optional<shard_id>& dst_cpu_id_opt) {
uint32_t dst_cpu_id = 0;
if (dst_cpu_id_opt && *dst_cpu_id_opt != repair_unspecified_shard) {
dst_cpu_id = *dst_cpu_id_opt;
} else {
dst_cpu_id = src_cpu_id % smp::count;
}
return dst_cpu_id;
}
enum class repair_state : uint16_t {
unknown,
row_level_start_started,
row_level_start_finished,
get_estimated_partitions_started,
get_estimated_partitions_finished,
set_estimated_partitions_started,
set_estimated_partitions_finished,
get_sync_boundary_started,
get_sync_boundary_finished,
get_combined_row_hash_started,
get_combined_row_hash_finished,
get_row_diff_with_rpc_stream_started,
get_row_diff_with_rpc_stream_finished,
get_row_diff_and_update_peer_row_hash_sets_started,
get_row_diff_and_update_peer_row_hash_sets_finished,
get_full_row_hashes_with_rpc_stream_started,
get_full_row_hashes_with_rpc_stream_finished,
get_full_row_hashes_started,
get_full_row_hashes_finished,
get_row_diff_started,
get_row_diff_finished,
put_row_diff_with_rpc_stream_started,
put_row_diff_with_rpc_stream_finished,
put_row_diff_started,
put_row_diff_finished,
row_level_stop_started,
row_level_stop_finished,
};
struct repair_node_state {
locator::host_id node;
repair_state state = repair_state::unknown;
// The shard that the repair instance runs on
shard_id shard;
explicit repair_node_state(locator::host_id n) : node(n) { }
explicit repair_node_state(locator::host_id n, shard_id s) : node(n), shard(s) { }
};
// Wraps sink and source objects for repair master or repair follower nodes.
// For repair master, it stores sink and source pair for each of the followers.
// For repair follower, it stores one sink and source pair for repair master.
template<class SinkType, class SourceType>
class sink_source_for_repair {
uint32_t _repair_meta_id;
using get_sink_source_fn_type = std::function<future<std::tuple<rpc::sink<SinkType>, rpc::source<SourceType>>> (uint32_t repair_meta_id, std::optional<shard_id> dst_cpu_id, locator::host_id addr)>;
using sink_type = std::reference_wrapper<rpc::sink<SinkType>>;
using source_type = std::reference_wrapper<rpc::source<SourceType>>;
// The vectors below store sink and source objects for peer nodes.
std::vector<std::optional<rpc::sink<SinkType>>> _sinks;
std::vector<std::optional<rpc::source<SourceType>>> _sources;
std::vector<bool> _sources_closed;
get_sink_source_fn_type _fn;
public:
sink_source_for_repair(uint32_t repair_meta_id, size_t nr_peer_nodes, get_sink_source_fn_type fn)
: _repair_meta_id(repair_meta_id)
, _sinks(nr_peer_nodes)
, _sources(nr_peer_nodes)
, _sources_closed(nr_peer_nodes, false)
, _fn(std::move(fn)) {
}
void mark_source_closed(unsigned node_idx) {
_sources_closed[node_idx] = true;
}
future<std::tuple<sink_type, source_type>> get_sink_source(locator::host_id remote_node, unsigned node_idx, std::optional<shard_id> dst_cpu_id) {
using value_type = std::tuple<sink_type, source_type>;
if (_sinks[node_idx] && _sources[node_idx]) {
co_return value_type(_sinks[node_idx].value(), _sources[node_idx].value());
}
if (_sinks[node_idx] || _sources[node_idx]) {
throw std::runtime_error(format("sink or source is missing for node {}", remote_node));
}
auto [sink, source] = co_await _fn(_repair_meta_id, dst_cpu_id, remote_node);
_sinks[node_idx].emplace(std::move(sink));
_sources[node_idx].emplace(std::move(source));
co_return value_type(_sinks[node_idx].value(), _sources[node_idx].value());
}
future<> close() {
co_await coroutine::parallel_for_each(std::views::iota(unsigned(0), unsigned(_sources.size())), [this] (unsigned node_idx) -> future<> {
std::optional<rpc::sink<SinkType>>& sink_opt = _sinks[node_idx];
auto f = sink_opt ? sink_opt->close() : make_ready_future<>();
f = co_await coroutine::as_future(std::move(f));
try {
std::optional<rpc::source<SourceType>>& source_opt = _sources[node_idx];
if (source_opt && !_sources_closed[node_idx]) {
while (co_await (*source_opt)()) {
// Keep reading source until end of stream
}
}
} catch (...) {
// Ignore the exception as we're just draining *source_opt
}
f.get();
});
}
};
using sink_source_for_get_full_row_hashes = sink_source_for_repair<repair_stream_cmd, repair_hash_with_cmd>;
using sink_source_for_get_row_diff = sink_source_for_repair<repair_hash_with_cmd, repair_row_on_wire_with_cmd>;
using sink_source_for_put_row_diff = sink_source_for_repair<repair_row_on_wire_with_cmd, repair_stream_cmd>;
struct row_level_repair_metrics {
seastar::metrics::metric_groups _metrics;
uint64_t tx_row_nr{0};
uint64_t rx_row_nr{0};
uint64_t tx_row_bytes{0};
uint64_t rx_row_bytes{0};
uint64_t row_from_disk_nr{0};
uint64_t row_from_disk_bytes{0};
uint64_t tx_hashes_nr{0};
uint64_t rx_hashes_nr{0};
uint64_t inc_sst_skipped_bytes{0};
uint64_t inc_sst_read_bytes{0};
uint64_t tablet_time_ms{0};
row_level_repair_metrics() {
namespace sm = seastar::metrics;
_metrics.add_group("repair", {
sm::make_counter("tx_row_nr", tx_row_nr,
sm::description("Total number of rows sent on this shard.")),
sm::make_counter("rx_row_nr", rx_row_nr,
sm::description("Total number of rows received on this shard.")),
sm::make_counter("tx_row_bytes", tx_row_bytes,
sm::description("Total bytes of rows sent on this shard.")),
sm::make_counter("rx_row_bytes", rx_row_bytes,
sm::description("Total bytes of rows received on this shard.")),
sm::make_counter("tx_hashes_nr", tx_hashes_nr,
sm::description("Total number of row hashes sent on this shard.")),
sm::make_counter("rx_hashes_nr", rx_hashes_nr,
sm::description("Total number of row hashes received on this shard.")),
sm::make_counter("row_from_disk_nr", row_from_disk_nr,
sm::description("Total number of rows read from disk on this shard.")),
sm::make_counter("row_from_disk_bytes", row_from_disk_bytes,
sm::description("Total bytes of rows read from disk on this shard.")),
sm::make_counter("inc_sst_skipped_bytes", inc_sst_skipped_bytes,
sm::description("Total number of bytes skipped from sstables for incremental repair on this shard.")),
sm::make_counter("inc_sst_read_bytes", inc_sst_read_bytes,
sm::description("Total number of bytes read from sstables for incremental repair on this shard.")),
sm::make_counter("tablet_time_ms", tablet_time_ms,
sm::description("Time spent on tablet repair on this shard in milliseconds.")),
});
}
};
static thread_local row_level_repair_metrics _metrics;
static const std::vector<row_level_diff_detect_algorithm>& suportted_diff_detect_algorithms() {
static std::vector<row_level_diff_detect_algorithm> _algorithms = {
row_level_diff_detect_algorithm::send_full_set,
row_level_diff_detect_algorithm::send_full_set_rpc_stream,
};
return _algorithms;
};
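// Negotiates a common diff-detect algorithm with the peers: each peer returns
// the algorithms it supports, the lists are intersected with the local list,
// and the highest common value is picked, which prefers the rpc-stream variant
// over plain send_full_set when both are available.
// Note: must be invoked from a seastar thread because it waits with .get().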
static row_level_diff_detect_algorithm get_common_diff_detect_algorithm(netw::messaging_service& ms, const host_id_vector_replica_set& nodes) {
std::vector<std::vector<row_level_diff_detect_algorithm>> nodes_algorithms(nodes.size());
parallel_for_each(std::views::iota(size_t(0), nodes.size()), coroutine::lambda([&] (size_t idx) -> future<> {
std::vector<row_level_diff_detect_algorithm> algorithms = co_await ser::repair_rpc_verbs::send_repair_get_diff_algorithms(&ms, nodes[idx]);
std::sort(algorithms.begin(), algorithms.end());
nodes_algorithms[idx] = std::move(algorithms);
rlogger.trace("Got node_algorithms={}, from node={}", nodes_algorithms[idx], nodes[idx]);
})).get();
auto common_algorithms = suportted_diff_detect_algorithms();
for (auto& algorithms : nodes_algorithms) {
std::sort(common_algorithms.begin(), common_algorithms.end());
std::vector<row_level_diff_detect_algorithm> results;
std::set_intersection(algorithms.begin(), algorithms.end(),
common_algorithms.begin(), common_algorithms.end(),
std::back_inserter(results));
common_algorithms = std::move(results);
}
rlogger.trace("peer_algorithms={}, local_algorithms={}, common_diff_detect_algorithms={}",
nodes_algorithms, suportted_diff_detect_algorithms(), common_algorithms);
if (common_algorithms.empty()) {
throw std::runtime_error("Can not find row level repair diff detect algorithm");
}
return common_algorithms.back();
}
static bool is_rpc_stream_supported(row_level_diff_detect_algorithm algo) {
// send_full_set is the only algorithm that does not support rpc stream
return algo != row_level_diff_detect_algorithm::send_full_set;
}
static uint64_t get_random_seed() {
static thread_local std::default_random_engine random_engine{std::random_device{}()};
static thread_local std::uniform_int_distribution<uint64_t> random_dist{};
return random_dist(random_engine);
}
repair_hash repair_hasher::do_hash_for_mf(const decorated_key_with_hash& dk_with_hash, const mutation_fragment& mf) {
xx_hasher h(_seed);
feed_hash(h, mf, *_schema);
feed_hash(h, dk_with_hash.hash.hash);
return repair_hash(h.finalize_uint64());
}
mutation_reader repair_reader::make_reader(
seastar::sharded<replica::database>& db,
replica::column_family& cf,
read_strategy strategy,
const dht::sharder& remote_sharder,
unsigned remote_shard,
gc_clock::time_point compaction_time,
incremental_repair_meta inc) {
switch (strategy) {
case read_strategy::local: {
auto ms = mutation_source([&cf, compaction_time] (
schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding fwd_mr) {
return cf.make_streaming_reader(std::move(s), std::move(permit), pr, ps, fwd_mr, compaction_time);
});
mutation_reader rd(nullptr);
std::tie(rd, _reader_handle) = make_manually_paused_evictable_reader(
std::move(ms),
_schema,
_permit,
_range,
_schema->full_slice(),
{},
mutation_reader::forwarding::no);
return rd;
}
case read_strategy::multishard_split: {
std::optional<size_t> multishard_reader_buffer_size;
const auto& dbconfig = db.local().get_config();
if (dbconfig.repair_multishard_reader_buffer_hint_size()) {
// Setting the repair buffer size as the multishard reader's buffer
// size helps avoid extra cross-shard round-trips and possible
// evict-recreate cycles.
multishard_reader_buffer_size = dbconfig.repair_multishard_reader_buffer_hint_size();
}
return make_multishard_streaming_reader(db, _schema, _permit, [this] {
auto shard_range = _sharder.next();
if (shard_range) {
return std::optional<dht::partition_range>(dht::to_partition_range(*shard_range));
}
return std::optional<dht::partition_range>();
}, compaction_time, multishard_reader_buffer_size, read_ahead(dbconfig.repair_multishard_reader_enable_read_ahead()));
}
case read_strategy::multishard_filter: {
return make_filtering_reader(make_multishard_streaming_reader(db, _schema, _permit, _range, compaction_time, {}, read_ahead::yes),
[&remote_sharder, remote_shard](const dht::decorated_key& k) {
return remote_sharder.shard_for_reads(k.token()) == remote_shard;
});
}
case read_strategy::incremental_repair: {
return cf.make_streaming_reader(_schema, _permit, _range, inc.sst_set, gc_clock::now());
}
default:
on_internal_error(rlogger,
format("make_reader: unexpected read_strategy {}", static_cast<int>(strategy)));
}
}
repair_reader::repair_reader(
seastar::sharded<replica::database>& db,
replica::column_family& cf,
schema_ptr s,
reader_permit permit,
dht::token_range range,
const dht::static_sharder& remote_sharder,
unsigned remote_shard,
uint64_t seed,
read_strategy strategy,
gc_clock::time_point compaction_time,
incremental_repair_meta inc)
: _schema(s)
, _permit(std::move(permit))
, _range(dht::to_partition_range(range))
, _sharder(remote_sharder, range, remote_shard)
, _seed(seed)
, _local_read_op(strategy == read_strategy::local ? std::optional(cf.read_in_progress()) : std::nullopt)
, _reader(make_reader(db, cf, strategy, remote_sharder, remote_shard, compaction_time, inc))
{ }
future<mutation_fragment_opt>
repair_reader::read_mutation_fragment() {
++_reads_issued;
return _reader().then_wrapped([this] (future<mutation_fragment_opt> f) {
try {
auto mfopt = f.get();
++_reads_finished;
return mfopt;
} catch (seastar::timed_out_error& e) {
rlogger.warn("Failed to read a fragment from the reader, keyspace={}, table={}, range={}: {}",
_schema->ks_name(), _schema->cf_name(), _range, e);
throw;
} catch (...) {
throw;
}
});
}
future<> repair_reader::on_end_of_stream() noexcept {
co_await _reader.close();
_permit.release_base_resources();
_reader = mutation_fragment_v1_stream(make_empty_mutation_reader(_schema, _permit));
_reader_handle.reset();
}
future<> repair_reader::close() noexcept {
co_await _reader.close();
_permit.release_base_resources();
_reader_handle.reset();
}
void repair_reader::set_current_dk(const dht::decorated_key& key) {
_current_dk = make_lw_shared<const decorated_key_with_hash>(*_schema, key, _seed);
}
void repair_reader::clear_current_dk() {
_current_dk = {};
}
void repair_reader::check_current_dk() {
if (!_current_dk) {
throw std::runtime_error("Current partition_key is unknown");
}
}
void repair_reader::pause() {
if (_reader_handle) {
_reader_handle->pause();
}
}
class repair_writer_impl : public repair_writer::impl {
schema_ptr _schema;
std::optional<int64_t> _repaired_at;
reader_permit _permit;
std::optional<future<>> _writer_done;
mutation_fragment_queue _mq;
sharded<replica::database>& _db;
db::view::view_builder& _view_builder;
sharded<db::view::view_building_worker>& _view_building_worker;
streaming::stream_reason _reason;
mutation_reader _queue_reader;
service::frozen_topology_guard _topo_guard;
public:
repair_writer_impl(
schema_ptr schema,
std::optional<int64_t> repaired_at,
reader_permit permit,
sharded<replica::database>& db,
db::view::view_builder& view_builder,
sharded<db::view::view_building_worker>& view_building_worker,
streaming::stream_reason reason,
mutation_fragment_queue queue,
mutation_reader queue_reader,
service::frozen_topology_guard topo_guard)
: _schema(std::move(schema))
, _repaired_at(repaired_at)
, _permit(std::move(permit))
, _mq(std::move(queue))
, _db(db)
, _view_builder(view_builder)
, _view_building_worker(view_building_worker)
, _reason(reason)
, _queue_reader(std::move(queue_reader))
, _topo_guard(topo_guard)
{}
virtual void create_writer(lw_shared_ptr<repair_writer> writer) override;
virtual mutation_fragment_queue& queue() override {
return _mq;
}
virtual future<> wait_for_writer_done() override;
private:
static sstables::offstrategy is_offstrategy_supported(streaming::stream_reason reason) {
static const std::unordered_set<streaming::stream_reason> operations_supported = {
streaming::stream_reason::bootstrap,
streaming::stream_reason::replace,
streaming::stream_reason::removenode,
streaming::stream_reason::decommission,
streaming::stream_reason::repair,
streaming::stream_reason::rebuild,
};
return sstables::offstrategy(operations_supported.contains(reason));
}
};
future<> repair_writer::write_start_and_mf(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
_current_dk_written_to_sstable = dk;
if (mf.is_partition_start()) {
return _mq->push(std::move(mf)).then([this] {
_partition_opened = true;
});
} else {
auto start = mutation_fragment(*_schema, _permit, partition_start(dk->dk, tombstone()));
return _mq->push(std::move(start)).then([this, mf = std::move(mf)] () mutable {
_partition_opened = true;
return _mq->push(std::move(mf));
});
}
};
class queue_reader_handle_adapter : public mutation_fragment_queue::impl {
queue_reader_handle _handle;
public:
queue_reader_handle_adapter(queue_reader_handle handle)
: _handle(std::move(handle))
{}
virtual future<> push(mutation_fragment_v2 mf) override {
return _handle.push(std::move(mf));
}
virtual void abort(std::exception_ptr ep) override {
_handle.abort(std::move(ep));
}
virtual void push_end_of_stream() override {
_handle.push_end_of_stream();
}
};
mutation_fragment_queue make_mutation_fragment_queue(schema_ptr s, reader_permit permit, queue_reader_handle handle) {
return mutation_fragment_queue(std::move(s), std::move(permit), seastar::make_shared<queue_reader_handle_adapter>(std::move(handle)));
}
struct sharder_helper {
struct tablet_sharder_keepalive {
std::unique_ptr<dht::auto_refreshing_sharder> sharder_ptr;
service::topology_guard topo_guard;
};
using sharder_keepalive = std::variant<tablet_sharder_keepalive, locator::effective_replication_map_ptr>;
sharder_keepalive keepalive;
const dht::sharder& sharder;
sharder_helper(sharder_keepalive s_keepalive, const dht::sharder& s)
: keepalive(std::move(s_keepalive))
, sharder(s)
{}
};
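// Selects the sharder used when distributing repaired rows to shards, together
// with the object that keeps it alive. When no topology session is attached
// (frozen_topo_guard == default_session_id) the effective replication map is
// kept alive along with its sharder; otherwise an auto_refreshing_sharder is
// created for the table and the topology session guard is held.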
sharder_helper get_sharder_helper(replica::table& t, const schema& s, service::frozen_topology_guard frozen_topo_guard) {
if (frozen_topo_guard == service::default_session_id) {
// The sharder is valid only when the erm is valid. Keep a reference to the erm to keep the sharder valid.
auto erm = t.get_effective_replication_map();
auto& sharder = erm->get_sharder(s);
sharder_helper::sharder_keepalive keepalive = std::move(erm);
return sharder_helper{std::move(keepalive), sharder};
} else {
sharder_helper::tablet_sharder_keepalive keepalive{
.sharder_ptr = std::make_unique<dht::auto_refreshing_sharder>(t.shared_from_this()),
.topo_guard = frozen_topo_guard
};
auto& sharder = *keepalive.sharder_ptr;
return sharder_helper{std::move(keepalive), sharder};
}
}
void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
if (_writer_done) {
return;
}
replica::table& t = _db.local().find_column_family(_schema->id());
rlogger.debug("repair_writer: keyspace={}, table={}, estimated_partitions={}", w->schema()->ks_name(), w->schema()->cf_name(), w->get_estimated_partitions());
// #17384 don't use off-strategy for repair (etc) if using tablets. sstables generated will
// be single token range and can just be added to normal sstable set as is, eventually
// handled by normal compaction.
auto off_str = t.uses_tablets() ? sstables::offstrategy(false) : is_offstrategy_supported(_reason);
auto sharder = get_sharder_helper(t, *(w->schema()), _topo_guard);
bool mark_as_repaired = t.uses_tablets() && _repaired_at.has_value();
auto& sst_list = w->get_sstable_list_to_mark_as_repaired();
auto shard = this_shard_id();
auto inc_repair_handler = [&sst_list, mark_as_repaired, shard, sid = _topo_guard] (sstables::shared_sstable sst) mutable {
if (!mark_as_repaired) {
return;
}
if (shard != this_shard_id()) {
return;
}
sst->being_repaired = sid;
sst_list.insert(sst);
};
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder.sharder, std::move(_queue_reader),
streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, _view_building_worker, w->get_estimated_partitions(), _reason, off_str,
_topo_guard, inc_repair_handler),
t.stream_in_progress()).then([w] (uint64_t partitions) {
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
w->schema()->ks_name(), w->schema()->cf_name(), partitions);
return utils::get_local_injector().inject("repair_writer_impl_create_writer_wait", utils::wait_for_message(200s));
}).handle_exception([w, keepalive = std::move(sharder.keepalive)] (std::exception_ptr ep) {
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
w->schema()->ks_name(), w->schema()->cf_name(), ep);
w->queue().abort(ep);
return make_exception_future<>(std::move(ep));
});
}
lw_shared_ptr<repair_writer> make_repair_writer(
schema_ptr schema,
std::optional<int64_t> repaired_at,
reader_permit permit,
streaming::stream_reason reason,
sharded<replica::database>& db,
db::view::view_builder& view_builder,
sharded<db::view::view_building_worker>& view_building_worker,
service::frozen_topology_guard topo_guard) {
auto [queue_reader, queue_handle] = make_queue_reader(schema, permit);
auto queue = make_mutation_fragment_queue(schema, permit, std::move(queue_handle));
auto i = std::make_unique<repair_writer_impl>(schema, repaired_at, permit, db, view_builder, view_building_worker, reason, std::move(queue), std::move(queue_reader), topo_guard);
return make_lw_shared<repair_writer>(schema, permit, std::move(i));
}
future<> repair_writer::write_partition_end() {
if (_partition_opened) {
return _mq->push(mutation_fragment(*_schema, _permit, partition_end())).then([this] {
_partition_opened = false;
});
}
return make_ready_future<>();
}
future<> repair_writer::do_write(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
if (_current_dk_written_to_sstable) {
const auto cmp_res = _current_dk_written_to_sstable->dk.tri_compare(*_schema, dk->dk);
if (cmp_res > 0) {
on_internal_error(rlogger, format("repair_writer::do_write(): received out-of-order partition, current: {}, next: {}", _current_dk_written_to_sstable->dk, dk->dk));
} else if (cmp_res == 0) {
return _mq->push(std::move(mf));
} else {
return write_partition_end().then([this,
dk = std::move(dk), mf = std::move(mf)] () mutable {
return write_start_and_mf(std::move(dk), std::move(mf));
});
}
} else {
return write_start_and_mf(std::move(dk), std::move(mf));
}
}
future<> repair_writer::write_end_of_stream() {
if (_created_writer) {
return with_semaphore(_sem, 1, [this] {
// Partition_end is never sent on wire, so we have to write one ourselves.
return write_partition_end().then([this] () mutable {
_mq->push_end_of_stream();
}).handle_exception([this] (std::exception_ptr ep) {
_mq->abort(ep);
rlogger.warn("repair_writer: keyspace={}, table={}, write_end_of_stream failed: {}",
_schema->ks_name(), _schema->cf_name(), ep);
return make_exception_future<>(std::move(ep));
});
});
} else {
return make_ready_future<>();
}
}
future<> repair_writer_impl::wait_for_writer_done() {
if (_writer_done) {
return std::move(*(_writer_done));
} else {
return make_ready_future<>();
}
}
future<> repair_writer::wait_for_writer_done() {
return when_all_succeed(write_end_of_stream(), _impl->wait_for_writer_done()).discard_result().handle_exception(
[this] (std::exception_ptr ep) {
rlogger.warn("repair_writer: keyspace={}, table={}, wait_for_writer_done failed: {}",
_schema->ks_name(), _schema->cf_name(), ep);
return make_exception_future<>(std::move(ep));
});
}
class repair_meta;
class repair_meta_tracker;
class row_level_repair;
static void add_to_repair_meta_for_masters(repair_meta& rm);
static void add_to_repair_meta_for_followers(repair_meta& rm);
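// Converts a repair_rows_on_wire payload received from a peer into a list of
// repair_row objects, consuming and freeing the wire representation
// incrementally to limit memory pressure. On the repair master each fragment
// keeps its frozen form, position and hash; on a follower, consecutive
// fragments at the same position are merged so the result can be fed directly
// to the sstable writer.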
future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows, schema_ptr s, uint64_t seed, repair_master is_master, reader_permit permit, repair_hasher hasher) {
std::list<repair_row> row_list;
std::exception_ptr ex;
try {
lw_shared_ptr<const decorated_key_with_hash> dk_ptr;
lw_shared_ptr<mutation_fragment> last_mf;
position_in_partition::tri_compare cmp(*s);
// Consume the rows and mutation_fragment:s below incrementally
// as it interleaves frees with allocation, reducing memory pressure,
// and to prevent reactor stalls when freeing a large repair_rows_on_wire
// object in one shot when the function returns.
for (auto it = rows.begin(); it != rows.end(); it = rows.erase(it)) {
auto x = std::move(*it);
if (dk_ptr && x.use_last_partition_key()) {
// Use previous partition key when an empty partition is received
} else {
dht::decorated_key dk = dht::decorate_key(*s, x.get_key());
if (!(dk_ptr && dk_ptr->dk.equal(*s, dk))) {
dk_ptr = make_lw_shared<const decorated_key_with_hash>(*s, dk, seed);
// Reset last_mf only when new partition is seen
last_mf = {};
}
}
auto& mutation_fragments = x.get_mutation_fragments();
if (is_master) {
for (auto fmfit = mutation_fragments.begin(); fmfit != mutation_fragments.end(); fmfit = mutation_fragments.erase(fmfit)) {
auto fmf = std::move(*fmfit);
_metrics.rx_row_nr += 1;
_metrics.rx_row_bytes += fmf.representation().size();
// Keep the mutation_fragment in repair_row as an
// optimization to avoid unfreezing it again when the
// mutation_fragment is needed by _repair_writer.do_write()
// to apply the repair_row to disk
auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*s, permit));
auto hash = hasher.do_hash_for_mf(*dk_ptr, *mf);
position_in_partition pos(mf->position());
row_list.push_back(repair_row(std::move(fmf), std::move(pos), dk_ptr, std::move(hash), is_dirty_on_master::yes, std::move(mf)));
co_await coroutine::maybe_yield();
}
} else {
for (auto fmfit = mutation_fragments.begin(); fmfit != mutation_fragments.end(); fmfit = mutation_fragments.erase(fmfit)) {
auto fmf = std::move(*fmfit);
_metrics.rx_row_nr += 1;
_metrics.rx_row_bytes += fmf.representation().size();
auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*s, permit));
// If the mutation_fragment has the same position as
// the last mutation_fragment, it means they are the
// same row with different contents. We can not feed
// such rows into the sstable writer. Instead we apply
// the mutation_fragment into the previous one.
if (last_mf && cmp(last_mf->position(), mf->position()) == 0 && last_mf->mergeable_with(*mf)) {
last_mf->apply(*s, std::move(*mf));
} else {
last_mf = mf;
// On repair follower node, only decorated_key_with_hash and the mutation_fragment inside repair_row are used.
row_list.push_back(repair_row({}, {}, dk_ptr, {}, is_dirty_on_master::no, std::move(mf)));
}
co_await coroutine::maybe_yield();
}
}
co_await coroutine::maybe_yield();
}
} catch (...) {
ex = std::current_exception();
}
if (ex) {
co_await utils::clear_gently(rows);
co_await utils::clear_gently(row_list);
co_await coroutine::return_exception_ptr(std::move(ex));
}
co_return std::move(row_list);
}
class repair_meta {
friend repair_meta_tracker;
public:
using update_working_row_buf = bool_class<class update_working_row_buf_tag>;
using update_peer_row_hash_sets = bool_class<class update_peer_row_hash_sets_tag>;
using needs_all_rows_t = bool_class<class needs_all_rows_tag>;
using msg_addr = netw::messaging_service::msg_addr;
using tracker_link_type = boost::intrusive::list_member_hook<bi::link_mode<boost::intrusive::auto_unlink>>;
private:
repair_service& _rs;
seastar::sharded<replica::database>& _db;
netw::messaging_service& _messaging;
schema_ptr _schema;
reader_permit _permit;
dht::token_range _range;
repair_sync_boundary::tri_compare _cmp;
// The algorithm used to find the row difference
row_level_diff_detect_algorithm _algo;
// Max size of rows that can be stored in _row_buf
size_t _max_row_buf_size;
uint64_t _seed = 0;
repair_master _repair_master;
uint32_t _repair_meta_id;
streaming::stream_reason _reason;
// Repair master's sharding configuration
shard_config _master_node_shard_config;
// sharding info of repair master
dht::static_sharder _remote_sharder;
bool _same_sharding_config = false;
struct local_range_estimation {
size_t master_subranges_count;
size_t partitions_count;
};
std::optional<local_range_estimation> _local_range_estimation;
uint64_t _estimated_partitions = 0;
// For the repair master, nr peers is the number of repair followers; for a repair
// follower, nr peers is always one because the repair master is the only peer.
size_t _nr_peer_nodes = 1;
repair_stats _stats;
std::optional<repair_reader> _repair_reader;
std::optional<int64_t> _repaired_at;
locator::tablet_repair_incremental_mode _incremental_mode;
lw_shared_ptr<repair_writer> _repair_writer;
// Contains rows read from disk
std::list<repair_row> _row_buf;
// Contains rows we are working on to sync between peers
std::list<repair_row> _working_row_buf;
// Combines all the repair_hash in _working_row_buf
repair_hash _working_row_buf_combined_hash;
// Tracks the last sync boundary
std::optional<repair_sync_boundary> _last_sync_boundary;
// Tracks current sync boundary
std::optional<repair_sync_boundary> _current_sync_boundary;
// Contains the hashes of rows in the _working_row_buf for all peer nodes
std::vector<repair_hash_set> _peer_row_hash_sets;
// Gate used to make sure pending operations on the meta data are done
seastar::named_gate _gate;
sink_source_for_get_full_row_hashes _sink_source_for_get_full_row_hashes;
sink_source_for_get_row_diff _sink_source_for_get_row_diff;
sink_source_for_put_row_diff _sink_source_for_put_row_diff;
tracker_link_type _tracker_link;
row_level_repair* _row_level_repair_ptr;
std::vector<repair_node_state> _all_node_states;
is_dirty_on_master _dirty_on_master = is_dirty_on_master::no;
std::optional<shared_future<>> _stopped;
repair_hasher _repair_hasher;
gc_clock::time_point _compaction_time;
bool _is_tablet;
reader_concurrency_semaphore::inactive_read_handle _fake_inactive_read_handle;
std::unique_ptr<const locator::token_metadata> _small_table_optimization_tm;
seastar::semaphore _small_table_optimization_tm_sem{1};
bool _small_table_optimization_tm_calculated = false;
service::frozen_topology_guard _frozen_topology_guard;
service::topology_guard _topology_guard;
const bool _is_eligible_to_repair_rejection;
private:
incremental_repair_meta _incremental_repair_meta;
public:
bool is_incremental_repair() {
return _repaired_at.has_value();
}
// The tablet_repair_incremental_mode::full mode turns on incremental
// repair and selects all sstables for repair.
bool is_incremental_repair_using_all_sstables() {
return is_incremental_repair() && _incremental_mode == locator::tablet_repair_incremental_mode::full;
}
public:
std::vector<repair_node_state>& all_nodes() {
return _all_node_states;
}
void set_repair_state(repair_state state, locator::host_id node) {
for (auto& ns : all_nodes()) {
if (ns.node == node) {
ns.state = state;
}
}
}
void set_repair_state_for_local_node(repair_state state) {
// The first node is the local node
all_nodes().front().state = state;
}
repair_stats& stats() {
return _stats;
}
locator::host_id myhostid() const {
return _rs.my_host_id();
}
uint32_t repair_meta_id() const {
return _repair_meta_id;
}
const std::optional<repair_sync_boundary>& current_sync_boundary() const {
return _current_sync_boundary;
}
const std::optional<repair_sync_boundary>& last_sync_boundary() const {
return _last_sync_boundary;
};
const repair_hash& working_row_buf_combined_hash() const {
return _working_row_buf_combined_hash;
}
bool use_rpc_stream() const {
return is_rpc_stream_supported(_algo);
}
public:
// master constructor
repair_meta(
repair_service& rs,
replica::column_family& cf,
schema_ptr s,
reader_permit permit,
dht::token_range range,
row_level_diff_detect_algorithm algo,
size_t max_row_buf_size,
uint64_t seed,
repair_master master,
uint32_t repair_meta_id,
streaming::stream_reason reason,
shard_config master_node_shard_config,
host_id_vector_replica_set all_live_peer_nodes,
size_t nr_peer_nodes,
std::vector<std::optional<shard_id>> all_live_peer_shards,
row_level_repair* row_level_repair_ptr,
gc_clock::time_point compaction_time,
service::frozen_topology_guard topo_guard,
std::optional<int64_t> repaired_at,
locator::tablet_repair_incremental_mode incremental_mode)
: _rs(rs)
, _db(rs.get_db())
, _messaging(rs.get_messaging())
, _schema(s)
, _permit(std::move(permit))
, _range(range)
, _cmp(repair_sync_boundary::tri_compare(*_schema))
, _algo(algo)
, _max_row_buf_size(max_row_buf_size)
, _seed(seed)
, _repair_master(master)
, _repair_meta_id(repair_meta_id)
, _reason(reason)
, _master_node_shard_config(std::move(master_node_shard_config))
, _remote_sharder(make_remote_sharder())
, _same_sharding_config(is_same_sharding_config(cf))
, _nr_peer_nodes(nr_peer_nodes)
, _repaired_at(repaired_at)
, _incremental_mode(incremental_mode)
, _repair_writer(make_repair_writer(_schema, _repaired_at, _permit, _reason, _db, rs.get_view_builder(), rs.get_view_building_worker(), topo_guard))
, _gate(format("repair_meta[{}]", repair_meta_id))
, _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
[&rs] (uint32_t repair_meta_id, std::optional<shard_id> dst_cpu_id_opt, locator::host_id addr) {
auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard);
rlogger.debug("get_full_row_hashes: repair_meta_id={} dst_cpu_id={}", repair_meta_id, dst_cpu_id);
return rs.get_messaging().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, dst_cpu_id, addr);
})
, _sink_source_for_get_row_diff(_repair_meta_id, _nr_peer_nodes,
[&rs] (uint32_t repair_meta_id, std::optional<shard_id> dst_cpu_id_opt, locator::host_id addr) {
auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard);
rlogger.debug("get_row_diff: repair_meta_id={} dst_cpu_id={}", repair_meta_id, dst_cpu_id);
return rs.get_messaging().make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(repair_meta_id, dst_cpu_id, addr);
})
, _sink_source_for_put_row_diff(_repair_meta_id, _nr_peer_nodes,
[&rs] (uint32_t repair_meta_id, std::optional<shard_id> dst_cpu_id_opt, locator::host_id addr) {
auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard);
rlogger.debug("put_row_diff: repair_meta_id={} dst_cpu_id={}", repair_meta_id, dst_cpu_id);
return rs.get_messaging().make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(repair_meta_id, dst_cpu_id, addr);
})
, _row_level_repair_ptr(row_level_repair_ptr)
, _repair_hasher(_seed, _schema)
, _compaction_time(compaction_time)
, _is_tablet(cf.uses_tablets())
, _frozen_topology_guard(topo_guard)
, _topology_guard(_frozen_topology_guard)
, _is_eligible_to_repair_rejection(cf.is_eligible_to_write_rejection_on_critical_disk_utilization())
{
if (is_incremental_repair()) {
rlogger.info("Enable incremental repair for table={}.{} range={}",
_schema->ks_name(), _schema->cf_name(), _range);
}
if (master) {
add_to_repair_meta_for_masters(*this);
} else {
add_to_repair_meta_for_followers(*this);
}
SCYLLA_ASSERT(all_live_peer_shards.size() == all_live_peer_nodes.size());
_all_node_states.push_back(repair_node_state(myhostid(), this_shard_id()));
for (unsigned i = 0; i < all_live_peer_nodes.size(); i++) {
_all_node_states.push_back(repair_node_state(all_live_peer_nodes[i], all_live_peer_shards[i].value_or(repair_unspecified_shard)));
}
// Mark the permit as evictable immediately. If there are multiple
// repairs launched, they can deadlock in the initial phase, before
// they start reading from disk (which is the point where they
// become evictable normally).
// Prevent this by marking the permit as evictable ASAP.
// FIXME: provide a better API for this, this is very clunky
_fake_inactive_read_handle = _db.local().get_reader_concurrency_semaphore().register_inactive_read(make_empty_mutation_reader(_schema, _permit));
}
// follower constructor
repair_meta(
repair_service& rs,
replica::column_family& cf,
schema_ptr s,
reader_permit permit,
dht::token_range range,
row_level_diff_detect_algorithm algo,
size_t max_row_buf_size,
uint64_t seed,
repair_master master,
uint32_t repair_meta_id,
streaming::stream_reason reason,
shard_config master_node_shard_config,
host_id_vector_replica_set all_live_peer_nodes,
gc_clock::time_point compaction_time,
service::frozen_topology_guard topo_guard,
std::optional<int64_t> repaired_at,
locator::tablet_repair_incremental_mode incremental_mode)
: repair_meta(rs, cf, std::move(s), std::move(permit), std::move(range), algo, max_row_buf_size, seed, master, repair_meta_id, reason,
std::move(master_node_shard_config), std::move(all_live_peer_nodes), 1, {std::nullopt}, nullptr, compaction_time, topo_guard, repaired_at, incremental_mode)
{
}
public:
std::optional<shard_id> get_peer_node_dst_cpu_id(uint32_t peer_node_idx) {
SCYLLA_ASSERT(peer_node_idx + 1 < all_nodes().size());
return all_nodes()[peer_node_idx + 1].shard;
}
public:
future<> clear_gently() noexcept {
co_await utils::clear_gently(_peer_row_hash_sets);
co_await utils::clear_gently(_working_row_buf);
co_await utils::clear_gently(_row_buf);
}
future<> stop() {
// Handle deferred stop
if (_stopped) {
return _stopped->get_future();
}
promise<> stopped;
_stopped.emplace(stopped.get_future());
auto gate_future = _gate.close();
auto f1 = _sink_source_for_get_full_row_hashes.close();
auto f2 = _sink_source_for_get_row_diff.close();
auto f3 = _sink_source_for_put_row_diff.close();
rlogger.debug("repair_meta::stop");
// move to background. waited on via _stopped->get_future.
when_all_succeed(std::move(gate_future), std::move(f1), std::move(f2), std::move(f3)).discard_result().finally([this] {
return _repair_writer->wait_for_writer_done().finally([this] {
return close().then([this] {
return clear_gently();
});
});
}).forward_to(std::move(stopped));
return _stopped->get_future();
}
void reset_peer_row_hash_sets() {
if (_peer_row_hash_sets.size() != _nr_peer_nodes) {
_peer_row_hash_sets.resize(_nr_peer_nodes);
} else {
for (auto& x : _peer_row_hash_sets) {
x.clear();
}
}
}
repair_hash_set& peer_row_hash_sets(unsigned node_idx) {
return _peer_row_hash_sets[node_idx];
}
// Get a list of row hashes in _working_row_buf
future<repair_hash_set>
working_row_hashes() {
auto hashes = repair_hash_set();
for (auto& r : _working_row_buf) {
hashes.emplace(r.hash());
co_await coroutine::maybe_yield();
}
co_return std::move(hashes);
}
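// Computes the common sync boundary across all nodes: the smallest reported
// boundary becomes the common one, and the range is considered already synced
// when every node reported the same combined hash and the same boundary and
// at least one row was read. Illustrative example: with boundaries {b1 < b2}
// and differing combined hashes, b1 is returned and already_synced is false.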
std::pair<std::optional<repair_sync_boundary>, bool>
get_common_sync_boundary(bool zero_rows,
std::vector<repair_sync_boundary>& sync_boundaries,
std::vector<repair_hash>& combined_hashes) {
if (sync_boundaries.empty()) {
throw std::runtime_error("sync_boundaries is empty");
}
if (combined_hashes.empty()) {
throw std::runtime_error("combined_hashes is empty");
}
// Get the smallest sync boundary in the list as the common sync boundary
std::sort(sync_boundaries.begin(), sync_boundaries.end(),
[this] (const auto& a, const auto& b) { return this->_cmp(a, b) < 0; });
repair_sync_boundary sync_boundary_min = sync_boundaries.front();
// Check if peers have identical combined hashes and sync boundary
bool same_hashes = std::adjacent_find(combined_hashes.begin(), combined_hashes.end(),
std::not_equal_to<repair_hash>()) == combined_hashes.end();
bool same_boundary = std::adjacent_find(sync_boundaries.begin(), sync_boundaries.end(),
[this] (const repair_sync_boundary& a, const repair_sync_boundary& b) { return this->_cmp(a, b) != 0; }) == sync_boundaries.end();
rlogger.debug("get_common_sync_boundary: zero_rows={}, same_hashes={}, same_boundary={}, combined_hashes={}, sync_boundaries={}",
zero_rows, same_hashes, same_boundary, combined_hashes, sync_boundaries);
bool already_synced = same_hashes && same_boundary && !zero_rows;
return std::pair<std::optional<repair_sync_boundary>, bool>(sync_boundary_min, already_synced);
}
future<> close() noexcept {
return _repair_reader ? _repair_reader->close() : make_ready_future<>();
}
private:
future<uint64_t> do_estimate_partitions_on_all_shards(const dht::token_range& range) {
return estimate_partitions(_db, _schema->ks_name(), _schema->cf_name(), range);
}
future<uint64_t> do_estimate_partitions_on_local_shard() {
auto& cf = _db.local().find_column_family(_schema->id());
return cf.estimated_partitions_in_range(_range);
}
future<uint64_t> get_estimated_partitions() {
auto gate_held = _gate.hold();
if (_repair_master || _same_sharding_config || _is_tablet) {
co_return co_await do_estimate_partitions_on_local_shard();
} else {
auto sharder = dht::selective_token_range_sharder(_remote_sharder, _range, _master_node_shard_config.shard);
auto partitions_sum = uint64_t(0);
auto subranges = uint64_t(0);
for (auto shard_range = sharder.next(); shard_range; shard_range = sharder.next()) {
++subranges;
auto partitions = co_await do_estimate_partitions_on_all_shards(*shard_range);
partitions_sum += partitions;
}
_local_range_estimation = local_range_estimation {
.master_subranges_count = subranges,
.partitions_count = partitions_sum
};
co_return partitions_sum;
}
}
future<> set_estimated_partitions(uint64_t estimated_partitions) {
auto gate_held = _gate.hold();
_estimated_partitions = estimated_partitions;
_repair_writer->set_estimated_partitions(_estimated_partitions);
co_return;
}
dht::static_sharder make_remote_sharder() {
return dht::static_sharder(_master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb);
}
bool is_same_sharding_config(replica::column_family& cf) {
rlogger.debug("is_same_sharding_config: remote_shard={}, remote_shard_count={}, remote_ignore_msb={}",
_master_node_shard_config.shard, _master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb);
auto& sharder = cf.get_effective_replication_map()->get_sharder(*_schema);
return sharder.shard_count() == _master_node_shard_config.shard_count
&& sharder.sharding_ignore_msb() == _master_node_shard_config.ignore_msb
&& this_shard_id() == _master_node_shard_config.shard;
}
future<size_t> get_repair_rows_size(const std::list<repair_row>& rows) const {
size_t sz = 0;
for (const repair_row& r : rows) {
sz += r.size();
co_await coroutine::maybe_yield();
}
co_return sz;
}
// Get the size of rows in _row_buf
future<size_t> row_buf_size() const {
return get_repair_rows_size(_row_buf);
}
// return the combined checksum of rows in _row_buf
future<repair_hash> row_buf_csum() {
repair_hash combined;
for (repair_row& r : _row_buf) {
combined.add(r.hash());
co_await coroutine::maybe_yield();
}
co_return combined;
}
void handle_mutation_fragment(mutation_fragment& mf, size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
if (mf.is_partition_start()) {
auto& start = mf.as_partition_start();
_repair_reader->set_current_dk(start.key());
if (!start.partition_tombstone()) {
// Ignore partition_start with empty partition tombstone
return;
}
} else if (mf.is_end_of_partition()) {
_repair_reader->clear_current_dk();
return;
}
auto hash = _repair_hasher.do_hash_for_mf(*_repair_reader->get_current_dk(), mf);
repair_row r(freeze(*_schema, mf), position_in_partition(mf.position()), _repair_reader->get_current_dk(), hash, is_dirty_on_master::no);
rlogger.trace("Reading: r.boundary={}, r.hash={}", r.boundary(), r.hash());
auto sz = r.size();
_metrics.row_from_disk_nr++;
_metrics.row_from_disk_bytes += sz;
cur_size += sz;
new_rows_size += sz;
cur_rows.push_back(std::move(r));
}
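// Prepares sstables for incremental tablet repair: takes a storage snapshot of
// the tablet's range, skips sstables already marked as repaired (unless the
// "full" incremental mode selects everything), marks the remaining sstables as
// being repaired under the current topology session, and collects them into
// the sstable set that the incremental read strategy streams from.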
future<> prepare_sstables_for_incremental_repair() {
auto& table = _db.local().find_column_family(_schema->id());
table_id tid = table.schema()->id();
auto erm = table.get_effective_replication_map();
auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(tid);
auto last_token = _range.end() ? _range.end()->value() : dht::maximum_token();
auto id = tmap.get_tablet_id(last_token);
auto range = tmap.get_token_range(id);
if (range != _range) {
on_internal_error(rlogger, format("Repair range={} does not match tablet range={}", _range, range));
}
bool full = is_incremental_repair_using_all_sstables();
auto& tinfo = tmap.get_tablet_info(id);
auto sstables_repaired_at = tinfo.sstables_repaired_at;
auto gid = locator::global_tablet_id{tid, id};
// Consider this:
// 1) n1 is the topology coordinator
// 2) n1 schedules and executes a tablet repair with session id s1 for a tablet on n3 and n4.
// 3) n3 and n4 take and store the lock in _rs._repair_compaction_locks[s1]
// 4) n1 steps down before it executes locator::tablet_transition_stage::end_repair
// 5) n2 becomes the new topology coordinator
// 6) n2 runs locator::tablet_transition_stage::repair again
// 7) n3 and n4 try to take the lock again and hang since the lock is already taken
// To avoid the deadlock, we can throw in step 7 so that n2 will
// proceed to the end_repair stage and release the lock. After that,
// the scheduler could schedule the tablet repair again.
if (_rs._repair_compaction_locks.contains(gid)) {
auto msg = fmt::format("Tablet repair session={} table={} is in progress", _frozen_topology_guard, tid);
rlogger.info("{}", msg);
throw std::runtime_error(msg);
}
co_await utils::get_local_injector().inject("incremental_repair_prepare_wait", utils::wait_for_message(60s));
auto reenablers_and_holders = co_await table.get_compaction_reenablers_and_lock_holders_for_repair(_db.local(), _frozen_topology_guard, _range);
for (auto& lock_holder : reenablers_and_holders.lock_holders) {
_rs._repair_compaction_locks[gid].push_back(std::move(lock_holder));
}
auto sstables = co_await table.take_storage_snapshot(_range);
_incremental_repair_meta.sst_set = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(_schema, _range));
_incremental_repair_meta.sstables_repaired_at = sstables_repaired_at;
for (auto& snap : sstables) {
co_await coroutine::maybe_yield();
auto& sst = snap.sst;
auto bytes_on_disk = sst->bytes_on_disk();
if (!full && repair::is_repaired(sstables_repaired_at, sst)) {
rlogger.info("Skipped adding sst={} repaired_at={} sstables_repaired_at={} being_repaired={} session_id={} for incremental repair",
sst->toc_filename(), sst->get_stats_metadata().repaired_at, sstables_repaired_at, sst->being_repaired, _frozen_topology_guard);
_metrics.inc_sst_skipped_bytes += bytes_on_disk;
} else {
sst->mark_as_being_repaired(_frozen_topology_guard);
rlogger.info("Added sst={} repaired_at={} sstables_repaired_at={} being_repaired={} session_id={} for incremental repair",
sst->toc_filename(), sst->get_stats_metadata().repaired_at, sstables_repaired_at, sst->being_repaired, _frozen_topology_guard);
_incremental_repair_meta.sst_set->insert(sst);
_metrics.inc_sst_read_bytes += bytes_on_disk;
}
}
// Note: It is safe to re-enable compaction because all
// sstables participating in the repair have been marked as
// being_repaired, which will be ignored by the new unrepaired
// compaction.
reenablers_and_holders.cres.clear();
rlogger.info("Re-enabled compaction for range={} for incremental repair", _range);
}
// Read rows from sstables until the size of the rows exceeds _max_row_buf_size - current_size.
// This reads rows into _row_buf from where the reader left off last time.
// Neither _current_sync_boundary nor _last_sync_boundary has any effect on the reader.
future<std::tuple<std::list<repair_row>, size_t>>
read_rows_from_disk(size_t cur_size) {
using value_type = std::tuple<std::list<repair_row>, size_t>;
size_t new_rows_size = 0;
std::list<repair_row> cur_rows;
std::exception_ptr ex;
if (!_repair_reader) {
// We are about to create a real evictable reader, so drop the fake
// reader (evicted or not), we don't need it anymore.
_db.local().get_reader_concurrency_semaphore().unregister_inactive_read(std::move(_fake_inactive_read_handle));
if (is_incremental_repair()) {
co_await prepare_sstables_for_incremental_repair();
}
_repair_reader.emplace(_db,
_db.local().find_column_family(_schema->id()),
_schema,
_permit,
_range,
_remote_sharder,
_master_node_shard_config.shard,
_seed,
std::invoke([this]() {
if (is_incremental_repair()) {
return repair_reader::read_strategy::incremental_repair;
}
if (_repair_master || _same_sharding_config || _is_tablet) {
rlogger.debug("repair_reader: meta_id={}, _repair_master={}, _same_sharding_config={},"
"read_strategy {} is chosen",
_repair_meta_id, _repair_master, _same_sharding_config,
repair_reader::read_strategy::local);
return repair_reader::read_strategy::local;
}
// multishard_filter means load all the data in the range and apply
// filter by master shard on top, discarding partitions from other shards.
// multishard_split means split the range into multiple subranges,
// each containing only the required data from the master shard.
// For situations with a sparse data set spread across numerous ranges,
// the overhead from continuously switching between these ranges,
// specifically during the fast_forward_to function on the multishard_reader,
// can become the main factor in performance.
// Similarly, with multishard_filter, reading all the partitions within
// the range can lead to the next_partition cost dominating the overall cost.
// The heuristic here chooses the strategy with minimal such cost.
// Note that with multishard_filter we don't read entire partitions which
// can be a huge waste. We only fill the buffer inside
// mutation_reader, if a partition is found to belong to the incorrect
// master shard, we call next_partition(), which effectively clears
// the buffer until the next partition is reached.
if (!_local_range_estimation) {
// this should not normally happen since the master
// calls get_estimated_partitions before get_sync_boundary
rlogger.warn("repair_reader: meta_id={}, no _local_range_estimation, "
"read_strategy {} is chosen",
_repair_meta_id, repair_reader::read_strategy::multishard_split);
return repair_reader::read_strategy::multishard_split;
}
const auto read_strategy =
_local_range_estimation->partitions_count <= _local_range_estimation->master_subranges_count
? repair_reader::read_strategy::multishard_filter
: repair_reader::read_strategy::multishard_split;
rlogger.debug("repair_reader: meta_id={}, _local_range_estimation: partitions_count={}, "
"master_subranges_count={}, read_strategy {} is chosen",
_repair_meta_id,
_local_range_estimation->partitions_count,
_local_range_estimation->master_subranges_count,
read_strategy);
return read_strategy;
}),
_compaction_time,
_incremental_repair_meta);
}
try {
while (cur_size < _max_row_buf_size) {
_gate.check();
mutation_fragment_opt mfopt = co_await _repair_reader->read_mutation_fragment();
if (!mfopt) {
co_await _repair_reader->on_end_of_stream();
break;
}
handle_mutation_fragment(*mfopt, cur_size, new_rows_size, cur_rows);
}
} catch (...) {
ex = std::current_exception();
}
if (ex) {
co_await _repair_reader->on_end_of_stream();
co_await coroutine::return_exception_ptr(std::move(ex));
}
_repair_reader->pause();
co_return value_type(std::move(cur_rows), new_rows_size);
}
future<> clear_row_buf() {
return utils::clear_gently(_row_buf);
}
future<> clear_working_row_buf() {
co_await utils::clear_gently(_working_row_buf);
_working_row_buf_combined_hash.clear();
}
// Read rows from disk until up to _max_row_buf_size bytes of rows are filled into _row_buf.
// Calculate the combined checksum of the rows.
// Calculate the total size of the rows in _row_buf.
future<get_sync_boundary_response>
get_sync_boundary(std::optional<repair_sync_boundary> skipped_sync_boundary) {
if (skipped_sync_boundary) {
_current_sync_boundary = skipped_sync_boundary;
co_await clear_row_buf();
}
// Here is the place we update _last_sync_boundary
rlogger.trace("SET _last_sync_boundary from {} to {}", _last_sync_boundary, _current_sync_boundary);
_last_sync_boundary = _current_sync_boundary;
co_await clear_working_row_buf();
size_t cur_size = co_await row_buf_size();
auto rows = co_await read_rows_from_disk(cur_size);
auto&& [new_rows, new_rows_size] = rows;
size_t new_rows_nr = new_rows.size();
_row_buf.splice(_row_buf.end(), new_rows);
repair_hash row_buf_combined_hash = co_await row_buf_csum();
size_t row_buf_bytes = co_await row_buf_size();
std::optional<repair_sync_boundary> sb_max;
if (!_row_buf.empty()) {
sb_max = _row_buf.back().boundary();
}
rlogger.debug("get_sync_boundary: Got nr={} rows, sb_max={}, row_buf_size={}, repair_hash={}, skipped_sync_boundary={}",
new_rows_nr, sb_max, row_buf_bytes, row_buf_combined_hash, skipped_sync_boundary);
co_return get_sync_boundary_response{sb_max, row_buf_combined_hash, row_buf_bytes, new_rows_size, new_rows_nr};
}
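// Splits _row_buf around _current_sync_boundary: rows at or before the
// boundary move to _working_row_buf, rows after it stay in _row_buf.
// Illustrative example: with _row_buf = {r1, r2, r3, r4} and the boundary
// falling after r2, _working_row_buf ends up with {r1, r2} while _row_buf
// keeps {r3, r4}.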
future<> move_row_buf_to_working_row_buf() {
if (_cmp(_row_buf.back().boundary(), *_current_sync_boundary) <= 0) {
// Fast path
_working_row_buf.swap(_row_buf);
co_return;
}
size_t sz = _row_buf.size();
for (auto it = _row_buf.rbegin(); it != _row_buf.rend(); ++it) {
// Move the rows > _current_sync_boundary to _working_row_buf
// Delete the rows > _current_sync_boundary from _row_buf
// Swap _working_row_buf and _row_buf so that _working_row_buf
// contains rows within (_last_sync_boundary,
// _current_sync_boundary], _row_buf contains rows within
// (_current_sync_boundary, ...]
repair_row& r = *it;
if (_cmp(r.boundary(), *_current_sync_boundary) <= 0) {
break;
}
_working_row_buf.push_front(std::move(r));
co_await coroutine::maybe_yield();
}
_row_buf.resize(_row_buf.size() - _working_row_buf.size());
_row_buf.swap(_working_row_buf);
if (sz != _working_row_buf.size() + _row_buf.size()) {
throw std::runtime_error(format("incorrect row_buf and working_row_buf size, before={}, after={} + {}",
sz, _working_row_buf.size(), _row_buf.size()));
}
}
// Move rows from <_row_buf> to <_working_row_buf> according to
// _last_sync_boundary and common_sync_boundary. That is, rows within
// (_last_sync_boundary, _current_sync_boundary] in <_row_buf> are moved
// into <_working_row_buf>.
future<get_combined_row_hash_response>
request_row_hashes(const std::optional<repair_sync_boundary>& common_sync_boundary) {
if (!common_sync_boundary) {
throw std::runtime_error("common_sync_boundary is empty");
}
_current_sync_boundary = common_sync_boundary;
rlogger.trace("SET _current_sync_boundary to {}, common_sync_boundary={}", _current_sync_boundary, common_sync_boundary);
_working_row_buf.clear();
_working_row_buf_combined_hash.clear();
if (_row_buf.empty()) {
co_return get_combined_row_hash_response();
}
co_await move_row_buf_to_working_row_buf();
for (repair_row& r : _working_row_buf) {
_working_row_buf_combined_hash.add(r.hash());
co_await coroutine::maybe_yield();
}
co_return get_combined_row_hash_response{_working_row_buf_combined_hash};
}
future<std::list<repair_row>>
copy_rows_from_working_row_buf() {
std::list<repair_row> rows;
for (const repair_row& r : _working_row_buf) {
rows.push_back(r);
co_await coroutine::maybe_yield();
}
co_return rows;
}
future<std::list<repair_row>>
copy_rows_from_working_row_buf_within_set_diff(repair_hash_set set_diff) {
std::list<repair_row> rows;
for (const repair_row& r : _working_row_buf) {
if (set_diff.contains(r.hash())) {
rows.push_back(r);
}
co_await coroutine::maybe_yield();
}
co_return rows;
}
// Return rows in the _working_row_buf with hashes within the given set_diff.
// Given a set of row hashes, return the corresponding rows.
// If needs_all_rows is set, return all the rows in _working_row_buf, ignoring set_diff.
future<std::list<repair_row>>
get_row_diff(repair_hash_set set_diff, needs_all_rows_t needs_all_rows = needs_all_rows_t::no) {
if (needs_all_rows) {
if (!_repair_master || _nr_peer_nodes == 1) {
return make_ready_future<std::list<repair_row>>(std::move(_working_row_buf));
}
return copy_rows_from_working_row_buf();
} else {
return copy_rows_from_working_row_buf_within_set_diff(std::move(set_diff));
}
}
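// Write the given rows to disk through the repair writer. If update_buf is
// set, also fold each row's hash into _working_row_buf_combined_hash.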
future<> do_apply_rows(std::list<repair_row> row_diff, update_working_row_buf update_buf) {
auto sem_units = co_await get_units(_repair_writer->sem(), 1);
_repair_writer->create_writer();
while (!row_diff.empty()) {
repair_row& r = row_diff.front();
if (update_buf) {
_working_row_buf_combined_hash.add(r.hash());
}
// The repair_row here is supposed to have a
// mutation_fragment attached because we stored it in
// to_repair_rows_list above, where the repair_row is created.
mutation_fragment mf = std::move(r.get_mutation_fragment());
r.reset_mutation_fragment();
auto dk_with_hash = r.get_dk_with_hash();
co_await _repair_writer->do_write(std::move(dk_with_hash), std::move(mf));
row_diff.pop_front();
co_await coroutine::maybe_yield();
}
}
// Given a list of rows, apply the rows to disk and update the _working_row_buf and _peer_row_hash_sets if requested
// Must run inside a seastar thread
void apply_rows_on_master_in_thread(repair_rows_on_wire rows, locator::host_id from, update_working_row_buf update_buf,
update_peer_row_hash_sets update_hash_set, unsigned node_idx) {
if (rows.empty()) {
return;
}
auto row_diff = to_repair_rows_list(std::move(rows), _schema, _seed, _repair_master, _permit, _repair_hasher).get();
auto sz = get_repair_rows_size(row_diff).get();
stats().rx_row_bytes += sz;
stats().rx_row_nr += row_diff.size();
stats().rx_row_nr_peer[from] += row_diff.size();
if (update_buf) {
// Both row_diff and _working_row_buf are ordered; merge the
// two sorted lists so that the combination of row_diff
// and _working_row_buf stays ordered.
utils::merge_to_gently(_working_row_buf, row_diff,
[this] (const repair_row& x, const repair_row& y) { return _cmp(x.boundary(), y.boundary()) < 0; });
for (auto& r : row_diff) {
thread::maybe_yield();
_working_row_buf_combined_hash.add(r.hash());
}
}
if (update_hash_set) {
_peer_row_hash_sets[node_idx] = row_diff
| std::views::transform([] (repair_row& r) { thread::maybe_yield(); return r.hash(); })
| std::ranges::to<repair_hash_set>();
}
// Repair rows in row_diff will be flushed to disk by flush_rows_in_working_row_buf,
// so we skip calling do_apply_rows here.
_dirty_on_master = is_dirty_on_master::yes;
// Clear gently to avoid stalls
utils::clear_gently(row_diff).get();
}
public:
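// Return the token_metadata to use for the small table optimization
// ownership check. During bootstrap this node's bootstrap tokens are
// promoted to normal tokens in a cloned token_metadata; during decommission
// a clone with the leaving endpoints removed is used. Otherwise nullptr is
// returned and the caller keeps using the erm's token_metadata. The result
// is computed once and cached under _small_table_optimization_tm_sem.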
future<const locator::token_metadata*> get_tm_for_small_table_optimization_check(const locator::token_metadata* tm) {
auto sem_units = co_await get_units(_small_table_optimization_tm_sem, 1);
if (!tm) {
throw std::runtime_error("The token_metadata pointer is nullptr");
}
if (_small_table_optimization_tm_calculated) {
co_return _small_table_optimization_tm.get();
}
std::unique_ptr<locator::token_metadata> tmptr;
if (_reason == streaming::stream_reason::bootstrap) {
auto myid = tm->get_my_id();
std::unordered_set<dht::token> bootstrap_tokens;
for (auto [token, host_id] : tm->get_bootstrap_tokens()) {
if (myid == host_id) {
bootstrap_tokens.insert(token);
}
}
if (!bootstrap_tokens.empty()) {
// The node is a bootstrapping node
tmptr = std::make_unique<locator::token_metadata>(co_await tm->clone_only_token_map());
co_await tmptr->update_normal_tokens(bootstrap_tokens, myid);
rlogger.debug("small_table_optimization: Got bootstrap tokens={}", bootstrap_tokens);
}
} else if (_reason == streaming::stream_reason::decommission) {
auto myid = tm->get_my_id();
auto& leaving = tm->get_leaving_endpoints();
if (!leaving.empty() && leaving.contains(myid)) {
// This node is leaving
tmptr = std::make_unique<locator::token_metadata>(co_await tm->clone_after_all_left());
rlogger.debug("small_table_optimization: Got leaving node={}", leaving);
}
}
_small_table_optimization_tm = std::move(tmptr);
_small_table_optimization_tm_calculated = true;
co_return _small_table_optimization_tm.get();
}
public:
// Must run inside a seastar thread
void flush_rows_in_working_row_buf(std::optional<small_table_optimization_params> small_table_optimization) {
if (_dirty_on_master) {
_dirty_on_master = is_dirty_on_master::no;
} else {
return;
}
if (small_table_optimization) {
flush_rows(_schema, _working_row_buf, _repair_writer, small_table_optimization, this);
} else {
flush_rows(_schema, _working_row_buf, _repair_writer);
}
}
private:
future<>
apply_rows_on_follower(repair_rows_on_wire rows) {
if (rows.empty()) {
co_return;
}
std::list<repair_row> row_diff = co_await to_repair_rows_list(std::move(rows), _schema, _seed, _repair_master, _permit, _repair_hasher);
co_await do_apply_rows(std::move(row_diff), update_working_row_buf::no);
}
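// Tracks the accumulated size and number of mutation fragments packed into
// the current repair_row_on_wire, so that a single rpc stream message does
// not grow beyond the configured limits.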
struct fragments_limiter {
sstring ks;
sstring cf;
size_t max_fragments_size;
size_t max_fragments_nr;
size_t fragments_size = 0;
size_t fragments_nr = 0;
// Maximum fragments size and number allowed in a single repair_row_on_wire,
// which is sent as an rpc stream message
static constexpr size_t _max_fragments_size = 240 * 1024;
static constexpr size_t _max_fragments_nr = 32 * 1024;
fragments_limiter(sstring keyspace, sstring table)
: ks(keyspace)
, cf(table) {
max_fragments_size = _max_fragments_size;
max_fragments_nr = _max_fragments_nr;
auto inject_max_fragments_size = utils::get_local_injector().inject_parameter<uint32_t>("row_level_repair_max_fragments_size");
if (inject_max_fragments_size) {
max_fragments_size = *inject_max_fragments_size;
rlogger.info("Set inject_max_fragments_size {} for table={}.{}", max_fragments_size, ks, cf);
}
auto inject_max_fragments_nr = utils::get_local_injector().inject_parameter<uint32_t>("row_level_repair_max_fragments_nr");
if (inject_max_fragments_nr) {
max_fragments_nr = *inject_max_fragments_nr;
rlogger.info("Set inject_max_fragments_nr {} for table={}.{}", max_fragments_nr, ks, cf);
}
}
void reset(size_t size) {
fragments_size = size;
fragments_nr = 1;
};
void add(size_t size) {
fragments_size += size;
++fragments_nr;
}
bool no_split(size_t size) {
return (fragments_size + size) < max_fragments_size && (fragments_nr + 1) < max_fragments_nr;
}
};
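// Convert repair rows into wire format. Consecutive fragments that share the
// same partition key are packed into one repair_row_on_wire, subject to the
// fragments_limiter when the repair_msg_split cluster feature is enabled;
// when the limit is exceeded, the partition is continued in a new, keyless
// repair_row_on_wire.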
future<repair_rows_on_wire> to_repair_rows_on_wire(std::list<repair_row> row_list) {
lw_shared_ptr<const decorated_key_with_hash> last_dk_with_hash;
repair_rows_on_wire rows;
size_t row_bytes = co_await get_repair_rows_size(row_list);
_metrics.tx_row_nr += row_list.size();
_metrics.tx_row_bytes += row_bytes;
fragments_limiter limiter(_schema->ks_name(), _schema->cf_name());
auto msg_split_feature = bool(_db.local().features().repair_msg_split);
while (!row_list.empty()) {
repair_row r = std::move(row_list.front());
row_list.pop_front();
const auto& dk_with_hash = r.get_dk_with_hash();
auto mf = std::move(r.get_frozen_mutation());
const auto size = mf.representation().size();
bool same_pk = !rows.empty() && last_dk_with_hash && dk_with_hash->dk.tri_compare(*_schema, last_dk_with_hash->dk) == 0;
if (same_pk && (!msg_split_feature || limiter.no_split(size))) {
limiter.add(size);
rows.back().push_mutation_fragment(std::move(mf));
} else {
last_dk_with_hash = dk_with_hash;
limiter.reset(size);
auto r = same_pk && msg_split_feature ? repair_row_on_wire({std::move(mf)}) : repair_row_on_wire(dk_with_hash->dk.key(), {std::move(mf)});
rows.push_back(std::move(r));
}
co_await coroutine::maybe_yield();
}
co_return rows;
};
public:
// RPC API
// Return the hashes of the rows in _working_row_buf
future<repair_hash_set>
get_full_row_hashes(locator::host_id remote_node, shard_id dst_cpu_id) {
if (remote_node == myhostid()) {
co_return co_await get_full_row_hashes_handler();
}
repair_hash_set hashes = co_await ser::repair_rpc_verbs::send_repair_get_full_row_hashes(&_messaging, remote_node,
_repair_meta_id, dst_cpu_id);
rlogger.debug("Got full hashes from peer={}, nr_hashes={}", remote_node, hashes.size());
_metrics.rx_hashes_nr += hashes.size();
stats().rx_hashes_nr += hashes.size();
stats().rpc_call_nr++;
co_return hashes;
}
private:
future<> get_full_row_hashes_source_op(
lw_shared_ptr<repair_hash_set> current_hashes,
locator::host_id remote_node,
unsigned node_idx,
rpc::source<repair_hash_with_cmd>& source) {
while (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt = co_await source()) {
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
rlogger.trace("get_full_row_hashes: Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", remote_node, hash_cmd.hash, int(hash_cmd.cmd));
if (hash_cmd.cmd == repair_stream_cmd::hash_data) {
current_hashes->insert(hash_cmd.hash);
} else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set) {
co_return;
} else if (hash_cmd.cmd == repair_stream_cmd::error) {
throw std::runtime_error("get_full_row_hashes: Peer failed to process");
} else {
throw std::runtime_error("get_full_row_hashes: Got unexpected repair_stream_cmd");
}
}
_sink_source_for_get_full_row_hashes.mark_source_closed(node_idx);
throw std::runtime_error("get_full_row_hashes: Got unexpected end of stream");
}
future<> get_full_row_hashes_sink_op(rpc::sink<repair_stream_cmd>& sink) {
std::exception_ptr ep;
try {
co_await sink(repair_stream_cmd::get_full_row_hashes);
co_await sink.flush();
} catch (...) {
ep = std::current_exception();
}
if (ep) {
co_await sink.close();
std::rethrow_exception(ep);
}
}
public:
future<repair_hash_set>
get_full_row_hashes_with_rpc_stream(locator::host_id remote_node, unsigned node_idx, shard_id dst_cpu_id) {
if (remote_node == myhostid()) {
co_return co_await get_full_row_hashes_handler();
}
auto current_hashes = make_lw_shared<repair_hash_set>();
auto [sink, source] = co_await _sink_source_for_get_full_row_hashes.get_sink_source(remote_node, node_idx, dst_cpu_id);
co_await coroutine::all(
[&] { return get_full_row_hashes_source_op(current_hashes, remote_node, node_idx, source); },
[&] { return get_full_row_hashes_sink_op(sink); }
);
stats().rx_hashes_nr += current_hashes->size();
_metrics.rx_hashes_nr += current_hashes->size();
co_return std::move(*current_hashes);
}
// RPC handler
future<repair_hash_set>
get_full_row_hashes_handler() {
auto gate_held = _gate.hold();
co_return co_await working_row_hashes();
}
// RPC API
// Return the combined hashes of the current working row buf
future<get_combined_row_hash_response>
get_combined_row_hash(std::optional<repair_sync_boundary> common_sync_boundary, locator::host_id remote_node, shard_id dst_cpu_id) {
if (remote_node == myhostid()) {
co_return co_await get_combined_row_hash_handler(common_sync_boundary);
}
get_combined_row_hash_response resp = co_await ser::repair_rpc_verbs::send_repair_get_combined_row_hash(&_messaging, remote_node,
_repair_meta_id, common_sync_boundary, dst_cpu_id);
stats().rpc_call_nr++;
stats().rx_hashes_nr++;
_metrics.rx_hashes_nr++;
co_return resp;
}
// RPC handler
future<get_combined_row_hash_response>
get_combined_row_hash_handler(std::optional<repair_sync_boundary> common_sync_boundary) {
// We cannot call this function twice. The good thing is we do not use
// retransmission at the messaging_service level, so no message will be retransmitted.
rlogger.trace("Calling get_combined_row_hash_handler");
auto gate_held = _gate.hold();
auto& cf = _db.local().find_column_family(_schema->id());
cf.update_off_strategy_trigger();
co_return co_await request_row_hashes(common_sync_boundary);
}
// RPC API
future<>
repair_row_level_start(locator::host_id remote_node, sstring ks_name, sstring cf_name, dht::token_range range, table_schema_version schema_version, streaming::stream_reason reason, gc_clock::time_point compaction_time, shard_id dst_cpu_id) {
if (remote_node == myhostid()) {
co_return;
}
stats().rpc_call_nr++;
// Even though remote partitioner name is ignored in the current version of
// repair, we still have to send something to keep compatibility with nodes
// that run older versions. This makes it possible to run a mixed cluster.
// Murmur3 is appropriate because that's the only supported partitioner at
// the time this change is introduced.
sstring remote_partitioner_name = "org.apache.cassandra.dht.Murmur3Partitioner";
rpc::optional<repair_row_level_start_response> resp =
co_await ser::repair_rpc_verbs::send_repair_row_level_start(&_messaging, remote_node,
_repair_meta_id, ks_name, cf_name, std::move(range), _algo, _max_row_buf_size, _seed,
_master_node_shard_config.shard, _master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb,
remote_partitioner_name, std::move(schema_version), reason, compaction_time, dst_cpu_id, _frozen_topology_guard, _repaired_at, _incremental_mode);
if (resp && resp->status == repair_row_level_start_status::no_such_column_family) {
throw replica::no_such_column_family(ks_name, cf_name);
} else {
co_return;
}
}
// RPC handler
static future<repair_row_level_start_response>
repair_row_level_start_handler(repair_service& repair, locator::host_id from_id, uint32_t src_cpu_id, uint32_t repair_meta_id, sstring ks_name, sstring cf_name,
dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size,
uint64_t seed, shard_config master_node_shard_config, table_schema_version schema_version, streaming::stream_reason reason,
gc_clock::time_point compaction_time, abort_source& as, service::frozen_topology_guard topo_guard, std::optional<int64_t> repaired_at, locator::tablet_repair_incremental_mode incremental_mode) {
rlogger.debug(">>> Started Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_siz={}",
repair.my_host_id(), from_id, repair_meta_id, ks_name, cf_name, schema_version, range, seed, max_row_buf_size);
try {
// Trigger read barrier to ensure that session_id is visible.
auto& mm = repair.get_migration_manager();
if (mm.use_raft()) {
co_await mm.get_group0_barrier().trigger(mm.get_abort_source());
}
co_await repair.insert_repair_meta(from_id, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version), reason, compaction_time, as, topo_guard, repaired_at, incremental_mode);
co_return repair_row_level_start_response{repair_row_level_start_status::ok};
} catch (replica::no_such_column_family&) {
co_return repair_row_level_start_response{repair_row_level_start_status::no_such_column_family};
}
}
// RPC API
future<> repair_row_level_stop(locator::host_id remote_node, sstring ks_name, sstring cf_name, dht::token_range range, shard_id dst_cpu_id, bool mark_as_repaired) {
if (remote_node == myhostid()) {
co_await stop();
if (mark_as_repaired) {
co_await mark_sstable_as_repaired();
}
co_return;
}
stats().rpc_call_nr++;
co_return co_await ser::repair_rpc_verbs::send_repair_row_level_stop(&_messaging, remote_node,
_repair_meta_id, std::move(ks_name), std::move(cf_name), std::move(range), dst_cpu_id, mark_as_repaired);
}
// RPC handler
static future<>
repair_row_level_stop_handler(repair_service& rs, locator::host_id from, uint32_t repair_meta_id, sstring ks_name, sstring cf_name, dht::token_range range, bool mark_as_repaired) {
rlogger.debug("<<< Finished Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, range={}",
rs.my_host_id(), from, repair_meta_id, ks_name, cf_name, range);
auto rm = rs.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::row_level_stop_started);
co_await rs.remove_repair_meta(from, repair_meta_id, std::move(ks_name), std::move(cf_name), std::move(range), mark_as_repaired);
rm->set_repair_state_for_local_node(repair_state::row_level_stop_finished);
}
// RPC API
future<uint64_t> repair_get_estimated_partitions(locator::host_id remote_node, shard_id dst_cpu_id) {
if (remote_node == myhostid()) {
co_return co_await get_estimated_partitions();
}
stats().rpc_call_nr++;
co_return co_await ser::repair_rpc_verbs::send_repair_get_estimated_partitions(&_messaging, remote_node, _repair_meta_id, dst_cpu_id);
}
// RPC handler
static future<uint64_t> repair_get_estimated_partitions_handler(repair_service& rs, locator::host_id from, uint32_t repair_meta_id) {
auto rm = rs.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::get_estimated_partitions_started);
uint64_t partitions = co_await rm->get_estimated_partitions();
rm->set_repair_state_for_local_node(repair_state::get_estimated_partitions_finished);
co_return partitions;
}
// RPC API
future<> repair_set_estimated_partitions(locator::host_id remote_node, uint64_t estimated_partitions, shard_id dst_cpu_id) {
if (remote_node == myhostid()) {
co_return co_await set_estimated_partitions(estimated_partitions);
}
stats().rpc_call_nr++;
co_return co_await ser::repair_rpc_verbs::send_repair_set_estimated_partitions(&_messaging, remote_node, _repair_meta_id, estimated_partitions, dst_cpu_id);
}
// RPC handler
static future<> repair_set_estimated_partitions_handler(repair_service& rs, locator::host_id from, uint32_t repair_meta_id, uint64_t estimated_partitions) {
auto rm = rs.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::set_estimated_partitions_started);
co_await rm->set_estimated_partitions(estimated_partitions);
rm->set_repair_state_for_local_node(repair_state::set_estimated_partitions_finished);
}
// RPC API
// Return the largest sync boundary contained in the _row_buf, the current _row_buf checksum, and the _row_buf size
future<get_sync_boundary_response>
get_sync_boundary(locator::host_id remote_node, std::optional<repair_sync_boundary> skipped_sync_boundary, shard_id dst_cpu_id) {
if (remote_node == myhostid()) {
co_return co_await get_sync_boundary_handler(skipped_sync_boundary);
}
stats().rpc_call_nr++;
co_return co_await ser::repair_rpc_verbs::send_repair_get_sync_boundary(&_messaging, remote_node, _repair_meta_id, skipped_sync_boundary, dst_cpu_id);
}
// RPC handler
future<get_sync_boundary_response>
get_sync_boundary_handler(std::optional<repair_sync_boundary> skipped_sync_boundary) {
auto gate_held = _gate.hold();
auto& cf = _db.local().find_column_family(_schema->id());
cf.update_off_strategy_trigger();
co_return co_await get_sync_boundary(std::move(skipped_sync_boundary));
}
// RPC API
// Return rows in the _working_row_buf with hash within the given set_diff
// Must run inside a seastar thread
void get_row_diff(repair_hash_set set_diff, needs_all_rows_t needs_all_rows, locator::host_id remote_node, unsigned node_idx, shard_id dst_cpu_id) {
if (needs_all_rows || !set_diff.empty()) {
if (remote_node == myhostid()) {
return;
}
if (needs_all_rows) {
set_diff.clear();
} else {
stats().tx_hashes_nr += set_diff.size();
_metrics.tx_hashes_nr += set_diff.size();
}
stats().rpc_call_nr++;
repair_rows_on_wire rows = ser::repair_rpc_verbs::send_repair_get_row_diff(&_messaging, remote_node,
_repair_meta_id, std::move(set_diff), bool(needs_all_rows), dst_cpu_id).get();
if (!rows.empty()) {
apply_rows_on_master_in_thread(std::move(rows), remote_node, update_working_row_buf::yes, update_peer_row_hash_sets::no, node_idx);
}
}
}
// Must run inside a seastar thread
void get_row_diff_and_update_peer_row_hash_sets(locator::host_id remote_node, unsigned node_idx, shard_id dst_cpu_id) {
if (remote_node == myhostid()) {
return;
}
stats().rpc_call_nr++;
repair_rows_on_wire rows = ser::repair_rpc_verbs::send_repair_get_row_diff(&_messaging, remote_node,
_repair_meta_id, {}, bool(needs_all_rows_t::yes), dst_cpu_id).get();
if (!rows.empty()) {
apply_rows_on_master_in_thread(std::move(rows), remote_node, update_working_row_buf::yes, update_peer_row_hash_sets::yes, node_idx);
}
}
private:
// Must run inside a seastar thread
void get_row_diff_source_op(
update_peer_row_hash_sets update_hash_set,
locator::host_id remote_node,
unsigned node_idx,
rpc::sink<repair_hash_with_cmd>& sink,
rpc::source<repair_row_on_wire_with_cmd>& source) {
repair_rows_on_wire current_rows;
for (;;) {
std::optional<std::tuple<repair_row_on_wire_with_cmd>> row_opt = source().get();
if (row_opt) {
if (inject_rpc_stream_error) {
throw std::runtime_error("get_row_diff: Inject sender error in source loop");
}
auto row = std::move(std::get<0>(row_opt.value()));
if (row.cmd == repair_stream_cmd::row_data) {
rlogger.trace("get_row_diff: Got repair_row_on_wire with data");
current_rows.push_back(std::move(row.row));
} else if (row.cmd == repair_stream_cmd::end_of_current_rows) {
rlogger.trace("get_row_diff: Got repair_row_on_wire with nullopt");
apply_rows_on_master_in_thread(std::move(current_rows), remote_node, update_working_row_buf::yes, update_hash_set, node_idx);
break;
} else if (row.cmd == repair_stream_cmd::error) {
throw std::runtime_error("get_row_diff: Peer failed to process");
} else {
throw std::runtime_error("get_row_diff: Got unexpected repair_stream_cmd");
}
} else {
_sink_source_for_get_row_diff.mark_source_closed(node_idx);
throw std::runtime_error("get_row_diff: Got unexpected end of stream");
}
}
}
future<> get_row_diff_sink_op(
repair_hash_set set_diff,
needs_all_rows_t needs_all_rows,
rpc::sink<repair_hash_with_cmd>& sink,
locator::host_id remote_node) {
std::exception_ptr ep;
try {
if (inject_rpc_stream_error) {
throw std::runtime_error("get_row_diff: Inject sender error in sink loop");
}
if (needs_all_rows) {
rlogger.trace("get_row_diff: request with repair_stream_cmd::needs_all_rows");
co_await sink(repair_hash_with_cmd{repair_stream_cmd::needs_all_rows, repair_hash()});
co_await sink.flush();
co_return;
}
for (const repair_hash& hash : set_diff) {
co_await sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
}
co_await sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
co_await sink.flush();
} catch (...) {
ep = std::current_exception();
}
if (ep) {
co_await sink.close();
std::rethrow_exception(ep);
}
}
public:
// Must run inside a seastar thread
void get_row_diff_with_rpc_stream(
repair_hash_set set_diff,
needs_all_rows_t needs_all_rows,
update_peer_row_hash_sets update_hash_set,
locator::host_id remote_node,
unsigned node_idx,
shard_id dst_cpu_id) {
if (needs_all_rows || !set_diff.empty()) {
if (remote_node == myhostid()) {
return;
}
if (needs_all_rows) {
set_diff.clear();
} else {
stats().tx_hashes_nr += set_diff.size();
_metrics.tx_hashes_nr += set_diff.size();
}
stats().rpc_call_nr++;
auto f = _sink_source_for_get_row_diff.get_sink_source(remote_node, node_idx, dst_cpu_id).get();
rpc::sink<repair_hash_with_cmd>& sink = std::get<0>(f);
rpc::source<repair_row_on_wire_with_cmd>& source = std::get<1>(f);
auto sink_op = get_row_diff_sink_op(std::move(set_diff), needs_all_rows, sink, remote_node);
get_row_diff_source_op(update_hash_set, remote_node, node_idx, sink, source);
sink_op.get();
}
}
// RPC handler
future<repair_rows_on_wire> get_row_diff_handler(repair_hash_set set_diff, needs_all_rows_t needs_all_rows) {
auto gate_held = _gate.hold();
std::list<repair_row> row_diff = co_await get_row_diff(std::move(set_diff), needs_all_rows);
co_return co_await to_repair_rows_on_wire(std::move(row_diff));
}
// RPC API
// Send rows in the _working_row_buf with hash within the given set_diff
future<> put_row_diff(repair_hash_set set_diff, needs_all_rows_t needs_all_rows, locator::host_id remote_node, shard_id dst_cpu_id) {
if (!set_diff.empty()) {
if (remote_node == myhostid()) {
co_return;
}
size_t sz = set_diff.size();
std::list<repair_row> row_diff = co_await get_row_diff(std::move(set_diff), needs_all_rows);
if (row_diff.size() != sz) {
rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.",
_schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz);
}
size_t row_bytes = co_await get_repair_rows_size(row_diff);
stats().tx_row_nr += row_diff.size();
stats().tx_row_nr_peer[remote_node] += row_diff.size();
stats().tx_row_bytes += row_bytes;
stats().rpc_call_nr++;
repair_rows_on_wire rows = co_await to_repair_rows_on_wire(std::move(row_diff));
co_await ser::repair_rpc_verbs::send_repair_put_row_diff(&_messaging, remote_node, _repair_meta_id, std::move(rows), dst_cpu_id);
}
}
private:
future<> put_row_diff_source_op(
locator::host_id remote_node,
unsigned node_idx,
rpc::source<repair_stream_cmd>& source) {
while (std::optional<std::tuple<repair_stream_cmd>> status_opt = co_await source()) {
repair_stream_cmd status = std::move(std::get<0>(status_opt.value()));
rlogger.trace("put_row_diff: Got status code from follower={} for put_row_diff, status={}", remote_node, int(status));
if (status == repair_stream_cmd::put_rows_done) {
co_return;
} else if (status == repair_stream_cmd::error) {
throw std::runtime_error(format("put_row_diff: Repair follower={} failed in put_row_diff handler, status={}", remote_node, int(status)));
} else {
throw std::runtime_error("put_row_diff: Got unexpected repair_stream_cmd");
}
}
_sink_source_for_put_row_diff.mark_source_closed(node_idx);
throw std::runtime_error("put_row_diff: Got unexpected end of stream");
}
future<> put_row_diff_sink_op(
repair_rows_on_wire rows,
rpc::sink<repair_row_on_wire_with_cmd>& sink,
locator::host_id remote_node) {
std::exception_ptr ep;
try {
for (repair_row_on_wire& row : rows) {
rlogger.trace("put_row_diff: send row");
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)});
}
rlogger.trace("put_row_diff: send empty row");
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
rlogger.trace("put_row_diff: send done");
co_await sink.flush();
} catch (...) {
ep = std::current_exception();
}
if (ep) {
co_await sink.close();
std::rethrow_exception(ep);
}
}
public:
future<> put_row_diff_with_rpc_stream(
repair_hash_set set_diff,
needs_all_rows_t needs_all_rows,
locator::host_id remote_node, unsigned node_idx,
std::optional<small_table_optimization_params> small_table_optimization,
shard_id dst_cpu_id) {
if (set_diff.empty()) {
co_return;
}
if (remote_node == myhostid()) {
co_return;
}
size_t sz = set_diff.size();
std::list<repair_row> row_diff = co_await get_row_diff(std::move(set_diff), needs_all_rows);
if (row_diff.size() != sz) {
rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.",
_schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz);
}
if (small_table_optimization) {
auto& strat = small_table_optimization->erm->get_replication_strategy();
const auto* tm = &small_table_optimization->erm->get_token_metadata();
const auto* tmptr = co_await get_tm_for_small_table_optimization_check(tm);
if (tmptr) {
tm = tmptr;
}
std::list<repair_row> tmp;
for (auto& row : row_diff) {
repair_row r = std::move(row);
const auto& dk = r.get_dk_with_hash()->dk;
auto eps = co_await strat.calculate_natural_endpoints(dk.token(), *tm);
if (eps.contains(remote_node)) {
tmp.push_back(std::move(r));
} else {
rlogger.trace("master: put : ignore row, token={}", dk.token());
}
}
row_diff = std::move(tmp);
}
size_t row_bytes = co_await get_repair_rows_size(row_diff);
stats().tx_row_nr += row_diff.size();
stats().tx_row_nr_peer[remote_node] += row_diff.size();
stats().tx_row_bytes += row_bytes;
stats().rpc_call_nr++;
repair_rows_on_wire rows = co_await to_repair_rows_on_wire(std::move(row_diff));
auto [sink, source] = co_await _sink_source_for_put_row_diff.get_sink_source(remote_node, node_idx, dst_cpu_id);
auto source_op = [&] () mutable -> future<> {
co_await put_row_diff_source_op(remote_node, node_idx, source);
};
auto sink_op = [&] () mutable -> future<> {
co_await put_row_diff_sink_op(std::move(rows), sink, remote_node);
};
co_await coroutine::all(source_op, sink_op);
}
// RPC handler
future<> put_row_diff_handler(repair_rows_on_wire rows) {
if (_rs.is_disabled() && _is_eligible_to_repair_rejection) {
co_await coroutine::return_exception(std::runtime_error("Repair service is disabled"));
}
auto gate_held = _gate.hold();
auto& cf = _db.local().find_column_family(_schema->id());
cf.update_off_strategy_trigger();
co_await apply_rows_on_follower(std::move(rows));
}
public:
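// For incremental repair: bump repaired_at on the sstables that took part in
// this repair, both the pre-existing sstables captured in
// _incremental_repair_meta.sst_set and the sstables produced by the repair
// writer.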
future<> mark_sstable_as_repaired() {
auto& sstables = _repair_writer->get_sstable_list_to_mark_as_repaired();
if (_incremental_repair_meta.sst_set || !sstables.empty()) {
co_await seastar::async([&] {
auto do_mark_sstable_as_repaired = [&] (const sstables::shared_sstable& sst, const sstring& type) {
auto filename = sst->toc_filename();
auto name = sst->component_basename(component_type::Data);
int64_t repaired_at = _incremental_repair_meta.sstables_repaired_at + 1;
sst->update_repaired_at(repaired_at);
rlogger.info("Marking filename={} name={} repaired_at={} being_repaired={} type={} for incremental repair",
filename, name, repaired_at, sst->being_repaired, type);
};
_incremental_repair_meta.sst_set->for_each_sstable([&] (const sstables::shared_sstable& sst) {
seastar::thread::maybe_yield();
do_mark_sstable_as_repaired(sst, "existing");
});
for (auto& sst : sstables) {
seastar::thread::maybe_yield();
do_mark_sstable_as_repaired(sst, "repair_produced");
}
});
}
}
};
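// Flush rows that are dirty on the master to disk, merging mergeable
// mutation fragments that belong to the same partition and position before
// writing. With the small table optimization, rows whose token is not owned
// by this node are skipped. Must run inside a seastar thread.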
void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_writer>& writer, std::optional<small_table_optimization_params> small_table_optimization, repair_meta* rm) {
auto cmp = position_in_partition::tri_compare(*s);
lw_shared_ptr<mutation_fragment> last_mf;
lw_shared_ptr<const decorated_key_with_hash> last_dk;
const auto* tm = small_table_optimization ? &small_table_optimization->erm->get_token_metadata() : nullptr;
if (small_table_optimization && rm) {
const auto* tmptr = rm->get_tm_for_small_table_optimization_check(tm).get();
if (tmptr) {
tm = tmptr;
}
}
for (auto& r : rows) {
thread::maybe_yield();
if (!r.dirty_on_master()) {
continue;
}
const auto& dk = r.get_dk_with_hash()->dk;
if (small_table_optimization) {
// Check if the token is owned by the node
auto eps = small_table_optimization->erm->get_replication_strategy().calculate_natural_endpoints(dk.token(), *tm).get();
if (!eps.contains(small_table_optimization->erm->get_topology().my_host_id())) {
rlogger.trace("master: ignore row, token={}", dk.token());
continue;
}
}
writer->create_writer();
auto mf = r.get_mutation_fragment_ptr();
if (last_mf && last_dk &&
cmp(last_mf->position(), mf->position()) == 0 &&
dk.tri_compare(*s, last_dk->dk) == 0 &&
last_mf->mergeable_with(*mf)) {
last_mf->apply(*s, std::move(*mf));
} else {
if (last_mf && last_dk) {
writer->do_write(std::move(last_dk), std::move(*last_mf)).get();
}
last_mf = mf;
last_dk = r.get_dk_with_hash();
}
r.reset_mutation_fragment();
}
if (last_mf && last_dk) {
writer->do_write(std::move(last_dk), std::move(*last_mf)).get();
}
}
// Must run inside a seastar thread
static repair_hash_set
get_set_diff(const repair_hash_set& x, const repair_hash_set& y) {
repair_hash_set set_diff;
// Note: std::set_difference requires x and y to be sorted, so use copy_if with contains() instead.
std::copy_if(x.begin(), x.end(), std::inserter(set_diff, set_diff.end()),
[&y] (auto& item) { thread::maybe_yield(); return !y.contains(item); });
return set_diff;
}
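// Follower-side processing of one message from the get_row_diff rpc stream.
// The fast path just accumulates hash_data into current_set_diff; the slow
// path handles end_of_current_hash_set / needs_all_rows by computing the row
// diff on the shard owning the repair_meta and streaming the rows back on
// the sink.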
static future<> repair_get_row_diff_with_rpc_stream_process_op_slow_path(
sharded<repair_service>& repair,
locator::host_id from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_row_on_wire_with_cmd> sink,
rpc::source<repair_hash_with_cmd> source,
bool &error,
repair_hash_set& current_set_diff,
std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt);
static future<> repair_get_row_diff_with_rpc_stream_process_op(
sharded<repair_service>& repair,
locator::host_id from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_row_on_wire_with_cmd> sink,
rpc::source<repair_hash_with_cmd> source,
bool &error,
repair_hash_set& current_set_diff,
std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) {
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
rlogger.trace("Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", from, hash_cmd.hash, int(hash_cmd.cmd));
if (hash_cmd.cmd == repair_stream_cmd::hash_data) {
current_set_diff.insert(hash_cmd.hash);
return make_ready_future<>();
} else {
return repair_get_row_diff_with_rpc_stream_process_op_slow_path(repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, std::move(sink), std::move(source), error, current_set_diff, std::move(hash_cmd_opt));
}
}
static future<> repair_get_row_diff_with_rpc_stream_process_op_slow_path(
sharded<repair_service>& repair,
locator::host_id from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_row_on_wire_with_cmd> sink,
rpc::source<repair_hash_with_cmd> source,
bool &error,
repair_hash_set& current_set_diff,
std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) {
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set || hash_cmd.cmd == repair_stream_cmd::needs_all_rows) {
if (inject_rpc_stream_error) {
throw std::runtime_error("get_row_diff_with_rpc_stream: Inject error in handler loop");
}
bool needs_all_rows = hash_cmd.cmd == repair_stream_cmd::needs_all_rows;
_metrics.rx_hashes_nr += current_set_diff.size();
auto fp = make_foreign(std::make_unique<repair_hash_set>(std::move(current_set_diff)));
repair_rows_on_wire rows_on_wire = co_await repair.invoke_on(dst_cpu_id, [&] (repair_service& local_repair) -> future<repair_rows_on_wire> {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::get_row_diff_with_rpc_stream_started);
if (fp.get_owner_shard() == this_shard_id()) {
repair_rows_on_wire rows = co_await rm->get_row_diff_handler(std::move(*fp), repair_meta::needs_all_rows_t(needs_all_rows));
rm->set_repair_state_for_local_node(repair_state::get_row_diff_with_rpc_stream_finished);
co_return rows;
} else {
repair_rows_on_wire rows = co_await rm->get_row_diff_handler(*fp, repair_meta::needs_all_rows_t(needs_all_rows));
rm->set_repair_state_for_local_node(repair_state::get_row_diff_with_rpc_stream_finished);
co_return rows;
}
});
if (rows_on_wire.empty()) {
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
} else {
for (repair_row_on_wire& row : rows_on_wire) {
auto cmd = repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)};
co_await sink(cmd);
co_await cmd.row.clear_gently();
}
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
}
co_await sink.flush();
co_return;
} else {
throw std::runtime_error("Got unexpected repair_stream_cmd");
}
}
static future<> repair_put_row_diff_with_rpc_stream_process_op(
sharded<repair_service>& repair,
locator::host_id from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_stream_cmd> sink,
rpc::source<repair_row_on_wire_with_cmd> source,
bool& error,
repair_rows_on_wire& current_rows,
std::optional<std::tuple<repair_row_on_wire_with_cmd>> row_opt) {
auto row = std::move(std::get<0>(row_opt.value()));
if (row.cmd == repair_stream_cmd::row_data) {
rlogger.trace("Got repair_rows_on_wire from peer={}, got row_data", from);
current_rows.push_back(std::move(row.row));
co_return;
} else if (row.cmd == repair_stream_cmd::end_of_current_rows) {
rlogger.trace("Got repair_rows_on_wire from peer={}, got end_of_current_rows", from);
auto fp = make_foreign(std::make_unique<repair_rows_on_wire>(std::move(current_rows)));
co_await repair.invoke_on(dst_cpu_id, [from, repair_meta_id, fp = std::move(fp)] (repair_service& local_repair) mutable -> future<> {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::put_row_diff_with_rpc_stream_started);
if (fp.get_owner_shard() == this_shard_id()) {
co_await rm->put_row_diff_handler(std::move(*fp));
rm->set_repair_state_for_local_node(repair_state::put_row_diff_with_rpc_stream_finished);
} else {
co_await rm->put_row_diff_handler(*fp);
rm->set_repair_state_for_local_node(repair_state::put_row_diff_with_rpc_stream_finished);
}
});
co_await sink(repair_stream_cmd::put_rows_done);
co_await sink.flush();
co_return;
} else {
throw std::runtime_error("Got unexpected repair_stream_cmd");
}
}
static future<stop_iteration> repair_get_full_row_hashes_with_rpc_stream_process_op(
sharded<repair_service>& repair,
locator::host_id from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_hash_with_cmd> sink,
rpc::source<repair_stream_cmd> source,
bool &error,
std::optional<std::tuple<repair_stream_cmd>> status_opt) {
repair_stream_cmd status = std::get<0>(status_opt.value());
rlogger.trace("Got register_repair_get_full_row_hashes_with_rpc_stream from peer={}, status={}", from, int(status));
if (status == repair_stream_cmd::get_full_row_hashes) {
repair_hash_set hashes = co_await repair.invoke_on(dst_cpu_id, [from, repair_meta_id] (repair_service& local_repair) -> future<repair_hash_set> {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_started);
repair_hash_set hashes = co_await rm->get_full_row_hashes_handler();
rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_finished);
_metrics.tx_hashes_nr += hashes.size();
co_return hashes;
});
for (const repair_hash& hash : hashes) {
co_await sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
}
co_await sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
co_await sink.flush();
co_return stop_iteration::no;
} else {
throw std::runtime_error("Got unexpected repair_stream_cmd");
}
}
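// Top-level loops for the follower side of the repair rpc streams. Each
// handler keeps draining its source after a failure, replies with an error
// command on the sink so the master side can abort, and closes the sink when
// the stream ends.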
static future<> repair_get_row_diff_with_rpc_stream_handler(
sharded<repair_service>& repair,
locator::host_id from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_row_on_wire_with_cmd> sink,
rpc::source<repair_hash_with_cmd> source) {
bool error = false;
repair_hash_set current_set_diff = repair_hash_set();
std::exception_ptr outer_exception;
try {
while (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt = co_await source()) {
std::exception_ptr ep;
try {
if (error) {
continue;
}
co_await repair_get_row_diff_with_rpc_stream_process_op(repair, from,
src_cpu_id,
dst_cpu_id,
repair_meta_id,
sink,
source,
error,
current_set_diff,
std::move(hash_cmd_opt));
} catch (...) {
ep = std::current_exception();
rlogger.warn("repair_get_row_diff_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
error = true;
}
if (ep) {
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()});
}
}
} catch (...) {
outer_exception = std::current_exception();
}
co_await sink.close();
if (outer_exception) {
std::rethrow_exception(outer_exception);
}
}
static future<> repair_put_row_diff_with_rpc_stream_handler(
sharded<repair_service>& repair,
locator::host_id from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_stream_cmd> sink,
rpc::source<repair_row_on_wire_with_cmd> source) {
std::exception_ptr outer_exception;
bool error = false;
repair_rows_on_wire current_rows = repair_rows_on_wire();
try {
while (std::optional<std::tuple<repair_row_on_wire_with_cmd>> row_opt = co_await source()) {
std::exception_ptr ep;
try {
if (error) {
continue;
}
co_await repair_put_row_diff_with_rpc_stream_process_op(repair, from,
src_cpu_id,
dst_cpu_id,
repair_meta_id,
sink,
source,
error,
current_rows,
std::move(row_opt));
} catch (...) {
ep = std::current_exception();
rlogger.warn("repair_put_row_diff_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
error = true;
}
if (ep) {
co_await sink(repair_stream_cmd::error);
}
}
} catch (...) {
outer_exception = std::current_exception();
}
co_await sink.close();
if (outer_exception) {
std::rethrow_exception(outer_exception);
}
}
static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
sharded<repair_service>& repair,
locator::host_id from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_hash_with_cmd> sink,
rpc::source<repair_stream_cmd> source) {
std::exception_ptr outer_exception;
bool error = false;
try {
    while (std::optional<std::tuple<repair_stream_cmd>> status_opt = co_await source()) {
std::exception_ptr ep;
try {
if (error) {
continue;
}
auto stop = co_await repair_get_full_row_hashes_with_rpc_stream_process_op(repair, from,
src_cpu_id,
dst_cpu_id,
repair_meta_id,
sink,
source,
error,
std::move(status_opt));
if (stop) {
break;
}
} catch (...) {
ep = std::current_exception();
rlogger.warn("repair_get_full_row_hashes_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
error = true;
}
if (ep) {
co_await sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()});
}
}
} catch (...) {
outer_exception = std::current_exception();
}
co_await sink.close();
if (outer_exception) {
std::rethrow_exception(outer_exception);
}
}
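// Record the result of a successful range repair: validate that the range is
// of the form (start, end], update the tombstone gc repair time on all
// shards, and add an entry to the repair history system table.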
future<repair_update_system_table_response> repair_service::repair_update_system_table_handler(gms::inet_address from, repair_update_system_table_request req) {
rlogger.debug("repair[{}]: Got repair_update_system_table_request from node={}, range={}, repair_time={}", req.repair_uuid, from, req.range, req.repair_time);
auto& db = this->get_db();
bool is_valid_range = true;
if (req.range.start()) {
if (req.range.start()->is_inclusive()) {
is_valid_range = false;
}
}
if (req.range.end()) {
if (!req.range.end()->is_inclusive() && req.range.end()->value() != dht::maximum_token()) {
is_valid_range = false;
}
}
if (!is_valid_range) {
throw std::runtime_error(format("repair[{}]: range {} is not in the format of (start, end]", req.repair_uuid, req.range));
}
co_await db.invoke_on_all([&req] (replica::database& local_db) {
auto& gc_state = local_db.get_compaction_manager().get_shared_tombstone_gc_state();
return gc_state.update_repair_time(req.table_uuid, req.range, req.repair_time);
});
db::system_keyspace::repair_history_entry ent;
ent.id = req.repair_uuid;
ent.table_uuid = req.table_uuid;
ent.ts = db_clock::from_time_t(gc_clock::to_time_t(req.repair_time));
ent.ks = req.keyspace_name;
ent.cf = req.table_name;
auto range_start = req.range.start() ? req.range.start()->value() : dht::minimum_token();
ent.range_start = dht::token::to_int64(range_start);
auto range_end = req.range.end() ? req.range.end()->value() : dht::maximum_token();
ent.range_end = dht::token::to_int64(range_end);
co_await _sys_ks.local().update_repair_history(std::move(ent));
co_return repair_update_system_table_response();
}
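// Flush hints and replay the batchlog on behalf of a repair request so that
// pending writes are applied before the repair reads data. Runs on shard 0
// and caches the last flush time so that back-to-back requests within
// repair_hints_batchlog_flush_cache_time_in_ms can skip the flush.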
future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_batchlog_handler(gms::inet_address from, repair_flush_hints_batchlog_request req) {
if (this_shard_id() != 0) {
co_return co_await container().invoke_on(0, [&] (auto& rs) {
return rs.repair_flush_hints_batchlog_handler(from, std::move(req));
});
}
rlogger.info("repair[{}]: Started to process repair_flush_hints_batchlog_request from node={} hints_timeout={}s batchlog_timeout={}s",
req.repair_uuid, from, req.hints_timeout.count(), req.batchlog_timeout.count());
auto permit = co_await seastar::get_units(_flush_hints_batchlog_sem, 1);
bool updated = false;
auto now = gc_clock::now();
auto cache_time = std::chrono::milliseconds(get_db().local().get_config().repair_hints_batchlog_flush_cache_time_in_ms());
auto cache_disabled = cache_time == std::chrono::milliseconds(0);
auto flush_time = now;
db::all_batches_replayed all_replayed = db::all_batches_replayed::yes;
if (cache_disabled || (now - _flush_hints_batchlog_time > cache_time)) {
// Empty targets means all nodes
db::hints::sync_point sync_point = co_await _sp.local().create_hint_sync_point(std::vector<locator::host_id>{});
lowres_clock::time_point deadline = lowres_clock::now() + req.hints_timeout;
try {
bool bm_throw = utils::get_local_injector().enter("repair_flush_hints_batchlog_handler_bm_uninitialized");
if (!_bm.local_is_initialized() || bm_throw) {
throw std::runtime_error("Backlog manager isn't initialized");
}
co_await coroutine::all(
[this, &from, &req, &sync_point, &deadline] () -> future<> {
rlogger.info("repair[{}]: Started to flush hints for repair_flush_hints_batchlog_request from node={}", req.repair_uuid, from);
co_await _sp.local().wait_for_hint_sync_point(std::move(sync_point), deadline);
rlogger.info("repair[{}]: Finished to flush hints for repair_flush_hints_batchlog_request from node={}", req.repair_uuid, from);
co_return;
},
[this, now, cache_disabled, &flush_time, &cache_time, &from, &req, &all_replayed] () -> future<> {
rlogger.info("repair[{}]: Started to flush batchlog for repair_flush_hints_batchlog_request from node={}", req.repair_uuid, from);
auto last_replay = _bm.local().get_last_replay();
bool issue_flush = false;
if (cache_disabled) {
issue_flush = true;
flush_time = now;
} else {
if (now < last_replay) {
flush_time = now;
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "skip_flush_use_now", fmt::to_string(flush_time));
} else if (now - last_replay < cache_time) {
// Skip the replay request since last_replay has already been
// updated after the last _flush_hints_batchlog_time
// update. It is fine to use last_replay as the hint
// flush time because last_replay (batchlog replay
// time) is smaller than now (hint flush time).
flush_time = last_replay;
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "skip_flush_use_last_replay", fmt::to_string(flush_time));
} else {
// Issue the replay so that last_replay will be updated
// to a value greater than now after the call.
issue_flush = true;
flush_time = now;
}
}
if (issue_flush) {
all_replayed = co_await _bm.local().do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no);
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "issue_flush", fmt::to_string(flush_time));
}
rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, flushed={}", req.repair_uuid, from, issue_flush);
}
);
if (!all_replayed) {
throw std::runtime_error("Not all batchlog entries were replayed");
}
} catch (...) {
rlogger.warn("repair[{}]: Failed to process repair_flush_hints_batchlog_request from node={}: {}",
req.repair_uuid, from, std::current_exception());
throw;
}
co_await container().invoke_on_all([flush_time] (repair_service& rs) {
rs._flush_hints_batchlog_time = flush_time;
});
updated = true;
} else {
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "skip_flush", fmt::to_string(flush_time));
}
auto duration = std::chrono::duration<float>(gc_clock::now() - now);
rlogger.info("repair[{}]: Finished to process repair_flush_hints_batchlog_request from node={} updated={} flush_hints_batchlog_time={} flush_cache_time={} flush_duration={}",
req.repair_uuid, from, updated, _flush_hints_batchlog_time, cache_time, duration);
repair_flush_hints_batchlog_response resp{ .flush_time = _flush_hints_batchlog_time };
co_return resp;
}
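// Register the repair verbs and rpc stream handlers with the messaging
// service. Requests are forwarded to the repair_meta on the destination
// shard resolved from dst_cpu_id (falling back to src_cpu_id % smp::count).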
future<> repair_service::init_ms_handlers() {
auto& ms = this->_messaging;
ms.register_repair_get_row_diff_with_rpc_stream([this, &ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source<repair_hash_with_cmd> source, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto sink = ms.make_sink_for_repair_get_row_diff_with_rpc_stream(source);
// Start a new fiber.
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
(void)repair_get_row_diff_with_rpc_stream_handler(container(), from, src_cpu_id, shard, repair_meta_id, sink, source).handle_exception(
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
rlogger.warn("Failed to process get_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
});
return make_ready_future<rpc::sink<repair_row_on_wire_with_cmd>>(sink);
});
ms.register_repair_put_row_diff_with_rpc_stream([this, &ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source<repair_row_on_wire_with_cmd> source, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto sink = ms.make_sink_for_repair_put_row_diff_with_rpc_stream(source);
// Start a new fiber.
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
(void)repair_put_row_diff_with_rpc_stream_handler(container(), from, src_cpu_id, shard, repair_meta_id, sink, source).handle_exception(
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
rlogger.warn("Failed to process put_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
});
return make_ready_future<rpc::sink<repair_stream_cmd>>(sink);
});
ms.register_repair_get_full_row_hashes_with_rpc_stream([this, &ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source<repair_stream_cmd> source, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto sink = ms.make_sink_for_repair_get_full_row_hashes_with_rpc_stream(source);
// Start a new fiber.
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
(void)repair_get_full_row_hashes_with_rpc_stream_handler(container(), from, src_cpu_id, shard, repair_meta_id, sink, source).handle_exception(
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
rlogger.warn("Failed to process get_full_row_hashes_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
});
return make_ready_future<rpc::sink<repair_hash_with_cmd>>(sink);
});
ser::repair_rpc_verbs::register_repair_get_full_row_hashes(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
return container().invoke_on(shard, [from, repair_meta_id] (repair_service& local_repair) {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_started);
return rm->get_full_row_hashes_handler().then([rm] (repair_hash_set hashes) {
rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_finished);
_metrics.tx_hashes_nr += hashes.size();
return hashes;
});
}) ;
});
ser::repair_rpc_verbs::register_repair_get_combined_row_hash(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
std::optional<repair_sync_boundary> common_sync_boundary, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
return container().invoke_on(shard, [from, repair_meta_id,
common_sync_boundary = std::move(common_sync_boundary)] (repair_service& local_repair) mutable {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
_metrics.tx_hashes_nr++;
rm->set_repair_state_for_local_node(repair_state::get_combined_row_hash_started);
return rm->get_combined_row_hash_handler(std::move(common_sync_boundary)).then([rm] (get_combined_row_hash_response resp) {
rm->set_repair_state_for_local_node(repair_state::get_combined_row_hash_finished);
return resp;
});
});
});
ser::repair_rpc_verbs::register_repair_get_sync_boundary(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
std::optional<repair_sync_boundary> skipped_sync_boundary, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
return container().invoke_on(shard, [from, repair_meta_id,
skipped_sync_boundary = std::move(skipped_sync_boundary)] (repair_service& local_repair) mutable {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::get_sync_boundary_started);
return rm->get_sync_boundary_handler(std::move(skipped_sync_boundary)).then([rm] (get_sync_boundary_response resp) {
rm->set_repair_state_for_local_node(repair_state::get_sync_boundary_finished);
return resp;
});
});
});
ser::repair_rpc_verbs::register_repair_get_row_diff(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
repair_hash_set set_diff, bool needs_all_rows, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
_metrics.rx_hashes_nr += set_diff.size();
auto fp = make_foreign(std::make_unique<repair_hash_set>(std::move(set_diff)));
return container().invoke_on(shard, [from, repair_meta_id, fp = std::move(fp), needs_all_rows] (repair_service& local_repair) mutable {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::get_row_diff_started);
if (fp.get_owner_shard() == this_shard_id()) {
return rm->get_row_diff_handler(std::move(*fp), repair_meta::needs_all_rows_t(needs_all_rows)).then([rm] (repair_rows_on_wire rows) {
rm->set_repair_state_for_local_node(repair_state::get_row_diff_finished);
return rows;
});
} else {
return rm->get_row_diff_handler(*fp, repair_meta::needs_all_rows_t(needs_all_rows)).then([rm] (repair_rows_on_wire rows) {
rm->set_repair_state_for_local_node(repair_state::get_row_diff_finished);
return rows;
});
}
});
});
ser::repair_rpc_verbs::register_repair_put_row_diff(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
repair_rows_on_wire row_diff, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto fp = make_foreign(std::make_unique<repair_rows_on_wire>(std::move(row_diff)));
return container().invoke_on(shard, [from, repair_meta_id, fp = std::move(fp)] (repair_service& local_repair) mutable {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::put_row_diff_started);
if (fp.get_owner_shard() == this_shard_id()) {
return rm->put_row_diff_handler(std::move(*fp)).then([rm] {
rm->set_repair_state_for_local_node(repair_state::put_row_diff_finished);
});
} else {
return rm->put_row_diff_handler(*fp).then([rm] {
rm->set_repair_state_for_local_node(repair_state::put_row_diff_finished);
});
}
});
});
ser::repair_rpc_verbs::register_repair_row_level_start(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name,
sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed,
unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version,
rpc::optional<streaming::stream_reason> reason, rpc::optional<gc_clock::time_point> compaction_time, rpc::optional<shard_id> dst_cpu_id_opt,
rpc::optional<service::frozen_topology_guard> topo_guard, rpc::optional<std::optional<int64_t>> repaired_at,
rpc::optional<locator::tablet_repair_incremental_mode> incremental_mode) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
auto from_id = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
return container().invoke_on(shard, [from_id, src_cpu_id, repair_meta_id, ks_name, cf_name,
range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, schema_version, reason, compaction_time, this,
topo_guard = topo_guard.value_or(service::default_session_id), repaired_at = repaired_at.value_or(std::nullopt), incremental_mode = incremental_mode.value_or(locator::tablet_repair_incremental_mode::disabled)] (repair_service& local_repair) mutable {
streaming::stream_reason r = reason ? *reason : streaming::stream_reason::repair;
const gc_clock::time_point ct = compaction_time ? *compaction_time : gc_clock::now();
return repair_meta::repair_row_level_start_handler(local_repair, from_id, src_cpu_id, repair_meta_id, std::move(ks_name),
std::move(cf_name), std::move(range), algo, max_row_buf_size, seed,
shard_config{remote_shard, remote_shard_count, remote_ignore_msb},
schema_version, r, ct, _repair_module->abort_source(), topo_guard, repaired_at, incremental_mode);
});
});
ser::repair_rpc_verbs::register_repair_row_level_stop(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
sstring ks_name, sstring cf_name, dht::token_range range, rpc::optional<shard_id> dst_cpu_id_opt, rpc::optional<bool> mark_as_repaired_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
bool mark_as_repaired = mark_as_repaired_opt.value_or(false);
return container().invoke_on(shard, [from, repair_meta_id, ks_name, cf_name, range, mark_as_repaired] (repair_service& local_repair) mutable {
return repair_meta::repair_row_level_stop_handler(local_repair, from, repair_meta_id,
std::move(ks_name), std::move(cf_name), std::move(range), mark_as_repaired);
});
});
ser::repair_rpc_verbs::register_repair_get_estimated_partitions(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
return container().invoke_on(shard, [from, repair_meta_id] (repair_service& local_repair) mutable {
return repair_meta::repair_get_estimated_partitions_handler(local_repair, from, repair_meta_id);
});
});
ser::repair_rpc_verbs::register_repair_set_estimated_partitions(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
uint64_t estimated_partitions, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
return container().invoke_on(shard, [from, repair_meta_id, estimated_partitions] (repair_service& local_repair) mutable {
return repair_meta::repair_set_estimated_partitions_handler(local_repair, from, repair_meta_id, estimated_partitions);
});
});
ser::repair_rpc_verbs::register_repair_get_diff_algorithms(&ms, [] (const rpc::client_info& cinfo) {
return make_ready_future<std::vector<row_level_diff_detect_algorithm>>(suportted_diff_detect_algorithms());
});
ser::repair_rpc_verbs::register_repair_update_system_table(&ms, [this] (const rpc::client_info& cinfo, repair_update_system_table_request req) {
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return repair_update_system_table_handler(from, std::move(req));
});
ser::repair_rpc_verbs::register_repair_flush_hints_batchlog(&ms, [this] (const rpc::client_info& cinfo, repair_flush_hints_batchlog_request req) {
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return repair_flush_hints_batchlog_handler(from, std::move(req));
});
ser::repair_rpc_verbs::register_repair_update_compaction_ctrl(&ms, [this] (const rpc::client_info& cinfo, locator::global_tablet_id gid, service::frozen_topology_guard topo_guard) -> future<> {
co_await container().invoke_on_all([gid, topo_guard] (repair_service& local_repair) mutable -> future<> {
auto& table = local_repair.get_db().local().find_column_family(gid.table);
auto erm = table.get_effective_replication_map();
auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(gid.table);
auto* trinfo = tmap.get_tablet_transition_info(gid.tablet);
if (!trinfo) {
auto msg = fmt::format("Skipped repair_update_compaction_ctrl gid={} session_id={} since tablet is not in transition", gid, topo_guard);
rlogger.warn("{}", msg);
throw std::runtime_error(msg);
}
if (trinfo->stage != locator::tablet_transition_stage::end_repair) {
auto msg = fmt::format("Skipped repair_update_compaction_ctrl gid={} session_id={} since tablet is not in tablet_transition_stage::end_repair", gid, topo_guard);
rlogger.warn("{}", msg);
throw std::runtime_error(msg);
}
auto range = tmap.get_token_range(gid.tablet);
co_await table.clear_being_repaired_for_range(range);
auto removed = local_repair._repair_compaction_locks.erase(gid);
rlogger.info("Got repair_update_compaction_ctrl gid={} session_id={} removed={}", gid, topo_guard, removed);
});
});
ser::repair_rpc_verbs::register_repair_update_repaired_at_for_merge(&ms, [this] (const rpc::client_info& cinfo, table_id table) -> future<> {
rlogger.debug("Got repair_update_repaired_at_for_merge table={}", table);
co_await container().invoke_on_all([table] (repair_service& local_repair) mutable -> future<> {
auto& t = local_repair.get_db().local().find_column_family(table);
co_await t.update_repaired_at_for_merge();
});
});
return make_ready_future<>();
}
future<> repair_service::uninit_ms_handlers() {
auto& ms = this->_messaging;
return when_all_succeed(
ms.unregister_repair_get_row_diff_with_rpc_stream(),
ms.unregister_repair_put_row_diff_with_rpc_stream(),
ms.unregister_repair_get_full_row_hashes_with_rpc_stream(),
ser::repair_rpc_verbs::unregister(&ms)
).discard_result();
}
class repair_meta_tracker {
boost::intrusive::list<repair_meta,
boost::intrusive::member_hook<repair_meta, repair_meta::tracker_link_type, &repair_meta::_tracker_link>,
boost::intrusive::constant_time_size<false>> _repair_metas;
public:
void add(repair_meta& rm) {
_repair_metas.push_back(rm);
}
};
namespace debug {
static thread_local repair_meta_tracker repair_meta_for_masters;
static thread_local repair_meta_tracker repair_meta_for_followers;
}
static void add_to_repair_meta_for_masters(repair_meta& rm) {
debug::repair_meta_for_masters.add(rm);
}
static void add_to_repair_meta_for_followers(repair_meta& rm) {
debug::repair_meta_for_followers.add(rm);
}
class row_level_repair {
repair::shard_repair_task_impl& _shard_task;
sstring _cf_name;
table_id _table_id;
dht::token_range _range;
host_id_vector_replica_set _all_live_peer_nodes;
std::vector<std::optional<shard_id>> _all_live_peer_shards;
bool _small_table_optimization;
// Repair master and followers will propose a sync boundary. Each of them
// reads N bytes of rows from disk; the row with the largest
// `position_in_partition` value is the proposed sync boundary of that
// node. The repair master uses the `get_sync_boundary` rpc call to
// get all the proposed sync boundaries and stores them in
// `_sync_boundaries`. The `get_sync_boundary` rpc call also
// returns the combined hashes and the total size of the rows which are
// in the `_row_buf`. `_row_buf` buffers the rows read from sstables. It
// contains at most `_max_row_buf_size` bytes of rows.
// If all the peers return the same `_sync_boundaries` and
// `_combined_hashes`, we consider the rows synced.
// If not, we proceed to the next step.
std::vector<repair_sync_boundary> _sync_boundaries;
std::vector<repair_hash> _combined_hashes;
// `_common_sync_boundary` is the boundary all the peers agree on
std::optional<repair_sync_boundary> _common_sync_boundary = {};
// `_skipped_sync_boundary` is used in case we find the range is synced
// only with the `get_sync_boundary` rpc call. We use it to make
// sure the remote peers update the `_current_sync_boundary` and
// `_last_sync_boundary` correctly.
std::optional<repair_sync_boundary> _skipped_sync_boundary = {};
// If the total size of the `_row_buf` on any of the nodes is zero,
// we set this flag, which indicates that the rows are not synced.
bool _zero_rows = false;
// Sum of estimated_partitions on all peers
uint64_t _estimated_partitions = 0;
// A flag that indicates whether any error occurred during the repair
bool _failed = false;
// Seed for the repair row hashing. If we ever had a hash conflict for a row
// and we are not using a stable hash, there is a chance we will fix the row in
// the next repair.
uint64_t _seed;
gc_clock::time_point _start_time;
bool _is_tablet;
service::frozen_topology_guard _topo_guard;
public:
row_level_repair(repair::shard_repair_task_impl& shard_task,
sstring cf_name,
table_id table_id,
dht::token_range range,
std::vector<locator::host_id> all_live_peer_nodes,
bool small_table_optimization,
gc_clock::time_point start_time,
service::frozen_topology_guard topo_guard)
: _shard_task(shard_task)
, _cf_name(std::move(cf_name))
, _table_id(std::move(table_id))
, _range(std::move(range))
, _all_live_peer_nodes(sort_peer_nodes(all_live_peer_nodes))
, _small_table_optimization(small_table_optimization)
, _seed(get_random_seed())
, _start_time(start_time)
, _is_tablet(_shard_task.db.local().find_column_family(_table_id).uses_tablets())
, _topo_guard(topo_guard)
{
repair_neighbors r_neighbors = _shard_task.get_repair_neighbors(_range);
auto& map = r_neighbors.shard_map;
for (auto& n : _all_live_peer_nodes) {
auto it = map.find(n);
if (it != map.end()) {
_all_live_peer_shards.push_back(it->second);
} else {
_all_live_peer_shards.push_back(std::nullopt);
}
}
if (_all_live_peer_shards.size() != _all_live_peer_nodes.size()) {
on_internal_error(rlogger, seastar::format("The sizes of shards and nodes do not match table={} range={} shards={} nodes={}",
_cf_name, _range, _all_live_peer_shards, _all_live_peer_nodes));
}
}
private:
enum class op_status {
next_round,
next_step,
all_done,
};
host_id_vector_replica_set sort_peer_nodes(const std::vector<locator::host_id>& nodes) {
host_id_vector_replica_set sorted_nodes(nodes.begin(), nodes.end());
auto& topology = get_erm()->get_topology();
topology.sort_by_proximity(topology.my_host_id(), sorted_nodes);
return sorted_nodes;
}
size_t get_max_row_buf_size(row_level_diff_detect_algorithm algo) {
// Max buffer size per repair round
size_t size = is_rpc_stream_supported(algo) ? repair::task_manager_module::max_repair_memory_per_range : 256 * 1024;
if (_small_table_optimization) {
// For small table optimization, we reduce the buffer size to reduce memory consumption.
size /= _all_live_peer_nodes.size();
}
return size;
}
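// Worked example of the sizing above (illustrative numbers, the actual
// constants come from the configuration): with an rpc-stream capable
// algorithm the per-round buffer starts at
// repair::task_manager_module::max_repair_memory_per_range; with 4 live
// peers and the small table optimization enabled it becomes that value / 4.
// Later, run() reserves (peers + 1) * max_row_buf_size units from the
// memory semaphore, capped at max_repair_memory(), before building the
// repair master.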
// Step A: Negotiate sync boundary to use
op_status negotiate_sync_boundary(repair_meta& master) {
_shard_task.check_in_abort_or_shutdown();
_sync_boundaries.clear();
_combined_hashes.clear();
_zero_rows = false;
rlogger.debug("ROUND {}, _last_sync_boundary={}, _current_sync_boundary={}, _skipped_sync_boundary={}",
master.stats().round_nr, master.last_sync_boundary(), master.current_sync_boundary(), _skipped_sync_boundary);
master.stats().round_nr++;
parallel_for_each(master.all_nodes(), coroutine::lambda([&] (repair_node_state& ns) -> future<> {
const auto& node = ns.node;
auto dst_cpu_id = ns.shard;
// By calling `get_sync_boundary`, the `_last_sync_boundary`
// is moved to the `_current_sync_boundary` or
// `_skipped_sync_boundary` if it is not std::nullopt.
ns.state = repair_state::get_sync_boundary_started;
try {
get_sync_boundary_response res = co_await master.get_sync_boundary(node, _skipped_sync_boundary, dst_cpu_id);
ns.state = repair_state::get_sync_boundary_finished;
master.stats().row_from_disk_bytes[node] += res.new_rows_size;
master.stats().row_from_disk_nr[node] += res.new_rows_nr;
if (res.boundary && res.row_buf_size > 0) {
_sync_boundaries.push_back(*res.boundary);
_combined_hashes.push_back(res.row_buf_combined_csum);
} else {
// A row buf size of 0 means there is no data on
// that node, so we ignore the sync boundary of
// this node when calculating the common sync boundary.
_zero_rows = true;
}
rlogger.debug("Called master.get_sync_boundary for node {} sb={}, combined_csum={}, row_size={}, zero_rows={}, skipped_sync_boundary={}",
node, res.boundary, res.row_buf_combined_csum, res.row_buf_size, _zero_rows, _skipped_sync_boundary);
} catch (...) {
auto ep = std::current_exception();
rlogger.warn("repair[{}]: get_sync_boundary: got error from node={}, keyspace={}, table={}, range={}, error={}",
_shard_task.global_repair_id.uuid(), node, _shard_task.get_keyspace(), _cf_name, _range, ep);
std::rethrow_exception(ep);
}
})).get();
rlogger.debug("sync_boundaries nr={}, combined_hashes nr={}",
_sync_boundaries.size(), _combined_hashes.size());
if (!_sync_boundaries.empty()) {
// We have data to sync between (_last_sync_boundary, _current_sync_boundary]
auto res = master.get_common_sync_boundary(_zero_rows, _sync_boundaries, _combined_hashes);
_common_sync_boundary = res.first;
bool already_synced = res.second;
rlogger.debug("Calling master._get_common_sync_boundary: common_sync_boundary={}, already_synced={}",
_common_sync_boundary, already_synced);
// If rows between (_last_sync_boundary, _current_sync_boundary] are synced, goto first step
// This is the first fast path.
if (already_synced) {
_skipped_sync_boundary = _common_sync_boundary;
rlogger.debug("Skip set skipped_sync_boundary={}", _skipped_sync_boundary);
master.stats().round_nr_fast_path_already_synced++;
return op_status::next_round;
} else {
_skipped_sync_boundary = std::nullopt;
}
} else {
master.stats().round_nr_fast_path_already_synced++;
// We are done with this range because all the nodes have no more data.
return op_status::all_done;
}
return op_status::next_step;
}
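// Possible outcomes of a single step-A round (illustrative, with a
// hypothetical boundary B1):
//   - every node proposes B1 and the same combined hash
//       -> already synced, _skipped_sync_boundary = B1, op_status::next_round
//   - no node has any rows left in its _row_buf
//       -> op_status::all_done for this token range
//   - the proposals or hashes differ
//       -> _common_sync_boundary is picked from the proposals and we
//          continue with step B (op_status::next_step)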
// Step B: Get missing rows from peer nodes so that local node contains all the rows
op_status get_missing_rows_from_follower_nodes(repair_meta& master) {
_shard_task.check_in_abort_or_shutdown();
// `combined_hashes` contains the combined hashes for the
// `_working_row_buf`. Like `_row_buf`, `_working_row_buf` contains
// rows which are within (_last_sync_boundary, _current_sync_boundary].
// By calling `get_combined_row_hash(_common_sync_boundary)`,
// all the nodes move the `_current_sync_boundary` to `_common_sync_boundary`.
// Rows within (_last_sync_boundary, _current_sync_boundary] are
// moved from the `_row_buf` to the `_working_row_buf`.
std::vector<repair_hash> combined_hashes;
combined_hashes.resize(master.all_nodes().size());
parallel_for_each(std::views::iota(size_t(0), master.all_nodes().size()), coroutine::lambda([&] (size_t idx) -> future<> {
// Request the combined hash of the rows between (_last_sync_boundary, _current_sync_boundary]
// from all nodes. Each node will
// - Set `_current_sync_boundary` to `_common_sync_boundary`
// - Move rows from `_row_buf` to `_working_row_buf`
// The full hashes (each and every hash for the rows in
// the `_working_row_buf`) are not returned until the repair master
// explicitly requests them with get_full_row_hashes() below. This is
// an optimization: if the combined_hashes from all
// peers are identical, we consider the rows in the `_working_row_buf`
// identical, so there is no need to transfer each and every
// row hash to the repair master.
auto& ns = master.all_nodes()[idx];
ns.state = repair_state::get_combined_row_hash_started;
try {
get_combined_row_hash_response resp = co_await master.get_combined_row_hash(_common_sync_boundary, ns.node, ns.shard);
master.all_nodes()[idx].state = repair_state::get_combined_row_hash_finished;
rlogger.debug("Calling master.get_combined_row_hash for node {}, got combined_hash={}", master.all_nodes()[idx].node, resp);
combined_hashes[idx]= std::move(resp);
} catch (...) {
auto ep = std::current_exception();
auto& node = master.all_nodes()[idx].node;
rlogger.warn("repair[{}]: get_combined_row_hash: got error from node={}, keyspace={}, table={}, range={}, error={}",
_shard_task.global_repair_id.uuid(), node, _shard_task.get_keyspace(), _cf_name, _range, ep);
std::rethrow_exception(ep);
}
})).get();
// If all the peers have the same combined_hashes, they contain
// identical rows, so there is no need to sync for this sync boundary.
bool same_combined_hashes = std::adjacent_find(combined_hashes.begin(), combined_hashes.end(),
std::not_equal_to<repair_hash>()) == combined_hashes.end();
if (same_combined_hashes) {
// `_working_row_buf` on all the nodes are the same
// This is the second fast path.
master.stats().round_nr_fast_path_same_combined_hashes++;
return op_status::next_round;
}
master.reset_peer_row_hash_sets();
// Note: We cannot work on _all_live_peer_nodes in parallel,
// because syncing with _all_live_peer_nodes in serial avoids
// getting the same rows from more than one peer.
for (unsigned node_idx = 0; node_idx < _all_live_peer_nodes.size(); node_idx++) {
auto& ns = master.all_nodes()[node_idx + 1];
auto& node = _all_live_peer_nodes[node_idx];
auto dst_cpu_id = ns.shard;
try {
// Here is an optimization to avoid transferring the full row hashes
// if the remote and local nodes have the same combined_hashes.
// For example:
// node1: 1 2 3
// node2: 1 2 3 4
// node3: 1 2 3 4
// After node1 gets row 4 from node2, node1 updates its
// combined_hashes, so we can avoid fetching the full row hashes from node3.
if (combined_hashes[node_idx + 1] == master.working_row_buf_combined_hash()) {
// local node and peer node have the same combined hash. This
// means we can set peer_row_hash_sets[n] to local row hashes
// without fetching it from peers to save network traffic.
master.peer_row_hash_sets(node_idx) = master.working_row_hashes().get();
rlogger.debug("Calling optimize master.working_row_hashes for node {}, hash_sets={}",
node, master.peer_row_hash_sets(node_idx).size());
continue;
}
// Fast path: if local has zero rows and remote has rows, request them all.
if (master.working_row_buf_combined_hash() == repair_hash() && combined_hashes[node_idx + 1] != repair_hash()) {
master.peer_row_hash_sets(node_idx).clear();
if (master.use_rpc_stream()) {
rlogger.debug("FastPath: get_row_diff with needs_all_rows_t::yes rpc stream");
ns.state = repair_state::get_row_diff_with_rpc_stream_started;
master.get_row_diff_with_rpc_stream({}, repair_meta::needs_all_rows_t::yes, repair_meta::update_peer_row_hash_sets::yes, node, node_idx, dst_cpu_id);
ns.state = repair_state::get_row_diff_with_rpc_stream_finished;
} else {
rlogger.debug("FastPath: get_row_diff with needs_all_rows_t::yes rpc verb");
ns.state = repair_state::get_row_diff_and_update_peer_row_hash_sets_started;
master.get_row_diff_and_update_peer_row_hash_sets(node, node_idx, dst_cpu_id);
ns.state = repair_state::get_row_diff_and_update_peer_row_hash_sets_finished;
}
continue;
}
rlogger.debug("Before master.get_full_row_hashes for node {}, hash_sets={}",
node, master.peer_row_hash_sets(node_idx).size());
// Ask the peer to send the full list of hashes in the working row buf.
if (master.use_rpc_stream()) {
ns.state = repair_state::get_full_row_hashes_with_rpc_stream_started;
master.peer_row_hash_sets(node_idx) = master.get_full_row_hashes_with_rpc_stream(node, node_idx, dst_cpu_id).get();
ns.state = repair_state::get_full_row_hashes_with_rpc_stream_finished;
} else {
ns.state = repair_state::get_full_row_hashes_started;
master.peer_row_hash_sets(node_idx) = master.get_full_row_hashes(node, dst_cpu_id).get();
ns.state = repair_state::get_full_row_hashes_finished;
}
rlogger.debug("After master.get_full_row_hashes for node {}, hash_sets={}",
node, master.peer_row_hash_sets(node_idx).size());
// With hashes of rows from peer node, we can figure out
// what rows repair master is missing. Note we get missing
// data from repair follower 1, apply the rows, then get
// missing data from repair follower 2 and so on. We do it
// sequentially because the rows from repair follower 1 to
// repair master might reduce the amount of missing data
// between repair master and repair follower 2.
repair_hash_set set_diff = get_set_diff(master.peer_row_hash_sets(node_idx), master.working_row_hashes().get());
// Request missing sets from peer node
rlogger.debug("Before get_row_diff to node {}, local={}, peer={}, set_diff={}",
node, master.working_row_hashes().get().size(), master.peer_row_hash_sets(node_idx).size(), set_diff.size());
// If we need to pull all rows from the peer, we can avoid
// sending the row hashes on the wire by setting the needs_all_rows flag.
auto needs_all_rows = repair_meta::needs_all_rows_t(set_diff.size() == master.peer_row_hash_sets(node_idx).size());
if (master.use_rpc_stream()) {
ns.state = repair_state::get_row_diff_with_rpc_stream_started;
master.get_row_diff_with_rpc_stream(std::move(set_diff), needs_all_rows, repair_meta::update_peer_row_hash_sets::no, node, node_idx, dst_cpu_id);
ns.state = repair_state::get_row_diff_with_rpc_stream_finished;
} else {
ns.state = repair_state::get_row_diff_started;
master.get_row_diff(std::move(set_diff), needs_all_rows, node, node_idx, dst_cpu_id);
ns.state = repair_state::get_row_diff_finished;
}
rlogger.debug("After get_row_diff node {}, hash_sets={}", master.myhostid(), master.working_row_hashes().get().size());
} catch (...) {
rlogger.warn("repair[{}]: get_row_diff: got error from node={}, keyspace={}, table={}, range={}, error={}",
_shard_task.global_repair_id.uuid(), node, _shard_task.get_keyspace(), _cf_name, _range, std::current_exception());
throw;
}
}
master.flush_rows_in_working_row_buf(_small_table_optimization ? std::make_optional(small_table_optimization_params{ .erm = get_erm() }) : std::nullopt);
return op_status::next_step;
}
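// Illustrative example of the diff direction used in step B (hypothetical
// hashes):
//   master working_row_hashes = {h1, h2}
//   peer   peer_row_hash_sets = {h1, h2, h3}
//   set_diff = get_set_diff(peer, master) = {h3}
// so only the row for h3 is fetched from that peer. When set_diff equals
// the peer's full hash set (e.g. the master's working buffer is empty),
// needs_all_rows is set, so the hashes do not have to be sent on the wire.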
// Step C: Send missing rows to the peer nodes
void send_missing_rows_to_follower_nodes(repair_meta& master) {
// At this time, the repair master contains all the rows between (_last_sync_boundary, _current_sync_boundary],
// so we can figure out which rows the peer nodes are missing and send the missing rows to them.
_shard_task.check_in_abort_or_shutdown();
repair_hash_set local_row_hash_sets = master.working_row_hashes().get();
auto sz = _all_live_peer_nodes.size();
std::vector<repair_hash_set> set_diffs(sz);
for (size_t idx : std::views::iota(size_t(0), sz)) {
set_diffs[idx] = get_set_diff(local_row_hash_sets, master.peer_row_hash_sets(idx));
}
parallel_for_each(std::views::iota(size_t(0), sz), coroutine::lambda([&] (size_t idx) -> future<> {
auto& ns = master.all_nodes()[idx + 1];
auto dst_cpu_id = ns.shard;
auto needs_all_rows = repair_meta::needs_all_rows_t(master.peer_row_hash_sets(idx).empty());
auto& set_diff = set_diffs[idx];
rlogger.debug("Calling master.put_row_diff to node {}, set_diff={}, needs_all_rows={}", _all_live_peer_nodes[idx], set_diff.size(), needs_all_rows);
auto& node = master.all_nodes()[idx].node;
if (master.use_rpc_stream()) {
ns.state = repair_state::put_row_diff_with_rpc_stream_started;
try {
co_await master.put_row_diff_with_rpc_stream(std::move(set_diff), needs_all_rows, _all_live_peer_nodes[idx], idx, _small_table_optimization ? std::make_optional(small_table_optimization_params{ .erm = get_erm() }) : std::nullopt, dst_cpu_id);
ns.state = repair_state::put_row_diff_with_rpc_stream_finished;
} catch (...) {
std::exception_ptr ep = std::current_exception();
rlogger.warn("repair[{}]: put_row_diff: got error from node={}, keyspace={}, table={}, range={}, error={}",
_shard_task.global_repair_id.uuid(), node, _shard_task.get_keyspace(), _cf_name, _range, ep);
std::rethrow_exception(ep);
}
} else {
ns.state = repair_state::put_row_diff_started;
try {
co_await master.put_row_diff(std::move(set_diff), needs_all_rows, _all_live_peer_nodes[idx], dst_cpu_id);
ns.state = repair_state::put_row_diff_finished;
} catch (...) {
std::exception_ptr ep = std::current_exception();
rlogger.warn("repair[{}]: put_row_diff: got error from node={}, keyspace={}, table={}, range={}, error={}",
_shard_task.global_repair_id.uuid(), node, _shard_task.get_keyspace(), _cf_name, _range, ep);
std::rethrow_exception(ep);
}
}
})).get();
master.stats().round_nr_slow_path++;
}
private:
locator::effective_replication_map_ptr get_erm() {
return _shard_task.get_erm();
}
private:
// Update system.repair_history table
future<> update_system_repair_table() {
// Update the repair_history table only if it is a regular repair.
if (_shard_task.reason() != streaming::stream_reason::repair) {
co_return;
}
auto my_address = get_erm()->get_topology().my_host_id();
// Update repair_history table only if all replicas have been repaired
size_t repaired_replicas = _all_live_peer_nodes.size() + 1;
if (_shard_task.get_total_rf() != repaired_replicas){
rlogger.debug("repair[{}]: Skipped to update system.repair_history total_rf={}, repaired_replicas={}, local={}, peers={}",
_shard_task.global_repair_id.uuid(), _shard_task.get_total_rf(), repaired_replicas, my_address, _all_live_peer_nodes);
co_return;
}
// Update repair_history table only if both hints and batchlog have been flushed.
if (!_shard_task.hints_batchlog_flushed()) {
co_return;
}
// The tablet repair time for tombstone gc will be updated when the
// system.tablet.repair_time is updated.
if (_is_tablet && _shard_task.sched_info.sched_by_scheduler) {
rlogger.debug("repair[{}]: Skipped to update system.repair_history for tablet repair scheduled by scheduler total_rf={} repaired_replicas={} local={} peers={}",
_shard_task.global_repair_id.uuid(), _shard_task.get_total_rf(), repaired_replicas, my_address, _all_live_peer_nodes);
co_return;
}
repair_service& rs = _shard_task.rs;
std::optional<gc_clock::time_point> repair_time_opt = co_await rs.update_history(_shard_task.global_repair_id.uuid(), _table_id, _range, _start_time, _is_tablet);
if (!repair_time_opt) {
co_return;
}
auto repair_time = repair_time_opt.value();
repair_update_system_table_request req{_shard_task.global_repair_id.uuid(), _table_id, _shard_task.get_keyspace(), _cf_name, _range, repair_time};
auto all_nodes = _all_live_peer_nodes;
all_nodes.push_back(my_address);
co_await coroutine::parallel_for_each(all_nodes, [this, req] (locator::host_id node) -> future<> {
try {
auto& ms = _shard_task.messaging.local();
repair_update_system_table_response resp = co_await ser::repair_rpc_verbs::send_repair_update_system_table(&ms, node, req);
(void)resp; // nothing to do with the response yet
rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _shard_task.global_repair_id.uuid(), node);
} catch (...) {
rlogger.warn("repair[{}]: Failed to update system.repair_history table of node {}: {}", _shard_task.global_repair_id.uuid(), node, std::current_exception());
}
});
co_return;
}
public:
future<> run() {
return seastar::async([this] {
_shard_task.check_in_abort_or_shutdown();
auto repair_meta_id = _shard_task.rs.get_next_repair_meta_id().get();
auto algorithm = get_common_diff_detect_algorithm(_shard_task.messaging.local(), _all_live_peer_nodes);
auto max_row_buf_size = get_max_row_buf_size(algorithm);
auto& cf = _shard_task.db.local().find_column_family(_table_id);
auto& sharder = cf.get_effective_replication_map()->get_sharder(*(cf.schema()));
auto master_node_shard_config = shard_config {
this_shard_id(),
sharder.shard_count(),
sharder.sharding_ignore_msb()
};
auto s = cf.schema();
auto schema_version = s->version();
bool table_dropped = false;
auto& mem_sem = _shard_task.rs.memory_sem();
auto max = _shard_task.rs.max_repair_memory();
auto wanted = (_all_live_peer_nodes.size() + 1) * max_row_buf_size;
wanted = std::min(max, wanted);
rlogger.trace("repair[{}]: Started to get memory budget, wanted={}, available={}, max_repair_memory={}",
_shard_task.global_repair_id.uuid(), wanted, mem_sem.current(), max);
auto mem_permit = seastar::get_units(mem_sem, wanted).get();
rlogger.trace("repair[{}]: Finished to get memory budget, wanted={}, available={}, max_repair_memory={}",
_shard_task.global_repair_id.uuid(), wanted, mem_sem.current(), max);
auto permit = _shard_task.db.local().obtain_reader_permit(_shard_task.db.local().find_column_family(_table_id), "repair-meta", db::no_timeout, {}).get();
auto compaction_time = gc_clock::now();
std::optional<int64_t> repaired_at;
bool enable_incremental_repair = _shard_task.db.local().features().tablet_incremental_repair && _is_tablet &&
_shard_task.sched_info.incremental_mode != locator::tablet_repair_incremental_mode::disabled &&
_shard_task.sched_info.sched_by_scheduler &&
!_shard_task.sched_info.for_tablet_rebuild;
if (enable_incremental_repair) {
auto& table = _shard_task.db.local().find_column_family(_table_id);
auto erm = table.get_effective_replication_map();
auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(_table_id);
auto last_token = _range.end() ? _range.end()->value() : dht::maximum_token();
auto& tinfo = tmap.get_tablet_info(last_token);
auto sstables_repaired_at = tinfo.sstables_repaired_at;
repaired_at = sstables_repaired_at + 1;
}
repair_meta master(_shard_task.rs,
_shard_task.db.local().find_column_family(_table_id),
s,
std::move(permit),
_range,
algorithm,
max_row_buf_size,
_seed,
repair_master::yes,
repair_meta_id,
_shard_task.reason(),
std::move(master_node_shard_config),
_all_live_peer_nodes,
_all_live_peer_nodes.size(),
_all_live_peer_shards,
this,
compaction_time,
_topo_guard,
repaired_at,
_shard_task.sched_info.incremental_mode);
auto auto_stop_master = defer([&master] {
try {
master.stop().get();
} catch (...) {
std::exception_ptr ep = std::current_exception();
rlogger.warn("Failed auto-stopping Row Level Repair (Master): {}. Ignored.", ep);
}
});
rlogger.debug(">>> Started Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_size={}",
master.myhostid(), _all_live_peer_nodes, master.repair_meta_id(), _shard_task.get_keyspace(), _cf_name, schema_version, _range, _seed, max_row_buf_size);
std::exception_ptr ex = nullptr;
std::vector<repair_node_state> nodes_to_stop;
nodes_to_stop.reserve(master.all_nodes().size());
try {
parallel_for_each(master.all_nodes(), coroutine::lambda([&] (repair_node_state& ns) -> future<> {
const auto& node = ns.node;
ns.state = repair_state::row_level_start_started;
co_await master.repair_row_level_start(node, _shard_task.get_keyspace(), _cf_name, _range, schema_version, _shard_task.reason(), compaction_time, ns.shard);
ns.state = repair_state::row_level_start_finished;
nodes_to_stop.push_back(ns);
ns.state = repair_state::get_estimated_partitions_started;
uint64_t partitions = co_await master.repair_get_estimated_partitions(node, ns.shard);
ns.state = repair_state::get_estimated_partitions_finished;
rlogger.trace("Get repair_get_estimated_partitions for node={}, estimated_partitions={}", node, partitions);
_estimated_partitions += partitions;
})).get();
if (!master.all_nodes().empty()) {
// Use the average number of partitions, instead of the sum
// of the partitions, as the estimated partitions in a
// given range. The bigger the estimated partitions, the
// more memory the bloom filter for the sstable would consume.
_estimated_partitions /= master.all_nodes().size();
// In addition, for regular repair we assume the difference
// between nodes is less than the specified ratio and scale
// the estimation accordingly.
// Underestimation will not be a big problem since the
// sstables produced by repair will go through off-strategy
// compaction later anyway. The worst case is that we have a worse
// false positive ratio than expected temporarily while the
// sstable is still in the maintenance set.
//
// To save memory and to have fewer special cases, we
// use the same estimation for RBNO repair as well.
_estimated_partitions *= _shard_task.db.local().get_config().repair_partition_count_estimation_ratio();
}
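// Illustrative arithmetic (hypothetical numbers): if the master and two
// followers report 90, 100 and 110 estimated partitions, the sum 300 is
// averaged to 100 and then scaled by the
// repair_partition_count_estimation_ratio() config value before being
// pushed back to every node below.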
parallel_for_each(master.all_nodes(), coroutine::lambda([&] (repair_node_state& ns) -> future<> {
const auto& node = ns.node;
rlogger.trace("Get repair_set_estimated_partitions for node={}, estimated_partitions={}", node, _estimated_partitions);
ns.state = repair_state::set_estimated_partitions_started;
co_await master.repair_set_estimated_partitions(node, _estimated_partitions, ns.shard);
ns.state = repair_state::set_estimated_partitions_finished;
})).get();
while (true) {
auto status = negotiate_sync_boundary(master);
if (status == op_status::next_round) {
continue;
} else if (status == op_status::all_done) {
break;
}
status = get_missing_rows_from_follower_nodes(master);
if (status == op_status::next_round) {
continue;
}
send_missing_rows_to_follower_nodes(master);
}
} catch (replica::no_such_column_family& e) {
table_dropped = true;
rlogger.warn("repair[{}]: shard={}, keyspace={}, cf={}, range={}, got error in row level repair: {}",
_shard_task.global_repair_id.uuid(), this_shard_id(), _shard_task.get_keyspace(), _cf_name, _range, e);
_failed = true;
} catch (std::exception& e) {
rlogger.warn("repair[{}]: shard={}, keyspace={}, cf={}, range={}, got error in row level repair: {}",
_shard_task.global_repair_id.uuid(), this_shard_id(), _shard_task.get_keyspace(), _cf_name, _range, e);
// In case the repair process fails, we need to call repair_row_level_stop to clean up the repair followers
_failed = true;
ex = std::current_exception();
}
bool mark_as_repaired = false;
if (master.is_incremental_repair() && !_failed) {
mark_as_repaired = true;
}
utils::get_local_injector().inject("repair_finish_wait", utils::wait_for_message(300s)).get();
parallel_for_each(nodes_to_stop, coroutine::lambda([&] (repair_node_state& ns) -> future<> {
auto node = ns.node;
master.set_repair_state(repair_state::row_level_stop_started, node);
co_await master.repair_row_level_stop(node, _shard_task.get_keyspace(), _cf_name, _range, ns.shard, mark_as_repaired);
master.set_repair_state(repair_state::row_level_stop_finished, node);
})).get();
_shard_task.update_statistics(master.stats());
if (_failed) {
if (table_dropped) {
throw replica::no_such_column_family(_shard_task.get_keyspace(), _cf_name);
} else {
throw nested_exception(std::make_exception_ptr(std::runtime_error(fmt::format("Failed to repair for keyspace={}, cf={}, range={}", _shard_task.get_keyspace(),
_cf_name, _range))), std::move(ex));
}
} else {
update_system_repair_table().get();
}
rlogger.debug("<<< Finished Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, range={}, tx_hashes_nr={}, rx_hashes_nr={}, tx_row_nr={}, rx_row_nr={}, row_from_disk_bytes={}, row_from_disk_nr={}",
master.myhostid(), _all_live_peer_nodes, master.repair_meta_id(), _shard_task.get_keyspace(), _cf_name, _range, master.stats().tx_hashes_nr, master.stats().rx_hashes_nr, master.stats().tx_row_nr, master.stats().rx_row_nr, master.stats().row_from_disk_bytes, master.stats().row_from_disk_nr);
});
}
};
future<> repair_cf_range_row_level(repair::shard_repair_task_impl& shard_task,
sstring cf_name, table_id table_id, dht::token_range range,
const std::vector<locator::host_id>& all_peer_nodes, bool small_table_optimization, gc_clock::time_point flush_time,
service::frozen_topology_guard topo_guard) {
auto start_time = flush_time;
auto repair = row_level_repair(shard_task, std::move(cf_name), std::move(table_id), std::move(range), all_peer_nodes, small_table_optimization, start_time, topo_guard);
bool is_tablet = shard_task.db.local().find_column_family(table_id).uses_tablets();
bool is_tablet_rebuild = shard_task.sched_info.for_tablet_rebuild;
auto t = std::chrono::steady_clock::now();
auto update_time = seastar::defer([&] {
if (is_tablet && !is_tablet_rebuild) {
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - t);
_metrics.tablet_time_ms += duration.count();
}
});
co_await repair.run();
}
class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subscriber {
repair_service& _repair_service;
public:
row_level_repair_gossip_helper(repair_service& repair_service) noexcept
: _repair_service(repair_service)
{}
future<> remove_row_level_repair(locator::host_id node) {
rlogger.debug("Started to remove row level repair on all shards for node {}", node);
try {
co_await _repair_service.container().invoke_on_all([node] (repair_service& local_repair) {
return local_repair.remove_repair_meta(node);
});
rlogger.debug("Finished to remove row level repair on all shards for node {}", node);
} catch(...) {
rlogger.warn("Failed to remove row level repair for node {}: {}", node, std::current_exception());
}
}
virtual future<> on_dead(
gms::inet_address endpoint,
locator::host_id id,
gms::endpoint_state_ptr state,
gms::permit_id) override {
return remove_row_level_repair(id);
}
virtual future<> on_remove(
gms::inet_address endpoint,
locator::host_id id,
gms::permit_id) override {
return remove_row_level_repair(id);
}
virtual future<> on_restart(
gms::inet_address endpoint,
locator::host_id id,
gms::endpoint_state_ptr ep_state,
gms::permit_id) override {
return remove_row_level_repair(id);
}
};
repair_service::repair_service(sharded<service::topology_state_machine>& tsm,
sharded<gms::gossiper>& gossiper,
netw::messaging_service& ms,
sharded<replica::database>& db,
sharded<service::storage_proxy>& sp,
sharded<db::batchlog_manager>& bm,
sharded<db::system_keyspace>& sys_ks,
db::view::view_builder& vb,
sharded<db::view::view_building_worker>& vbw,
tasks::task_manager& tm,
service::migration_manager& mm,
size_t max_repair_memory)
: _tsm(tsm)
, _gossiper(gossiper)
, _messaging(ms)
, _db(db)
, _sp(sp)
, _bm(bm)
, _sys_ks(sys_ks)
, _view_builder(vb)
, _view_building_worker(vbw)
, _repair_module(seastar::make_shared<repair::task_manager_module>(tm, *this, max_repair_memory))
, _mm(mm)
, _node_ops_metrics(_repair_module)
, _max_repair_memory(max_repair_memory)
, _memory_sem(max_repair_memory)
{
tm.register_module("repair", _repair_module);
if (this_shard_id() == 0) {
_gossip_helper = make_shared<row_level_repair_gossip_helper>(*this);
_gossiper.local().register_(_gossip_helper);
}
}
future<> repair_service::start(utils::disk_space_monitor* dsm) {
if (dsm && (this_shard_id() == 0)) {
_out_of_space_subscription = dsm->subscribe(_db.local().get_config().critical_disk_utilization_level, [this] (auto threshold_reached) {
if (threshold_reached) {
return container().invoke_on_all([] (repair_service& rs) { return rs.drain(); });
}
return container().invoke_on_all([] (repair_service& rs) { rs.enable(); });
});
}
_load_history_done = load_history();
co_await init_ms_handlers();
_state = state::running;
}
future<> repair_service::stop() {
try {
rlogger.debug("Stopping repair task module");
co_await _repair_module->stop();
rlogger.debug("Waiting on load_history_done");
co_await std::move(_load_history_done);
rlogger.debug("Uninitializing messaging service handlers");
co_await uninit_ms_handlers();
if (this_shard_id() == 0) {
rlogger.debug("Unregistering gossiper helper");
co_await _gossiper.local().unregister_(_gossip_helper);
}
_state = state::stopped;
rlogger.info("Stopped repair_service");
} catch (...) {
on_fatal_internal_error(rlogger, format("Failed stopping repair_service: {}", std::current_exception()));
}
}
void repair_service::enable() {
SCYLLA_ASSERT(_state == state::none || _state == state::running);
rlogger.info("Asked to enable");
if (_state == state::none) {
_state = state::running;
SCYLLA_ASSERT(_disabled_state_count == 0);
} else if (_disabled_state_count > 0 && --_disabled_state_count > 0) {
rlogger.debug("Repair service is still disabled, requires {} more call(s) to enable()", _disabled_state_count);
return;
}
rlogger.info("Enabled");
}
future<> repair_service::drain() {
rlogger.info("Asked to drain");
++_disabled_state_count;
// Abort ongoing repairs
co_await abort_all();
rlogger.info("Drained");
}
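// The disable counting is nested (illustrative sequence): two drain() calls
// raise _disabled_state_count to 2 and abort the ongoing repairs; the first
// enable() only drops the count to 1 and logs that the service is still
// disabled; the second enable() brings it to 0 and the service is enabled
// again.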
repair_service::~repair_service() {
// Assert that the repair service was explicitly stopped, if it was started.
// Otherwise, fiber(s) may still be alive after the object is destroyed.
SCYLLA_ASSERT(_state == state::none || _state == state::stopped);
}
static shard_id repair_id_to_shard(tasks::task_id& repair_id) {
return shard_id(repair_id.uuid().get_most_significant_bits()) % smp::count;
}
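// Illustrative flow for update_history() below (assuming an 8-shard node and
// a vnode table): every shard that finishes the range calls update_history();
// all calls for the same repair_id land on the shard chosen by
// repair_id_to_shard(), which keeps the smallest reported repair_time. The
// first 7 calls return std::nullopt; the 8th call (finished_shards ==
// smp::count) returns the repair_time so the caller can ask all replicas to
// update system.repair_history. For a tablet table a single call is enough,
// since a tablet range is repaired on one shard only.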
future<std::optional<gc_clock::time_point>>
repair_service::update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time, bool is_tablet) {
auto shard = repair_id_to_shard(repair_id);
return container().invoke_on(shard, [repair_id, table_id, range, repair_time, is_tablet] (repair_service& rs) mutable -> future<std::optional<gc_clock::time_point>> {
repair_history& rh = rs._finished_ranges_history[repair_id];
if (rh.repair_time > repair_time) {
rh.repair_time = repair_time;
}
auto finished_shards = ++(rh.finished_ranges[table_id][range]);
// Tablet repair runs only on one shard
if (finished_shards == smp::count || is_tablet) {
// All shards have finished repairing the range. Send an rpc to ask peers to update the system.repair_history table
rlogger.debug("repair[{}]: Finished range {} for table {} on all shards, updating system.repair_history table, finished_shards={}",
repair_id, range, table_id, finished_shards);
co_return rh.repair_time;
} else {
rlogger.debug("repair[{}]: Finished range {} for table {} on all shards, updating system.repair_historytable, finished_shards={}",
repair_id, range, table_id, finished_shards);
co_return std::nullopt;
}
});
}
future<> repair_service::cleanup_history(tasks::task_id repair_id) {
auto shard = repair_id_to_shard(repair_id);
return container().invoke_on(shard, [repair_id] (repair_service& rs) mutable {
rs._finished_ranges_history.erase(repair_id);
rlogger.debug("repair[{}]: Finished cleaning up repair_service history", repair_id);
});
}
future<> repair_service::load_history() {
try {
co_await get_db().local().get_tables_metadata().parallel_for_each_table(coroutine::lambda([&] (table_id table_uuid, lw_shared_ptr<replica::table> table) -> future<> {
auto shard = utils::uuid_xor_to_uint32(table_uuid.uuid()) % smp::count;
if (shard != this_shard_id()) {
co_return;
}
auto permit = co_await seastar::get_units(_load_parallelism_semaphore, 1);
rlogger.info("Loading repair history for keyspace={}, table={}, table_uuid={}",
table->schema()->ks_name(), table->schema()->cf_name(), table_uuid);
co_await _sys_ks.local().get_repair_history(table_uuid, [this] (const auto& entry) -> future<> {
get_repair_module().check_in_shutdown();
auto start = entry.range_start == std::numeric_limits<int64_t>::min() ? dht::minimum_token() : dht::token::from_int64(entry.range_start);
auto end = entry.range_end == std::numeric_limits<int64_t>::min() ? dht::maximum_token() : dht::token::from_int64(entry.range_end);
auto range = dht::token_range(dht::token_range::bound(start, false), dht::token_range::bound(end, true));
auto repair_time = to_gc_clock(entry.ts);
rlogger.debug("Loading repair history for keyspace={}, table={}, table_uuid={}, repair_time={}, range={}",
entry.ks, entry.cf, entry.table_uuid, entry.ts, range);
try {
co_await get_db().invoke_on_all([table_uuid = entry.table_uuid, range, repair_time] (replica::database& local_db) {
auto& gc_state = local_db.get_compaction_manager().get_shared_tombstone_gc_state();
gc_state.update_repair_time(table_uuid, range, repair_time);
});
} catch (...) {
rlogger.warn("Failed to update repair history time for keyspace={}, table={}, range={}, repair_time={}",
entry.ks, entry.cf, range, repair_time);
}
});
}));
} catch (const abort_requested_exception&) {
// Ignore
} catch (...) {
rlogger.warn("Failed to update repair history time: {}. Ignored", std::current_exception());
}
}
repair_meta_ptr repair_service::get_repair_meta(locator::host_id from, uint32_t repair_meta_id) {
node_repair_meta_id id{from, repair_meta_id};
auto it = repair_meta_map().find(id);
if (it == repair_meta_map().end()) {
throw std::runtime_error(format("get_repair_meta: repair_meta_id {} for node {} does not exist", id.repair_meta_id, id.ip));
} else {
return it->second;
}
}
future<>
repair_service::insert_repair_meta(
locator::host_id from_id,
uint32_t src_cpu_id,
uint32_t repair_meta_id,
dht::token_range range,
row_level_diff_detect_algorithm algo,
uint64_t max_row_buf_size,
uint64_t seed,
shard_config master_node_shard_config,
table_schema_version schema_version,
streaming::stream_reason reason,
gc_clock::time_point compaction_time,
abort_source& as,
service::frozen_topology_guard topo_guard,
std::optional<int64_t> repaired_at,
locator::tablet_repair_incremental_mode incremental_mode) {
schema_ptr s = co_await get_migration_manager().get_schema_for_write(schema_version, from_id, src_cpu_id, get_messaging(), as);
auto& db = get_db();
reader_permit permit = co_await db.local().obtain_reader_permit(db.local().find_column_family(s->id()), "repair-meta", db::no_timeout, {});
node_repair_meta_id id{from_id, repair_meta_id};
auto rm = seastar::make_shared<repair_meta>(*this,
get_db().local().find_column_family(s->id()),
s,
std::move(permit),
range,
algo,
max_row_buf_size,
seed,
repair_master::no,
repair_meta_id,
reason,
std::move(master_node_shard_config),
host_id_vector_replica_set{from_id},
compaction_time,
topo_guard,
repaired_at,
incremental_mode);
rm->set_repair_state_for_local_node(repair_state::row_level_start_started);
bool insertion = repair_meta_map().emplace(id, rm).second;
if (!insertion) {
rlogger.warn("insert_repair_meta: repair_meta_id {} for node {} already exists, replace existing one", id.repair_meta_id, id.ip);
repair_meta_map()[id] = rm;
rm->set_repair_state_for_local_node(repair_state::row_level_start_finished);
} else {
rlogger.debug("insert_repair_meta: Inserted repair_meta_id {} for node {}", id.repair_meta_id, id.ip);
}
}
future<>
repair_service::remove_repair_meta(const locator::host_id& from,
uint32_t repair_meta_id,
sstring ks_name,
sstring cf_name,
dht::token_range range,
bool mark_as_repaired) {
node_repair_meta_id id{from, repair_meta_id};
auto it = repair_meta_map().find(id);
if (it == repair_meta_map().end()) {
rlogger.warn("remove_repair_meta: repair_meta_id {} for node {} does not exist", id.repair_meta_id, id.ip);
co_return;
} else {
auto rm = it->second;
repair_meta_map().erase(it);
rlogger.debug("remove_repair_meta: Stop repair_meta_id {} for node {} started", id.repair_meta_id, id.ip);
co_await rm->stop();
if (mark_as_repaired) {
co_await rm->mark_sstable_as_repaired();
}
rlogger.debug("remove_repair_meta: Stop repair_meta_id {} for node {} finished", id.repair_meta_id, id.ip);
}
}
future<>
repair_service::remove_repair_meta(locator::host_id from) {
rlogger.debug("Remove all repair_meta for single node {}", from);
auto repair_metas = make_lw_shared<utils::chunked_vector<repair_meta_ptr>>();
for (auto it = repair_meta_map().begin(); it != repair_meta_map().end();) {
if (it->first.ip == from) {
repair_metas->push_back(it->second);
it = repair_meta_map().erase(it);
} else {
it++;
}
}
co_await coroutine::parallel_for_each(*repair_metas, [&] (auto& rm) -> future<> {
co_await rm->stop();
rm = {};
});
rlogger.debug("Removed all repair_meta for single node {}", from);
}
future<>
repair_service::remove_repair_meta() {
rlogger.debug("Remove all repair_meta for all nodes");
auto repair_metas = make_lw_shared<utils::chunked_vector<repair_meta_ptr>>(
repair_meta_map()
| std::views::values | std::ranges::to<utils::chunked_vector<repair_meta_ptr>>());
repair_meta_map().clear();
co_await coroutine::parallel_for_each(*repair_metas, [&] (auto& rm) -> future<> {
co_await rm->stop();
rm = {};
});
rlogger.debug("Removed all repair_meta for all nodes");
}
future<uint32_t> repair_service::get_next_repair_meta_id() {
return container().invoke_on(0, [] (repair_service& local_repair) {
return local_repair._next_repair_meta_id++;
});
}
locator::host_id repair_service::my_host_id() const noexcept {
return _gossiper.local().my_host_id();
}
future<size_t> count_finished_tablets(utils::chunked_vector<tablet_token_range> ranges1, utils::chunked_vector<tablet_token_range> ranges2) {
if (ranges1.empty() || ranges2.empty()) {
co_return 0;
}
auto sort = [] (utils::chunked_vector<tablet_token_range>& ranges) {
std::sort(ranges.begin(), ranges.end(), [] (const auto& a, const auto& b) {
if (a.first_token != b.first_token) {
return a.first_token < b.first_token;
}
return a.last_token < b.last_token;
});
};
// First, merge overlapping and adjacent ranges in ranges2.
sort(ranges2);
utils::chunked_vector<tablet_token_range> merged;
merged.push_back(ranges2[0]);
for (size_t i = 1; i < ranges2.size(); ++i) {
co_await coroutine::maybe_yield();
// To avoid overflow with max() + 1, we check adjacency with `a - 1 <= b` instead of `a <= b + 1`
if (ranges2[i].first_token - 1 <= merged.back().last_token) {
merged.back().last_token = std::max(merged.back().last_token, ranges2[i].last_token);
} else {
merged.push_back(ranges2[i]);
}
}
// Count covered ranges using a linear scan
size_t covered_count = 0;
auto it = merged.begin();
auto end = merged.end();
sort(ranges1);
for (const auto& r1 : ranges1) {
co_await coroutine::maybe_yield();
// Advance the merged iterator while the current merged range ends
// before the current r1 starts.
while (it != end && it->last_token < r1.first_token) {
co_await coroutine::maybe_yield();
++it;
}
// If we have exhausted the merged ranges, no further r1 can be covered
if (it == end) {
break;
}
// Check if the current merged range covers r1.
if (it->first_token <= r1.first_token && r1.last_token <= it->last_token) {
covered_count++;
}
}
co_return covered_count;
}
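// Illustrative example for count_finished_tablets() (hypothetical token
// values): the request recorded the original tablet range [0, 199] as
// requested; the tablet was then split and each half finished separately,
// recording [0, 99] and [100, 199]. The finished ranges merge into [0, 199]
// (they are adjacent since 100 - 1 <= 99), which fully covers the requested
// range, so the original tablet counts as finished:
//
//   requested = {[0, 199]}
//   finished  = {[0, 99], [100, 199]}  -> merged {[0, 199]}
//   count_finished_tablets(requested, finished) resolves to 1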
future<std::optional<repair_task_progress>> repair_service::get_tablet_repair_task_progress(tasks::task_id task_uuid) {
utils::chunked_vector<tablet_token_range> requested_tablets;
utils::chunked_vector<tablet_token_range> finished_tablets;
table_id tid;
if (!_db.local().features().tablet_repair_tasks_table) {
co_return std::nullopt;
}
co_await _sys_ks.local().get_repair_task(task_uuid, [&tid, &requested_tablets, &finished_tablets] (const db::system_keyspace::repair_task_entry& entry) -> future<> {
rlogger.debug("repair_task_progress: Get entry operation={} first_token={} last_token={}", entry.operation, entry.first_token, entry.last_token);
if (entry.operation == db::system_keyspace::repair_task_operation::requested) {
requested_tablets.push_back({entry.first_token, entry.last_token});
} else if (entry.operation == db::system_keyspace::repair_task_operation::finished) {
finished_tablets.push_back({entry.first_token, entry.last_token});
}
tid = entry.table_uuid;
co_return;
});
auto requested = requested_tablets.size();
auto finished_nomerge = finished_tablets.size();
auto finished = co_await count_finished_tablets(std::move(requested_tablets), std::move(finished_tablets));
auto progress = repair_task_progress{requested, finished, tid};
rlogger.debug("repair_task_progress: task_uuid={} table_uuid={} requested_tablets={} finished_tablets={} progress={} finished_nomerge={}",
task_uuid, tid, requested, finished, progress.progress(), finished_nomerge);
co_return progress;
}
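// Illustrative example (hypothetical numbers): a repair task that recorded
// 4 requested tablet ranges, of which 3 are fully covered by the merged
// finished ranges, reports requested_tablets=4 and finished_tablets=3 in
// the debug log above and returns the corresponding repair_task_progress.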