From 92049c3205eb09ae083c5a539febf799babbe316 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 29 Jan 2026 14:32:15 +0200 Subject: [PATCH] topology: remove upgrade to raft topology code We no longer need this code since we expect the cluster to be upgraded before moving to this version. --- api/storage_service.cc | 11 +- idl/group0.idl.hh | 8 -- service/raft/raft_group0.cc | 59 +--------- service/raft/raft_group0.hh | 4 - service/storage_service.cc | 52 --------- service/storage_service.hh | 4 - service/topology_coordinator.cc | 189 +------------------------------- 7 files changed, 5 insertions(+), 322 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index 2389fa9256..29b6a77076 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -1574,16 +1574,7 @@ rest_reload_raft_topology_state(sharded& ss, service:: static future rest_upgrade_to_raft_topology(sharded& ss, std::unique_ptr req) { - apilog.info("Requested to schedule upgrade to raft topology"); - try { - co_await ss.invoke_on(0, [] (auto& ss) { - return ss.start_upgrade_to_raft_topology(); - }); - } catch (...) 
{ - auto ex = std::current_exception(); - apilog.error("Failed to schedule upgrade to raft topology: {}", ex); - std::rethrow_exception(std::move(ex)); - } + apilog.info("Requested to schedule upgrade to raft topology, but this version does not need it since it uses raft topology by default."); co_return json_void(); } diff --git a/idl/group0.idl.hh b/idl/group0.idl.hh index 0f750adff0..85e37a5848 100644 --- a/idl/group0.idl.hh +++ b/idl/group0.idl.hh @@ -26,14 +26,6 @@ struct group0_peer_exchange { std::variant> info; }; -enum class group0_upgrade_state : uint8_t { - recovery = 0, - use_pre_raft_procedures = 1, - synchronize = 2, - use_post_raft_procedures = 3, -}; - -verb [[with_client_info, cancellable]] get_group0_upgrade_state () -> service::group0_upgrade_state; verb [[with_client_info, with_timeout, ip]] group0_peer_exchange (std::vector peers) -> service::group0_peer_exchange; verb [[with_client_info, with_timeout]] group0_modify_config (raft::group_id gid, std::vector add, std::vector del); diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index c3394991fa..ee4e0d607d 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -207,35 +207,15 @@ void raft_group0::init_rpc_verbs(raft_group0& shard0_this) { return shard0_this._raft_gr.get_server(gid).modify_config(std::move(add), std::move(del), nullptr); }); }); - - ser::group0_rpc_verbs::register_get_group0_upgrade_state(&shard0_this._ms.local(), - [&shard0_this] (const rpc::client_info&) -> future { - return smp::submit_to(0, [&shard0_this]() -> future { - const auto [holder, state] = co_await shard0_this._client.get_group0_upgrade_state(); - co_return state; - }); - }); } future<> raft_group0::uninit_rpc_verbs(netw::messaging_service& ms) { return when_all_succeed( ser::group0_rpc_verbs::unregister_group0_peer_exchange(&ms), - ser::group0_rpc_verbs::unregister_group0_modify_config(&ms), - ser::group0_rpc_verbs::unregister_get_group0_upgrade_state(&ms) + 
ser::group0_rpc_verbs::unregister_group0_modify_config(&ms) ).discard_result(); } -static future send_get_group0_upgrade_state(netw::messaging_service& ms, const locator::host_id addr, abort_source& as) { - auto state = co_await ser::group0_rpc_verbs::send_get_group0_upgrade_state(&ms, addr, as); - auto state_int = static_cast(state); - if (state_int > group0_upgrade_state_last) { - on_internal_error(upgrade_log, format( - "send_get_group0_upgrade_state: unknown value for `group0_upgrade_state` received from node {}: {}", - addr, state_int)); - } - co_return state; -} - const raft::server_id& raft_group0::load_my_id() { return _raft_gr.get_my_raft_id(); } @@ -1188,43 +1168,6 @@ struct sleep_with_exponential_backoff { } }; -future<> raft_group0::wait_for_all_nodes_to_finish_upgrade(abort_source& as) { - static constexpr auto rpc_timeout = std::chrono::seconds{5}; - static constexpr auto max_concurrency = 10; - - for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { - group0_members members0{_raft_gr.group0()}; - auto current_config = members0.get_host_ids(); - if (current_config.empty()) { - continue; - } - - std::unordered_set pending_nodes{current_config.begin(), current_config.end()}; - co_await max_concurrent_for_each(current_config, max_concurrency, [&] (const locator::host_id& node) -> future<> { - try { - upgrade_log.info("wait_for_everybody_to_finish_upgrade: `send_get_group0_upgrade_state({})`", node); - const auto upgrade_state = co_await with_timeout(as, rpc_timeout, std::bind_front(send_get_group0_upgrade_state, std::ref(_ms.local()), node)); - if (upgrade_state == group0_upgrade_state::use_post_raft_procedures) { - pending_nodes.erase(node); - } - } catch (abort_requested_exception&) { - upgrade_log.warn("wait_for_everybody_to_finish_upgrade: abort requested during `send_get_group0_upgrade_state({})`", node); - throw; - } catch (...) 
{ - upgrade_log.warn( - "wait_for_everybody_to_finish_upgrade: `send_get_group0_upgrade_state({})` failed: {}", - node, std::current_exception()); - } - }); - - if (pending_nodes.empty()) { - co_return; - } else { - upgrade_log.warn("wait_for_everybody_to_finish_upgrade: nodes {} didn't finish upgrade yet", pending_nodes); - } - } -} - void raft_group0::register_metrics() { namespace sm = seastar::metrics; _metrics.add_group("raft_group0", { diff --git a/service/raft/raft_group0.hh b/service/raft/raft_group0.hh index e821baae16..0f6d64fdbb 100644 --- a/service/raft/raft_group0.hh +++ b/service/raft/raft_group0.hh @@ -289,10 +289,6 @@ public: // or when joining an old cluster which does not support JOIN_NODE RPC). shared_ptr make_legacy_handshaker(raft::is_voter can_vote); - // Waits until all upgrade to raft group 0 finishes and all nodes switched - // to use_post_raft_procedures. - future<> wait_for_all_nodes_to_finish_upgrade(abort_source& as); - raft_group0_client& client() { return _client; } diff --git a/service/storage_service.cc b/service/storage_service.cc index 7166496b9d..cd7014593a 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1613,58 +1613,6 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft co_await _sys_ks.local().set_must_synchronize_topology(false); } -future<> storage_service::start_upgrade_to_raft_topology() { - SCYLLA_ASSERT(this_shard_id() == 0); - - if (_topology_state_machine._topology.upgrade_state != topology::upgrade_state_type::not_upgraded) { - co_return; - } - - if ((co_await _group0->client().get_group0_upgrade_state()).second != group0_upgrade_state::use_post_raft_procedures) { - throw std::runtime_error(fmt::format("Upgrade to schema-on-raft didn't complete yet. It is a prerequisite for starting " - "upgrade to raft topology. Refusing to continue. 
Consult the documentation for more details: {}", - raft_upgrade_doc)); - } - - if (!_feature_service.supports_consistent_topology_changes) { - throw std::runtime_error("The SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES feature is not enabled yet. " - "Not all nodes in the cluster might support topology on raft yet. Make sure that " - "all nodes in the cluster are upgraded to the same version. Refusing to continue."); - } - - if (auto unreachable = _gossiper.get_unreachable_nodes(); !unreachable.empty()) { - throw std::runtime_error(fmt::format( - "Nodes {} are seen as down. All nodes must be alive in order to start the upgrade. " - "Refusing to continue.", - unreachable)); - } - - while (true) { - auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{}); - - if (_topology_state_machine._topology.upgrade_state != topology::upgrade_state_type::not_upgraded) { - co_return; - } - - rtlogger.info("requesting to start upgrade to topology on raft"); - topology_mutation_builder builder(guard.write_timestamp()); - builder.set_upgrade_state(topology::upgrade_state_type::build_coordinator_state); - topology_change change{{builder.build()}}; - group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, "upgrade: start"); - - try { - co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{}); - break; - } catch (group0_concurrent_modification&) { - rtlogger.info("upgrade: concurrent operation is detected, retrying."); - continue; - } - }; - - rtlogger.info("upgrade to topology on raft is scheduled"); - co_return; -} - topology::upgrade_state_type storage_service::get_topology_upgrade_state() const { SCYLLA_ASSERT(this_shard_id() == 0); return _topology_state_machine._topology.upgrade_state; diff --git a/service/storage_service.hh b/service/storage_service.hh index 1d1c327124..42d1141b94 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -1025,10 +1025,6 @@ public: 
future<> do_clusterwide_vnodes_cleanup(); future<> reset_cleanup_needed(); - // Starts the upgrade procedure to topology on raft. - // Must be called on shard 0. - future<> start_upgrade_to_raft_topology(); - // Must be called on shard 0. topology::upgrade_state_type get_topology_upgrade_state() const; diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 527a78ab25..e1d0c64411 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -3713,10 +3713,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber future<> refresh_tablet_load_stats(); future<> start_tablet_load_stats_refresher(); - // Precondition: the state machine upgrade state is not at upgrade_state::done. - future<> do_upgrade_step(group0_guard); - future<> build_coordinator_state(group0_guard); - future<> await_event() { _as.check(); co_await _topo_sm.event.when(); @@ -3767,8 +3763,6 @@ public: _db.get_notifier().register_listener(this); } - // Returns true if the upgrade was done, returns false if upgrade was interrupted. 
- future maybe_run_upgrade(); future<> run(); future<> stop(); @@ -4009,147 +4003,6 @@ future<> topology_coordinator::start_tablet_load_stats_refresher() { } } -future<> topology_coordinator::do_upgrade_step(group0_guard guard) { - switch (_topo_sm._topology.upgrade_state) { - case topology::upgrade_state_type::not_upgraded: - on_internal_error(rtlogger, std::make_exception_ptr(std::runtime_error( - "topology_coordinator was started even though upgrade to raft topology was not requested yet"))); - - case topology::upgrade_state_type::build_coordinator_state: - utils::get_local_injector().inject("topology_coordinator_fail_to_build_state_during_upgrade", [] { - throw std::runtime_error("failed to build topology coordinator state due to error injection"); - }); - co_await build_coordinator_state(std::move(guard)); - co_return; - - case topology::upgrade_state_type::done: - on_internal_error(rtlogger, std::make_exception_ptr(std::runtime_error( - "topology_coordinator::do_upgrade_step called after upgrade was completed"))); - } -} - -future<> topology_coordinator::build_coordinator_state(group0_guard guard) { - // Wait until all nodes reach use_post_raft_procedures - rtlogger.info("waiting for all nodes to finish upgrade to raft schema"); - release_guard(std::move(guard)); - co_await _group0.wait_for_all_nodes_to_finish_upgrade(_as); - - auto tmptr = get_token_metadata_ptr(); - - auto sl_version = co_await _sys_ks.get_service_levels_version(); - if (!sl_version || *sl_version < 2) { - rtlogger.info("migrating service levels data"); - co_await qos::service_level_controller::migrate_to_v2(tmptr->get_normal_token_owners().size(), _sys_ks, _sys_ks.query_processor(), _group0.client(), _as); - } - - auto auth_version = co_await _sys_ks.get_auth_version(); - if (auth_version < db::system_keyspace::auth_version_t::v2) { - rtlogger.info("migrating system_auth keyspace data"); - co_await auth::migrate_to_auth_v2(_sys_ks, _group0.client(), - [this] (abort_source&) { return 
start_operation();}, _as); - } - - rtlogger.info("building initial raft topology state and CDC generation"); - guard = co_await start_operation(); - - auto get_application_state = [&] (locator::host_id host_id, const gms::application_state_map& epmap, gms::application_state app_state) -> sstring { - const auto it = epmap.find(app_state); - if (it == epmap.end()) { - throw std::runtime_error(format("failed to build initial raft topology state from gossip for node {}: application state {} is missing in gossip", - host_id, app_state)); - } - // it's versioned_value::value(), not std::optional::value() - it does not throw - return it->second.value(); - }; - - // Create a new CDC generation - auto get_sharding_info_for_host_id = [&] (locator::host_id host_id) -> std::pair { - const auto eptr = _gossiper.get_endpoint_state_ptr(host_id); - if (!eptr) { - throw std::runtime_error(format("no gossiper endpoint state for node {}", host_id)); - } - const auto& epmap = eptr->get_application_state_map(); - const auto shard_count = std::stoi(get_application_state(host_id, epmap, gms::application_state::SHARD_COUNT)); - const auto ignore_msb = std::stoi(get_application_state(host_id, epmap, gms::application_state::IGNORE_MSB_BITS)); - return std::make_pair(shard_count, ignore_msb); - }; - auto [cdc_gen_uuid, guard_, mutation] = co_await prepare_and_broadcast_cdc_generation_data(tmptr, std::move(guard), std::nullopt, get_sharding_info_for_host_id); - guard = std::move(guard_); - - topology_mutation_builder builder(guard.write_timestamp()); - - std::set enabled_features; - - // Build per-node state - for (const auto& node: tmptr->get_topology().get_nodes()) { - if (!node.get().is_member()) { - continue; - } - - const auto& host_id = node.get().host_id(); - const auto eptr = _gossiper.get_endpoint_state_ptr(host_id); - if (!eptr) { - throw std::runtime_error(format("failed to build initial raft topology state from gossip for node {} as gossip contains no data for it", host_id)); - } 
- - const auto& epmap = eptr->get_application_state_map(); - - const auto datacenter = get_application_state(host_id, epmap, gms::application_state::DC); - const auto rack = get_application_state(host_id, epmap, gms::application_state::RACK); - const auto tokens_v = tmptr->get_tokens(host_id); - const std::unordered_set tokens(tokens_v.begin(), tokens_v.end()); - const auto release_version = get_application_state(host_id, epmap, gms::application_state::RELEASE_VERSION); - const auto num_tokens = tokens.size(); - const auto shard_count = get_application_state(host_id, epmap, gms::application_state::SHARD_COUNT); - const auto ignore_msb = get_application_state(host_id, epmap, gms::application_state::IGNORE_MSB_BITS); - const auto supported_features_s = get_application_state(host_id, epmap, gms::application_state::SUPPORTED_FEATURES); - const auto supported_features = gms::feature_service::to_feature_set(supported_features_s); - - if (enabled_features.empty()) { - enabled_features = supported_features; - } else { - std::erase_if(enabled_features, [&] (const sstring& f) { return !supported_features.contains(f); }); - } - - builder.with_node(raft::server_id(host_id.uuid())) - .set("datacenter", datacenter) - .set("rack", rack) - .set("tokens", tokens) - .set("node_state", node_state::normal) - .set("release_version", release_version) - .set("num_tokens", (uint32_t)num_tokens) - .set("tokens_string", "") - .set("shard_count", (uint32_t)std::stoi(shard_count)) - .set("ignore_msb", (uint32_t)std::stoi(ignore_msb)) - .set("cleanup_status", cleanup_status::clean) - .set("request_id", utils::UUID()) - .set("supported_features", supported_features); - - rtlogger.debug("node {} will contain the following parameters: " - "datacenter={}, rack={}, tokens={}, shard_count={}, ignore_msb={}, supported_features={}", - host_id, datacenter, rack, tokens, shard_count, ignore_msb, supported_features); - } - - // Build the static columns - const bool add_ts_delay = true; // This is not the 
first generation, so add delay - auto cdc_gen_ts = cdc::new_generation_timestamp(add_ts_delay, _ring_delay); - - const cdc::generation_id_v2 cdc_gen_id { - .ts = cdc_gen_ts, - .id = cdc_gen_uuid, - }; - - builder.set_version(topology::initial_version) - .set_fence_version(topology::initial_version) - .add_new_committed_cdc_generation(cdc_gen_id) - .add_enabled_features(std::move(enabled_features)); - - // Commit - builder.set_upgrade_state(topology::upgrade_state_type::done); - auto reason = "upgrade: build the initial state"; - co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build()}, reason); -} - future<> topology_coordinator::fence_previous_coordinator() { // Write empty change to make sure that a guard taken by any previous coordinator cannot // be used to do a successful write any more. Otherwise the following can theoretically happen @@ -4270,39 +4123,6 @@ bool topology_coordinator::handle_topology_coordinator_error(std::exception_ptr return false; } -future topology_coordinator::maybe_run_upgrade() { - if (_topo_sm._topology.upgrade_state == topology::upgrade_state_type::done) { - // Upgrade was already done, nothing to do - co_return true; - } - - rtlogger.info("topology coordinator fiber is upgrading the cluster to raft topology mode"); - - auto abort = _as.subscribe([this] () noexcept { - _topo_sm.event.broadcast(); - }); - - while (!_as.abort_requested() && _topo_sm._topology.upgrade_state != topology::upgrade_state_type::done) { - bool sleep = false; - try { - auto guard = co_await start_operation(); - co_await do_upgrade_step(std::move(guard)); - } catch (...) { - sleep = handle_topology_coordinator_error(std::current_exception()); - } - if (sleep) { - try { - co_await seastar::sleep_abortable(std::chrono::seconds(1), _as); - } catch (...) 
{ - rtlogger.debug("sleep failed: {}", std::current_exception()); - } - } - co_await coroutine::maybe_yield(); - } - - co_return !_as.abort_requested(); -} - future<> topology_coordinator::run() { auto abort = _as.subscribe([this] () noexcept { _topo_sm.event.broadcast(); @@ -4447,12 +4267,9 @@ future<> run_topology_coordinator( lifecycle_notifier.register_subscriber(&coordinator); try { rtlogger.info("start topology coordinator fiber"); - const bool upgrade_done = co_await coordinator.maybe_run_upgrade(); - if (upgrade_done) { - co_await with_scheduling_group(group0.get_scheduling_group(), [&] { - return coordinator.run(); - }); - } + co_await with_scheduling_group(group0.get_scheduling_group(), [&] { + return coordinator.run(); + }); } catch (...) { ex = std::current_exception(); }