diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc
index e273177e65..76ca11db12 100644
--- a/service/paxos/paxos_state.cc
+++ b/service/paxos/paxos_state.cc
@@ -54,40 +54,19 @@ void paxos_state::key_lock_map::release_semaphore_for_key(const dht::token& key)
     }
 }
 
-future<paxos_state::replica_guard> paxos_state::get_replica_lock(const dht::token& key,
-        clock_type::time_point timeout, const dht::shard_replica_set& shards)
+future<paxos_state::replica_guard> paxos_state::get_replica_lock(const schema& s, const dht::token& token,
+        clock_type::time_point timeout)
 {
-    if (shards.empty()) {
-        on_internal_error(logger, "empty shards");
-    }
-    replica_guard replica_guard;
-    replica_guard.resize(shards.size());
-
-    auto acquire = [&shards, &replica_guard, &key, timeout](unsigned index) {
-        return smp::submit_to(shards[index], [&key, timeout] {
-            auto g = make_lw_shared<guard>(_paxos_table_lock, key, timeout);
-            return g->lock().then([g]{ return make_foreign(std::move(g)); });
-        }).then([&replica_guard, index](guard_foreign_ptr g) {
-            replica_guard[index] = std::move(g);
-        });
-    };
-
-    co_await acquire(0);
-    if (shards.size() > 1) {
-        co_await acquire(1);
-    }
-
-    co_return replica_guard;
-}
-
-future<paxos_state::guard> paxos_state::get_cas_lock(const dht::token& key, clock_type::time_point timeout) {
-    guard m(_coordinator_lock, key, timeout);
-    co_await m.lock();
-    co_return m;
-}
-
-static dht::shard_replica_set shards_for_writes(const schema& s, dht::token token) {
-    auto shards = s.table().shard_for_writes(token);
+    // When a tablet is migrated between shards on the same node, during the
+    // write_both_read_new state we begin switching reads to the new shard.
+    // Until the corresponding global barrier completes, some requests may still
+    // use the write_both_read_old erm, while others already use the write_both_read_new erm.
+    // To ensure mutual exclusion between these two types of requests, we must
+    // acquire locks on both the old and new shards.
+    // Once the global barrier completes, no requests remain on the old shard,
+    // so we can safely switch to acquiring locks only on the new shard.
+    auto shards = s.table().get_effective_replication_map()->shards_ready_for_reads(s, token);
+
     if (const auto it = std::ranges::find(shards, this_shard_id()); it == shards.end()) {
         const auto& erm = s.table().get_effective_replication_map();
         const auto& rs = erm->get_replication_strategy();
@@ -98,10 +77,25 @@ static dht::shard_replica_set shards_for_writes(const schema& s, dht::token toke
                     tablet_map.get_tablet_id(token), tablet_map);
         }
         on_internal_error(paxos_state::logger,
-                format("invalid shard, shard_for_writes {}, token {}{}", shards, token, tablet_map_desc));
+                format("invalid shard, shards {}, token {}{}", shards, token, tablet_map_desc));
     }
     std::ranges::sort(shards);
-    return shards;
+
+    replica_guard replica_guard;
+    replica_guard.resize(shards.size());
+    for (size_t i = 0; i < shards.size(); ++i) {
+        replica_guard[i] = co_await smp::submit_to(shards[i], [token, timeout] {
+            auto g = make_lw_shared<guard>(_paxos_table_lock, token, timeout);
+            return g->lock().then([g]{ return make_foreign(std::move(g)); });
+        });
+    }
+    co_return replica_guard;
+}
+
+future<paxos_state::guard> paxos_state::get_cas_lock(const dht::token& key, clock_type::time_point timeout) {
+    guard m(_coordinator_lock, key, timeout);
+    co_await m.lock();
+    co_return m;
 }
 
 future<prepare_response> paxos_state::prepare(storage_proxy& sp, paxos_store& paxos_store, tracing::trace_state_ptr tr_state, schema_ptr schema,
@@ -119,8 +113,7 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, paxos_store& pa
         }
     });
 
-    const auto shards = shards_for_writes(*schema, token);
-    auto guard = co_await get_replica_lock(token, timeout, shards);
+    auto guard = co_await get_replica_lock(*schema, token, timeout);
 
     // When preparing, we need to use the same time as "now" (that's the time we use to decide if something
     // is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up
@@ -211,8 +204,7 @@ future<bool> paxos_state::accept(storage_proxy& sp, paxos_store& paxos_store, tr
         }
     });
 
-    const auto shards = shards_for_writes(*schema, token);
-    auto guard = co_await get_replica_lock(token, timeout, shards);
+    auto guard = co_await get_replica_lock(*schema, token, timeout);
 
     auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot);
     paxos_state state = co_await paxos_store.load_paxos_state(proposal.update.key(), schema, gc_clock::time_point(now_in_sec), timeout);
diff --git a/service/paxos/paxos_state.hh b/service/paxos/paxos_state.hh
index 1012417182..31df4aeb7d 100644
--- a/service/paxos/paxos_state.hh
+++ b/service/paxos/paxos_state.hh
@@ -86,8 +86,8 @@ private:
     using guard_foreign_ptr = foreign_ptr<lw_shared_ptr<guard>>;
     using replica_guard = boost::container::static_vector<guard_foreign_ptr, 2>;
 
-    static future<replica_guard> get_replica_lock(const dht::token& key, clock_type::time_point timeout,
-            const dht::shard_replica_set& shards);
+    static future<replica_guard> get_replica_lock(const schema& s, const dht::token& token,
+            clock_type::time_point timeout);
 
     utils::UUID _promised_ballot = utils::UUID_gen::min_time_UUID();
     std::optional<proposal> _accepted_proposal;
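
Reviewer note: the locking pattern the patch relies on (take a per-key lock on each replica shard via smp::submit_to(), and keep the remote guard alive on the calling shard through a foreign_ptr, which destroys it back on its owning shard) is subtle, so below is a minimal, self-contained Seastar sketch of just that mechanism. It is an illustration, not the ScyllaDB code: shard_lock, remote_units, and lock_on_shards are hypothetical names, and a single per-shard semaphore stands in for the per-key _paxos_table_lock map.

// Minimal sketch (not the ScyllaDB implementation) of cross-shard lock
// acquisition: lock a semaphore on each replica shard and hold the units
// remotely through a foreign_ptr. All names here are hypothetical.
#include <seastar/core/app-template.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/smp.hh>
#include <boost/container/static_vector.hpp>
#include <vector>

using namespace seastar;

// Stand-in for paxos_state::_paxos_table_lock, which in the real code is a
// per-key semaphore map; one semaphore per shard suffices for the sketch.
static thread_local semaphore shard_lock{1};

using remote_units = foreign_ptr<lw_shared_ptr<semaphore_units<>>>;
using guard_vector = boost::container::static_vector<remote_units, 2>;

// Lock every shard in `shards` (at most two during an intra-node tablet
// migration), one at a time, assuming the list is sorted in ascending order.
future<guard_vector> lock_on_shards(std::vector<unsigned> shards) {
    guard_vector guards;
    for (unsigned shard : shards) {
        guards.push_back(co_await smp::submit_to(shard, [] {
            return get_units(shard_lock, 1).then([] (semaphore_units<> units) {
                // Wrap the units so the calling shard can hold them remotely;
                // the foreign_ptr destroys them back on this shard.
                return make_foreign(make_lw_shared(std::move(units)));
            });
        }));
    }
    co_return guards;
}

int main(int argc, char** argv) {
    app_template app;
    return app.run(argc, argv, [] () -> future<> {
        std::vector<unsigned> shards{0};
        if (smp::count > 1) {
            shards.push_back(1);
        }
        auto guards = co_await lock_on_shards(std::move(shards));
        // Both shards are locked here; dropping `guards` releases each lock
        // on its owning shard.
    });
}

A note on ordering: the patch calls std::ranges::sort(shards) before the lock loop, and the sketch assumes its input is likewise sorted. If two requests acquired the old/new shard locks in opposite orders they could deadlock against each other, so both must take them in ascending shard order.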