mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-13 19:32:02 +00:00
Compare commits
3 Commits
next
...
SCYLLADB-1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a0303bfd41 | ||
|
|
12dfd9b487 | ||
|
|
ac0a19aab8 |
@@ -96,6 +96,20 @@ schema_ptr cdc_timestamps() {
|
||||
|
||||
static const sstring CDC_TIMESTAMPS_KEY = "timestamps";
|
||||
|
||||
schema_ptr service_levels() {
|
||||
static thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS);
|
||||
return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS, std::make_optional(id))
|
||||
.with_column("service_level", utf8_type, column_kind::partition_key)
|
||||
.with_column("timeout", duration_type)
|
||||
.with_column("workload_type", utf8_type)
|
||||
.with_column("shares", int32_type)
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr snapshot_sstables() {
|
||||
static thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(system_distributed_keyspace::NAME, system_distributed_keyspace::SNAPSHOT_SSTABLES);
|
||||
@@ -143,12 +157,13 @@ static std::vector<schema_ptr> ensured_tables() {
|
||||
view_build_status(),
|
||||
cdc_desc(),
|
||||
cdc_timestamps(),
|
||||
service_levels(),
|
||||
snapshot_sstables(),
|
||||
};
|
||||
}
|
||||
|
||||
std::vector<schema_ptr> system_distributed_keyspace::all_distributed_tables() {
|
||||
return {view_build_status(), cdc_desc(), cdc_timestamps(), snapshot_sstables()};
|
||||
return {view_build_status(), cdc_desc(), cdc_timestamps(), service_levels(), snapshot_sstables()};
|
||||
}
|
||||
|
||||
system_distributed_keyspace::system_distributed_keyspace(cql3::query_processor& qp, service::migration_manager& mm, service::storage_proxy& sp)
|
||||
|
||||
@@ -55,6 +55,7 @@ public:
|
||||
static constexpr auto NAME = "system_distributed";
|
||||
|
||||
static constexpr auto VIEW_BUILD_STATUS = "view_build_status";
|
||||
static constexpr auto SERVICE_LEVELS = "service_levels";
|
||||
|
||||
/* This table is used by CDC clients to learn about available CDC streams. */
|
||||
static constexpr auto CDC_DESC_V2 = "cdc_streams_descriptions_v2";
|
||||
|
||||
49
main.cc
49
main.cc
@@ -68,6 +68,7 @@
|
||||
#include "vector_search/vector_store_client.hh"
|
||||
#include <cstdio>
|
||||
#include <seastar/core/file.hh>
|
||||
#include <stdexcept>
|
||||
#include <unistd.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/resource.h>
|
||||
@@ -223,6 +224,33 @@ read_config(bpo::variables_map& opts, db::config& cfg) {
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
self_heal_service_levels_version(db::system_keyspace& sys_ks, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as) {
|
||||
static constexpr unsigned max_attempts = 10;
|
||||
for (unsigned attempt = 1; attempt <= max_attempts; ++attempt) {
|
||||
try {
|
||||
auto guard = group0_client.start_operation(as).get();
|
||||
auto service_levels_version = sys_ks.get_service_levels_version().get();
|
||||
service::release_guard(std::move(guard));
|
||||
if (service_levels_version && *service_levels_version == 2) {
|
||||
startlog.info("Service levels version marker was already self-healed to v2.");
|
||||
return;
|
||||
}
|
||||
|
||||
auto nodes_count = qp.db().real_database().get_token_metadata().get_normal_token_owners().size();
|
||||
qos::service_level_controller::migrate_to_v2(nodes_count, sys_ks, qp, group0_client, as).get();
|
||||
group0_client.send_group0_read_barrier_to_live_members().get();
|
||||
startlog.info("Self-healed service levels version marker to v2.");
|
||||
return;
|
||||
} catch (...) {
|
||||
if (attempt == max_attempts) {
|
||||
std::throw_with_nested(std::runtime_error(format("Failed to self-heal service levels version marker after {} attempts", max_attempts)));
|
||||
}
|
||||
startlog.info("Concurrent group0 operation while self-healing service levels version marker, retrying ({}/{}).", attempt, max_attempts);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef SCYLLA_ENABLE_ERROR_INJECTION
|
||||
static future<>
|
||||
enable_initial_error_injections(const db::config& cfg) {
|
||||
@@ -1525,6 +1553,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
|
||||
sys_ks.local().build_bootstrap_info().get();
|
||||
|
||||
bool should_self_heal_service_levels_version = false;
|
||||
if (sys_ks.local().bootstrap_complete()) {
|
||||
// Check as early as possible if the cluster is fully upgraded to use Raft, since if it's not, then this node cannot be started with the current version.
|
||||
if (sys_ks.local().load_group0_upgrade_state().get() != "use_post_raft_procedures") {
|
||||
@@ -1532,7 +1561,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
" a node of a cluster that is not using Raft yet. This is no longer supported. Please first complete the upgrade of the cluster to use Raft");
|
||||
}
|
||||
|
||||
if (sys_ks.local().load_topology_upgrade_state().get() != "done") {
|
||||
const bool raft_topology_done = sys_ks.local().load_topology_upgrade_state().get() == "done";
|
||||
if (!raft_topology_done) {
|
||||
throw std::runtime_error(
|
||||
"Cannot start - cluster is not yet upgraded to use raft topology and this version does not support legacy topology operations. "
|
||||
"If you are trying to upgrade the node then first upgrade the cluster to use raft topology.");
|
||||
@@ -1543,10 +1573,14 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
"Cannot start - cluster is not yet upgraded to use auth v2 and this version does not support legacy auth. "
|
||||
"If you are trying to upgrade the node then first upgrade the cluster to use auth v2.");
|
||||
}
|
||||
if (sys_ks.local().get_service_levels_version().get() != 2) {
|
||||
throw std::runtime_error(
|
||||
"Cannot start - cluster is not yet upgraded to use service levels v2 and this version does not support legacy service levels. "
|
||||
"If you are trying to upgrade the node then first upgrade the cluster to use service levels v2.");
|
||||
auto service_levels_version = sys_ks.local().get_service_levels_version().get();
|
||||
if (raft_topology_done && (!service_levels_version || *service_levels_version != 2)
|
||||
&& !utils::get_local_injector().enter("skip_service_levels_v2_initialization")) {
|
||||
should_self_heal_service_levels_version = true;
|
||||
startlog.warn(
|
||||
"Cluster is using raft topology but service levels are still marked as version {}. "
|
||||
"Startup will continue and the service levels version marker will be self-healed after group0 starts.",
|
||||
service_levels_version ? format("{}", *service_levels_version) : "unset");
|
||||
}
|
||||
if (sys_ks.local().get_view_builder_version().get() != db::system_keyspace::view_builder_version_t::v2) {
|
||||
throw std::runtime_error(
|
||||
@@ -2368,6 +2402,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
}).get();
|
||||
stop_signal.ready(false);
|
||||
|
||||
if (should_self_heal_service_levels_version) {
|
||||
checkpoint(stop_signal, "self-healing service levels version");
|
||||
self_heal_service_levels_version(sys_ks.local(), qp.local(), group0_client, stop_signal.as_local_abort_source());
|
||||
}
|
||||
|
||||
// At this point, `locator::topology` should be stable, i.e. we should have complete information
|
||||
// about the layout of the cluster (= list of nodes along with the racks/DCs).
|
||||
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include "service/qos/raft_service_level_distributed_data_accessor.hh"
|
||||
#include "service_level_controller.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/topology_state_machine.hh"
|
||||
@@ -739,6 +740,80 @@ future<> service_level_controller::do_add_service_level(sstring name, service_le
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
future<> service_level_controller::migrate_to_v2(size_t nodes_count, db::system_keyspace& sys_ks, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as) {
|
||||
//TODO:
|
||||
//Now we trust the administrator to not make changes to service levels during the migration.
|
||||
//Ideally, during the migration we should set migration data accessor(on all nodes, on all shards) that allows to read but forbids writes
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
auto schema = qp.db().find_schema(db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::SERVICE_LEVELS);
|
||||
|
||||
const auto t = 5min;
|
||||
const timeout_config tc{t, t, t, t, t, t, t};
|
||||
service::client_state cs(::service::client_state::internal_tag{}, tc);
|
||||
service::query_state qs(cs, empty_service_permit());
|
||||
|
||||
// `system_distributed` keyspace has RF=3 and we need to scan it with CL=ALL
|
||||
// To support migration on cluster with 1 or 2 nodes, set appropriate CL
|
||||
auto cl = db::consistency_level::ALL;
|
||||
if (nodes_count == 1) {
|
||||
cl = db::consistency_level::ONE;
|
||||
} else if (nodes_count == 2) {
|
||||
cl = db::consistency_level::TWO;
|
||||
}
|
||||
|
||||
auto rows = co_await qp.execute_internal(
|
||||
format("SELECT * FROM {}.{}", db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::SERVICE_LEVELS),
|
||||
cl,
|
||||
qs,
|
||||
{},
|
||||
cql3::query_processor::cache_internal::no);
|
||||
|
||||
auto col_names = schema->all_columns() | std::views::transform([] (const auto& col) {return col.name_as_cql_string(); }) | std::ranges::to<std::vector<sstring>>();
|
||||
auto col_names_str = fmt::to_string(fmt::join(col_names, ", "));
|
||||
sstring val_binders_str = "?";
|
||||
for (size_t i = 1; i < col_names.size(); ++i) {
|
||||
val_binders_str += ", ?";
|
||||
}
|
||||
|
||||
auto guard = co_await group0_client.start_operation(as);
|
||||
|
||||
utils::chunked_vector<mutation> migration_muts;
|
||||
for (const auto& row: *rows) {
|
||||
std::vector<data_value_or_unset> values;
|
||||
for (const auto& col: schema->all_columns()) {
|
||||
if (row.has(col.name_as_text())) {
|
||||
values.push_back(col.type->deserialize(row.get_blob_unfragmented(col.name_as_text())));
|
||||
} else {
|
||||
values.push_back(unset_value{});
|
||||
}
|
||||
}
|
||||
|
||||
auto muts = co_await qp.get_mutations_internal(
|
||||
seastar::format("INSERT INTO {}.{} ({}) VALUES ({})",
|
||||
db::system_keyspace::NAME,
|
||||
db::system_keyspace::SERVICE_LEVELS_V2,
|
||||
col_names_str,
|
||||
val_binders_str),
|
||||
qos_query_state(),
|
||||
guard.write_timestamp(),
|
||||
std::move(values));
|
||||
if (muts.size() != 1) {
|
||||
on_internal_error(sl_logger, format("expecting single insert mutation, got {}", muts.size()));
|
||||
}
|
||||
migration_muts.push_back(std::move(muts[0]));
|
||||
}
|
||||
|
||||
auto status_mut = co_await sys_ks.make_service_levels_version_mutation(2, guard.write_timestamp());
|
||||
migration_muts.push_back(std::move(status_mut));
|
||||
|
||||
service::write_mutations change {
|
||||
.mutations{migration_muts.begin(), migration_muts.end()},
|
||||
};
|
||||
auto group0_cmd = group0_client.prepare_command(change, guard, "migrate service levels to v2");
|
||||
co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), as);
|
||||
}
|
||||
|
||||
future<> service_level_controller::do_remove_service_level(sstring name, bool remove_static) {
|
||||
auto service_level_it = _service_levels_db.find(name);
|
||||
if (service_level_it != _service_levels_db.end()) {
|
||||
|
||||
@@ -396,6 +396,11 @@ public:
|
||||
return _sl_data_accessor->commit_mutations(std::move(mc), _global_controller_db->group0_aborter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Migrate data from `system_distributed.service_levels` to `system.service_levels_v2`.
|
||||
*/
|
||||
static future<> migrate_to_v2(size_t nodes_count, db::system_keyspace& sys_ks, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as);
|
||||
|
||||
private:
|
||||
/**
|
||||
* Adds a service level configuration if it doesn't exists, and updates
|
||||
|
||||
@@ -1374,16 +1374,19 @@ future<> storage_service::raft_initialize_discovery_leader(const join_node_reque
|
||||
auto enable_features_mutation = builder.build();
|
||||
insert_join_request_mutations.push_back(std::move(enable_features_mutation));
|
||||
|
||||
auto sl_status_mutation = co_await _sys_ks.local().make_service_levels_version_mutation(2, write_timestamp);
|
||||
auto skip_service_levels_v2_initialization = utils::get_local_injector().enter("skip_service_levels_v2_initialization");
|
||||
auto sl_status_mutation = co_await _sys_ks.local().make_service_levels_version_mutation(skip_service_levels_v2_initialization ? 1 : 2, write_timestamp);
|
||||
insert_join_request_mutations.emplace_back(std::move(sl_status_mutation));
|
||||
|
||||
insert_join_request_mutations.emplace_back(co_await _sys_ks.local().make_auth_version_mutation(write_timestamp, db::system_keyspace::auth_version_t::v2));
|
||||
|
||||
insert_join_request_mutations.emplace_back(co_await _sys_ks.local().make_view_builder_version_mutation(write_timestamp, db::system_keyspace::view_builder_version_t::v2));
|
||||
|
||||
auto sl_driver_mutations = co_await qos::service_level_controller::get_create_driver_service_level_mutations(_sys_ks.local(), write_timestamp);
|
||||
for (auto& m : sl_driver_mutations) {
|
||||
insert_join_request_mutations.emplace_back(m);
|
||||
if (!skip_service_levels_v2_initialization) {
|
||||
auto sl_driver_mutations = co_await qos::service_level_controller::get_create_driver_service_level_mutations(_sys_ks.local(), write_timestamp);
|
||||
for (auto& m : sl_driver_mutations) {
|
||||
insert_join_request_mutations.emplace_back(m);
|
||||
}
|
||||
}
|
||||
|
||||
topology_change change{std::move(insert_join_request_mutations)};
|
||||
|
||||
@@ -4308,7 +4308,7 @@ future<std::optional<group0_guard>> topology_coordinator::maybe_migrate_system_t
|
||||
// it's in `topology_coordinator::enable_features` ,so topology_coordinator will re-run its loop
|
||||
// and `maybe_migrate_system_tables` will be called.
|
||||
|
||||
if (_feature_service.driver_service_level) {
|
||||
if (_feature_service.driver_service_level && !utils::get_local_injector().enter("skip_service_levels_v2_initialization")) {
|
||||
const auto sl_driver_created = co_await _sys_ks.get_service_level_driver_created();
|
||||
if (!sl_driver_created.value_or(false)) {
|
||||
co_return co_await _sl_controller.migrate_to_driver_service_level(std::move(guard), _sys_ks);
|
||||
|
||||
@@ -0,0 +1,93 @@
|
||||
#
|
||||
# Copyright (C) 2026-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
#
|
||||
|
||||
import time
|
||||
|
||||
from cassandra import ConsistencyLevel
|
||||
from cassandra.query import SimpleStatement
|
||||
from cassandra.util import Duration
|
||||
import pytest
|
||||
|
||||
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
|
||||
from test.cluster.util import reconnect_driver
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for
|
||||
|
||||
|
||||
NANOS_PER_SECOND = 1_000_000_000
|
||||
|
||||
V1_SERVICE_LEVELS = {
|
||||
"sl_v1_interactive": (Duration(0, 0, 30 * NANOS_PER_SECOND), "interactive", 1000),
|
||||
"sl_v1_batch": (Duration(0, 0, 60 * NANOS_PER_SECOND), "batch", 500)}
|
||||
|
||||
|
||||
def assert_service_levels(rows, expected):
|
||||
rows_by_name = {row.service_level: row for row in rows}
|
||||
assert rows_by_name.keys() == expected.keys()
|
||||
|
||||
for name, (timeout, workload_type, shares) in expected.items():
|
||||
row = rows_by_name[name]
|
||||
assert row.timeout == timeout
|
||||
assert row.workload_type == workload_type
|
||||
assert row.shares == shares
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode="release", reason="error injection is disabled in release mode")
|
||||
async def test_self_heals_service_levels_v1_after_restart(manager: ManagerClient, scale_timeout: callable):
|
||||
"""Reproduces a raft-topology cluster created before service levels were initialized as v2."""
|
||||
|
||||
service_level_ver_query = "SELECT value FROM system.scylla_local WHERE key = 'service_level_version'"
|
||||
service_level_list_query = "LIST ALL SERVICE LEVELS"
|
||||
service_levels_v1_query = SimpleStatement("SELECT service_level, timeout, workload_type, shares FROM system_distributed.service_levels",
|
||||
consistency_level=ConsistencyLevel.ONE)
|
||||
|
||||
config = {
|
||||
**auth_config,
|
||||
"error_injections_at_startup": ["skip_service_levels_v2_initialization"]}
|
||||
|
||||
server = await manager.server_add(config=config)
|
||||
cql = manager.get_cql()
|
||||
|
||||
async def service_levels_version_initialized():
|
||||
rows = await cql.run_async(service_level_ver_query)
|
||||
if not rows:
|
||||
return None
|
||||
return rows
|
||||
|
||||
version_rows = await wait_for(service_levels_version_initialized, time.time() + scale_timeout(30))
|
||||
assert version_rows[0].value == "1"
|
||||
|
||||
insert_service_level = cql.prepare("INSERT INTO system_distributed.service_levels (service_level, timeout, workload_type, shares) VALUES (?, ?, ?, ?)")
|
||||
insert_service_level.consistency_level = ConsistencyLevel.ONE
|
||||
|
||||
for name, (timeout, workload_type, shares) in V1_SERVICE_LEVELS.items():
|
||||
await cql.run_async(insert_service_level, (name, timeout, workload_type, shares))
|
||||
|
||||
assert_service_levels(await cql.run_async(service_levels_v1_query), V1_SERVICE_LEVELS)
|
||||
assert await cql.run_async(service_level_list_query) == []
|
||||
|
||||
await manager.server_stop_gracefully(server.server_id)
|
||||
await manager.server_update_config(server.server_id, "error_injections_at_startup", [])
|
||||
await manager.server_start(server.server_id)
|
||||
cql = await reconnect_driver(manager)
|
||||
await manager.api.reload_raft_topology_state(server.ip_addr)
|
||||
|
||||
expected_service_levels = {**V1_SERVICE_LEVELS, "driver": (None, "batch", 200)}
|
||||
|
||||
async def service_levels_v2_healed():
|
||||
version_rows = await cql.run_async(service_level_ver_query)
|
||||
if not version_rows or version_rows[0].value != "2":
|
||||
return None
|
||||
|
||||
service_levels = await cql.run_async(service_level_list_query)
|
||||
service_level_names = {row.service_level for row in service_levels}
|
||||
if service_level_names != expected_service_levels.keys():
|
||||
return None
|
||||
assert_service_levels(service_levels, expected_service_levels)
|
||||
return True
|
||||
|
||||
await wait_for(service_levels_v2_healed, time.time() + scale_timeout(30))
|
||||
Reference in New Issue
Block a user