mirror of https://github.com/scylladb/scylladb.git
group0: remove upgrade to group0 code
This patch removes the ability of a cluster to upgrade from not having group 0 to having one. This ability was used by the gossiper-based recovery procedure, which is deprecated and removed in this version. Also remove the tests that use the procedure.
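For orientation, below is a minimal sketch (not code from the patch) of the upgrade-state enum and the post-patch expectation that the legacy `synchronize` state is never observed anymore. The enumerator names appear in the diff; the explicit values other than `synchronize = 2` and the helper function are illustrative assumptions only:

#include <cstdint>
#include <stdexcept>

// Sketch only. Enumerator values other than `synchronize = 2` are assumptions.
enum class group0_upgrade_state : uint8_t {
    use_pre_raft_procedures,
    synchronize = 2,             // Deprecated: unreachable now that the upgrade path is removed
    use_post_raft_procedures,
    recovery,
};

// Illustrative helper: after this patch, code that still observes `synchronize`
// treats it as an internal error instead of waiting for the old upgrade procedure.
inline void expect_not_synchronizing(group0_upgrade_state s) {
    if (s == group0_upgrade_state::synchronize) {
        throw std::logic_error("unexpected group0 upgrade state 'synchronize'");
    }
}
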
@@ -57,7 +57,7 @@ enum class group0_upgrade_state : uint8_t {
// Schema changes may still arrive from other nodes for some time. However, if no failures occur
// during the upgrade procedure, eventually all nodes should enter `synchronize` state. Then
// the nodes ensure that schema is synchronized across the entire cluster before entering `use_post_raft_procedures`.
synchronize = 2,
synchronize = 2, // Deprecated

// In `use_post_raft_procedures` state the upgrade is finished. The node performs schema changes
// using group 0, i.e. by constructing appropriate Raft commands and sending them to the Raft group 0 cluster.

@@ -694,16 +694,6 @@ struct group0_members {
|
||||
}
|
||||
};
|
||||
|
||||
static future<bool> wait_for_peers_to_enter_synchronize_state(
|
||||
const group0_members& members0, netw::messaging_service&, abort_source&, gate::holder pause_shutdown);
|
||||
static future<bool> anyone_finished_upgrade(
|
||||
const group0_members& members0, netw::messaging_service&, abort_source&);
|
||||
static future<bool> synchronize_schema(
|
||||
replica::database&, netw::messaging_service&,
|
||||
const group0_members& members0, service::migration_manager&,
|
||||
const noncopyable_function<future<bool>()>& can_finish_early,
|
||||
abort_source&);
|
||||
|
||||
future<bool> raft_group0::use_raft() {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
@@ -799,44 +789,6 @@ future<> raft_group0::setup_group0(
|
||||
throw std::runtime_error{"injection: stop_after_joining_group0"};
|
||||
});
|
||||
|
||||
// Enter `synchronize` upgrade state in case the cluster we're joining has recently enabled Raft
|
||||
// and is currently in the middle of `upgrade_to_group0()`. For that procedure to finish
|
||||
// every member of group 0 (now including us) needs to enter `synchronize` state.
|
||||
co_await _client.set_group0_upgrade_state(group0_upgrade_state::synchronize);
|
||||
|
||||
group0_log.info("setup_group0: ensuring that the cluster has fully upgraded to use Raft...");
|
||||
auto& group0_server = _raft_gr.group0();
|
||||
group0_members members0{group0_server};
|
||||
|
||||
// Perform a Raft read barrier so we know the set of group 0 members and our group 0 state is up-to-date.
|
||||
co_await group0_server.read_barrier(&_abort_source);
|
||||
|
||||
auto cfg = group0_server.get_configuration();
|
||||
if (!cfg.is_joint() && cfg.current.size() == 1) {
|
||||
group0_log.info("setup_group0: we're the only member of the cluster.");
|
||||
} else if (ss.raft_topology_change_enabled()) {
|
||||
group0_log.info("setup_group0: cluster uses raft for topology. No need to sync schema.");
|
||||
} else {
|
||||
// We're joining an existing cluster - we're not the only member.
|
||||
//
|
||||
// Wait until one of group 0 members enters `group0_upgrade_state::use_post_raft_procedures`
|
||||
// or all members enter `group0_upgrade_state::synchronize`.
|
||||
//
|
||||
// In a fully upgraded cluster this should finish immediately (if the network works well) - everyone is in `use_post_raft_procedures`.
|
||||
// In a cluster that is currently in the middle of `upgrade_to_group0`, this will cause us to wait until the procedure finishes.
|
||||
group0_log.info("setup_group0: waiting for peers to synchronize state...");
|
||||
if (co_await wait_for_peers_to_enter_synchronize_state(members0, _ms.local(), _abort_source, _shutdown_gate.hold())) {
|
||||
// Everyone entered `synchronize` state. That means we're bootstrapping in the middle of `upgrade_to_group0`.
|
||||
// We need to finish upgrade as others do.
|
||||
auto can_finish_early = std::bind_front(anyone_finished_upgrade, std::cref(members0), std::ref(_ms.local()), std::ref(_abort_source));
|
||||
co_await synchronize_schema(qp.db().real_database(), _ms.local(), members0, mm, can_finish_early, _abort_source);
|
||||
}
|
||||
}
|
||||
|
||||
co_await utils::get_local_injector().inject("sleep_in_synchronize", [](auto& handler) -> future<> {
|
||||
co_await handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5));
|
||||
});
|
||||
|
||||
group0_log.info("setup_group0: the cluster is ready to use Raft. Finishing.");
|
||||
co_await _client.set_group0_upgrade_state(group0_upgrade_state::use_post_raft_procedures);
|
||||
}
|
||||
@@ -873,24 +825,10 @@ future<> raft_group0::finish_setup_after_join(service::storage_service& ss, cql3
|
||||
// We're either upgrading or in recovery mode.
|
||||
}
|
||||
|
||||
// Note: even if we joined group 0 before, it doesn't necessarily mean we finished with the whole
|
||||
// upgrade procedure which has follow-up steps after joining group 0, hence we need to prepare
|
||||
// the listener below (or fire the upgrade procedure if the feature is already enabled) even if
|
||||
// we're already a member of group 0 by this point.
|
||||
// `upgrade_to_group0()` will correctly detect and handle which case we're in: whether the procedure
|
||||
// has already finished or we restarted in the middle after a crash.
|
||||
|
||||
if (!_feat.supports_raft_cluster_mgmt) {
|
||||
group0_log.info("finish_setup_after_join: SUPPORTS_RAFT feature not yet enabled, scheduling upgrade to start when it is.");
|
||||
throw std::runtime_error("finish_setup_after_join: SUPPORTS_RAFT feature not yet enabled, but was expected to be enabled at this point."
|
||||
" If you are trying to upgrade a node pf a cluster that is not using Raft yet, this is no longer supported.");
|
||||
}
|
||||
|
||||
// The listener may fire immediately, create a thread for that case.
|
||||
co_await seastar::async([this, &ss, &qp, &mm, topology_change_enabled] {
|
||||
_raft_support_listener = _feat.supports_raft_cluster_mgmt.when_enabled([this, &ss, &qp, &mm, topology_change_enabled] {
|
||||
group0_log.info("finish_setup_after_join: SUPPORTS_RAFT feature enabled. Starting internal upgrade-to-raft procedure.");
|
||||
upgrade_to_group0(ss, qp, mm, topology_change_enabled).get();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
bool raft_group0::is_member(raft::server_id id, bool include_voters_only) {
|
||||
@@ -982,47 +920,6 @@ future<bool> raft_group0::wait_for_raft() {
|
||||
co_return false;
|
||||
}
|
||||
|
||||
if (!_feat.supports_raft_cluster_mgmt) {
|
||||
// The Raft feature is not yet enabled.
|
||||
//
|
||||
// In that case we assume the cluster is in a partially-upgraded state (some nodes still use a version
|
||||
// without the raft/group 0 code enabled) and skip the reconfiguration.
|
||||
//
|
||||
// In theory we could be wrong: the cluster may have just upgraded but the user managed to start
|
||||
// a topology operation before we noticed it. Removing a node at this point could cause `upgrade_to_group0`
|
||||
// on other nodes to get stuck because it requires contacting all peers which may include the removed node.
|
||||
//
|
||||
// This is unlikely and shouldn't happen if rolling upgrade is performed correctly.
|
||||
// If it does happen, there's always the possibility of performing the manual group 0 recovery procedure.
|
||||
group0_log.warn(
|
||||
"Raft feature not enabled yet. Assuming that the cluster is partially upgraded"
|
||||
" and skipping group 0 reconfiguration. However, if you already finished the rolling upgrade"
|
||||
" procedure, this means that the node just hasn't noticed it yet. The internal upgrade-to-raft procedure"
|
||||
" may get stuck. If that happens, manual recovery will be required."
|
||||
" Consult the documentation for more details: {}",
|
||||
raft_upgrade_doc);
|
||||
co_return false;
|
||||
}
|
||||
|
||||
if (upgrade_state != group0_upgrade_state::use_post_raft_procedures) {
|
||||
// The feature is enabled but `upgrade_to_group0` did not finish yet.
|
||||
// The upgrade procedure requires everyone to participate. In order to not block the cluster
|
||||
// from doing the upgrades, we'll wait until we finish our procedure before doing the reconfiguration.
|
||||
//
|
||||
// Note: in theory it could happen that a node is removed/leaves the cluster immediately after
|
||||
// it finishes upgrade, causing others to get stuck (trying to contact the node that left).
|
||||
// It's unlikely and can be recovered from using the manual group 0 recovery procedure.
|
||||
group0_log.info("Waiting until cluster fully upgrades to use Raft before proceeding...");
|
||||
co_await _client.wait_until_group0_upgraded(_abort_source);
|
||||
group0_log.info("Cluster finished Raft upgrade procedure.");
|
||||
}
|
||||
|
||||
// We're fully upgraded, we must have joined group 0.
|
||||
if (!joined_group0()) {
|
||||
on_internal_error(group0_log,
|
||||
"We're fully upgraded to use Raft but didn't join group 0. Please report a bug.");
|
||||
}
|
||||
|
||||
group0_log.info("Performing a group 0 read barrier...");
|
||||
co_await _raft_gr.group0().read_barrier(&_abort_source);
|
||||
group0_log.info("Finished group 0 read barrier.");
|
||||
@@ -1291,110 +1188,6 @@ struct sleep_with_exponential_backoff {
|
||||
}
|
||||
};
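The retry loops in this file use this helper via the idiom `for (sleep_with_exponential_backoff sleep;; co_await sleep(as))`, so the first attempt runs immediately and later attempts back off. The helper's body is not shown in this hunk; the following is only a rough sketch of the shape such a helper could take, under assumed delay values, starting behaviour, and member names (hence the `_sketch` suffix):

#include <algorithm>
#include <chrono>
#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sleep.hh>

// Sketch only: doubles the sleep on each call up to a cap and honors the abort source.
struct sleep_with_exponential_backoff_sketch {
    std::chrono::milliseconds _delay{100};
    static constexpr std::chrono::milliseconds _max_delay{5000};

    seastar::future<> operator()(seastar::abort_source& as) {
        auto d = _delay;
        _delay = std::min(_delay * 2, _max_delay);
        return seastar::sleep_abortable(d, as);
    }
};
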
|
||||
|
||||
|
||||
// Precondition: we joined group 0 and the server is running.
|
||||
// Assumes we don't leave group 0 while running.
|
||||
static future<> wait_until_every_peer_joined_group0(db::system_keyspace& sys_ks, const group0_members& members0, abort_source& as) {
|
||||
|
||||
for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) {
|
||||
// We fetch both config and peers on each iteration; we don't assume that they don't change.
|
||||
// No new node should join while the procedure is running, but nodes may leave.
|
||||
|
||||
auto current_config = members0.get_host_ids();
|
||||
if (current_config.empty()) {
|
||||
// Not all addresses are known
|
||||
continue;
|
||||
}
|
||||
std::sort(current_config.begin(), current_config.end());
|
||||
|
||||
auto peers = co_await sys_ks.load_peers_ids();
|
||||
std::sort(peers.begin(), peers.end());
|
||||
|
||||
std::vector<locator::host_id> missing_peers;
|
||||
std::set_difference(peers.begin(), peers.end(), current_config.begin(), current_config.end(), std::back_inserter(missing_peers));
|
||||
|
||||
if (missing_peers.empty()) {
|
||||
if (!members0.is_joint()) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
upgrade_log.info("group 0 configuration is joint: {}.", current_config);
|
||||
continue;
|
||||
}
|
||||
|
||||
upgrade_log.info(
|
||||
"group 0 configuration does not contain all peers yet."
|
||||
" Missing peers: {}. Current group 0 config: {}.",
|
||||
missing_peers, current_config);
|
||||
}
|
||||
}
|
||||
|
||||
// Check if anyone entered `use_post_raft_procedures`.
|
||||
// This is a best-effort single round-trip check; we don't retry if some nodes fail to answer.
|
||||
static future<bool> anyone_finished_upgrade(
|
||||
const group0_members& members0, netw::messaging_service& ms, abort_source& as) {
|
||||
static constexpr auto max_concurrency = 10;
|
||||
static constexpr auto rpc_timeout = std::chrono::seconds{5};
|
||||
|
||||
auto current_config = members0.get_host_ids();
|
||||
bool finished = false;
|
||||
co_await max_concurrent_for_each(current_config, max_concurrency, [&] (const locator::host_id& node) -> future<> {
|
||||
try {
|
||||
auto state = co_await with_timeout(as, rpc_timeout, std::bind_front(send_get_group0_upgrade_state, std::ref(ms), node));
|
||||
if (state == group0_upgrade_state::use_post_raft_procedures) {
|
||||
finished = true;
|
||||
}
|
||||
} catch (abort_requested_exception&) {
|
||||
upgrade_log.warn("anyone_finished_upgrade: abort requested during `send_get_group0_upgrade_state({})`", node);
|
||||
throw;
|
||||
} catch (...) {
|
||||
// XXX: are there possible fatal errors which should cause us to abort the entire procedure?
|
||||
upgrade_log.warn("anyone_finished_upgrade: `send_get_group0_upgrade_state({})` failed: {}", node, std::current_exception());
|
||||
}
|
||||
});
|
||||
co_return finished;
|
||||
}
|
||||
|
||||
// Check if it's possible to reach everyone through `get_group0_upgrade_state` RPC.
|
||||
static future<> check_remote_group0_upgrade_state_dry_run(
|
||||
const noncopyable_function<future<std::vector<locator::host_id>>()>& get_host_ids,
|
||||
netw::messaging_service& ms, abort_source& as) {
|
||||
static constexpr auto rpc_timeout = std::chrono::seconds{5};
|
||||
static constexpr auto max_concurrency = 10;
|
||||
|
||||
for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) {
|
||||
// Note: we strive to get a response from everyone in a 'single round-trip',
|
||||
// so we don't skip nodes which responded in earlier iterations.
|
||||
// We contact everyone in each iteration even if some of these guys already answered.
|
||||
// We fetch peers again on every attempt to handle the possibility of leaving nodes.
|
||||
auto cluster_config = co_await get_host_ids();
|
||||
if (cluster_config.empty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
bool retry = false;
|
||||
co_await max_concurrent_for_each(cluster_config, max_concurrency, [&] (const locator::host_id& node) -> future<> {
|
||||
try {
|
||||
upgrade_log.info("check_remote_group0_upgrade_state_dry_run: `send_get_group0_upgrade_state({})`", node);
|
||||
co_await with_timeout(as, rpc_timeout, std::bind_front(send_get_group0_upgrade_state, std::ref(ms), node));
|
||||
} catch (abort_requested_exception&) {
|
||||
upgrade_log.warn("check_remote_group0_upgrade_state_dry_run: abort requested during `send_get_group0_upgrade_state({})`", node);
|
||||
throw;
|
||||
} catch (...) {
|
||||
// XXX: are there possible fatal errors which should cause us to abort the entire procedure?
|
||||
upgrade_log.warn(
|
||||
"check_remote_group0_upgrade_state_dry_run: `send_get_group0_upgrade_state({})` failed: {}",
|
||||
node, std::current_exception());
|
||||
retry = true;
|
||||
}
|
||||
});
|
||||
|
||||
if (!retry) {
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<> raft_group0::wait_for_all_nodes_to_finish_upgrade(abort_source& as) {
|
||||
static constexpr auto rpc_timeout = std::chrono::seconds{5};
|
||||
static constexpr auto max_concurrency = 10;
|
||||
@@ -1432,423 +1225,6 @@ future<> raft_group0::wait_for_all_nodes_to_finish_upgrade(abort_source& as) {
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until all members of group 0 enter `group0_upgrade_state::synchronize` or some node enters
|
||||
// `group0_upgrade_state::use_post_raft_procedures` (the latter meaning upgrade is finished and we can also finish).
|
||||
//
|
||||
// Precondition: we're in `synchronize` state.
|
||||
//
|
||||
// Returns `true` if we finished because everybody entered `synchronize`.
|
||||
// Returns `false` if we finished because somebody entered `use_post_raft_procedures`.
|
||||
static future<bool> wait_for_peers_to_enter_synchronize_state(
|
||||
const group0_members& members0, netw::messaging_service& ms, abort_source& as, gate::holder pause_shutdown) {
|
||||
static constexpr auto rpc_timeout = std::chrono::seconds{5};
|
||||
static constexpr auto max_concurrency = 10;
|
||||
|
||||
auto entered_synchronize = make_lw_shared<std::unordered_set<locator::host_id>>();
|
||||
|
||||
// This is a work-around for boost tests where RPC module is not listening so we cannot contact ourselves.
|
||||
// But really, except the (arguably broken) test code, we don't need to be treated as an edge case. All nodes are symmetric.
|
||||
// For production code this line is unnecessary.
|
||||
entered_synchronize->insert(ms.host_id());
|
||||
|
||||
for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) {
|
||||
// We fetch the config again on every attempt to handle the possibility of removing failed nodes.
|
||||
auto current_members_set = members0.get_members();
|
||||
|
||||
::tracker<bool> tracker;
|
||||
auto retry = make_lw_shared<bool>(false);
|
||||
|
||||
auto sub = as.subscribe([tracker] () mutable noexcept {
|
||||
tracker.set_exception(std::make_exception_ptr(abort_requested_exception{}));
|
||||
});
|
||||
if (!sub) {
|
||||
upgrade_log.warn("wait_for_peers_to_enter_synchronize_state: abort requested");
|
||||
throw abort_requested_exception{};
|
||||
}
|
||||
|
||||
(void) [] (netw::messaging_service& ms, abort_source& as, gate::holder pause_shutdown,
|
||||
raft::config_member_set current_members_set, group0_members members0,
|
||||
lw_shared_ptr<std::unordered_set<locator::host_id>> entered_synchronize,
|
||||
lw_shared_ptr<bool> retry, ::tracker<bool> tracker) -> future<> {
|
||||
co_await max_concurrent_for_each(current_members_set, max_concurrency, [&] (const raft::config_member& member) -> future<> {
|
||||
locator::host_id node{member.addr.id.uuid()};
|
||||
|
||||
if (entered_synchronize->contains(node)) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
try {
|
||||
auto state = co_await with_timeout(as, rpc_timeout, std::bind_front(send_get_group0_upgrade_state, std::ref(ms), node));
|
||||
if (tracker.finished()) {
|
||||
// A response from another node caused us to finish already.
|
||||
co_return;
|
||||
}
|
||||
|
||||
switch (state) {
|
||||
case group0_upgrade_state::use_post_raft_procedures:
|
||||
upgrade_log.info("wait_for_peers_to_enter_synchronize_state: {} confirmed that they finished upgrade.", node);
|
||||
tracker.set_value(true);
|
||||
break;
|
||||
case group0_upgrade_state::synchronize:
|
||||
entered_synchronize->insert(node);
|
||||
break;
|
||||
default:
|
||||
upgrade_log.info("wait_for_peers_to_enter_synchronize_state: node {} not in synchronize state yet...", node);
|
||||
*retry = true;
|
||||
}
|
||||
} catch (abort_requested_exception&) {
|
||||
upgrade_log.warn("wait_for_peers_to_enter_synchronize_state: abort requested during `send_get_group0_upgrade_state({})`", node);
|
||||
tracker.set_exception(std::current_exception());
|
||||
} catch (...) {
|
||||
// XXX: are there possible fatal errors which should cause us to abort the entire procedure?
|
||||
upgrade_log.warn(
|
||||
"wait_for_peers_to_enter_synchronize_state: `send_get_group0_upgrade_state({})` failed: {}",
|
||||
node, std::current_exception());
|
||||
*retry = true;
|
||||
}
|
||||
});
|
||||
|
||||
tracker.set_value(false);
|
||||
}(ms, as, pause_shutdown, std::move(current_members_set), members0, entered_synchronize, retry, tracker);
|
||||
|
||||
auto finish_early = co_await tracker.get();
|
||||
if (finish_early) {
|
||||
co_return false;
|
||||
}
|
||||
|
||||
if (!*retry) {
|
||||
co_return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Returning nullopt means we finished early (`can_finish_early` returned true).
|
||||
static future<std::optional<std::unordered_map<locator::host_id, table_schema_version>>>
|
||||
collect_schema_versions_from_group0_members(
|
||||
netw::messaging_service& ms, const group0_members& members0,
|
||||
const noncopyable_function<future<bool>()>& can_finish_early,
|
||||
abort_source& as) {
|
||||
static constexpr auto rpc_timeout = std::chrono::seconds{5};
|
||||
static constexpr auto max_concurrency = 10;
|
||||
|
||||
std::unordered_map<locator::host_id, table_schema_version> versions;
|
||||
for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) {
|
||||
|
||||
// We fetch the config on each iteration; some nodes may leave.
|
||||
auto current_config = members0.get_members() |
|
||||
std::views::transform([] (auto m) { return locator::host_id{m.addr.id.uuid()}; }) |
|
||||
std::ranges::to<std::vector<locator::host_id>>();
|
||||
|
||||
if (current_config.empty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
bool failed = false;
|
||||
co_await max_concurrent_for_each(current_config, max_concurrency, [&] (locator::host_id& node) -> future<> {
|
||||
if (versions.contains(node)) {
|
||||
// This node was already contacted in a previous iteration.
|
||||
co_return;
|
||||
}
|
||||
|
||||
try {
|
||||
upgrade_log.info("synchronize_schema: `send_schema_check({})`", node);
|
||||
versions.emplace(node,
|
||||
co_await with_timeout(as, rpc_timeout, [&ms, node] (abort_source& as) mutable {
|
||||
return ser::migration_manager_rpc_verbs::send_schema_check(&ms, node, as);
|
||||
}));
|
||||
} catch (abort_requested_exception&) {
|
||||
upgrade_log.warn("synchronize_schema: abort requested during `send_schema_check({})`", node);
|
||||
throw;
|
||||
} catch (...) {
|
||||
// XXX: are there possible fatal errors which should cause us to abort the entire procedure?
|
||||
upgrade_log.warn("synchronize_schema: `send_schema_check({})` failed: {}", node, std::current_exception());
|
||||
failed = true;
|
||||
}
|
||||
});
|
||||
|
||||
if (failed) {
|
||||
upgrade_log.warn("synchronize_schema: there were some failures when collecting remote schema versions.");
|
||||
} else if (members0.is_joint()) {
|
||||
upgrade_log.warn("synchronize_schema: group 0 configuration is joint: {}.", current_config);
|
||||
} else {
|
||||
co_return versions;
|
||||
}
|
||||
|
||||
upgrade_log.info("synchronize_schema: checking if we can finish early before retrying...");
|
||||
|
||||
if (co_await can_finish_early()) {
|
||||
co_return std::nullopt;
|
||||
}
|
||||
|
||||
upgrade_log.info("synchronize_schema: could not finish early.");
|
||||
}
|
||||
}
|
||||
|
||||
// Returning `true` means we synchronized schema.
|
||||
// `false` means we finished early after calling `can_finish_early`.
|
||||
//
|
||||
// Postcondition for synchronizing schema (i.e. we return `true`):
|
||||
// Let T0 be the point in time when this function starts.
|
||||
// There is a schema version X and a point in time T > T0 such that:
|
||||
// - the local schema version at T was X,
|
||||
// - for every member of group 0 configuration there was a point in time T'
|
||||
// such that T > T' > T0 and the schema version at this member at T' was X.
|
||||
//
|
||||
// Assuming that merging schema mutations is an associative, commutative and idempotent
|
||||
// operation, everybody pulling from everybody (or verifying they have the same mutations)
|
||||
// should cause everybody to arrive at the same result.
|
||||
static future<bool> synchronize_schema(
|
||||
replica::database& db, netw::messaging_service& ms,
|
||||
const group0_members& members0,
|
||||
service::migration_manager& mm,
|
||||
const noncopyable_function<future<bool>()>& can_finish_early,
|
||||
abort_source& as) {
|
||||
static constexpr auto max_concurrency = 10;
|
||||
|
||||
bool last_pull_successful = false;
|
||||
size_t num_attempts_after_successful_pull = 0;
|
||||
|
||||
for (sleep_with_exponential_backoff sleep;; co_await sleep(as)) {
|
||||
upgrade_log.info("synchronize_schema: collecting schema versions from group 0 members...");
|
||||
auto remote_versions = co_await collect_schema_versions_from_group0_members(ms, members0, can_finish_early, as);
|
||||
if (!remote_versions) {
|
||||
upgrade_log.info("synchronize_schema: finished early.");
|
||||
co_return false;
|
||||
}
|
||||
upgrade_log.info("synchronize_schema: collected remote schema versions.");
|
||||
|
||||
auto my_version = db.get_version();
|
||||
upgrade_log.info("synchronize_schema: my version: {}", my_version);
|
||||
|
||||
auto matched = std::erase_if(*remote_versions, [my_version] (const auto& p) { return p.second == my_version; });
|
||||
upgrade_log.info("synchronize_schema: schema mismatches: {}. {} nodes had a matching version.", *remote_versions, matched);
|
||||
|
||||
if (remote_versions->empty()) {
|
||||
upgrade_log.info("synchronize_schema: finished.");
|
||||
co_return true;
|
||||
}
|
||||
|
||||
// Note: if we successfully merged schema from everyone in earlier iterations, but our schema versions
|
||||
// are still not matching, that means our version is strictly more up-to-date than someone else's version.
|
||||
// In that case we could switch to a push mode instead of pull mode (we push schema mutations to them);
|
||||
// on the other hand this would further complicate the code and I assume that the regular schema synchronization
|
||||
// mechanisms (gossiping schema digests and triggering pulls on the other side) should deal with this case,
|
||||
// even though it may potentially take a bit longer than a pro-active approach. Furthermore, the other side
|
||||
// is also executing `synchronize_schema` at some point, so having this problem should be extremely unlikely.
|
||||
if (last_pull_successful) {
|
||||
if ((++num_attempts_after_successful_pull) > 3) {
|
||||
upgrade_log.error(
|
||||
"synchronize_schema: we managed to pull schema from every other node, but our schema versions"
|
||||
" are still different. The other side must have an outdated version and fail to pull it for some"
|
||||
" reason. If this message keeps showing up, the internal upgrade-to-raft procedure is stuck;"
|
||||
" try performing a rolling restart of your cluster."
|
||||
" If that doesn't fix the problem, the system may require manual fixing of schema tables.");
|
||||
}
|
||||
}
|
||||
|
||||
last_pull_successful = true;
|
||||
co_await max_concurrent_for_each(*remote_versions, max_concurrency, [&] (const auto& p) -> future<> {
|
||||
auto& [addr, _] = p;
|
||||
|
||||
try {
|
||||
upgrade_log.info("synchronize_schema: `merge_schema_from({})`", addr);
|
||||
co_await mm.merge_schema_from(addr);
|
||||
} catch (const rpc::closed_error& e) {
|
||||
upgrade_log.warn("synchronize_schema: `merge_schema_from({})` failed due to connection error: {}", addr, e);
|
||||
last_pull_successful = false;
|
||||
} catch (timed_out_error&) {
|
||||
upgrade_log.warn("synchronize_schema: `merge_schema_from({})` timed out", addr);
|
||||
last_pull_successful = false;
|
||||
} catch (abort_requested_exception&) {
|
||||
upgrade_log.warn("synchronize_schema: abort requested during `merge_schema_from({})`", addr);
|
||||
throw;
|
||||
} catch (...) {
|
||||
// We assume that every other exception type indicates a fatal error and happens because `merge_schema_from`
|
||||
// failed to apply schema mutations from a remote, which is not something we can automatically recover from.
|
||||
upgrade_log.error(
|
||||
"synchronize_schema: fatal error in `merge_schema_from({})`: {}."
|
||||
"\nCannot finish the upgrade procedure."
|
||||
" Please fix your schema tables manually and try again by restarting the node.",
|
||||
addr, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
});
|
||||
|
||||
if (co_await can_finish_early()) {
|
||||
upgrade_log.info("synchronize_schema: finishing early.");
|
||||
co_return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static auto warn_if_upgrade_takes_too_long() {
|
||||
auto as = std::make_unique<abort_source>();
|
||||
auto task = [] (abort_source& as) -> future<> {
|
||||
static constexpr auto warn_period = std::chrono::minutes{1};
|
||||
|
||||
while (!as.abort_requested()) {
|
||||
try {
|
||||
co_await sleep_abortable(warn_period, as);
|
||||
} catch (const sleep_aborted&) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
upgrade_log.warn(
|
||||
"Raft upgrade procedure taking longer than expected. Please check if all nodes are live and the network is healthy."
|
||||
" If the upgrade procedure does not progress even though the cluster is healthy, try performing a rolling restart of the cluster."
|
||||
" If that doesn't help or some nodes are dead and irrecoverable, manual recovery may be required."
|
||||
" Consult the relevant documentation: {}", raft_upgrade_doc);
|
||||
}
|
||||
}(*as);
|
||||
|
||||
return defer([task = std::move(task), as = std::move(as)] () mutable {
|
||||
// Stop in background.
|
||||
as->request_abort();
|
||||
(void)std::move(task).then([as = std::move(as)] {});
|
||||
});
|
||||
}
|
||||
|
||||
future<> raft_group0::upgrade_to_group0(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
auto start_state = (co_await _client.get_group0_upgrade_state()).second;
|
||||
switch (start_state) {
|
||||
case group0_upgrade_state::recovery:
|
||||
upgrade_log.info("RECOVERY mode. Not attempting upgrade.");
|
||||
co_return;
|
||||
case group0_upgrade_state::use_post_raft_procedures:
|
||||
upgrade_log.info("Already upgraded.");
|
||||
co_return;
|
||||
case group0_upgrade_state::synchronize:
|
||||
upgrade_log.warn(
|
||||
"Restarting upgrade in `synchronize` state."
|
||||
" A previous upgrade attempt must have been interrupted or failed.");
|
||||
break;
|
||||
case group0_upgrade_state::use_pre_raft_procedures:
|
||||
upgrade_log.info("starting in `use_pre_raft_procedures` state.");
|
||||
break;
|
||||
}
|
||||
|
||||
(void)[] (raft_group0& self, abort_source& as, group0_upgrade_state start_state, gate::holder pause_shutdown, service::storage_service& ss, cql3::query_processor& qp,
|
||||
service::migration_manager& mm, bool topology_change_enabled) -> future<> {
|
||||
auto warner = warn_if_upgrade_takes_too_long();
|
||||
try {
|
||||
co_await self.do_upgrade_to_group0(start_state, ss, qp, mm, topology_change_enabled);
|
||||
co_await self._client.set_group0_upgrade_state(group0_upgrade_state::use_post_raft_procedures);
|
||||
upgrade_log.info("Raft upgrade finished. Disabling migration_manager schema pulls.");
|
||||
co_await mm.disable_schema_pulls();
|
||||
} catch (...) {
|
||||
upgrade_log.error(
|
||||
"Raft upgrade failed: {}.\nTry restarting the node to retry upgrade."
|
||||
" If the procedure gets stuck, manual recovery may be required."
|
||||
" Consult the relevant documentation: {}", std::current_exception(), raft_upgrade_doc);
|
||||
}
|
||||
}(std::ref(*this), std::ref(_abort_source), start_state, _shutdown_gate.hold(), ss, qp, mm, topology_change_enabled);
|
||||
}
|
||||
|
||||
// `start_state` is either `use_pre_raft_procedures` or `synchronize`.
|
||||
future<> raft_group0::do_upgrade_to_group0(group0_upgrade_state start_state, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
// Check if every peer knows about the upgrade procedure.
|
||||
//
|
||||
// In a world in which post-conditions and invariants are respected, the fact that `SUPPORTS_RAFT` feature
|
||||
// is enabled would guarantee this. However, the cluster features mechanism is unreliable and there are
|
||||
// scenarios where the feature gets enabled even though not everybody supports it. Attempts to fix this
|
||||
// only unmask more issues; see #11225. In general, fixing the gossiper/features subsystem is a big
|
||||
// project and we don't want this to block the Raft group 0 project (and we probably want to eventually
|
||||
// move most application states - including supported feature sets - to group 0 anyway).
|
||||
//
|
||||
// Work around that by ensuring that everybody is able to answer to the `get_group0_upgrade_state` call
|
||||
// before we proceed to the `join_group0` step, which doesn't tolerate servers leaving in the middle;
|
||||
// once a node is selected as one of the seeds of the discovery algorithm, it must answer. This 'dry run'
|
||||
// step, on the other hand, allows nodes to leave and will unblock as soon as all remaining peers are
|
||||
// ready to answer.
|
||||
upgrade_log.info("Waiting until everyone is ready to start upgrade...");
|
||||
auto get_host_ids = [this]() -> future<std::vector<locator::host_id>> {
|
||||
auto current_config = co_await _sys_ks.load_peers_ids();
|
||||
current_config.push_back(_gossiper.my_host_id());
|
||||
co_return current_config;
|
||||
};
|
||||
co_await check_remote_group0_upgrade_state_dry_run(get_host_ids, _ms.local(), _abort_source);
|
||||
|
||||
if (!joined_group0()) {
|
||||
upgrade_log.info("Joining group 0...");
|
||||
auto handshaker = make_legacy_handshaker(raft::is_voter::yes); // Voter
|
||||
co_await join_group0(co_await _sys_ks.load_peers(), std::move(handshaker), ss, qp, mm, _sys_ks, topology_change_enabled, join_node_request_params{});
|
||||
} else {
|
||||
upgrade_log.info(
|
||||
"We're already a member of group 0."
|
||||
" Apparently we're restarting after a previous upgrade attempt failed.");
|
||||
}
|
||||
|
||||
// Start group 0 leadership monitor fiber.
|
||||
_leadership_monitor = leadership_monitor_fiber();
|
||||
|
||||
group0_members members0{_raft_gr.group0()};
|
||||
|
||||
// After we joined, we shouldn't be removed from group 0 until the end of the procedure.
|
||||
// The implementation of `leave_group0` waits until upgrade finishes before leaving the group.
|
||||
// There is no guarantee that `remove_from_group0` from another node (that has
|
||||
// finished upgrading) won't remove us after we enter `synchronize` but before we leave it;
|
||||
// but then we're not needed for anything anymore and we can be shutdown,
|
||||
// and we won't do anything harmful to other nodes while in `synchronize`, worst case being
|
||||
// that we get stuck.
|
||||
|
||||
upgrade_log.info("Waiting until every peer has joined Raft group 0...");
|
||||
co_await wait_until_every_peer_joined_group0(_sys_ks, members0, _abort_source);
|
||||
upgrade_log.info("Every peer is a member of Raft group 0.");
|
||||
|
||||
if (start_state == group0_upgrade_state::use_pre_raft_procedures) {
|
||||
// We perform a schema synchronization step before entering `synchronize` upgrade state.
|
||||
//
|
||||
// This step is not necessary for correctness: we will make sure schema is synchronized
|
||||
// after every node enters `synchronize`, where schema changes are disabled.
|
||||
//
|
||||
// However, it's good for reducing the chance that we get stuck later. If we manage to ensure that schema
|
||||
// is synchronized now, there's a high chance that after entering `synchronize` state we won't have
|
||||
// to do any additional schema pulls (only verify quickly that the schema is still in sync).
|
||||
upgrade_log.info("Waiting for schema to synchronize across all nodes in group 0...");
|
||||
auto can_finish_early = [] { return make_ready_future<bool>(false); };
|
||||
co_await synchronize_schema(qp.db().real_database(), _ms.local(), members0, mm, can_finish_early, _abort_source);
|
||||
|
||||
// Before entering `synchronize`, perform a round-trip of `get_group0_upgrade_state` RPC calls
|
||||
// to everyone as a dry run, just to check that nodes respond to this RPC.
|
||||
// Obviously we may lose connectivity immediately after this function finishes,
|
||||
// causing later steps to fail, but if network/RPC module is already broken, better to detect
|
||||
// it now than after entering `synchronize` state. And if this steps succeeds, then there's
|
||||
// a very high chance that the following steps succeed as well (we would need to be very unlucky otherwise).
|
||||
upgrade_log.info("Performing a dry run of remote `get_group0_upgrade_state` calls...");
|
||||
co_await check_remote_group0_upgrade_state_dry_run(
|
||||
[members0] {
|
||||
return make_ready_future<std::vector<locator::host_id>>(members0.get_host_ids());
|
||||
}, _ms.local(), _abort_source);
|
||||
|
||||
utils::get_local_injector().inject("group0_upgrade_before_synchronize",
|
||||
[] { throw std::runtime_error("error injection before group 0 upgrade enters synchronize"); });
|
||||
|
||||
upgrade_log.info("Entering synchronize state.");
|
||||
upgrade_log.warn("Schema changes are disabled in synchronize state."
|
||||
" If a failure makes us unable to proceed, manual recovery will be required.");
|
||||
co_await _client.set_group0_upgrade_state(group0_upgrade_state::synchronize);
|
||||
}
|
||||
|
||||
upgrade_log.info("Waiting for all peers to enter synchronize state...");
|
||||
if (!(co_await wait_for_peers_to_enter_synchronize_state(members0, _ms.local(), _abort_source, _shutdown_gate.hold()))) {
|
||||
upgrade_log.info("Another node already finished upgrade. We can finish early.");
|
||||
co_return;
|
||||
}
|
||||
|
||||
upgrade_log.info("All peers in synchronize state. Waiting for schema to synchronize...");
|
||||
auto can_finish_early = std::bind_front(anyone_finished_upgrade, std::cref(members0), std::ref(_ms.local()), std::ref(_abort_source));
|
||||
if (!(co_await synchronize_schema(qp.db().real_database(), _ms.local(), members0, mm, can_finish_early, _abort_source))) {
|
||||
upgrade_log.info("Another node already finished upgrade. We can finish early.");
|
||||
co_return;
|
||||
}
|
||||
|
||||
upgrade_log.info("Schema synchronized.");
|
||||
}
|
||||
|
||||
void raft_group0::register_metrics() {
|
||||
namespace sm = seastar::metrics;
|
||||
_metrics.add_group("raft_group0", {
|
||||
|
||||
@@ -209,9 +209,8 @@ public:
|
||||
// which will start a procedure to create group 0 and switch administrative operations to use it.
|
||||
future<> finish_setup_after_join(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled);
|
||||
|
||||
// If Raft is disabled or in RECOVERY mode, returns `false`.
|
||||
// If in RECOVERY mode, returns `false`.
|
||||
// Otherwise:
|
||||
// - waits for the Raft upgrade procedure to finish if it's currently in progress,
|
||||
// - performs a Raft read barrier,
|
||||
// - returns `true`.
|
||||
//
|
||||
|
||||
@@ -262,21 +262,8 @@ future<group0_guard> raft_group0_client::start_operation(seastar::abort_source&
|
||||
auto [upgrade_lock_holder, upgrade_state] = std::move(upgrade_lock_and_state);
|
||||
switch (upgrade_state) {
|
||||
case group0_upgrade_state::synchronize:
|
||||
logger.info("start_operation: waiting until local node leaves synchronize state to start a group 0 operation");
|
||||
upgrade_lock_holder.release();
|
||||
co_await when_any(wait_until_group0_upgraded(as), sleep_abortable(std::chrono::seconds{10}, as));
|
||||
// Check whether the wait above returned due to the sleep timeout, which indicates that the upgrade procedure is stuck.
// Throw the corresponding runtime error in that case.
|
||||
upgrade_lock_and_state = co_await get_group0_upgrade_state();
|
||||
upgrade_lock_holder = std::move(upgrade_lock_and_state.first);
|
||||
upgrade_state = std::move(upgrade_lock_and_state.second);
|
||||
upgrade_lock_holder.release();
|
||||
if (upgrade_state != group0_upgrade_state::use_post_raft_procedures) {
|
||||
throw std::runtime_error{
|
||||
"Cannot perform schema or topology changes during this time; the cluster is currently upgrading to use Raft for schema operations."
|
||||
" If this error keeps happening, check the logs of your nodes to learn the state of upgrade. The upgrade procedure may get stuck"
|
||||
" if there was a node failure."};
|
||||
}
|
||||
// This version no longer supports upgrade to group 0, so this state should be unused.
|
||||
on_internal_error(logger, "unexpected group0 upgrade state 'synchronize' in start_operation");
|
||||
[[fallthrough]];
|
||||
case group0_upgrade_state::use_post_raft_procedures: {
|
||||
auto operation_holder = co_await get_units(_operation_mutex, 1, as);
|
||||
@@ -448,21 +435,6 @@ future<> raft_group0_client::set_group0_upgrade_state(group0_upgrade_state state
|
||||
}
|
||||
}
|
||||
|
||||
future<> raft_group0_client::wait_until_group0_upgraded(abort_source& as) {
|
||||
auto sub = as.subscribe([this] () noexcept { _upgraded.broadcast(); });
|
||||
if (!sub) {
|
||||
throw abort_requested_exception{};
|
||||
}
|
||||
|
||||
co_await _upgraded.wait([this, &as, sub = std::move(sub)] {
|
||||
return _upgrade_state == group0_upgrade_state::use_post_raft_procedures || as.abort_requested();
|
||||
});
|
||||
|
||||
if (as.abort_requested()) {
|
||||
throw abort_requested_exception{};
|
||||
}
|
||||
}
|
||||
|
||||
future<semaphore_units<>> raft_group0_client::hold_read_apply_mutex(abort_source& as) {
|
||||
if (this_shard_id() != 0) {
|
||||
on_internal_error(logger, "hold_read_apply_mutex: must run on shard 0");
|
||||
|
||||
@@ -195,9 +195,6 @@ public:
|
||||
// and follow the correct sequence of states.
|
||||
future<> set_group0_upgrade_state(group0_upgrade_state s);
|
||||
|
||||
// Wait until group 0 upgrade enters the `use_post_raft_procedures` state.
|
||||
future<> wait_until_group0_upgraded(abort_source&);
|
||||
|
||||
future<semaphore_units<>> hold_read_apply_mutex(abort_source&);
|
||||
|
||||
bool in_recovery() const;
|
||||
|
||||
@@ -2265,16 +2265,6 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
|
||||
co_await _group0->finish_setup_after_join(*this, _qp, _migration_manager.local(), false);
|
||||
co_await _cdc_gens.local().after_join(std::move(cdc_gen_id));
|
||||
|
||||
// Waited on during stop()
|
||||
(void)([] (storage_service& me, sharded<service::storage_proxy>& proxy) -> future<> {
|
||||
try {
|
||||
co_await me.track_upgrade_progress_to_topology_coordinator(proxy);
|
||||
} catch (const abort_requested_exception&) {
|
||||
// Ignore
|
||||
}
|
||||
// Other errors are handled internally by track_upgrade_progress_to_topology_coordinator
|
||||
})(*this, proxy);
|
||||
|
||||
std::unordered_set<locator::host_id> ids;
|
||||
_gossiper.for_each_endpoint_state([this, &ids] (const gms::endpoint_state& ep) {
|
||||
if (_gossiper.is_normal(ep.get_host_id())) {
|
||||
@@ -2285,73 +2275,6 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
|
||||
co_await _gossiper.notify_nodes_on_up(std::move(ids));
|
||||
}
|
||||
|
||||
future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded<service::storage_proxy>& proxy) {
|
||||
SCYLLA_ASSERT(_group0);
|
||||
|
||||
while (true) {
|
||||
_group0_as.check();
|
||||
try {
|
||||
co_await _group0->client().wait_until_group0_upgraded(_group0_as);
|
||||
|
||||
// First, wait for the feature to become enabled
|
||||
shared_promise<> p;
|
||||
auto sub = _feature_service.supports_consistent_topology_changes.when_enabled([&] () noexcept { p.set_value(); });
|
||||
rtlogger.debug("Waiting for cluster feature `SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES`");
|
||||
co_await p.get_shared_future(_group0_as);
|
||||
rtlogger.info("The cluster is ready to start upgrade to the raft topology. The procedure needs to be manually triggered. Refer to the documentation");
|
||||
|
||||
// Wait until upgrade is started
|
||||
co_await _topology_state_machine.event.when([this] {
|
||||
return !legacy_topology_change_enabled();
|
||||
});
|
||||
rtlogger.info("upgrade to raft topology has started");
|
||||
break;
|
||||
} catch (const seastar::abort_requested_exception&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
rtlogger.error("the fiber tracking readiness of upgrade to raft topology got an unexpected error: {}", std::current_exception());
|
||||
}
|
||||
|
||||
co_await sleep_abortable(std::chrono::seconds(1), _group0_as);
|
||||
}
|
||||
|
||||
// Start the topology coordinator monitor fiber. If we are the leader, this will start
|
||||
// the topology coordinator which is responsible for driving the upgrade process.
|
||||
try {
|
||||
_raft_state_monitor = raft_state_monitor_fiber(_group0->group0_server(), _group0->hold_group0_gate());
|
||||
} catch (...) {
|
||||
// The calls above can theoretically fail due to coroutine frame allocation failure.
|
||||
// Abort in this case as the node should be in a pretty bad shape anyway.
|
||||
rtlogger.error("failed to start the topology coordinator: {}", std::current_exception());
|
||||
abort();
|
||||
}
|
||||
|
||||
while (true) {
|
||||
_group0_as.check();
|
||||
try {
|
||||
// Wait until upgrade is finished
|
||||
co_await _topology_state_machine.event.when([this] {
|
||||
return raft_topology_change_enabled();
|
||||
});
|
||||
rtlogger.info("upgrade to raft topology has finished");
|
||||
break;
|
||||
} catch (const seastar::abort_requested_exception&) {
|
||||
throw;
|
||||
} catch (...) {
|
||||
rtlogger.error("the fiber tracking progress of upgrade to raft topology got an unexpected error. "
|
||||
"Will not report in logs when upgrade has completed. Error: {}", std::current_exception());
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
_sstable_vnodes_cleanup_fiber = sstable_vnodes_cleanup_fiber(_group0->group0_server(), _group0->hold_group0_gate(), proxy);
|
||||
start_tablet_split_monitor();
|
||||
} catch (...) {
|
||||
rtlogger.error("failed to start one of the raft-related background fibers: {}", std::current_exception());
|
||||
abort();
|
||||
}
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id, const std::optional<replacement_info>& replacement_info) {
|
||||
return seastar::async([this, &bootstrap_tokens, &cdc_gen_id, &replacement_info] {
|
||||
@@ -8074,13 +7997,8 @@ void storage_service::init_messaging_service() {
|
||||
ser::join_node_rpc_verbs::register_join_node_query(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, service::join_node_query_params) {
|
||||
return handle_raft_rpc(dst_id, [] (auto& ss) -> future<join_node_query_result> {
|
||||
check_raft_rpc_scheduling_group(ss._db.local(), ss._feature_service, "join_node_query");
|
||||
if (!ss.legacy_topology_change_enabled() && !ss.raft_topology_change_enabled()) {
|
||||
throw std::runtime_error("The cluster is upgrading to raft topology. Nodes cannot join at this time.");
|
||||
}
|
||||
auto result = join_node_query_result{
|
||||
.topo_mode = ss.raft_topology_change_enabled()
|
||||
? join_node_query_result::topology_mode::raft
|
||||
: join_node_query_result::topology_mode::legacy,
|
||||
.topo_mode = join_node_query_result::topology_mode::raft
|
||||
};
|
||||
return make_ready_future<join_node_query_result>(std::move(result));
|
||||
});
|
||||
@@ -8579,13 +8497,6 @@ bool storage_service::raft_topology_change_enabled() const {
|
||||
return _topology_change_kind_enabled == topology_change_kind::raft;
|
||||
}
|
||||
|
||||
bool storage_service::legacy_topology_change_enabled() const {
|
||||
if (this_shard_id() != 0) {
|
||||
on_internal_error(slogger, "legacy_topology_change_enabled() must run on shard 0");
|
||||
}
|
||||
return _topology_change_kind_enabled == topology_change_kind::legacy;
|
||||
}
|
||||
|
||||
future<> storage_service::register_protocol_server(protocol_server& server, bool start_instantly) {
|
||||
_protocol_servers.push_back(&server);
|
||||
if (start_instantly) {
|
||||
|
||||
@@ -929,7 +929,6 @@ private:
|
||||
|
||||
public:
|
||||
bool raft_topology_change_enabled() const;
|
||||
bool legacy_topology_change_enabled() const;
|
||||
|
||||
private:
|
||||
future<> _raft_state_monitor = make_ready_future<>();
|
||||
@@ -1045,7 +1044,6 @@ public:
|
||||
private:
|
||||
// Tracks progress of the upgrade to topology coordinator.
|
||||
future<> _upgrade_to_topology_coordinator_fiber = make_ready_future<>();
|
||||
future<> track_upgrade_progress_to_topology_coordinator(sharded<service::storage_proxy>& proxy);
|
||||
|
||||
future<> transit_tablet(table_id, dht::token, noncopyable_function<std::tuple<utils::chunked_vector<canonical_mutation>, sstring>(const locator::tablet_map& tmap, api::timestamp_type)> prepare_mutations);
|
||||
future<service::group0_guard> get_guard_for_tablet_update();
|
||||
|
||||
@@ -1,214 +0,0 @@
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import pytest
|
||||
import time
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import get_host_api_address, read_barrier
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts, unique_name
|
||||
from cassandra.cluster import ConsistencyLevel
|
||||
from test.cluster.util import wait_until_topology_upgrade_finishes, enter_recovery_state, reconnect_driver, \
|
||||
delete_raft_topology_state, delete_raft_data_and_upgrade_state, wait_until_upgrade_finishes, \
|
||||
wait_for_token_ring_and_group0_consistency
|
||||
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
|
||||
|
||||
|
||||
def auth_data():
|
||||
return [
|
||||
{
|
||||
"statement": "INSERT INTO system_distributed.service_levels (service_level, timeout, workload_type) VALUES (?, ?, ?)",
|
||||
"rows": [
|
||||
("sl1", None, None),
|
||||
]
|
||||
},
|
||||
{
|
||||
"statement": "INSERT INTO system_auth.roles (role, can_login, is_superuser, member_of, salted_hash) VALUES (?, ?, ?, ?, ?)",
|
||||
"rows": [
|
||||
("user 1", True, False, frozenset({'users'}), "salt1?"),
|
||||
("user 2", True, False, frozenset({'users'}), "salt2#"),
|
||||
("users", False, False, None, None),
|
||||
]
|
||||
},
|
||||
{
|
||||
"statement": "INSERT INTO system_auth.role_members (role, member) VALUES (?, ?)",
|
||||
"rows": [
|
||||
("users", "user 1"),
|
||||
("users", "user 2"),
|
||||
]
|
||||
},
|
||||
{
|
||||
"statement": "INSERT INTO system_auth.role_attributes (role, name, value) VALUES (?, ?, ?)",
|
||||
"rows": [
|
||||
("users", "service_level", "sl1"),
|
||||
]
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
async def populate_test_data(manager: ManagerClient, data):
|
||||
cql = manager.get_cql()
|
||||
for d in data:
|
||||
stmt = cql.prepare(d["statement"])
|
||||
stmt.consistency_level = ConsistencyLevel.ALL
|
||||
await asyncio.gather(*(
|
||||
cql.run_async(stmt.bind(row_data)) for row_data in d["rows"]))
|
||||
|
||||
|
||||
async def populate_auth_v1_data(manager: ManagerClient):
|
||||
await populate_test_data(manager, auth_data())
|
||||
# test also absence of deleted data
|
||||
username = unique_name("deleted_user_")
|
||||
logging.info(f"Creating deleted auth-v1 user: {username}")
|
||||
await populate_test_data(manager, [
|
||||
{
|
||||
"statement": "INSERT INTO system_auth.roles (role, can_login, is_superuser, member_of, salted_hash) VALUES (?, ?, ?, ?, ?)",
|
||||
"rows": [
|
||||
(username, True, False, None, "fefe"),
|
||||
]
|
||||
},
|
||||
{
|
||||
"statement": "DELETE FROM system_auth.roles WHERE role = ?",
|
||||
"rows": [
|
||||
(username,),
|
||||
]
|
||||
},
|
||||
])
|
||||
|
||||
|
||||
async def warmup_v1_static_values(manager: ManagerClient, hosts):
|
||||
# auth-v1 used static variables to cache internal queries.
# In auth-v2 those statics were removed, but we want to verify that the removal
# was effective, so trigger an internal call here to potentially populate any
# static storage; later we verify that a properly formed query still executes
# after migration (the query has to change because the keyspace name changes).
|
||||
cql = manager.get_cql()
|
||||
await asyncio.gather(*(cql.run_async("LIST ROLES", host=host) for host in hosts))
|
||||
|
||||
|
||||
async def check_auth_v2_data_migration(manager: ManagerClient, hosts):
|
||||
cql = manager.get_cql()
|
||||
# auth reads are eventually consistent so we need to make sure hosts are up-to-date
|
||||
assert hosts
|
||||
await asyncio.gather(*(read_barrier(manager.api, get_host_api_address(host)) for host in hosts))
|
||||
|
||||
data = auth_data()
|
||||
|
||||
roles = set()
|
||||
for row in await cql.run_async("SELECT * FROM system.roles"):
|
||||
if row.role == "cassandra":
|
||||
# Skip default role, its creation in auth-v1
|
||||
# is asynchronous and all nodes race to create it
|
||||
# so we'd need to delay the test and wait.
|
||||
# Checking this particular role doesn't bring much value
|
||||
# to the test as we check other roles to demonstrate correctness
|
||||
continue
|
||||
member_of = frozenset(row.member_of) if row.member_of else None
|
||||
roles.add((row.role, row.can_login, row.is_superuser, member_of, row.salted_hash))
|
||||
assert roles == set(data[1]["rows"])
|
||||
|
||||
role_members = set()
|
||||
for row in await cql.run_async("SELECT * FROM system.role_members"):
|
||||
role_members.add((row.role, row.member))
|
||||
assert role_members == set(data[2]["rows"])
|
||||
|
||||
role_attributes = set()
|
||||
for row in await cql.run_async("SELECT * FROM system.role_attributes"):
|
||||
role_attributes.add((row.role, row.name, row.value))
|
||||
assert role_attributes == set(data[3]["rows"])
|
||||
|
||||
|
||||
async def check_auth_v2_works(manager: ManagerClient, hosts):
|
||||
cql = manager.get_cql()
|
||||
roles = set()
|
||||
for row in await cql.run_async("LIST ROLES"):
|
||||
roles.add(row.role)
|
||||
assert roles == set(["cassandra", "user 1", "user 2", "users"])
|
||||
|
||||
user1_roles = await cql.run_async("LIST ROLES OF 'user 1'")
|
||||
assert len(user1_roles) == 2
|
||||
assert set([user1_roles[0].role, user1_roles[1].role]) == set(["users", "user 1"])
|
||||
|
||||
username = unique_name("user_after_migration_")
|
||||
logging.info(f"Create role after migration: {username}")
|
||||
await cql.run_async(f"CREATE ROLE {username}")
|
||||
await asyncio.gather(*(read_barrier(manager.api, get_host_api_address(host)) for host in hosts))
|
||||
# see warmup_v1_static_values for background about checks below
|
||||
# check if it was added to a new table
|
||||
assert len(await cql.run_async(f"SELECT role FROM system.roles WHERE role = '{username}'")) == 1
|
||||
# check whether list roles statement sees it also via new table (on all nodes)
|
||||
await asyncio.gather(*(cql.run_async(f"LIST ROLES OF {username}", host=host) for host in hosts))
|
||||
await cql.run_async(f"DROP ROLE {username}")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auth_v2_during_recovery(manager: ManagerClient):
|
||||
# FIXME: move this test to the Raft-based recovery procedure or remove it if unneeded.
|
||||
servers = await manager.servers_add(3, config=auth_config, auto_rack_dc="dc1")
|
||||
cql, hosts = await manager.get_ready_cql(servers)
|
||||
|
||||
logging.info("Checking auth version before recovery")
|
||||
auth_version = await cql.run_async(f"SELECT value FROM system.scylla_local WHERE key = 'auth_version'")
|
||||
assert auth_version[0].value == "2"
|
||||
|
||||
logging.info("Creating role before recovery")
|
||||
role_name = "ro" + unique_name()
|
||||
await cql.run_async(f"CREATE ROLE {role_name}")
|
||||
# auth reads are eventually consistent so we need to sync all nodes
|
||||
await asyncio.gather(*(read_barrier(manager.api, get_host_api_address(host)) for host in hosts))
|
||||
|
||||
logging.info("Read roles before recovery")
|
||||
roles = [row.role for row in await cql.run_async(f"LIST ROLES")]
|
||||
assert set(roles) == set([role_name, "cassandra"])
|
||||
|
||||
logging.info("Poison with auth_v1 look a like data")
|
||||
# this will verify that old roles are not brought back during recovery
|
||||
# as it runs a very similar code path to the one used during the v1->v2 migration
|
||||
await cql.run_async(f"CREATE TABLE system_auth.roles (role text PRIMARY KEY, can_login boolean, is_superuser boolean, member_of set<text>, salted_hash text)")
|
||||
v1_ro_name = "v1_ro" + unique_name()
|
||||
await cql.run_async(f"INSERT INTO system_auth.roles (role) VALUES ('{v1_ro_name}')")
|
||||
|
||||
logging.info(f"Restarting hosts {hosts} in recovery mode")
|
||||
await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
|
||||
await manager.rolling_restart(servers)
|
||||
|
||||
logging.info("Cluster restarted, waiting until driver reconnects to every server")
|
||||
await reconnect_driver(manager)
|
||||
cql, hosts = await manager.get_ready_cql(servers)
|
||||
|
||||
logging.info("Checking auth version during recovery")
|
||||
auth_version = await cql.run_async(f"SELECT value FROM system.scylla_local WHERE key = 'auth_version'")
|
||||
assert auth_version[0].value == "2"
|
||||
|
||||
logging.info("Reading roles during recovery")
|
||||
roles = [row.role for row in await cql.run_async(f"LIST ROLES")]
|
||||
assert set(roles) == set([role_name, "cassandra"])
|
||||
|
||||
logging.info("Restoring cluster to normal status")
|
||||
await asyncio.gather(*(delete_raft_topology_state(cql, h) for h in hosts))
|
||||
await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))
|
||||
await manager.rolling_restart(servers)
|
||||
|
||||
await reconnect_driver(manager)
|
||||
cql, hosts = await manager.get_ready_cql(servers)
|
||||
|
||||
await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts))
|
||||
for host in hosts:
|
||||
status = await manager.api.raft_topology_upgrade_status(host.address)
|
||||
assert status == "not_upgraded"
|
||||
|
||||
await manager.api.upgrade_to_raft_topology(hosts[0].address)
|
||||
await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))
|
||||
|
||||
logging.info("Checking auth version after recovery")
|
||||
auth_version = await cql.run_async(f"SELECT value FROM system.scylla_local WHERE key = 'auth_version'")
|
||||
assert auth_version[0].value == "2"
|
||||
|
||||
logging.info("Reading roles after recovery")
|
||||
roles = [row.role for row in await manager.get_cql().run_async(f"LIST ROLES")]
|
||||
assert set(roles) == set([role_name, "cassandra"])
|
||||
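The test above reads the auth version marker three times with the same query. A minimal sketch of a helper that would capture that repeated check; the check_auth_version name is hypothetical and not part of the original test:

    async def check_auth_version(cql, expected="2"):
        # Read the auth version marker inspected before, during and after recovery.
        rows = await cql.run_async("SELECT value FROM system.scylla_local WHERE key = 'auth_version'")
        assert rows and rows[0].value == expected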
@@ -1,175 +0,0 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#

import asyncio
import logging
import pytest
import time

from cassandra.policies import WhiteListRoundRobinPolicy

from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for_cql
from test.cluster.util import enter_recovery_state, \
        delete_raft_data_and_upgrade_state, log_run_time, wait_until_upgrade_finishes as wait_until_schema_upgrade_finishes, \
        wait_until_topology_upgrade_finishes, delete_raft_topology_state, wait_for_cdc_generations_publishing, \
        check_system_topology_and_cdc_generations_v3_consistency, start_writes_to_cdc_table, wait_until_last_generation_is_in_use
from test.cluster.conftest import cluster_con


@pytest.mark.nightly
@pytest.mark.asyncio
@log_run_time
async def test_topology_recovery_basic(request, build_mode: str, manager: ManagerClient):
    # FIXME: move this test to the Raft-based recovery procedure or remove it if unneeded.

    # Increase ring delay to ensure nodes learn about CDC generations before they start operating.
    ring_delay = 15000 if build_mode == 'debug' else 5000
    normal_cfg = {'ring_delay_ms': ring_delay}
    zero_token_cfg = {'ring_delay_ms': ring_delay, 'join_ring': False}

    servers = [await manager.server_add(config=normal_cfg),
               await manager.server_add(config=zero_token_cfg),
               await manager.server_add(config=normal_cfg)]

    # The zero-token node requires a separate cql session; otherwise the driver would ignore it
    # because of its empty tokens in the system.peers table.
    # We need one cql session for both token-owning nodes to continue the write workload during the rolling restart.
    cql_normal = cluster_con([servers[0].ip_addr, servers[2].ip_addr], load_balancing_policy=
                             WhiteListRoundRobinPolicy([servers[0].ip_addr, servers[2].ip_addr])).connect()
    cql_zero_token = cluster_con([servers[1].ip_addr], load_balancing_policy=
                                 WhiteListRoundRobinPolicy([servers[1].ip_addr])).connect()
    # In the whole test, cqls[i] and hosts[i] correspond to servers[i].
    cqls = [cql_normal, cql_zero_token, cql_normal]
    hosts = [cql_normal.hosts[0] if cql_normal.hosts[0].address == servers[0].ip_addr else cql_normal.hosts[1],
             cql_zero_token.hosts[0],
             cql_normal.hosts[1] if cql_normal.hosts[0].address == servers[0].ip_addr else cql_normal.hosts[0]]

    # We don't want to use ManagerClient.rolling_restart: waiting for CQL of the zero-token node would time out.
    async def rolling_restart():
        for idx, s in enumerate(servers):
            await manager.server_stop_gracefully(s.server_id)

            for idx2 in range(len(servers)):
                if idx2 != idx:
                    await manager.server_not_sees_other_server(servers[idx2].ip_addr, s.ip_addr)

            await manager.server_start(s.server_id)

            for idx2 in range(len(servers)):
                if idx2 != idx:
                    await manager.server_sees_other_server(servers[idx2].ip_addr, s.ip_addr)

    logging.info("Waiting until driver connects to every server")
    await asyncio.gather(*(wait_for_cql(cql, h, time.time() + 60) for cql, h in zip(cqls, hosts)))

    restart_writes, stop_writes_and_verify = await start_writes_to_cdc_table(cql_normal)

    logging.info(f"Restarting hosts {hosts} in recovery mode")
    await asyncio.gather(*(enter_recovery_state(cql, h) for cql, h in zip(cqls, hosts)))

    # If we restarted nodes before the last generation was in use, some writes
    # could fail. After restart, nodes load only the last generation. If it's
    # not active yet, writes with lower timestamps would fail.
    await wait_until_last_generation_is_in_use(cql_normal)

    logging.debug("Sleeping for 1 second to make sure there are writes to the CDC table in all 3 generations")
    await asyncio.sleep(1)

    # Restart sequentially, as this tests how nodes operating in legacy mode
    # react to nodes in raft topology mode and vice versa.
    await rolling_restart()

    await stop_writes_and_verify()

    def reconnect_cqls():
        nonlocal cql_normal
        nonlocal cql_zero_token
        nonlocal cqls
        cql_normal.shutdown()
        cql_zero_token.shutdown()
        cql_normal = cluster_con([servers[0].ip_addr, servers[2].ip_addr], load_balancing_policy=
                                 WhiteListRoundRobinPolicy([servers[0].ip_addr, servers[2].ip_addr])).connect()
        cql_zero_token = cluster_con([servers[1].ip_addr], load_balancing_policy=
                                     WhiteListRoundRobinPolicy([servers[1].ip_addr])).connect()
        cqls = [cql_normal, cql_zero_token, cql_normal]

    reconnect_cqls()

    logging.info("Cluster restarted, waiting until driver reconnects to every server")
    await asyncio.gather(*(wait_for_cql(cql, h, time.time() + 60) for cql, h in zip(cqls, hosts)))
    logging.info(f"Driver reconnected, hosts: {hosts}")

    restart_writes(cql_normal)

    logging.info(f"Deleting Raft data and upgrade state on {hosts}")
    await asyncio.gather(*(delete_raft_topology_state(cql, h) for cql, h in zip(cqls, hosts)))
    await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for cql, h in zip(cqls, hosts)))

    logging.info(f"Restarting hosts {hosts}")
    await rolling_restart()

    # FIXME: We must reconnect the driver before performing CQL queries below, for example
    # in wait_until_schema_upgrade_finishes. Unfortunately, it forces us to stop writing to
    # a CDC table first. Reconnecting the driver would close the session used to send the
    # writes, and some writes could time out on the client.
    # Once https://github.com/scylladb/python-driver/issues/295 is fixed, we can remove
    # all calls to reconnect_driver, restart_writes and leave only the last call to
    # stop_writes_and_verify.
    await stop_writes_and_verify()

    reconnect_cqls()

    logging.info("Cluster restarted, waiting until driver reconnects to every server")
    await asyncio.gather(*(wait_for_cql(cql, h, time.time() + 60) for cql, h in zip(cqls, hosts)))

    restart_writes(cql_normal)

    logging.info("Waiting until upgrade to raft schema finishes")
    await asyncio.gather(*(wait_until_schema_upgrade_finishes(cql, h, time.time() + 60) for cql, h in zip(cqls, hosts)))

    logging.info("Checking the topology upgrade state on all nodes")
    for host in hosts:
        status = await manager.api.raft_topology_upgrade_status(host.address)
        assert status == "not_upgraded"

    logging.info("Waiting until all nodes see others as alive")
    await manager.servers_see_each_other(servers)

    logging.info("Triggering upgrade to raft topology")
    await manager.api.upgrade_to_raft_topology(hosts[0].address)

    logging.info("Waiting until upgrade finishes")
    await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))

    logging.info("Waiting for CDC generations publishing")
    for cql, h in zip(cqls, hosts):
        await wait_for_cdc_generations_publishing(cql, [h], time.time() + 60)

    logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
    await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts, cqls)

    logging.info("Booting new node")
    servers += [await manager.server_add(config=normal_cfg)]
    cqls += [cluster_con([servers[3].ip_addr],
                         load_balancing_policy=WhiteListRoundRobinPolicy([servers[3].ip_addr])).connect()]
    hosts += [cqls[3].hosts[0]]
    await asyncio.gather(*(wait_for_cql(cql, h, time.time() + 60) for cql, h in zip(cqls, hosts)))

    logging.info("Waiting for the new CDC generation publishing")
    for cql, h in zip(cqls, hosts):
        await wait_for_cdc_generations_publishing(cql, [h], time.time() + 60)

    logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
    await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts, cqls)

    await wait_until_last_generation_is_in_use(cql_normal)

    logging.debug("Sleeping for 1 second to make sure there are writes to the CDC table in the last generation")
    await asyncio.sleep(1)

    logging.info("Checking correctness of data in system_distributed.cdc_streams_descriptions_v2")
    await stop_writes_and_verify()
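The test above builds the same kind of whitelisted session in two places (initial setup and reconnect_cqls). A minimal sketch of a helper capturing that pattern; the connect_whitelisted name is hypothetical and not part of the original file:

    def connect_whitelisted(addresses):
        # One dedicated session per address set, so zero-token nodes are not filtered out by the driver.
        return cluster_con(addresses, load_balancing_policy=WhiteListRoundRobinPolicy(addresses)).connect()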
@@ -1,99 +0,0 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#

import asyncio
import logging
import pytest
import time

from cassandra.policies import WhiteListRoundRobinPolicy

from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for_cql_and_get_hosts
from test.cluster.util import reconnect_driver, enter_recovery_state, \
        delete_raft_data_and_upgrade_state, log_run_time, wait_until_upgrade_finishes as wait_until_schema_upgrade_finishes, \
        wait_until_topology_upgrade_finishes, delete_raft_topology_state, wait_for_cdc_generations_publishing, \
        check_system_topology_and_cdc_generations_v3_consistency
from test.cluster.conftest import cluster_con


@pytest.mark.asyncio
@log_run_time
async def test_topology_recovery_after_majority_loss(request, manager: ManagerClient):
    servers = await manager.servers_add(3)
    servers += await manager.servers_add(2, config={'join_ring': False})
    cql = manager.cql
    assert cql

    # Currently the python driver ignores zero-token nodes, so we skip them here.
    logging.info("Waiting until driver connects to every server")
    hosts = await wait_for_cql_and_get_hosts(cql, servers[:-2], time.time() + 60)

    def host_for_zero_token_node(server):
        # In order to get the Host instance of a zero-token node, connect to it directly.
        # We cannot use the ManagerClient's session because it is connected to non-zero-token
        # nodes and, in that case, the zero-token nodes are ignored.
        zero_token_node_session = cluster_con([server.ip_addr], load_balancing_policy=
                                              WhiteListRoundRobinPolicy([server.ip_addr])).connect()
        return zero_token_node_session.hosts[0]

    zero_token_hosts = [host_for_zero_token_node(s) for s in [servers[3], servers[4]]]

    srv1, *others = servers

    logging.info(f"Killing all nodes except {srv1}")
    await asyncio.gather(*(manager.server_stop_gracefully(srv.server_id) for srv in others))

    logging.info(f"Entering recovery state on {srv1}")
    host1 = next(h for h in hosts if h.address == srv1.ip_addr)
    await enter_recovery_state(cql, host1)
    await manager.server_restart(srv1.server_id)
    cql = await reconnect_driver(manager)

    logging.info("Node restarted, waiting until driver connects")
    host1 = (await wait_for_cql_and_get_hosts(cql, [srv1], time.time() + 60))[0]

    for i in range(len(others)):
        to_remove = others[i]
        ignore_dead_ips = [srv.ip_addr for srv in others[i+1:]]
        logging.info(f"Removing {to_remove} using {srv1} with ignore_dead: {ignore_dead_ips}")
        await manager.remove_node(srv1.server_id, to_remove.server_id, ignore_dead_ips)

    removed_hosts = hosts[1:] + zero_token_hosts

    logging.info(f"Deleting old Raft data and upgrade state on {host1} and restarting")
    await delete_raft_topology_state(cql, host1)
    await delete_raft_data_and_upgrade_state(cql, host1)
    await manager.server_restart(srv1.server_id)
    cql = await reconnect_driver(manager)

    logging.info("Node restarted, waiting until driver connects")
    host1 = (await wait_for_cql_and_get_hosts(cql, [srv1], time.time() + 60))[0]

    logging.info("Waiting until upgrade to raft schema finishes.")
    await wait_until_schema_upgrade_finishes(cql, host1, time.time() + 60)

    logging.info("Triggering upgrade to raft topology")
    await manager.api.upgrade_to_raft_topology(host1.address)

    logging.info("Waiting until upgrade to raft topology finishes")
    await wait_until_topology_upgrade_finishes(manager, host1.address, time.time() + 60)

    logging.info("Waiting for CDC generations publishing")
    await wait_for_cdc_generations_publishing(cql, [host1], time.time() + 60)

    logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
    await check_system_topology_and_cdc_generations_v3_consistency(manager, [host1], ignored_hosts=removed_hosts)

    logging.info("Add two more nodes")
    servers = [srv1] + await manager.servers_add(2)
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logging.info("Waiting for the new CDC generations publishing")
    await wait_for_cdc_generations_publishing(cql, hosts, time.time() + 60)

    logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
    await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts, ignored_hosts=removed_hosts)
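The removal loop in the test above shrinks the ignore_dead list as each dead node is removed: every removenode call must still list the remaining dead nodes. A minimal sketch of the same pattern as a standalone helper; the remove_dead_nodes name is hypothetical and not part of the original file:

    async def remove_dead_nodes(manager, alive_server, dead_servers):
        # Remove dead nodes one by one; nodes not yet removed are passed as ignore_dead.
        for i, node in enumerate(dead_servers):
            ignore_dead_ips = [srv.ip_addr for srv in dead_servers[i + 1:]]
            await manager.remove_node(alive_server.server_id, node.server_id, ignore_dead_ips)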
@@ -326,36 +326,6 @@ async def test_migration_on_existing_raft_topology(request, manager: ManagerClie
    res = await log.grep(r'ERROR.*\[shard [0-9]: [a-z]+\] raft_topology - topology change coordinator fiber got error exceptions::unavailable_exception \(Cannot achieve consistency level for')
    assert len(res) == 0

# Regression test for scylladb/scylladb#23536
# New node addition with the 'sleep_in_synchronize' injection forces the execution path to
# hit 'raft_group0_client::start_operation' while the upgrade state is synchronize. The test
# confirms successful execution in such cases.
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@pytest.mark.asyncio
async def test_view_build_status_with_synchronize_wait(manager: ManagerClient):
    servers = []
    servers.append(await manager.server_add())
    cql, hosts = await manager.get_ready_cql(servers)

    # With tablets and the view building coordinator, view build status is marked
    # in the same group0 batch that sets the new node's status to normal,
    # so `start_operation()` is not called and this test doesn't work.
    ks = await create_keyspace(cql, disable_tablets=True)
    await create_table(cql, ks)
    # 'raft_group0_client::start_operation' gets called underneath this.
    await create_mv(cql, ks, "vt1")

    # Add a node with the injection enabled, so it waits in the synchronize state during startup.
    new_server = await manager.server_add(start=False, config={
        'error_injections_at_startup': ['sleep_in_synchronize']
    })
    task = asyncio.create_task(manager.server_start(new_server.server_id))
    log = await manager.server_open_log(new_server.server_id)
    await log.wait_for("start_operation: waiting until local node leaves synchronize state to start a group 0 operation")
    await manager.api.message_injection(new_server.ip_addr, 'sleep_in_synchronize')

    await task

# Test that when removing the view, its build status is cleaned from the status table
@pytest.mark.asyncio
async def test_view_build_status_cleanup_on_drop_view(manager: ManagerClient):