From 5ab2db961380e06a7471f68f935905a581dd8337 Mon Sep 17 00:00:00 2001
From: Petr Gusev
Date: Mon, 27 Oct 2025 10:21:19 +0100
Subject: [PATCH 1/2] paxos_state: inline shards_for_writes into get_replica_lock

No need to have two functions since both callers of get_replica_lock()
use shards_for_writes() to compute the shards where the locks must be
acquired.

Also while at it, inline the acquire() lambda in get_replica_lock() and
replace it with a loop over shards. This makes the code more
straightforward.
---
 service/paxos/paxos_state.cc | 59 +++++++++++++-----------------------
 service/paxos/paxos_state.hh |  4 +--
 2 files changed, 23 insertions(+), 40 deletions(-)

diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc
index e273177e65..47f69fb185 100644
--- a/service/paxos/paxos_state.cc
+++ b/service/paxos/paxos_state.cc
@@ -54,39 +54,9 @@ void paxos_state::key_lock_map::release_semaphore_for_key(const dht::token& key)
     }
 }
 
-future<paxos_state::replica_guard> paxos_state::get_replica_lock(const dht::token& key,
-        clock_type::time_point timeout, const dht::shard_replica_set& shards)
+future<paxos_state::replica_guard> paxos_state::get_replica_lock(const schema& s, const dht::token& token,
+        clock_type::time_point timeout)
 {
-    if (shards.empty()) {
-        on_internal_error(logger, "empty shards");
-    }
-    replica_guard replica_guard;
-    replica_guard.resize(shards.size());
-
-    auto acquire = [&shards, &replica_guard, &key, timeout](unsigned index) {
-        return smp::submit_to(shards[index], [&key, timeout] {
-            auto g = make_lw_shared<guard>(_paxos_table_lock, key, timeout);
-            return g->lock().then([g]{ return make_foreign(std::move(g)); });
-        }).then([&replica_guard, index](guard_foreign_ptr g) {
-            replica_guard[index] = std::move(g);
-        });
-    };
-
-    co_await acquire(0);
-    if (shards.size() > 1) {
-        co_await acquire(1);
-    }
-
-    co_return replica_guard;
-}
-
-future<paxos_state::guard> paxos_state::get_cas_lock(const dht::token& key, clock_type::time_point timeout) {
-    guard m(_coordinator_lock, key, timeout);
-    co_await m.lock();
-    co_return m;
-}
-
-static dht::shard_replica_set shards_for_writes(const schema& s, dht::token token) {
     auto shards = s.table().shard_for_writes(token);
     if (const auto it = std::ranges::find(shards, this_shard_id()); it == shards.end()) {
         const auto& erm = s.table().get_effective_replication_map();
@@ -98,10 +68,25 @@ static dht::shard_replica_set shards_for_writes(const schema& s, dht::token toke
                     tablet_map.get_tablet_id(token), tablet_map);
         }
         on_internal_error(paxos_state::logger,
-                format("invalid shard, shard_for_writes {}, token {}{}", shards, token, tablet_map_desc));
+                format("invalid shard, shards {}, token {}{}", shards, token, tablet_map_desc));
     }
     std::ranges::sort(shards);
-    return shards;
+
+    replica_guard replica_guard;
+    replica_guard.resize(shards.size());
+    for (size_t i = 0; i < shards.size(); ++i) {
+        replica_guard[i] = co_await smp::submit_to(shards[i], [token, timeout] {
+            auto g = make_lw_shared<guard>(_paxos_table_lock, token, timeout);
+            return g->lock().then([g]{ return make_foreign(std::move(g)); });
+        });
+    }
+    co_return replica_guard;
+}
+
+future<paxos_state::guard> paxos_state::get_cas_lock(const dht::token& key, clock_type::time_point timeout) {
+    guard m(_coordinator_lock, key, timeout);
+    co_await m.lock();
+    co_return m;
 }
 
 future<prepare_response> paxos_state::prepare(storage_proxy& sp, paxos_store& paxos_store, tracing::trace_state_ptr tr_state, schema_ptr schema,
@@ -119,8 +104,7 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, paxos_store& pa
         }
     });
 
-    const auto shards = shards_for_writes(*schema, token);
-    auto guard = co_await get_replica_lock(token, timeout, shards);
+    auto guard = co_await get_replica_lock(*schema, token, timeout);
 
     // When preparing, we need to use the same time as "now" (that's the time we use to decide if something
    // is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up
@@ -211,8 +195,7 @@ future<bool> paxos_state::accept(storage_proxy& sp, paxos_store& paxos_store, tr
         }
     });
 
-    const auto shards = shards_for_writes(*schema, token);
-    auto guard = co_await get_replica_lock(token, timeout, shards);
+    auto guard = co_await get_replica_lock(*schema, token, timeout);
 
     auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot);
     paxos_state state = co_await paxos_store.load_paxos_state(proposal.update.key(), schema, gc_clock::time_point(now_in_sec), timeout);
diff --git a/service/paxos/paxos_state.hh b/service/paxos/paxos_state.hh
index 1012417182..31df4aeb7d 100644
--- a/service/paxos/paxos_state.hh
+++ b/service/paxos/paxos_state.hh
@@ -86,8 +86,8 @@ private:
     using guard_foreign_ptr = foreign_ptr<lw_shared_ptr<guard>>;
     using replica_guard = boost::container::static_vector<guard_foreign_ptr, 2>;
 
-    static future<replica_guard> get_replica_lock(const dht::token& key, clock_type::time_point timeout,
-            const dht::shard_replica_set& shards);
+    static future<replica_guard> get_replica_lock(const schema& s, const dht::token& token,
+            clock_type::time_point timeout);
 
     utils::UUID _promised_ballot = utils::UUID_gen::min_time_UUID();
     std::optional<proposal> _accepted_proposal;
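To see the locking pattern patch 1 converges on without reading the diff hunks, here is a minimal standalone C++ model. It uses plain standard-library primitives, no Seastar, and every identifier in it is invented for illustration. It mirrors the new get_replica_lock() loop: sort the shard list so every competing request acquires the per-token locks in one canonical order, then take each shard's lock in turn, timing out if any acquisition stalls. In the real code the loop additionally hops shards with smp::submit_to() and keeps each guard alive through a foreign_ptr.

#include <algorithm>
#include <chrono>
#include <cstdio>
#include <map>
#include <mutex>
#include <stdexcept>
#include <vector>

using clock_type = std::chrono::steady_clock;

// One lock table per shard, keyed by token; stands in for the per-shard
// _paxos_table_lock semaphore map.
static std::map<unsigned, std::map<long, std::timed_mutex>> lock_tables;

// Lock `token` on every shard in `shards`, giving up at `timeout`.
// The returned vector of RAII guards plays the role of replica_guard.
static std::vector<std::unique_lock<std::timed_mutex>>
get_replica_lock(std::vector<unsigned> shards, long token, clock_type::time_point timeout) {
    // Sorting gives all competing requests the same acquisition order, so two
    // requests that each need both shards cannot interleave into a deadlock.
    std::sort(shards.begin(), shards.end());
    std::vector<std::unique_lock<std::timed_mutex>> guards;
    for (unsigned shard : shards) {
        std::unique_lock<std::timed_mutex> g(lock_tables[shard][token], std::defer_lock);
        if (!g.try_lock_until(timeout)) {
            throw std::runtime_error("timed out acquiring replica lock");
        }
        guards.push_back(std::move(g)); // held until the caller drops the vector
    }
    return guards;
}

int main() {
    auto guards = get_replica_lock({1, 0}, 42, clock_type::now() + std::chrono::milliseconds(100));
    std::printf("token 42 locked on %zu shard(s)\n", guards.size());
}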
From 478f7f545ab8ad890f757d77f082c1ded705910f Mon Sep 17 00:00:00 2001
From: Petr Gusev
Date: Mon, 27 Oct 2025 11:00:23 +0100
Subject: [PATCH 2/2] paxos_state: use shards_ready_for_reads

Acquiring locks on both shards for the entire tablet migration period is
redundant. In most cases, locking only the old shard or only the new
shard is sufficient.

Using shards_ready_for_reads reduces the situations in which we need to
lock both shards to:

* intra-node migrations only
* only during the write_both_read_new state

Once the global barrier completes in the write_both_read_new state, no
requests remain on the old shard, so we can safely acquire locks only on
the new shard.

Fixes scylladb/scylladb#26727
---
 service/paxos/paxos_state.cc | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc
index 47f69fb185..76ca11db12 100644
--- a/service/paxos/paxos_state.cc
+++ b/service/paxos/paxos_state.cc
@@ -57,7 +57,16 @@ void paxos_state::key_lock_map::release_semaphore_for_key(const dht::token& key)
 future<paxos_state::replica_guard> paxos_state::get_replica_lock(const schema& s, const dht::token& token,
         clock_type::time_point timeout)
 {
-    auto shards = s.table().shard_for_writes(token);
+    // When a tablet is migrated between shards on the same node, during the
+    // write_both_read_new state we begin switching reads to the new shard.
+    // Until the corresponding global barrier completes, some requests may still
+    // use the write_both_read_old erm, while others already use the write_both_read_new erm.
+    // To ensure mutual exclusion between these two types of requests, we must
+    // acquire locks on both the old and new shards.
+    // Once the global barrier completes, no requests remain on the old shard,
+    // so we can safely switch to acquiring locks only on the new shard.
+    auto shards = s.table().get_effective_replication_map()->shards_ready_for_reads(s, token);
+
     if (const auto it = std::ranges::find(shards, this_shard_id()); it == shards.end()) {
         const auto& erm = s.table().get_effective_replication_map();
         const auto& rs = erm->get_replication_strategy();
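As a companion to patch 2's commit message, this small standalone C++ sketch makes the state machine concrete. The enum values track the migration stages named in the message, with the period after the global barrier modeled as its own stage for clarity; all identifiers here are illustrative, not Scylla's API. The point is how shards_ready_for_reads shrinks the set of shards that must be locked compared with locking both shards for the whole migration.

#include <cstdio>
#include <vector>

// Stages of an intra-node tablet migration, as described in the commit message.
enum class migration_stage {
    write_both_read_old,          // reads still served only by the old shard
    write_both_read_new,          // reads switching over; either erm may be in use
    write_both_read_new_barriered // global barrier done: old shard fully drained
};

// Which shards a request must lock for a token whose tablet is migrating
// between old_shard and new_shard. Before patch 2 (shard_for_writes), both
// shards were locked for the whole migration; after patch 2
// (shards_ready_for_reads), both are needed only in write_both_read_new.
std::vector<unsigned> shards_to_lock(migration_stage stage, unsigned old_shard, unsigned new_shard) {
    switch (stage) {
    case migration_stage::write_both_read_old:
        return {old_shard};            // all reads still land on the old shard
    case migration_stage::write_both_read_new:
        return {old_shard, new_shard}; // requests may use either erm concurrently
    case migration_stage::write_both_read_new_barriered:
        return {new_shard};            // no requests remain on the old shard
    }
    return {};
}

int main() {
    for (auto stage : {migration_stage::write_both_read_old,
                       migration_stage::write_both_read_new,
                       migration_stage::write_both_read_new_barriered}) {
        std::printf("stage %d locks %zu shard(s)\n",
                    static_cast<int>(stage), shards_to_lock(stage, 0, 1).size());
    }
}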