diff --git a/db/config.cc b/db/config.cc index ce47e93b89..f23b819b7b 100644 --- a/db/config.cc +++ b/db/config.cc @@ -837,7 +837,7 @@ db::config::config(std::shared_ptr exts) , replace_address(this, "replace_address", value_status::Used, "", "[[deprecated]] The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.") , replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "[[deprecated]] Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.") , ignore_dead_nodes_for_replace(this, "ignore_dead_nodes_for_replace", value_status::Used, "", "List dead nodes to ignore for replace operation using a comma-separated list of host IDs. E.g., scylla --ignore-dead-nodes-for-replace 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1dbn-mac8-43fddce9123e") - , override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster") + , override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster (cannot be set if consistent-cluster-management is enabled") , enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based") , allowed_repair_based_node_ops(this, "allowed_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, "replace,removenode,rebuild,bootstrap,decommission", "A comma separated list of node operations which are allowed to enable repair based node operations. The operations can be bootstrap, replace, removenode, decommission and rebuild") , ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. 
Same as -Dcassandra.ring_delay_ms in cassandra.") diff --git a/main.cc b/main.cc index 05e853c959..946ec54855 100644 --- a/main.cc +++ b/main.cc @@ -1603,6 +1603,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl group0_service.abort().get(); }); + // Setup group0 early in case the node is bootstrapped already and the group exists + // Need to do it before allowing incoming messaging service connections since + // storage proxy's and migration manager's verbs may access group0 + group0_service.setup_group0_if_exist(sys_ks.local(), ss.local(), qp.local(), mm.local(), cdc_generation_service.local()).get(); + with_scheduling_group(maintenance_scheduling_group, [&] { return messaging.invoke_on_all([&token_metadata] (auto& netw) { return netw.start_listen(token_metadata.local()); diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index 45fa26ce95..b50b33caf1 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -507,9 +507,7 @@ static future synchronize_schema( const noncopyable_function()>& can_finish_early, abort_source&); -future<> raft_group0::setup_group0( - db::system_keyspace& sys_ks, const std::unordered_set& initial_contact_nodes, - std::optional replace_info, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_servic) { +future raft_group0::use_raft() { assert(this_shard_id() == 0); if (!_raft_gr.is_enabled()) { @@ -517,31 +515,53 @@ future<> raft_group0::setup_group0( // Note: if the local feature was enabled by every node earlier, that would enable the cluster // SUPPORTS_RAFT feature, and the node should then refuse to start during feature check // (because if the local feature is disabled, then the cluster feature - enabled in the cluster - is 'unknown' to us). 
- co_return; + co_return false; } if (((co_await _client.get_group0_upgrade_state()).second) == group0_upgrade_state::recovery) { group0_log.warn("setup_group0: Raft RECOVERY mode, skipping group 0 setup."); + co_return false; + } + + co_return true; +} + +future<> raft_group0::setup_group0_if_exist(db::system_keyspace& sys_ks, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_service) { + if (!co_await use_raft()) { + co_return; + } + + if (!sys_ks.bootstrap_complete()) { + // If bootstrap did not complete yet there is no group0 to set up + co_return; + } + + auto group0_id = raft::group_id{co_await db::system_keyspace::get_raft_group0_id()}; + if (group0_id) { + // Group 0 ID is present => we've already joined group 0 earlier. + group0_log.info("setup_group0: group 0 ID present. Starting existing Raft server."); + co_await start_server_for_group0(group0_id, ss, qp, mm, cdc_gen_service); + } else { + // Scylla has bootstrapped earlier but group 0 ID not present. This means we're upgrading. + // Upgrade will start through a feature listener created after we enter NORMAL state. + // + // See `raft_group0::finish_setup_after_join`. + upgrade_log.info( + "setup_group0: Scylla bootstrap completed before but group 0 ID not present." + " Internal upgrade-to-raft procedure will automatically start after every node finishes" + " upgrading to the new Scylla version."); + } +} + +future<> raft_group0::setup_group0( + db::system_keyspace& sys_ks, const std::unordered_set& initial_contact_nodes, + std::optional replace_info, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_service) { + if (!co_await use_raft()) { co_return; } if (sys_ks.bootstrap_complete()) { - auto group0_id = raft::group_id{co_await db::system_keyspace::get_raft_group0_id()}; - if (group0_id) { - // Group 0 ID is present => we've already joined group 0 earlier. 
- group0_log.info("setup_group0: group 0 ID present. Starting existing Raft server."); - co_await start_server_for_group0(group0_id, ss, qp, mm, cdc_gen_servic); - } else { - // Scylla has bootstrapped earlier but group 0 ID not present. This means we're upgrading. - // Upgrade will start through a feature listener created after we enter NORMAL state. - // - // See `raft_group0::finish_setup_after_join`. - upgrade_log.info( - "setup_group0: Scylla bootstrap completed before but group 0 ID not present." - " Internal upgrade-to-raft procedure will automatically start after every node finishes" - " upgrading to the new Scylla version."); - } - + // If the node is bootstrapped the group0 server should be set up already co_return; } @@ -553,7 +573,7 @@ future<> raft_group0::setup_group0( } group0_log.info("setup_group0: joining group 0..."); - co_await join_group0(std::move(seeds), false /* non-voter */, ss, qp, mm, cdc_gen_servic); + co_await join_group0(std::move(seeds), false /* non-voter */, ss, qp, mm, cdc_gen_service); group0_log.info("setup_group0: successfully joined group 0."); if (replace_info) { diff --git a/service/raft/raft_group0.hh b/service/raft/raft_group0.hh index 6d1a2f18a5..cb8edb6a15 100644 --- a/service/raft/raft_group0.hh +++ b/service/raft/raft_group0.hh @@ -131,7 +131,6 @@ public: // // If the local RAFT feature is enabled, does one of the following: // - join group 0 (if we're bootstrapping), - // - start existing group 0 server (if we bootstrapped before), // - prepare us for the upgrade procedure, which will create group 0 later (if we're upgrading). // // Cannot be called twice. 
@@ -140,6 +139,14 @@ public: future<> setup_group0(db::system_keyspace&, const std::unordered_set& initial_contact_nodes, std::optional, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_service); + // Call during the startup procedure before networking is enabled during restart + // + // If the local RAFT feature is enabled start existing group 0 server. + // + // Cannot be called twice. + // + future<> setup_group0_if_exist(db::system_keyspace&, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, cdc::generation_service& cdc_gen_service); + // Call at the end of the startup procedure, after the node entered NORMAL state. // `setup_group0()` must have finished earlier. // @@ -306,6 +313,9 @@ private: // Load the initial Raft <-> IP address map as seen by // the gossiper. void load_initial_raft_address_map(); + + // Returns true if raft is enabled + future use_raft(); }; } // end of namespace service diff --git a/service/storage_service.cc b/service/storage_service.cc index 16d995b0ae..1aaeda5e37 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1575,11 +1575,11 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi std::optional cdc_gen_id; if (_sys_ks.local().was_decommissioned()) { - if (_db.local().get_config().override_decommission()) { + if (_db.local().get_config().override_decommission() && !_db.local().get_config().consistent_cluster_management()) { slogger.warn("This node was decommissioned, but overriding by operator request."); co_await _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED); } else { - auto msg = sstring("This node was decommissioned and will not rejoin the ring unless override_decommission=true has been set," + auto msg = sstring("This node was decommissioned and will not rejoin the ring unless override_decommission=true has been set and 
consistent cluster management is not in use," "or all existing data is removed and the node is bootstrapped again"); slogger.error("{}", msg); throw std::runtime_error(msg); @@ -1726,6 +1726,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi co_await _gossiper.start_gossiping(generation_number, app_states, advertise); assert(_group0); + // if the node is bootstrapped the function will do nothing since we already created group0 in main.cc co_await _group0->setup_group0(_sys_ks.local(), initial_contact_nodes, raft_replace_info, *this, qp, _migration_manager.local(), cdc_gen_service); raft::server* raft_server = co_await [this] () -> future { @@ -2724,24 +2725,24 @@ future<> storage_service::join_cluster(cdc::generation_service& cdc_gen_service, _group0 = &group0; _raft_topology_change_enabled = _group0->is_raft_enabled() && _db.local().get_config().check_experimental(db::experimental_features_t::feature::RAFT); - return seastar::async([this, &cdc_gen_service, &sys_dist_ks, &proxy, &qp] { - set_mode(mode::STARTING); + set_mode(mode::STARTING); - std::unordered_set loaded_endpoints; - if (_db.local().get_config().load_ring_state()) { - slogger.info("Loading persisted ring state"); - auto loaded_tokens = _sys_ks.local().load_tokens().get0(); - auto loaded_host_ids = _sys_ks.local().load_host_ids().get0(); - auto loaded_dc_rack = _sys_ks.local().load_dc_rack_info().get0(); + std::unordered_set loaded_endpoints; + if (_db.local().get_config().load_ring_state() && !_raft_topology_change_enabled) { + slogger.info("Loading persisted ring state"); + auto loaded_tokens = co_await _sys_ks.local().load_tokens(); + auto loaded_host_ids = co_await _sys_ks.local().load_host_ids(); + auto loaded_dc_rack = co_await _sys_ks.local().load_dc_rack_info(); - auto get_dc_rack = [&loaded_dc_rack] (inet_address ep) { - if (loaded_dc_rack.contains(ep)) { - return loaded_dc_rack[ep]; - } else { - return locator::endpoint_dc_rack::default_location; - } - }; + 
auto get_dc_rack = [&loaded_dc_rack] (inet_address ep) { + if (loaded_dc_rack.contains(ep)) { + return loaded_dc_rack[ep]; + } else { + return locator::endpoint_dc_rack::default_location; + } + }; + if (slogger.is_enabled(logging::log_level::debug)) { for (auto& x : loaded_tokens) { slogger.debug("Loaded tokens: endpoint={}, tokens={}", x.first, x.second); } @@ -2749,44 +2750,44 @@ future<> storage_service::join_cluster(cdc::generation_service& cdc_gen_service, for (auto& x : loaded_host_ids) { slogger.debug("Loaded host_id: endpoint={}, uuid={}", x.first, x.second); } + } - auto tmlock = get_token_metadata_lock().get0(); - auto tmptr = get_mutable_token_metadata_ptr().get0(); - for (auto x : loaded_tokens) { - auto ep = x.first; - auto tokens = x.second; - if (ep == get_broadcast_address()) { - // entry has been mistakenly added, delete it - _sys_ks.local().remove_endpoint(ep).get(); - } else { - tmptr->update_topology(ep, get_dc_rack(ep), locator::node::state::normal); - tmptr->update_normal_tokens(tokens, ep).get(); - if (loaded_host_ids.contains(ep)) { - tmptr->update_host_id(loaded_host_ids.at(ep), ep); - } - loaded_endpoints.insert(ep); - _gossiper.add_saved_endpoint(ep).get(); + auto tmlock = co_await get_token_metadata_lock(); + auto tmptr = co_await get_mutable_token_metadata_ptr(); + for (auto x : loaded_tokens) { + auto ep = x.first; + auto tokens = x.second; + if (ep == get_broadcast_address()) { + // entry has been mistakenly added, delete it + co_await _sys_ks.local().remove_endpoint(ep); + } else { + tmptr->update_topology(ep, get_dc_rack(ep), locator::node::state::normal); + co_await tmptr->update_normal_tokens(tokens, ep); + if (loaded_host_ids.contains(ep)) { + tmptr->update_host_id(loaded_host_ids.at(ep), ep); } + loaded_endpoints.insert(ep); + co_await _gossiper.add_saved_endpoint(ep); } - replicate_to_all_cores(std::move(tmptr)).get(); } + co_await replicate_to_all_cores(std::move(tmptr)); + } - // Seeds are now only used as the initial contact 
point nodes. If the - // loaded_endpoints are empty which means this node is a completely new - // node, we use the nodes specified in seeds as the initial contact - // point nodes, otherwise use the peer nodes persisted in system table. - auto seeds = _gossiper.get_seeds(); - auto initial_contact_nodes = loaded_endpoints.empty() ? - std::unordered_set(seeds.begin(), seeds.end()) : - loaded_endpoints; - auto loaded_peer_features = _sys_ks.local().load_peer_features().get0(); - slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}", - initial_contact_nodes, loaded_endpoints, loaded_peer_features.size()); - for (auto& x : loaded_peer_features) { - slogger.info("peer={}, supported_features={}", x.first, x.second); - } - join_token_ring(cdc_gen_service, sys_dist_ks, proxy, std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay(), qp).get(); - }); + // Seeds are now only used as the initial contact point nodes. If the + // loaded_endpoints are empty which means this node is a completely new + // node, we use the nodes specified in seeds as the initial contact + // point nodes, otherwise use the peer nodes persisted in system table. + auto seeds = _gossiper.get_seeds(); + auto initial_contact_nodes = loaded_endpoints.empty() ? 
+ std::unordered_set(seeds.begin(), seeds.end()) : + loaded_endpoints; + auto loaded_peer_features = co_await _sys_ks.local().load_peer_features(); + slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}", + initial_contact_nodes, loaded_endpoints, loaded_peer_features.size()); + for (auto& x : loaded_peer_features) { + slogger.info("peer={}, supported_features={}", x.first, x.second); + } + co_return co_await join_token_ring(cdc_gen_service, sys_dist_ks, proxy, std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay(), qp); } future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept {