Files
scylladb/repair/row_level.cc
Asias He 6cb263bab0 repair: Prevent CPU stall during cross-shard row copy and destruction
When handling `repair_stream_cmd::end_of_current_rows`, passing the
foreign list directly to `put_row_diff_handler` triggered a massive
synchronous deep copy on the destination shard. Additionally, destroying
the list triggered a synchronous deallocation on the source shard. This
blocked the reactor and triggered the CPU stall detector.

This commit fixes the issue by introducing `clone_gently()` to copy the
list elements one by one, and leveraging the existing
`utils::clear_gently()` to destroy them. Both utilize
`seastar::coroutine::maybe_yield()` to allow the reactor to breathe
during large cross-shard transfers and cleanups.

Fixes SCYLLADB-403

Closes scylladb/scylladb#28979
2026-03-17 11:05:15 +02:00

4014 lines
196 KiB
C++

/*
* Copyright (C) 2018-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <exception>
#include <fmt/ranges.h>
#include <seastar/util/defer.hh>
#include "dht/auto_refreshing_sharder.hh"
#include "db/view/view_building_worker.hh"
#include "gms/endpoint_state.hh"
#include "repair/repair.hh"
#include "message/messaging_service.hh"
#include "repair/task_manager_module.hh"
#include <seastar/coroutine/exception.hh>
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "mutation/mutation_fragment.hh"
#include "mutation_writer/multishard_writer.hh"
#include "dht/i_partitioner.hh"
#include "dht/sharder.hh"
#include "utils/assert.hh"
#include "utils/xx_hasher.hh"
#include "utils/UUID.hh"
#include "replica/database.hh"
#include <seastar/util/bool_class.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <list>
#include <vector>
#include <algorithm>
#include <random>
#include <optional>
#include <boost/intrusive/list.hpp>
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "gms/gossiper.hh"
#include "repair/row_level.hh"
#include "utils/stall_free.hh"
#include "utils/to_string.hh"
#include "service/migration_manager.hh"
#include "streaming/consumer.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/all.hh>
#include <seastar/coroutine/as_future.hh>
#include "db/config.hh"
#include "db/system_keyspace.hh"
#include "service/storage_proxy.hh"
#include "db/batchlog_manager.hh"
#include "idl/repair.dist.hh"
#include "readers/empty.hh"
#include "readers/evictable.hh"
#include "readers/queue.hh"
#include "readers/filtering.hh"
#include "readers/mutation_fragment_v1_stream.hh"
#include "repair/hash.hh"
#include "repair/decorated_key_with_hash.hh"
#include "repair/row.hh"
#include "repair/writer.hh"
#include "repair/reader.hh"
#include "repair/incremental.hh"
#include "compaction/compaction_manager.hh"
#include "utils/xx_hasher.hh"
#include "utils/error_injection.hh"
#include "locator/tablets.hh"
#include "gms/feature_service.hh"
extern logging::logger rlogger;
static bool inject_rpc_stream_error = false;
static shard_id get_dst_shard_id(uint32_t src_cpu_id, const rpc::optional<shard_id>& dst_cpu_id_opt) {
uint32_t dst_cpu_id = 0;
if (dst_cpu_id_opt && *dst_cpu_id_opt != repair_unspecified_shard) {
dst_cpu_id = *dst_cpu_id_opt;
} else {
dst_cpu_id = src_cpu_id % smp::count;
}
return dst_cpu_id;
}
enum class repair_state : uint16_t {
unknown,
row_level_start_started,
row_level_start_finished,
get_estimated_partitions_started,
get_estimated_partitions_finished,
set_estimated_partitions_started,
set_estimated_partitions_finished,
get_sync_boundary_started,
get_sync_boundary_finished,
get_combined_row_hash_started,
get_combined_row_hash_finished,
get_row_diff_with_rpc_stream_started,
get_row_diff_with_rpc_stream_finished,
get_row_diff_and_update_peer_row_hash_sets_started,
get_row_diff_and_update_peer_row_hash_sets_finished,
get_full_row_hashes_with_rpc_stream_started,
get_full_row_hashes_with_rpc_stream_finished,
get_full_row_hashes_started,
get_full_row_hashes_finished,
get_row_diff_started,
get_row_diff_finished,
put_row_diff_with_rpc_stream_started,
put_row_diff_with_rpc_stream_finished,
put_row_diff_started,
put_row_diff_finished,
row_level_stop_started,
row_level_stop_finished,
};
struct repair_node_state {
locator::host_id node;
repair_state state = repair_state::unknown;
// The shard that repair instance runs on
shard_id shard;
explicit repair_node_state(locator::host_id n) : node(n) { }
explicit repair_node_state(locator::host_id n, shard_id s) : node(n), shard(s) { }
};
// Wraps sink and source objects for repair master or repair follower nodes.
// For repair master, it stores sink and source pair for each of the followers.
// For repair follower, it stores one sink and source pair for repair master.
template<class SinkType, class SourceType>
class sink_source_for_repair {
uint32_t _repair_meta_id;
using get_sink_source_fn_type = std::function<future<std::tuple<rpc::sink<SinkType>, rpc::source<SourceType>>> (uint32_t repair_meta_id, std::optional<shard_id> dst_cpu_id, locator::host_id addr)>;
using sink_type = std::reference_wrapper<rpc::sink<SinkType>>;
using source_type = std::reference_wrapper<rpc::source<SourceType>>;
// The vectors below store sink and source object for peer nodes.
std::vector<std::optional<rpc::sink<SinkType>>> _sinks;
std::vector<std::optional<rpc::source<SourceType>>> _sources;
std::vector<bool> _sources_closed;
get_sink_source_fn_type _fn;
public:
sink_source_for_repair(uint32_t repair_meta_id, size_t nr_peer_nodes, get_sink_source_fn_type fn)
: _repair_meta_id(repair_meta_id)
, _sinks(nr_peer_nodes)
, _sources(nr_peer_nodes)
, _sources_closed(nr_peer_nodes, false)
, _fn(std::move(fn)) {
}
void mark_source_closed(unsigned node_idx) {
_sources_closed[node_idx] = true;
}
future<std::tuple<sink_type, source_type>> get_sink_source(locator::host_id remote_node, unsigned node_idx, std::optional<shard_id> dst_cpu_id) {
using value_type = std::tuple<sink_type, source_type>;
if (_sinks[node_idx] && _sources[node_idx]) {
co_return value_type(_sinks[node_idx].value(), _sources[node_idx].value());
}
if (_sinks[node_idx] || _sources[node_idx]) {
throw std::runtime_error(format("sink or source is missing for node {}", remote_node));
}
auto [sink, source] = co_await _fn(_repair_meta_id, dst_cpu_id, remote_node);
_sinks[node_idx].emplace(std::move(sink));
_sources[node_idx].emplace(std::move(source));
co_return value_type(_sinks[node_idx].value(), _sources[node_idx].value());
}
future<> close() {
co_await coroutine::parallel_for_each(std::views::iota(unsigned(0), unsigned(_sources.size())), [this] (unsigned node_idx) -> future<> {
std::optional<rpc::sink<SinkType>>& sink_opt = _sinks[node_idx];
auto f = sink_opt ? sink_opt->close() : make_ready_future<>();
f = co_await coroutine::as_future(std::move(f));
try {
std::optional<rpc::source<SourceType>>& source_opt = _sources[node_idx];
if (source_opt && !_sources_closed[node_idx]) {
while (co_await (*source_opt)()) {
// Keep reading source until end of stream
}
}
} catch (...) {
// Ignore the exception as we're just draining *source_opt
}
f.get();
});
}
};
using sink_source_for_get_full_row_hashes = sink_source_for_repair<repair_stream_cmd, repair_hash_with_cmd>;
using sink_source_for_get_row_diff = sink_source_for_repair<repair_hash_with_cmd, repair_row_on_wire_with_cmd>;
using sink_source_for_put_row_diff = sink_source_for_repair<repair_row_on_wire_with_cmd, repair_stream_cmd>;
struct row_level_repair_metrics {
seastar::metrics::metric_groups _metrics;
uint64_t tx_row_nr{0};
uint64_t rx_row_nr{0};
uint64_t tx_row_bytes{0};
uint64_t rx_row_bytes{0};
uint64_t row_from_disk_nr{0};
uint64_t row_from_disk_bytes{0};
uint64_t tx_hashes_nr{0};
uint64_t rx_hashes_nr{0};
uint64_t inc_sst_skipped_bytes{0};
uint64_t inc_sst_read_bytes{0};
uint64_t tablet_time_ms{0};
row_level_repair_metrics() {
namespace sm = seastar::metrics;
_metrics.add_group("repair", {
sm::make_counter("tx_row_nr", tx_row_nr,
sm::description("Total number of rows sent on this shard.")),
sm::make_counter("rx_row_nr", rx_row_nr,
sm::description("Total number of rows received on this shard.")),
sm::make_counter("tx_row_bytes", tx_row_bytes,
sm::description("Total bytes of rows sent on this shard.")),
sm::make_counter("rx_row_bytes", rx_row_bytes,
sm::description("Total bytes of rows received on this shard.")),
sm::make_counter("tx_hashes_nr", tx_hashes_nr,
sm::description("Total number of row hashes sent on this shard.")),
sm::make_counter("rx_hashes_nr", rx_hashes_nr,
sm::description("Total number of row hashes received on this shard.")),
sm::make_counter("row_from_disk_nr", row_from_disk_nr,
sm::description("Total number of rows read from disk on this shard.")),
sm::make_counter("row_from_disk_bytes", row_from_disk_bytes,
sm::description("Total bytes of rows read from disk on this shard.")),
sm::make_counter("inc_sst_skipped_bytes", inc_sst_skipped_bytes,
sm::description("Total number of bytes skipped from sstables for incremental repair on this shard.")),
sm::make_counter("inc_sst_read_bytes", inc_sst_read_bytes,
sm::description("Total number of bytes read from sstables for incremental repair on this shard.")),
sm::make_counter("tablet_time_ms", tablet_time_ms,
sm::description("Time spent on tablet repair on this shard in milliseconds.")),
});
}
};
static thread_local row_level_repair_metrics _metrics;
static const std::vector<row_level_diff_detect_algorithm>& suportted_diff_detect_algorithms() {
static std::vector<row_level_diff_detect_algorithm> _algorithms = {
row_level_diff_detect_algorithm::send_full_set,
row_level_diff_detect_algorithm::send_full_set_rpc_stream,
};
return _algorithms;
};
static row_level_diff_detect_algorithm get_common_diff_detect_algorithm(netw::messaging_service& ms, const host_id_vector_replica_set& nodes) {
std::vector<std::vector<row_level_diff_detect_algorithm>> nodes_algorithms(nodes.size());
parallel_for_each(std::views::iota(size_t(0), nodes.size()), coroutine::lambda([&] (size_t idx) -> future<> {
std::vector<row_level_diff_detect_algorithm> algorithms = co_await ser::repair_rpc_verbs::send_repair_get_diff_algorithms(&ms, nodes[idx]);
std::sort(algorithms.begin(), algorithms.end());
nodes_algorithms[idx] = std::move(algorithms);
rlogger.trace("Got node_algorithms={}, from node={}", nodes_algorithms[idx], nodes[idx]);
})).get();
auto common_algorithms = suportted_diff_detect_algorithms();
for (auto& algorithms : nodes_algorithms) {
std::sort(common_algorithms.begin(), common_algorithms.end());
std::vector<row_level_diff_detect_algorithm> results;
std::set_intersection(algorithms.begin(), algorithms.end(),
common_algorithms.begin(), common_algorithms.end(),
std::back_inserter(results));
common_algorithms = std::move(results);
}
rlogger.trace("peer_algorithms={}, local_algorithms={}, common_diff_detect_algorithms={}",
nodes_algorithms, suportted_diff_detect_algorithms(), common_algorithms);
if (common_algorithms.empty()) {
throw std::runtime_error("Can not find row level repair diff detect algorithm");
}
return common_algorithms.back();
}
static bool is_rpc_stream_supported(row_level_diff_detect_algorithm algo) {
// send_full_set is the only algorithm that does not support rpc stream
return algo != row_level_diff_detect_algorithm::send_full_set;
}
static uint64_t get_random_seed() {
static thread_local std::default_random_engine random_engine{std::random_device{}()};
static thread_local std::uniform_int_distribution<uint64_t> random_dist{};
return random_dist(random_engine);
}
repair_hash repair_hasher::do_hash_for_mf(const decorated_key_with_hash& dk_with_hash, const mutation_fragment& mf) {
xx_hasher h(_seed);
feed_hash(h, mf, *_schema);
feed_hash(h, dk_with_hash.hash.hash);
return repair_hash(h.finalize_uint64());
}
mutation_reader repair_reader::make_reader(
seastar::sharded<replica::database>& db,
replica::column_family& cf,
read_strategy strategy,
const dht::sharder& remote_sharder,
unsigned remote_shard,
gc_clock::time_point compaction_time,
incremental_repair_meta inc) {
switch (strategy) {
case read_strategy::local: {
auto ms = mutation_source([&cf, compaction_time] (
schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& ps,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding fwd_mr) {
return cf.make_streaming_reader(std::move(s), std::move(permit), pr, ps, fwd_mr, compaction_time);
});
mutation_reader rd(nullptr);
std::tie(rd, _reader_handle) = make_manually_paused_evictable_reader(
std::move(ms),
_schema,
_permit,
_range,
_schema->full_slice(),
{},
mutation_reader::forwarding::no);
return rd;
}
case read_strategy::multishard_split: {
std::optional<size_t> multishard_reader_buffer_size;
const auto& dbconfig = db.local().get_config();
if (dbconfig.repair_multishard_reader_buffer_hint_size()) {
// Setting the repair buffer size as the multishard reader's buffer
// size helps avoid extra cross-shard round-trips and possible
// evict-recreate cycles.
multishard_reader_buffer_size = dbconfig.repair_multishard_reader_buffer_hint_size();
}
return make_multishard_streaming_reader(db, _schema, _permit, [this] {
auto shard_range = _sharder.next();
if (shard_range) {
return std::optional<dht::partition_range>(dht::to_partition_range(*shard_range));
}
return std::optional<dht::partition_range>();
}, compaction_time, multishard_reader_buffer_size, read_ahead(dbconfig.repair_multishard_reader_enable_read_ahead()));
}
case read_strategy::multishard_filter: {
return make_filtering_reader(make_multishard_streaming_reader(db, _schema, _permit, _range, compaction_time, {}, read_ahead::yes),
[&remote_sharder, remote_shard](const dht::decorated_key& k) {
return remote_sharder.shard_for_reads(k.token()) == remote_shard;
});
}
case read_strategy::incremental_repair: {
return cf.make_streaming_reader(_schema, _permit, _range, inc.sst_set, gc_clock::now());
}
default:
on_internal_error(rlogger,
format("make_reader: unexpected read_strategy {}", static_cast<int>(strategy)));
}
}
repair_reader::repair_reader(
seastar::sharded<replica::database>& db,
replica::column_family& cf,
schema_ptr s,
reader_permit permit,
dht::token_range range,
const dht::static_sharder& remote_sharder,
unsigned remote_shard,
uint64_t seed,
read_strategy strategy,
gc_clock::time_point compaction_time,
incremental_repair_meta inc)
: _schema(s)
, _permit(std::move(permit))
, _range(dht::to_partition_range(range))
, _sharder(remote_sharder, range, remote_shard)
, _seed(seed)
, _local_read_op(strategy == read_strategy::local ? std::optional(cf.read_in_progress()) : std::nullopt)
, _reader(make_reader(db, cf, strategy, remote_sharder, remote_shard, compaction_time, inc))
{ }
future<mutation_fragment_opt>
repair_reader::read_mutation_fragment() {
++_reads_issued;
return _reader().then_wrapped([this] (future<mutation_fragment_opt> f) {
try {
auto mfopt = f.get();
++_reads_finished;
return mfopt;
} catch (seastar::timed_out_error& e) {
rlogger.warn("Failed to read a fragment from the reader, keyspace={}, table={}, range={}: {}",
_schema->ks_name(), _schema->cf_name(), _range, e);
throw;
} catch (...) {
throw;
}
});
}
future<> repair_reader::on_end_of_stream() noexcept {
co_await _reader.close();
_permit.release_base_resources();
_reader = mutation_fragment_v1_stream(make_empty_mutation_reader(_schema, _permit));
_reader_handle.reset();
}
future<> repair_reader::close() noexcept {
co_await _reader.close();
_permit.release_base_resources();
_reader_handle.reset();
}
void repair_reader::set_current_dk(const dht::decorated_key& key) {
_current_dk = make_lw_shared<const decorated_key_with_hash>(*_schema, key, _seed);
}
void repair_reader::clear_current_dk() {
_current_dk = {};
}
void repair_reader::check_current_dk() {
if (!_current_dk) {
throw std::runtime_error("Current partition_key is unknown");
}
}
void repair_reader::pause() {
if (_reader_handle) {
_reader_handle->pause();
}
}
class repair_writer_impl : public repair_writer::impl {
schema_ptr _schema;
std::optional<int64_t> _repaired_at;
reader_permit _permit;
std::optional<future<>> _writer_done;
mutation_fragment_queue _mq;
sharded<replica::database>& _db;
db::view::view_builder& _view_builder;
sharded<db::view::view_building_worker>& _view_building_worker;
streaming::stream_reason _reason;
mutation_reader _queue_reader;
service::frozen_topology_guard _topo_guard;
public:
repair_writer_impl(
schema_ptr schema,
std::optional<int64_t> repaired_at,
reader_permit permit,
sharded<replica::database>& db,
db::view::view_builder& view_builder,
sharded<db::view::view_building_worker>& view_building_worker,
streaming::stream_reason reason,
mutation_fragment_queue queue,
mutation_reader queue_reader,
service::frozen_topology_guard topo_guard)
: _schema(std::move(schema))
, _repaired_at(repaired_at)
, _permit(std::move(permit))
, _mq(std::move(queue))
, _db(db)
, _view_builder(view_builder)
, _view_building_worker(view_building_worker)
, _reason(reason)
, _queue_reader(std::move(queue_reader))
, _topo_guard(topo_guard)
{}
virtual void create_writer(lw_shared_ptr<repair_writer> writer) override;
virtual mutation_fragment_queue& queue() override {
return _mq;
}
virtual future<> wait_for_writer_done() override;
private:
static sstables::offstrategy is_offstrategy_supported(streaming::stream_reason reason) {
static const std::unordered_set<streaming::stream_reason> operations_supported = {
streaming::stream_reason::bootstrap,
streaming::stream_reason::replace,
streaming::stream_reason::removenode,
streaming::stream_reason::decommission,
streaming::stream_reason::repair,
streaming::stream_reason::rebuild,
};
return sstables::offstrategy(operations_supported.contains(reason));
}
};
future<> repair_writer::write_start_and_mf(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
_current_dk_written_to_sstable = dk;
if (mf.is_partition_start()) {
return _mq->push(std::move(mf)).then([this] {
_partition_opened = true;
});
} else {
auto start = mutation_fragment(*_schema, _permit, partition_start(dk->dk, tombstone()));
return _mq->push(std::move(start)).then([this, mf = std::move(mf)] () mutable {
_partition_opened = true;
return _mq->push(std::move(mf));
});
}
};
class queue_reader_handle_adapter : public mutation_fragment_queue::impl {
queue_reader_handle _handle;
public:
queue_reader_handle_adapter(queue_reader_handle handle)
: _handle(std::move(handle))
{}
virtual future<> push(mutation_fragment_v2 mf) override {
return _handle.push(std::move(mf));
}
virtual void abort(std::exception_ptr ep) override {
_handle.abort(std::move(ep));
}
virtual void push_end_of_stream() override {
_handle.push_end_of_stream();
}
};
mutation_fragment_queue make_mutation_fragment_queue(schema_ptr s, reader_permit permit, queue_reader_handle handle) {
return mutation_fragment_queue(std::move(s), std::move(permit), seastar::make_shared<queue_reader_handle_adapter>(std::move(handle)));
}
struct sharder_helper {
struct tablet_sharder_keepalive {
std::unique_ptr<dht::auto_refreshing_sharder> sharder_ptr;
service::topology_guard topo_guard;
};
using sharder_keepalive = std::variant<tablet_sharder_keepalive, locator::effective_replication_map_ptr>;
sharder_keepalive keepalive;
const dht::sharder& sharder;
sharder_helper(sharder_keepalive s_keepalive, const dht::sharder& s)
: keepalive(std::move(s_keepalive))
, sharder(s)
{}
};
sharder_helper get_sharder_helper(replica::table& t, const schema& s, service::frozen_topology_guard frozen_topo_guard) {
if (frozen_topo_guard == service::default_session_id) {
// The sharder is valid only when the erm is valid. Keep a reference of the erm to keep the sharder valid.
auto erm = t.get_effective_replication_map();
auto& sharder = erm->get_sharder(s);
sharder_helper::sharder_keepalive keepalive = std::move(erm);
return sharder_helper{std::move(keepalive), sharder};
} else {
sharder_helper::tablet_sharder_keepalive keepalive{
.sharder_ptr = std::make_unique<dht::auto_refreshing_sharder>(t.shared_from_this()),
.topo_guard = frozen_topo_guard
};
auto& sharder = *keepalive.sharder_ptr;
return sharder_helper{std::move(keepalive), sharder};
}
}
void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
if (_writer_done) {
return;
}
replica::table& t = _db.local().find_column_family(_schema->id());
rlogger.debug("repair_writer: keyspace={}, table={}, estimated_partitions={}", w->schema()->ks_name(), w->schema()->cf_name(), w->get_estimated_partitions());
// #17384 don't use off-strategy for repair (etc) if using tablets. sstables generated will
// be single token range and can just be added to normal sstable set as is, eventually
// handled by normal compaction.
auto off_str = t.uses_tablets() ? sstables::offstrategy(false) : is_offstrategy_supported(_reason);
auto sharder = get_sharder_helper(t, *(w->schema()), _topo_guard);
bool mark_as_repaired = t.uses_tablets() && _repaired_at.has_value();
auto& sst_list = w->get_sstable_list_to_mark_as_repaired();
auto shard = this_shard_id();
auto inc_repair_handler = [&sst_list, mark_as_repaired, shard, sid = _topo_guard] (sstables::shared_sstable sst) mutable {
if (!mark_as_repaired) {
return;
}
if (shard != this_shard_id()) {
return;
}
sst->being_repaired = sid;
sst_list.insert(sst);
};
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder.sharder, std::move(_queue_reader),
streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, _view_building_worker, w->get_estimated_partitions(), _reason, off_str,
_topo_guard, inc_repair_handler),
t.stream_in_progress()).then([w] (uint64_t partitions) {
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
w->schema()->ks_name(), w->schema()->cf_name(), partitions);
return utils::get_local_injector().inject("repair_writer_impl_create_writer_wait", utils::wait_for_message(200s));
}).handle_exception([w, keepalive = std::move(sharder.keepalive)] (std::exception_ptr ep) {
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
w->schema()->ks_name(), w->schema()->cf_name(), ep);
w->queue().abort(ep);
return make_exception_future<>(std::move(ep));
});
}
lw_shared_ptr<repair_writer> make_repair_writer(
schema_ptr schema,
std::optional<int64_t> repaired_at,
reader_permit permit,
streaming::stream_reason reason,
sharded<replica::database>& db,
db::view::view_builder& view_builder,
sharded<db::view::view_building_worker>& view_building_worker,
service::frozen_topology_guard topo_guard) {
auto [queue_reader, queue_handle] = make_queue_reader(schema, permit);
auto queue = make_mutation_fragment_queue(schema, permit, std::move(queue_handle));
auto i = std::make_unique<repair_writer_impl>(schema, repaired_at, permit, db, view_builder, view_building_worker, reason, std::move(queue), std::move(queue_reader), topo_guard);
return make_lw_shared<repair_writer>(schema, permit, std::move(i));
}
future<> repair_writer::write_partition_end() {
if (_partition_opened) {
return _mq->push(mutation_fragment(*_schema, _permit, partition_end())).then([this] {
_partition_opened = false;
});
}
return make_ready_future<>();
}
future<> repair_writer::do_write(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
if (_current_dk_written_to_sstable) {
const auto cmp_res = _current_dk_written_to_sstable->dk.tri_compare(*_schema, dk->dk);
if (cmp_res > 0) {
on_internal_error(rlogger, format("repair_writer::do_write(): received out-of-order partition, current: {}, next: {}", _current_dk_written_to_sstable->dk, dk->dk));
} else if (cmp_res == 0) {
return _mq->push(std::move(mf));
} else {
return write_partition_end().then([this,
dk = std::move(dk), mf = std::move(mf)] () mutable {
return write_start_and_mf(std::move(dk), std::move(mf));
});
}
} else {
return write_start_and_mf(std::move(dk), std::move(mf));
}
}
future<> repair_writer::write_end_of_stream() {
if (_created_writer) {
return with_semaphore(_sem, 1, [this] {
// Partition_end is never sent on wire, so we have to write one ourselves.
return write_partition_end().then([this] () mutable {
_mq->push_end_of_stream();
}).handle_exception([this] (std::exception_ptr ep) {
_mq->abort(ep);
rlogger.warn("repair_writer: keyspace={}, table={}, write_end_of_stream failed: {}",
_schema->ks_name(), _schema->cf_name(), ep);
return make_exception_future<>(std::move(ep));
});
});
} else {
return make_ready_future<>();
}
}
future<> repair_writer_impl::wait_for_writer_done() {
if (_writer_done) {
return std::move(*(_writer_done));
} else {
return make_ready_future<>();
}
}
future<> repair_writer::wait_for_writer_done() {
return when_all_succeed(write_end_of_stream(), _impl->wait_for_writer_done()).discard_result().handle_exception(
[this] (std::exception_ptr ep) {
rlogger.warn("repair_writer: keyspace={}, table={}, wait_for_writer_done failed: {}",
_schema->ks_name(), _schema->cf_name(), ep);
return make_exception_future<>(std::move(ep));
});
}
class repair_meta;
class repair_meta_tracker;
class row_level_repair;
static void add_to_repair_meta_for_masters(repair_meta& rm);
static void add_to_repair_meta_for_followers(repair_meta& rm);
future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows, schema_ptr s, uint64_t seed, repair_master is_master, reader_permit permit, repair_hasher hasher) {
std::list<repair_row> row_list;
std::exception_ptr ex;
try {
lw_shared_ptr<const decorated_key_with_hash> dk_ptr;
lw_shared_ptr<mutation_fragment> last_mf;
position_in_partition::tri_compare cmp(*s);
// Consume the rows and mutation_fragment:s below incrementally
// as it interleaves frees with allocation, reducing memory pressure,
// and to prevent reactor stalls when freeing a large repair_rows_on_wire
// object in one shot when the function returns.
for (auto it = rows.begin(); it != rows.end(); it = rows.erase(it)) {
auto x = std::move(*it);
if (dk_ptr && x.use_last_partition_key()) {
// Use previous partition key when an empty partition is received
} else {
dht::decorated_key dk = dht::decorate_key(*s, x.get_key());
if (!(dk_ptr && dk_ptr->dk.equal(*s, dk))) {
dk_ptr = make_lw_shared<const decorated_key_with_hash>(*s, dk, seed);
// Reset last_mf only when new partition is seen
last_mf = {};
}
}
auto& mutation_fragments = x.get_mutation_fragments();
if (is_master) {
for (auto fmfit = mutation_fragments.begin(); fmfit != mutation_fragments.end(); fmfit = mutation_fragments.erase(fmfit)) {
auto fmf = std::move(*fmfit);
_metrics.rx_row_nr += 1;
_metrics.rx_row_bytes += fmf.representation().size();
// Keep the mutation_fragment in repair_row as an
// optimization to avoid unfreeze again when
// mutation_fragment is needed by _repair_writer.do_write()
// to apply the repair_row to disk
auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*s, permit));
auto hash = hasher.do_hash_for_mf(*dk_ptr, *mf);
position_in_partition pos(mf->position());
row_list.push_back(repair_row(std::move(fmf), std::move(pos), dk_ptr, std::move(hash), is_dirty_on_master::yes, std::move(mf)));
co_await coroutine::maybe_yield();
}
} else {
for (auto fmfit = mutation_fragments.begin(); fmfit != mutation_fragments.end(); fmfit = mutation_fragments.erase(fmfit)) {
auto fmf = std::move(*fmfit);
_metrics.rx_row_nr += 1;
_metrics.rx_row_bytes += fmf.representation().size();
auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*s, permit));
// If the mutation_fragment has the same position as
// the last mutation_fragment, it means they are the
// same row with different contents. We can not feed
// such rows into the sstable writer. Instead we apply
// the mutation_fragment into the previous one.
if (last_mf && cmp(last_mf->position(), mf->position()) == 0 && last_mf->mergeable_with(*mf)) {
last_mf->apply(*s, std::move(*mf));
} else {
last_mf = mf;
// On repair follower node, only decorated_key_with_hash and the mutation_fragment inside repair_row are used.
row_list.push_back(repair_row({}, {}, dk_ptr, {}, is_dirty_on_master::no, std::move(mf)));
}
co_await coroutine::maybe_yield();
}
}
co_await coroutine::maybe_yield();
}
} catch (...) {
ex = std::current_exception();
}
if (ex) {
co_await utils::clear_gently(rows);
co_await utils::clear_gently(row_list);
co_await coroutine::return_exception_ptr(std::move(ex));
}
co_return std::move(row_list);
}
class repair_meta {
friend repair_meta_tracker;
public:
using update_working_row_buf = bool_class<class update_working_row_buf_tag>;
using update_peer_row_hash_sets = bool_class<class update_peer_row_hash_sets_tag>;
using needs_all_rows_t = bool_class<class needs_all_rows_tag>;
using msg_addr = netw::messaging_service::msg_addr;
using tracker_link_type = boost::intrusive::list_member_hook<bi::link_mode<boost::intrusive::auto_unlink>>;
private:
repair_service& _rs;
seastar::sharded<replica::database>& _db;
netw::messaging_service& _messaging;
schema_ptr _schema;
reader_permit _permit;
dht::token_range _range;
repair_sync_boundary::tri_compare _cmp;
// The algorithm used to find the row difference
row_level_diff_detect_algorithm _algo;
// Max rows size can be stored in _row_buf
size_t _max_row_buf_size;
uint64_t _seed = 0;
repair_master _repair_master;
uint32_t _repair_meta_id;
streaming::stream_reason _reason;
// Repair master's sharding configuration
shard_config _master_node_shard_config;
// sharding info of repair master
dht::static_sharder _remote_sharder;
bool _same_sharding_config = false;
struct local_range_estimation {
size_t master_subranges_count;
size_t partitions_count;
};
std::optional<local_range_estimation> _local_range_estimation;
uint64_t _estimated_partitions = 0;
// For repair master nr peers is the number of repair followers, for repair
// follower nr peers is always one because repair master is the only peer.
size_t _nr_peer_nodes= 1;
repair_stats _stats;
std::optional<repair_reader> _repair_reader;
std::optional<int64_t> _repaired_at;
locator::tablet_repair_incremental_mode _incremental_mode;
lw_shared_ptr<repair_writer> _repair_writer;
// Contains rows read from disk
std::list<repair_row> _row_buf;
// Contains rows we are working on to sync between peers
std::list<repair_row> _working_row_buf;
// Combines all the repair_hash in _working_row_buf
repair_hash _working_row_buf_combined_hash;
// Tracks the last sync boundary
std::optional<repair_sync_boundary> _last_sync_boundary;
// Tracks current sync boundary
std::optional<repair_sync_boundary> _current_sync_boundary;
// Contains the hashes of rows in the _working_row_buffor for all peer nodes
std::vector<repair_hash_set> _peer_row_hash_sets;
// Gate used to make sure pending operation of meta data is done
seastar::named_gate _gate;
sink_source_for_get_full_row_hashes _sink_source_for_get_full_row_hashes;
sink_source_for_get_row_diff _sink_source_for_get_row_diff;
sink_source_for_put_row_diff _sink_source_for_put_row_diff;
tracker_link_type _tracker_link;
row_level_repair* _row_level_repair_ptr;
std::vector<repair_node_state> _all_node_states;
is_dirty_on_master _dirty_on_master = is_dirty_on_master::no;
std::optional<shared_future<>> _stopped;
repair_hasher _repair_hasher;
gc_clock::time_point _compaction_time;
bool _is_tablet;
reader_concurrency_semaphore::inactive_read_handle _fake_inactive_read_handle;
std::unique_ptr<const locator::token_metadata> _small_table_optimization_tm;
seastar::semaphore _small_table_optimization_tm_sem{1};
bool _small_table_optimization_tm_calculated = false;
service::frozen_topology_guard _frozen_topology_guard;
service::topology_guard _topology_guard;
const bool _is_eligible_to_repair_rejection;
private:
incremental_repair_meta _incremental_repair_meta;
public:
bool is_incremental_repair() {
return _repaired_at.has_value();
}
// The tablet_repair_incremental_mode::full mode turns on incremental
// repair and selects all sstables for repair.
bool is_incremental_repair_using_all_sstables() {
return is_incremental_repair() && _incremental_mode == locator::tablet_repair_incremental_mode::full;
}
public:
std::vector<repair_node_state>& all_nodes() {
return _all_node_states;
}
void set_repair_state(repair_state state, locator::host_id node) {
for (auto& ns : all_nodes()) {
if (ns.node == node) {
ns.state = state;
}
}
}
void set_repair_state_for_local_node(repair_state state) {
// The first node is the local node
all_nodes().front().state = state;
}
repair_stats& stats() {
return _stats;
}
locator::host_id myhostid() const {
return _rs.my_host_id();
}
uint32_t repair_meta_id() const {
return _repair_meta_id;
}
const std::optional<repair_sync_boundary>& current_sync_boundary() const {
return _current_sync_boundary;
}
const std::optional<repair_sync_boundary>& last_sync_boundary() const {
return _last_sync_boundary;
};
const repair_hash& working_row_buf_combined_hash() const {
return _working_row_buf_combined_hash;
}
bool use_rpc_stream() const {
return is_rpc_stream_supported(_algo);
}
public:
// master constructor
repair_meta(
repair_service& rs,
replica::column_family& cf,
schema_ptr s,
reader_permit permit,
dht::token_range range,
row_level_diff_detect_algorithm algo,
size_t max_row_buf_size,
uint64_t seed,
repair_master master,
uint32_t repair_meta_id,
streaming::stream_reason reason,
shard_config master_node_shard_config,
host_id_vector_replica_set all_live_peer_nodes,
size_t nr_peer_nodes,
std::vector<std::optional<shard_id>> all_live_peer_shards,
row_level_repair* row_level_repair_ptr,
gc_clock::time_point compaction_time,
service::frozen_topology_guard topo_guard,
std::optional<int64_t> repaired_at,
locator::tablet_repair_incremental_mode incremental_mode)
: _rs(rs)
, _db(rs.get_db())
, _messaging(rs.get_messaging())
, _schema(s)
, _permit(std::move(permit))
, _range(range)
, _cmp(repair_sync_boundary::tri_compare(*_schema))
, _algo(algo)
, _max_row_buf_size(max_row_buf_size)
, _seed(seed)
, _repair_master(master)
, _repair_meta_id(repair_meta_id)
, _reason(reason)
, _master_node_shard_config(std::move(master_node_shard_config))
, _remote_sharder(make_remote_sharder())
, _same_sharding_config(is_same_sharding_config(cf))
, _nr_peer_nodes(nr_peer_nodes)
, _repaired_at(repaired_at)
, _incremental_mode(incremental_mode)
, _repair_writer(make_repair_writer(_schema, _repaired_at, _permit, _reason, _db, rs.get_view_builder(), rs.get_view_building_worker(), topo_guard))
, _gate(format("repair_meta[{}]", repair_meta_id))
, _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
[&rs] (uint32_t repair_meta_id, std::optional<shard_id> dst_cpu_id_opt, locator::host_id addr) {
auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard);
rlogger.debug("get_full_row_hashes: repair_meta_id={} dst_cpu_id={}", repair_meta_id, dst_cpu_id);
return rs.get_messaging().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, dst_cpu_id, addr);
})
, _sink_source_for_get_row_diff(_repair_meta_id, _nr_peer_nodes,
[&rs] (uint32_t repair_meta_id, std::optional<shard_id> dst_cpu_id_opt, locator::host_id addr) {
auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard);
rlogger.debug("get_row_diff: repair_meta_id={} dst_cpu_id={}", repair_meta_id, dst_cpu_id);
return rs.get_messaging().make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(repair_meta_id, dst_cpu_id, addr);
})
, _sink_source_for_put_row_diff(_repair_meta_id, _nr_peer_nodes,
[&rs] (uint32_t repair_meta_id, std::optional<shard_id> dst_cpu_id_opt, locator::host_id addr) {
auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard);
rlogger.debug("put_row_diff: repair_meta_id={} dst_cpu_id={}", repair_meta_id, dst_cpu_id);
return rs.get_messaging().make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(repair_meta_id, dst_cpu_id, addr);
})
, _row_level_repair_ptr(row_level_repair_ptr)
, _repair_hasher(_seed, _schema)
, _compaction_time(compaction_time)
, _is_tablet(cf.uses_tablets())
, _frozen_topology_guard(topo_guard)
, _topology_guard(_frozen_topology_guard)
, _is_eligible_to_repair_rejection(cf.is_eligible_to_write_rejection_on_critical_disk_utilization())
{
if (is_incremental_repair()) {
rlogger.info("Enable incremental repair for table={}.{} range={}",
_schema->ks_name(), _schema->cf_name(), _range);
}
if (master) {
add_to_repair_meta_for_masters(*this);
} else {
add_to_repair_meta_for_followers(*this);
}
SCYLLA_ASSERT(all_live_peer_shards.size() == all_live_peer_nodes.size());
_all_node_states.push_back(repair_node_state(myhostid(), this_shard_id()));
for (unsigned i = 0; i < all_live_peer_nodes.size(); i++) {
_all_node_states.push_back(repair_node_state(all_live_peer_nodes[i], all_live_peer_shards[i].value_or(repair_unspecified_shard)));
}
// Mark the permit as evictable immediately. If there are multiple
// repairs launched, they can deadlock in the initial phase, before
// they start reading from disk (which is the point where they
// become evictable normally).
// Prevent this by marking the permit as evictable ASAP.
// FIXME: provide a better API for this, this is very clunky
_fake_inactive_read_handle = _db.local().get_reader_concurrency_semaphore().register_inactive_read(make_empty_mutation_reader(_schema, _permit));
}
// follower constructor
repair_meta(
repair_service& rs,
replica::column_family& cf,
schema_ptr s,
reader_permit permit,
dht::token_range range,
row_level_diff_detect_algorithm algo,
size_t max_row_buf_size,
uint64_t seed,
repair_master master,
uint32_t repair_meta_id,
streaming::stream_reason reason,
shard_config master_node_shard_config,
host_id_vector_replica_set all_live_peer_nodes,
gc_clock::time_point compaction_time,
service::frozen_topology_guard topo_guard,
std::optional<int64_t> repaired_at,
locator::tablet_repair_incremental_mode incremental_mode)
: repair_meta(rs, cf, std::move(s), std::move(permit), std::move(range), algo, max_row_buf_size, seed, master, repair_meta_id, reason,
std::move(master_node_shard_config), std::move(all_live_peer_nodes), 1, {std::nullopt}, nullptr, compaction_time, topo_guard, repaired_at, incremental_mode)
{
}
public:
std::optional<shard_id> get_peer_node_dst_cpu_id(uint32_t peer_node_idx) {
SCYLLA_ASSERT(peer_node_idx + 1 < all_nodes().size());
return all_nodes()[peer_node_idx + 1].shard;
}
public:
future<> clear_gently() noexcept {
co_await utils::clear_gently(_peer_row_hash_sets);
co_await utils::clear_gently(_working_row_buf);
co_await utils::clear_gently(_row_buf);
}
future<> stop() {
// Handle deferred stop
if (_stopped) {
return _stopped->get_future();
}
promise<> stopped;
_stopped.emplace(stopped.get_future());
auto gate_future = _gate.close();
auto f1 = _sink_source_for_get_full_row_hashes.close();
auto f2 = _sink_source_for_get_row_diff.close();
auto f3 = _sink_source_for_put_row_diff.close();
rlogger.debug("repair_meta::stop");
// move to background. waited on via _stopped->get_future.
when_all_succeed(std::move(gate_future), std::move(f1), std::move(f2), std::move(f3)).discard_result().finally([this] {
return _repair_writer->wait_for_writer_done().finally([this] {
return close().then([this] {
return clear_gently();
});
});
}).forward_to(std::move(stopped));
return _stopped->get_future();
}
void reset_peer_row_hash_sets() {
if (_peer_row_hash_sets.size() != _nr_peer_nodes) {
_peer_row_hash_sets.resize(_nr_peer_nodes);
} else {
for (auto& x : _peer_row_hash_sets) {
x.clear();
}
}
}
repair_hash_set& peer_row_hash_sets(unsigned node_idx) {
return _peer_row_hash_sets[node_idx];
}
// Get a list of row hashes in _working_row_buf
future<repair_hash_set>
working_row_hashes() {
auto hashes = repair_hash_set();
for (auto& r : _working_row_buf) {
hashes.emplace(r.hash());
co_await coroutine::maybe_yield();
}
co_return std::move(hashes);
}
std::pair<std::optional<repair_sync_boundary>, bool>
get_common_sync_boundary(bool zero_rows,
std::vector<repair_sync_boundary>& sync_boundaries,
std::vector<repair_hash>& combined_hashes) {
if (sync_boundaries.empty()) {
throw std::runtime_error("sync_boundaries is empty");
}
if(combined_hashes.empty()) {
throw std::runtime_error("combined_hashes is empty");
}
// Get the smallest sync boundary in the list as the common sync boundary
std::sort(sync_boundaries.begin(), sync_boundaries.end(),
[this] (const auto& a, const auto& b) { return this->_cmp(a, b) < 0; });
repair_sync_boundary sync_boundary_min = sync_boundaries.front();
// Check if peers have identical combined hashes and sync boundary
bool same_hashes = std::adjacent_find(combined_hashes.begin(), combined_hashes.end(),
std::not_equal_to<repair_hash>()) == combined_hashes.end();
bool same_boundary = std::adjacent_find(sync_boundaries.begin(), sync_boundaries.end(),
[this] (const repair_sync_boundary& a, const repair_sync_boundary& b) { return this->_cmp(a, b) != 0; }) == sync_boundaries.end();
rlogger.debug("get_common_sync_boundary: zero_rows={}, same_hashes={}, same_boundary={}, combined_hashes={}, sync_boundaries={}",
zero_rows, same_hashes, same_boundary, combined_hashes, sync_boundaries);
bool already_synced = same_hashes && same_boundary && !zero_rows;
return std::pair<std::optional<repair_sync_boundary>, bool>(sync_boundary_min, already_synced);
}
future<> close() noexcept {
return _repair_reader ? _repair_reader->close() : make_ready_future<>();
}
private:
future<uint64_t> do_estimate_partitions_on_all_shards(const dht::token_range& range) {
return estimate_partitions(_db, _schema->ks_name(), _schema->cf_name(), range);
}
future<uint64_t> do_estimate_partitions_on_local_shard() {
auto& cf = _db.local().find_column_family(_schema->id());
return cf.estimated_partitions_in_range(_range);
}
future<uint64_t> get_estimated_partitions() {
auto gate_held = _gate.hold();
if (_repair_master || _same_sharding_config || _is_tablet) {
co_return co_await do_estimate_partitions_on_local_shard();
} else {
auto sharder = dht::selective_token_range_sharder(_remote_sharder, _range, _master_node_shard_config.shard);
auto partitions_sum = uint64_t(0);
auto subranges = uint64_t(0);
for (auto shard_range = sharder.next(); shard_range; shard_range = sharder.next()) {
++subranges;
auto partitions = co_await do_estimate_partitions_on_all_shards(*shard_range);
partitions_sum += partitions;
}
_local_range_estimation = local_range_estimation {
.master_subranges_count = subranges,
.partitions_count = partitions_sum
};
co_return partitions_sum;
}
}
future<> set_estimated_partitions(uint64_t estimated_partitions) {
auto gate_held = _gate.hold();
_estimated_partitions = estimated_partitions;
_repair_writer->set_estimated_partitions(_estimated_partitions);
co_return;
}
dht::static_sharder make_remote_sharder() {
return dht::static_sharder(_master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb);
}
bool is_same_sharding_config(replica::column_family& cf) {
rlogger.debug("is_same_sharding_config: remote_shard={}, remote_shard_count={}, remote_ignore_msb={}",
_master_node_shard_config.shard, _master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb);
auto& sharder = cf.get_effective_replication_map()->get_sharder(*_schema);
return sharder.shard_count() == _master_node_shard_config.shard_count
&& sharder.sharding_ignore_msb() == _master_node_shard_config.ignore_msb
&& this_shard_id() == _master_node_shard_config.shard;
}
future<size_t> get_repair_rows_size(const std::list<repair_row>& rows) const {
size_t sz = 0;
for (const repair_row& r : rows) {
sz += r.size();
co_await coroutine::maybe_yield();
}
co_return sz;
}
// Get the size of rows in _row_buf
future<size_t> row_buf_size() const {
return get_repair_rows_size(_row_buf);
}
// return the combined checksum of rows in _row_buf
future<repair_hash> row_buf_csum() {
repair_hash combined;
for (repair_row& r : _row_buf) {
combined.add(r.hash());
co_await coroutine::maybe_yield();
}
co_return combined;
}
void handle_mutation_fragment(mutation_fragment& mf, size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
if (mf.is_partition_start()) {
auto& start = mf.as_partition_start();
_repair_reader->set_current_dk(start.key());
if (!start.partition_tombstone()) {
// Ignore partition_start with empty partition tombstone
return;
}
} else if (mf.is_end_of_partition()) {
_repair_reader->clear_current_dk();
return;
}
auto hash = _repair_hasher.do_hash_for_mf(*_repair_reader->get_current_dk(), mf);
repair_row r(freeze(*_schema, mf), position_in_partition(mf.position()), _repair_reader->get_current_dk(), hash, is_dirty_on_master::no);
rlogger.trace("Reading: r.boundary={}, r.hash={}", r.boundary(), r.hash());
auto sz = r.size();
_metrics.row_from_disk_nr++;
_metrics.row_from_disk_bytes += sz;
cur_size += sz;
new_rows_size += sz;
cur_rows.push_back(std::move(r));
}
future<> prepare_sstables_for_incremental_repair() {
auto& table = _db.local().find_column_family(_schema->id());
table_id tid = table.schema()->id();
auto erm = table.get_effective_replication_map();
auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(tid);
auto last_token = _range.end() ? _range.end()->value() : dht::maximum_token();
auto id = tmap.get_tablet_id(last_token);
auto range = tmap.get_token_range(id);
if (range != _range) {
on_internal_error(rlogger, format("Repair range={} does not match tablet range={}", _range, range));
}
bool full = is_incremental_repair_using_all_sstables();
auto& tinfo = tmap.get_tablet_info(id);
auto sstables_repaired_at = tinfo.sstables_repaired_at;
auto gid = locator::global_tablet_id{tid, id};
// Consider this:
// 1) n1 is the topology coordinator
// 2) n1 schedules and executes a tablet repair with session id s1 for a tablet on n3 an n4.
// 3) n3 and n4 take and store the lock in _rs._repair_compaction_locks[s1]
// 4) n1 steps down before it executes locator::tablet_transition_stage::end_repair
// 5) n2 becomes the new topology coordinator
// 6) n2 runs locator::tablet_transition_stage::repair again
// 7) n3 and n4 try to take the lock again and hang since the lock is already taken
// To avoid the deadlock, we can throw in step 7 so that n2 will
// proceed to the end_repair stage and release the lock. After that,
// the scheduler could schedule the tablet repair again.
if (_rs._repair_compaction_locks.contains(gid)) {
auto msg = fmt::format("Tablet repair session={} table={} is in progress", _frozen_topology_guard, tid);
rlogger.info("{}", msg);
throw std::runtime_error(msg);
}
co_await utils::get_local_injector().inject("incremental_repair_prepare_wait", utils::wait_for_message(60s));
rlogger.debug("Disabling compaction for range={} for incremental repair", _range);
auto reenablers_and_holders = co_await table.get_compaction_reenablers_and_lock_holders_for_repair(_db.local(), _frozen_topology_guard, _range);
for (auto& lock_holder : reenablers_and_holders.lock_holders) {
_rs._repair_compaction_locks[gid].push_back(std::move(lock_holder));
}
auto sstables = co_await table.take_storage_snapshot(_range);
_incremental_repair_meta.sst_set = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(_schema, _range));
_incremental_repair_meta.sstables_repaired_at = sstables_repaired_at;
for (auto& snap : sstables) {
co_await coroutine::maybe_yield();
auto& sst = snap.sst;
auto bytes_on_disk = sst->bytes_on_disk();
if (!full && repair::is_repaired(sstables_repaired_at, sst)) {
rlogger.info("Skipped adding sst={} repaired_at={} sstables_repaired_at={} being_repaired={} session_id={} for incremental repair",
sst->toc_filename(), sst->get_stats_metadata().repaired_at, sstables_repaired_at, sst->being_repaired, _frozen_topology_guard);
_metrics.inc_sst_skipped_bytes += bytes_on_disk;
} else {
sst->mark_as_being_repaired(_frozen_topology_guard);
rlogger.info("Added sst={} repaired_at={} sstables_repaired_at={} being_repaired={} session_id={} for incremental repair",
sst->toc_filename(), sst->get_stats_metadata().repaired_at, sstables_repaired_at, sst->being_repaired, _frozen_topology_guard);
_incremental_repair_meta.sst_set->insert(sst);
_metrics.inc_sst_read_bytes += bytes_on_disk;
}
}
// Note: It is safe to re-enable compaction again because all
// sstables particiating the repair have been marked as
// being_repaired which will be ignored by the new unrepaired
// compaction.
reenablers_and_holders.cres.clear();
rlogger.info("Re-enabled compaction for range={} for incremental repair", _range);
co_await utils::get_local_injector().inject("wait_after_prepare_sstables_for_incremental_repair", utils::wait_for_message(5min));
}
// Read rows from sstable until the size of rows exceeds _max_row_buf_size - current_size
// This reads rows from where the reader left last time into _row_buf
// _current_sync_boundary or _last_sync_boundary have no effect on the reader neither.
future<std::tuple<std::list<repair_row>, size_t>>
read_rows_from_disk(size_t cur_size) {
using value_type = std::tuple<std::list<repair_row>, size_t>;
size_t new_rows_size = 0;
std::list<repair_row> cur_rows;
std::exception_ptr ex;
if (!_repair_reader) {
// We are about to create a real evictable reader, so drop the fake
// reader (evicted or not), we don't need it anymore.
_db.local().get_reader_concurrency_semaphore().unregister_inactive_read(std::move(_fake_inactive_read_handle));
if (is_incremental_repair()) {
co_await prepare_sstables_for_incremental_repair();
}
_repair_reader.emplace(_db,
_db.local().find_column_family(_schema->id()),
_schema,
_permit,
_range,
_remote_sharder,
_master_node_shard_config.shard,
_seed,
std::invoke([this]() {
if (is_incremental_repair()) {
return repair_reader::read_strategy::incremental_repair;
}
if (_repair_master || _same_sharding_config || _is_tablet) {
rlogger.debug("repair_reader: meta_id={}, _repair_master={}, _same_sharding_config={},"
"read_strategy {} is chosen",
_repair_meta_id, _repair_master, _same_sharding_config,
repair_reader::read_strategy::local);
return repair_reader::read_strategy::local;
}
// multishard_filter means load all the data in the range and apply
// filter by master shard on top, discarding partitions from other shards.
// multishard_split means split the range into multiple subranges,
// each containing only the required data from the master shard.
// For situations with a sparse data set spread across numerous ranges,
// the overhead from continuously switching between these ranges,
// specifically during the fast_forward_to function on the multishard_reader,
// can become the main factor in performance.
// Similarly, with multishard_filter, reading all the partitions within
// the range can lead to the next_partition cost dominating the overall cost.
// The heuristic here chooses the strategy with minimal such cost.
// Note that with multishard_filter we don't read entire partitions which
// can be a huge waste. We only fill the buffer inside
// mutation_reader, if a partition is found to belong to the incorrect
// master shard, we call next_partition(), which effectively clears
// the buffer until the next partition is reached.
if (!_local_range_estimation) {
// this should not normally happen since the master
// calls get_estimated_partitions before get_sync_boundary
rlogger.warn("repair_reader: meta_id={}, no _local_range_estimation, "
"read_strategy {} is chosen",
_repair_meta_id, repair_reader::read_strategy::multishard_split);
return repair_reader::read_strategy::multishard_split;
}
const auto read_strategy =
_local_range_estimation->partitions_count <= _local_range_estimation->master_subranges_count
? repair_reader::read_strategy::multishard_filter
: repair_reader::read_strategy::multishard_split;
rlogger.debug("repair_reader: meta_id={}, _local_range_estimation: partitions_count={}, "
"master_subranges_count={}, read_strategy {} is chosen",
_repair_meta_id,
_local_range_estimation->partitions_count,
_local_range_estimation->master_subranges_count,
read_strategy);
return read_strategy;
}),
_compaction_time,
_incremental_repair_meta);
}
try {
while (cur_size < _max_row_buf_size) {
_gate.check();
mutation_fragment_opt mfopt = co_await _repair_reader->read_mutation_fragment();
if (!mfopt) {
co_await _repair_reader->on_end_of_stream();
break;
}
handle_mutation_fragment(*mfopt, cur_size, new_rows_size, cur_rows);
}
} catch (...) {
ex = std::current_exception();
}
if (ex) {
co_await _repair_reader->on_end_of_stream();
co_await coroutine::return_exception_ptr(std::move(ex));
}
_repair_reader->pause();
co_return value_type(std::move(cur_rows), new_rows_size);
}
future<> clear_row_buf() {
return utils::clear_gently(_row_buf);
}
future<> clear_working_row_buf() {
co_await utils::clear_gently(_working_row_buf);
_working_row_buf_combined_hash.clear();
}
// Read rows from disk until _max_row_buf_size of rows are filled into _row_buf.
// Calculate the combined checksum of the rows
// Calculate the total size of the rows in _row_buf
future<get_sync_boundary_response>
get_sync_boundary(std::optional<repair_sync_boundary> skipped_sync_boundary) {
if (skipped_sync_boundary) {
_current_sync_boundary = skipped_sync_boundary;
co_await clear_row_buf();
}
// Here is the place we update _last_sync_boundary
rlogger.trace("SET _last_sync_boundary from {} to {}", _last_sync_boundary, _current_sync_boundary);
_last_sync_boundary = _current_sync_boundary;
co_await clear_working_row_buf();
size_t cur_size = co_await row_buf_size();
auto rows = co_await read_rows_from_disk(cur_size);
auto&& [new_rows, new_rows_size] = rows;
size_t new_rows_nr = new_rows.size();
_row_buf.splice(_row_buf.end(), new_rows);
repair_hash row_buf_combined_hash = co_await row_buf_csum();
size_t row_buf_bytes = co_await row_buf_size();
std::optional<repair_sync_boundary> sb_max;
if (!_row_buf.empty()) {
sb_max = _row_buf.back().boundary();
}
rlogger.debug("get_sync_boundary: Got nr={} rows, sb_max={}, row_buf_size={}, repair_hash={}, skipped_sync_boundary={}",
new_rows_nr, sb_max, row_buf_bytes, row_buf_combined_hash, skipped_sync_boundary);
co_return get_sync_boundary_response{sb_max, row_buf_combined_hash, row_buf_bytes, new_rows_size, new_rows_nr};
}
future<> move_row_buf_to_working_row_buf() {
if (_cmp(_row_buf.back().boundary(), *_current_sync_boundary) <= 0) {
// Fast path
_working_row_buf.swap(_row_buf);
co_return;
}
size_t sz = _row_buf.size();
for (auto it = _row_buf.rbegin(); it != _row_buf.rend(); ++it) {
// Move the rows > _current_sync_boundary to _working_row_buf
// Delete the rows > _current_sync_boundary from _row_buf
// Swap _working_row_buf and _row_buf so that _working_row_buf
// contains rows within (_last_sync_boundary,
// _current_sync_boundary], _row_buf contains rows within
// (_current_sync_boundary, ...]
repair_row& r = *it;
if (_cmp(r.boundary(), *_current_sync_boundary) <= 0) {
break;
}
_working_row_buf.push_front(std::move(r));
co_await coroutine::maybe_yield();
}
_row_buf.resize(_row_buf.size() - _working_row_buf.size());
_row_buf.swap(_working_row_buf);
if (sz != _working_row_buf.size() + _row_buf.size()) {
throw std::runtime_error(format("incorrect row_buf and working_row_buf size, before={}, after={} + {}",
sz, _working_row_buf.size(), _row_buf.size()));
}
}
// Move rows from <_row_buf> to <_working_row_buf> according to
// _last_sync_boundary and common_sync_boundary. That is rows within the
// (_last_sync_boundary, _current_sync_boundary] in <_row_buf> are moved
// into the <_working_row_buf>
future<get_combined_row_hash_response>
request_row_hashes(const std::optional<repair_sync_boundary>& common_sync_boundary) {
if (!common_sync_boundary) {
throw std::runtime_error("common_sync_boundary is empty");
}
_current_sync_boundary = common_sync_boundary;
rlogger.trace("SET _current_sync_boundary to {}, common_sync_boundary={}", _current_sync_boundary, common_sync_boundary);
_working_row_buf.clear();
_working_row_buf_combined_hash.clear();
if (_row_buf.empty()) {
co_return get_combined_row_hash_response();
}
co_await move_row_buf_to_working_row_buf();
for (repair_row& r : _working_row_buf) {
_working_row_buf_combined_hash.add(r.hash());
co_await coroutine::maybe_yield();
}
co_return get_combined_row_hash_response{_working_row_buf_combined_hash};
}
future<std::list<repair_row>>
copy_rows_from_working_row_buf() {
std::list<repair_row> rows;
for (const repair_row& r : _working_row_buf) {
rows.push_back(r);
co_await coroutine::maybe_yield();
}
co_return rows;
}
future<std::list<repair_row>>
copy_rows_from_working_row_buf_within_set_diff(repair_hash_set set_diff) {
std::list<repair_row> rows;
for (const repair_row& r : _working_row_buf) {
if (set_diff.contains(r.hash())) {
rows.push_back(r);
}
co_await coroutine::maybe_yield();
}
co_return rows;
}
// Return rows in the _working_row_buf with hash within the given sef_diff
// Give a set of row hashes, return the corresponding rows
// If needs_all_rows is set, return all the rows in _working_row_buf, ignore the set_diff
future<std::list<repair_row>>
get_row_diff(repair_hash_set set_diff, needs_all_rows_t needs_all_rows = needs_all_rows_t::no) {
if (needs_all_rows) {
if (!_repair_master || _nr_peer_nodes == 1) {
return make_ready_future<std::list<repair_row>>(std::move(_working_row_buf));
}
return copy_rows_from_working_row_buf();
} else {
return copy_rows_from_working_row_buf_within_set_diff(std::move(set_diff));
}
}
future<> do_apply_rows(std::list<repair_row> row_diff, update_working_row_buf update_buf) {
auto sem_units = co_await get_units(_repair_writer->sem(), 1);
_repair_writer->create_writer();
while (!row_diff.empty()) {
repair_row& r = row_diff.front();
if (update_buf) {
_working_row_buf_combined_hash.add(r.hash());
}
// The repair_row here is supposed to have
// mutation_fragment attached because we have stored it in
// to_repair_rows_list above where the repair_row is created.
mutation_fragment mf = std::move(r.get_mutation_fragment());
r.reset_mutation_fragment();
auto dk_with_hash = r.get_dk_with_hash();
co_await _repair_writer->do_write(std::move(dk_with_hash), std::move(mf));
row_diff.pop_front();
co_await coroutine::maybe_yield();
}
}
// Give a list of rows, apply the rows to disk and update the _working_row_buf and _peer_row_hash_sets if requested
// Must run inside a seastar thread
void apply_rows_on_master_in_thread(repair_rows_on_wire rows, locator::host_id from, update_working_row_buf update_buf,
update_peer_row_hash_sets update_hash_set, unsigned node_idx) {
if (rows.empty()) {
return;
}
auto row_diff = to_repair_rows_list(std::move(rows), _schema, _seed, _repair_master, _permit, _repair_hasher).get();
auto sz = get_repair_rows_size(row_diff).get();
stats().rx_row_bytes += sz;
stats().rx_row_nr += row_diff.size();
stats().rx_row_nr_peer[from] += row_diff.size();
if (update_buf) {
// Both row_diff and _working_row_buf and are ordered, merging
// two sored list to make sure the combination of row_diff
// and _working_row_buf are ordered.
utils::merge_to_gently(_working_row_buf, row_diff,
[this] (const repair_row& x, const repair_row& y) { return _cmp(x.boundary(), y.boundary()) < 0; });
for (auto& r : row_diff) {
thread::maybe_yield();
_working_row_buf_combined_hash.add(r.hash());
}
}
if (update_hash_set) {
_peer_row_hash_sets[node_idx] = row_diff
| std::views::transform([] (repair_row& r) { thread::maybe_yield(); return r.hash(); })
| std::ranges::to<repair_hash_set>();
}
// Repair rows in row_diff will be flushed to disk by flush_rows_in_working_row_buf,
// so we skip calling do_apply_rows here.
_dirty_on_master = is_dirty_on_master::yes;
// Clear gently to avoid stalls
utils::clear_gently(row_diff).get();
}
public:
future<const locator::token_metadata*> get_tm_for_small_table_optimization_check(const locator::token_metadata* tm) {
auto sem_units = co_await get_units(_small_table_optimization_tm_sem, 1);
if (!tm) {
throw std::runtime_error("The token_metadata pointer is nullptr");
}
if (_small_table_optimization_tm_calculated) {
co_return _small_table_optimization_tm.get();
}
std::unique_ptr<locator::token_metadata> tmptr;
if (_reason == streaming::stream_reason::bootstrap) {
auto myid = tm->get_my_id();
std::unordered_set<dht::token> bootstrap_tokens;
for (auto [token, host_id] : tm->get_bootstrap_tokens()) {
if (myid == host_id) {
bootstrap_tokens.insert(token);
}
}
if (!bootstrap_tokens.empty()) {
// The node is a bootstrapping node
tmptr = std::make_unique<locator::token_metadata>(co_await tm->clone_only_token_map());
co_await tmptr->update_normal_tokens(bootstrap_tokens, myid);
rlogger.debug("small_table_optimization: Got bootstrap tokens={}", bootstrap_tokens);
}
} else if (_reason == streaming::stream_reason::decommission) {
auto myid = tm->get_my_id();
auto& leaving = tm->get_leaving_endpoints();
if (!leaving.empty() && leaving.contains(myid)) {
// This node is leaving
tmptr = std::make_unique<locator::token_metadata>(co_await tm->clone_after_all_left());
rlogger.debug("small_table_optimization: Got leaving node={}", leaving);
}
}
_small_table_optimization_tm = std::move(tmptr);
_small_table_optimization_tm_calculated = true;
co_return _small_table_optimization_tm.get();
}
public:
// Must run inside a seastar thread
void flush_rows_in_working_row_buf(std::optional<small_table_optimization_params> small_table_optimization) {
if (_dirty_on_master) {
_dirty_on_master = is_dirty_on_master::no;
} else {
return;
}
if (small_table_optimization) {
flush_rows(_schema, _working_row_buf, _repair_writer, small_table_optimization, this);
} else {
flush_rows(_schema, _working_row_buf, _repair_writer);
}
}
private:
future<>
apply_rows_on_follower(repair_rows_on_wire rows) {
if (rows.empty()) {
co_return;
}
std::list<repair_row> row_diff = co_await to_repair_rows_list(std::move(rows), _schema, _seed, _repair_master, _permit, _repair_hasher);
co_await do_apply_rows(std::move(row_diff), update_working_row_buf::no);
}
struct fragments_limiter {
sstring ks;
sstring cf;
size_t max_fragments_size;
size_t max_fragments_nr;
size_t fragments_size = 0;
size_t fragments_nr = 0;
// Max fragments size and number are allowed in a single repair_row_on_wire
// which is sent as a rpc stream message
static constexpr size_t _max_fragments_size = 240 * 1024;
static constexpr size_t _max_fragments_nr = 32 * 1024;
fragments_limiter(sstring keyspace, sstring table)
: ks(keyspace)
, cf(table) {
max_fragments_size = _max_fragments_size;
max_fragments_nr = _max_fragments_nr;
auto inject_max_fragments_size = utils::get_local_injector().inject_parameter<uint32_t>("row_level_repair_max_fragments_size");
if (inject_max_fragments_size) {
max_fragments_size = *inject_max_fragments_size;
rlogger.info("Set inject_max_fragments_size {} for table={}.{}", max_fragments_size, ks, cf);
}
auto inject_max_fragments_nr = utils::get_local_injector().inject_parameter<uint32_t>("row_level_repair_max_fragments_nr");
if (inject_max_fragments_nr) {
max_fragments_nr = *inject_max_fragments_nr;
rlogger.info("Set inject_max_fragments_nr {} for table={}.{}", max_fragments_nr, ks, cf);
}
}
void reset(size_t size) {
fragments_size = size;
fragments_nr = 1;
};
void add(size_t size) {
fragments_size += size;
++fragments_nr;
}
bool no_split(size_t size) {
return (fragments_size + size) < max_fragments_size && (fragments_nr + 1) < max_fragments_nr;
}
};
future<repair_rows_on_wire> to_repair_rows_on_wire(std::list<repair_row> row_list) {
lw_shared_ptr<const decorated_key_with_hash> last_dk_with_hash;
repair_rows_on_wire rows;
size_t row_bytes = co_await get_repair_rows_size(row_list);
_metrics.tx_row_nr += row_list.size();
_metrics.tx_row_bytes += row_bytes;
fragments_limiter limiter(_schema->ks_name(), _schema->cf_name());
auto msg_split_feature = bool(_db.local().features().repair_msg_split);
while (!row_list.empty()) {
repair_row r = std::move(row_list.front());
row_list.pop_front();
const auto& dk_with_hash = r.get_dk_with_hash();
auto mf = std::move(r.get_frozen_mutation());
const auto size = mf.representation().size();
bool same_pk = !rows.empty() && last_dk_with_hash && dk_with_hash->dk.tri_compare(*_schema, last_dk_with_hash->dk) == 0;
if (same_pk && (!msg_split_feature || limiter.no_split(size))) {
limiter.add(size);
rows.back().push_mutation_fragment(std::move(mf));
} else {
last_dk_with_hash = dk_with_hash;
limiter.reset(size);
auto r = same_pk && msg_split_feature ? repair_row_on_wire({std::move(mf)}) : repair_row_on_wire(dk_with_hash->dk.key(), {std::move(mf)});
rows.push_back(std::move(r));
}
co_await coroutine::maybe_yield();
}
co_return rows;
};
public:
// RPC API
// Return the hashes of the rows in _working_row_buf
future<repair_hash_set>
get_full_row_hashes(locator::host_id remote_node, shard_id dst_cpu_id) {
if (remote_node == myhostid()) {
co_return co_await get_full_row_hashes_handler();
}
repair_hash_set hashes = co_await ser::repair_rpc_verbs::send_repair_get_full_row_hashes(&_messaging, remote_node,
_repair_meta_id, dst_cpu_id);
rlogger.debug("Got full hashes from peer={}, nr_hashes={}", remote_node, hashes.size());
_metrics.rx_hashes_nr += hashes.size();
stats().rx_hashes_nr += hashes.size();
stats().rpc_call_nr++;
co_return hashes;
}
private:
future<> get_full_row_hashes_source_op(
lw_shared_ptr<repair_hash_set> current_hashes,
locator::host_id remote_node,
unsigned node_idx,
rpc::source<repair_hash_with_cmd>& source) {
while (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt = co_await source()) {
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
rlogger.trace("get_full_row_hashes: Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", remote_node, hash_cmd.hash, int(hash_cmd.cmd));
if (hash_cmd.cmd == repair_stream_cmd::hash_data) {
current_hashes->insert(hash_cmd.hash);
} else if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set) {
co_return;
} else if (hash_cmd.cmd == repair_stream_cmd::error) {
throw std::runtime_error("get_full_row_hashes: Peer failed to process");
} else {
throw std::runtime_error("get_full_row_hashes: Got unexpected repair_stream_cmd");
}
}
_sink_source_for_get_full_row_hashes.mark_source_closed(node_idx);
throw std::runtime_error("get_full_row_hashes: Got unexpected end of stream");
}
future<> get_full_row_hashes_sink_op(rpc::sink<repair_stream_cmd>& sink) {
std::exception_ptr ep;
try {
co_await sink(repair_stream_cmd::get_full_row_hashes);
co_await sink.flush();
} catch (...) {
ep = std::current_exception();
}
if (ep) {
co_await sink.close();
std::rethrow_exception(ep);
}
}
public:
future<repair_hash_set>
get_full_row_hashes_with_rpc_stream(locator::host_id remote_node, unsigned node_idx, shard_id dst_cpu_id) {
if (remote_node == myhostid()) {
co_return co_await get_full_row_hashes_handler();
}
auto current_hashes = make_lw_shared<repair_hash_set>();
auto [sink, source] = co_await _sink_source_for_get_full_row_hashes.get_sink_source(remote_node, node_idx, dst_cpu_id);
co_await coroutine::all(
[&] { return get_full_row_hashes_source_op(current_hashes, remote_node, node_idx, source); },
[&] { return get_full_row_hashes_sink_op(sink); }
);
stats().rx_hashes_nr += current_hashes->size();
_metrics.rx_hashes_nr += current_hashes->size();
co_return std::move(*current_hashes);
}
// RPC handler
future<repair_hash_set>
get_full_row_hashes_handler() {
auto gate_held = _gate.hold();
co_return co_await working_row_hashes();
}
// RPC API
// Return the combined hashes of the current working row buf
future<get_combined_row_hash_response>
get_combined_row_hash(std::optional<repair_sync_boundary> common_sync_boundary, locator::host_id remote_node, shard_id dst_cpu_id) {
if (remote_node == myhostid()) {
co_return co_await get_combined_row_hash_handler(common_sync_boundary);
}
get_combined_row_hash_response resp = co_await ser::repair_rpc_verbs::send_repair_get_combined_row_hash(&_messaging, remote_node,
_repair_meta_id, common_sync_boundary, dst_cpu_id);
stats().rpc_call_nr++;
stats().rx_hashes_nr++;
_metrics.rx_hashes_nr++;
co_return resp;
}
// RPC handler
future<get_combined_row_hash_response>
get_combined_row_hash_handler(std::optional<repair_sync_boundary> common_sync_boundary) {
// We can not call this function twice. The good thing is we do not use
// retransmission at messaging_service level, so no message will be retransmitted.
rlogger.trace("Calling get_combined_row_hash_handler");
auto gate_held = _gate.hold();
auto& cf = _db.local().find_column_family(_schema->id());
cf.update_off_strategy_trigger();
co_return co_await request_row_hashes(common_sync_boundary);
}
// RPC API
future<>
repair_row_level_start(locator::host_id remote_node, sstring ks_name, sstring cf_name, dht::token_range range, table_schema_version schema_version, streaming::stream_reason reason, gc_clock::time_point compaction_time, shard_id dst_cpu_id) {
if (remote_node == myhostid()) {
co_return;
}
stats().rpc_call_nr++;
// Even though remote partitioner name is ignored in the current version of
// repair, we still have to send something to keep compatibility with nodes
// that run older versions. This will make it possible to run mixed cluster.
// Murmur3 is appropriate because that's the only supported partitioner at
// the time this change is introduced.
sstring remote_partitioner_name = "org.apache.cassandra.dht.Murmur3Partitioner";
rpc::optional<repair_row_level_start_response> resp =
co_await ser::repair_rpc_verbs::send_repair_row_level_start(&_messaging, remote_node,
_repair_meta_id, ks_name, cf_name, std::move(range), _algo, _max_row_buf_size, _seed,
_master_node_shard_config.shard, _master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb,
remote_partitioner_name, std::move(schema_version), reason, compaction_time, dst_cpu_id, _frozen_topology_guard, _repaired_at, _incremental_mode);
if (resp && resp->status == repair_row_level_start_status::no_such_column_family) {
throw replica::no_such_column_family(ks_name, cf_name);
} else {
co_return;
}
}
// RPC handler
static future<repair_row_level_start_response>
repair_row_level_start_handler(repair_service& repair, locator::host_id from_id, uint32_t src_cpu_id, uint32_t repair_meta_id, sstring ks_name, sstring cf_name,
dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size,
uint64_t seed, shard_config master_node_shard_config, table_schema_version schema_version, streaming::stream_reason reason,
gc_clock::time_point compaction_time, abort_source& as, service::frozen_topology_guard topo_guard, std::optional<int64_t> repaired_at, locator::tablet_repair_incremental_mode incremental_mode) {
rlogger.debug(">>> Started Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_siz={}",
repair.my_host_id(), from_id, repair_meta_id, ks_name, cf_name, schema_version, range, seed, max_row_buf_size);
try {
// Trigger read barrier to ensure that session_id is visible.
auto& mm = repair.get_migration_manager();
co_await mm.get_group0_barrier().trigger(mm.get_abort_source());
co_await repair.insert_repair_meta(from_id, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version), reason, compaction_time, as, topo_guard, repaired_at, incremental_mode);
co_return repair_row_level_start_response{repair_row_level_start_status::ok};
} catch (replica::no_such_column_family&) {
co_return repair_row_level_start_response{repair_row_level_start_status::no_such_column_family};
}
}
// RPC API
future<> repair_row_level_stop(locator::host_id remote_node, sstring ks_name, sstring cf_name, dht::token_range range, shard_id dst_cpu_id, bool mark_as_repaired) {
if (remote_node == myhostid()) {
co_await stop();
if (mark_as_repaired) {
co_await mark_sstable_as_repaired();
}
co_return;
}
stats().rpc_call_nr++;
co_return co_await ser::repair_rpc_verbs::send_repair_row_level_stop(&_messaging, remote_node,
_repair_meta_id, std::move(ks_name), std::move(cf_name), std::move(range), dst_cpu_id, mark_as_repaired);
}
// RPC handler
static future<>
repair_row_level_stop_handler(repair_service& rs, locator::host_id from, uint32_t repair_meta_id, sstring ks_name, sstring cf_name, dht::token_range range, bool mark_as_repaired) {
rlogger.debug("<<< Finished Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, range={}",
rs.my_host_id(), from, repair_meta_id, ks_name, cf_name, range);
auto rm = rs.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::row_level_stop_started);
co_await rs.remove_repair_meta(from, repair_meta_id, std::move(ks_name), std::move(cf_name), std::move(range), mark_as_repaired);
rm->set_repair_state_for_local_node(repair_state::row_level_stop_finished);
}
// RPC API
future<uint64_t> repair_get_estimated_partitions(locator::host_id remote_node, shard_id dst_cpu_id) {
if (remote_node == myhostid()) {
co_return co_await get_estimated_partitions();
}
stats().rpc_call_nr++;
co_return co_await ser::repair_rpc_verbs::send_repair_get_estimated_partitions(&_messaging, remote_node, _repair_meta_id, dst_cpu_id);
}
// RPC handler
static future<uint64_t> repair_get_estimated_partitions_handler(repair_service& rs, locator::host_id from, uint32_t repair_meta_id) {
auto rm = rs.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::get_estimated_partitions_started);
uint64_t partitions = co_await rm->get_estimated_partitions();
rm->set_repair_state_for_local_node(repair_state::get_estimated_partitions_finished);
co_return partitions;
}
// RPC API
future<> repair_set_estimated_partitions(locator::host_id remote_node, uint64_t estimated_partitions, shard_id dst_cpu_id) {
if (remote_node == myhostid()) {
co_return co_await set_estimated_partitions(estimated_partitions);
}
stats().rpc_call_nr++;
co_return co_await ser::repair_rpc_verbs::send_repair_set_estimated_partitions(&_messaging, remote_node, _repair_meta_id, estimated_partitions, dst_cpu_id);
}
// RPC handler
static future<> repair_set_estimated_partitions_handler(repair_service& rs, locator::host_id from, uint32_t repair_meta_id, uint64_t estimated_partitions) {
auto rm = rs.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::set_estimated_partitions_started);
co_await rm->set_estimated_partitions(estimated_partitions);
rm->set_repair_state_for_local_node(repair_state::set_estimated_partitions_finished);
}
// RPC API
// Return the largest sync point contained in the _row_buf , current _row_buf checksum, and the _row_buf size
future<get_sync_boundary_response>
get_sync_boundary(locator::host_id remote_node, std::optional<repair_sync_boundary> skipped_sync_boundary, shard_id dst_cpu_id) {
if (remote_node == myhostid()) {
co_return co_await get_sync_boundary_handler(skipped_sync_boundary);
}
stats().rpc_call_nr++;
co_return co_await ser::repair_rpc_verbs::send_repair_get_sync_boundary(&_messaging, remote_node, _repair_meta_id, skipped_sync_boundary, dst_cpu_id);
}
// RPC handler
future<get_sync_boundary_response>
get_sync_boundary_handler(std::optional<repair_sync_boundary> skipped_sync_boundary) {
auto gate_held = _gate.hold();
auto& cf = _db.local().find_column_family(_schema->id());
cf.update_off_strategy_trigger();
co_return co_await get_sync_boundary(std::move(skipped_sync_boundary));
}
// RPC API
// Return rows in the _working_row_buf with hash within the given sef_diff
// Must run inside a seastar thread
void get_row_diff(repair_hash_set set_diff, needs_all_rows_t needs_all_rows, locator::host_id remote_node, unsigned node_idx, shard_id dst_cpu_id) {
if (needs_all_rows || !set_diff.empty()) {
if (remote_node == myhostid()) {
return;
}
if (needs_all_rows) {
set_diff.clear();
} else {
stats().tx_hashes_nr += set_diff.size();
_metrics.tx_hashes_nr += set_diff.size();
}
stats().rpc_call_nr++;
repair_rows_on_wire rows = ser::repair_rpc_verbs::send_repair_get_row_diff(&_messaging, remote_node,
_repair_meta_id, std::move(set_diff), bool(needs_all_rows), dst_cpu_id).get();
if (!rows.empty()) {
apply_rows_on_master_in_thread(std::move(rows), remote_node, update_working_row_buf::yes, update_peer_row_hash_sets::no, node_idx);
}
}
}
// Must run inside a seastar thread
void get_row_diff_and_update_peer_row_hash_sets(locator::host_id remote_node, unsigned node_idx, shard_id dst_cpu_id) {
if (remote_node == myhostid()) {
return;
}
stats().rpc_call_nr++;
repair_rows_on_wire rows = ser::repair_rpc_verbs::send_repair_get_row_diff(&_messaging, remote_node,
_repair_meta_id, {}, bool(needs_all_rows_t::yes), dst_cpu_id).get();
if (!rows.empty()) {
apply_rows_on_master_in_thread(std::move(rows), remote_node, update_working_row_buf::yes, update_peer_row_hash_sets::yes, node_idx);
}
}
private:
// Must run inside a seastar thread
void get_row_diff_source_op(
update_peer_row_hash_sets update_hash_set,
locator::host_id remote_node,
unsigned node_idx,
rpc::sink<repair_hash_with_cmd>& sink,
rpc::source<repair_row_on_wire_with_cmd>& source) {
repair_rows_on_wire current_rows;
for (;;) {
std::optional<std::tuple<repair_row_on_wire_with_cmd>> row_opt = source().get();
if (row_opt) {
if (inject_rpc_stream_error) {
throw std::runtime_error("get_row_diff: Inject sender error in source loop");
}
auto row = std::move(std::get<0>(row_opt.value()));
if (row.cmd == repair_stream_cmd::row_data) {
rlogger.trace("get_row_diff: Got repair_row_on_wire with data");
current_rows.push_back(std::move(row.row));
} else if (row.cmd == repair_stream_cmd::end_of_current_rows) {
rlogger.trace("get_row_diff: Got repair_row_on_wire with nullopt");
apply_rows_on_master_in_thread(std::move(current_rows), remote_node, update_working_row_buf::yes, update_hash_set, node_idx);
break;
} else if (row.cmd == repair_stream_cmd::error) {
throw std::runtime_error("get_row_diff: Peer failed to process");
} else {
throw std::runtime_error("get_row_diff: Got unexpected repair_stream_cmd");
}
} else {
_sink_source_for_get_row_diff.mark_source_closed(node_idx);
throw std::runtime_error("get_row_diff: Got unexpected end of stream");
}
}
}
future<> get_row_diff_sink_op(
repair_hash_set set_diff,
needs_all_rows_t needs_all_rows,
rpc::sink<repair_hash_with_cmd>& sink,
locator::host_id remote_node) {
std::exception_ptr ep;
try {
if (inject_rpc_stream_error) {
throw std::runtime_error("get_row_diff: Inject sender error in sink loop");
}
if (needs_all_rows) {
rlogger.trace("get_row_diff: request with repair_stream_cmd::needs_all_rows");
co_await sink(repair_hash_with_cmd{repair_stream_cmd::needs_all_rows, repair_hash()});
co_await sink.flush();
co_return;
}
for (const repair_hash& hash : set_diff) {
co_await sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
}
co_await sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
co_await sink.flush();
} catch (...) {
ep = std::current_exception();
}
if (ep) {
co_await sink.close();
std::rethrow_exception(ep);
}
}
public:
// Must run inside a seastar thread
void get_row_diff_with_rpc_stream(
repair_hash_set set_diff,
needs_all_rows_t needs_all_rows,
update_peer_row_hash_sets update_hash_set,
locator::host_id remote_node,
unsigned node_idx,
shard_id dst_cpu_id) {
if (needs_all_rows || !set_diff.empty()) {
if (remote_node == myhostid()) {
return;
}
if (needs_all_rows) {
set_diff.clear();
} else {
stats().tx_hashes_nr += set_diff.size();
_metrics.tx_hashes_nr += set_diff.size();
}
stats().rpc_call_nr++;
auto f = _sink_source_for_get_row_diff.get_sink_source(remote_node, node_idx, dst_cpu_id).get();
rpc::sink<repair_hash_with_cmd>& sink = std::get<0>(f);
rpc::source<repair_row_on_wire_with_cmd>& source = std::get<1>(f);
auto sink_op = get_row_diff_sink_op(std::move(set_diff), needs_all_rows, sink, remote_node);
get_row_diff_source_op(update_hash_set, remote_node, node_idx, sink, source);
sink_op.get();
}
}
// RPC handler
future<repair_rows_on_wire> get_row_diff_handler(repair_hash_set set_diff, needs_all_rows_t needs_all_rows) {
auto gate_held = _gate.hold();
std::list<repair_row> row_diff = co_await get_row_diff(std::move(set_diff), needs_all_rows);
co_return co_await to_repair_rows_on_wire(std::move(row_diff));
}
// RPC API
// Send rows in the _working_row_buf with hash within the given sef_diff
future<> put_row_diff(repair_hash_set set_diff, needs_all_rows_t needs_all_rows, locator::host_id remote_node, shard_id dst_cpu_id) {
if (!set_diff.empty()) {
if (remote_node == myhostid()) {
co_return;
}
size_t sz = set_diff.size();
std::list<repair_row> row_diff = co_await get_row_diff(std::move(set_diff), needs_all_rows);
if (row_diff.size() != sz) {
rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.",
_schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz);
}
size_t row_bytes = co_await get_repair_rows_size(row_diff);
stats().tx_row_nr += row_diff.size();
stats().tx_row_nr_peer[remote_node] += row_diff.size();
stats().tx_row_bytes += row_bytes;
stats().rpc_call_nr++;
repair_rows_on_wire rows = co_await to_repair_rows_on_wire(std::move(row_diff));
co_await ser::repair_rpc_verbs::send_repair_put_row_diff(&_messaging, remote_node, _repair_meta_id, std::move(rows), dst_cpu_id);
}
}
private:
future<> put_row_diff_source_op(
locator::host_id remote_node,
unsigned node_idx,
rpc::source<repair_stream_cmd>& source) {
while (std::optional<std::tuple<repair_stream_cmd>> status_opt = co_await source()) {
repair_stream_cmd status = std::move(std::get<0>(status_opt.value()));
rlogger.trace("put_row_diff: Got status code from follower={} for put_row_diff, status={}", remote_node, int(status));
if (status == repair_stream_cmd::put_rows_done) {
co_return;
} else if (status == repair_stream_cmd::error) {
throw std::runtime_error(format("put_row_diff: Repair follower={} failed in put_row_diff handler, status={}", remote_node, int(status)));
} else {
throw std::runtime_error("put_row_diff: Got unexpected repair_stream_cmd");
}
}
_sink_source_for_put_row_diff.mark_source_closed(node_idx);
throw std::runtime_error("put_row_diff: Got unexpected end of stream");
}
future<> put_row_diff_sink_op(
repair_rows_on_wire rows,
rpc::sink<repair_row_on_wire_with_cmd>& sink,
locator::host_id remote_node) {
std::exception_ptr ep;
try {
for (repair_row_on_wire& row : rows) {
rlogger.trace("put_row_diff: send row");
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)});
}
rlogger.trace("put_row_diff: send empty row");
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
rlogger.trace("put_row_diff: send done");
co_await sink.flush();
} catch (...) {
ep = std::current_exception();
}
if (ep) {
co_await sink.close();
std::rethrow_exception(ep);
}
}
public:
future<> put_row_diff_with_rpc_stream(
repair_hash_set set_diff,
needs_all_rows_t needs_all_rows,
locator::host_id remote_node, unsigned node_idx,
std::optional<small_table_optimization_params> small_table_optimization,
shard_id dst_cpu_id) {
if (set_diff.empty()) {
co_return;
}
if (remote_node == myhostid()) {
co_return;
}
size_t sz = set_diff.size();
std::list<repair_row> row_diff = co_await get_row_diff(std::move(set_diff), needs_all_rows);
if (row_diff.size() != sz) {
rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.",
_schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz);
}
if (small_table_optimization) {
auto& strat = small_table_optimization->erm->get_replication_strategy();
const auto* tm = &small_table_optimization->erm->get_token_metadata();
const auto* tmptr = co_await get_tm_for_small_table_optimization_check(tm);
if (tmptr) {
tm = tmptr;
}
std::list<repair_row> tmp;
for (auto& row : row_diff) {
repair_row r = std::move(row);
const auto& dk = r.get_dk_with_hash()->dk;
auto eps = co_await strat.calculate_natural_endpoints(dk.token(), *tm);
if (eps.contains(remote_node)) {
tmp.push_back(std::move(r));
} else {
rlogger.trace("master: put : ignore row, token={}", dk.token());
}
}
row_diff = std::move(tmp);
}
size_t row_bytes = co_await get_repair_rows_size(row_diff);
stats().tx_row_nr += row_diff.size();
stats().tx_row_nr_peer[remote_node] += row_diff.size();
stats().tx_row_bytes += row_bytes;
stats().rpc_call_nr++;
repair_rows_on_wire rows = co_await to_repair_rows_on_wire(std::move(row_diff));
auto [sink, source] = co_await _sink_source_for_put_row_diff.get_sink_source(remote_node, node_idx, dst_cpu_id);
auto source_op = [&] () mutable -> future<> {
co_await put_row_diff_source_op(remote_node, node_idx, source);
};
auto sink_op = [&] () mutable -> future<> {
co_await put_row_diff_sink_op(std::move(rows), sink, remote_node);
};
co_await coroutine::all(source_op, sink_op);
}
// RPC handler
future<> put_row_diff_handler(repair_rows_on_wire rows) {
if (_rs.is_disabled() && _is_eligible_to_repair_rejection) {
co_await coroutine::return_exception(std::runtime_error("Repair service is disabled"));
}
auto gate_held = _gate.hold();
auto& cf = _db.local().find_column_family(_schema->id());
cf.update_off_strategy_trigger();
co_await apply_rows_on_follower(std::move(rows));
}
public:
future<> mark_sstable_as_repaired() {
auto& sstables = _repair_writer->get_sstable_list_to_mark_as_repaired();
if (!_incremental_repair_meta.sst_set && !sstables.empty()) {
co_return;
}
auto& table = _db.local().find_column_family(_schema->id());
auto& cm = table.get_compaction_manager();
int64_t repaired_at = _incremental_repair_meta.sstables_repaired_at + 1;
auto modifier = [repaired_at] (sstables::sstable& new_sst) {
new_sst.update_repaired_at(repaired_at);
};
std::unordered_map<compaction::compaction_group_view*, std::vector<sstables::shared_sstable>> sstables_by_group;
auto add_sstable = [&] (const sstables::shared_sstable& sst) {
if (sst->should_update_repaired_at(repaired_at)) {
auto& view = table.compaction_group_view_for_sstable(sst);
sstables_by_group[&view].push_back(sst);
}
};
if (_incremental_repair_meta.sst_set) {
_incremental_repair_meta.sst_set->for_each_sstable(add_sstable);
}
for (auto& sst : sstables) {
add_sstable(sst);
}
for (auto& [view, ssts] : sstables_by_group) {
for (auto& sst : ssts) {
rlogger.info("Marking sstable={} repaired_at={} being_repaired={} for incremental repair",
sst->toc_filename(), repaired_at, sst->being_repaired);
}
auto rewritten_sstables = co_await cm.perform_component_rewrite(*view, tasks::task_info{}, std::move(ssts),
sstables::component_type::Statistics, modifier);
// remove the old sstables from incremental repair meta and add the new ones
for (auto& ss : rewritten_sstables) {
bool erased = _incremental_repair_meta.sst_set->erase(ss.first);
if (erased) {
_incremental_repair_meta.sst_set->insert(ss.second);
}
auto it = sstables.find(ss.first);
if (it != sstables.end()) {
sstables.erase(it);
sstables.insert(ss.second);
}
}
}
}
};
void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_writer>& writer, std::optional<small_table_optimization_params> small_table_optimization, repair_meta* rm) {
auto cmp = position_in_partition::tri_compare(*s);
lw_shared_ptr<mutation_fragment> last_mf;
lw_shared_ptr<const decorated_key_with_hash> last_dk;
const auto* tm = small_table_optimization ? &small_table_optimization->erm->get_token_metadata() : nullptr;
if (small_table_optimization && rm) {
const auto* tmptr = rm->get_tm_for_small_table_optimization_check(tm).get();
if (tmptr) {
tm = tmptr;
}
}
for (auto& r : rows) {
thread::maybe_yield();
if (!r.dirty_on_master()) {
continue;
}
const auto& dk = r.get_dk_with_hash()->dk;
if (small_table_optimization) {
// Check if the token is owned by the node
auto eps = small_table_optimization->erm->get_replication_strategy().calculate_natural_endpoints(dk.token(), *tm).get();
if (!eps.contains(small_table_optimization->erm->get_topology().my_host_id())) {
rlogger.trace("master: ignore row, token={}", dk.token());
continue;
}
}
writer->create_writer();
auto mf = r.get_mutation_fragment_ptr();
if (last_mf && last_dk &&
cmp(last_mf->position(), mf->position()) == 0 &&
dk.tri_compare(*s, last_dk->dk) == 0 &&
last_mf->mergeable_with(*mf)) {
last_mf->apply(*s, std::move(*mf));
} else {
if (last_mf && last_dk) {
writer->do_write(std::move(last_dk), std::move(*last_mf)).get();
}
last_mf = mf;
last_dk = r.get_dk_with_hash();
}
r.reset_mutation_fragment();
}
if (last_mf && last_dk) {
writer->do_write(std::move(last_dk), std::move(*last_mf)).get();
}
}
// Must run inside a seastar thread
static repair_hash_set
get_set_diff(const repair_hash_set& x, const repair_hash_set& y) {
repair_hash_set set_diff;
// Note std::set_difference needs x and y are sorted.
std::copy_if(x.begin(), x.end(), std::inserter(set_diff, set_diff.end()),
[&y] (auto& item) { thread::maybe_yield(); return !y.contains(item); });
return set_diff;
}
static future<> repair_get_row_diff_with_rpc_stream_process_op_slow_path(
sharded<repair_service>& repair,
locator::host_id from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_row_on_wire_with_cmd> sink,
rpc::source<repair_hash_with_cmd> source,
bool &error,
repair_hash_set& current_set_diff,
std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt);
static future<> repair_get_row_diff_with_rpc_stream_process_op(
sharded<repair_service>& repair,
locator::host_id from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_row_on_wire_with_cmd> sink,
rpc::source<repair_hash_with_cmd> source,
bool &error,
repair_hash_set& current_set_diff,
std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) {
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
rlogger.trace("Got repair_hash_with_cmd from peer={}, hash={}, cmd={}", from, hash_cmd.hash, int(hash_cmd.cmd));
if (hash_cmd.cmd == repair_stream_cmd::hash_data) {
current_set_diff.insert(hash_cmd.hash);
return make_ready_future<>();
} else {
return repair_get_row_diff_with_rpc_stream_process_op_slow_path(repair, from, src_cpu_id, dst_cpu_id, repair_meta_id, std::move(sink), std::move(source), error, current_set_diff, std::move(hash_cmd_opt));
}
}
static future<> repair_get_row_diff_with_rpc_stream_process_op_slow_path(
sharded<repair_service>& repair,
locator::host_id from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_row_on_wire_with_cmd> sink,
rpc::source<repair_hash_with_cmd> source,
bool &error,
repair_hash_set& current_set_diff,
std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt) {
repair_hash_with_cmd hash_cmd = std::get<0>(hash_cmd_opt.value());
if (hash_cmd.cmd == repair_stream_cmd::end_of_current_hash_set || hash_cmd.cmd == repair_stream_cmd::needs_all_rows) {
if (inject_rpc_stream_error) {
throw std::runtime_error("get_row_diff_with_rpc_stream: Inject error in handler loop");
}
bool needs_all_rows = hash_cmd.cmd == repair_stream_cmd::needs_all_rows;
_metrics.rx_hashes_nr += current_set_diff.size();
auto fp = make_foreign(std::make_unique<repair_hash_set>(std::move(current_set_diff)));
repair_rows_on_wire rows_on_wire = co_await repair.invoke_on(dst_cpu_id, [&] (repair_service& local_repair) -> future<repair_rows_on_wire> {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::get_row_diff_with_rpc_stream_started);
if (fp.get_owner_shard() == this_shard_id()) {
repair_rows_on_wire rows = co_await rm->get_row_diff_handler(std::move(*fp), repair_meta::needs_all_rows_t(needs_all_rows));
rm->set_repair_state_for_local_node(repair_state::get_row_diff_with_rpc_stream_finished);
co_return rows;
} else {
repair_rows_on_wire rows = co_await rm->get_row_diff_handler(*fp, repair_meta::needs_all_rows_t(needs_all_rows));
rm->set_repair_state_for_local_node(repair_state::get_row_diff_with_rpc_stream_finished);
co_return rows;
}
});
if (rows_on_wire.empty()) {
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
} else {
for (repair_row_on_wire& row : rows_on_wire) {
auto cmd = repair_row_on_wire_with_cmd{repair_stream_cmd::row_data, std::move(row)};
co_await sink(cmd);
co_await cmd.row.clear_gently();
}
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::end_of_current_rows, repair_row_on_wire()});
}
co_await sink.flush();
co_return;
} else {
throw std::runtime_error("Got unexpected repair_stream_cmd");
}
}
static future<repair_rows_on_wire> clone_gently(const repair_rows_on_wire& rows) {
repair_rows_on_wire cloned;
for (const auto& row : rows) {
cloned.push_back(row);
co_await seastar::coroutine::maybe_yield();
}
co_return cloned;
}
static future<> repair_put_row_diff_with_rpc_stream_process_op(
sharded<repair_service>& repair,
locator::host_id from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_stream_cmd> sink,
rpc::source<repair_row_on_wire_with_cmd> source,
bool& error,
repair_rows_on_wire& current_rows,
std::optional<std::tuple<repair_row_on_wire_with_cmd>> row_opt) {
auto row = std::move(std::get<0>(row_opt.value()));
if (row.cmd == repair_stream_cmd::row_data) {
rlogger.trace("Got repair_rows_on_wire from peer={}, got row_data", from);
current_rows.push_back(std::move(row.row));
co_return;
} else if (row.cmd == repair_stream_cmd::end_of_current_rows) {
rlogger.trace("Got repair_rows_on_wire from peer={}, got end_of_current_rows", from);
auto fp = make_foreign(std::make_unique<repair_rows_on_wire>(std::move(current_rows)));
co_await repair.invoke_on(dst_cpu_id, [from, repair_meta_id, fp = std::move(fp)] (repair_service& local_repair) mutable -> future<> {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::put_row_diff_with_rpc_stream_started);
if (fp.get_owner_shard() == this_shard_id()) {
co_await rm->put_row_diff_handler(std::move(*fp));
rm->set_repair_state_for_local_node(repair_state::put_row_diff_with_rpc_stream_finished);
} else {
// Gently clone to avoid copy stall on destination shard
repair_rows_on_wire local_rows = co_await clone_gently(*fp);
co_await seastar::when_all_succeed(rm->put_row_diff_handler(std::move(local_rows)), utils::clear_gently(fp));
rm->set_repair_state_for_local_node(repair_state::put_row_diff_with_rpc_stream_finished);
}
});
co_await sink(repair_stream_cmd::put_rows_done);
co_await sink.flush();
co_return;
} else {
throw std::runtime_error("Got unexpected repair_stream_cmd");
}
}
static future<stop_iteration> repair_get_full_row_hashes_with_rpc_stream_process_op(
sharded<repair_service>& repair,
locator::host_id from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_hash_with_cmd> sink,
rpc::source<repair_stream_cmd> source,
bool &error,
std::optional<std::tuple<repair_stream_cmd>> status_opt) {
repair_stream_cmd status = std::get<0>(status_opt.value());
rlogger.trace("Got register_repair_get_full_row_hashes_with_rpc_stream from peer={}, status={}", from, int(status));
if (status == repair_stream_cmd::get_full_row_hashes) {
repair_hash_set hashes = co_await repair.invoke_on(dst_cpu_id, [from, repair_meta_id] (repair_service& local_repair) -> future<repair_hash_set> {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_started);
repair_hash_set hashes = co_await rm->get_full_row_hashes_handler();
rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_started);
_metrics.tx_hashes_nr += hashes.size();
co_return hashes;
});
for (const repair_hash& hash : hashes) {
co_await sink(repair_hash_with_cmd{repair_stream_cmd::hash_data, hash});
}
co_await sink(repair_hash_with_cmd{repair_stream_cmd::end_of_current_hash_set, repair_hash()});
co_await sink.flush();
co_return stop_iteration::no;
} else {
throw std::runtime_error("Got unexpected repair_stream_cmd");
}
}
static future<> repair_get_row_diff_with_rpc_stream_handler(
sharded<repair_service>& repair,
locator::host_id from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_row_on_wire_with_cmd> sink,
rpc::source<repair_hash_with_cmd> source) {
bool error = false;
repair_hash_set current_set_diff = repair_hash_set();
std::exception_ptr outer_exception;
try {
while (std::optional<std::tuple<repair_hash_with_cmd>> hash_cmd_opt = co_await source()) {
std::exception_ptr ep;
try {
if (error) {
continue;
}
co_await repair_get_row_diff_with_rpc_stream_process_op(repair, from,
src_cpu_id,
dst_cpu_id,
repair_meta_id,
sink,
source,
error,
current_set_diff,
std::move(hash_cmd_opt));
} catch (...) {
ep = std::current_exception();
rlogger.warn("repair_get_row_diff_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
error = true;
}
if (ep) {
co_await sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()});
}
}
} catch (...) {
outer_exception = std::current_exception();
}
co_await sink.close();
if (outer_exception) {
std::rethrow_exception(outer_exception);
}
}
static future<> repair_put_row_diff_with_rpc_stream_handler(
sharded<repair_service>& repair,
locator::host_id from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_stream_cmd> sink,
rpc::source<repair_row_on_wire_with_cmd> source) {
std::exception_ptr outer_exception;
bool error = false;
repair_rows_on_wire current_rows = repair_rows_on_wire();
try {
while (std::optional<std::tuple<repair_row_on_wire_with_cmd>> row_opt = co_await source()) {
std::exception_ptr ep;
try {
if (error) {
continue;
}
co_await repair_put_row_diff_with_rpc_stream_process_op(repair, from,
src_cpu_id,
dst_cpu_id,
repair_meta_id,
sink,
source,
error,
current_rows,
std::move(row_opt));
} catch (...) {
ep = std::current_exception();
rlogger.warn("repair_put_row_diff_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
error = true;
}
if (ep) {
co_await sink(repair_stream_cmd::error);
}
}
} catch (...) {
outer_exception = std::current_exception();
}
co_await sink.close();
if (outer_exception) {
std::rethrow_exception(outer_exception);
}
}
static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
sharded<repair_service>& repair,
locator::host_id from,
uint32_t src_cpu_id,
uint32_t dst_cpu_id,
uint32_t repair_meta_id,
rpc::sink<repair_hash_with_cmd> sink,
rpc::source<repair_stream_cmd> source) {
std::exception_ptr outer_exception;
try {
while (std::optional<std::tuple<repair_stream_cmd>> status_opt = co_await source()) {
bool error = false;
std::exception_ptr ep;
try {
if (error) {
continue;
}
auto stop = co_await repair_get_full_row_hashes_with_rpc_stream_process_op(repair, from,
src_cpu_id,
dst_cpu_id,
repair_meta_id,
sink,
source,
error,
std::move(status_opt));
if (stop) {
break;
}
} catch (...) {
ep = std::current_exception();
rlogger.warn("repair_get_full_row_hashes_with_rpc_stream_handler: from={} repair_meta_id={} error={}", from, repair_meta_id, ep);
error = true;
}
if (ep) {
co_await sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()});
}
}
} catch (...) {
outer_exception = std::current_exception();
}
co_await sink.close();
if (outer_exception) {
std::rethrow_exception(outer_exception);
}
}
future<repair_update_system_table_response> repair_service::repair_update_system_table_handler(gms::inet_address from, repair_update_system_table_request req) {
rlogger.debug("repair[{}]: Got repair_update_system_table_request from node={}, range={}, repair_time={}", req.repair_uuid, from, req.range, req.repair_time);
auto& db = this->get_db();
bool is_valid_range = true;
if (req.range.start()) {
if (req.range.start()->is_inclusive()) {
is_valid_range = false;
}
}
if (req.range.end()) {
if (!req.range.end()->is_inclusive() && req.range.end()->value() != dht::maximum_token()) {
is_valid_range = false;
}
}
if (!is_valid_range) {
throw std::runtime_error(format("repair[{}]: range {} is not in the format of (start, end]", req.repair_uuid, req.range));
}
co_await db.invoke_on_all([&req] (replica::database& local_db) {
auto& gc_state = local_db.get_compaction_manager().get_shared_tombstone_gc_state();
return gc_state.update_repair_time(req.table_uuid, req.range, req.repair_time);
});
db::system_keyspace::repair_history_entry ent;
ent.id = req.repair_uuid;
ent.table_uuid = req.table_uuid;
ent.ts = db_clock::from_time_t(gc_clock::to_time_t(req.repair_time));
ent.ks = req.keyspace_name;
ent.cf = req.table_name;
auto range_start = req.range.start() ? req.range.start()->value() : dht::minimum_token();
ent.range_start = dht::token::to_int64(range_start);
auto range_end = req.range.end() ? req.range.end()->value() : dht::maximum_token();
ent.range_end = dht::token::to_int64(range_end);
co_await _sys_ks.local().update_repair_history(std::move(ent));
co_return repair_update_system_table_response();
}
future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_batchlog_handler(gms::inet_address from, repair_flush_hints_batchlog_request req) {
if (this_shard_id() != 0) {
co_return co_await container().invoke_on(0, [&] (auto& rs) {
return rs.repair_flush_hints_batchlog_handler(from, std::move(req));
});
}
rlogger.info("repair[{}]: Started to process repair_flush_hints_batchlog_request from node={} hints_timeout={}s batchlog_timeout={}s",
req.repair_uuid, from, req.hints_timeout.count(), req.batchlog_timeout.count());
auto permit = co_await seastar::get_units(_flush_hints_batchlog_sem, 1);
bool updated = false;
auto now = gc_clock::now();
auto cache_time = std::chrono::milliseconds(get_db().local().get_config().repair_hints_batchlog_flush_cache_time_in_ms());
auto cache_disabled = cache_time == std::chrono::milliseconds(0);
auto flush_time = now;
db::all_batches_replayed all_replayed = db::all_batches_replayed::yes;
if (cache_disabled || (now - _flush_hints_batchlog_time > cache_time)) {
// Empty targets meants all nodes
db::hints::sync_point sync_point = co_await _sp.local().create_hint_sync_point(std::vector<locator::host_id>{});
lowres_clock::time_point deadline = lowres_clock::now() + req.hints_timeout;
try {
bool bm_throw = utils::get_local_injector().enter("repair_flush_hints_batchlog_handler_bm_uninitialized");
if (!_bm.local_is_initialized() || bm_throw) {
throw std::runtime_error("Backlog manager isn't initialized");
}
co_await coroutine::all(
[this, &from, &req, &sync_point, &deadline] () -> future<> {
rlogger.info("repair[{}]: Started to flush hints for repair_flush_hints_batchlog_request from node={}", req.repair_uuid, from);
co_await _sp.local().wait_for_hint_sync_point(std::move(sync_point), deadline);
rlogger.info("repair[{}]: Finished to flush hints for repair_flush_hints_batchlog_request from node={}", req.repair_uuid, from);
co_return;
},
[this, now, cache_disabled, &flush_time, &cache_time, &from, &req, &all_replayed] () -> future<> {
rlogger.info("repair[{}]: Started to flush batchlog for repair_flush_hints_batchlog_request from node={}", req.repair_uuid, from);
auto last_replay = _bm.local().get_last_replay();
bool issue_flush = false;
if (cache_disabled) {
issue_flush = true;
flush_time = now;
} else {
if (now < last_replay) {
flush_time = now;
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "skip_flush_use_now", fmt::to_string(flush_time));
} else if (now - last_replay < cache_time) {
// Skip the replay request since last_replay is already
// updated since last _flush_hints_batchlog_time
// update. It is fine to use last_replay for the hint
// flush time because last_replay (batchlog replay
// time) is smaller than now (hint flush time).
flush_time = last_replay;
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "skip_flush_use_last_replay", fmt::to_string(flush_time));
} else {
// Issue the replay so the last_replay will be updated
// to bigger than now after the call.
issue_flush = true;
flush_time = now;
}
}
if (issue_flush) {
all_replayed = co_await _bm.local().do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no);
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "issue_flush", fmt::to_string(flush_time));
}
rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, flushed={} all_replayed={}", req.repair_uuid, from, issue_flush, all_replayed);
}
);
if (!all_replayed) {
throw std::runtime_error("Not all batchlog entries were replayed");
}
} catch (...) {
rlogger.warn("repair[{}]: Failed to process repair_flush_hints_batchlog_request from node={}: {}",
req.repair_uuid, from, std::current_exception());
throw;
}
co_await container().invoke_on_all([flush_time] (repair_service& rs) {
rs._flush_hints_batchlog_time = flush_time;
});
updated = true;
} else {
utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "skip_flush", fmt::to_string(flush_time));
}
auto duration = std::chrono::duration<float>(gc_clock::now() - now);
rlogger.info("repair[{}]: Finished to process repair_flush_hints_batchlog_request from node={} updated={} flush_hints_batchlog_time={} flush_cache_time={} flush_duration={}",
req.repair_uuid, from, updated, _flush_hints_batchlog_time, cache_time, duration);
repair_flush_hints_batchlog_response resp{ .flush_time = _flush_hints_batchlog_time };
co_return resp;
}
future<> repair_service::init_ms_handlers() {
auto& ms = this->_messaging;
ms.register_repair_get_row_diff_with_rpc_stream([this, &ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source<repair_hash_with_cmd> source, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto sink = ms.make_sink_for_repair_get_row_diff_with_rpc_stream(source);
// Start a new fiber.
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
(void)repair_get_row_diff_with_rpc_stream_handler(container(), from, src_cpu_id, shard, repair_meta_id, sink, source).handle_exception(
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
rlogger.warn("Failed to process get_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
});
return make_ready_future<rpc::sink<repair_row_on_wire_with_cmd>>(sink);
});
ms.register_repair_put_row_diff_with_rpc_stream([this, &ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source<repair_row_on_wire_with_cmd> source, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto sink = ms.make_sink_for_repair_put_row_diff_with_rpc_stream(source);
// Start a new fiber.
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
(void)repair_put_row_diff_with_rpc_stream_handler(container(), from, src_cpu_id, shard, repair_meta_id, sink, source).handle_exception(
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
rlogger.warn("Failed to process put_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
});
return make_ready_future<rpc::sink<repair_stream_cmd>>(sink);
});
ms.register_repair_get_full_row_hashes_with_rpc_stream([this, &ms] (const rpc::client_info& cinfo, uint64_t repair_meta_id, rpc::source<repair_stream_cmd> source, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto sink = ms.make_sink_for_repair_get_full_row_hashes_with_rpc_stream(source);
// Start a new fiber.
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
(void)repair_get_full_row_hashes_with_rpc_stream_handler(container(), from, src_cpu_id, shard, repair_meta_id, sink, source).handle_exception(
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
rlogger.warn("Failed to process get_full_row_hashes_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
});
return make_ready_future<rpc::sink<repair_hash_with_cmd>>(sink);
});
ser::repair_rpc_verbs::register_repair_get_full_row_hashes(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
return container().invoke_on(shard, [from, repair_meta_id] (repair_service& local_repair) {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_started);
return rm->get_full_row_hashes_handler().then([rm] (repair_hash_set hashes) {
rm->set_repair_state_for_local_node(repair_state::get_full_row_hashes_finished);
_metrics.tx_hashes_nr += hashes.size();
return hashes;
});
}) ;
});
ser::repair_rpc_verbs::register_repair_get_combined_row_hash(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
std::optional<repair_sync_boundary> common_sync_boundary, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
return container().invoke_on(shard, [from, repair_meta_id,
common_sync_boundary = std::move(common_sync_boundary)] (repair_service& local_repair) mutable {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
_metrics.tx_hashes_nr++;
rm->set_repair_state_for_local_node(repair_state::get_combined_row_hash_started);
return rm->get_combined_row_hash_handler(std::move(common_sync_boundary)).then([rm] (get_combined_row_hash_response resp) {
rm->set_repair_state_for_local_node(repair_state::get_combined_row_hash_finished);
return resp;
});
});
});
ser::repair_rpc_verbs::register_repair_get_sync_boundary(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
std::optional<repair_sync_boundary> skipped_sync_boundary, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
return container().invoke_on(shard, [from, repair_meta_id,
skipped_sync_boundary = std::move(skipped_sync_boundary)] (repair_service& local_repair) mutable {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::get_sync_boundary_started);
return rm->get_sync_boundary_handler(std::move(skipped_sync_boundary)).then([rm] (get_sync_boundary_response resp) {
rm->set_repair_state_for_local_node(repair_state::get_sync_boundary_finished);
return resp;
});
});
});
ser::repair_rpc_verbs::register_repair_get_row_diff(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
repair_hash_set set_diff, bool needs_all_rows, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
_metrics.rx_hashes_nr += set_diff.size();
auto fp = make_foreign(std::make_unique<repair_hash_set>(std::move(set_diff)));
return container().invoke_on(shard, [from, repair_meta_id, fp = std::move(fp), needs_all_rows] (repair_service& local_repair) mutable {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::get_row_diff_started);
if (fp.get_owner_shard() == this_shard_id()) {
return rm->get_row_diff_handler(std::move(*fp), repair_meta::needs_all_rows_t(needs_all_rows)).then([rm] (repair_rows_on_wire rows) {
rm->set_repair_state_for_local_node(repair_state::get_row_diff_finished);
return rows;
});
} else {
return rm->get_row_diff_handler(*fp, repair_meta::needs_all_rows_t(needs_all_rows)).then([rm] (repair_rows_on_wire rows) {
rm->set_repair_state_for_local_node(repair_state::get_row_diff_finished);
return rows;
});
}
});
});
ser::repair_rpc_verbs::register_repair_put_row_diff(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
repair_rows_on_wire row_diff, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto fp = make_foreign(std::make_unique<repair_rows_on_wire>(std::move(row_diff)));
return container().invoke_on(shard, [from, repair_meta_id, fp = std::move(fp)] (repair_service& local_repair) mutable {
auto rm = local_repair.get_repair_meta(from, repair_meta_id);
rm->set_repair_state_for_local_node(repair_state::put_row_diff_started);
if (fp.get_owner_shard() == this_shard_id()) {
return rm->put_row_diff_handler(std::move(*fp)).then([rm] {
rm->set_repair_state_for_local_node(repair_state::put_row_diff_finished);
});
} else {
return rm->put_row_diff_handler(*fp).then([rm] {
rm->set_repair_state_for_local_node(repair_state::put_row_diff_finished);
});
}
});
});
ser::repair_rpc_verbs::register_repair_row_level_start(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name,
sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed,
unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version,
rpc::optional<streaming::stream_reason> reason, rpc::optional<gc_clock::time_point> compaction_time, rpc::optional<shard_id> dst_cpu_id_opt,
rpc::optional<service::frozen_topology_guard> topo_guard, rpc::optional<std::optional<int64_t>> repaired_at,
rpc::optional<locator::tablet_repair_incremental_mode> incremental_mode) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
auto from_id = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
return container().invoke_on(shard, [from_id, src_cpu_id, repair_meta_id, ks_name, cf_name,
range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, schema_version, reason, compaction_time, this,
topo_guard = topo_guard.value_or(service::default_session_id), repaired_at = repaired_at.value_or(std::nullopt), incremental_mode = incremental_mode.value_or(locator::tablet_repair_incremental_mode::disabled)] (repair_service& local_repair) mutable {
streaming::stream_reason r = reason ? *reason : streaming::stream_reason::repair;
const gc_clock::time_point ct = compaction_time ? *compaction_time : gc_clock::now();
return repair_meta::repair_row_level_start_handler(local_repair, from_id, src_cpu_id, repair_meta_id, std::move(ks_name),
std::move(cf_name), std::move(range), algo, max_row_buf_size, seed,
shard_config{remote_shard, remote_shard_count, remote_ignore_msb},
schema_version, r, ct, _repair_module->abort_source(), topo_guard, repaired_at, incremental_mode);
});
});
ser::repair_rpc_verbs::register_repair_row_level_stop(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
sstring ks_name, sstring cf_name, dht::token_range range, rpc::optional<shard_id> dst_cpu_id_opt, rpc::optional<bool> mark_as_repaired_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
bool mark_as_repaired = mark_as_repaired_opt.value_or(false);
return container().invoke_on(shard, [from, repair_meta_id, ks_name, cf_name, range, mark_as_repaired] (repair_service& local_repair) mutable {
return repair_meta::repair_row_level_stop_handler(local_repair, from, repair_meta_id,
std::move(ks_name), std::move(cf_name), std::move(range), mark_as_repaired);
});
});
ser::repair_rpc_verbs::register_repair_get_estimated_partitions(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
return container().invoke_on(shard, [from, repair_meta_id] (repair_service& local_repair) mutable {
return repair_meta::repair_get_estimated_partitions_handler(local_repair, from, repair_meta_id);
});
});
ser::repair_rpc_verbs::register_repair_set_estimated_partitions(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
uint64_t estimated_partitions, rpc::optional<shard_id> dst_cpu_id_opt) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt);
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
return container().invoke_on(shard, [from, repair_meta_id, estimated_partitions] (repair_service& local_repair) mutable {
return repair_meta::repair_set_estimated_partitions_handler(local_repair, from, repair_meta_id, estimated_partitions);
});
});
ser::repair_rpc_verbs::register_repair_get_diff_algorithms(&ms, [] (const rpc::client_info& cinfo) {
return make_ready_future<std::vector<row_level_diff_detect_algorithm>>(suportted_diff_detect_algorithms());
});
ser::repair_rpc_verbs::register_repair_update_system_table(&ms, [this] (const rpc::client_info& cinfo, repair_update_system_table_request req) {
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return repair_update_system_table_handler(from, std::move(req));
});
ser::repair_rpc_verbs::register_repair_flush_hints_batchlog(&ms, [this] (const rpc::client_info& cinfo, repair_flush_hints_batchlog_request req) {
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return repair_flush_hints_batchlog_handler(from, std::move(req));
});
ser::repair_rpc_verbs::register_repair_update_compaction_ctrl(&ms, [this] (const rpc::client_info& cinfo, locator::global_tablet_id gid, service::frozen_topology_guard topo_guard) -> future<> {
co_await container().invoke_on_all([gid, topo_guard] (repair_service& local_repair) mutable -> future<> {
auto& table = local_repair.get_db().local().find_column_family(gid.table);
auto erm = table.get_effective_replication_map();
auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(gid.table);
auto* trinfo = tmap.get_tablet_transition_info(gid.tablet);
if (!trinfo) {
auto msg = fmt::format("Skipped repair_update_compaction_ctrl gid={} session_id={} since tablet is not in transition", gid, topo_guard);
rlogger.warn("{}", msg);
throw std::runtime_error(msg);
}
if (trinfo->stage != locator::tablet_transition_stage::end_repair) {
auto msg = fmt::format("Skipped repair_update_compaction_ctrl gid={} session_id={} since tablet is not in tablet_transition_stage::end_repair", gid, topo_guard);
rlogger.warn("{}", msg);
throw std::runtime_error(msg);
}
auto range = tmap.get_token_range(gid.tablet);
co_await table.clear_being_repaired_for_range(range);
auto removed = local_repair._repair_compaction_locks.erase(gid);
rlogger.info("Got repair_update_compaction_ctrl gid={} session_id={} removed={}", gid, topo_guard, removed);
});
});
ser::repair_rpc_verbs::register_repair_update_repaired_at_for_merge(&ms, [this] (const rpc::client_info& cinfo, table_id table) -> future<> {
rlogger.debug("Got repair_update_repaired_at_for_merge table={}", table);
co_await container().invoke_on_all([table] (repair_service& local_repair) mutable -> future<> {
auto& t = local_repair.get_db().local().find_column_family(table);
co_await t.update_repaired_at_for_merge();
});
});
return make_ready_future<>();
}
future<> repair_service::uninit_ms_handlers() {
auto& ms = this->_messaging;
return when_all_succeed(
ms.unregister_repair_get_row_diff_with_rpc_stream(),
ms.unregister_repair_put_row_diff_with_rpc_stream(),
ms.unregister_repair_get_full_row_hashes_with_rpc_stream(),
ser::repair_rpc_verbs::unregister(&ms)
).discard_result();
}
class repair_meta_tracker {
boost::intrusive::list<repair_meta,
boost::intrusive::member_hook<repair_meta, repair_meta::tracker_link_type, &repair_meta::_tracker_link>,
boost::intrusive::constant_time_size<false>> _repair_metas;
public:
void add(repair_meta& rm) {
_repair_metas.push_back(rm);
}
};
namespace debug {
static thread_local repair_meta_tracker repair_meta_for_masters;
static thread_local repair_meta_tracker repair_meta_for_followers;
}
static void add_to_repair_meta_for_masters(repair_meta& rm) {
debug::repair_meta_for_masters.add(rm);
}
static void add_to_repair_meta_for_followers(repair_meta& rm) {
debug::repair_meta_for_followers.add(rm);
}
class row_level_repair {
repair::shard_repair_task_impl& _shard_task;
sstring _cf_name;
table_id _table_id;
dht::token_range _range;
host_id_vector_replica_set _all_live_peer_nodes;
std::vector<std::optional<shard_id>> _all_live_peer_shards;
bool _small_table_optimization;
// Repair master and followers will propose a sync boundary. Each of them
// read N bytes of rows from disk, the row with largest
// `position_in_partition` value is the proposed sync boundary of that
// node. The repair master uses `get_sync_boundary` rpc call to
// get all the proposed sync boundary and stores in in
// `_sync_boundaries`. The `get_sync_boundary` rpc call also
// returns the combined hashes and the total size for the rows which are
// in the `_row_buf`. `_row_buf` buffers the rows read from sstable. It
// contains rows at most of `_max_row_buf_size` bytes.
// If all the peers return the same `_sync_boundaries` and
// `_combined_hashes`, we think the rows are synced.
// If not, we proceed to the next step.
std::vector<repair_sync_boundary> _sync_boundaries;
std::vector<repair_hash> _combined_hashes;
// `common_sync_boundary` is the boundary all the peers agrees on
std::optional<repair_sync_boundary> _common_sync_boundary = {};
// `_skipped_sync_boundary` is used in case we find the range is synced
// only with the `get_sync_boundary` rpc call. We use it to make
// sure the remote peers update the `_current_sync_boundary` and
// `_last_sync_boundary` correctly.
std::optional<repair_sync_boundary> _skipped_sync_boundary = {};
// If the total size of the `_row_buf` on either of the nodes is zero,
// we set this flag, which is an indication that rows are not synced.
bool _zero_rows = false;
// Sum of estimated_partitions on all peers
uint64_t _estimated_partitions = 0;
// A flag indicates any error during the repair
bool _failed = false;
// Seed for the repair row hashing. If we ever had a hash conflict for a row
// and we are not using stable hash, there is chance we will fix the row in
// the next repair.
uint64_t _seed;
gc_clock::time_point _start_time;
bool _is_tablet;
service::frozen_topology_guard _topo_guard;
public:
row_level_repair(repair::shard_repair_task_impl& shard_task,
sstring cf_name,
table_id table_id,
dht::token_range range,
std::vector<locator::host_id> all_live_peer_nodes,
bool small_table_optimization,
gc_clock::time_point start_time,
service::frozen_topology_guard topo_guard)
: _shard_task(shard_task)
, _cf_name(std::move(cf_name))
, _table_id(std::move(table_id))
, _range(std::move(range))
, _all_live_peer_nodes(sort_peer_nodes(all_live_peer_nodes))
, _small_table_optimization(small_table_optimization)
, _seed(get_random_seed())
, _start_time(start_time)
, _is_tablet(_shard_task.db.local().find_column_family(_table_id).uses_tablets())
, _topo_guard(topo_guard)
{
repair_neighbors r_neighbors = _shard_task.get_repair_neighbors(_range);
auto& map = r_neighbors.shard_map;
for (auto& n : _all_live_peer_nodes) {
auto it = map.find(n);
if (it != map.end()) {
_all_live_peer_shards.push_back(it->second);
} else {
_all_live_peer_shards.push_back(std::nullopt);
}
}
if (_all_live_peer_shards.size() != _all_live_peer_nodes.size()) {
on_internal_error(rlogger, seastar::format("The size of shards and nodes do not match table={} range={} shards={} nodes={}",
_cf_name, _range, _all_live_peer_shards, _all_live_peer_nodes));
}
}
private:
enum class op_status {
next_round,
next_step,
all_done,
};
host_id_vector_replica_set sort_peer_nodes(const std::vector<locator::host_id>& nodes) {
host_id_vector_replica_set sorted_nodes(nodes.begin(), nodes.end());
auto& topology = get_erm()->get_topology();
topology.sort_by_proximity(topology.my_host_id(), sorted_nodes);
return sorted_nodes;
}
size_t get_max_row_buf_size(row_level_diff_detect_algorithm algo) {
// Max buffer size per repair round
size_t size = is_rpc_stream_supported(algo) ? repair::task_manager_module::max_repair_memory_per_range : 256 * 1024;
if (_small_table_optimization) {
// For small table optimization, we reduce the buffer size to reduce memory consumption.
size /= _all_live_peer_nodes.size();
}
return size;
}
// Step A: Negotiate sync boundary to use
op_status negotiate_sync_boundary(repair_meta& master) {
_shard_task.check_in_abort_or_shutdown();
_sync_boundaries.clear();
_combined_hashes.clear();
_zero_rows = false;
rlogger.debug("ROUND {}, _last_sync_boundary={}, _current_sync_boundary={}, _skipped_sync_boundary={}",
master.stats().round_nr, master.last_sync_boundary(), master.current_sync_boundary(), _skipped_sync_boundary);
master.stats().round_nr++;
parallel_for_each(master.all_nodes(), coroutine::lambda([&] (repair_node_state& ns) -> future<> {
const auto& node = ns.node;
auto dst_cpu_id = ns.shard;
// By calling `get_sync_boundary`, the `_last_sync_boundary`
// is moved to the `_current_sync_boundary` or
// `_skipped_sync_boundary` if it is not std::nullopt.
ns.state = repair_state::get_sync_boundary_started;
try {
get_sync_boundary_response res = co_await master.get_sync_boundary(node, _skipped_sync_boundary, dst_cpu_id);
ns.state = repair_state::get_sync_boundary_finished;
master.stats().row_from_disk_bytes[node] += res.new_rows_size;
master.stats().row_from_disk_nr[node] += res.new_rows_nr;
if (res.boundary && res.row_buf_size > 0) {
_sync_boundaries.push_back(*res.boundary);
_combined_hashes.push_back(res.row_buf_combined_csum);
} else {
// row_size equals 0 means there is no data from on
// that node, so we ignore the sync boundary of
// this node when calculating common sync boundary
_zero_rows = true;
}
rlogger.debug("Called master.get_sync_boundary for node {} sb={}, combined_csum={}, row_size={}, zero_rows={}, skipped_sync_boundary={}",
node, res.boundary, res.row_buf_combined_csum, res.row_buf_size, _zero_rows, _skipped_sync_boundary);
} catch (...) {
auto ep = std::current_exception();
rlogger.warn("repair[{}]: get_sync_boundary: got error from node={}, keyspace={}, table={}, range={}, error={}",
_shard_task.global_repair_id.uuid(), node, _shard_task.get_keyspace(), _cf_name, _range, ep);
std::rethrow_exception(ep);
}
})).get();
rlogger.debug("sync_boundaries nr={}, combined_hashes nr={}",
_sync_boundaries.size(), _combined_hashes.size());
if (!_sync_boundaries.empty()) {
// We have data to sync between (_last_sync_boundary, _current_sync_boundary]
auto res = master.get_common_sync_boundary(_zero_rows, _sync_boundaries, _combined_hashes);
_common_sync_boundary = res.first;
bool already_synced = res.second;
rlogger.debug("Calling master._get_common_sync_boundary: common_sync_boundary={}, already_synced={}",
_common_sync_boundary, already_synced);
// If rows between (_last_sync_boundary, _current_sync_boundary] are synced, goto first step
// This is the first fast path.
if (already_synced) {
_skipped_sync_boundary = _common_sync_boundary;
rlogger.debug("Skip set skipped_sync_boundary={}", _skipped_sync_boundary);
master.stats().round_nr_fast_path_already_synced++;
return op_status::next_round;
} else {
_skipped_sync_boundary = std::nullopt;
}
} else {
master.stats().round_nr_fast_path_already_synced++;
// We are done with this range because all the nodes have no more data.
return op_status::all_done;
}
return op_status::next_step;
}
// Step B: Get missing rows from peer nodes so that local node contains all the rows
op_status get_missing_rows_from_follower_nodes(repair_meta& master) {
_shard_task.check_in_abort_or_shutdown();
// `combined_hashes` contains the combined hashes for the
// `_working_row_buf`. Like `_row_buf`, `_working_row_buf` contains
// rows which are within the (_last_sync_boundary, _current_sync_boundary]
// By calling `get_combined_row_hash(_common_sync_boundary)`,
// all the nodes move the `_current_sync_boundary` to `_common_sync_boundary`,
// Rows within the (_last_sync_boundary, _current_sync_boundary] are
// moved from the `_row_buf` to `_working_row_buf`.
std::vector<repair_hash> combined_hashes;
combined_hashes.resize(master.all_nodes().size());
parallel_for_each(std::views::iota(size_t(0), master.all_nodes().size()), coroutine::lambda([&] (size_t idx) -> future<> {
// Request combined hashes from all nodes between (_last_sync_boundary, _current_sync_boundary]
// Each node will
// - Set `_current_sync_boundary` to `_common_sync_boundary`
// - Move rows from `_row_buf` to `_working_row_buf`
// But the full hashes (each and every hashes for the rows in
// the `_working_row_buf`) are not returned until repair master
// explicitly requests with get_full_row_hashes() below as
// an optimization. Because if the combined_hashes from all
// peers are identical, we think rows in the `_working_row_buff`
// are identical, there is no need to transfer each and every
// row hashes to the repair master.
auto& ns = master.all_nodes()[idx];
ns.state = repair_state::get_combined_row_hash_started;
try {
get_combined_row_hash_response resp = co_await master.get_combined_row_hash(_common_sync_boundary, ns.node, ns.shard);
master.all_nodes()[idx].state = repair_state::get_combined_row_hash_finished;
rlogger.debug("Calling master.get_combined_row_hash for node {}, got combined_hash={}", master.all_nodes()[idx].node, resp);
combined_hashes[idx]= std::move(resp);
} catch (...) {
auto ep = std::current_exception();
auto& node = master.all_nodes()[idx].node;
rlogger.warn("repair[{}]: get_combined_row_hash: got error from node={}, keyspace={}, table={}, range={}, error={}",
_shard_task.global_repair_id.uuid(), node, _shard_task.get_keyspace(), _cf_name, _range, ep);
std::rethrow_exception(ep);
}
})).get();
// If all the peers has the same combined_hashes. This means they contain
// the identical rows. So there is no need to sync for this sync boundary.
bool same_combined_hashes = std::adjacent_find(combined_hashes.begin(), combined_hashes.end(),
std::not_equal_to<repair_hash>()) == combined_hashes.end();
if (same_combined_hashes) {
// `_working_row_buf` on all the nodes are the same
// This is the second fast path.
master.stats().round_nr_fast_path_same_combined_hashes++;
return op_status::next_round;
}
master.reset_peer_row_hash_sets();
// Note: We can not work on _all_live_peer_nodes in parallel,
// because syncing with _all_live_peer_nodes in serial avoids
// getting the same rows from more than one peers.
for (unsigned node_idx = 0; node_idx < _all_live_peer_nodes.size(); node_idx++) {
auto& ns = master.all_nodes()[node_idx + 1];
auto& node = _all_live_peer_nodes[node_idx];
auto dst_cpu_id = ns.shard;
try {
// Here is an optimization to avoid transferring full rows hashes,
// if remote and local node, has the same combined_hashes.
// For example:
// node1: 1 2 3
// node2: 1 2 3 4
// node3: 1 2 3 4
// After node1 get the row 4 from node2, node1 update it is
// combined_hashes, so we can avoid fetching the full row hashes from node3.
if (combined_hashes[node_idx + 1] == master.working_row_buf_combined_hash()) {
// local node and peer node have the same combined hash. This
// means we can set peer_row_hash_sets[n] to local row hashes
// without fetching it from peers to save network traffic.
master.peer_row_hash_sets(node_idx) = master.working_row_hashes().get();
rlogger.debug("Calling optimize master.working_row_hashes for node {}, hash_sets={}",
node, master.peer_row_hash_sets(node_idx).size());
continue;
}
// Fast path: if local has zero row and remote has rows, request them all.
if (master.working_row_buf_combined_hash() == repair_hash() && combined_hashes[node_idx + 1] != repair_hash()) {
master.peer_row_hash_sets(node_idx).clear();
if (master.use_rpc_stream()) {
rlogger.debug("FastPath: get_row_diff with needs_all_rows_t::yes rpc stream");
ns.state = repair_state::get_row_diff_with_rpc_stream_started;
master.get_row_diff_with_rpc_stream({}, repair_meta::needs_all_rows_t::yes, repair_meta::update_peer_row_hash_sets::yes, node, node_idx, dst_cpu_id);
ns.state = repair_state::get_row_diff_with_rpc_stream_finished;
} else {
rlogger.debug("FastPath: get_row_diff with needs_all_rows_t::yes rpc verb");
ns.state = repair_state::get_row_diff_and_update_peer_row_hash_sets_started;
master.get_row_diff_and_update_peer_row_hash_sets(node, node_idx, dst_cpu_id);
ns.state = repair_state::get_row_diff_and_update_peer_row_hash_sets_finished;
}
continue;
}
rlogger.debug("Before master.get_full_row_hashes for node {}, hash_sets={}",
node, master.peer_row_hash_sets(node_idx).size());
// Ask the peer to send the full list hashes in the working row buf.
if (master.use_rpc_stream()) {
ns.state = repair_state::get_full_row_hashes_with_rpc_stream_started;
master.peer_row_hash_sets(node_idx) = master.get_full_row_hashes_with_rpc_stream(node, node_idx, dst_cpu_id).get();
ns.state = repair_state::get_full_row_hashes_with_rpc_stream_finished;
} else {
ns.state = repair_state::get_full_row_hashes_started;
master.peer_row_hash_sets(node_idx) = master.get_full_row_hashes(node, dst_cpu_id).get();
ns.state = repair_state::get_full_row_hashes_finished;
}
rlogger.debug("After master.get_full_row_hashes for node {}, hash_sets={}",
node, master.peer_row_hash_sets(node_idx).size());
// With hashes of rows from peer node, we can figure out
// what rows repair master is missing. Note we get missing
// data from repair follower 1, apply the rows, then get
// missing data from repair follower 2 and so on. We do it
// sequentially because the rows from repair follower 1 to
// repair master might reduce the amount of missing data
// between repair master and repair follower 2.
repair_hash_set set_diff = get_set_diff(master.peer_row_hash_sets(node_idx), master.working_row_hashes().get());
// Request missing sets from peer node
rlogger.debug("Before get_row_diff to node {}, local={}, peer={}, set_diff={}",
node, master.working_row_hashes().get().size(), master.peer_row_hash_sets(node_idx).size(), set_diff.size());
// If we need to pull all rows from the peer. We can avoid
// sending the row hashes on wire by setting needs_all_rows flag.
auto needs_all_rows = repair_meta::needs_all_rows_t(set_diff.size() == master.peer_row_hash_sets(node_idx).size());
if (master.use_rpc_stream()) {
ns.state = repair_state::get_row_diff_with_rpc_stream_started;
master.get_row_diff_with_rpc_stream(std::move(set_diff), needs_all_rows, repair_meta::update_peer_row_hash_sets::no, node, node_idx, dst_cpu_id);
ns.state = repair_state::get_row_diff_with_rpc_stream_finished;
} else {
ns.state = repair_state::get_row_diff_started;
master.get_row_diff(std::move(set_diff), needs_all_rows, node, node_idx, dst_cpu_id);
ns.state = repair_state::get_row_diff_finished;
}
rlogger.debug("After get_row_diff node {}, hash_sets={}", master.myhostid(), master.working_row_hashes().get().size());
} catch (...) {
rlogger.warn("repair[{}]: get_row_diff: got error from node={}, keyspace={}, table={}, range={}, error={}",
_shard_task.global_repair_id.uuid(), node, _shard_task.get_keyspace(), _cf_name, _range, std::current_exception());
throw;
}
}
master.flush_rows_in_working_row_buf(_small_table_optimization ? std::make_optional(small_table_optimization_params{ .erm = get_erm() }) : std::nullopt);
return op_status::next_step;
}
// Step C: Send missing rows to the peer nodes
void send_missing_rows_to_follower_nodes(repair_meta& master) {
// At this time, repair master contains all the rows between (_last_sync_boundary, _current_sync_boundary]
// So we can figure out which rows peer node are missing and send the missing rows to them
_shard_task.check_in_abort_or_shutdown();
repair_hash_set local_row_hash_sets = master.working_row_hashes().get();
auto sz = _all_live_peer_nodes.size();
std::vector<repair_hash_set> set_diffs(sz);
for (size_t idx : std::views::iota(size_t(0), sz)) {
set_diffs[idx] = get_set_diff(local_row_hash_sets, master.peer_row_hash_sets(idx));
}
parallel_for_each(std::views::iota(size_t(0), sz), coroutine::lambda([&] (size_t idx) -> future<> {
auto& ns = master.all_nodes()[idx + 1];
auto dst_cpu_id = ns.shard;
auto needs_all_rows = repair_meta::needs_all_rows_t(master.peer_row_hash_sets(idx).empty());
auto& set_diff = set_diffs[idx];
rlogger.debug("Calling master.put_row_diff to node {}, set_diff={}, needs_all_rows={}", _all_live_peer_nodes[idx], set_diff.size(), needs_all_rows);
auto& node = master.all_nodes()[idx].node;
if (master.use_rpc_stream()) {
ns.state = repair_state::put_row_diff_with_rpc_stream_started;
try {
co_await master.put_row_diff_with_rpc_stream(std::move(set_diff), needs_all_rows, _all_live_peer_nodes[idx], idx, _small_table_optimization ? std::make_optional(small_table_optimization_params{ .erm = get_erm() }) : std::nullopt, dst_cpu_id);
ns.state = repair_state::put_row_diff_with_rpc_stream_finished;
} catch (...) {
std::exception_ptr ep = std::current_exception();
rlogger.warn("repair[{}]: put_row_diff: got error from node={}, keyspace={}, table={}, range={}, error={}",
_shard_task.global_repair_id.uuid(), node, _shard_task.get_keyspace(), _cf_name, _range, ep);
std::rethrow_exception(ep);
}
} else {
ns.state = repair_state::put_row_diff_started;
try {
co_await master.put_row_diff(std::move(set_diff), needs_all_rows, _all_live_peer_nodes[idx], dst_cpu_id);
ns.state = repair_state::put_row_diff_finished;
} catch (...) {
std::exception_ptr ep = std::current_exception();
rlogger.warn("repair[{}]: put_row_diff: got error from node={}, keyspace={}, table={}, range={}, error={}",
_shard_task.global_repair_id.uuid(), node, _shard_task.get_keyspace(), _cf_name, _range, ep);
std::rethrow_exception(ep);
}
}
})).get();
master.stats().round_nr_slow_path++;
}
private:
locator::effective_replication_map_ptr get_erm() {
return _shard_task.get_erm();
}
private:
// Update system.repair_history table
future<> update_system_repair_table() {
// Update repair_history table only if it is a reguar repair.
if (_shard_task.reason() != streaming::stream_reason::repair) {
co_return;
}
auto my_address = get_erm()->get_topology().my_host_id();
// Update repair_history table only if all replicas have been repaired
size_t repaired_replicas = _all_live_peer_nodes.size() + 1;
if (_shard_task.get_total_rf() != repaired_replicas){
rlogger.debug("repair[{}]: Skipped to update system.repair_history total_rf={}, repaired_replicas={}, local={}, peers={}",
_shard_task.global_repair_id.uuid(), _shard_task.get_total_rf(), repaired_replicas, my_address, _all_live_peer_nodes);
co_return;
}
// Update repair_history table only if both hints and batchlog have been flushed.
if (!_shard_task.hints_batchlog_flushed()) {
co_return;
}
// The tablet repair time for tombstone gc will be updated when the
// system.tablet.repair_time is updated.
if (_is_tablet && _shard_task.sched_info.sched_by_scheduler) {
rlogger.debug("repair[{}]: Skipped to update system.repair_history for tablet repair scheduled by scheduler total_rf={} repaired_replicas={} local={} peers={}",
_shard_task.global_repair_id.uuid(), _shard_task.get_total_rf(), repaired_replicas, my_address, _all_live_peer_nodes);
co_return;
}
repair_service& rs = _shard_task.rs;
std::optional<gc_clock::time_point> repair_time_opt = co_await rs.update_history(_shard_task.global_repair_id.uuid(), _table_id, _range, _start_time, _is_tablet);
if (!repair_time_opt) {
co_return;
}
auto repair_time = repair_time_opt.value();
repair_update_system_table_request req{_shard_task.global_repair_id.uuid(), _table_id, _shard_task.get_keyspace(), _cf_name, _range, repair_time};
auto all_nodes = _all_live_peer_nodes;
all_nodes.push_back(my_address);
co_await coroutine::parallel_for_each(all_nodes, [this, req] (locator::host_id node) -> future<> {
try {
auto& ms = _shard_task.messaging.local();
repair_update_system_table_response resp = co_await ser::repair_rpc_verbs::send_repair_update_system_table(&ms, node, req);
(void)resp; // nothing to do with the response yet
rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _shard_task.global_repair_id.uuid(), node);
} catch (...) {
rlogger.warn("repair[{}]: Failed to update system.repair_history table of node {}: {}", _shard_task.global_repair_id.uuid(), node, std::current_exception());
}
});
co_return;
}
public:
future<> run() {
return seastar::async([this] {
_shard_task.check_in_abort_or_shutdown();
auto repair_meta_id = _shard_task.rs.get_next_repair_meta_id().get();
auto algorithm = get_common_diff_detect_algorithm(_shard_task.messaging.local(), _all_live_peer_nodes);
auto max_row_buf_size = get_max_row_buf_size(algorithm);
auto& cf = _shard_task.db.local().find_column_family(_table_id);
auto& sharder = cf.get_effective_replication_map()->get_sharder(*(cf.schema()));
auto master_node_shard_config = shard_config {
this_shard_id(),
sharder.shard_count(),
sharder.sharding_ignore_msb()
};
auto s = cf.schema();
auto schema_version = s->version();
bool table_dropped = false;
auto& mem_sem = _shard_task.rs.memory_sem();
auto max = _shard_task.rs.max_repair_memory();
auto wanted = (_all_live_peer_nodes.size() + 1) * max_row_buf_size;
wanted = std::min(max, wanted);
rlogger.trace("repair[{}]: Started to get memory budget, wanted={}, available={}, max_repair_memory={}",
_shard_task.global_repair_id.uuid(), wanted, mem_sem.current(), max);
auto mem_permit = seastar::get_units(mem_sem, wanted).get();
rlogger.trace("repair[{}]: Finished to get memory budget, wanted={}, available={}, max_repair_memory={}",
_shard_task.global_repair_id.uuid(), wanted, mem_sem.current(), max);
auto permit = _shard_task.db.local().obtain_reader_permit(_shard_task.db.local().find_column_family(_table_id), "repair-meta", db::no_timeout, {}).get();
auto compaction_time = gc_clock::now();
std::optional<int64_t> repaired_at;
bool enable_incremental_repair = _shard_task.db.local().features().tablet_incremental_repair && _is_tablet &&
_shard_task.sched_info.incremental_mode != locator::tablet_repair_incremental_mode::disabled &&
_shard_task.sched_info.sched_by_scheduler &&
!_shard_task.sched_info.for_tablet_rebuild;
if (enable_incremental_repair) {
auto& table = _shard_task.db.local().find_column_family(_table_id);
auto erm = table.get_effective_replication_map();
auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(_table_id);
auto last_token = _range.end() ? _range.end()->value() : dht::maximum_token();
auto& tinfo = tmap.get_tablet_info(last_token);
auto sstables_repaired_at = tinfo.sstables_repaired_at;
repaired_at = sstables_repaired_at + 1;
}
repair_meta master(_shard_task.rs,
_shard_task.db.local().find_column_family(_table_id),
s,
std::move(permit),
_range,
algorithm,
max_row_buf_size,
_seed,
repair_master::yes,
repair_meta_id,
_shard_task.reason(),
std::move(master_node_shard_config),
_all_live_peer_nodes,
_all_live_peer_nodes.size(),
_all_live_peer_shards,
this,
compaction_time,
_topo_guard,
repaired_at,
_shard_task.sched_info.incremental_mode);
auto auto_stop_master = defer([&master] {
try {
master.stop().get();
} catch (...) {
std::exception_ptr ep = std::current_exception();
rlogger.warn("Failed auto-stopping Row Level Repair (Master): {}. Ignored.", ep);
}
});
rlogger.debug(">>> Started Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_size={}",
master.myhostid(), _all_live_peer_nodes, master.repair_meta_id(), _shard_task.get_keyspace(), _cf_name, schema_version, _range, _seed, max_row_buf_size);
std::exception_ptr ex = nullptr;
std::vector<repair_node_state> nodes_to_stop;
nodes_to_stop.reserve(master.all_nodes().size());
try {
parallel_for_each(master.all_nodes(), coroutine::lambda([&] (repair_node_state& ns) -> future<> {
const auto& node = ns.node;
ns.state = repair_state::row_level_start_started;
co_await master.repair_row_level_start(node, _shard_task.get_keyspace(), _cf_name, _range, schema_version, _shard_task.reason(), compaction_time, ns.shard);
ns.state = repair_state::row_level_start_finished;
nodes_to_stop.push_back(ns);
ns.state = repair_state::get_estimated_partitions_started;
uint64_t partitions = co_await master.repair_get_estimated_partitions(node, ns.shard);
ns.state = repair_state::get_estimated_partitions_finished;
rlogger.trace("Get repair_get_estimated_partitions for node={}, estimated_partitions={}", node, partitions);
_estimated_partitions += partitions;
})).get();
if (!master.all_nodes().empty()) {
// Use the average number of partitions, instead of the sum
// of the partitions, as the estimated partitions in a
// given range. The bigger the estimated partitions, the
// more memory bloom filter for the sstable would consume.
_estimated_partitions /= master.all_nodes().size();
// In addition, estimate the difference between nodes is
// less than the specified ratio for regular repair.
// Underestimation will not be a big problem since those
// sstables produced by repair will go through off-strategy
// later anyway. The worst case is that we have a worse
// false positive ratio than expected temporarily when the
// sstable is still in maintenance set.
//
// To save memory and have less different conditions, we
// use the estimation for RBNO repair as well.
_estimated_partitions *= _shard_task.db.local().get_config().repair_partition_count_estimation_ratio();
}
parallel_for_each(master.all_nodes(), coroutine::lambda([&] (repair_node_state& ns) -> future<> {
const auto& node = ns.node;
rlogger.trace("Get repair_set_estimated_partitions for node={}, estimated_partitions={}", node, _estimated_partitions);
ns.state = repair_state::set_estimated_partitions_started;
co_await master.repair_set_estimated_partitions(node, _estimated_partitions, ns.shard);
ns.state = repair_state::set_estimated_partitions_finished;
})).get();
while (true) {
auto status = negotiate_sync_boundary(master);
if (status == op_status::next_round) {
continue;
} else if (status == op_status::all_done) {
break;
}
status = get_missing_rows_from_follower_nodes(master);
if (status == op_status::next_round) {
continue;
}
send_missing_rows_to_follower_nodes(master);
}
} catch (replica::no_such_column_family& e) {
table_dropped = true;
rlogger.warn("repair[{}]: shard={}, keyspace={}, cf={}, range={}, got error in row level repair: {}",
_shard_task.global_repair_id.uuid(), this_shard_id(), _shard_task.get_keyspace(), _cf_name, _range, e);
_failed = true;
} catch (std::exception& e) {
rlogger.warn("repair[{}]: shard={}, keyspace={}, cf={}, range={}, got error in row level repair: {}",
_shard_task.global_repair_id.uuid(), this_shard_id(), _shard_task.get_keyspace(), _cf_name, _range, e);
// In case the repair process fail, we need to call repair_row_level_stop to clean up repair followers
_failed = true;
ex = std::current_exception();
}
bool mark_as_repaired = false;
if (master.is_incremental_repair() && !_failed) {
mark_as_repaired = true;
}
utils::get_local_injector().inject("repair_finish_wait", utils::wait_for_message(300s)).get();
parallel_for_each(nodes_to_stop, coroutine::lambda([&] (repair_node_state& ns) -> future<> {
auto node = ns.node;
master.set_repair_state(repair_state::row_level_stop_started, node);
co_await master.repair_row_level_stop(node, _shard_task.get_keyspace(), _cf_name, _range, ns.shard, mark_as_repaired);
master.set_repair_state(repair_state::row_level_stop_finished, node);
})).get();
_shard_task.update_statistics(master.stats());
if (_failed) {
if (table_dropped) {
throw replica::no_such_column_family(_shard_task.get_keyspace(), _cf_name);
} else {
throw nested_exception(std::make_exception_ptr(std::runtime_error(fmt::format("Failed to repair for keyspace={}, cf={}, range={}", _shard_task.get_keyspace(),
_cf_name, _range))), std::move(ex));
}
} else {
update_system_repair_table().get();
}
rlogger.debug("<<< Finished Row Level Repair (Master): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, range={}, tx_hashes_nr={}, rx_hashes_nr={}, tx_row_nr={}, rx_row_nr={}, row_from_disk_bytes={}, row_from_disk_nr={}",
master.myhostid(), _all_live_peer_nodes, master.repair_meta_id(), _shard_task.get_keyspace(), _cf_name, _range, master.stats().tx_hashes_nr, master.stats().rx_hashes_nr, master.stats().tx_row_nr, master.stats().rx_row_nr, master.stats().row_from_disk_bytes, master.stats().row_from_disk_nr);
});
}
};
future<> repair_cf_range_row_level(repair::shard_repair_task_impl& shard_task,
sstring cf_name, table_id table_id, dht::token_range range,
const std::vector<locator::host_id>& all_peer_nodes, bool small_table_optimization, gc_clock::time_point flush_time,
service::frozen_topology_guard topo_guard) {
auto start_time = flush_time;
auto repair = row_level_repair(shard_task, std::move(cf_name), std::move(table_id), std::move(range), all_peer_nodes, small_table_optimization, start_time, topo_guard);
bool is_tablet = shard_task.db.local().find_column_family(table_id).uses_tablets();
bool is_tablet_rebuild = shard_task.sched_info.for_tablet_rebuild;
auto t = std::chrono::steady_clock::now();
auto update_time = seastar::defer([&] {
if (is_tablet && !is_tablet_rebuild) {
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - t);
_metrics.tablet_time_ms += duration.count();
}
});
co_await repair.run();
}
class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subscriber {
repair_service& _repair_service;
public:
row_level_repair_gossip_helper(repair_service& repair_service) noexcept
: _repair_service(repair_service)
{}
future<> remove_row_level_repair(locator::host_id node) {
rlogger.debug("Started to remove row level repair on all shards for node {}", node);
try {
co_await _repair_service.container().invoke_on_all([node] (repair_service& local_repair) {
return local_repair.remove_repair_meta(node);
});
rlogger.debug("Finished to remove row level repair on all shards for node {}", node);
} catch(...) {
rlogger.warn("Failed to remove row level repair for node {}: {}", node, std::current_exception());
}
}
virtual future<> on_dead(
gms::inet_address endpoint,
locator::host_id id,
gms::endpoint_state_ptr state,
gms::permit_id) override {
return remove_row_level_repair(id);
}
virtual future<> on_remove(
gms::inet_address endpoint,
locator::host_id id,
gms::permit_id) override {
return remove_row_level_repair(id);
}
virtual future<> on_restart(
gms::inet_address endpoint,
locator::host_id id,
gms::endpoint_state_ptr ep_state,
gms::permit_id) override {
return remove_row_level_repair(id);
}
};
repair_service::repair_service(sharded<service::topology_state_machine>& tsm,
sharded<gms::gossiper>& gossiper,
netw::messaging_service& ms,
sharded<replica::database>& db,
sharded<service::storage_proxy>& sp,
sharded<db::batchlog_manager>& bm,
sharded<db::system_keyspace>& sys_ks,
db::view::view_builder& vb,
sharded<db::view::view_building_worker>& vbw,
tasks::task_manager& tm,
service::migration_manager& mm,
size_t max_repair_memory)
: _tsm(tsm)
, _gossiper(gossiper)
, _messaging(ms)
, _db(db)
, _sp(sp)
, _bm(bm)
, _sys_ks(sys_ks)
, _view_builder(vb)
, _view_building_worker(vbw)
, _repair_module(seastar::make_shared<repair::task_manager_module>(tm, *this, max_repair_memory))
, _mm(mm)
, _node_ops_metrics(_repair_module)
, _max_repair_memory(max_repair_memory)
, _memory_sem(max_repair_memory)
{
tm.register_module("repair", _repair_module);
if (this_shard_id() == 0) {
_gossip_helper = make_shared<row_level_repair_gossip_helper>(*this);
_gossiper.local().register_(_gossip_helper);
}
}
future<> repair_service::start(utils::disk_space_monitor* dsm) {
if (dsm && (this_shard_id() == 0)) {
_out_of_space_subscription = dsm->subscribe(_db.local().get_config().critical_disk_utilization_level, [this] (auto threshold_reached) {
if (threshold_reached) {
return container().invoke_on_all([] (repair_service& rs) { return rs.drain(); });
}
return container().invoke_on_all([] (repair_service& rs) { rs.enable(); });
});
}
_load_history_done = load_history();
co_await init_ms_handlers();
_state = state::running;
}
future<> repair_service::stop() {
try {
rlogger.debug("Stopping repair task module");
co_await _repair_module->stop();
rlogger.debug("Waiting on load_history_done");
co_await std::move(_load_history_done);
rlogger.debug("Uninitializing messaging service handlers");
co_await uninit_ms_handlers();
if (this_shard_id() == 0) {
rlogger.debug("Unregistering gossiper helper");
co_await _gossiper.local().unregister_(_gossip_helper);
}
_state = state::stopped;
rlogger.info("Stopped repair_service");
} catch (...) {
on_fatal_internal_error(rlogger, format("Failed stopping repair_service: {}", std::current_exception()));
}
}
void repair_service::enable() {
SCYLLA_ASSERT(_state == state::none || _state == state::running);
rlogger.info("Asked to enable");
if (_state == state::none) {
_state = state::running;
SCYLLA_ASSERT(_disabled_state_count == 0);
} else if (_disabled_state_count > 0 && --_disabled_state_count > 0) {
rlogger.debug("Repair service is still disabled, requires {} more call(s) to enable()", _disabled_state_count);
return;
}
rlogger.info("Enabled");
}
future<> repair_service::drain() {
rlogger.info("Asked to drain");
++_disabled_state_count;
// Abort ongoing repairs
co_await abort_all();
rlogger.info("Drained");
}
repair_service::~repair_service() {
// Assert that repair service was explicitly stopped, if started.
// Otherwise, fiber(s) will be alive after the object is stopped.
SCYLLA_ASSERT(_state == state::none || _state == state::stopped);
}
static shard_id repair_id_to_shard(tasks::task_id& repair_id) {
return shard_id(repair_id.uuid().get_most_significant_bits()) % smp::count;
}
future<std::optional<gc_clock::time_point>>
repair_service::update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time, bool is_tablet) {
auto shard = repair_id_to_shard(repair_id);
return container().invoke_on(shard, [repair_id, table_id, range, repair_time, is_tablet] (repair_service& rs) mutable -> future<std::optional<gc_clock::time_point>> {
repair_history& rh = rs._finished_ranges_history[repair_id];
if (rh.repair_time > repair_time) {
rh.repair_time = repair_time;
}
auto finished_shards = ++(rh.finished_ranges[table_id][range]);
// Tablet repair runs only on one shard
if (finished_shards == smp::count || is_tablet) {
// All shards have finished repair the range. Send an rpc to ask peers to update system.repair_history table
rlogger.debug("repair[{}]: Finished range {} for table {} on all shards, updating system.repair_history table, finished_shards={}",
repair_id, range, table_id, finished_shards);
co_return rh.repair_time;
} else {
rlogger.debug("repair[{}]: Finished range {} for table {} on all shards, updating system.repair_historytable, finished_shards={}",
repair_id, range, table_id, finished_shards);
co_return std::nullopt;
}
});
}
future<> repair_service::cleanup_history(tasks::task_id repair_id) {
auto shard = repair_id_to_shard(repair_id);
return container().invoke_on(shard, [repair_id] (repair_service& rs) mutable {
rs._finished_ranges_history.erase(repair_id);
rlogger.debug("repair[{}]: Finished cleaning up repair_service history", repair_id);
});
}
future<> repair_service::load_history() {
try {
co_await get_db().local().get_tables_metadata().parallel_for_each_table(coroutine::lambda([&] (table_id table_uuid, lw_shared_ptr<replica::table> table) -> future<> {
auto shard = utils::uuid_xor_to_uint32(table_uuid.uuid()) % smp::count;
if (shard != this_shard_id()) {
co_return;
}
auto permit = co_await seastar::get_units(_load_parallelism_semaphore, 1);
rlogger.info("Loading repair history for keyspace={}, table={}, table_uuid={}",
table->schema()->ks_name(), table->schema()->cf_name(), table_uuid);
co_await _sys_ks.local().get_repair_history(table_uuid, [this] (const auto& entry) -> future<> {
get_repair_module().check_in_shutdown();
auto start = entry.range_start == std::numeric_limits<int64_t>::min() ? dht::minimum_token() : dht::token::from_int64(entry.range_start);
auto end = entry.range_end == std::numeric_limits<int64_t>::min() ? dht::maximum_token() : dht::token::from_int64(entry.range_end);
auto range = dht::token_range(dht::token_range::bound(start, false), dht::token_range::bound(end, true));
auto repair_time = to_gc_clock(entry.ts);
rlogger.debug("Loading repair history for keyspace={}, table={}, table_uuid={}, repair_time={}, range={}",
entry.ks, entry.cf, entry.table_uuid, entry.ts, range);
try {
co_await get_db().invoke_on_all([table_uuid = entry.table_uuid, range, repair_time] (replica::database& local_db) {
auto& gc_state = local_db.get_compaction_manager().get_shared_tombstone_gc_state();
gc_state.update_repair_time(table_uuid, range, repair_time);
});
} catch (...) {
rlogger.warn("Failed to update repair history time for keyspace={}, table={}, range={}, repair_time={}",
entry.ks, entry.cf, range, repair_time);
}
});
}));
} catch (const abort_requested_exception&) {
// Ignore
} catch (...) {
rlogger.warn("Failed to update repair history time: {}. Ignored", std::current_exception());
}
}
repair_meta_ptr repair_service::get_repair_meta(locator::host_id from, uint32_t repair_meta_id) {
node_repair_meta_id id{from, repair_meta_id};
auto it = repair_meta_map().find(id);
if (it == repair_meta_map().end()) {
throw std::runtime_error(format("get_repair_meta: repair_meta_id {} for node {} does not exist", id.repair_meta_id, id.ip));
} else {
return it->second;
}
}
future<>
repair_service::insert_repair_meta(
locator::host_id from_id,
uint32_t src_cpu_id,
uint32_t repair_meta_id,
dht::token_range range,
row_level_diff_detect_algorithm algo,
uint64_t max_row_buf_size,
uint64_t seed,
shard_config master_node_shard_config,
table_schema_version schema_version,
streaming::stream_reason reason,
gc_clock::time_point compaction_time,
abort_source& as,
service::frozen_topology_guard topo_guard,
std::optional<int64_t> repaired_at,
locator::tablet_repair_incremental_mode incremental_mode) {
schema_ptr s = co_await get_migration_manager().get_schema_for_write(schema_version, from_id, src_cpu_id, get_messaging(), as);
auto& db = get_db();
reader_permit permit = co_await db.local().obtain_reader_permit(db.local().find_column_family(s->id()), "repair-meta", db::no_timeout, {});
node_repair_meta_id id{from_id, repair_meta_id};
auto rm = seastar::make_shared<repair_meta>(*this,
get_db().local().find_column_family(s->id()),
s,
std::move(permit),
range,
algo,
max_row_buf_size,
seed,
repair_master::no,
repair_meta_id,
reason,
std::move(master_node_shard_config),
host_id_vector_replica_set{from_id},
compaction_time,
topo_guard,
repaired_at,
incremental_mode);
rm->set_repair_state_for_local_node(repair_state::row_level_start_started);
bool insertion = repair_meta_map().emplace(id, rm).second;
if (!insertion) {
rlogger.warn("insert_repair_meta: repair_meta_id {} for node {} already exists, replace existing one", id.repair_meta_id, id.ip);
repair_meta_map()[id] = rm;
rm->set_repair_state_for_local_node(repair_state::row_level_start_finished);
} else {
rlogger.debug("insert_repair_meta: Inserted repair_meta_id {} for node {}", id.repair_meta_id, id.ip);
}
}
future<>
repair_service::remove_repair_meta(const locator::host_id& from,
uint32_t repair_meta_id,
sstring ks_name,
sstring cf_name,
dht::token_range range,
bool mark_as_repaired) {
node_repair_meta_id id{from, repair_meta_id};
auto it = repair_meta_map().find(id);
if (it == repair_meta_map().end()) {
rlogger.warn("remove_repair_meta: repair_meta_id {} for node {} does not exist", id.repair_meta_id, id.ip);
co_return;
} else {
auto rm = it->second;
repair_meta_map().erase(it);
rlogger.debug("remove_repair_meta: Stop repair_meta_id {} for node {} started", id.repair_meta_id, id.ip);
co_await rm->stop();
if (mark_as_repaired) {
co_await rm->mark_sstable_as_repaired();
}
rlogger.debug("remove_repair_meta: Stop repair_meta_id {} for node {} finished", id.repair_meta_id, id.ip);
}
}
future<>
repair_service::remove_repair_meta(locator::host_id from) {
rlogger.debug("Remove all repair_meta for single node {}", from);
auto repair_metas = make_lw_shared<utils::chunked_vector<repair_meta_ptr>>();
for (auto it = repair_meta_map().begin(); it != repair_meta_map().end();) {
if (it->first.ip == from) {
repair_metas->push_back(it->second);
it = repair_meta_map().erase(it);
} else {
it++;
}
}
co_await coroutine::parallel_for_each(*repair_metas, [&] (auto& rm) -> future<> {
co_await rm->stop();
rm = {};
});
rlogger.debug("Removed all repair_meta for single node {}", from);
}
future<>
repair_service::remove_repair_meta() {
rlogger.debug("Remove all repair_meta for all nodes");
auto repair_metas = make_lw_shared<utils::chunked_vector<repair_meta_ptr>>(
repair_meta_map()
| std::views::values | std::ranges::to<utils::chunked_vector<repair_meta_ptr>>());
repair_meta_map().clear();
co_await coroutine::parallel_for_each(*repair_metas, [&] (auto& rm) -> future<> {
co_await rm->stop();
rm = {};
});
rlogger.debug("Removed all repair_meta for all nodes");
}
future<uint32_t> repair_service::get_next_repair_meta_id() {
return container().invoke_on(0, [] (repair_service& local_repair) {
return local_repair._next_repair_meta_id++;
});
}
locator::host_id repair_service::my_host_id() const noexcept {
return _gossiper.local().my_host_id();
}
future<size_t> count_finished_tablets(utils::chunked_vector<tablet_token_range> ranges1, utils::chunked_vector<tablet_token_range> ranges2) {
if (ranges1.empty() || ranges2.empty()) {
co_return 0;
}
auto sort = [] (utils::chunked_vector<tablet_token_range>& ranges) {
std::sort(ranges.begin(), ranges.end(), [] (const auto& a, const auto& b) {
if (a.first_token != b.first_token) {
return a.first_token < b.first_token;
}
return a.last_token < b.last_token;
});
};
// First, merge overlapping and adjacent ranges in ranges2.
sort(ranges2);
utils::chunked_vector<tablet_token_range> merged;
merged.push_back(ranges2[0]);
for (size_t i = 1; i < ranges2.size(); ++i) {
co_await coroutine::maybe_yield();
// To avoid overflow with max() + 1, we check adjacency with `a - 1 <= b` instead of `a <= b + 1`
if (ranges2[i].first_token - 1 <= merged.back().last_token) {
merged.back().last_token = std::max(merged.back().last_token, ranges2[i].last_token);
} else {
merged.push_back(ranges2[i]);
}
}
// Count covered ranges using a linear scan
size_t covered_count = 0;
auto it = merged.begin();
auto end = merged.end();
sort(ranges1);
for (const auto& r1 : ranges1) {
co_await coroutine::maybe_yield();
// Advance the merged iterator only if the current merged range ends
// before the current r1 starts.
while (it != end && it->last_token < r1.first_token) {
co_await coroutine::maybe_yield();
++it;
}
// If we have exhausted the merged ranges, no further r1 can be covered
if (it == end) {
break;
}
// Check if the current merged range covers r1.
if (it->first_token <= r1.first_token && r1.last_token <= it->last_token) {
covered_count++;
}
}
co_return covered_count;
}
future<std::optional<repair_task_progress>> repair_service::get_tablet_repair_task_progress(tasks::task_id task_uuid) {
utils::chunked_vector<tablet_token_range> requested_tablets;
utils::chunked_vector<tablet_token_range> finished_tablets;
table_id tid;
if (!_db.local().features().tablet_repair_tasks_table) {
co_return std::nullopt;
}
co_await _sys_ks.local().get_repair_task(task_uuid, [&tid, &requested_tablets, &finished_tablets] (const db::system_keyspace::repair_task_entry& entry) -> future<> {
rlogger.debug("repair_task_progress: Get entry operation={} first_token={} last_token={}", entry.operation, entry.first_token, entry.last_token);
if (entry.operation == db::system_keyspace::repair_task_operation::requested) {
requested_tablets.push_back({entry.first_token, entry.last_token});
} else if (entry.operation == db::system_keyspace::repair_task_operation::finished) {
finished_tablets.push_back({entry.first_token, entry.last_token});
}
tid = entry.table_uuid;
co_return;
});
auto requested = requested_tablets.size();
auto finished_nomerge = finished_tablets.size();
auto finished = co_await count_finished_tablets(std::move(requested_tablets), std::move(finished_tablets));
auto progress = repair_task_progress{requested, finished, tid};
rlogger.debug("repair_task_progress: task_uuid={} table_uuid={} requested_tablets={} finished_tablets={} progress={} finished_nomerge={}",
task_uuid, tid, requested, finished, progress.progress(), finished_nomerge);
co_return progress;
}
void repair_service::on_cleanup_for_drop_table(const table_id& id) {
// Prevent repair lock from being leaked in repair_service when table is dropped midway.
// The RPC verb that removes the lock on success path will not be called by coordinator after table was dropped.
// We also cannot move the lock from repair_service to repair_meta, since the lock must outlive the latter.
// Since tablet metadata has been erased at this point, we can simply erase all instances for the dropped table.
rlogger.debug("Cleaning up state for dropped table {}", id);
for (auto it = _repair_compaction_locks.begin(); it != _repair_compaction_locks.end();) {
auto& [global_tid, _] = *it;
if (global_tid.table == id) {
it = _repair_compaction_locks.erase(it);
} else {
it++;
}
}
}