diff --git a/repair/row_level.cc b/repair/row_level.cc index 133402dfd8..36985e29e9 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -9,6 +9,7 @@ #include #include #include +#include "dht/auto_refreshing_sharder.hh" #include "gms/endpoint_state.hh" #include "repair/repair.hh" #include "message/messaging_service.hh" @@ -486,21 +487,52 @@ mutation_fragment_queue make_mutation_fragment_queue(schema_ptr s, reader_permit return mutation_fragment_queue(std::move(s), std::move(permit), seastar::make_shared(std::move(handle))); } /* sharder_helper pairs a non-owning dht::sharder reference with the object that keeps that sharder alive. Anyone using `sharder` must keep `keepalive` (or the object it was moved into) alive for as long as the reference is in use. */ +struct sharder_helper { /* Keepalive stored by the else-branch of get_sharder_helper(): owns the sharder through sharder_ptr and carries the topology guard initialized from the caller's frozen guard. */ + struct tablet_sharder_keepalive { + std::unique_ptr sharder_ptr; + service::topology_guard topo_guard; + }; /* NOTE(review): the variant's alternatives are not visible in this extract; get_sharder_helper() stores either the table's effective replication map pointer or a tablet_sharder_keepalive in it - confirm against the real file. */ + using sharder_keepalive = std::variant; + + sharder_keepalive keepalive; /* Non-owning reference; the referenced sharder is kept alive by `keepalive`. */ + const dht::sharder& sharder; + + sharder_helper(sharder_keepalive s_keepalive, const dht::sharder& s) + : keepalive(std::move(s_keepalive)) + , sharder(s) + {} +}; + /* Builds a sharder_helper for table t and schema s. When frozen_topo_guard equals service::default_session_id, the sharder is obtained from the table's effective replication map and that map pointer is stored as the keepalive. Otherwise a sharder object is allocated from t.shared_from_this() and stored, together with a topology guard initialized from frozen_topo_guard, as the keepalive. NOTE(review): the allocated type is not visible in this extract - presumably dht::auto_refreshing_sharder, given the include added by this patch; confirm. */ +sharder_helper get_sharder_helper(replica::table& t, const schema& s, service::frozen_topology_guard frozen_topo_guard) { + if (frozen_topo_guard == service::default_session_id) { + // The sharder is valid only when the erm is valid. Keep a reference of the erm to keep the sharder valid. 
+ auto erm = t.get_effective_replication_map(); + auto& sharder = erm->get_sharder(s); /* The reference was taken from erm above; erm itself is now handed over to the keepalive so it travels with the returned helper. */ + sharder_helper::sharder_keepalive keepalive = std::move(erm); + return sharder_helper{std::move(keepalive), sharder}; + } else { + sharder_helper::tablet_sharder_keepalive keepalive{ + .sharder_ptr = std::make_unique(t.shared_from_this()), + .topo_guard = frozen_topo_guard + }; /* Take the reference before keepalive is moved into the result: it refers to the object owned by sharder_ptr, not to the local keepalive, and moving a std::unique_ptr transfers ownership without relocating the owned object. */ + auto& sharder = *keepalive.sharder_ptr; + return sharder_helper{std::move(keepalive), sharder}; + } +} + void repair_writer_impl::create_writer(lw_shared_ptr w) { if (_writer_done) { return; } replica::table& t = _db.local().find_column_family(_schema->id()); rlogger.debug("repair_writer: keyspace={}, table={}, estimated_partitions={}", w->schema()->ks_name(), w->schema()->cf_name(), w->get_estimated_partitions()); - // The sharder is valid only when the erm is valid. Keep a reference of the erm to keep the sharder valid. - auto erm = t.get_effective_replication_map(); - auto& sharder = erm->get_sharder(*(w->schema())); - _writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder, std::move(_queue_reader), + auto sharder = get_sharder_helper(t, *(w->schema()), _topo_guard); + _writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder.sharder, std::move(_queue_reader), streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), _topo_guard), - t.stream_in_progress()).then([w, erm] (uint64_t partitions) { + t.stream_in_progress()).then([w] (uint64_t partitions) { rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable", w->schema()->ks_name(), w->schema()->cf_name(), partitions); - }).handle_exception([w, erm] (std::exception_ptr ep) { + }).handle_exception([w, keepalive = std::move(sharder.keepalive)] (std::exception_ptr ep) { rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}", 
w->schema()->ks_name(), w->schema()->cf_name(), ep); w->queue().abort(ep);