From 2917ec5d515ffbbd13f76b9addcb8309b715b78f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Jadwiszczak?= Date: Mon, 4 Mar 2024 22:12:08 +0100 Subject: [PATCH] service:qos: service levels migration Migrate data from `system_distributed.service_levels` to `system.service_levels_v2` during raft topology upgrade. Migration process reads data from old table with CL ALL and inserts the data to the new table via raft. --- main.cc | 2 +- service/qos/service_level_controller.cc | 80 +++++++++++++++++++++++++ service/qos/service_level_controller.hh | 9 +++ service/storage_service.cc | 12 +++- service/storage_service.hh | 5 +- service/topology_coordinator.cc | 4 ++ test/boost/auth_test.cc | 8 ++- test/lib/cql_test_env.cc | 3 +- 8 files changed, 116 insertions(+), 7 deletions(-) diff --git a/main.cc b/main.cc index 95be27030a..33496d6b05 100644 --- a/main.cc +++ b/main.cc @@ -1435,7 +1435,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl std::ref(feature_service), std::ref(mm), std::ref(token_metadata), std::ref(erm_factory), std::ref(messaging), std::ref(repair), std::ref(stream_manager), std::ref(lifecycle_notifier), std::ref(bm), std::ref(snitch), - std::ref(tablet_allocator), std::ref(cdc_generation_service), std::ref(qp)).get(); + std::ref(tablet_allocator), std::ref(cdc_generation_service), std::ref(qp), std::ref(sl_controller)).get(); auto stop_storage_service = defer_verbose_shutdown("storage_service", [&] { ss.stop().get(); diff --git a/service/qos/service_level_controller.cc b/service/qos/service_level_controller.cc index b36b17a1e5..cdcd86e642 100644 --- a/service/qos/service_level_controller.cc +++ b/service/qos/service_level_controller.cc @@ -6,9 +6,15 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ +#include +#include + #include #include #include +#include "cql3/untyped_result_set.hh" +#include "db/consistency_level_type.hh" +#include "db/system_keyspace.hh" #include "seastar/core/on_internal_error.hh" #include 
"seastar/core/timer.hh" #include "service_level_controller.hh" @@ -435,6 +441,80 @@ void service_level_controller::upgrade_to_v2(cql3::query_processor& qp, service: } } +future<> service_level_controller::migrate_to_v2(size_t nodes_count, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as) { + //TODO: + //Now we trust the administrator to not make changes to service levels during the migration. + //Ideally, during the migration we should set migration data accessor(on all nodes, on all shards) that allows to read but forbids writes + using namespace std::chrono_literals; + + auto schema = qp.db().find_schema(db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::SERVICE_LEVELS); + + const auto t = 5min; + const timeout_config tc{t, t, t, t, t, t, t}; + service::client_state cs(::service::client_state::internal_tag{}, tc); + service::query_state qs(cs, empty_service_permit()); + + // `system_distributed` keyspace has RF=3 and we need to scan it with CL=ALL + // To support migration on cluster with 1 or 2 nodes, set appropriate CL + auto cl = db::consistency_level::ALL; + if (nodes_count == 1) { + cl = db::consistency_level::ONE; + } else if (nodes_count == 2) { + cl = db::consistency_level::TWO; + } + + auto rows = co_await qp.execute_internal( + format("SELECT * FROM {}.{}", db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::SERVICE_LEVELS), + cl, + qs, + {}, + cql3::query_processor::cache_internal::no); + if (rows->empty()) { + co_return; + } + + + auto col_names = boost::copy_range>(schema->all_columns() | boost::adaptors::transformed([] (const auto& col) {return col.name_as_cql_string(); })); + auto col_names_str = boost::algorithm::join(col_names, ", "); + sstring val_binders_str = "?"; + for (size_t i = 1; i < col_names.size(); ++i) { + val_binders_str += ", ?"; + } + + auto guard = co_await group0_client.start_operation(&as); + + std::vector migration_muts; + for (const auto& row: 
*rows) { + std::vector values; + for (const auto& col: schema->all_columns()) { + if (row.has(col.name_as_text())) { + values.push_back(col.type->deserialize(row.get_blob(col.name_as_text()))); + } else { + values.push_back(unset_value{}); + } + } + + auto muts = co_await qp.get_mutations_internal( + format("INSERT INTO {}.{} ({}) VALUES ({})", + db::system_keyspace::NAME, + db::system_keyspace::SERVICE_LEVELS_V2, + col_names_str, + val_binders_str), + qos_query_state(), + guard.write_timestamp(), + std::move(values)); + if (muts.size() != 1) { + on_internal_error(sl_logger, format("expecting single insert mutation, got {}", muts.size())); + } + migration_muts.push_back(std::move(muts[0])); + } + service::write_mutations change { + .mutations{migration_muts.begin(), migration_muts.end()}, + }; + auto group0_cmd = group0_client.prepare_command(change, guard, "migrate service levels to v2"); + co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), &as); +} + future<> service_level_controller::do_remove_service_level(sstring name, bool remove_static) { auto service_level_it = _service_levels_db.find(name); if (service_level_it != _service_levels_db.end()) { diff --git a/service/qos/service_level_controller.hh b/service/qos/service_level_controller.hh index 1ceea63112..c658012fe9 100644 --- a/service/qos/service_level_controller.hh +++ b/service/qos/service_level_controller.hh @@ -20,10 +20,15 @@ #include "service/endpoint_lifecycle_subscriber.hh" #include "qos_configuration_change_subscriber.hh" #include "service/raft/raft_group0_client.hh" +#include "service/raft/raft_group_registry.hh" namespace db { class system_distributed_keyspace; } +namespace cql3 { + class query_processor; +} + namespace qos { /** * a structure to hold a service level @@ -187,6 +192,10 @@ public: void upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client); + /** + * Migrate data from `system_distributed.service_levels` to 
`system.service_levels_v2` + */ + static future<> migrate_to_v2(size_t nodes_count, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as); private: /** * Adds a service level configuration if it doesn't exists, and updates diff --git a/service/storage_service.cc b/service/storage_service.cc index d8e8ad7435..fd1d4b9a68 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -14,6 +14,9 @@ #include "db/system_auth_keyspace.hh" #include "gc_clock.hh" #include "raft/raft.hh" +#include "service/qos/raft_service_level_distributed_data_accessor.hh" +#include "service/qos/service_level_controller.hh" +#include "service/qos/standard_service_level_distributed_data_accessor.hh" #include "service/topology_guard.hh" #include "service/session.hh" #include "dht/boot_strapper.hh" @@ -129,7 +132,8 @@ storage_service::storage_service(abort_source& abort_source, sharded& snitch, sharded& tablet_allocator, sharded& cdc_gens, - cql3::query_processor& qp) + cql3::query_processor& qp, + sharded& sl_controller) : _abort_source(abort_source) , _feature_service(feature_service) , _db(db) @@ -140,6 +144,7 @@ storage_service::storage_service(abort_source& abort_source, , _repair(repair) , _stream_manager(stream_manager) , _snitch(snitch) + , _sl_controller(sl_controller) , _group0(nullptr) , _node_ops_abort_thread(node_ops_abort_thread()) , _shared_token_metadata(stm) @@ -608,6 +613,7 @@ future<> storage_service::topology_state_load() { if (_manage_topology_change_kind_from_group0) { _topology_change_kind_enabled = upgrade_state_to_topology_op_kind(_topology_state_machine._topology.upgrade_state); } + if (_topology_state_machine._topology.upgrade_state != topology::upgrade_state_type::done) { co_return; } @@ -618,6 +624,10 @@ future<> storage_service::topology_state_load() { qp.auth_version = db::system_auth_keyspace::version_t::v2; }); + co_await _sl_controller.invoke_on_all([this] (qos::service_level_controller& sl_controller) { + 
sl_controller.upgrade_to_v2(_qp, _group0->client()); + }); + co_await _feature_service.container().invoke_on_all([&] (gms::feature_service& fs) { return fs.enable(boost::copy_range>(_topology_state_machine._topology.enabled_features)); }); diff --git a/service/storage_service.hh b/service/storage_service.hh index 578105a137..af7b133360 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -16,6 +16,7 @@ #include "gms/i_endpoint_state_change_subscriber.hh" #include "schema/schema_fwd.hh" #include "service/endpoint_lifecycle_subscriber.hh" +#include "service/qos/service_level_controller.hh" #include "service/topology_guard.hh" #include "locator/abstract_replication_strategy.hh" #include "locator/tablets.hh" @@ -145,6 +146,7 @@ private: sharded& _repair; sharded& _stream_manager; sharded& _snitch; + sharded& _sl_controller; // Engaged on shard 0 before `join_cluster`. service::raft_group0* _group0; @@ -200,7 +202,8 @@ public: sharded& snitch, sharded& tablet_allocator, sharded& cdc_gs, - cql3::query_processor& qp); + cql3::query_processor& qp, + sharded& sl_controller); // Needed by distributed<> future<> stop(); diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 8f21cbf1ac..115783e00a 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -27,6 +27,7 @@ #include "replica/database.hh" #include "replica/tablet_mutation_builder.hh" #include "replica/tablets.hh" +#include "service/qos/service_level_controller.hh" #include "service/raft/join_node.hh" #include "service/raft/raft_address_map.hh" #include "service/raft/raft_group0.hh" @@ -2395,6 +2396,9 @@ future<> topology_coordinator::build_coordinator_state(group0_guard guard) { co_await auth::migrate_to_auth_v2(_sys_ks.query_processor(), _group0.client(), [this] (abort_source*) { return start_operation();}, _as); + rtlogger.info("migrating service levels data"); + co_await 
qos::service_level_controller::migrate_to_v2(_gossiper.num_endpoints(), _sys_ks.query_processor(), _group0.client(), _as); + rtlogger.info("building initial raft topology state and CDC generation"); guard = co_await start_operation(); diff --git a/test/boost/auth_test.cc b/test/boost/auth_test.cc index 8f11cd2287..1775da640e 100644 --- a/test/boost/auth_test.cc +++ b/test/boost/auth_test.cc @@ -220,14 +220,15 @@ SEASTAR_TEST_CASE(test_alter_with_timeouts) { // Avoid reading from memtables, which does not check timeouts due to being too fast e.db().invoke_on_all([] (replica::database& db) { return db.flush_all_memtables(); }).get(); - auto msg = cquery_nofail(e, "SELECT timeout FROM system_distributed.service_levels"); + auto sl_is_v2 = e.local_client_state().get_service_level_controller().is_v2(); + auto msg = cquery_nofail(e, format("SELECT timeout FROM {}", sl_is_v2 ? "system.service_levels_v2" : "system_distributed.service_levels")); assert_that(msg).is_rows().with_rows({{ duration_type->from_string("5ms") }}); cquery_nofail(e, "ALTER SERVICE LEVEL sl WITH timeout = 35s"); - msg = cquery_nofail(e, "SELECT timeout FROM system_distributed.service_levels WHERE service_level = 'sl'"); + msg = cquery_nofail(e, format("SELECT timeout FROM {} WHERE service_level = 'sl'", sl_is_v2 ? "system.service_levels_v2" : "system_distributed.service_levels")); assert_that(msg).is_rows().with_rows({{ duration_type->from_string("35s") }}); @@ -312,7 +313,8 @@ SEASTAR_TEST_CASE(test_alter_with_workload_type) { cquery_nofail(e, "CREATE SERVICE LEVEL sl"); cquery_nofail(e, "ATTACH SERVICE LEVEL sl TO user1"); - auto msg = cquery_nofail(e, "SELECT workload_type FROM system_distributed.service_levels"); + auto sl_is_v2 = e.local_client_state().get_service_level_controller().is_v2(); + auto msg = cquery_nofail(e, format("SELECT workload_type FROM {}", sl_is_v2 ? 
"system.service_levels_v2" : "system_distributed.service_levels")); assert_that(msg).is_rows().with_rows({{ {} }}); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 80bd4def3b..a1a4c8e4fb 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -770,7 +770,8 @@ private: std::ref(_snitch), std::ref(_tablet_allocator), std::ref(_cdc_generation_service), - std::ref(_qp)).get(); + std::ref(_qp), + std::ref(_sl_controller)).get(); auto stop_storage_service = defer([this] { _ss.stop().get(); }); _mnotifier.local().register_listener(&_ss.local());