Compare commits

...

3 Commits

Author SHA1 Message Date
Alex
a0303bfd41 test/auth_cluster: simulate v1 state in self-heal test
When skip_service_levels_v2_initialization is used, write an explicit
v1 service level version marker while skipping v2 initialization. This
lets the restart test exercise self-healing from v1 to v2.
2026-05-13 16:00:02 +03:00
Alex
12dfd9b487 qos: self-heal stale service levels version on startup
Add self_heal_service_levels_version() and use it during startup when
the node is already on raft topology but service levels are still marked
as v1.

In that stale state, migrate service levels to v2 through group0 instead
of failing startup.
2026-05-13 16:00:02 +03:00
Alex
ac0a19aab8 qos: reintroduce service levels v2 migration self-heal
migrate_to_v2() was removed after gossip-based service level migration
support was dropped, since upgraded nodes were expected to already use
service levels v2.

However, clusters affected by the old migration bug may reach raft topology
while system.scylla_local still has a stale service level version. Restore
the migration helper so startup can self-heal those nodes by writing the v2
state through group0.
2026-05-13 10:16:02 +03:00
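
Taken together, the commits turn a stale v1 marker on a raft-topology cluster from a fatal startup error into a condition that is healed after group0 starts. A minimal, self-contained sketch of that decision in plain C++ — the globals and should_self_heal_service_levels() are illustrative stand-ins, not Scylla's real interfaces:

    #include <cstdio>
    #include <optional>
    #include <stdexcept>

    // Illustrative stand-ins: the real code reads system.scylla_local and the
    // topology upgrade state instead of globals.
    static std::optional<int> service_levels_marker = 1;  // stale v1 marker
    static bool raft_topology_done = true;

    // Returns true when startup should continue and heal the marker later.
    static bool should_self_heal_service_levels() {
        if (service_levels_marker && *service_levels_marker == 2) {
            return false;  // already v2, nothing to do
        }
        if (!raft_topology_done) {
            // Legacy cluster: still a hard startup error, as before these commits.
            throw std::runtime_error("cluster not yet upgraded to raft topology");
        }
        // Raft topology is done but the marker is stale (v1 or unset):
        // continue startup and self-heal through group0 later.
        return true;
    }

    int main() {
        std::printf("self-heal needed: %s\n", should_self_heal_service_levels() ? "yes" : "no");
    }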
8 changed files with 242 additions and 11 deletions

View File

@@ -96,6 +96,20 @@ schema_ptr cdc_timestamps() {
static const sstring CDC_TIMESTAMPS_KEY = "timestamps";

schema_ptr service_levels() {
    static thread_local auto schema = [] {
        auto id = generate_legacy_id(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS);
        return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS, std::make_optional(id))
                .with_column("service_level", utf8_type, column_kind::partition_key)
                .with_column("timeout", duration_type)
                .with_column("workload_type", utf8_type)
                .with_column("shares", int32_type)
                .with_hash_version()
                .build();
    }();
    return schema;
}

schema_ptr snapshot_sstables() {
    static thread_local auto schema = [] {
        auto id = generate_legacy_id(system_distributed_keyspace::NAME, system_distributed_keyspace::SNAPSHOT_SSTABLES);
@@ -143,12 +157,13 @@ static std::vector<schema_ptr> ensured_tables() {
        view_build_status(),
        cdc_desc(),
        cdc_timestamps(),
        service_levels(),
        snapshot_sstables(),
    };
}

std::vector<schema_ptr> system_distributed_keyspace::all_distributed_tables() {
-   return {view_build_status(), cdc_desc(), cdc_timestamps(), snapshot_sstables()};
+   return {view_build_status(), cdc_desc(), cdc_timestamps(), service_levels(), snapshot_sstables()};
}
system_distributed_keyspace::system_distributed_keyspace(cql3::query_processor& qp, service::migration_manager& mm, service::storage_proxy& sp)
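
service_levels() above follows the file's existing idiom: a function-local static thread_local initialized by an immediately-invoked lambda, so the schema is built once per shard on first use and reused afterwards. A generic sketch of the idiom with toy types, not Scylla's schema machinery:

    #include <memory>
    #include <string>

    // Toy stand-ins for schema / schema_ptr.
    struct schema { std::string table_name; };
    using schema_ptr = std::shared_ptr<const schema>;

    schema_ptr service_levels_like() {
        // Built once per thread on first call by the immediately-invoked lambda;
        // later calls return the cached value.
        static thread_local auto s = [] {
            return std::make_shared<const schema>(schema{"service_levels"});
        }();
        return s;
    }

    int main() {
        return service_levels_like() == service_levels_like() ? 0 : 1;  // same cached pointer
    }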

View File

@@ -55,6 +55,7 @@ public:
    static constexpr auto NAME = "system_distributed";

    static constexpr auto VIEW_BUILD_STATUS = "view_build_status";
    static constexpr auto SERVICE_LEVELS = "service_levels";

    /* This table is used by CDC clients to learn about available CDC streams. */
    static constexpr auto CDC_DESC_V2 = "cdc_streams_descriptions_v2";

main.cc
View File

@@ -68,6 +68,7 @@
#include "vector_search/vector_store_client.hh"
#include <cstdio>
#include <seastar/core/file.hh>
#include <stdexcept>
#include <unistd.h>
#include <sys/time.h>
#include <sys/resource.h>
@@ -223,6 +224,33 @@ read_config(bpo::variables_map& opts, db::config& cfg) {
}
}

static void
self_heal_service_levels_version(db::system_keyspace& sys_ks, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as) {
    static constexpr unsigned max_attempts = 10;
    for (unsigned attempt = 1; attempt <= max_attempts; ++attempt) {
        try {
            auto guard = group0_client.start_operation(as).get();
            auto service_levels_version = sys_ks.get_service_levels_version().get();
            service::release_guard(std::move(guard));
            if (service_levels_version && *service_levels_version == 2) {
                startlog.info("Service levels version marker was already self-healed to v2.");
                return;
            }
            auto nodes_count = qp.db().real_database().get_token_metadata().get_normal_token_owners().size();
            qos::service_level_controller::migrate_to_v2(nodes_count, sys_ks, qp, group0_client, as).get();
            group0_client.send_group0_read_barrier_to_live_members().get();
            startlog.info("Self-healed service levels version marker to v2.");
            return;
        } catch (...) {
            if (attempt == max_attempts) {
                std::throw_with_nested(std::runtime_error(format("Failed to self-heal service levels version marker after {} attempts", max_attempts)));
            }
            startlog.info("Concurrent group0 operation while self-healing service levels version marker, retrying ({}/{}).", attempt, max_attempts);
        }
    }
}

#ifdef SCYLLA_ENABLE_ERROR_INJECTION
static future<>
enable_initial_error_injections(const db::config& cfg) {
@@ -1525,6 +1553,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
    sys_ks.local().build_bootstrap_info().get();

    bool should_self_heal_service_levels_version = false;
    if (sys_ks.local().bootstrap_complete()) {
        // Check as early as possible if the cluster is fully upgraded to use Raft, since if it's not, then this node cannot be started with the current version.
        if (sys_ks.local().load_group0_upgrade_state().get() != "use_post_raft_procedures") {
@@ -1532,7 +1561,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
" a node of a cluster that is not using Raft yet. This is no longer supported. Please first complete the upgrade of the cluster to use Raft");
}
if (sys_ks.local().load_topology_upgrade_state().get() != "done") {
const bool raft_topology_done = sys_ks.local().load_topology_upgrade_state().get() == "done";
if (!raft_topology_done) {
throw std::runtime_error(
"Cannot start - cluster is not yet upgraded to use raft topology and this version does not support legacy topology operations. "
"If you are trying to upgrade the node then first upgrade the cluster to use raft topology.");
@@ -1543,10 +1573,14 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
"Cannot start - cluster is not yet upgraded to use auth v2 and this version does not support legacy auth. "
"If you are trying to upgrade the node then first upgrade the cluster to use auth v2.");
}
if (sys_ks.local().get_service_levels_version().get() != 2) {
throw std::runtime_error(
"Cannot start - cluster is not yet upgraded to use service levels v2 and this version does not support legacy service levels. "
"If you are trying to upgrade the node then first upgrade the cluster to use service levels v2.");
auto service_levels_version = sys_ks.local().get_service_levels_version().get();
if (raft_topology_done && (!service_levels_version || *service_levels_version != 2)
&& !utils::get_local_injector().enter("skip_service_levels_v2_initialization")) {
should_self_heal_service_levels_version = true;
startlog.warn(
"Cluster is using raft topology but service levels are still marked as version {}. "
"Startup will continue and the service levels version marker will be self-healed after group0 starts.",
service_levels_version ? format("{}", *service_levels_version) : "unset");
}
if (sys_ks.local().get_view_builder_version().get() != db::system_keyspace::view_builder_version_t::v2) {
throw std::runtime_error(
@@ -2368,6 +2402,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
    }).get();

    stop_signal.ready(false);

    if (should_self_heal_service_levels_version) {
        checkpoint(stop_signal, "self-healing service levels version");
        self_heal_service_levels_version(sys_ks.local(), qp.local(), group0_client, stop_signal.as_local_abort_source());
    }

    // At this point, `locator::topology` should be stable, i.e. we should have complete information
    // about the layout of the cluster (= list of nodes along with the racks/DCs).
    startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
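
self_heal_service_levels_version() bounds its retries and, on the final failure, uses std::throw_with_nested so the error that surfaces still carries the underlying cause. A self-contained sketch of that retry pattern — attempt_operation() is a dummy that always loses the race, and print_exception() shows how the nested chain unwinds (assumes C++20 <format> is available):

    #include <exception>
    #include <format>
    #include <iostream>
    #include <stdexcept>
    #include <string>

    static void attempt_operation() {
        // Dummy stand-in for a group0 operation that keeps losing a race.
        throw std::runtime_error("concurrent group0 operation");
    }

    static void run_with_retries(unsigned max_attempts) {
        for (unsigned attempt = 1; attempt <= max_attempts; ++attempt) {
            try {
                attempt_operation();
                return;
            } catch (...) {
                if (attempt == max_attempts) {
                    // Preserve the original exception as the nested cause.
                    std::throw_with_nested(std::runtime_error(
                        std::format("failed after {} attempts", max_attempts)));
                }
                // Otherwise: log and retry, as the startup helper does.
            }
        }
    }

    static void print_exception(const std::exception& e, int depth = 0) {
        std::cerr << std::string(depth * 2, ' ') << e.what() << '\n';
        try {
            std::rethrow_if_nested(e);
        } catch (const std::exception& nested) {
            print_exception(nested, depth + 1);  // walk the nested chain
        }
    }

    int main() {
        try {
            run_with_retries(3);
        } catch (const std::exception& e) {
            print_exception(e);
        }
    }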

View File

@@ -26,6 +26,7 @@
#include <seastar/coroutine/maybe_yield.hh>
#include "service/qos/raft_service_level_distributed_data_accessor.hh"
#include "service_level_controller.hh"
#include "db/system_distributed_keyspace.hh"
#include "cql3/query_processor.hh"
#include "service/storage_service.hh"
#include "service/topology_state_machine.hh"
@@ -739,6 +740,80 @@ future<> service_level_controller::do_add_service_level(sstring name, service_le
    return make_ready_future();
}

future<> service_level_controller::migrate_to_v2(size_t nodes_count, db::system_keyspace& sys_ks, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as) {
    // TODO:
    // For now we trust the administrator not to make changes to service levels during the migration.
    // Ideally, the migration would install a data accessor (on all nodes, on all shards) that allows reads but forbids writes.
    using namespace std::chrono_literals;

    auto schema = qp.db().find_schema(db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::SERVICE_LEVELS);

    const auto t = 5min;
    const timeout_config tc{t, t, t, t, t, t, t};
    service::client_state cs(::service::client_state::internal_tag{}, tc);
    service::query_state qs(cs, empty_service_permit());

    // The `system_distributed` keyspace has RF=3 and we need to scan it with CL=ALL.
    // To support migration on a cluster with 1 or 2 nodes, pick an appropriate CL.
    auto cl = db::consistency_level::ALL;
    if (nodes_count == 1) {
        cl = db::consistency_level::ONE;
    } else if (nodes_count == 2) {
        cl = db::consistency_level::TWO;
    }

    auto rows = co_await qp.execute_internal(
            format("SELECT * FROM {}.{}", db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::SERVICE_LEVELS),
            cl,
            qs,
            {},
            cql3::query_processor::cache_internal::no);

    auto col_names = schema->all_columns() | std::views::transform([] (const auto& col) { return col.name_as_cql_string(); }) | std::ranges::to<std::vector<sstring>>();
    auto col_names_str = fmt::to_string(fmt::join(col_names, ", "));
    sstring val_binders_str = "?";
    for (size_t i = 1; i < col_names.size(); ++i) {
        val_binders_str += ", ?";
    }

    auto guard = co_await group0_client.start_operation(as);

    utils::chunked_vector<mutation> migration_muts;
    for (const auto& row: *rows) {
        std::vector<data_value_or_unset> values;
        for (const auto& col: schema->all_columns()) {
            if (row.has(col.name_as_text())) {
                values.push_back(col.type->deserialize(row.get_blob_unfragmented(col.name_as_text())));
            } else {
                values.push_back(unset_value{});
            }
        }

        auto muts = co_await qp.get_mutations_internal(
                seastar::format("INSERT INTO {}.{} ({}) VALUES ({})",
                        db::system_keyspace::NAME,
                        db::system_keyspace::SERVICE_LEVELS_V2,
                        col_names_str,
                        val_binders_str),
                qos_query_state(),
                guard.write_timestamp(),
                std::move(values));
        if (muts.size() != 1) {
            on_internal_error(sl_logger, format("expecting single insert mutation, got {}", muts.size()));
        }
        migration_muts.push_back(std::move(muts[0]));
    }

    auto status_mut = co_await sys_ks.make_service_levels_version_mutation(2, guard.write_timestamp());
    migration_muts.push_back(std::move(status_mut));

    service::write_mutations change {
        .mutations{migration_muts.begin(), migration_muts.end()},
    };
    auto group0_cmd = group0_client.prepare_command(change, guard, "migrate service levels to v2");
    co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), as);
}

future<> service_level_controller::do_remove_service_level(sstring name, bool remove_static) {
    auto service_level_it = _service_levels_db.find(name);
    if (service_level_it != _service_levels_db.end()) {
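
Two details of migrate_to_v2() generalize well: the scan consistency is lowered from ALL only for 1- and 2-node clusters, and the INSERT statement gets one '?' binder per column. A minimal sketch of both, independent of Scylla's types:

    #include <cstdio>
    #include <string>
    #include <vector>

    enum class consistency { one, two, all };

    // Strongest read consistency a cluster of this size can satisfy for an RF=3 table.
    static consistency scan_consistency(size_t nodes_count) {
        if (nodes_count == 1) return consistency::one;
        if (nodes_count == 2) return consistency::two;
        return consistency::all;
    }

    // "?, ?, ?" — one binder per column, as migrate_to_v2() builds for its INSERT.
    static std::string binders(const std::vector<std::string>& columns) {
        std::string out = "?";
        for (size_t i = 1; i < columns.size(); ++i) {
            out += ", ?";
        }
        return out;
    }

    int main() {
        std::vector<std::string> cols{"service_level", "timeout", "workload_type", "shares"};
        std::printf("INSERT ... VALUES (%s) at CL=%d\n", binders(cols).c_str(),
                    static_cast<int>(scan_consistency(3)));
    }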

View File

@@ -396,6 +396,11 @@ public:
        return _sl_data_accessor->commit_mutations(std::move(mc), _global_controller_db->group0_aborter);
    }

    /**
     * Migrate data from `system_distributed.service_levels` to `system.service_levels_v2`.
     */
    static future<> migrate_to_v2(size_t nodes_count, db::system_keyspace& sys_ks, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as);

private:
    /**
     * Adds a service level configuration if it doesn't exist, and updates

View File

@@ -1374,16 +1374,19 @@ future<> storage_service::raft_initialize_discovery_leader(const join_node_reque
    auto enable_features_mutation = builder.build();
    insert_join_request_mutations.push_back(std::move(enable_features_mutation));

-   auto sl_status_mutation = co_await _sys_ks.local().make_service_levels_version_mutation(2, write_timestamp);
+   auto skip_service_levels_v2_initialization = utils::get_local_injector().enter("skip_service_levels_v2_initialization");
+   auto sl_status_mutation = co_await _sys_ks.local().make_service_levels_version_mutation(skip_service_levels_v2_initialization ? 1 : 2, write_timestamp);
    insert_join_request_mutations.emplace_back(std::move(sl_status_mutation));
    insert_join_request_mutations.emplace_back(co_await _sys_ks.local().make_auth_version_mutation(write_timestamp, db::system_keyspace::auth_version_t::v2));
    insert_join_request_mutations.emplace_back(co_await _sys_ks.local().make_view_builder_version_mutation(write_timestamp, db::system_keyspace::view_builder_version_t::v2));

-   auto sl_driver_mutations = co_await qos::service_level_controller::get_create_driver_service_level_mutations(_sys_ks.local(), write_timestamp);
-   for (auto& m : sl_driver_mutations) {
-       insert_join_request_mutations.emplace_back(m);
+   if (!skip_service_levels_v2_initialization) {
+       auto sl_driver_mutations = co_await qos::service_level_controller::get_create_driver_service_level_mutations(_sys_ks.local(), write_timestamp);
+       for (auto& m : sl_driver_mutations) {
+           insert_join_request_mutations.emplace_back(m);
+       }
    }

    topology_change change{std::move(insert_join_request_mutations)};
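
This hunk and the topology-coordinator one below gate normal v2 initialization behind the skip_service_levels_v2_initialization injection point, which is how the new test freezes a fresh cluster in the v1 state. A toy version of that gating, with a plain string set standing in for utils::get_local_injector():

    #include <cstdio>
    #include <set>
    #include <string>

    // Toy injection registry; the real one is utils::get_local_injector().
    static std::set<std::string> enabled_injections = {"skip_service_levels_v2_initialization"};

    static bool injection_enabled(const std::string& name) {
        return enabled_injections.contains(name);
    }

    int main() {
        // Mirrors the discovery-leader hunk: write a v1 marker and skip the
        // driver service level when the injection is enabled.
        int marker = injection_enabled("skip_service_levels_v2_initialization") ? 1 : 2;
        std::printf("service levels version marker: v%d\n", marker);
        if (!injection_enabled("skip_service_levels_v2_initialization")) {
            std::printf("creating driver service level\n");
        }
    }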

View File

@@ -4308,7 +4308,7 @@ future<std::optional<group0_guard>> topology_coordinator::maybe_migrate_system_t
    // it's in `topology_coordinator::enable_features`, so topology_coordinator will re-run its loop
    // and `maybe_migrate_system_tables` will be called.
-   if (_feature_service.driver_service_level) {
+   if (_feature_service.driver_service_level && !utils::get_local_injector().enter("skip_service_levels_v2_initialization")) {
        const auto sl_driver_created = co_await _sys_ks.get_service_level_driver_created();
        if (!sl_driver_created.value_or(false)) {
            co_return co_await _sl_controller.migrate_to_driver_service_level(std::move(guard), _sys_ks);

View File

@@ -0,0 +1,93 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#

import time

from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
from cassandra.util import Duration
import pytest

from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
from test.cluster.util import reconnect_driver
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for

NANOS_PER_SECOND = 1_000_000_000

V1_SERVICE_LEVELS = {
    "sl_v1_interactive": (Duration(0, 0, 30 * NANOS_PER_SECOND), "interactive", 1000),
    "sl_v1_batch": (Duration(0, 0, 60 * NANOS_PER_SECOND), "batch", 500)}


def assert_service_levels(rows, expected):
    rows_by_name = {row.service_level: row for row in rows}
    assert rows_by_name.keys() == expected.keys()
    for name, (timeout, workload_type, shares) in expected.items():
        row = rows_by_name[name]
        assert row.timeout == timeout
        assert row.workload_type == workload_type
        assert row.shares == shares


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode="release", reason="error injection is disabled in release mode")
async def test_self_heals_service_levels_v1_after_restart(manager: ManagerClient, scale_timeout: callable):
    """Reproduces a raft-topology cluster created before service levels were initialized as v2."""
    service_level_ver_query = "SELECT value FROM system.scylla_local WHERE key = 'service_level_version'"
    service_level_list_query = "LIST ALL SERVICE LEVELS"
    service_levels_v1_query = SimpleStatement("SELECT service_level, timeout, workload_type, shares FROM system_distributed.service_levels",
                                              consistency_level=ConsistencyLevel.ONE)

    config = {
        **auth_config,
        "error_injections_at_startup": ["skip_service_levels_v2_initialization"]}
    server = await manager.server_add(config=config)
    cql = manager.get_cql()

    async def service_levels_version_initialized():
        rows = await cql.run_async(service_level_ver_query)
        if not rows:
            return None
        return rows

    version_rows = await wait_for(service_levels_version_initialized, time.time() + scale_timeout(30))
    assert version_rows[0].value == "1"

    insert_service_level = cql.prepare("INSERT INTO system_distributed.service_levels (service_level, timeout, workload_type, shares) VALUES (?, ?, ?, ?)")
    insert_service_level.consistency_level = ConsistencyLevel.ONE
    for name, (timeout, workload_type, shares) in V1_SERVICE_LEVELS.items():
        await cql.run_async(insert_service_level, (name, timeout, workload_type, shares))

    assert_service_levels(await cql.run_async(service_levels_v1_query), V1_SERVICE_LEVELS)
    assert await cql.run_async(service_level_list_query) == []

    await manager.server_stop_gracefully(server.server_id)
    await manager.server_update_config(server.server_id, "error_injections_at_startup", [])
    await manager.server_start(server.server_id)
    cql = await reconnect_driver(manager)
    await manager.api.reload_raft_topology_state(server.ip_addr)

    expected_service_levels = {**V1_SERVICE_LEVELS, "driver": (None, "batch", 200)}

    async def service_levels_v2_healed():
        version_rows = await cql.run_async(service_level_ver_query)
        if not version_rows or version_rows[0].value != "2":
            return None
        service_levels = await cql.run_async(service_level_list_query)
        service_level_names = {row.service_level for row in service_levels}
        if service_level_names != expected_service_levels.keys():
            return None
        assert_service_levels(service_levels, expected_service_levels)
        return True

    await wait_for(service_levels_v2_healed, time.time() + scale_timeout(30))