diff --git a/service/raft/group0_fwd.hh b/service/raft/group0_fwd.hh index 825a439008..c842fb3163 100644 --- a/service/raft/group0_fwd.hh +++ b/service/raft/group0_fwd.hh @@ -57,7 +57,7 @@ enum class group0_upgrade_state : uint8_t { // Schema changes may still arrive from other nodes for some time. However, if no failures occur // during the upgrade procedure, eventually all nodes should enter `synchronize` state. Then // the nodes ensure that schema is synchronized across the entire cluster before entering `use_post_raft_procedures`. - synchronize = 2, + synchronize = 2, // Deprecated // In `use_post_raft_procedures` state the upgrade is finished. The node performs schema changes // using group 0, i.e. by constructing appropriate Raft commands and sending them to the Raft group 0 cluster. diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index 94b0ac271e..c3394991fa 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -694,16 +694,6 @@ struct group0_members { } }; -static future wait_for_peers_to_enter_synchronize_state( - const group0_members& members0, netw::messaging_service&, abort_source&, gate::holder pause_shutdown); -static future anyone_finished_upgrade( - const group0_members& members0, netw::messaging_service&, abort_source&); -static future synchronize_schema( - replica::database&, netw::messaging_service&, - const group0_members& members0, service::migration_manager&, - const noncopyable_function()>& can_finish_early, - abort_source&); - future raft_group0::use_raft() { SCYLLA_ASSERT(this_shard_id() == 0); @@ -799,44 +789,6 @@ future<> raft_group0::setup_group0( throw std::runtime_error{"injection: stop_after_joining_group0"}; }); - // Enter `synchronize` upgrade state in case the cluster we're joining has recently enabled Raft - // and is currently in the middle of `upgrade_to_group0()`. For that procedure to finish - // every member of group 0 (now including us) needs to enter `synchronize` state. - co_await _client.set_group0_upgrade_state(group0_upgrade_state::synchronize); - - group0_log.info("setup_group0: ensuring that the cluster has fully upgraded to use Raft..."); - auto& group0_server = _raft_gr.group0(); - group0_members members0{group0_server}; - - // Perform a Raft read barrier so we know the set of group 0 members and our group 0 state is up-to-date. - co_await group0_server.read_barrier(&_abort_source); - - auto cfg = group0_server.get_configuration(); - if (!cfg.is_joint() && cfg.current.size() == 1) { - group0_log.info("setup_group0: we're the only member of the cluster."); - } else if (ss.raft_topology_change_enabled()) { - group0_log.info("setup_group0: cluster uses raft for topology. No need to sync schema."); - } else { - // We're joining an existing cluster - we're not the only member. - // - // Wait until one of group 0 members enters `group0_upgrade_state::use_post_raft_procedures` - // or all members enter `group0_upgrade_state::synchronize`. - // - // In a fully upgraded cluster this should finish immediately (if the network works well) - everyone is in `use_post_raft_procedures`. - // In a cluster that is currently in the middle of `upgrade_to_group0`, this will cause us to wait until the procedure finishes. - group0_log.info("setup_group0: waiting for peers to synchronize state..."); - if (co_await wait_for_peers_to_enter_synchronize_state(members0, _ms.local(), _abort_source, _shutdown_gate.hold())) { - // Everyone entered `synchronize` state. 
That means we're bootstrapping in the middle of `upgrade_to_group0`.
-            // We need to finish upgrade as others do.
-            auto can_finish_early = std::bind_front(anyone_finished_upgrade, std::cref(members0), std::ref(_ms.local()), std::ref(_abort_source));
-            co_await synchronize_schema(qp.db().real_database(), _ms.local(), members0, mm, can_finish_early, _abort_source);
-        }
-    }
-
-    co_await utils::get_local_injector().inject("sleep_in_synchronize", [](auto& handler) -> future<> {
-        co_await handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5));
-    });
-
     group0_log.info("setup_group0: the cluster is ready to use Raft. Finishing.");
     co_await _client.set_group0_upgrade_state(group0_upgrade_state::use_post_raft_procedures);
 }
@@ -873,24 +825,10 @@ future<> raft_group0::finish_setup_after_join(service::storage_service& ss, cql3
         // We're either upgrading or in recovery mode.
     }
 
-    // Note: even if we joined group 0 before, it doesn't necessarily mean we finished with the whole
-    // upgrade procedure which has follow-up steps after joining group 0, hence we need to prepare
-    // the listener below (or fire the upgrade procedure if the feature is already enabled) even if
-    // we're already a member of group 0 by this point.
-    // `upgrade_to_group0()` will correctly detect and handle which case we're in: whether the procedure
-    // has already finished or we restarted in the middle after a crash.
-    if (!_feat.supports_raft_cluster_mgmt) {
-        group0_log.info("finish_setup_after_join: SUPPORTS_RAFT feature not yet enabled, scheduling upgrade to start when it is.");
+        throw std::runtime_error("finish_setup_after_join: SUPPORTS_RAFT feature not yet enabled, but was expected to be enabled at this point."
+            " If you are trying to upgrade a node of a cluster that is not using Raft yet, this is no longer supported.");
     }
-
-    // The listener may fire immediately, create a thread for that case.
-    co_await seastar::async([this, &ss, &qp, &mm, topology_change_enabled] {
-        _raft_support_listener = _feat.supports_raft_cluster_mgmt.when_enabled([this, &ss, &qp, &mm, topology_change_enabled] {
-            group0_log.info("finish_setup_after_join: SUPPORTS_RAFT feature enabled. Starting internal upgrade-to-raft procedure.");
-            upgrade_to_group0(ss, qp, mm, topology_change_enabled).get();
-        });
-    });
 }
 
 bool raft_group0::is_member(raft::server_id id, bool include_voters_only) {
@@ -982,47 +920,6 @@ future<bool> raft_group0::wait_for_raft() {
         co_return false;
     }
 
-    if (!_feat.supports_raft_cluster_mgmt) {
-        // The Raft feature is not yet enabled.
-        //
-        // In that case we assume the cluster is in a partially-upgraded state (some nodes still use a version
-        // without the raft/group 0 code enabled) and skip the reconfiguration.
-        //
-        // In theory we could be wrong: the cluster may have just upgraded but the user managed to start
-        // a topology operation before we noticed it. Removing a node at this point could cause `upgrade_to_group0`
-        // on other nodes to get stuck because it requires contacting all peers which may include the removed node.
-        //
-        // This is unlikely and shouldn't happen if rolling upgrade is performed correctly.
-        // If it does happen, there's always the possibility of performing the manual group 0 recovery procedure.
-        group0_log.warn(
-            "Raft feature not enabled yet. Assuming that the cluster is partially upgraded"
-            " and skipping group 0 reconfiguration. However, if you already finished the rolling upgrade"
-            " procedure, this means that the node just hasn't noticed it yet. 
The internal upgrade-to-raft procedure" - " may get stuck. If that happens, manual recovery will be required." - " Consult the documentation for more details: {}", - raft_upgrade_doc); - co_return false; - } - - if (upgrade_state != group0_upgrade_state::use_post_raft_procedures) { - // The feature is enabled but `upgrade_to_group0` did not finish yet. - // The upgrade procedure requires everyone to participate. In order to not block the cluster - // from doing the upgrades, we'll wait until we finish our procedure before doing the reconfiguration. - // - // Note: in theory it could happen that a node is removed/leaves the cluster immediately after - // it finishes upgrade, causing others to get stuck (trying to contact the node that left). - // It's unlikely and can be recovered from using the manual group 0 recovery procedure. - group0_log.info("Waiting until cluster fully upgrades to use Raft before proceeding..."); - co_await _client.wait_until_group0_upgraded(_abort_source); - group0_log.info("Cluster finished Raft upgrade procedure."); - } - - // We're fully upgraded, we must have joined group 0. - if (!joined_group0()) { - on_internal_error(group0_log, - "We're fully upgraded to use Raft but didn't join group 0. Please report a bug."); - } - group0_log.info("Performing a group 0 read barrier..."); co_await _raft_gr.group0().read_barrier(&_abort_source); group0_log.info("Finished group 0 read barrier."); @@ -1291,110 +1188,6 @@ struct sleep_with_exponential_backoff { } }; - -// Precondition: we joined group 0 and the server is running. -// Assumes we don't leave group 0 while running. -static future<> wait_until_every_peer_joined_group0(db::system_keyspace& sys_ks, const group0_members& members0, abort_source& as) { - - for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { - // We fetch both config and peers on each iteration; we don't assume that they don't change. - // No new node should join while the procedure is running, but nodes may leave. - - auto current_config = members0.get_host_ids(); - if (current_config.empty()) { - // Not all addresses are known - continue; - } - std::sort(current_config.begin(), current_config.end()); - - auto peers = co_await sys_ks.load_peers_ids(); - std::sort(peers.begin(), peers.end()); - - std::vector missing_peers; - std::set_difference(peers.begin(), peers.end(), current_config.begin(), current_config.end(), std::back_inserter(missing_peers)); - - if (missing_peers.empty()) { - if (!members0.is_joint()) { - co_return; - } - - upgrade_log.info("group 0 configuration is joint: {}.", current_config); - continue; - } - - upgrade_log.info( - "group 0 configuration does not contain all peers yet." - " Missing peers: {}. Current group 0 config: {}.", - missing_peers, current_config); - } -} - -// Check if anyone entered `use_post_raft_procedures`. -// This is a best-effort single round-trip check; we don't retry if some nodes fail to answer. 
-static future anyone_finished_upgrade( - const group0_members& members0, netw::messaging_service& ms, abort_source& as) { - static constexpr auto max_concurrency = 10; - static constexpr auto rpc_timeout = std::chrono::seconds{5}; - - auto current_config = members0.get_host_ids(); - bool finished = false; - co_await max_concurrent_for_each(current_config, max_concurrency, [&] (const locator::host_id& node) -> future<> { - try { - auto state = co_await with_timeout(as, rpc_timeout, std::bind_front(send_get_group0_upgrade_state, std::ref(ms), node)); - if (state == group0_upgrade_state::use_post_raft_procedures) { - finished = true; - } - } catch (abort_requested_exception&) { - upgrade_log.warn("anyone_finished_upgrade: abort requested during `send_get_group0_upgrade_state({})`", node); - throw; - } catch (...) { - // XXX: are there possible fatal errors which should cause us to abort the entire procedure? - upgrade_log.warn("anyone_finished_upgrade: `send_get_group0_upgrade_state({})` failed: {}", node, std::current_exception()); - } - }); - co_return finished; -} - -// Check if it's possible to reach everyone through `get_group0_upgrade_state` RPC. -static future<> check_remote_group0_upgrade_state_dry_run( - const noncopyable_function>()>& get_host_ids, - netw::messaging_service& ms, abort_source& as) { - static constexpr auto rpc_timeout = std::chrono::seconds{5}; - static constexpr auto max_concurrency = 10; - - for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { - // Note: we strive to get a response from everyone in a 'single round-trip', - // so we don't skip nodes which responded in earlier iterations. - // We contact everyone in each iteration even if some of these guys already answered. - // We fetch peers again on every attempt to handle the possibility of leaving nodes. - auto cluster_config = co_await get_host_ids(); - if (cluster_config.empty()) { - continue; - } - - bool retry = false; - co_await max_concurrent_for_each(cluster_config, max_concurrency, [&] (const locator::host_id& node) -> future<> { - try { - upgrade_log.info("check_remote_group0_upgrade_state_dry_run: `send_get_group0_upgrade_state({})`", node); - co_await with_timeout(as, rpc_timeout, std::bind_front(send_get_group0_upgrade_state, std::ref(ms), node)); - } catch (abort_requested_exception&) { - upgrade_log.warn("check_remote_group0_upgrade_state_dry_run: abort requested during `send_get_group0_upgrade_state({})`", node); - throw; - } catch (...) { - // XXX: are there possible fatal errors which should cause us to abort the entire procedure? - upgrade_log.warn( - "check_remote_group0_upgrade_state_dry_run: `send_get_group0_upgrade_state({})` failed: {}", - node, std::current_exception()); - retry = true; - } - }); - - if (!retry) { - co_return; - } - } -} - future<> raft_group0::wait_for_all_nodes_to_finish_upgrade(abort_source& as) { static constexpr auto rpc_timeout = std::chrono::seconds{5}; static constexpr auto max_concurrency = 10; @@ -1432,423 +1225,6 @@ future<> raft_group0::wait_for_all_nodes_to_finish_upgrade(abort_source& as) { } } -// Wait until all members of group 0 enter `group0_upgrade_state::synchronize` or some node enters -// `group0_upgrade_state::use_post_raft_procedures` (the latter meaning upgrade is finished and we can also finish). -// -// Precondition: we're in `synchronize` state. -// -// Returns `true` if we finished because everybody entered `synchronize`. -// Returns `false` if we finished because somebody entered `use_post_raft_procedures`. 
-static future wait_for_peers_to_enter_synchronize_state( - const group0_members& members0, netw::messaging_service& ms, abort_source& as, gate::holder pause_shutdown) { - static constexpr auto rpc_timeout = std::chrono::seconds{5}; - static constexpr auto max_concurrency = 10; - - auto entered_synchronize = make_lw_shared>(); - - // This is a work-around for boost tests where RPC module is not listening so we cannot contact ourselves. - // But really, except the (arguably broken) test code, we don't need to be treated as an edge case. All nodes are symmetric. - // For production code this line is unnecessary. - entered_synchronize->insert(ms.host_id()); - - for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { - // We fetch the config again on every attempt to handle the possibility of removing failed nodes. - auto current_members_set = members0.get_members(); - - ::tracker tracker; - auto retry = make_lw_shared(false); - - auto sub = as.subscribe([tracker] () mutable noexcept { - tracker.set_exception(std::make_exception_ptr(abort_requested_exception{})); - }); - if (!sub) { - upgrade_log.warn("wait_for_peers_to_enter_synchronize_state: abort requested"); - throw abort_requested_exception{}; - } - - (void) [] (netw::messaging_service& ms, abort_source& as, gate::holder pause_shutdown, - raft::config_member_set current_members_set, group0_members members0, - lw_shared_ptr> entered_synchronize, - lw_shared_ptr retry, ::tracker tracker) -> future<> { - co_await max_concurrent_for_each(current_members_set, max_concurrency, [&] (const raft::config_member& member) -> future<> { - locator::host_id node{member.addr.id.uuid()}; - - if (entered_synchronize->contains(node)) { - co_return; - } - - try { - auto state = co_await with_timeout(as, rpc_timeout, std::bind_front(send_get_group0_upgrade_state, std::ref(ms), node)); - if (tracker.finished()) { - // A response from another node caused us to finish already. - co_return; - } - - switch (state) { - case group0_upgrade_state::use_post_raft_procedures: - upgrade_log.info("wait_for_peers_to_enter_synchronize_state: {} confirmed that they finished upgrade.", node); - tracker.set_value(true); - break; - case group0_upgrade_state::synchronize: - entered_synchronize->insert(node); - break; - default: - upgrade_log.info("wait_for_peers_to_enter_synchronize_state: node {} not in synchronize state yet...", node); - *retry = true; - } - } catch (abort_requested_exception&) { - upgrade_log.warn("wait_for_peers_to_enter_synchronize_state: abort requested during `send_get_group0_upgrade_state({})`", node); - tracker.set_exception(std::current_exception()); - } catch (...) { - // XXX: are there possible fatal errors which should cause us to abort the entire procedure? - upgrade_log.warn( - "wait_for_peers_to_enter_synchronize_state: `send_get_group0_upgrade_state({})` failed: {}", - node, std::current_exception()); - *retry = true; - } - }); - - tracker.set_value(false); - }(ms, as, pause_shutdown, std::move(current_members_set), members0, entered_synchronize, retry, tracker); - - auto finish_early = co_await tracker.get(); - if (finish_early) { - co_return false; - } - - if (!*retry) { - co_return true; - } - } -} - -// Returning nullopt means we finished early (`can_finish_early` returned true). 
-static future>> -collect_schema_versions_from_group0_members( - netw::messaging_service& ms, const group0_members& members0, - const noncopyable_function()>& can_finish_early, - abort_source& as) { - static constexpr auto rpc_timeout = std::chrono::seconds{5}; - static constexpr auto max_concurrency = 10; - - std::unordered_map versions; - for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { - - // We fetch the config on each iteration; some nodes may leave. - auto current_config = members0.get_members() | - std::views::transform([] (auto m) { return locator::host_id{m.addr.id.uuid()}; }) | - std::ranges::to>(); - - if (current_config.empty()) { - continue; - } - - bool failed = false; - co_await max_concurrent_for_each(current_config, max_concurrency, [&] (locator::host_id& node) -> future<> { - if (versions.contains(node)) { - // This node was already contacted in a previous iteration. - co_return; - } - - try { - upgrade_log.info("synchronize_schema: `send_schema_check({})`", node); - versions.emplace(node, - co_await with_timeout(as, rpc_timeout, [&ms, node] (abort_source& as) mutable { - return ser::migration_manager_rpc_verbs::send_schema_check(&ms, node, as); - })); - } catch (abort_requested_exception&) { - upgrade_log.warn("synchronize_schema: abort requested during `send_schema_check({})`", node); - throw; - } catch (...) { - // XXX: are there possible fatal errors which should cause us to abort the entire procedure? - upgrade_log.warn("synchronize_schema: `send_schema_check({})` failed: {}", node, std::current_exception()); - failed = true; - } - }); - - if (failed) { - upgrade_log.warn("synchronize_schema: there were some failures when collecting remote schema versions."); - } else if (members0.is_joint()) { - upgrade_log.warn("synchronize_schema: group 0 configuration is joint: {}.", current_config); - } else { - co_return versions; - } - - upgrade_log.info("synchronize_schema: checking if we can finish early before retrying..."); - - if (co_await can_finish_early()) { - co_return std::nullopt; - } - - upgrade_log.info("synchronize_schema: could not finish early."); - } -} - -// Returning `true` means we synchronized schema. -// `false` means we finished early after calling `can_finish_early`. -// -// Postcondition for synchronizing schema (i.e. we return `true`): -// Let T0 be the point in time when this function starts. -// There is a schema version X and a point in time T > T0 such that: -// - the local schema version at T was X, -// - for every member of group 0 configuration there was a point in time T' -// such that T > T' > T0 and the schema version at this member at T' was X. -// -// Assuming that merging schema mutations is an associative, commutative and idempotent -// operation, everybody pulling from everybody (or verifying they have the same mutations) -// should cause everybody to arrive at the same result. 
-static future synchronize_schema( - replica::database& db, netw::messaging_service& ms, - const group0_members& members0, - service::migration_manager& mm, - const noncopyable_function()>& can_finish_early, - abort_source& as) { - static constexpr auto max_concurrency = 10; - - bool last_pull_successful = false; - size_t num_attempts_after_successful_pull = 0; - - for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) { - upgrade_log.info("synchronize_schema: collecting schema versions from group 0 members..."); - auto remote_versions = co_await collect_schema_versions_from_group0_members(ms, members0, can_finish_early, as); - if (!remote_versions) { - upgrade_log.info("synchronize_schema: finished early."); - co_return false; - } - upgrade_log.info("synchronize_schema: collected remote schema versions."); - - auto my_version = db.get_version(); - upgrade_log.info("synchronize_schema: my version: {}", my_version); - - auto matched = std::erase_if(*remote_versions, [my_version] (const auto& p) { return p.second == my_version; }); - upgrade_log.info("synchronize_schema: schema mismatches: {}. {} nodes had a matching version.", *remote_versions, matched); - - if (remote_versions->empty()) { - upgrade_log.info("synchronize_schema: finished."); - co_return true; - } - - // Note: if we successfully merged schema from everyone in earlier iterations, but our schema versions - // are still not matching, that means our version is strictly more up-to-date than someone else's version. - // In that case we could switch to a push mode instead of pull mode (we push schema mutations to them); - // on the other hand this would further complicate the code and I assume that the regular schema synchronization - // mechanisms (gossiping schema digests and triggering pulls on the other side) should deal with this case, - // even though it may potentially take a bit longer than a pro-active approach. Furthermore, the other side - // is also executing `synchronize_schema` at some point, so having this problem should be extremely unlikely. - if (last_pull_successful) { - if ((++num_attempts_after_successful_pull) > 3) { - upgrade_log.error( - "synchronize_schema: we managed to pull schema from every other node, but our schema versions" - " are still different. The other side must have an outdated version and fail to pull it for some" - " reason. If this message keeps showing up, the internal upgrade-to-raft procedure is stuck;" - " try performing a rolling restart of your cluster." - " If that doesn't fix the problem, the system may require manual fixing of schema tables."); - } - } - - last_pull_successful = true; - co_await max_concurrent_for_each(*remote_versions, max_concurrency, [&] (const auto& p) -> future<> { - auto& [addr, _] = p; - - try { - upgrade_log.info("synchronize_schema: `merge_schema_from({})`", addr); - co_await mm.merge_schema_from(addr); - } catch (const rpc::closed_error& e) { - upgrade_log.warn("synchronize_schema: `merge_schema_from({})` failed due to connection error: {}", addr, e); - last_pull_successful = false; - } catch (timed_out_error&) { - upgrade_log.warn("synchronize_schema: `merge_schema_from({})` timed out", addr); - last_pull_successful = false; - } catch (abort_requested_exception&) { - upgrade_log.warn("synchronize_schema: abort requested during `merge_schema_from({})`", addr); - throw; - } catch (...) 
{ - // We assume that every other exception type indicates a fatal error and happens because `merge_schema_from` - // failed to apply schema mutations from a remote, which is not something we can automatically recover from. - upgrade_log.error( - "synchronize_schema: fatal error in `merge_schema_from({})`: {}." - "\nCannot finish the upgrade procedure." - " Please fix your schema tables manually and try again by restarting the node.", - addr, std::current_exception()); - throw; - } - }); - - if (co_await can_finish_early()) { - upgrade_log.info("synchronize_schema: finishing early."); - co_return false; - } - } -} - -static auto warn_if_upgrade_takes_too_long() { - auto as = std::make_unique(); - auto task = [] (abort_source& as) -> future<> { - static constexpr auto warn_period = std::chrono::minutes{1}; - - while (!as.abort_requested()) { - try { - co_await sleep_abortable(warn_period, as); - } catch (const sleep_aborted&) { - co_return; - } - - upgrade_log.warn( - "Raft upgrade procedure taking longer than expected. Please check if all nodes are live and the network is healthy." - " If the upgrade procedure does not progress even though the cluster is healthy, try performing a rolling restart of the cluster." - " If that doesn't help or some nodes are dead and irrecoverable, manual recovery may be required." - " Consult the relevant documentation: {}", raft_upgrade_doc); - } - }(*as); - - return defer([task = std::move(task), as = std::move(as)] () mutable { - // Stop in background. - as->request_abort(); - (void)std::move(task).then([as = std::move(as)] {}); - }); -} - -future<> raft_group0::upgrade_to_group0(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled) { - SCYLLA_ASSERT(this_shard_id() == 0); - - auto start_state = (co_await _client.get_group0_upgrade_state()).second; - switch (start_state) { - case group0_upgrade_state::recovery: - upgrade_log.info("RECOVERY mode. Not attempting upgrade."); - co_return; - case group0_upgrade_state::use_post_raft_procedures: - upgrade_log.info("Already upgraded."); - co_return; - case group0_upgrade_state::synchronize: - upgrade_log.warn( - "Restarting upgrade in `synchronize` state." - " A previous upgrade attempt must have been interrupted or failed."); - break; - case group0_upgrade_state::use_pre_raft_procedures: - upgrade_log.info("starting in `use_pre_raft_procedures` state."); - break; - } - - (void)[] (raft_group0& self, abort_source& as, group0_upgrade_state start_state, gate::holder pause_shutdown, service::storage_service& ss, cql3::query_processor& qp, - service::migration_manager& mm, bool topology_change_enabled) -> future<> { - auto warner = warn_if_upgrade_takes_too_long(); - try { - co_await self.do_upgrade_to_group0(start_state, ss, qp, mm, topology_change_enabled); - co_await self._client.set_group0_upgrade_state(group0_upgrade_state::use_post_raft_procedures); - upgrade_log.info("Raft upgrade finished. Disabling migration_manager schema pulls."); - co_await mm.disable_schema_pulls(); - } catch (...) { - upgrade_log.error( - "Raft upgrade failed: {}.\nTry restarting the node to retry upgrade." - " If the procedure gets stuck, manual recovery may be required." - " Consult the relevant documentation: {}", std::current_exception(), raft_upgrade_doc); - } - }(std::ref(*this), std::ref(_abort_source), start_state, _shutdown_gate.hold(), ss, qp, mm, topology_change_enabled); -} - -// `start_state` is either `use_pre_raft_procedures` or `synchronize`. 
-future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled) { - SCYLLA_ASSERT(this_shard_id() == 0); - - // Check if every peer knows about the upgrade procedure. - // - // In a world in which post-conditions and invariants are respected, the fact that `SUPPORTS_RAFT` feature - // is enabled would guarantee this. However, the cluster features mechanism is unreliable and there are - // scenarios where the feature gets enabled even though not everybody supports it. Attempts to fix this - // only unmask more issues; see #11225. In general, fixing the gossiper/features subsystem is a big - // project and we don't want this to block the Raft group 0 project (and we probably want to eventually - // move most application states - including supported feature sets - to group 0 anyway). - // - // Work around that by ensuring that everybody is able to answer to the `get_group0_upgrade_state` call - // before we proceed to the `join_group0` step, which doesn't tolerate servers leaving in the middle; - // once a node is selected as one of the seeds of the discovery algorithm, it must answer. This 'dry run' - // step, on the other hand, allows nodes to leave and will unblock as soon as all remaining peers are - // ready to answer. - upgrade_log.info("Waiting until everyone is ready to start upgrade..."); - auto get_host_ids = [this]() -> future> { - auto current_config = co_await _sys_ks.load_peers_ids(); - current_config.push_back(_gossiper.my_host_id()); - co_return current_config; - }; - co_await check_remote_group0_upgrade_state_dry_run(get_host_ids, _ms.local(), _abort_source); - - if (!joined_group0()) { - upgrade_log.info("Joining group 0..."); - auto handshaker = make_legacy_handshaker(raft::is_voter::yes); // Voter - co_await join_group0(co_await _sys_ks.load_peers(), std::move(handshaker), ss, qp, mm, _sys_ks, topology_change_enabled, join_node_request_params{}); - } else { - upgrade_log.info( - "We're already a member of group 0." - " Apparently we're restarting after a previous upgrade attempt failed."); - } - - // Start group 0 leadership monitor fiber. - _leadership_monitor = leadership_monitor_fiber(); - - group0_members members0{_raft_gr.group0()}; - - // After we joined, we shouldn't be removed from group 0 until the end of the procedure. - // The implementation of `leave_group0` waits until upgrade finishes before leaving the group. - // There is no guarantee that `remove_from_group0` from another node (that has - // finished upgrading) won't remove us after we enter `synchronize` but before we leave it; - // but then we're not needed for anything anymore and we can be shutdown, - // and we won't do anything harmful to other nodes while in `synchronize`, worst case being - // that we get stuck. - - upgrade_log.info("Waiting until every peer has joined Raft group 0..."); - co_await wait_until_every_peer_joined_group0(_sys_ks, members0, _abort_source); - upgrade_log.info("Every peer is a member of Raft group 0."); - - if (start_state == group0_upgrade_state::use_pre_raft_procedures) { - // We perform a schema synchronization step before entering `synchronize` upgrade state. - // - // This step is not necessary for correctness: we will make sure schema is synchronized - // after every node enters `synchronize`, where schema changes are disabled. - // - // However, it's good for reducing the chance that we get stuck later. 
If we manage to ensure that schema - // is synchronized now, there's a high chance that after entering `synchronize` state we won't have - // to do any additional schema pulls (only verify quickly that the schema is still in sync). - upgrade_log.info("Waiting for schema to synchronize across all nodes in group 0..."); - auto can_finish_early = [] { return make_ready_future(false); }; - co_await synchronize_schema(qp.db().real_database(), _ms.local(), members0, mm, can_finish_early, _abort_source); - - // Before entering `synchronize`, perform a round-trip of `get_group0_upgrade_state` RPC calls - // to everyone as a dry run, just to check that nodes respond to this RPC. - // Obviously we may lose connectivity immediately after this function finishes, - // causing later steps to fail, but if network/RPC module is already broken, better to detect - // it now than after entering `synchronize` state. And if this steps succeeds, then there's - // a very high chance that the following steps succeed as well (we would need to be very unlucky otherwise). - upgrade_log.info("Performing a dry run of remote `get_group0_upgrade_state` calls..."); - co_await check_remote_group0_upgrade_state_dry_run( - [members0] { - return make_ready_future>(members0.get_host_ids()); - }, _ms.local(), _abort_source); - - utils::get_local_injector().inject("group0_upgrade_before_synchronize", - [] { throw std::runtime_error("error injection before group 0 upgrade enters synchronize"); }); - - upgrade_log.info("Entering synchronize state."); - upgrade_log.warn("Schema changes are disabled in synchronize state." - " If a failure makes us unable to proceed, manual recovery will be required."); - co_await _client.set_group0_upgrade_state(group0_upgrade_state::synchronize); - } - - upgrade_log.info("Waiting for all peers to enter synchronize state..."); - if (!(co_await wait_for_peers_to_enter_synchronize_state(members0, _ms.local(), _abort_source, _shutdown_gate.hold()))) { - upgrade_log.info("Another node already finished upgrade. We can finish early."); - co_return; - } - - upgrade_log.info("All peers in synchronize state. Waiting for schema to synchronize..."); - auto can_finish_early = std::bind_front(anyone_finished_upgrade, std::cref(members0), std::ref(_ms.local()), std::ref(_abort_source)); - if (!(co_await synchronize_schema(qp.db().real_database(), _ms.local(), members0, mm, can_finish_early, _abort_source))) { - upgrade_log.info("Another node already finished upgrade. We can finish early."); - co_return; - } - - upgrade_log.info("Schema synchronized."); -} - void raft_group0::register_metrics() { namespace sm = seastar::metrics; _metrics.add_group("raft_group0", { diff --git a/service/raft/raft_group0.hh b/service/raft/raft_group0.hh index b4ab4d248b..e821baae16 100644 --- a/service/raft/raft_group0.hh +++ b/service/raft/raft_group0.hh @@ -209,9 +209,8 @@ public: // which will start a procedure to create group 0 and switch administrative operations to use it. future<> finish_setup_after_join(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled); - // If Raft is disabled or in RECOVERY mode, returns `false`. + // If in RECOVERY mode, returns `false`. // Otherwise: - // - waits for the Raft upgrade procedure to finish if it's currently in progress, // - performs a Raft read barrier, // - returns `true`. 
    //
diff --git a/service/raft/raft_group0_client.cc b/service/raft/raft_group0_client.cc
index f90a086ec9..710e7db0ec 100644
--- a/service/raft/raft_group0_client.cc
+++ b/service/raft/raft_group0_client.cc
@@ -262,21 +262,8 @@ future raft_group0_client::start_operation(seastar::abort_source&
     auto [upgrade_lock_holder, upgrade_state] = std::move(upgrade_lock_and_state);
     switch (upgrade_state) {
         case group0_upgrade_state::synchronize:
-            logger.info("start_operation: waiting until local node leaves synchronize state to start a group 0 operation");
-            upgrade_lock_holder.release();
-            co_await when_any(wait_until_group0_upgraded(as), sleep_abortable(std::chrono::seconds{10}, as));
-            // Checks whether above wait returned due to sleep timeout, which confirms the upgrade procedure stuck case.
-            // Returns the corresponding runtime error in such cases.
-            upgrade_lock_and_state = co_await get_group0_upgrade_state();
-            upgrade_lock_holder = std::move(upgrade_lock_and_state.first);
-            upgrade_state = std::move(upgrade_lock_and_state.second);
-            upgrade_lock_holder.release();
-            if (upgrade_state != group0_upgrade_state::use_post_raft_procedures) {
-                throw std::runtime_error{
-                    "Cannot perform schema or topology changes during this time; the cluster is currently upgrading to use Raft for schema operations."
-                    " If this error keeps happening, check the logs of your nodes to learn the state of upgrade. The upgrade procedure may get stuck"
-                    " if there was a node failure."};
-            }
+            // This version no longer supports the upgrade to group 0, so this state should never be observed here.
+            on_internal_error(logger, "unexpected group0 upgrade state 'synchronize' in start_operation");
            [[fallthrough]];
        case group0_upgrade_state::use_post_raft_procedures: {
            auto operation_holder = co_await get_units(_operation_mutex, 1, as);
@@ -448,21 +435,6 @@ future<> raft_group0_client::set_group0_upgrade_state(group0_upgrade_state state
     }
 }
 
-future<> raft_group0_client::wait_until_group0_upgraded(abort_source& as) {
-    auto sub = as.subscribe([this] () noexcept { _upgraded.broadcast(); });
-    if (!sub) {
-        throw abort_requested_exception{};
-    }
-
-    co_await _upgraded.wait([this, &as, sub = std::move(sub)] {
-        return _upgrade_state == group0_upgrade_state::use_post_raft_procedures || as.abort_requested();
-    });
-
-    if (as.abort_requested()) {
-        throw abort_requested_exception{};
-    }
-}
-
 future> raft_group0_client::hold_read_apply_mutex(abort_source& as) {
     if (this_shard_id() != 0) {
         on_internal_error(logger, "hold_read_apply_mutex: must run on shard 0");
diff --git a/service/raft/raft_group0_client.hh b/service/raft/raft_group0_client.hh
index cfb528bc54..0b1a102299 100644
--- a/service/raft/raft_group0_client.hh
+++ b/service/raft/raft_group0_client.hh
@@ -195,9 +195,6 @@ public:
     // and follow the correct sequence of states.
     future<> set_group0_upgrade_state(group0_upgrade_state s);
 
-    // Wait until group 0 upgrade enters the `use_post_raft_procedures` state. 
- future<> wait_until_group0_upgraded(abort_source&); - future> hold_read_apply_mutex(abort_source&); bool in_recovery() const; diff --git a/service/storage_service.cc b/service/storage_service.cc index 45cc606d39..7166496b9d 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2265,16 +2265,6 @@ future<> storage_service::join_topology(sharded& proxy, co_await _group0->finish_setup_after_join(*this, _qp, _migration_manager.local(), false); co_await _cdc_gens.local().after_join(std::move(cdc_gen_id)); - // Waited on during stop() - (void)([] (storage_service& me, sharded& proxy) -> future<> { - try { - co_await me.track_upgrade_progress_to_topology_coordinator(proxy); - } catch (const abort_requested_exception&) { - // Ignore - } - // Other errors are handled internally by track_upgrade_progress_to_topology_coordinator - })(*this, proxy); - std::unordered_set ids; _gossiper.for_each_endpoint_state([this, &ids] (const gms::endpoint_state& ep) { if (_gossiper.is_normal(ep.get_host_id())) { @@ -2285,73 +2275,6 @@ future<> storage_service::join_topology(sharded& proxy, co_await _gossiper.notify_nodes_on_up(std::move(ids)); } -future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded& proxy) { - SCYLLA_ASSERT(_group0); - - while (true) { - _group0_as.check(); - try { - co_await _group0->client().wait_until_group0_upgraded(_group0_as); - - // First, wait for the feature to become enabled - shared_promise<> p; - auto sub = _feature_service.supports_consistent_topology_changes.when_enabled([&] () noexcept { p.set_value(); }); - rtlogger.debug("Waiting for cluster feature `SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES`"); - co_await p.get_shared_future(_group0_as); - rtlogger.info("The cluster is ready to start upgrade to the raft topology. The procedure needs to be manually triggered. Refer to the documentation"); - - // Wait until upgrade is started - co_await _topology_state_machine.event.when([this] { - return !legacy_topology_change_enabled(); - }); - rtlogger.info("upgrade to raft topology has started"); - break; - } catch (const seastar::abort_requested_exception&) { - throw; - } catch (...) { - rtlogger.error("the fiber tracking readiness of upgrade to raft topology got an unexpected error: {}", std::current_exception()); - } - - co_await sleep_abortable(std::chrono::seconds(1), _group0_as); - } - - // Start the topology coordinator monitor fiber. If we are the leader, this will start - // the topology coordinator which is responsible for driving the upgrade process. - try { - _raft_state_monitor = raft_state_monitor_fiber(_group0->group0_server(), _group0->hold_group0_gate()); - } catch (...) { - // The calls above can theoretically fail due to coroutine frame allocation failure. - // Abort in this case as the node should be in a pretty bad shape anyway. - rtlogger.error("failed to start the topology coordinator: {}", std::current_exception()); - abort(); - } - - while (true) { - _group0_as.check(); - try { - // Wait until upgrade is finished - co_await _topology_state_machine.event.when([this] { - return raft_topology_change_enabled(); - }); - rtlogger.info("upgrade to raft topology has finished"); - break; - } catch (const seastar::abort_requested_exception&) { - throw; - } catch (...) { - rtlogger.error("the fiber tracking progress of upgrade to raft topology got an unexpected error. " - "Will not report in logs when upgrade has completed. 
Error: {}", std::current_exception()); - } - } - - try { - _sstable_vnodes_cleanup_fiber = sstable_vnodes_cleanup_fiber(_group0->group0_server(), _group0->hold_group0_gate(), proxy); - start_tablet_split_monitor(); - } catch (...) { - rtlogger.error("failed to start one of the raft-related background fibers: {}", std::current_exception()); - abort(); - } -} - // Runs inside seastar::async context future<> storage_service::bootstrap(std::unordered_set& bootstrap_tokens, std::optional& cdc_gen_id, const std::optional& replacement_info) { return seastar::async([this, &bootstrap_tokens, &cdc_gen_id, &replacement_info] { @@ -8074,13 +7997,8 @@ void storage_service::init_messaging_service() { ser::join_node_rpc_verbs::register_join_node_query(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_query_params) { return handle_raft_rpc(dst_id, [] (auto& ss) -> future { check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_query"); - if (!ss.legacy_topology_change_enabled() && !ss.raft_topology_change_enabled()) { - throw std::runtime_error("The cluster is upgrading to raft topology. Nodes cannot join at this time."); - } auto result = join_node_query_result{ - .topo_mode = ss.raft_topology_change_enabled() - ? join_node_query_result::topology_mode::raft - : join_node_query_result::topology_mode::legacy, + .topo_mode = join_node_query_result::topology_mode::raft }; return make_ready_future(std::move(result)); }); @@ -8579,13 +8497,6 @@ bool storage_service::raft_topology_change_enabled() const { return _topology_change_kind_enabled == topology_change_kind::raft; } -bool storage_service::legacy_topology_change_enabled() const { - if (this_shard_id() != 0) { - on_internal_error(slogger, "legacy_topology_change_enabled() must run on shard 0"); - } - return _topology_change_kind_enabled == topology_change_kind::legacy; -} - future<> storage_service::register_protocol_server(protocol_server& server, bool start_instantly) { _protocol_servers.push_back(&server); if (start_instantly) { diff --git a/service/storage_service.hh b/service/storage_service.hh index 30fe0bc824..1d1c327124 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -929,7 +929,6 @@ private: public: bool raft_topology_change_enabled() const; - bool legacy_topology_change_enabled() const; private: future<> _raft_state_monitor = make_ready_future<>(); @@ -1045,7 +1044,6 @@ public: private: // Tracks progress of the upgrade to topology coordinator. 
future<> _upgrade_to_topology_coordinator_fiber = make_ready_future<>(); - future<> track_upgrade_progress_to_topology_coordinator(sharded& proxy); future<> transit_tablet(table_id, dht::token, noncopyable_function, sstring>(const locator::tablet_map& tmap, api::timestamp_type)> prepare_mutations); future get_guard_for_tablet_update(); diff --git a/test/cluster/auth_cluster/test_auth_v2_migration.py b/test/cluster/auth_cluster/test_auth_v2_migration.py deleted file mode 100644 index 30260f930d..0000000000 --- a/test/cluster/auth_cluster/test_auth_v2_migration.py +++ /dev/null @@ -1,214 +0,0 @@ -# -# Copyright (C) 2024-present ScyllaDB -# -# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 -# - -import asyncio -import logging -import pytest -import time - -from test.pylib.manager_client import ManagerClient -from test.pylib.rest_client import get_host_api_address, read_barrier -from test.pylib.util import wait_for_cql_and_get_hosts, unique_name -from cassandra.cluster import ConsistencyLevel -from test.cluster.util import wait_until_topology_upgrade_finishes, enter_recovery_state, reconnect_driver, \ - delete_raft_topology_state, delete_raft_data_and_upgrade_state, wait_until_upgrade_finishes, \ - wait_for_token_ring_and_group0_consistency -from test.cluster.auth_cluster import extra_scylla_config_options as auth_config - - -def auth_data(): - return [ - { - "statement": "INSERT INTO system_distributed.service_levels (service_level, timeout, workload_type) VALUES (?, ?, ?)", - "rows": [ - ("sl1", None, None), - ] - }, - { - "statement": "INSERT INTO system_auth.roles (role, can_login, is_superuser, member_of, salted_hash) VALUES (?, ?, ?, ?, ?)", - "rows": [ - ("user 1", True, False, frozenset({'users'}), "salt1?"), - ("user 2", True, False, frozenset({'users'}), "salt2#"), - ("users", False, False, None, None), - ] - }, - { - "statement": "INSERT INTO system_auth.role_members (role, member) VALUES (?, ?)", - "rows": [ - ("users", "user 1"), - ("users", "user 2"), - ] - }, - { - "statement": "INSERT INTO system_auth.role_attributes (role, name, value) VALUES (?, ?, ?)", - "rows": [ - ("users", "service_level", "sl1"), - ] - }, - ] - - -async def populate_test_data(manager: ManagerClient, data): - cql = manager.get_cql() - for d in data: - stmt = cql.prepare(d["statement"]) - stmt.consistency_level = ConsistencyLevel.ALL - await asyncio.gather(*( - cql.run_async(stmt.bind(row_data)) for row_data in d["rows"])) - - -async def populate_auth_v1_data(manager: ManagerClient): - await populate_test_data(manager, auth_data()) - # test also absence of deleted data - username = unique_name("deleted_user_") - logging.info(f"Creating deleted auth-v1 user: {username}") - await populate_test_data(manager, [ - { - "statement": "INSERT INTO system_auth.roles (role, can_login, is_superuser, member_of, salted_hash) VALUES (?, ?, ?, ?, ?)", - "rows": [ - (username, True, False, None, "fefe"), - ] - }, - { - "statement": "DELETE FROM system_auth.roles WHERE role = ?", - "rows": [ - (username,), - ] - }, - ]) - - -async def warmup_v1_static_values(manager: ManagerClient, hosts): - # auth-v1 was using statics to cache internal queries - # in auth-v2 those statics were removed but we want to - # verify that it was effective so trigger here internal - # call to potentially populate static storage and we'll - # verify later that after migration properly formed query - # executes (query has to change because keyspace name changes) - cql = manager.get_cql() - await 
asyncio.gather(*(cql.run_async("LIST ROLES", host=host) for host in hosts)) - - -async def check_auth_v2_data_migration(manager: ManagerClient, hosts): - cql = manager.get_cql() - # auth reads are eventually consistent so we need to make sure hosts are up-to-date - assert hosts - await asyncio.gather(*(read_barrier(manager.api, get_host_api_address(host)) for host in hosts)) - - data = auth_data() - - roles = set() - for row in await cql.run_async("SELECT * FROM system.roles"): - if row.role == "cassandra": - # Skip default role, its creation in auth-v1 - # is asynchronous and all nodes race to create it - # so we'd need to delay the test and wait. - # Checking this particular role doesn't bring much value - # to the test as we check other roles to demonstrate correctness - continue - member_of = frozenset(row.member_of) if row.member_of else None - roles.add((row.role, row.can_login, row.is_superuser, member_of, row.salted_hash)) - assert roles == set(data[1]["rows"]) - - role_members = set() - for row in await cql.run_async("SELECT * FROM system.role_members"): - role_members.add((row.role, row.member)) - assert role_members == set(data[2]["rows"]) - - role_attributes = set() - for row in await cql.run_async("SELECT * FROM system.role_attributes"): - role_attributes.add((row.role, row.name, row.value)) - assert role_attributes == set(data[3]["rows"]) - - -async def check_auth_v2_works(manager: ManagerClient, hosts): - cql = manager.get_cql() - roles = set() - for row in await cql.run_async("LIST ROLES"): - roles.add(row.role) - assert roles == set(["cassandra", "user 1", "user 2", "users"]) - - user1_roles = await cql.run_async("LIST ROLES OF 'user 1'") - assert len(user1_roles) == 2 - assert set([user1_roles[0].role, user1_roles[1].role]) == set(["users", "user 1"]) - - username = unique_name("user_after_migration_") - logging.info(f"Create role after migration: {username}") - await cql.run_async(f"CREATE ROLE {username}") - await asyncio.gather(*(read_barrier(manager.api, get_host_api_address(host)) for host in hosts)) - # see warmup_v1_static_values for background about checks below - # check if it was added to a new table - assert len(await cql.run_async(f"SELECT role FROM system.roles WHERE role = '{username}'")) == 1 - # check whether list roles statement sees it also via new table (on all nodes) - await asyncio.gather(*(cql.run_async(f"LIST ROLES OF {username}", host=host) for host in hosts)) - await cql.run_async(f"DROP ROLE {username}") - -@pytest.mark.asyncio -async def test_auth_v2_during_recovery(manager: ManagerClient): - # FIXME: move this test to the Raft-based recovery procedure or remove it if unneeded. 
- servers = await manager.servers_add(3, config=auth_config, auto_rack_dc="dc1") - cql, hosts = await manager.get_ready_cql(servers) - - logging.info("Checking auth version before recovery") - auth_version = await cql.run_async(f"SELECT value FROM system.scylla_local WHERE key = 'auth_version'") - assert auth_version[0].value == "2" - - logging.info("Creating role before recovery") - role_name = "ro" + unique_name() - await cql.run_async(f"CREATE ROLE {role_name}") - # auth reads are eventually consistent so we need to sync all nodes - await asyncio.gather(*(read_barrier(manager.api, get_host_api_address(host)) for host in hosts)) - - logging.info("Read roles before recovery") - roles = [row.role for row in await cql.run_async(f"LIST ROLES")] - assert set(roles) == set([role_name, "cassandra"]) - - logging.info("Poison with auth_v1 look a like data") - # this will verify that old roles are not brought back during recovery - # as it runs very similar code path as during v1->v2 migration - await cql.run_async(f"CREATE TABLE system_auth.roles (role text PRIMARY KEY, can_login boolean, is_superuser boolean, member_of set, salted_hash text)") - v1_ro_name = "v1_ro" + unique_name() - await cql.run_async(f"INSERT INTO system_auth.roles (role) VALUES ('{v1_ro_name}')") - - logging.info(f"Restarting hosts {hosts} in recovery mode") - await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts)) - await manager.rolling_restart(servers) - - logging.info("Cluster restarted, waiting until driver reconnects to every server") - await reconnect_driver(manager) - cql, hosts = await manager.get_ready_cql(servers) - - logging.info("Checking auth version during recovery") - auth_version = await cql.run_async(f"SELECT value FROM system.scylla_local WHERE key = 'auth_version'") - assert auth_version[0].value == "2" - - logging.info("Reading roles during recovery") - roles = [row.role for row in await cql.run_async(f"LIST ROLES")] - assert set(roles) == set([role_name, "cassandra"]) - - logging.info("Restoring cluster to normal status") - await asyncio.gather(*(delete_raft_topology_state(cql, h) for h in hosts)) - await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts)) - await manager.rolling_restart(servers) - - await reconnect_driver(manager) - cql, hosts = await manager.get_ready_cql(servers) - - await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts)) - for host in hosts: - status = await manager.api.raft_topology_upgrade_status(host.address) - assert status == "not_upgraded" - - await manager.api.upgrade_to_raft_topology(hosts[0].address) - await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts)) - - logging.info("Checking auth version after recovery") - auth_version = await cql.run_async(f"SELECT value FROM system.scylla_local WHERE key = 'auth_version'") - assert auth_version[0].value == "2" - - logging.info("Reading roles after recovery") - roles = [row.role for row in await manager.get_cql().run_async(f"LIST ROLES")] - assert set(roles) == set([role_name, "cassandra"]) diff --git a/test/cluster/test_topology_recovery_basic.py b/test/cluster/test_topology_recovery_basic.py deleted file mode 100644 index 2071955754..0000000000 --- a/test/cluster/test_topology_recovery_basic.py +++ /dev/null @@ -1,175 +0,0 @@ -# -# Copyright (C) 2024-present ScyllaDB -# -# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 -# - -import asyncio -import logging -import pytest -import 
time - -from cassandra.policies import WhiteListRoundRobinPolicy - -from test.pylib.manager_client import ManagerClient -from test.pylib.util import wait_for_cql -from test.cluster.util import enter_recovery_state, \ - delete_raft_data_and_upgrade_state, log_run_time, wait_until_upgrade_finishes as wait_until_schema_upgrade_finishes, \ - wait_until_topology_upgrade_finishes, delete_raft_topology_state, wait_for_cdc_generations_publishing, \ - check_system_topology_and_cdc_generations_v3_consistency, start_writes_to_cdc_table, wait_until_last_generation_is_in_use -from test.cluster.conftest import cluster_con - - -@pytest.mark.nightly -@pytest.mark.asyncio -@log_run_time -async def test_topology_recovery_basic(request, build_mode: str, manager: ManagerClient): - # FIXME: move this test to the Raft-based recovery procedure or remove it if unneeded. - - # Increase ring delay to ensure nodes learn about CDC generations before they start operating. - ring_delay = 15000 if build_mode == 'debug' else 5000 - normal_cfg = {'ring_delay_ms': ring_delay} - zero_token_cfg = {'ring_delay_ms': ring_delay, 'join_ring': False} - - servers = [await manager.server_add(config=normal_cfg), - await manager.server_add(config=zero_token_cfg), - await manager.server_add(config=normal_cfg)] - - # The zero-token node requires a different cql session not to be ignored by the driver because of empty tokens in - # the system.peers table. - # We need one cql session for both token-owning nodes to continue the write workload during the rolling restart. - cql_normal = cluster_con([servers[0].ip_addr, servers[2].ip_addr], load_balancing_policy= - WhiteListRoundRobinPolicy([servers[0].ip_addr, servers[2].ip_addr])).connect() - cql_zero_token = cluster_con([servers[1].ip_addr], load_balancing_policy= - WhiteListRoundRobinPolicy([servers[1].ip_addr])).connect() - # In the whole test, cqls[i] and hosts[i] correspond to servers[i]. - cqls = [cql_normal, cql_zero_token, cql_normal] - hosts = [cql_normal.hosts[0] if cql_normal.hosts[0].address == servers[0].ip_addr else cql_normal.hosts[1], - cql_zero_token.hosts[0], - cql_normal.hosts[1] if cql_normal.hosts[0].address == servers[0].ip_addr else cql_normal.hosts[0]] - - # We don't want to use ManagerClient.rolling_restart. Waiting for CQL of the zero-token node would time out. - async def rolling_restart(): - for idx, s in enumerate(servers): - await manager.server_stop_gracefully(s.server_id) - - for idx2 in range(len(servers)): - if idx2 != idx: - await manager.server_not_sees_other_server(servers[idx2].ip_addr, s.ip_addr) - - await manager.server_start(s.server_id) - - for idx2 in range(len(servers)): - if idx2 != idx: - await manager.server_sees_other_server(servers[idx2].ip_addr, s.ip_addr) - - logging.info("Waiting until driver connects to every server") - await asyncio.gather(*(wait_for_cql(cql, h, time.time() + 60) for cql, h in zip(cqls, hosts))) - - restart_writes, stop_writes_and_verify = await start_writes_to_cdc_table(cql_normal) - - logging.info(f"Restarting hosts {hosts} in recovery mode") - await asyncio.gather(*(enter_recovery_state(cql, h) for cql, h in zip(cqls, hosts))) - - # If we restarted nodes before the last generation was in use, some writes - # could fail. After restart, nodes load only the last generation. If it's - # not active yet, writes with lower timestamps would fail. 
- await wait_until_last_generation_is_in_use(cql_normal) - - logging.debug("Sleeping for 1 second to make sure there are writes to the CDC table in all 3 generations") - await asyncio.sleep(1) - - # Restart sequentially, as it tests how nodes operating in legacy mode - # react to raft topology mode nodes and vice versa - await rolling_restart() - - await stop_writes_and_verify() - - def reconnect_cqls(): - nonlocal cql_normal - nonlocal cql_zero_token - nonlocal cqls - cql_normal.shutdown() - cql_zero_token.shutdown() - cql_normal = cluster_con([servers[0].ip_addr, servers[2].ip_addr], load_balancing_policy= - WhiteListRoundRobinPolicy([servers[0].ip_addr, servers[2].ip_addr])).connect() - cql_zero_token = cluster_con([servers[1].ip_addr], load_balancing_policy= - WhiteListRoundRobinPolicy([servers[1].ip_addr])).connect() - cqls = [cql_normal, cql_zero_token, cql_normal] - - reconnect_cqls() - - logging.info("Cluster restarted, waiting until driver reconnects to every server") - await asyncio.gather(*(wait_for_cql(cql, h, time.time() + 60) for cql, h in zip(cqls, hosts))) - logging.info(f"Driver reconnected, hosts: {hosts}") - - restart_writes(cql_normal) - - logging.info(f"Deleting Raft data and upgrade state on {hosts}") - await asyncio.gather(*(delete_raft_topology_state(cql, h) for cql, h in zip(cqls, hosts))) - await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for cql, h in zip(cqls, hosts))) - - logging.info(f"Restarting hosts {hosts}") - await rolling_restart() - - # FIXME: We must reconnect the driver before performing CQL queries below, for example - # in wait_until_schema_upgrade_finishes. Unfortunately, it forces us to stop writing to - # a CDC table first. Reconnecting the driver would close the session used to send the - # writes, and some writes could time out on the client. - # Once https://github.com/scylladb/python-driver/issues/295 is fixed, we can remove - # all calls to reconnect_driver, restart_writes and leave only the last call to - # stop_writes_and_verify. 
-    await stop_writes_and_verify()
-
-    reconnect_cqls()
-
-    logging.info("Cluster restarted, waiting until driver reconnects to every server")
-    await asyncio.gather(*(wait_for_cql(cql, h, time.time() + 60) for cql, h in zip(cqls, hosts)))
-
-    restart_writes(cql_normal)
-
-    logging.info("Waiting until upgrade to raft schema finishes")
-    await asyncio.gather(*(wait_until_schema_upgrade_finishes(cql, h, time.time() + 60) for cql, h in zip(cqls, hosts)))
-
-    logging.info("Checking the topology upgrade state on all nodes")
-    for host in hosts:
-        status = await manager.api.raft_topology_upgrade_status(host.address)
-        assert status == "not_upgraded"
-
-    logging.info("Waiting until all nodes see others as alive")
-    await manager.servers_see_each_other(servers)
-
-    logging.info("Triggering upgrade to raft topology")
-    await manager.api.upgrade_to_raft_topology(hosts[0].address)
-
-    logging.info("Waiting until upgrade finishes")
-    await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))
-
-    logging.info("Waiting for CDC generations publishing")
-    for cql, h in zip(cqls, hosts):
-        await wait_for_cdc_generations_publishing(cql, [h], time.time() + 60)
-
-    logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
-    await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts, cqls)
-
-    logging.info("Booting new node")
-    servers += [await manager.server_add(config=normal_cfg)]
-    cqls += [cluster_con([servers[3].ip_addr],
-             load_balancing_policy=WhiteListRoundRobinPolicy([servers[3].ip_addr])).connect()]
-    hosts += [cqls[3].hosts[0]]
-    await asyncio.gather(*(wait_for_cql(cql, h, time.time() + 60) for cql, h in zip(cqls, hosts)))
-
-    logging.info("Waiting for the new CDC generation publishing")
-    for cql, h in zip(cqls, hosts):
-        await wait_for_cdc_generations_publishing(cql, [h], time.time() + 60)
-
-    logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
-    await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts, cqls)
-
-    await wait_until_last_generation_is_in_use(cql_normal)
-
-    logging.debug("Sleeping for 1 second to make sure there are writes to the CDC table in the last generation")
-    await asyncio.sleep(1)
-
-    logging.info("Checking correctness of data in system_distributed.cdc_streams_descriptions_v2")
-    await stop_writes_and_verify()
diff --git a/test/cluster/test_topology_recovery_majority_loss.py b/test/cluster/test_topology_recovery_majority_loss.py
deleted file mode 100644
index df9622c937..0000000000
--- a/test/cluster/test_topology_recovery_majority_loss.py
+++ /dev/null
@@ -1,99 +0,0 @@
-#
-# Copyright (C) 2024-present ScyllaDB
-#
-# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
-#
-
-import asyncio
-import logging
-import pytest
-import time
-
-from cassandra.policies import WhiteListRoundRobinPolicy
-
-from test.pylib.manager_client import ManagerClient
-from test.pylib.util import wait_for_cql_and_get_hosts
-from test.cluster.util import reconnect_driver, enter_recovery_state, \
-    delete_raft_data_and_upgrade_state, log_run_time, wait_until_upgrade_finishes as wait_until_schema_upgrade_finishes, \
-    wait_until_topology_upgrade_finishes, delete_raft_topology_state, wait_for_cdc_generations_publishing, \
-    check_system_topology_and_cdc_generations_v3_consistency
-from test.cluster.conftest import cluster_con
-
-
-@pytest.mark.asyncio
-@log_run_time
-async def test_topology_recovery_after_majority_loss(request, manager: ManagerClient):
-    servers = await manager.servers_add(3)
-    servers += await manager.servers_add(2, config={'join_ring': False})
-    cql = manager.cql
-    assert(cql)
-
-    # Currently python driver ignores zero-token nodes, so we skip them here.
-    logging.info("Waiting until driver connects to every server")
-    hosts = await wait_for_cql_and_get_hosts(cql, servers[:-2], time.time() + 60)
-
-    def host_for_zero_token_node(server):
-        # In order to get the Host instance of a zero-token node, connect to it directly.
-        # We cannot use the ManagerClient's session because it is connected to non-zero-token
-        # nodes and, in that case, the zero-token nodes are ignored.
-        zero_token_node_session = cluster_con([server.ip_addr], load_balancing_policy=
-                                              WhiteListRoundRobinPolicy([server.ip_addr])).connect()
-        return zero_token_node_session.hosts[0]
-
-    zero_token_hosts = [host_for_zero_token_node(s) for s in [servers[3], servers[4]]]
-
-    srv1, *others = servers
-
-    logging.info(f"Killing all nodes except {srv1}")
-    await asyncio.gather(*(manager.server_stop_gracefully(srv.server_id) for srv in others))
-
-    logging.info(f"Entering recovery state on {srv1}")
-    host1 = next(h for h in hosts if h.address == srv1.ip_addr)
-    await enter_recovery_state(cql, host1)
-    await manager.server_restart(srv1.server_id)
-    cql = await reconnect_driver(manager)
-
-    logging.info("Node restarted, waiting until driver connects")
-    host1 = (await wait_for_cql_and_get_hosts(cql, [srv1], time.time() + 60))[0]
-
-    for i in range(len(others)):
-        to_remove = others[i]
-        ignore_dead_ips = [srv.ip_addr for srv in others[i+1:]]
-        logging.info(f"Removing {to_remove} using {srv1} with ignore_dead: {ignore_dead_ips}")
-        await manager.remove_node(srv1.server_id, to_remove.server_id, ignore_dead_ips)
-
-    removed_hosts = hosts[1:] + zero_token_hosts
-
-    logging.info(f"Deleting old Raft data and upgrade state on {host1} and restarting")
-    await delete_raft_topology_state(cql, host1)
-    await delete_raft_data_and_upgrade_state(cql, host1)
-    await manager.server_restart(srv1.server_id)
-    cql = await reconnect_driver(manager)
-
-    logging.info("Node restarted, waiting until driver connects")
-    host1 = (await wait_for_cql_and_get_hosts(cql, [srv1], time.time() + 60))[0]
-
-    logging.info("Waiting until upgrade to raft schema finishes.")
-    await wait_until_schema_upgrade_finishes(cql, host1, time.time() + 60)
-
-    logging.info("Triggering upgrade to raft topology")
-    await manager.api.upgrade_to_raft_topology(host1.address)
-
-    logging.info("Waiting until upgrade to raft topology finishes")
-    await wait_until_topology_upgrade_finishes(manager, host1.address, time.time() + 60)
-
-    logging.info("Waiting for CDC generations publishing")
-    await wait_for_cdc_generations_publishing(cql, [host1], time.time() + 60)
-
-    logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
-    await check_system_topology_and_cdc_generations_v3_consistency(manager, [host1], ignored_hosts=removed_hosts)
-
-    logging.info("Add two more nodes")
-    servers = [srv1] + await manager.servers_add(2)
-    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
-
-    logging.info("Waiting for the new CDC generations publishing")
-    await wait_for_cdc_generations_publishing(cql, hosts, time.time() + 60)
-
-    logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
-    await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts, ignored_hosts=removed_hosts)
diff --git a/test/cluster/test_view_build_status.py b/test/cluster/test_view_build_status.py
index 70e86ace1c..5892011f44 100644
--- a/test/cluster/test_view_build_status.py
+++ b/test/cluster/test_view_build_status.py
@@ -326,36 +326,6 @@ async def test_migration_on_existing_raft_topology(request, manager: ManagerClie
     res = await log.grep(r'ERROR.*\[shard [0-9]: [a-z]+\] raft_topology - topology change coordinator fiber got error exceptions::unavailable_exception \(Cannot achieve consistency level for')
     assert len(res) == 0
 
-# Regression test for scylladb/scylladb#23536
-# New node addition with 'sleep_in_synchronize' injection shoehorns the execution path to
-# hit 'raft_group0_client::start_operation' when upgrade state is synchronize. The test
-# confirms successful execution in such cases.
-@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
-@pytest.mark.asyncio
-async def test_view_build_status_with_synchronize_wait(manager: ManagerClient):
-    servers = []
-    servers.append(await manager.server_add())
-    cql, hosts = await manager.get_ready_cql(servers)
-
-    # With tablets and view building coordinator, view build status is marked
-    # in the same group0 batch that sets new node status to normal,
-    # so `start_operation()` is not called and this test doesn't work.
-    ks = await create_keyspace(cql, disable_tablets=True)
-    await create_table(cql, ks)
-    # 'raft_group0_client::start_operation' gets called underneath this.
-    await create_mv(cql, ks, "vt1")
-
-    # add a node
-    new_server = await manager.server_add(start=False, config={
-        'error_injections_at_startup': ['sleep_in_synchronize']
-    })
-    task = asyncio.create_task(manager.server_start(new_server.server_id))
-    log = await manager.server_open_log(new_server.server_id)
-    await log.wait_for("start_operation: waiting until local node leaves synchronize state to start a group 0 operation")
-    await manager.api.message_injection(new_server.ip_addr, 'sleep_in_synchronize')
-
-    await task
-
 # Test that when removing the view, its build status is cleaned from the status table
 @pytest.mark.asyncio
 async def test_view_build_status_cleanup_on_drop_view(manager: ManagerClient):
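
For reference, the zero-token-node handling that the removed tests rely on follows a single pattern: the Python driver ignores nodes with empty tokens in system.peers, so a dedicated session pinned to the node's address is opened with a whitelist policy and the Host object is taken from that session. Below is a minimal sketch of that pattern, reusing only helpers the deleted files themselves import (cluster_con, wait_for_cql); the name connect_to_zero_token_node is illustrative, not an existing helper.

import time

from cassandra.policies import WhiteListRoundRobinPolicy

from test.cluster.conftest import cluster_con
from test.pylib.util import wait_for_cql


async def connect_to_zero_token_node(server, deadline_s: int = 60):
    # The shared ManagerClient session skips zero-token nodes, so open a session
    # restricted to this node's address to obtain its Host object.
    session = cluster_con([server.ip_addr],
                          load_balancing_policy=WhiteListRoundRobinPolicy([server.ip_addr])).connect()
    host = session.hosts[0]
    # Wait until CQL is actually usable on that host before handing it back.
    await wait_for_cql(session, host, time.time() + deadline_s)
    return session, host

The caller is responsible for shutting the extra session down (as the removed tests do via reconnect_cqls), since each whitelist session holds its own connection pool.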
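
The deleted majority-loss test also documents, step by step, the manual recovery sequence it exercised: enter recovery state on the survivor, restart it, remove the dead members, wipe the old group 0 state, restart again, then drive the schema and topology upgrades. A condensed sketch of that sequence follows; it reuses only helpers the deleted files import, the name recover_surviving_node is illustrative rather than an existing utility, and the 60-second deadlines mirror the ones used in the tests.

import time

from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for_cql_and_get_hosts
from test.cluster.util import (reconnect_driver, enter_recovery_state, delete_raft_topology_state,
                               delete_raft_data_and_upgrade_state, wait_until_upgrade_finishes,
                               wait_until_topology_upgrade_finishes)


async def recover_surviving_node(manager: ManagerClient, survivor, dead_servers):
    # Put the survivor into recovery mode and restart it so it stops using group 0.
    cql = manager.cql
    host = (await wait_for_cql_and_get_hosts(cql, [survivor], time.time() + 60))[0]
    await enter_recovery_state(cql, host)
    await manager.server_restart(survivor.server_id)
    cql = await reconnect_driver(manager)
    host = (await wait_for_cql_and_get_hosts(cql, [survivor], time.time() + 60))[0]

    # Remove the dead members one by one, ignoring the peers not removed yet.
    for i, victim in enumerate(dead_servers):
        ignore_dead = [s.ip_addr for s in dead_servers[i + 1:]]
        await manager.remove_node(survivor.server_id, victim.server_id, ignore_dead)

    # Wipe the old group 0 state and restart; the node rebuilds group 0 from scratch.
    await delete_raft_topology_state(cql, host)
    await delete_raft_data_and_upgrade_state(cql, host)
    await manager.server_restart(survivor.server_id)
    cql = await reconnect_driver(manager)
    host = (await wait_for_cql_and_get_hosts(cql, [survivor], time.time() + 60))[0]

    # Wait for the schema upgrade, then trigger and await the topology upgrade.
    await wait_until_upgrade_finishes(cql, host, time.time() + 60)
    await manager.api.upgrade_to_raft_topology(host.address)
    await wait_until_topology_upgrade_finishes(manager, host.address, time.time() + 60)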