service:qos: service levels migration

Migrate data from `system_distributed.service_levels` to
`system.service_levels_v2` during raft topology upgrade.

Migration process reads data from old table with CL ALL
and inserts the data to the new table via raft.
This commit is contained in:
Michał Jadwiszczak
2024-03-04 22:12:08 +01:00
parent 36c9afda99
commit 2917ec5d51
8 changed files with 116 additions and 7 deletions

View File

@@ -1435,7 +1435,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
std::ref(feature_service), std::ref(mm), std::ref(token_metadata), std::ref(erm_factory),
std::ref(messaging), std::ref(repair),
std::ref(stream_manager), std::ref(lifecycle_notifier), std::ref(bm), std::ref(snitch),
std::ref(tablet_allocator), std::ref(cdc_generation_service), std::ref(qp)).get();
std::ref(tablet_allocator), std::ref(cdc_generation_service), std::ref(qp), std::ref(sl_controller)).get();
auto stop_storage_service = defer_verbose_shutdown("storage_service", [&] {
ss.stop().get();

View File

@@ -6,9 +6,15 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <boost/algorithm/string/join.hpp>
#include <chrono>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include "cql3/untyped_result_set.hh"
#include "db/consistency_level_type.hh"
#include "db/system_keyspace.hh"
#include "seastar/core/on_internal_error.hh"
#include "seastar/core/timer.hh"
#include "service_level_controller.hh"
@@ -435,6 +441,80 @@ void service_level_controller::upgrade_to_v2(cql3::query_processor& qp, service:
}
}
// One-shot migration of service-level definitions from the old distributed table
// (`system_distributed.service_levels`) to the raft-managed v2 table
// (`system.service_levels_v2`), executed during the raft topology upgrade.
//
// Reads all rows from the old table at the strongest consistency level the
// cluster size allows, converts each row into a single INSERT mutation against
// the v2 table, and commits all mutations atomically as one group0 command.
//
// nodes_count    - number of nodes in the cluster; used to pick the read CL.
// qp             - query processor used for the read and for building mutations.
// group0_client  - raft group0 client used to commit the migration command.
// as             - abort source propagated into the group0 operations.
future<> service_level_controller::migrate_to_v2(size_t nodes_count, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as) {
//TODO:
//Now we trust the administrator to not make changes to service levels during the migration.
//Ideally, during the migration we should set migration data accessor(on all nodes, on all shards) that allows to read but forbids writes
using namespace std::chrono_literals;
auto schema = qp.db().find_schema(db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::SERVICE_LEVELS);
// Generous per-operation timeout: the CL=ALL scan may have to wait for every
// replica to respond.
const auto t = 5min;
const timeout_config tc{t, t, t, t, t, t, t};
service::client_state cs(::service::client_state::internal_tag{}, tc);
service::query_state qs(cs, empty_service_permit());
// `system_distributed` keyspace has RF=3 and we need to scan it with CL=ALL
// To support migration on cluster with 1 or 2 nodes, set appropriate CL
auto cl = db::consistency_level::ALL;
if (nodes_count == 1) {
cl = db::consistency_level::ONE;
} else if (nodes_count == 2) {
cl = db::consistency_level::TWO;
}
auto rows = co_await qp.execute_internal(
format("SELECT * FROM {}.{}", db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::SERVICE_LEVELS),
cl,
qs,
{},
cql3::query_processor::cache_internal::no);
// Nothing to migrate — avoid committing an empty group0 command.
if (rows->empty()) {
co_return;
}
// Build "col1, col2, ..." and a matching "?, ?, ..." binder list from the old
// table's schema, so the INSERT below covers every column generically.
auto col_names = boost::copy_range<std::vector<sstring>>(schema->all_columns() | boost::adaptors::transformed([] (const auto& col) {return col.name_as_cql_string(); }));
auto col_names_str = boost::algorithm::join(col_names, ", ");
sstring val_binders_str = "?";
for (size_t i = 1; i < col_names.size(); ++i) {
val_binders_str += ", ?";
}
// Take the group0 guard before building mutations: its write_timestamp() is
// used for every mutation, and the guard is consumed by add_entry() below.
auto guard = co_await group0_client.start_operation(&as);
std::vector<mutation> migration_muts;
for (const auto& row: *rows) {
// Bind each column in schema order; absent columns are bound as "unset"
// so they are not written at all (rather than written as null).
std::vector<data_value_or_unset> values;
for (const auto& col: schema->all_columns()) {
if (row.has(col.name_as_text())) {
values.push_back(col.type->deserialize(row.get_blob(col.name_as_text())));
} else {
values.push_back(unset_value{});
}
}
auto muts = co_await qp.get_mutations_internal(
format("INSERT INTO {}.{} ({}) VALUES ({})",
db::system_keyspace::NAME,
db::system_keyspace::SERVICE_LEVELS_V2,
col_names_str,
val_binders_str),
qos_query_state(),
guard.write_timestamp(),
std::move(values));
// A single-partition INSERT must produce exactly one mutation; anything
// else indicates an internal bug.
if (muts.size() != 1) {
on_internal_error(sl_logger, format("expecting single insert mutation, got {}", muts.size()));
}
migration_muts.push_back(std::move(muts[0]));
}
// Commit all migrated rows as one atomic group0 (raft) command.
service::write_mutations change {
.mutations{migration_muts.begin(), migration_muts.end()},
};
auto group0_cmd = group0_client.prepare_command(change, guard, "migrate service levels to v2");
co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), &as);
}
future<> service_level_controller::do_remove_service_level(sstring name, bool remove_static) {
auto service_level_it = _service_levels_db.find(name);
if (service_level_it != _service_levels_db.end()) {

View File

@@ -20,10 +20,15 @@
#include "service/endpoint_lifecycle_subscriber.hh"
#include "qos_configuration_change_subscriber.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/raft/raft_group_registry.hh"
namespace db {
class system_distributed_keyspace;
}
namespace cql3 {
class query_processor;
}
namespace qos {
/**
* a structure to hold a service level
@@ -187,6 +192,10 @@ public:
void upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client);
/**
* Migrate data from `system_distributed.service_levels` to `system.service_levels_v2`
*/
static future<> migrate_to_v2(size_t nodes_count, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as);
private:
/**
* Adds a service level configuration if it doesn't exists, and updates

View File

@@ -14,6 +14,9 @@
#include "db/system_auth_keyspace.hh"
#include "gc_clock.hh"
#include "raft/raft.hh"
#include "service/qos/raft_service_level_distributed_data_accessor.hh"
#include "service/qos/service_level_controller.hh"
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
#include "service/topology_guard.hh"
#include "service/session.hh"
#include "dht/boot_strapper.hh"
@@ -129,7 +132,8 @@ storage_service::storage_service(abort_source& abort_source,
sharded<locator::snitch_ptr>& snitch,
sharded<service::tablet_allocator>& tablet_allocator,
sharded<cdc::generation_service>& cdc_gens,
cql3::query_processor& qp)
cql3::query_processor& qp,
sharded<qos::service_level_controller>& sl_controller)
: _abort_source(abort_source)
, _feature_service(feature_service)
, _db(db)
@@ -140,6 +144,7 @@ storage_service::storage_service(abort_source& abort_source,
, _repair(repair)
, _stream_manager(stream_manager)
, _snitch(snitch)
, _sl_controller(sl_controller)
, _group0(nullptr)
, _node_ops_abort_thread(node_ops_abort_thread())
, _shared_token_metadata(stm)
@@ -608,6 +613,7 @@ future<> storage_service::topology_state_load() {
if (_manage_topology_change_kind_from_group0) {
_topology_change_kind_enabled = upgrade_state_to_topology_op_kind(_topology_state_machine._topology.upgrade_state);
}
if (_topology_state_machine._topology.upgrade_state != topology::upgrade_state_type::done) {
co_return;
}
@@ -618,6 +624,10 @@ future<> storage_service::topology_state_load() {
qp.auth_version = db::system_auth_keyspace::version_t::v2;
});
co_await _sl_controller.invoke_on_all([this] (qos::service_level_controller& sl_controller) {
sl_controller.upgrade_to_v2(_qp, _group0->client());
});
co_await _feature_service.container().invoke_on_all([&] (gms::feature_service& fs) {
return fs.enable(boost::copy_range<std::set<std::string_view>>(_topology_state_machine._topology.enabled_features));
});

View File

@@ -16,6 +16,7 @@
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "schema/schema_fwd.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "service/qos/service_level_controller.hh"
#include "service/topology_guard.hh"
#include "locator/abstract_replication_strategy.hh"
#include "locator/tablets.hh"
@@ -145,6 +146,7 @@ private:
sharded<repair_service>& _repair;
sharded<streaming::stream_manager>& _stream_manager;
sharded<locator::snitch_ptr>& _snitch;
sharded<qos::service_level_controller>& _sl_controller;
// Engaged on shard 0 before `join_cluster`.
service::raft_group0* _group0;
@@ -200,7 +202,8 @@ public:
sharded<locator::snitch_ptr>& snitch,
sharded<service::tablet_allocator>& tablet_allocator,
sharded<cdc::generation_service>& cdc_gs,
cql3::query_processor& qp);
cql3::query_processor& qp,
sharded<qos::service_level_controller>& sl_controller);
// Needed by distributed<>
future<> stop();

View File

@@ -27,6 +27,7 @@
#include "replica/database.hh"
#include "replica/tablet_mutation_builder.hh"
#include "replica/tablets.hh"
#include "service/qos/service_level_controller.hh"
#include "service/raft/join_node.hh"
#include "service/raft/raft_address_map.hh"
#include "service/raft/raft_group0.hh"
@@ -2395,6 +2396,9 @@ future<> topology_coordinator::build_coordinator_state(group0_guard guard) {
co_await auth::migrate_to_auth_v2(_sys_ks.query_processor(), _group0.client(),
[this] (abort_source*) { return start_operation();}, _as);
rtlogger.info("migrating service levels data");
co_await qos::service_level_controller::migrate_to_v2(_gossiper.num_endpoints(), _sys_ks.query_processor(), _group0.client(), _as);
rtlogger.info("building initial raft topology state and CDC generation");
guard = co_await start_operation();

View File

@@ -220,14 +220,15 @@ SEASTAR_TEST_CASE(test_alter_with_timeouts) {
// Avoid reading from memtables, which does not check timeouts due to being too fast
e.db().invoke_on_all([] (replica::database& db) { return db.flush_all_memtables(); }).get();
auto msg = cquery_nofail(e, "SELECT timeout FROM system_distributed.service_levels");
auto sl_is_v2 = e.local_client_state().get_service_level_controller().is_v2();
auto msg = cquery_nofail(e, format("SELECT timeout FROM {}", sl_is_v2 ? "system.service_levels_v2" : "system_distributed.service_levels"));
assert_that(msg).is_rows().with_rows({{
duration_type->from_string("5ms")
}});
cquery_nofail(e, "ALTER SERVICE LEVEL sl WITH timeout = 35s");
msg = cquery_nofail(e, "SELECT timeout FROM system_distributed.service_levels WHERE service_level = 'sl'");
msg = cquery_nofail(e, format("SELECT timeout FROM {} WHERE service_level = 'sl'", sl_is_v2 ? "system.service_levels_v2" : "system_distributed.service_levels"));
assert_that(msg).is_rows().with_rows({{
duration_type->from_string("35s")
}});
@@ -312,7 +313,8 @@ SEASTAR_TEST_CASE(test_alter_with_workload_type) {
cquery_nofail(e, "CREATE SERVICE LEVEL sl");
cquery_nofail(e, "ATTACH SERVICE LEVEL sl TO user1");
auto msg = cquery_nofail(e, "SELECT workload_type FROM system_distributed.service_levels");
auto sl_is_v2 = e.local_client_state().get_service_level_controller().is_v2();
auto msg = cquery_nofail(e, format("SELECT workload_type FROM {}", sl_is_v2 ? "system.service_levels_v2" : "system_distributed.service_levels"));
assert_that(msg).is_rows().with_rows({{
{}
}});

View File

@@ -770,7 +770,8 @@ private:
std::ref(_snitch),
std::ref(_tablet_allocator),
std::ref(_cdc_generation_service),
std::ref(_qp)).get();
std::ref(_qp),
std::ref(_sl_controller)).get();
auto stop_storage_service = defer([this] { _ss.stop().get(); });
_mnotifier.local().register_listener(&_ss.local());