repair: release erm in repair_writer_impl::create_writer when possible

Currently, repair_writer_impl::create_writer keeps erm to ensure
that a sharder is valid. If we repair a tablet, erm blocks the state
machine and no operation on any tablet of this table might be performed.

Use auto_refreshing_sharder and topology_guard to ensure that the
operation is safe and that tablet operations on the whole table
aren't blocked.

Fixes: #23453.
(cherry picked from commit 1dc29ddc86)
This commit is contained in:
Aleksandra Martyniuk
2025-03-26 14:13:43 +01:00
committed by GitHub Action
parent c56e47f72f
commit 307f00a398

View File

@@ -9,6 +9,7 @@
#include <exception>
#include <fmt/ranges.h>
#include <seastar/util/defer.hh>
#include "dht/auto_refreshing_sharder.hh"
#include "gms/endpoint_state.hh"
#include "repair/repair.hh"
#include "message/messaging_service.hh"
@@ -486,21 +487,52 @@ mutation_fragment_queue make_mutation_fragment_queue(schema_ptr s, reader_permit
return mutation_fragment_queue(std::move(s), std::move(permit), seastar::make_shared<queue_reader_handle_adapter>(std::move(handle)));
}
struct sharder_helper {
struct tablet_sharder_keepalive {
std::unique_ptr<dht::auto_refreshing_sharder> sharder_ptr;
service::topology_guard topo_guard;
};
using sharder_keepalive = std::variant<tablet_sharder_keepalive, locator::effective_replication_map_ptr>;
sharder_keepalive keepalive;
const dht::sharder& sharder;
sharder_helper(sharder_keepalive s_keepalive, const dht::sharder& s)
: keepalive(std::move(s_keepalive))
, sharder(s)
{}
};
sharder_helper get_sharder_helper(replica::table& t, const schema& s, service::frozen_topology_guard frozen_topo_guard) {
if (frozen_topo_guard == service::default_session_id) {
// The sharder is valid only when the erm is valid. Keep a reference of the erm to keep the sharder valid.
auto erm = t.get_effective_replication_map();
auto& sharder = erm->get_sharder(s);
sharder_helper::sharder_keepalive keepalive = std::move(erm);
return sharder_helper{std::move(keepalive), sharder};
} else {
sharder_helper::tablet_sharder_keepalive keepalive{
.sharder_ptr = std::make_unique<dht::auto_refreshing_sharder>(t.shared_from_this()),
.topo_guard = frozen_topo_guard
};
auto& sharder = *keepalive.sharder_ptr;
return sharder_helper{std::move(keepalive), sharder};
}
}
void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
if (_writer_done) {
return;
}
replica::table& t = _db.local().find_column_family(_schema->id());
rlogger.debug("repair_writer: keyspace={}, table={}, estimated_partitions={}", w->schema()->ks_name(), w->schema()->cf_name(), w->get_estimated_partitions());
// The sharder is valid only when the erm is valid. Keep a reference of the erm to keep the sharder valid.
auto erm = t.get_effective_replication_map();
auto& sharder = erm->get_sharder(*(w->schema()));
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder, std::move(_queue_reader),
auto sharder = get_sharder_helper(t, *(w->schema()), _topo_guard);
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder.sharder, std::move(_queue_reader),
streaming::make_streaming_consumer(sstables::repair_origin, _db, _view_builder, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), _topo_guard),
t.stream_in_progress()).then([w, erm] (uint64_t partitions) {
t.stream_in_progress()).then([w] (uint64_t partitions) {
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
w->schema()->ks_name(), w->schema()->cf_name(), partitions);
}).handle_exception([w, erm] (std::exception_ptr ep) {
}).handle_exception([w, keepalive = std::move(sharder.keepalive)] (std::exception_ptr ep) {
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
w->schema()->ks_name(), w->schema()->cf_name(), ep);
w->queue().abort(ep);