Merge 'LWT: use shards_ready_for_reads for replica locks' from Petr Gusev

When a tablet is migrated between shards on the same node, during the write_both_read_new state we begin switching reads to the new shard. Until the corresponding global barrier completes, some requests may still use the write_both_read_old erm (effective replication map), while others already use the write_both_read_new erm. To ensure mutual exclusion between these two types of requests, we must acquire locks on both the old and new shards. Once the global barrier completes, no requests remain on the old shard, so we can safely switch to acquiring locks only on the new shard.
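For illustration only, here is a minimal standalone sketch of this locking discipline. It is not part of the patch: the `lwt_round` helper, the `shard_locks` array, and the shard numbering are hypothetical stand-ins for the per-shard `_paxos_table_lock`, with plain `std::mutex` in place of Seastar primitives. Locks are taken in ascending shard order, mirroring the `std::ranges::sort(shards)` call in the patch, so two requests acquiring the same set cannot deadlock:

```cpp
#include <algorithm>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

std::mutex shard_locks[2]; // hypothetical stand-ins for each shard's _paxos_table_lock

// Hypothetical stand-in for get_replica_lock: take the lock of every shard the
// request may be served from, in ascending shard order so that two requests
// locking the same set cannot deadlock.
void lwt_round(std::vector<int> shards, const char* who) {
    std::sort(shards.begin(), shards.end());
    for (int s : shards) {
        shard_locks[s].lock();
    }
    std::printf("%s holds its shard locks\n", who); // the paxos step would run here
    for (auto it = shards.rbegin(); it != shards.rend(); ++it) {
        shard_locks[*it].unlock();
    }
}

int main() {
    // During write_both_read_new, before the global barrier: one request still
    // routed via the old erm and one already routed via the new erm both lock
    // both shards, so they exclude each other even though they read from
    // different shards.
    std::thread a(lwt_round, std::vector<int>{0, 1}, "old-erm request");
    std::thread b(lwt_round, std::vector<int>{1, 0}, "new-erm request");
    a.join();
    b.join();
    // After the barrier, no request touches the old shard (0) anymore,
    // so locking only the new shard (1) is sufficient.
    lwt_round({1}, "post-barrier request");
}
```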

The idea came from a similar locking problem in the [counters for tablets PR](https://github.com/scylladb/scylladb/pull/26636#discussion_r2463932395).

Fixes scylladb/scylladb#26727

backport: need to backport to 2025.4

Closes scylladb/scylladb#26719

* https://github.com/scylladb/scylladb:
  paxos_state: use shards_ready_for_reads
  paxos_state: inline shards_for_writes into get_replica_lock
Patryk Jędrzejczak, 2025-10-28 10:37:53 +01:00
2 changed files with 32 additions and 40 deletions

paxos_state.cc

@@ -54,40 +54,19 @@ void paxos_state::key_lock_map::release_semaphore_for_key(const dht::token& key)
     }
 }
 
-future<paxos_state::replica_guard> paxos_state::get_replica_lock(const dht::token& key,
-        clock_type::time_point timeout, const dht::shard_replica_set& shards)
+future<paxos_state::replica_guard> paxos_state::get_replica_lock(const schema& s, const dht::token& token,
+        clock_type::time_point timeout)
 {
-    if (shards.empty()) {
-        on_internal_error(logger, "empty shards");
-    }
-    replica_guard replica_guard;
-    replica_guard.resize(shards.size());
+    // When a tablet is migrated between shards on the same node, during the
+    // write_both_read_new state we begin switching reads to the new shard.
+    // Until the corresponding global barrier completes, some requests may still
+    // use the write_both_read_old erm, while others already use the write_both_read_new erm.
+    // To ensure mutual exclusion between these two types of requests, we must
+    // acquire locks on both the old and new shards.
+    // Once the global barrier completes, no requests remain on the old shard,
+    // so we can safely switch to acquiring locks only on the new shard.
+    auto shards = s.table().get_effective_replication_map()->shards_ready_for_reads(s, token);
-    auto acquire = [&shards, &replica_guard, &key, timeout](unsigned index) {
-        return smp::submit_to(shards[index], [&key, timeout] {
-            auto g = make_lw_shared<guard>(_paxos_table_lock, key, timeout);
-            return g->lock().then([g]{ return make_foreign(std::move(g)); });
-        }).then([&replica_guard, index](guard_foreign_ptr g) {
-            replica_guard[index] = std::move(g);
-        });
-    };
-    co_await acquire(0);
-    if (shards.size() > 1) {
-        co_await acquire(1);
-    }
-    co_return replica_guard;
-}
-
-future<paxos_state::guard> paxos_state::get_cas_lock(const dht::token& key, clock_type::time_point timeout) {
-    guard m(_coordinator_lock, key, timeout);
-    co_await m.lock();
-    co_return m;
-}
-
-static dht::shard_replica_set shards_for_writes(const schema& s, dht::token token) {
-    auto shards = s.table().shard_for_writes(token);
     if (const auto it = std::ranges::find(shards, this_shard_id()); it == shards.end()) {
         const auto& erm = s.table().get_effective_replication_map();
         const auto& rs = erm->get_replication_strategy();
@@ -98,10 +77,25 @@ static dht::shard_replica_set shards_for_writes(const schema& s, dht::token toke
                     tablet_map.get_tablet_id(token), tablet_map);
         }
         on_internal_error(paxos_state::logger,
-                format("invalid shard, shard_for_writes {}, token {}{}", shards, token, tablet_map_desc));
+                format("invalid shard, shards {}, token {}{}", shards, token, tablet_map_desc));
     }
     std::ranges::sort(shards);
-    return shards;
+    replica_guard replica_guard;
+    replica_guard.resize(shards.size());
+    for (size_t i = 0; i < shards.size(); ++i) {
+        replica_guard[i] = co_await smp::submit_to(shards[i], [token, timeout] {
+            auto g = make_lw_shared<guard>(_paxos_table_lock, token, timeout);
+            return g->lock().then([g]{ return make_foreign(std::move(g)); });
+        });
+    }
+    co_return replica_guard;
 }
+
+future<paxos_state::guard> paxos_state::get_cas_lock(const dht::token& key, clock_type::time_point timeout) {
+    guard m(_coordinator_lock, key, timeout);
+    co_await m.lock();
+    co_return m;
+}
 
 future<prepare_response> paxos_state::prepare(storage_proxy& sp, paxos_store& paxos_store, tracing::trace_state_ptr tr_state, schema_ptr schema,
@@ -119,8 +113,7 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, paxos_store& pa
         }
     });
-    const auto shards = shards_for_writes(*schema, token);
-    auto guard = co_await get_replica_lock(token, timeout, shards);
+    auto guard = co_await get_replica_lock(*schema, token, timeout);
 
     // When preparing, we need to use the same time as "now" (that's the time we use to decide if something
     // is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up
@@ -211,8 +204,7 @@ future<bool> paxos_state::accept(storage_proxy& sp, paxos_store& paxos_store, tr
         }
     });
-    const auto shards = shards_for_writes(*schema, token);
-    auto guard = co_await get_replica_lock(token, timeout, shards);
+    auto guard = co_await get_replica_lock(*schema, token, timeout);
 
     auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot);
     paxos_state state = co_await paxos_store.load_paxos_state(proposal.update.key(), schema, gc_clock::time_point(now_in_sec), timeout);

paxos_state.hh

@@ -86,8 +86,8 @@ private:
     using guard_foreign_ptr = foreign_ptr<lw_shared_ptr<guard>>;
     using replica_guard = boost::container::static_vector<guard_foreign_ptr, 2>;
-    static future<replica_guard> get_replica_lock(const dht::token& key, clock_type::time_point timeout,
-            const dht::shard_replica_set& shards);
+    static future<replica_guard> get_replica_lock(const schema& s, const dht::token& token,
+            clock_type::time_point timeout);
     utils::UUID _promised_ballot = utils::UUID_gen::min_time_UUID();
     std::optional<proposal> _accepted_proposal;