service/raft: make this node's Raft ID available early in group registry

Raft ID was loaded or created late in the boot procedure, in
`storage_service::join_token_ring`.

Create it earlier, as soon as possible (when `system_keyspace`
is started), pass it to `raft_group_registry::start` and store it inside
`raft_group_registry`.

We will use the Raft ID stored in the group registry in the following patches.
This also reduces the number of disk accesses for this node's Raft ID.
It's now loaded from disk once, stored in `raft_group_registry`, then
obtained from there when needed.

This moves `raft_group_registry::start` a bit later in the startup
procedure - after `system_keyspace` is started - but it doesn't make
a difference.
This commit is contained in:
Kamil Braun
2022-11-25 17:45:02 +01:00
parent dbd00fd3e9
commit 99fe580068
7 changed files with 74 additions and 56 deletions

View File

@@ -1113,7 +1113,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
throw bad_configuration_error();
}
}
raft_gr.invoke_on_all(&service::raft_group_registry::start).get();
supervisor::notify("starting query processor");
cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
@@ -1151,6 +1150,13 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}).get();
cfg->host_id = sys_ks.local().load_local_host_id().get0();
if (raft_gr.local().is_enabled()) {
auto my_raft_id = service::load_or_create_my_raft_id(sys_ks.local()).get();
raft_gr.invoke_on_all([my_raft_id] (service::raft_group_registry& raft_gr) {
return raft_gr.start(my_raft_id);
}).get();
}
group0_client.init().get();
db::sstables_format_selector sst_format_selector(gossiper.local(), feature_service, db);

View File

