storage_service: replicate_to_all_cores: use local pending_token_metadata_ptr

Rather than a _pending_token_metadata_ptr member in the storeage_service
class.  This is now much easier that the function was converted to a
coroutine.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2021-10-04 14:42:21 +03:00
parent 52f48f47f6
commit 045806cae7
2 changed files with 12 additions and 9 deletions

View File

@@ -45,6 +45,7 @@
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/consistency_level.hh"
#include "seastar/core/smp.hh"
#include "utils/UUID.hh"
#include "gms/inet_address.hh"
#include "log.hh"
@@ -1380,12 +1381,15 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
slogger.debug("Replicating token_metadata to all cores");
std::exception_ptr ex;
std::vector<mutable_token_metadata_ptr> pending_token_metadata_ptr;
pending_token_metadata_ptr.resize(smp::count);
try {
_pending_token_metadata_ptr = tmptr;
auto base_shard = this_shard_id();
pending_token_metadata_ptr[base_shard] = tmptr;
// clone a local copy of updated token_metadata on all other shards
co_await container().invoke_on_others([tmptr] (storage_service& ss) -> future<> {
auto tm = co_await tmptr->clone_async();
ss._pending_token_metadata_ptr = make_token_metadata_ptr(std::move(tm));
co_await smp::invoke_on_others([&, base_shard, tmptr] () -> future<> {
pending_token_metadata_ptr[this_shard_id()] = make_token_metadata_ptr(co_await tmptr->clone_async());
});
} catch (...) {
ex = std::current_exception();
@@ -1394,8 +1398,8 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
// Rollback on metadata replication error
if (ex) {
try {
co_await container().invoke_on_all([] (storage_service& ss) -> future<> {
if (auto tmptr = std::move(ss._pending_token_metadata_ptr)) {
co_await smp::invoke_on_all([&] () -> future<> {
if (auto tmptr = std::move(pending_token_metadata_ptr[this_shard_id()])) {
co_await tmptr->clear_gently();
}
});
@@ -1408,8 +1412,8 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
// Apply changes on all shards
try {
co_await container().invoke_on_all([] (storage_service& ss) {
ss._shared_token_metadata.set(std::move(ss._pending_token_metadata_ptr));
co_await container().invoke_on_all([&] (storage_service& ss) {
ss._shared_token_metadata.set(std::move(pending_token_metadata_ptr[this_shard_id()]));
});
} catch (...) {
// applying the changes on all shards should never fail

View File

@@ -249,7 +249,6 @@ private:
return utils::fb_utilities::get_broadcast_address();
}
/* This abstraction maintains the token/endpoint metadata information */
mutable_token_metadata_ptr _pending_token_metadata_ptr;
shared_token_metadata& _shared_token_metadata;
/* CDC generation management service.