From fb2c46dfbe9c6bd92826d98640bb6ae755d8c0fb Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Mon, 10 Mar 2025 14:23:44 +0100 Subject: [PATCH] repair: pass session_id to repair_writer_impl::create_writer (cherry picked from commit 09c74aa294a28a16aa99a14dc32d803569750498) --- repair/row_level.cc | 15 +++++++++------ repair/writer.hh | 4 +++- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index c946eaf267..da402f8882 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -404,6 +404,7 @@ class repair_writer_impl : public repair_writer::impl { sharded& _view_builder; streaming::stream_reason _reason; mutation_reader _queue_reader; + service::frozen_topology_guard _topo_guard; public: repair_writer_impl( schema_ptr schema, @@ -412,7 +413,8 @@ public: sharded& view_builder, streaming::stream_reason reason, mutation_fragment_queue queue, - mutation_reader queue_reader) + mutation_reader queue_reader, + service::frozen_topology_guard topo_guard) : _schema(std::move(schema)) , _permit(std::move(permit)) , _mq(std::move(queue)) @@ -420,6 +422,7 @@ public: , _view_builder(view_builder) , _reason(reason) , _queue_reader(std::move(queue_reader)) + , _topo_guard(topo_guard) {} virtual void create_writer(lw_shared_ptr writer) override; @@ -489,12 +492,11 @@ void repair_writer_impl::create_writer(lw_shared_ptr w) { } replica::table& t = _db.local().find_column_family(_schema->id()); rlogger.debug("repair_writer: keyspace={}, table={}, estimated_partitions={}", w->schema()->ks_name(), w->schema()->cf_name(), w->get_estimated_partitions()); - service::frozen_topology_guard topo_guard = service::null_topology_guard; // FIXME: propagate // The sharder is valid only when the erm is valid. Keep a reference of the erm to keep the sharder valid. auto erm = t.get_effective_replication_map(); auto& sharder = erm->get_sharder(*(w->schema())); _writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder, std::move(_queue_reader), - streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard), + streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), _topo_guard), t.stream_in_progress()).then([w, erm] (uint64_t partitions) { rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable", w->schema()->ks_name(), w->schema()->cf_name(), partitions); @@ -511,10 +513,11 @@ lw_shared_ptr make_repair_writer( reader_permit permit, streaming::stream_reason reason, sharded& db, - sharded& view_builder) { + sharded& view_builder, + service::frozen_topology_guard topo_guard) { auto [queue_reader, queue_handle] = make_queue_reader_v2(schema, permit); auto queue = make_mutation_fragment_queue(schema, permit, std::move(queue_handle)); - auto i = std::make_unique(schema, permit, db, view_builder, reason, std::move(queue), std::move(queue_reader)); + auto i = std::make_unique(schema, permit, db, view_builder, reason, std::move(queue), std::move(queue_reader), topo_guard); return make_lw_shared(schema, permit, std::move(i)); } @@ -806,7 +809,7 @@ public: , _remote_sharder(make_remote_sharder()) , _same_sharding_config(is_same_sharding_config(cf)) , _nr_peer_nodes(nr_peer_nodes) - , _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, rs.get_view_builder())) + , _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, rs.get_view_builder(), topo_guard)) , _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes, [&rs] (uint32_t repair_meta_id, std::optional dst_cpu_id_opt, locator::host_id addr) { auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard); diff --git a/repair/writer.hh b/repair/writer.hh index 242e39149b..48f70971a3 100644 --- a/repair/writer.hh +++ b/repair/writer.hh @@ -4,6 +4,7 @@ #include #include "schema/schema_fwd.hh" #include "reader_permit.hh" +#include "service/topology_guard.hh" #include "streaming/stream_reason.hh" #include "repair/decorated_key_with_hash.hh" #include "readers/upgrading_consumer.hh" @@ -150,5 +151,6 @@ lw_shared_ptr make_repair_writer( streaming::stream_reason reason, sharded& db, sharded& sys_dist_ks, - sharded& view_update_generator); + sharded& view_update_generator, + service::frozen_topology_guard topo_guard);