From 045806cae7b8af69068e4437fdce3d9cc2e8103b Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 4 Oct 2021 14:42:21 +0300 Subject: [PATCH] storage_service: replicate_to_all_cores: use local pending_token_metadata_ptr Rather than a _pending_token_metadata_ptr member in the storeage_service class. This is now much easier that the function was converted to a coroutine. Signed-off-by: Benny Halevy --- service/storage_service.cc | 20 ++++++++++++-------- service/storage_service.hh | 1 - 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 43b0c1de8e..d04763cf48 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -45,6 +45,7 @@ #include "db/system_keyspace.hh" #include "db/system_distributed_keyspace.hh" #include "db/consistency_level.hh" +#include "seastar/core/smp.hh" #include "utils/UUID.hh" #include "gms/inet_address.hh" #include "log.hh" @@ -1380,12 +1381,15 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt slogger.debug("Replicating token_metadata to all cores"); std::exception_ptr ex; + std::vector pending_token_metadata_ptr; + pending_token_metadata_ptr.resize(smp::count); + try { - _pending_token_metadata_ptr = tmptr; + auto base_shard = this_shard_id(); + pending_token_metadata_ptr[base_shard] = tmptr; // clone a local copy of updated token_metadata on all other shards - co_await container().invoke_on_others([tmptr] (storage_service& ss) -> future<> { - auto tm = co_await tmptr->clone_async(); - ss._pending_token_metadata_ptr = make_token_metadata_ptr(std::move(tm)); + co_await smp::invoke_on_others([&, base_shard, tmptr] () -> future<> { + pending_token_metadata_ptr[this_shard_id()] = make_token_metadata_ptr(co_await tmptr->clone_async()); }); } catch (...) { ex = std::current_exception(); @@ -1394,8 +1398,8 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt // Rollback on metadata replication error if (ex) { try { - co_await container().invoke_on_all([] (storage_service& ss) -> future<> { - if (auto tmptr = std::move(ss._pending_token_metadata_ptr)) { + co_await smp::invoke_on_all([&] () -> future<> { + if (auto tmptr = std::move(pending_token_metadata_ptr[this_shard_id()])) { co_await tmptr->clear_gently(); } }); @@ -1408,8 +1412,8 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt // Apply changes on all shards try { - co_await container().invoke_on_all([] (storage_service& ss) { - ss._shared_token_metadata.set(std::move(ss._pending_token_metadata_ptr)); + co_await container().invoke_on_all([&] (storage_service& ss) { + ss._shared_token_metadata.set(std::move(pending_token_metadata_ptr[this_shard_id()])); }); } catch (...) { // applying the changes on all shards should never fail diff --git a/service/storage_service.hh b/service/storage_service.hh index bfae971d5b..9703935187 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -249,7 +249,6 @@ private: return utils::fb_utilities::get_broadcast_address(); } /* This abstraction maintains the token/endpoint metadata information */ - mutable_token_metadata_ptr _pending_token_metadata_ptr; shared_token_metadata& _shared_token_metadata; /* CDC generation management service.