mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-16 12:52:01 +00:00
service:qos: service levels migration
Migrate data from `system_distributed.service_levels` to `system.service_levels_v2` during the raft topology upgrade. The migration process reads data from the old table with CL=ALL and inserts the data into the new table via raft.
This commit is contained in:
2
main.cc
2
main.cc
@@ -1435,7 +1435,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
std::ref(feature_service), std::ref(mm), std::ref(token_metadata), std::ref(erm_factory),
|
||||
std::ref(messaging), std::ref(repair),
|
||||
std::ref(stream_manager), std::ref(lifecycle_notifier), std::ref(bm), std::ref(snitch),
|
||||
std::ref(tablet_allocator), std::ref(cdc_generation_service), std::ref(qp)).get();
|
||||
std::ref(tablet_allocator), std::ref(cdc_generation_service), std::ref(qp), std::ref(sl_controller)).get();
|
||||
|
||||
auto stop_storage_service = defer_verbose_shutdown("storage_service", [&] {
|
||||
ss.stop().get();
|
||||
|
||||
@@ -6,9 +6,15 @@
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#include <boost/algorithm/string/join.hpp>
|
||||
#include <chrono>
|
||||
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "db/consistency_level_type.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "seastar/core/on_internal_error.hh"
|
||||
#include "seastar/core/timer.hh"
|
||||
#include "service_level_controller.hh"
|
||||
@@ -435,6 +441,80 @@ void service_level_controller::upgrade_to_v2(cql3::query_processor& qp, service:
|
||||
}
|
||||
}
|
||||
|
||||
// One-shot migration of service level definitions from the old distributed
// table (`system_distributed.service_levels`) into the raft-managed table
// (`system.service_levels_v2`). Runs during the raft topology upgrade.
//
// Parameters:
//   nodes_count   - number of nodes in the cluster; used to pick a read CL
//                   that is satisfiable on 1- and 2-node clusters.
//   qp            - query processor used for both the read and for building
//                   the insert mutations.
//   group0_client - raft group0 client; the inserts are committed as a single
//                   group0 command so all nodes apply them atomically.
//   as            - abort source propagated to the group0 operations.
future<> service_level_controller::migrate_to_v2(size_t nodes_count, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as) {
    //TODO:
    //Now we trust the administrator to not make changes to service levels during the migration.
    //Ideally, during the migration we should set a migration data accessor (on all nodes, on all shards) that allows reads but forbids writes.
    using namespace std::chrono_literals;

    // Schema of the source table; its column set drives both the SELECT
    // deserialization and the generated INSERT below.
    auto schema = qp.db().find_schema(db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::SERVICE_LEVELS);

    // Generous 5-minute timeout for every timeout class: the CL=ALL scan must
    // reach every replica, which can be slow during an upgrade.
    const auto t = 5min;
    const timeout_config tc{t, t, t, t, t, t, t};
    service::client_state cs(::service::client_state::internal_tag{}, tc);
    service::query_state qs(cs, empty_service_permit());

    // `system_distributed` keyspace has RF=3 and we need to scan it with CL=ALL
    // To support migration on cluster with 1 or 2 nodes, set appropriate CL
    // (on such clusters the effective replica count is below 3, so ALL would
    // be unsatisfiable).
    auto cl = db::consistency_level::ALL;
    if (nodes_count == 1) {
        cl = db::consistency_level::ONE;
    } else if (nodes_count == 2) {
        cl = db::consistency_level::TWO;
    }

    // Read the entire old table; there is no key filter because every
    // service level must be migrated.
    auto rows = co_await qp.execute_internal(
        format("SELECT * FROM {}.{}", db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::SERVICE_LEVELS),
        cl,
        qs,
        {},
        cql3::query_processor::cache_internal::no);
    if (rows->empty()) {
        // Nothing to migrate - skip creating an empty group0 command.
        co_return;
    }

    // Build "col1, col2, ..." and a matching "?, ?, ..." binder list so the
    // INSERT mirrors the source schema column-for-column.
    auto col_names = boost::copy_range<std::vector<sstring>>(schema->all_columns() | boost::adaptors::transformed([] (const auto& col) {return col.name_as_cql_string(); }));
    auto col_names_str = boost::algorithm::join(col_names, ", ");
    sstring val_binders_str = "?";
    for (size_t i = 1; i < col_names.size(); ++i) {
        val_binders_str += ", ?";
    }

    // NOTE(review): the guard is taken after the CL=ALL read above, so rows
    // written to the old table between the read and this point would be lost;
    // the TODO at the top of the function covers this - confirm this window
    // is acceptable for the upgrade flow.
    auto guard = co_await group0_client.start_operation(&as);

    std::vector<mutation> migration_muts;
    for (const auto& row: *rows) {
        // Bind each source column's value; columns absent from this row are
        // bound as "unset" so the INSERT does not overwrite them with nulls.
        std::vector<data_value_or_unset> values;
        for (const auto& col: schema->all_columns()) {
            if (row.has(col.name_as_text())) {
                values.push_back(col.type->deserialize(row.get_blob(col.name_as_text())));
            } else {
                values.push_back(unset_value{});
            }
        }

        // Generate (but do not apply) the mutation for this row's INSERT,
        // stamped with the guard's write timestamp so all migrated rows share
        // a consistent timestamp within the group0 command.
        auto muts = co_await qp.get_mutations_internal(
            format("INSERT INTO {}.{} ({}) VALUES ({})",
                db::system_keyspace::NAME,
                db::system_keyspace::SERVICE_LEVELS_V2,
                col_names_str,
                val_binders_str),
            qos_query_state(),
            guard.write_timestamp(),
            std::move(values));
        // A single-row INSERT must produce exactly one mutation; anything
        // else indicates a programming error, not bad user data.
        if (muts.size() != 1) {
            on_internal_error(sl_logger, format("expecting single insert mutation, got {}", muts.size()));
        }
        migration_muts.push_back(std::move(muts[0]));
    }
    // Commit all migrated rows as one group0 command so the migration is
    // applied atomically across the cluster (or not at all).
    service::write_mutations change {
        .mutations{migration_muts.begin(), migration_muts.end()},
    };
    auto group0_cmd = group0_client.prepare_command(change, guard, "migrate service levels to v2");
    co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), &as);
}
|
||||
|
||||
future<> service_level_controller::do_remove_service_level(sstring name, bool remove_static) {
|
||||
auto service_level_it = _service_levels_db.find(name);
|
||||
if (service_level_it != _service_levels_db.end()) {
|
||||
|
||||
@@ -20,10 +20,15 @@
|
||||
#include "service/endpoint_lifecycle_subscriber.hh"
|
||||
#include "qos_configuration_change_subscriber.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "service/raft/raft_group_registry.hh"
|
||||
|
||||
namespace db {
|
||||
class system_distributed_keyspace;
|
||||
}
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
}
|
||||
|
||||
namespace qos {
|
||||
/**
|
||||
* a structure to hold a service level
|
||||
@@ -187,6 +192,10 @@ public:
|
||||
|
||||
void upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client);
|
||||
|
||||
/**
|
||||
* Migrate data from `system_distributed.service_levels` to `system.service_levels_v2`
|
||||
*/
|
||||
static future<> migrate_to_v2(size_t nodes_count, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as);
|
||||
private:
|
||||
/**
|
||||
* Adds a service level configuration if it doesn't exists, and updates
|
||||
|
||||
@@ -14,6 +14,9 @@
|
||||
#include "db/system_auth_keyspace.hh"
|
||||
#include "gc_clock.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "service/qos/raft_service_level_distributed_data_accessor.hh"
|
||||
#include "service/qos/service_level_controller.hh"
|
||||
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
|
||||
#include "service/topology_guard.hh"
|
||||
#include "service/session.hh"
|
||||
#include "dht/boot_strapper.hh"
|
||||
@@ -129,7 +132,8 @@ storage_service::storage_service(abort_source& abort_source,
|
||||
sharded<locator::snitch_ptr>& snitch,
|
||||
sharded<service::tablet_allocator>& tablet_allocator,
|
||||
sharded<cdc::generation_service>& cdc_gens,
|
||||
cql3::query_processor& qp)
|
||||
cql3::query_processor& qp,
|
||||
sharded<qos::service_level_controller>& sl_controller)
|
||||
: _abort_source(abort_source)
|
||||
, _feature_service(feature_service)
|
||||
, _db(db)
|
||||
@@ -140,6 +144,7 @@ storage_service::storage_service(abort_source& abort_source,
|
||||
, _repair(repair)
|
||||
, _stream_manager(stream_manager)
|
||||
, _snitch(snitch)
|
||||
, _sl_controller(sl_controller)
|
||||
, _group0(nullptr)
|
||||
, _node_ops_abort_thread(node_ops_abort_thread())
|
||||
, _shared_token_metadata(stm)
|
||||
@@ -608,6 +613,7 @@ future<> storage_service::topology_state_load() {
|
||||
if (_manage_topology_change_kind_from_group0) {
|
||||
_topology_change_kind_enabled = upgrade_state_to_topology_op_kind(_topology_state_machine._topology.upgrade_state);
|
||||
}
|
||||
|
||||
if (_topology_state_machine._topology.upgrade_state != topology::upgrade_state_type::done) {
|
||||
co_return;
|
||||
}
|
||||
@@ -618,6 +624,10 @@ future<> storage_service::topology_state_load() {
|
||||
qp.auth_version = db::system_auth_keyspace::version_t::v2;
|
||||
});
|
||||
|
||||
co_await _sl_controller.invoke_on_all([this] (qos::service_level_controller& sl_controller) {
|
||||
sl_controller.upgrade_to_v2(_qp, _group0->client());
|
||||
});
|
||||
|
||||
co_await _feature_service.container().invoke_on_all([&] (gms::feature_service& fs) {
|
||||
return fs.enable(boost::copy_range<std::set<std::string_view>>(_topology_state_machine._topology.enabled_features));
|
||||
});
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include "gms/i_endpoint_state_change_subscriber.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "service/endpoint_lifecycle_subscriber.hh"
|
||||
#include "service/qos/service_level_controller.hh"
|
||||
#include "service/topology_guard.hh"
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "locator/tablets.hh"
|
||||
@@ -145,6 +146,7 @@ private:
|
||||
sharded<repair_service>& _repair;
|
||||
sharded<streaming::stream_manager>& _stream_manager;
|
||||
sharded<locator::snitch_ptr>& _snitch;
|
||||
sharded<qos::service_level_controller>& _sl_controller;
|
||||
|
||||
// Engaged on shard 0 before `join_cluster`.
|
||||
service::raft_group0* _group0;
|
||||
@@ -200,7 +202,8 @@ public:
|
||||
sharded<locator::snitch_ptr>& snitch,
|
||||
sharded<service::tablet_allocator>& tablet_allocator,
|
||||
sharded<cdc::generation_service>& cdc_gs,
|
||||
cql3::query_processor& qp);
|
||||
cql3::query_processor& qp,
|
||||
sharded<qos::service_level_controller>& sl_controller);
|
||||
|
||||
// Needed by distributed<>
|
||||
future<> stop();
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include "replica/database.hh"
|
||||
#include "replica/tablet_mutation_builder.hh"
|
||||
#include "replica/tablets.hh"
|
||||
#include "service/qos/service_level_controller.hh"
|
||||
#include "service/raft/join_node.hh"
|
||||
#include "service/raft/raft_address_map.hh"
|
||||
#include "service/raft/raft_group0.hh"
|
||||
@@ -2395,6 +2396,9 @@ future<> topology_coordinator::build_coordinator_state(group0_guard guard) {
|
||||
co_await auth::migrate_to_auth_v2(_sys_ks.query_processor(), _group0.client(),
|
||||
[this] (abort_source*) { return start_operation();}, _as);
|
||||
|
||||
rtlogger.info("migrating service levels data");
|
||||
co_await qos::service_level_controller::migrate_to_v2(_gossiper.num_endpoints(), _sys_ks.query_processor(), _group0.client(), _as);
|
||||
|
||||
rtlogger.info("building initial raft topology state and CDC generation");
|
||||
guard = co_await start_operation();
|
||||
|
||||
|
||||
@@ -220,14 +220,15 @@ SEASTAR_TEST_CASE(test_alter_with_timeouts) {
|
||||
// Avoid reading from memtables, which does not check timeouts due to being too fast
|
||||
e.db().invoke_on_all([] (replica::database& db) { return db.flush_all_memtables(); }).get();
|
||||
|
||||
auto msg = cquery_nofail(e, "SELECT timeout FROM system_distributed.service_levels");
|
||||
auto sl_is_v2 = e.local_client_state().get_service_level_controller().is_v2();
|
||||
auto msg = cquery_nofail(e, format("SELECT timeout FROM {}", sl_is_v2 ? "system.service_levels_v2" : "system_distributed.service_levels"));
|
||||
assert_that(msg).is_rows().with_rows({{
|
||||
duration_type->from_string("5ms")
|
||||
}});
|
||||
|
||||
cquery_nofail(e, "ALTER SERVICE LEVEL sl WITH timeout = 35s");
|
||||
|
||||
msg = cquery_nofail(e, "SELECT timeout FROM system_distributed.service_levels WHERE service_level = 'sl'");
|
||||
msg = cquery_nofail(e, format("SELECT timeout FROM {} WHERE service_level = 'sl'", sl_is_v2 ? "system.service_levels_v2" : "system_distributed.service_levels"));
|
||||
assert_that(msg).is_rows().with_rows({{
|
||||
duration_type->from_string("35s")
|
||||
}});
|
||||
@@ -312,7 +313,8 @@ SEASTAR_TEST_CASE(test_alter_with_workload_type) {
|
||||
cquery_nofail(e, "CREATE SERVICE LEVEL sl");
|
||||
cquery_nofail(e, "ATTACH SERVICE LEVEL sl TO user1");
|
||||
|
||||
auto msg = cquery_nofail(e, "SELECT workload_type FROM system_distributed.service_levels");
|
||||
auto sl_is_v2 = e.local_client_state().get_service_level_controller().is_v2();
|
||||
auto msg = cquery_nofail(e, format("SELECT workload_type FROM {}", sl_is_v2 ? "system.service_levels_v2" : "system_distributed.service_levels"));
|
||||
assert_that(msg).is_rows().with_rows({{
|
||||
{}
|
||||
}});
|
||||
|
||||
@@ -770,7 +770,8 @@ private:
|
||||
std::ref(_snitch),
|
||||
std::ref(_tablet_allocator),
|
||||
std::ref(_cdc_generation_service),
|
||||
std::ref(_qp)).get();
|
||||
std::ref(_qp),
|
||||
std::ref(_sl_controller)).get();
|
||||
auto stop_storage_service = defer([this] { _ss.stop().get(); });
|
||||
|
||||
_mnotifier.local().register_listener(&_ss.local());
|
||||
|
||||
Reference in New Issue
Block a user