repair: pass session_id to repair_writer_impl::create_writer

(cherry picked from commit 09c74aa294)
This commit is contained in:
Aleksandra Martyniuk
2025-03-10 14:23:44 +01:00
parent b4e37600d6
commit fb2c46dfbe
2 changed files with 12 additions and 7 deletions

View File

@@ -404,6 +404,7 @@ class repair_writer_impl : public repair_writer::impl {
sharded<db::view::view_builder>& _view_builder;
streaming::stream_reason _reason;
mutation_reader _queue_reader;
service::frozen_topology_guard _topo_guard;
public:
repair_writer_impl(
schema_ptr schema,
@@ -412,7 +413,8 @@ public:
sharded<db::view::view_builder>& view_builder,
streaming::stream_reason reason,
mutation_fragment_queue queue,
mutation_reader queue_reader)
mutation_reader queue_reader,
service::frozen_topology_guard topo_guard)
: _schema(std::move(schema))
, _permit(std::move(permit))
, _mq(std::move(queue))
@@ -420,6 +422,7 @@ public:
, _view_builder(view_builder)
, _reason(reason)
, _queue_reader(std::move(queue_reader))
, _topo_guard(topo_guard)
{}
virtual void create_writer(lw_shared_ptr<repair_writer> writer) override;
@@ -489,12 +492,11 @@ void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
}
replica::table& t = _db.local().find_column_family(_schema->id());
rlogger.debug("repair_writer: keyspace={}, table={}, estimated_partitions={}", w->schema()->ks_name(), w->schema()->cf_name(), w->get_estimated_partitions());
service::frozen_topology_guard topo_guard = service::null_topology_guard; // FIXME: propagate
// The sharder is valid only when the erm is valid. Keep a reference of the erm to keep the sharder valid.
auto erm = t.get_effective_replication_map();
auto& sharder = erm->get_sharder(*(w->schema()));
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder, std::move(_queue_reader),
streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard),
streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), _topo_guard),
t.stream_in_progress()).then([w, erm] (uint64_t partitions) {
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
w->schema()->ks_name(), w->schema()->cf_name(), partitions);
@@ -511,10 +513,11 @@ lw_shared_ptr<repair_writer> make_repair_writer(
reader_permit permit,
streaming::stream_reason reason,
sharded<replica::database>& db,
sharded<db::view::view_builder>& view_builder) {
sharded<db::view::view_builder>& view_builder,
service::frozen_topology_guard topo_guard) {
auto [queue_reader, queue_handle] = make_queue_reader_v2(schema, permit);
auto queue = make_mutation_fragment_queue(schema, permit, std::move(queue_handle));
auto i = std::make_unique<repair_writer_impl>(schema, permit, db, view_builder, reason, std::move(queue), std::move(queue_reader));
auto i = std::make_unique<repair_writer_impl>(schema, permit, db, view_builder, reason, std::move(queue), std::move(queue_reader), topo_guard);
return make_lw_shared<repair_writer>(schema, permit, std::move(i));
}
@@ -806,7 +809,7 @@ public:
, _remote_sharder(make_remote_sharder())
, _same_sharding_config(is_same_sharding_config(cf))
, _nr_peer_nodes(nr_peer_nodes)
, _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, rs.get_view_builder()))
, _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, rs.get_view_builder(), topo_guard))
, _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
[&rs] (uint32_t repair_meta_id, std::optional<shard_id> dst_cpu_id_opt, locator::host_id addr) {
auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard);

View File

@@ -4,6 +4,7 @@
#include <seastar/core/shared_ptr.hh>
#include "schema/schema_fwd.hh"
#include "reader_permit.hh"
#include "service/topology_guard.hh"
#include "streaming/stream_reason.hh"
#include "repair/decorated_key_with_hash.hh"
#include "readers/upgrading_consumer.hh"
@@ -150,5 +151,6 @@ lw_shared_ptr<repair_writer> make_repair_writer(
streaming::stream_reason reason,
sharded<replica::database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator);
sharded<db::view::view_update_generator>& view_update_generator,
service::frozen_topology_guard topo_guard);