@@ -172,25 +172,8 @@ static future<group0_upgrade_state> send_get_group0_upgrade_state(netw::messagin
co_return state;
}
future<raft::server_id> raft_group0::load_my_id() {
assert(this_shard_id() == 0);
auto id = raft::server_id{co_await db::system_keyspace::get_raft_server_id()};
if (!id) {
on_internal_error(group0_log, "load_my_id(): server ID for group 0 missing");
}
co_return id;
}
seastar::future<raft::server_id> raft_group0::load_or_create_my_id() {
assert(this_shard_id() == 0);
auto id = raft::server_id{co_await db::system_keyspace::get_raft_server_id()};
if (id == raft::server_id{}) {
id = raft::server_id::create_random_id();
co_await db::system_keyspace::set_raft_server_id(id.id);
}
co_return id;
const raft::server_id& raft_group0::load_my_id() {
return _raft_gr.get_my_raft_id();
}
raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, raft::server_id my_id) {
@@ -371,7 +354,7 @@ future<> raft_group0::start_server_for_group0(raft::group_id group0_id) {
assert(group0_id != raft::group_id{});
// The address map may miss our own id in case we connect
// to an existing Raft Group 0 leader.
auto my_id = co_await load_my_id();
auto my_id = load_my_id();
_raft_gr.address_map().add_or_update_entry(my_id, _gossiper.get_broadcast_address());
// At this time the group registry is already up and running,
// so the address map is getting all the notifications from
@@ -394,7 +377,7 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, bool as_
}
raft::server* server = nullptr;
auto my_id = co_await load_or_create_my_id();
auto my_id = load_my_id();
group0_log.info("server {} found no local group 0. Discovering...", my_id);
while (true) {
auto g0_info = co_await discover_group0(my_id, seeds);
@@ -656,7 +639,7 @@ void raft_group0::load_initial_raft_address_map() {
future<> raft_group0::finish_setup_after_join() {
if (joined_group0()) {
group0_log.info("finish_setup_after_join: group 0 ID present, loading server info.");
auto my_id = co_await load_my_id();
auto my_id = load_my_id();
if (!_raft_gr.group0().get_configuration().can_vote(my_id)) {
group0_log.info("finish_setup_after_join: becoming a voter in the group 0 configuration...");
// Just bootstrapped and joined as non-voter. Become a voter.
@@ -763,12 +746,7 @@ future<> raft_group0::leave_group0() {
"leave_group0: we're fully upgraded to use Raft but didn't join group 0. Please report a bug.");
}
auto my_id = raft::server_id{co_await db::system_keyspace::get_raft_server_id()};
if (!my_id) {
on_internal_error(group0_log,
"leave_group0: we're fully upgraded to use Raft and group 0 ID is present but Raft server ID is not."
" Please report a bug.");
}
auto my_id = load_my_id();
// Note: if this gets stuck due to a failure, the DB admin can abort.
// FIXME: this gets stuck without failures if we're the leader (#10833)
@@ -823,12 +801,7 @@ future<> raft_group0::remove_from_group0(gms::inet_address node) {
" Please report a bug.", node));
}
auto my_id = raft::server_id{co_await db::system_keyspace::get_raft_server_id()};
if (!my_id) {
on_internal_error(group0_log, format(
"remove_from_group0({}): we're fully upgraded to use Raft and group 0 ID is present but Raft server ID is not."
" Please report a bug.", node));
}
auto my_id = load_my_id();
// Find their group 0 server's Raft ID.
// Note: even if the removed node had the same inet_address as us, `find_replace_id` should correctly find them

View File

@@ -160,10 +160,12 @@ public:
// We'll look for the other node's Raft ID in the group 0 configuration.
future<> remove_from_group0(gms::inet_address host);
// Loads server id for group 0 from disk if present,
// otherwise randomly generates a new one and persists it.
// Execute on shard 0 only.
future<raft::server_id> load_or_create_my_id();
// Assumes that this node's Raft server ID is already initialized and returns it.
// It's a fatal error if the id is missing.
//
// The returned ID is not empty.
const raft::server_id& load_my_id();
private:
void init_rpc_verbs();
future<> uninit_rpc_verbs();
@@ -175,14 +177,6 @@ private:
raft_server_for_group create_server_for_group0(raft::group_id id, raft::server_id my_id);
// Assumes server id for group 0 is already persisted and loads it from disk.
// It's a fatal error if the id is missing.
//
// Execute on shard 0 only.
//
// The returned ID is not empty.
future<raft::server_id> load_my_id();
// Run the discovery algorithm.
//
// Discovers an existing group 0 cluster or elects a server (called a 'leader')

View File

@@ -8,6 +8,7 @@
#include "service/raft/raft_group_registry.hh"
#include "service/raft/raft_rpc.hh"
#include "service/raft/raft_address_map.hh"
#include "db/system_keyspace.hh"
#include "message/messaging_service.hh"
#include "gms/gossiper.hh"
#include "gms/i_endpoint_state_change_subscriber.hh"
@@ -65,7 +66,6 @@ public:
return _alive_set.contains(srv);
}
};
// }}} direct_fd_proxy
// {{{ gossiper_state_change_subscriber_proxy
@@ -135,6 +135,16 @@ public:
// }}} gossiper_state_change_subscriber_proxy
// Returns this node's Raft server ID.
//
// Reads the ID via db::system_keyspace::get_raft_server_id(). If the stored
// value is the empty (default-constructed) ID, a random one is generated,
// persisted via db::system_keyspace::set_raft_server_id() and returned instead.
//
// Must be executed on shard 0 (asserted).
future<raft::server_id> load_or_create_my_raft_id(db::system_keyspace& sys_ks) {
    assert(this_shard_id() == 0);

    auto stored_id = raft::server_id{co_await db::system_keyspace::get_raft_server_id()};
    if (stored_id == raft::server_id{}) {
        // Nothing persisted yet: create a fresh ID and store it before handing it out.
        auto fresh_id = raft::server_id::create_random_id();
        co_await db::system_keyspace::set_raft_server_id(fresh_id.id);
        co_return fresh_id;
    }

    co_return stored_id;
}
raft_group_registry::raft_group_registry(bool is_enabled, raft_address_map& address_map,
netw::messaging_service& ms, gms::gossiper& gossiper, direct_failure_detector::failure_detector& fd)
: _is_enabled(is_enabled)
@@ -291,11 +301,14 @@ future<> raft_group_registry::stop_servers() noexcept {
co_await g.close();
}
seastar::future<> raft_group_registry::start() {
if (!_is_enabled) {
co_return;
}
seastar::future<> raft_group_registry::start(raft::server_id my_id) {
assert(_is_enabled);
assert(!_my_id);
_my_id = my_id;
_gossiper.register_(_gossiper_proxy);
// Once a Raft server starts, it soon times out
// and starts an election, so RPC must be ready by
// then to send VoteRequest messages.
@@ -305,6 +318,13 @@ seastar::future<> raft_group_registry::start() {
direct_fd_clock::base::duration{std::chrono::seconds{1}}.count()));
}
// Returns the Raft server ID stored in this registry.
//
// Reports an internal error (via on_internal_error) if the ID has not been
// set yet; `start` is what assigns `_my_id`.
const raft::server_id& raft_group_registry::get_my_raft_id() {
    const bool initialized = _my_id.has_value();
    if (!initialized) {
        on_internal_error(rslog, "get_my_raft_id(): Raft ID not initialized");
    }
    return *_my_id;
}
seastar::future<> raft_group_registry::stop() {
if (!_is_enabled) {
co_return;

View File

@@ -22,6 +22,10 @@ class gossiper;
class echo_pinger;
}
namespace db {
class system_keyspace;
}
namespace service {
class raft_rpc;
@@ -49,6 +53,8 @@ class direct_fd_pinger;
class direct_fd_proxy;
class gossiper_state_change_subscriber_proxy;
// Loads this node's Raft server ID from the system keyspace; if none is stored yet,
// generates a random one, persists it and returns it.
// Must be executed on shard 0.
future<raft::server_id> load_or_create_my_raft_id(db::system_keyspace&);
// This class is responsible for creating, storing and accessing raft servers.
// It also manages the raft rpc verbs initialization.
//
@@ -86,17 +92,30 @@ private:
// Group 0 id, valid only on shard 0 after boot/upgrade is over
std::optional<raft::group_id> _group0_id;
// My Raft ID. Shared between different Raft groups.
// Once set, must not be changed.
//
// FIXME: ideally we'd like this to be passed to the constructor.
// However storage_proxy/query_processor/system_keyspace are unavailable
// when we start raft_group_registry so we have to set it later,
// after system_keyspace is initialized.
std::optional<raft::server_id> _my_id;
public:
// `is_enabled` must be `true` iff the local RAFT feature is enabled.
raft_group_registry(bool is_enabled, raft_address_map&,
netw::messaging_service& ms, gms::gossiper& gs, direct_failure_detector::failure_detector& fd);
~raft_group_registry();
// Called manually at start
seastar::future<> start();
// Must only be called if is_enabled().
// Called manually at start on every shard, after system_keyspace is initialized.
seastar::future<> start(raft::server_id my_id);
// Called by sharded<>::stop()
seastar::future<> stop();
// Must not be called before `start`.
const raft::server_id& get_my_raft_id();
// Called before stopping the database.
// May be called multiple times.
seastar::future<> drain_on_shutdown() noexcept;

View File

@@ -406,7 +406,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
app_states.emplace(gms::application_state::HOST_ID, versioned_value::host_id(local_host_id));
app_states.emplace(gms::application_state::RPC_ADDRESS, versioned_value::rpcaddress(broadcast_rpc_address));
if (_group0->is_raft_enabled()) {
auto my_id = co_await _group0->load_or_create_my_id();
auto my_id = _group0->load_my_id();
app_states.emplace(gms::application_state::RAFT_SERVER_ID, versioned_value::raft_server_id(my_id.id));
}
app_states.emplace(gms::application_state::RELEASE_VERSION, versioned_value::release_version());

View File

@@ -648,7 +648,6 @@ public:
raft_gr.start(cfg->check_experimental(db::experimental_features_t::feature::RAFT),
std::ref(raft_address_map), std::ref(ms), std::ref(gossiper), std::ref(fd)).get();
auto stop_raft_gr = deferred_stop(raft_gr);
raft_gr.invoke_on_all(&service::raft_group_registry::start).get();
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(ms), std::ref(mm), std::ref(gossiper)).get();
auto stop_streaming = defer([&stream_manager] { stream_manager.stop().get(); });
@@ -786,6 +785,13 @@ public:
}
}).get();
if (raft_gr.local().is_enabled()) {
auto my_raft_id = service::load_or_create_my_raft_id(sys_ks.local()).get();
raft_gr.invoke_on_all([my_raft_id] (service::raft_group_registry& raft_gr) {
return raft_gr.start(my_raft_id);
}).get();
}
group0_client.init().get();
auto stop_system_keyspace = defer([&sys_ks] {
db::qctx = {};