Merge 'Initialize group0 server on boot before allowing incoming requests' from Gleb

The series includes mostly cleanups and one bug fix.

The fix is for the race where messages that need to access group0 server are arriving
before the server is initialized.

* 'gleb/group0-sp-mm-race-v2' of github.com:scylladb/scylla-dev:
  service: raft: fix typo
  service: raft: split off setup_group0_if_exist from setup_group0
  storage_service: do not allow override_decommission flag if consistent cluster management is enabled
  storage_service: fix indentation after the previous patch
  storage_service: co-routinize storage_service::join_cluster() function
  storage_service: do not reload topology from peers table if topology over raft is enabled
  storage_service: optimize debug logging code in case debug log is not enabled
This commit is contained in:
Kamil Braun
2023-06-01 17:37:58 +02:00
5 changed files with 109 additions and 73 deletions

View File

@@ -837,7 +837,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, replace_address(this, "replace_address", value_status::Used, "", "[[deprecated]] The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.")
, replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "[[deprecated]] Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.")
, ignore_dead_nodes_for_replace(this, "ignore_dead_nodes_for_replace", value_status::Used, "", "List dead nodes to ignore for replace operation using a comma-separated list of host IDs. E.g., scylla --ignore-dead-nodes-for-replace 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1dbn-mac8-43fddce9123e")
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster")
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster (cannot be set if consistent-cluster-management is enabled")
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based")
, allowed_repair_based_node_ops(this, "allowed_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, "replace,removenode,rebuild,bootstrap,decommission", "A comma separated list of node operations which are allowed to enable repair based node operations. The operations can be bootstrap, replace, removenode, decommission and rebuild")
, ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.")

View File

@@ -1603,6 +1603,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
group0_service.abort().get();
});
// Setup group0 early in case the node is bootsrapped already and the group exists
// Need to do it before allowing incomming messaging service connections since
// storage proxy's and migration manager's verbs may access group0
group0_service.setup_group0_if_exist(sys_ks.local(), ss.local(), qp.local(), mm.local(), cdc_generation_service.local()).get();
with_scheduling_group(maintenance_scheduling_group, [&] {
return messaging.invoke_on_all([&token_metadata] (auto& netw) {
return netw.start_listen(token_metadata.local());

View File

@@ -507,9 +507,7 @@ static future<bool> synchronize_schema(
const noncopyable_function<future<bool>()>& can_finish_early,
abort_source&);
future<> raft_group0::setup_group0(
db::system_keyspace& sys_ks, const std::unordered_set<gms::inet_address>& initial_contact_nodes,
std::optional<replace_info> replace_info, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_servic) {
future<bool> raft_group0::use_raft() {
assert(this_shard_id() == 0);
if (!_raft_gr.is_enabled()) {
@@ -517,31 +515,53 @@ future<> raft_group0::setup_group0(
// Note: if the local feature was enabled by every node earlier, that would enable the cluster
// SUPPORTS_RAFT feature, and the node should then refuse to start during feature check
// (because if the local feature is disabled, then the cluster feature - enabled in the cluster - is 'unknown' to us).
co_return;
co_return false;
}
if (((co_await _client.get_group0_upgrade_state()).second) == group0_upgrade_state::recovery) {
group0_log.warn("setup_group0: Raft RECOVERY mode, skipping group 0 setup.");
co_return false;
}
co_return true;
}
future<> raft_group0::setup_group0_if_exist(db::system_keyspace& sys_ks, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_service) {
if (!co_await use_raft()) {
co_return;
}
if (!sys_ks.bootstrap_complete()) {
// If bootsrap did not complete yet there is no group0 to setup
co_return;
}
auto group0_id = raft::group_id{co_await db::system_keyspace::get_raft_group0_id()};
if (group0_id) {
// Group 0 ID is present => we've already joined group 0 earlier.
group0_log.info("setup_group0: group 0 ID present. Starting existing Raft server.");
co_await start_server_for_group0(group0_id, ss, qp, mm, cdc_gen_service);
} else {
// Scylla has bootstrapped earlier but group 0 ID not present. This means we're upgrading.
// Upgrade will start through a feature listener created after we enter NORMAL state.
//
// See `raft_group0::finish_setup_after_join`.
upgrade_log.info(
"setup_group0: Scylla bootstrap completed before but group 0 ID not present."
" Internal upgrade-to-raft procedure will automatically start after every node finishes"
" upgrading to the new Scylla version.");
}
}
future<> raft_group0::setup_group0(
db::system_keyspace& sys_ks, const std::unordered_set<gms::inet_address>& initial_contact_nodes,
std::optional<replace_info> replace_info, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_service) {
if (!co_await use_raft()) {
co_return;
}
if (sys_ks.bootstrap_complete()) {
auto group0_id = raft::group_id{co_await db::system_keyspace::get_raft_group0_id()};
if (group0_id) {
// Group 0 ID is present => we've already joined group 0 earlier.
group0_log.info("setup_group0: group 0 ID present. Starting existing Raft server.");
co_await start_server_for_group0(group0_id, ss, qp, mm, cdc_gen_servic);
} else {
// Scylla has bootstrapped earlier but group 0 ID not present. This means we're upgrading.
// Upgrade will start through a feature listener created after we enter NORMAL state.
//
// See `raft_group0::finish_setup_after_join`.
upgrade_log.info(
"setup_group0: Scylla bootstrap completed before but group 0 ID not present."
" Internal upgrade-to-raft procedure will automatically start after every node finishes"
" upgrading to the new Scylla version.");
}
// If the node is bootsraped the group0 server should be setup already
co_return;
}
@@ -553,7 +573,7 @@ future<> raft_group0::setup_group0(
}
group0_log.info("setup_group0: joining group 0...");
co_await join_group0(std::move(seeds), false /* non-voter */, ss, qp, mm, cdc_gen_servic);
co_await join_group0(std::move(seeds), false /* non-voter */, ss, qp, mm, cdc_gen_service);
group0_log.info("setup_group0: successfully joined group 0.");
if (replace_info) {

View File

@@ -131,7 +131,6 @@ public:
//
// If the local RAFT feature is enabled, does one of the following:
// - join group 0 (if we're bootstrapping),
// - start existing group 0 server (if we bootstrapped before),
// - prepare us for the upgrade procedure, which will create group 0 later (if we're upgrading).
//
// Cannot be called twice.
@@ -140,6 +139,14 @@ public:
future<> setup_group0(db::system_keyspace&, const std::unordered_set<gms::inet_address>& initial_contact_nodes,
std::optional<replace_info>, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_service);
// Call during the startup procedure before networking is enabled during restart
//
// If the local RAFT feature is enabled start existing group 0 server.
//
// Cannot be called twice.
//
future<> setup_group0_if_exist(db::system_keyspace&, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_service);
// Call at the end of the startup procedure, after the node entered NORMAL state.
// `setup_group0()` must have finished earlier.
//
@@ -306,6 +313,9 @@ private:
// Load the initial Raft <-> IP address map as seen by
// the gossiper.
void load_initial_raft_address_map();
// Returns true if raft is enabled
future<bool> use_raft();
};
} // end of namespace service

View File

@@ -1575,11 +1575,11 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
std::optional<cdc::generation_id> cdc_gen_id;
if (_sys_ks.local().was_decommissioned()) {
if (_db.local().get_config().override_decommission()) {
if (_db.local().get_config().override_decommission() && !_db.local().get_config().consistent_cluster_management()) {
slogger.warn("This node was decommissioned, but overriding by operator request.");
co_await _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED);
} else {
auto msg = sstring("This node was decommissioned and will not rejoin the ring unless override_decommission=true has been set,"
auto msg = sstring("This node was decommissioned and will not rejoin the ring unless override_decommission=true has been set and consistent cluster management is not in use,"
"or all existing data is removed and the node is bootstrapped again");
slogger.error("{}", msg);
throw std::runtime_error(msg);
@@ -1726,6 +1726,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
co_await _gossiper.start_gossiping(generation_number, app_states, advertise);
assert(_group0);
// if the node is bootstrapped the functin will do nothing since we already created group0 in main.cc
co_await _group0->setup_group0(_sys_ks.local(), initial_contact_nodes, raft_replace_info, *this, qp, _migration_manager.local(), cdc_gen_service);
raft::server* raft_server = co_await [this] () -> future<raft::server*> {
@@ -2724,24 +2725,24 @@ future<> storage_service::join_cluster(cdc::generation_service& cdc_gen_service,
_group0 = &group0;
_raft_topology_change_enabled = _group0->is_raft_enabled() && _db.local().get_config().check_experimental(db::experimental_features_t::feature::RAFT);
return seastar::async([this, &cdc_gen_service, &sys_dist_ks, &proxy, &qp] {
set_mode(mode::STARTING);
set_mode(mode::STARTING);
std::unordered_set<inet_address> loaded_endpoints;
if (_db.local().get_config().load_ring_state()) {
slogger.info("Loading persisted ring state");
auto loaded_tokens = _sys_ks.local().load_tokens().get0();
auto loaded_host_ids = _sys_ks.local().load_host_ids().get0();
auto loaded_dc_rack = _sys_ks.local().load_dc_rack_info().get0();
std::unordered_set<inet_address> loaded_endpoints;
if (_db.local().get_config().load_ring_state() && !_raft_topology_change_enabled) {
slogger.info("Loading persisted ring state");
auto loaded_tokens = co_await _sys_ks.local().load_tokens();
auto loaded_host_ids = co_await _sys_ks.local().load_host_ids();
auto loaded_dc_rack = co_await _sys_ks.local().load_dc_rack_info();
auto get_dc_rack = [&loaded_dc_rack] (inet_address ep) {
if (loaded_dc_rack.contains(ep)) {
return loaded_dc_rack[ep];
} else {
return locator::endpoint_dc_rack::default_location;
}
};
auto get_dc_rack = [&loaded_dc_rack] (inet_address ep) {
if (loaded_dc_rack.contains(ep)) {
return loaded_dc_rack[ep];
} else {
return locator::endpoint_dc_rack::default_location;
}
};
if (slogger.is_enabled(logging::log_level::debug)) {
for (auto& x : loaded_tokens) {
slogger.debug("Loaded tokens: endpoint={}, tokens={}", x.first, x.second);
}
@@ -2749,44 +2750,44 @@ future<> storage_service::join_cluster(cdc::generation_service& cdc_gen_service,
for (auto& x : loaded_host_ids) {
slogger.debug("Loaded host_id: endpoint={}, uuid={}", x.first, x.second);
}
}
auto tmlock = get_token_metadata_lock().get0();
auto tmptr = get_mutable_token_metadata_ptr().get0();
for (auto x : loaded_tokens) {
auto ep = x.first;
auto tokens = x.second;
if (ep == get_broadcast_address()) {
// entry has been mistakenly added, delete it
_sys_ks.local().remove_endpoint(ep).get();
} else {
tmptr->update_topology(ep, get_dc_rack(ep), locator::node::state::normal);
tmptr->update_normal_tokens(tokens, ep).get();
if (loaded_host_ids.contains(ep)) {
tmptr->update_host_id(loaded_host_ids.at(ep), ep);
}
loaded_endpoints.insert(ep);
_gossiper.add_saved_endpoint(ep).get();
auto tmlock = co_await get_token_metadata_lock();
auto tmptr = co_await get_mutable_token_metadata_ptr();
for (auto x : loaded_tokens) {
auto ep = x.first;
auto tokens = x.second;
if (ep == get_broadcast_address()) {
// entry has been mistakenly added, delete it
co_await _sys_ks.local().remove_endpoint(ep);
} else {
tmptr->update_topology(ep, get_dc_rack(ep), locator::node::state::normal);
co_await tmptr->update_normal_tokens(tokens, ep);
if (loaded_host_ids.contains(ep)) {
tmptr->update_host_id(loaded_host_ids.at(ep), ep);
}
loaded_endpoints.insert(ep);
co_await _gossiper.add_saved_endpoint(ep);
}
replicate_to_all_cores(std::move(tmptr)).get();
}
co_await replicate_to_all_cores(std::move(tmptr));
}
// Seeds are now only used as the initial contact point nodes. If the
// loaded_endpoints are empty which means this node is a completely new
// node, we use the nodes specified in seeds as the initial contact
// point nodes, otherwise use the peer nodes persisted in system table.
auto seeds = _gossiper.get_seeds();
auto initial_contact_nodes = loaded_endpoints.empty() ?
std::unordered_set<gms::inet_address>(seeds.begin(), seeds.end()) :
loaded_endpoints;
auto loaded_peer_features = _sys_ks.local().load_peer_features().get0();
slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}",
initial_contact_nodes, loaded_endpoints, loaded_peer_features.size());
for (auto& x : loaded_peer_features) {
slogger.info("peer={}, supported_features={}", x.first, x.second);
}
join_token_ring(cdc_gen_service, sys_dist_ks, proxy, std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay(), qp).get();
});
// Seeds are now only used as the initial contact point nodes. If the
// loaded_endpoints are empty which means this node is a completely new
// node, we use the nodes specified in seeds as the initial contact
// point nodes, otherwise use the peer nodes persisted in system table.
auto seeds = _gossiper.get_seeds();
auto initial_contact_nodes = loaded_endpoints.empty() ?
std::unordered_set<gms::inet_address>(seeds.begin(), seeds.end()) :
loaded_endpoints;
auto loaded_peer_features = co_await _sys_ks.local().load_peer_features();
slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}",
initial_contact_nodes, loaded_endpoints, loaded_peer_features.size());
for (auto& x : loaded_peer_features) {
slogger.info("peer={}, supported_features={}", x.first, x.second);
}
co_return co_await join_token_ring(cdc_gen_service, sys_dist_ks, proxy, std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay(), qp);
}
future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept {