diff --git a/configure.py b/configure.py index e185e877cc..71fa373dc4 100755 --- a/configure.py +++ b/configure.py @@ -1239,7 +1239,6 @@ scylla_core = (['message/messaging_service.cc', 'service/pager/query_pagers.cc', 'service/qos/qos_common.cc', 'service/qos/service_level_controller.cc', - 'service/qos/standard_service_level_distributed_data_accessor.cc', 'service/qos/raft_service_level_distributed_data_accessor.cc', 'streaming/stream_task.cc', 'streaming/stream_session.cc', diff --git a/cql3/statements/service_level_statement.cc b/cql3/statements/service_level_statement.cc index b8431a815e..a8f70d6e4d 100644 --- a/cql3/statements/service_level_statement.cc +++ b/cql3/statements/service_level_statement.cc @@ -27,7 +27,7 @@ future<> service_level_statement::check_access(query_processor& qp, const servic } bool service_level_statement::needs_guard(query_processor&, service::query_state& state) const { - return state.get_service_level_controller().is_v2(); + return true; } audit::statement_category service_level_statement::category() const { diff --git a/main.cc b/main.cc index 8b1d540452..09db7d10fd 100644 --- a/main.cc +++ b/main.cc @@ -99,7 +99,6 @@ #include "cdc/log.hh" #include "cdc/generation_service.hh" -#include "service/qos/standard_service_level_distributed_data_accessor.hh" #include "service/storage_proxy.hh" #include "service/mapreduce_service.hh" #include "alternator/controller.hh" @@ -1484,6 +1483,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl "Cannot start - cluster is not yet upgraded to use auth v2 and this version does not support legacy auth. " "If you are trying to upgrade the node then first upgrade the cluster to use auth v2."); } + if (sys_ks.local().get_service_levels_version().get() != 2) { + throw std::runtime_error( + "Cannot start - cluster is not yet upgraded to use service levels v2 and this version does not support legacy service levels. " + "If you are trying to upgrade the node then first upgrade the cluster to use service levels v2."); + } } const auto listen_address = utils::resolve(cfg->listen_address, family).get(); @@ -2351,9 +2355,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // with raft leader elected as only then service level mutation is put // into scylla_local table. Calling it here avoids starting new cluster with // older version only to immediately migrate it to the latest in the background. - sl_controller.invoke_on_all([&qp, &group0_client] (qos::service_level_controller& controller) -> future<> { - return controller.reload_distributed_data_accessor( - qp.local(), group0_client, sys_ks.local(), sys_dist_ks.local()); + sl_controller.invoke_on_all([&qp, &group0_client] (qos::service_level_controller& controller) { + controller.reload_distributed_data_accessor(qp.local(), group0_client); }).get(); // Initialize virtual table in system_distributed keyspace after joining the cluster, so @@ -2403,9 +2406,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl }); // update the service level cache after the SL data accessor and auth service are initialized. - if (sl_controller.local().is_v2()) { - sl_controller.local().update_cache(qos::update_both_cache_levels::yes).get(); - } + sl_controller.local().update_cache(qos::update_both_cache_levels::yes).get(); sl_controller.invoke_on_all([&lifecycle_notifier] (qos::service_level_controller& controller) { lifecycle_notifier.local().register_subscriber(&controller); diff --git a/service/CMakeLists.txt b/service/CMakeLists.txt index 86b3103071..42ecbd43df 100644 --- a/service/CMakeLists.txt +++ b/service/CMakeLists.txt @@ -15,7 +15,6 @@ target_sources(service paxos/proposal.cc qos/qos_common.cc qos/service_level_controller.cc - qos/standard_service_level_distributed_data_accessor.cc qos/raft_service_level_distributed_data_accessor.cc strong_consistency/groups_manager.cc strong_consistency/state_machine.cc diff --git a/service/qos/raft_service_level_distributed_data_accessor.cc b/service/qos/raft_service_level_distributed_data_accessor.cc index 725bf71d6f..2d1978d0a2 100644 --- a/service/qos/raft_service_level_distributed_data_accessor.cc +++ b/service/qos/raft_service_level_distributed_data_accessor.cc @@ -96,17 +96,9 @@ future<> raft_service_level_distributed_data_accessor::commit_mutations(service: return std::move(mc).commit(_group0_client, as, ::service::raft_timeout{}); } -bool raft_service_level_distributed_data_accessor::is_v2() const { - return true; -} - bool raft_service_level_distributed_data_accessor::can_use_effective_service_level_cache() const { return true; } -::shared_ptr raft_service_level_distributed_data_accessor::upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client) const { - return nullptr; -} - } diff --git a/service/qos/raft_service_level_distributed_data_accessor.hh b/service/qos/raft_service_level_distributed_data_accessor.hh index 7638d5439e..d7db58b3ff 100644 --- a/service/qos/raft_service_level_distributed_data_accessor.hh +++ b/service/qos/raft_service_level_distributed_data_accessor.hh @@ -38,9 +38,7 @@ public: virtual future<> drop_service_level(sstring service_level_name, service::group0_batch& mc) const override; virtual future<> commit_mutations(service::group0_batch&& mc, abort_source& as) const override; - virtual bool is_v2() const override; virtual bool can_use_effective_service_level_cache() const override; - virtual ::shared_ptr upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client) const override; static future> set_service_level_mutations(cql3::query_processor& qp, sstring service_level_name, qos::service_level_options slo, api::timestamp_type timestamp); }; diff --git a/service/qos/service_level_controller.cc b/service/qos/service_level_controller.cc index 9be912de7f..dc5d00a202 100644 --- a/service/qos/service_level_controller.cc +++ b/service/qos/service_level_controller.cc @@ -25,7 +25,6 @@ #include #include #include "service/qos/raft_service_level_distributed_data_accessor.hh" -#include "service/qos/standard_service_level_distributed_data_accessor.hh" #include "service_level_controller.hh" #include "db/system_distributed_keyspace.hh" #include "cql3/query_processor.hh" @@ -124,12 +123,9 @@ void service_level_controller::set_distributed_data_accessor(service_level_distr } } -future<> service_level_controller::reload_distributed_data_accessor(cql3::query_processor& qp, service::raft_group0_client& g0, db::system_keyspace& sys_ks, db::system_distributed_keyspace& sys_dist_ks) { - auto accessor = co_await qos::get_service_level_distributed_data_accessor_for_current_version( - sys_ks, - sys_dist_ks, - qp, - g0); +void service_level_controller::reload_distributed_data_accessor(cql3::query_processor& qp, service::raft_group0_client& g0) { + auto accessor = static_pointer_cast( + make_shared(qp, g0)); set_distributed_data_accessor(std::move(accessor)); } @@ -510,10 +506,6 @@ future> service_level_controller::find_effe } std::optional service_level_controller::auth_integration::find_cached_effective_service_level(const sstring& role_name) { - if (!_sl_controller._sl_data_accessor->is_v2()) { - return std::nullopt; - } - auto effective_sl_it = _cache.find(role_name); return effective_sl_it != _cache.end() ? std::optional(effective_sl_it->second) @@ -837,99 +829,6 @@ future<> service_level_controller::do_add_service_level(sstring name, service_le return make_ready_future(); } -bool service_level_controller::is_v2() const { - return _sl_data_accessor && _sl_data_accessor->is_v2(); -} - -void service_level_controller::upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client) { - if (!_sl_data_accessor) { - return; - } - - auto v2_data_accessor = _sl_data_accessor->upgrade_to_v2(qp, group0_client); - if (v2_data_accessor) { - _sl_data_accessor = v2_data_accessor; - } -} - -future<> service_level_controller::migrate_to_v2(size_t nodes_count, db::system_keyspace& sys_ks, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as) { - //TODO: - //Now we trust the administrator to not make changes to service levels during the migration. - //Ideally, during the migration we should set migration data accessor(on all nodes, on all shards) that allows to read but forbids writes - using namespace std::chrono_literals; - - auto schema = qp.db().find_schema(db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::SERVICE_LEVELS); - - const auto t = 5min; - const timeout_config tc{t, t, t, t, t, t, t}; - service::client_state cs(::service::client_state::internal_tag{}, tc); - service::query_state qs(cs, empty_service_permit()); - - // `system_distributed` keyspace has RF=3 and we need to scan it with CL=ALL - // To support migration on cluster with 1 or 2 nodes, set appropriate CL - auto cl = db::consistency_level::ALL; - if (nodes_count == 1) { - cl = db::consistency_level::ONE; - } else if (nodes_count == 2) { - cl = db::consistency_level::TWO; - } - - auto rows = co_await qp.execute_internal( - format("SELECT * FROM {}.{}", db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::SERVICE_LEVELS), - cl, - qs, - {}, - cql3::query_processor::cache_internal::no); - if (rows->empty()) { - co_return; - } - - - auto col_names = schema->all_columns() | std::views::transform([] (const auto& col) {return col.name_as_cql_string(); }) | std::ranges::to>(); - auto col_names_str = fmt::to_string(fmt::join(col_names, ", ")); - sstring val_binders_str = "?"; - for (size_t i = 1; i < col_names.size(); ++i) { - val_binders_str += ", ?"; - } - - auto guard = co_await group0_client.start_operation(as); - - utils::chunked_vector migration_muts; - for (const auto& row: *rows) { - std::vector values; - for (const auto& col: schema->all_columns()) { - if (row.has(col.name_as_text())) { - values.push_back(col.type->deserialize(row.get_blob_unfragmented(col.name_as_text()))); - } else { - values.push_back(unset_value{}); - } - } - - auto muts = co_await qp.get_mutations_internal( - seastar::format("INSERT INTO {}.{} ({}) VALUES ({})", - db::system_keyspace::NAME, - db::system_keyspace::SERVICE_LEVELS_V2, - col_names_str, - val_binders_str), - qos_query_state(), - guard.write_timestamp(), - std::move(values)); - if (muts.size() != 1) { - on_internal_error(sl_logger, format("expecting single insert mutation, got {}", muts.size())); - } - migration_muts.push_back(std::move(muts[0])); - } - - auto status_mut = co_await sys_ks.make_service_levels_version_mutation(2, guard.write_timestamp()); - migration_muts.push_back(std::move(status_mut)); - - service::write_mutations change { - .mutations{migration_muts.begin(), migration_muts.end()}, - }; - auto group0_cmd = group0_client.prepare_command(change, guard, "migrate service levels to v2"); - co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), as); -} - future<> service_level_controller::do_remove_service_level(sstring name, bool remove_static) { auto service_level_it = _service_levels_db.find(name); if (service_level_it != _service_levels_db.end()) { @@ -1171,21 +1070,4 @@ future<> service_level_controller::unregister_auth_integration() { co_await tmp->stop(); } -future> -get_service_level_distributed_data_accessor_for_current_version( - db::system_keyspace& sys_ks, - db::system_distributed_keyspace& sys_dist_ks, - cql3::query_processor& qp, service::raft_group0_client& group0_client -) { - auto sl_version = co_await sys_ks.get_service_levels_version(); - - if (sl_version && *sl_version == 2) { - co_return static_pointer_cast( - make_shared(qp, group0_client)); - } else { - co_return static_pointer_cast( - make_shared(sys_dist_ks)); - } -} - } // namespace qos diff --git a/service/qos/service_level_controller.hh b/service/qos/service_level_controller.hh index 0537171c32..2606e25026 100644 --- a/service/qos/service_level_controller.hh +++ b/service/qos/service_level_controller.hh @@ -117,12 +117,9 @@ public: virtual future<> drop_service_level(sstring service_level_name, service::group0_batch& mc) const = 0; virtual future<> commit_mutations(service::group0_batch&& mc, abort_source& as) const = 0; - virtual bool is_v2() const = 0; // Returns whether effective service level cache can be populated and used. // This is equivalent to checking whether auth + raft have been migrated to raft. virtual bool can_use_effective_service_level_cache() const = 0; - // Returns v2(raft) data accessor. If data accessor is already a raft one, returns nullptr. - virtual ::shared_ptr upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client) const = 0; }; using service_level_distributed_data_accessor_ptr = ::shared_ptr; @@ -252,7 +249,7 @@ public: * Reloads data accessor, this is used to align it with service level version * stored in scylla_local table. */ - future<> reload_distributed_data_accessor(cql3::query_processor&, service::raft_group0_client&, db::system_keyspace&, db::system_distributed_keyspace&); + void reload_distributed_data_accessor(cql3::query_processor&, service::raft_group0_client&); /** * Adds a service level configuration if it doesn't exists, and updates @@ -424,23 +421,9 @@ public: future> describe_service_levels(); future<> commit_mutations(::service::group0_batch&& mc) { - if (_sl_data_accessor->is_v2()) { - return _sl_data_accessor->commit_mutations(std::move(mc), _global_controller_db->group0_aborter); - } - return make_ready_future(); + return _sl_data_accessor->commit_mutations(std::move(mc), _global_controller_db->group0_aborter); } - /** - * Returns true if service levels module is running under raft - */ - bool is_v2() const; - - void upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client); - - /** - * Migrate data from `system_distributed.service_levels` to `system.service_levels_v2` - */ - static future<> migrate_to_v2(size_t nodes_count, db::system_keyspace& sys_ks, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as); private: /** * Adds a service level configuration if it doesn't exists, and updates @@ -508,11 +491,4 @@ public: virtual void on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) override; }; -future> -get_service_level_distributed_data_accessor_for_current_version( - db::system_keyspace& sys_ks, - db::system_distributed_keyspace& sys_dist_ks, - cql3::query_processor& qp, service::raft_group0_client& group0_client -); - } diff --git a/service/qos/standard_service_level_distributed_data_accessor.cc b/service/qos/standard_service_level_distributed_data_accessor.cc deleted file mode 100644 index 33873b04de..0000000000 --- a/service/qos/standard_service_level_distributed_data_accessor.cc +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (C) 2020-present ScyllaDB - */ - -/* - * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 - */ - -#include "standard_service_level_distributed_data_accessor.hh" -#include "db/system_distributed_keyspace.hh" -#include "service/qos/raft_service_level_distributed_data_accessor.hh" -#include "service/raft/raft_group0_client.hh" - -namespace qos { - -standard_service_level_distributed_data_accessor::standard_service_level_distributed_data_accessor(db::system_distributed_keyspace &sys_dist_ks): -_sys_dist_ks(sys_dist_ks) { -} - -future standard_service_level_distributed_data_accessor::get_service_levels(qos::query_context ctx) const { - return _sys_dist_ks.get_service_levels(ctx); -} - -future standard_service_level_distributed_data_accessor::get_service_level(sstring service_level_name) const { - return _sys_dist_ks.get_service_level(service_level_name); -} - -future<> standard_service_level_distributed_data_accessor::set_service_level(sstring service_level_name, qos::service_level_options slo, service::group0_batch&) const { - return _sys_dist_ks.set_service_level(service_level_name, slo); -} - -future<> standard_service_level_distributed_data_accessor::drop_service_level(sstring service_level_name, service::group0_batch&) const { - return _sys_dist_ks.drop_service_level(service_level_name); -} - -bool standard_service_level_distributed_data_accessor::is_v2() const { - return false; -} - -bool standard_service_level_distributed_data_accessor::can_use_effective_service_level_cache() const { - return false; -} - -::shared_ptr standard_service_level_distributed_data_accessor::upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client) const { - return ::static_pointer_cast( - ::make_shared(qp, group0_client)); -} - -} diff --git a/service/qos/standard_service_level_distributed_data_accessor.hh b/service/qos/standard_service_level_distributed_data_accessor.hh deleted file mode 100644 index 91b1195008..0000000000 --- a/service/qos/standard_service_level_distributed_data_accessor.hh +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (C) 2020-present ScyllaDB - */ - -/* - * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 - */ - -#pragma once - -#include -#include -#include "seastarx.hh" -#include "service/raft/raft_group0_client.hh" -#include "service_level_controller.hh" - - -namespace db { - class system_distributed_keyspace; -} -namespace qos { -class standard_service_level_distributed_data_accessor : public service_level_controller::service_level_distributed_data_accessor { -private: - db::system_distributed_keyspace& _sys_dist_ks; -public: - standard_service_level_distributed_data_accessor(db::system_distributed_keyspace &sys_dist_ks); - virtual future get_service_levels(qos::query_context ctx) const override; - virtual future get_service_level(sstring service_level_name) const override; - virtual future<> set_service_level(sstring service_level_name, qos::service_level_options slo, service::group0_batch&) const override; - virtual future<> drop_service_level(sstring service_level_name, service::group0_batch&) const override; - virtual future<> commit_mutations(service::group0_batch&& mc, abort_source& as) const override { return make_ready_future(); } - - virtual bool is_v2() const override; - virtual bool can_use_effective_service_level_cache() const override; - virtual ::shared_ptr upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client) const override; -}; -} diff --git a/service/storage_service.cc b/service/storage_service.cc index 22b73495e6..ee0675c17a 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -24,7 +24,6 @@ #include #include "service/qos/raft_service_level_distributed_data_accessor.hh" #include "service/qos/service_level_controller.hh" -#include "service/qos/standard_service_level_distributed_data_accessor.hh" #include "locator/token_metadata.hh" #include "service/topology_guard.hh" #include "service/session.hh" @@ -120,7 +119,6 @@ #include "service/topology_mutation.hh" #include "cql3/query_processor.hh" #include "service/qos/service_level_controller.hh" -#include "service/qos/standard_service_level_distributed_data_accessor.hh" #include #include "utils/labels.hh" #include "view_info.hh" @@ -696,13 +694,6 @@ future<> storage_service::topology_state_load(state_change_hint hint) { _topology_state_machine.reload_count++; auto& topology = _topology_state_machine._topology; - if (!_sl_controller.local().is_v2()) { - co_await _sl_controller.invoke_on_all([this] (qos::service_level_controller& sl_controller) { - sl_controller.upgrade_to_v2(_qp, _group0->client()); - }); - co_await _sl_controller.local().update_cache(qos::update_both_cache_levels::yes, qos::query_context::group0); - } - // the view_builder is migrated to v2 in view_builder::migrate_to_v2. // it writes a v2 version mutation as topology_change, then we get here // to update the service to start using the v2 table. @@ -953,10 +944,7 @@ future<> storage_service::merge_topology_snapshot(raft_snapshot snp) { future<> storage_service::update_service_levels_cache(qos::update_both_cache_levels update_only_effective_cache, qos::query_context ctx) { SCYLLA_ASSERT(this_shard_id() == 0); - if (_sl_controller.local().is_v2()) { - // Skip cache update unless the topology upgrade is done - co_await _sl_controller.local().update_cache(update_only_effective_cache, ctx); - } + co_await _sl_controller.local().update_cache(update_only_effective_cache, ctx); } future<> storage_service::compression_dictionary_updated_callback_all() { diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 8142f806ea..e2b4cccfe4 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -3877,7 +3877,7 @@ future> topology_coordinator::maybe_migrate_system_t co_return std::nullopt; } - if (_sl_controller.is_v2() && _feature_service.driver_service_level) { + if (_feature_service.driver_service_level) { const auto sl_driver_created = co_await _sys_ks.get_service_level_driver_created(); if (!sl_driver_created.value_or(false)) { co_return co_await _sl_controller.migrate_to_driver_service_level(std::move(guard), _sys_ks); diff --git a/test/boost/auth_test.cc b/test/boost/auth_test.cc index fde4389e40..d1d62d273e 100644 --- a/test/boost/auth_test.cc +++ b/test/boost/auth_test.cc @@ -223,8 +223,7 @@ SEASTAR_TEST_CASE(test_alter_with_timeouts) { // Avoid reading from memtables, which does not check timeouts due to being too fast e.db().invoke_on_all([] (replica::database& db) { return db.flush_all_memtables(); }).get(); - auto sl_is_v2 = e.local_client_state().get_service_level_controller().is_v2(); - auto msg = cquery_nofail(e, format("SELECT timeout FROM {}", sl_is_v2 ? "system.service_levels_v2" : "system_distributed.service_levels")); + auto msg = cquery_nofail(e, format("SELECT timeout FROM {}", "system.service_levels_v2")); assert_that(msg).is_rows().with_rows({ {duration_type->from_string("5ms")}, {{}}, // `sl:driver` @@ -232,7 +231,7 @@ SEASTAR_TEST_CASE(test_alter_with_timeouts) { cquery_nofail(e, "ALTER SERVICE LEVEL sl WITH timeout = 35s"); - msg = cquery_nofail(e, format("SELECT timeout FROM {} WHERE service_level = 'sl'", sl_is_v2 ? "system.service_levels_v2" : "system_distributed.service_levels")); + msg = cquery_nofail(e, format("SELECT timeout FROM {} WHERE service_level = 'sl'", "system.service_levels_v2")); assert_that(msg).is_rows().with_rows({{ duration_type->from_string("35s") }}); @@ -311,8 +310,7 @@ SEASTAR_TEST_CASE(test_alter_with_workload_type) { cquery_nofail(e, "CREATE SERVICE LEVEL sl"); cquery_nofail(e, "ATTACH SERVICE LEVEL sl TO user1"); - auto sl_is_v2 = e.local_client_state().get_service_level_controller().is_v2(); - auto msg = cquery_nofail(e, format("SELECT workload_type FROM {}", sl_is_v2 ? "system.service_levels_v2" : "system_distributed.service_levels")); + auto msg = cquery_nofail(e, format("SELECT workload_type FROM {}", "system.service_levels_v2")); assert_that(msg).is_rows().with_rows({ {{}}, {"batch"}, // `sl:driver` diff --git a/test/boost/service_level_controller_test.cc b/test/boost/service_level_controller_test.cc index af7d5fda85..3ff96b81a6 100644 --- a/test/boost/service_level_controller_test.cc +++ b/test/boost/service_level_controller_test.cc @@ -163,15 +163,9 @@ SEASTAR_THREAD_TEST_CASE(too_many_service_levels) { } return make_ready_future<>(); } - virtual bool is_v2() const override { - return true; - } virtual bool can_use_effective_service_level_cache() const override { return true; } - virtual ::shared_ptr upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client) const override { - return make_shared(); - } virtual future<> commit_mutations(service::group0_batch&& mc, abort_source& as) const override { return make_ready_future<>(); } diff --git a/transport/server.cc b/transport/server.cc index 293dc5f37a..11fd47cc67 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -2348,12 +2348,6 @@ future<> cql_server::update_connections_scheduling_group() { } future<> cql_server::update_connections_service_level_params() { - if (!_sl_controller.is_v2()) { - // Auto update of connections' service level params requires - // service levels in v2. - return make_ready_future<>(); - } - return for_each_gently([this] (generic_server::connection& conn) { connection& cql_conn = dynamic_cast(conn); auto& cs = cql_conn.get_client_state();