topology: remove upgrade to raft topology code

We no longer need this code since we expect the cluster to be
upgraded before moving to this version.
This commit is contained in:
Gleb Natapov
2026-01-29 14:32:15 +02:00
parent 4a9cf687cc
commit 92049c3205
7 changed files with 5 additions and 322 deletions

View File

@@ -1574,16 +1574,7 @@ rest_reload_raft_topology_state(sharded<service::storage_service>& ss, service::
static
future<json::json_return_type>
rest_upgrade_to_raft_topology(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
apilog.info("Requested to schedule upgrade to raft topology");
try {
co_await ss.invoke_on(0, [] (auto& ss) {
return ss.start_upgrade_to_raft_topology();
});
} catch (...) {
auto ex = std::current_exception();
apilog.error("Failed to schedule upgrade to raft topology: {}", ex);
std::rethrow_exception(std::move(ex));
}
apilog.info("Requested to schedule upgrade to raft topology, but this version does not need it since it uses raft topology by default.");
co_return json_void();
}

View File

@@ -26,14 +26,6 @@ struct group0_peer_exchange {
std::variant<std::monostate, service::group0_info, std::vector<service::discovery_peer>> info;
};
enum class group0_upgrade_state : uint8_t {
recovery = 0,
use_pre_raft_procedures = 1,
synchronize = 2,
use_post_raft_procedures = 3,
};
verb [[with_client_info, cancellable]] get_group0_upgrade_state () -> service::group0_upgrade_state;
verb [[with_client_info, with_timeout, ip]] group0_peer_exchange (std::vector<service::discovery_peer> peers) -> service::group0_peer_exchange;
verb [[with_client_info, with_timeout]] group0_modify_config (raft::group_id gid, std::vector<raft::config_member> add, std::vector<raft::server_id> del);

View File

@@ -207,35 +207,15 @@ void raft_group0::init_rpc_verbs(raft_group0& shard0_this) {
return shard0_this._raft_gr.get_server(gid).modify_config(std::move(add), std::move(del), nullptr);
});
});
ser::group0_rpc_verbs::register_get_group0_upgrade_state(&shard0_this._ms.local(),
[&shard0_this] (const rpc::client_info&) -> future<group0_upgrade_state> {
return smp::submit_to(0, [&shard0_this]() -> future<group0_upgrade_state> {
const auto [holder, state] = co_await shard0_this._client.get_group0_upgrade_state();
co_return state;
});
});
}
future<> raft_group0::uninit_rpc_verbs(netw::messaging_service& ms) {
return when_all_succeed(
ser::group0_rpc_verbs::unregister_group0_peer_exchange(&ms),
ser::group0_rpc_verbs::unregister_group0_modify_config(&ms),
ser::group0_rpc_verbs::unregister_get_group0_upgrade_state(&ms)
ser::group0_rpc_verbs::unregister_group0_modify_config(&ms)
).discard_result();
}
static future<group0_upgrade_state> send_get_group0_upgrade_state(netw::messaging_service& ms, const locator::host_id addr, abort_source& as) {
auto state = co_await ser::group0_rpc_verbs::send_get_group0_upgrade_state(&ms, addr, as);
auto state_int = static_cast<int8_t>(state);
if (state_int > group0_upgrade_state_last) {
on_internal_error(upgrade_log, format(
"send_get_group0_upgrade_state: unknown value for `group0_upgrade_state` received from node {}: {}",
addr, state_int));
}
co_return state;
}
const raft::server_id& raft_group0::load_my_id() {
return _raft_gr.get_my_raft_id();
}
@@ -1188,43 +1168,6 @@ struct sleep_with_exponential_backoff {
}
};
future<> raft_group0::wait_for_all_nodes_to_finish_upgrade(abort_source& as) {
static constexpr auto rpc_timeout = std::chrono::seconds{5};
static constexpr auto max_concurrency = 10;
for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) {
group0_members members0{_raft_gr.group0()};
auto current_config = members0.get_host_ids();
if (current_config.empty()) {
continue;
}
std::unordered_set<locator::host_id> pending_nodes{current_config.begin(), current_config.end()};
co_await max_concurrent_for_each(current_config, max_concurrency, [&] (const locator::host_id& node) -> future<> {
try {
upgrade_log.info("wait_for_everybody_to_finish_upgrade: `send_get_group0_upgrade_state({})`", node);
const auto upgrade_state = co_await with_timeout(as, rpc_timeout, std::bind_front(send_get_group0_upgrade_state, std::ref(_ms.local()), node));
if (upgrade_state == group0_upgrade_state::use_post_raft_procedures) {
pending_nodes.erase(node);
}
} catch (abort_requested_exception&) {
upgrade_log.warn("wait_for_everybody_to_finish_upgrade: abort requested during `send_get_group0_upgrade_state({})`", node);
throw;
} catch (...) {
upgrade_log.warn(
"wait_for_everybody_to_finish_upgrade: `send_get_group0_upgrade_state({})` failed: {}",
node, std::current_exception());
}
});
if (pending_nodes.empty()) {
co_return;
} else {
upgrade_log.warn("wait_for_everybody_to_finish_upgrade: nodes {} didn't finish upgrade yet", pending_nodes);
}
}
}
void raft_group0::register_metrics() {
namespace sm = seastar::metrics;
_metrics.add_group("raft_group0", {

View File

@@ -289,10 +289,6 @@ public:
// or when joining an old cluster which does not support JOIN_NODE RPC).
shared_ptr<group0_handshaker> make_legacy_handshaker(raft::is_voter can_vote);
// Waits until all upgrade to raft group 0 finishes and all nodes switched
// to use_post_raft_procedures.
future<> wait_for_all_nodes_to_finish_upgrade(abort_source& as);
raft_group0_client& client() {
return _client;
}

View File

@@ -1613,58 +1613,6 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
co_await _sys_ks.local().set_must_synchronize_topology(false);
}
future<> storage_service::start_upgrade_to_raft_topology() {
SCYLLA_ASSERT(this_shard_id() == 0);
if (_topology_state_machine._topology.upgrade_state != topology::upgrade_state_type::not_upgraded) {
co_return;
}
if ((co_await _group0->client().get_group0_upgrade_state()).second != group0_upgrade_state::use_post_raft_procedures) {
throw std::runtime_error(fmt::format("Upgrade to schema-on-raft didn't complete yet. It is a prerequisite for starting "
"upgrade to raft topology. Refusing to continue. Consult the documentation for more details: {}",
raft_upgrade_doc));
}
if (!_feature_service.supports_consistent_topology_changes) {
throw std::runtime_error("The SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES feature is not enabled yet. "
"Not all nodes in the cluster might support topology on raft yet. Make sure that "
"all nodes in the cluster are upgraded to the same version. Refusing to continue.");
}
if (auto unreachable = _gossiper.get_unreachable_nodes(); !unreachable.empty()) {
throw std::runtime_error(fmt::format(
"Nodes {} are seen as down. All nodes must be alive in order to start the upgrade. "
"Refusing to continue.",
unreachable));
}
while (true) {
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
if (_topology_state_machine._topology.upgrade_state != topology::upgrade_state_type::not_upgraded) {
co_return;
}
rtlogger.info("requesting to start upgrade to topology on raft");
topology_mutation_builder builder(guard.write_timestamp());
builder.set_upgrade_state(topology::upgrade_state_type::build_coordinator_state);
topology_change change{{builder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, "upgrade: start");
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
break;
} catch (group0_concurrent_modification&) {
rtlogger.info("upgrade: concurrent operation is detected, retrying.");
continue;
}
};
rtlogger.info("upgrade to topology on raft is scheduled");
co_return;
}
topology::upgrade_state_type storage_service::get_topology_upgrade_state() const {
SCYLLA_ASSERT(this_shard_id() == 0);
return _topology_state_machine._topology.upgrade_state;

View File

@@ -1025,10 +1025,6 @@ public:
future<> do_clusterwide_vnodes_cleanup();
future<> reset_cleanup_needed();
// Starts the upgrade procedure to topology on raft.
// Must be called on shard 0.
future<> start_upgrade_to_raft_topology();
// Must be called on shard 0.
topology::upgrade_state_type get_topology_upgrade_state() const;

View File

@@ -3713,10 +3713,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
future<> refresh_tablet_load_stats();
future<> start_tablet_load_stats_refresher();
// Precondition: the state machine upgrade state is not at upgrade_state::done.
future<> do_upgrade_step(group0_guard);
future<> build_coordinator_state(group0_guard);
future<> await_event() {
_as.check();
co_await _topo_sm.event.when();
@@ -3767,8 +3763,6 @@ public:
_db.get_notifier().register_listener(this);
}
// Returns true if the upgrade was done, returns false if upgrade was interrupted.
future<bool> maybe_run_upgrade();
future<> run();
future<> stop();
@@ -4009,147 +4003,6 @@ future<> topology_coordinator::start_tablet_load_stats_refresher() {
}
}
future<> topology_coordinator::do_upgrade_step(group0_guard guard) {
switch (_topo_sm._topology.upgrade_state) {
case topology::upgrade_state_type::not_upgraded:
on_internal_error(rtlogger, std::make_exception_ptr(std::runtime_error(
"topology_coordinator was started even though upgrade to raft topology was not requested yet")));
case topology::upgrade_state_type::build_coordinator_state:
utils::get_local_injector().inject("topology_coordinator_fail_to_build_state_during_upgrade", [] {
throw std::runtime_error("failed to build topology coordinator state due to error injection");
});
co_await build_coordinator_state(std::move(guard));
co_return;
case topology::upgrade_state_type::done:
on_internal_error(rtlogger, std::make_exception_ptr(std::runtime_error(
"topology_coordinator::do_upgrade_step called after upgrade was completed")));
}
}
future<> topology_coordinator::build_coordinator_state(group0_guard guard) {
// Wait until all nodes reach use_post_raft_procedures
rtlogger.info("waiting for all nodes to finish upgrade to raft schema");
release_guard(std::move(guard));
co_await _group0.wait_for_all_nodes_to_finish_upgrade(_as);
auto tmptr = get_token_metadata_ptr();
auto sl_version = co_await _sys_ks.get_service_levels_version();
if (!sl_version || *sl_version < 2) {
rtlogger.info("migrating service levels data");
co_await qos::service_level_controller::migrate_to_v2(tmptr->get_normal_token_owners().size(), _sys_ks, _sys_ks.query_processor(), _group0.client(), _as);
}
auto auth_version = co_await _sys_ks.get_auth_version();
if (auth_version < db::system_keyspace::auth_version_t::v2) {
rtlogger.info("migrating system_auth keyspace data");
co_await auth::migrate_to_auth_v2(_sys_ks, _group0.client(),
[this] (abort_source&) { return start_operation();}, _as);
}
rtlogger.info("building initial raft topology state and CDC generation");
guard = co_await start_operation();
auto get_application_state = [&] (locator::host_id host_id, const gms::application_state_map& epmap, gms::application_state app_state) -> sstring {
const auto it = epmap.find(app_state);
if (it == epmap.end()) {
throw std::runtime_error(format("failed to build initial raft topology state from gossip for node {}: application state {} is missing in gossip",
host_id, app_state));
}
// it's versioned_value::value(), not std::optional::value() - it does not throw
return it->second.value();
};
// Create a new CDC generation
auto get_sharding_info_for_host_id = [&] (locator::host_id host_id) -> std::pair<size_t, uint8_t> {
const auto eptr = _gossiper.get_endpoint_state_ptr(host_id);
if (!eptr) {
throw std::runtime_error(format("no gossiper endpoint state for node {}", host_id));
}
const auto& epmap = eptr->get_application_state_map();
const auto shard_count = std::stoi(get_application_state(host_id, epmap, gms::application_state::SHARD_COUNT));
const auto ignore_msb = std::stoi(get_application_state(host_id, epmap, gms::application_state::IGNORE_MSB_BITS));
return std::make_pair<size_t, uint8_t>(shard_count, ignore_msb);
};
auto [cdc_gen_uuid, guard_, mutation] = co_await prepare_and_broadcast_cdc_generation_data(tmptr, std::move(guard), std::nullopt, get_sharding_info_for_host_id);
guard = std::move(guard_);
topology_mutation_builder builder(guard.write_timestamp());
std::set<sstring> enabled_features;
// Build per-node state
for (const auto& node: tmptr->get_topology().get_nodes()) {
if (!node.get().is_member()) {
continue;
}
const auto& host_id = node.get().host_id();
const auto eptr = _gossiper.get_endpoint_state_ptr(host_id);
if (!eptr) {
throw std::runtime_error(format("failed to build initial raft topology state from gossip for node {} as gossip contains no data for it", host_id));
}
const auto& epmap = eptr->get_application_state_map();
const auto datacenter = get_application_state(host_id, epmap, gms::application_state::DC);
const auto rack = get_application_state(host_id, epmap, gms::application_state::RACK);
const auto tokens_v = tmptr->get_tokens(host_id);
const std::unordered_set<dht::token> tokens(tokens_v.begin(), tokens_v.end());
const auto release_version = get_application_state(host_id, epmap, gms::application_state::RELEASE_VERSION);
const auto num_tokens = tokens.size();
const auto shard_count = get_application_state(host_id, epmap, gms::application_state::SHARD_COUNT);
const auto ignore_msb = get_application_state(host_id, epmap, gms::application_state::IGNORE_MSB_BITS);
const auto supported_features_s = get_application_state(host_id, epmap, gms::application_state::SUPPORTED_FEATURES);
const auto supported_features = gms::feature_service::to_feature_set(supported_features_s);
if (enabled_features.empty()) {
enabled_features = supported_features;
} else {
std::erase_if(enabled_features, [&] (const sstring& f) { return !supported_features.contains(f); });
}
builder.with_node(raft::server_id(host_id.uuid()))
.set("datacenter", datacenter)
.set("rack", rack)
.set("tokens", tokens)
.set("node_state", node_state::normal)
.set("release_version", release_version)
.set("num_tokens", (uint32_t)num_tokens)
.set("tokens_string", "")
.set("shard_count", (uint32_t)std::stoi(shard_count))
.set("ignore_msb", (uint32_t)std::stoi(ignore_msb))
.set("cleanup_status", cleanup_status::clean)
.set("request_id", utils::UUID())
.set("supported_features", supported_features);
rtlogger.debug("node {} will contain the following parameters: "
"datacenter={}, rack={}, tokens={}, shard_count={}, ignore_msb={}, supported_features={}",
host_id, datacenter, rack, tokens, shard_count, ignore_msb, supported_features);
}
// Build the static columns
const bool add_ts_delay = true; // This is not the first generation, so add delay
auto cdc_gen_ts = cdc::new_generation_timestamp(add_ts_delay, _ring_delay);
const cdc::generation_id_v2 cdc_gen_id {
.ts = cdc_gen_ts,
.id = cdc_gen_uuid,
};
builder.set_version(topology::initial_version)
.set_fence_version(topology::initial_version)
.add_new_committed_cdc_generation(cdc_gen_id)
.add_enabled_features(std::move(enabled_features));
// Commit
builder.set_upgrade_state(topology::upgrade_state_type::done);
auto reason = "upgrade: build the initial state";
co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build()}, reason);
}
future<> topology_coordinator::fence_previous_coordinator() {
// Write empty change to make sure that a guard taken by any previous coordinator cannot
// be used to do a successful write any more. Otherwise the following can theoretically happen
@@ -4270,39 +4123,6 @@ bool topology_coordinator::handle_topology_coordinator_error(std::exception_ptr
return false;
}
future<bool> topology_coordinator::maybe_run_upgrade() {
if (_topo_sm._topology.upgrade_state == topology::upgrade_state_type::done) {
// Upgrade was already done, nothing to do
co_return true;
}
rtlogger.info("topology coordinator fiber is upgrading the cluster to raft topology mode");
auto abort = _as.subscribe([this] () noexcept {
_topo_sm.event.broadcast();
});
while (!_as.abort_requested() && _topo_sm._topology.upgrade_state != topology::upgrade_state_type::done) {
bool sleep = false;
try {
auto guard = co_await start_operation();
co_await do_upgrade_step(std::move(guard));
} catch (...) {
sleep = handle_topology_coordinator_error(std::current_exception());
}
if (sleep) {
try {
co_await seastar::sleep_abortable(std::chrono::seconds(1), _as);
} catch (...) {
rtlogger.debug("sleep failed: {}", std::current_exception());
}
}
co_await coroutine::maybe_yield();
}
co_return !_as.abort_requested();
}
future<> topology_coordinator::run() {
auto abort = _as.subscribe([this] () noexcept {
_topo_sm.event.broadcast();
@@ -4447,12 +4267,9 @@ future<> run_topology_coordinator(
lifecycle_notifier.register_subscriber(&coordinator);
try {
rtlogger.info("start topology coordinator fiber");
const bool upgrade_done = co_await coordinator.maybe_run_upgrade();
if (upgrade_done) {
co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
return coordinator.run();
});
}
co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
return coordinator.run();
});
} catch (...) {
ex = std::current_exception();
}