service level: remove version 1 service level code
This commit is contained in:
@@ -1239,7 +1239,6 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'service/pager/query_pagers.cc',
|
||||
'service/qos/qos_common.cc',
|
||||
'service/qos/service_level_controller.cc',
|
||||
'service/qos/standard_service_level_distributed_data_accessor.cc',
|
||||
'service/qos/raft_service_level_distributed_data_accessor.cc',
|
||||
'streaming/stream_task.cc',
|
||||
'streaming/stream_session.cc',
|
||||
|
||||
@@ -27,7 +27,7 @@ future<> service_level_statement::check_access(query_processor& qp, const servic
|
||||
}
|
||||
|
||||
bool service_level_statement::needs_guard(query_processor&, service::query_state& state) const {
|
||||
return state.get_service_level_controller().is_v2();
|
||||
return true;
|
||||
}
|
||||
|
||||
audit::statement_category service_level_statement::category() const {
|
||||
|
||||
15
main.cc
15
main.cc
@@ -99,7 +99,6 @@
|
||||
|
||||
#include "cdc/log.hh"
|
||||
#include "cdc/generation_service.hh"
|
||||
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/mapreduce_service.hh"
|
||||
#include "alternator/controller.hh"
|
||||
@@ -1484,6 +1483,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
"Cannot start - cluster is not yet upgraded to use auth v2 and this version does not support legacy auth. "
|
||||
"If you are trying to upgrade the node then first upgrade the cluster to use auth v2.");
|
||||
}
|
||||
if (sys_ks.local().get_service_levels_version().get() != 2) {
|
||||
throw std::runtime_error(
|
||||
"Cannot start - cluster is not yet upgraded to use service levels v2 and this version does not support legacy service levels. "
|
||||
"If you are trying to upgrade the node then first upgrade the cluster to use service levels v2.");
|
||||
}
|
||||
}
|
||||
|
||||
const auto listen_address = utils::resolve(cfg->listen_address, family).get();
|
||||
@@ -2351,9 +2355,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// with raft leader elected as only then service level mutation is put
|
||||
// into scylla_local table. Calling it here avoids starting new cluster with
|
||||
// older version only to immediately migrate it to the latest in the background.
|
||||
sl_controller.invoke_on_all([&qp, &group0_client] (qos::service_level_controller& controller) -> future<> {
|
||||
return controller.reload_distributed_data_accessor(
|
||||
qp.local(), group0_client, sys_ks.local(), sys_dist_ks.local());
|
||||
sl_controller.invoke_on_all([&qp, &group0_client] (qos::service_level_controller& controller) {
|
||||
controller.reload_distributed_data_accessor(qp.local(), group0_client);
|
||||
}).get();
|
||||
|
||||
// Initialize virtual table in system_distributed keyspace after joining the cluster, so
|
||||
@@ -2403,9 +2406,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
});
|
||||
|
||||
// update the service level cache after the SL data accessor and auth service are initialized.
|
||||
if (sl_controller.local().is_v2()) {
|
||||
sl_controller.local().update_cache(qos::update_both_cache_levels::yes).get();
|
||||
}
|
||||
sl_controller.local().update_cache(qos::update_both_cache_levels::yes).get();
|
||||
|
||||
sl_controller.invoke_on_all([&lifecycle_notifier] (qos::service_level_controller& controller) {
|
||||
lifecycle_notifier.local().register_subscriber(&controller);
|
||||
|
||||
@@ -15,7 +15,6 @@ target_sources(service
|
||||
paxos/proposal.cc
|
||||
qos/qos_common.cc
|
||||
qos/service_level_controller.cc
|
||||
qos/standard_service_level_distributed_data_accessor.cc
|
||||
qos/raft_service_level_distributed_data_accessor.cc
|
||||
strong_consistency/groups_manager.cc
|
||||
strong_consistency/state_machine.cc
|
||||
|
||||
@@ -96,17 +96,9 @@ future<> raft_service_level_distributed_data_accessor::commit_mutations(service:
|
||||
return std::move(mc).commit(_group0_client, as, ::service::raft_timeout{});
|
||||
}
|
||||
|
||||
bool raft_service_level_distributed_data_accessor::is_v2() const {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool raft_service_level_distributed_data_accessor::can_use_effective_service_level_cache() const {
|
||||
return true;
|
||||
}
|
||||
|
||||
::shared_ptr<service_level_controller::service_level_distributed_data_accessor> raft_service_level_distributed_data_accessor::upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client) const {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -38,9 +38,7 @@ public:
|
||||
virtual future<> drop_service_level(sstring service_level_name, service::group0_batch& mc) const override;
|
||||
virtual future<> commit_mutations(service::group0_batch&& mc, abort_source& as) const override;
|
||||
|
||||
virtual bool is_v2() const override;
|
||||
virtual bool can_use_effective_service_level_cache() const override;
|
||||
virtual ::shared_ptr<service_level_distributed_data_accessor> upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client) const override;
|
||||
static future<utils::chunked_vector<mutation>> set_service_level_mutations(cql3::query_processor& qp, sstring service_level_name, qos::service_level_options slo, api::timestamp_type timestamp);
|
||||
};
|
||||
|
||||
|
||||
@@ -25,7 +25,6 @@
|
||||
#include <seastar/core/shard_id.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include "service/qos/raft_service_level_distributed_data_accessor.hh"
|
||||
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
|
||||
#include "service_level_controller.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
@@ -124,12 +123,9 @@ void service_level_controller::set_distributed_data_accessor(service_level_distr
|
||||
}
|
||||
}
|
||||
|
||||
future<> service_level_controller::reload_distributed_data_accessor(cql3::query_processor& qp, service::raft_group0_client& g0, db::system_keyspace& sys_ks, db::system_distributed_keyspace& sys_dist_ks) {
|
||||
auto accessor = co_await qos::get_service_level_distributed_data_accessor_for_current_version(
|
||||
sys_ks,
|
||||
sys_dist_ks,
|
||||
qp,
|
||||
g0);
|
||||
void service_level_controller::reload_distributed_data_accessor(cql3::query_processor& qp, service::raft_group0_client& g0) {
|
||||
auto accessor = static_pointer_cast<qos::service_level_controller::service_level_distributed_data_accessor>(
|
||||
make_shared<qos::raft_service_level_distributed_data_accessor>(qp, g0));
|
||||
set_distributed_data_accessor(std::move(accessor));
|
||||
}
|
||||
|
||||
@@ -510,10 +506,6 @@ future<std::optional<service_level_options>> service_level_controller::find_effe
|
||||
}
|
||||
|
||||
std::optional<service_level_options> service_level_controller::auth_integration::find_cached_effective_service_level(const sstring& role_name) {
|
||||
if (!_sl_controller._sl_data_accessor->is_v2()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
auto effective_sl_it = _cache.find(role_name);
|
||||
return effective_sl_it != _cache.end()
|
||||
? std::optional<service_level_options>(effective_sl_it->second)
|
||||
@@ -837,99 +829,6 @@ future<> service_level_controller::do_add_service_level(sstring name, service_le
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
bool service_level_controller::is_v2() const {
|
||||
return _sl_data_accessor && _sl_data_accessor->is_v2();
|
||||
}
|
||||
|
||||
void service_level_controller::upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client) {
|
||||
if (!_sl_data_accessor) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto v2_data_accessor = _sl_data_accessor->upgrade_to_v2(qp, group0_client);
|
||||
if (v2_data_accessor) {
|
||||
_sl_data_accessor = v2_data_accessor;
|
||||
}
|
||||
}
|
||||
|
||||
future<> service_level_controller::migrate_to_v2(size_t nodes_count, db::system_keyspace& sys_ks, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as) {
|
||||
//TODO:
|
||||
//Now we trust the administrator to not make changes to service levels during the migration.
|
||||
//Ideally, during the migration we should set migration data accessor(on all nodes, on all shards) that allows to read but forbids writes
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
auto schema = qp.db().find_schema(db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::SERVICE_LEVELS);
|
||||
|
||||
const auto t = 5min;
|
||||
const timeout_config tc{t, t, t, t, t, t, t};
|
||||
service::client_state cs(::service::client_state::internal_tag{}, tc);
|
||||
service::query_state qs(cs, empty_service_permit());
|
||||
|
||||
// `system_distributed` keyspace has RF=3 and we need to scan it with CL=ALL
|
||||
// To support migration on cluster with 1 or 2 nodes, set appropriate CL
|
||||
auto cl = db::consistency_level::ALL;
|
||||
if (nodes_count == 1) {
|
||||
cl = db::consistency_level::ONE;
|
||||
} else if (nodes_count == 2) {
|
||||
cl = db::consistency_level::TWO;
|
||||
}
|
||||
|
||||
auto rows = co_await qp.execute_internal(
|
||||
format("SELECT * FROM {}.{}", db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::SERVICE_LEVELS),
|
||||
cl,
|
||||
qs,
|
||||
{},
|
||||
cql3::query_processor::cache_internal::no);
|
||||
if (rows->empty()) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
|
||||
auto col_names = schema->all_columns() | std::views::transform([] (const auto& col) {return col.name_as_cql_string(); }) | std::ranges::to<std::vector<sstring>>();
|
||||
auto col_names_str = fmt::to_string(fmt::join(col_names, ", "));
|
||||
sstring val_binders_str = "?";
|
||||
for (size_t i = 1; i < col_names.size(); ++i) {
|
||||
val_binders_str += ", ?";
|
||||
}
|
||||
|
||||
auto guard = co_await group0_client.start_operation(as);
|
||||
|
||||
utils::chunked_vector<mutation> migration_muts;
|
||||
for (const auto& row: *rows) {
|
||||
std::vector<data_value_or_unset> values;
|
||||
for (const auto& col: schema->all_columns()) {
|
||||
if (row.has(col.name_as_text())) {
|
||||
values.push_back(col.type->deserialize(row.get_blob_unfragmented(col.name_as_text())));
|
||||
} else {
|
||||
values.push_back(unset_value{});
|
||||
}
|
||||
}
|
||||
|
||||
auto muts = co_await qp.get_mutations_internal(
|
||||
seastar::format("INSERT INTO {}.{} ({}) VALUES ({})",
|
||||
db::system_keyspace::NAME,
|
||||
db::system_keyspace::SERVICE_LEVELS_V2,
|
||||
col_names_str,
|
||||
val_binders_str),
|
||||
qos_query_state(),
|
||||
guard.write_timestamp(),
|
||||
std::move(values));
|
||||
if (muts.size() != 1) {
|
||||
on_internal_error(sl_logger, format("expecting single insert mutation, got {}", muts.size()));
|
||||
}
|
||||
migration_muts.push_back(std::move(muts[0]));
|
||||
}
|
||||
|
||||
auto status_mut = co_await sys_ks.make_service_levels_version_mutation(2, guard.write_timestamp());
|
||||
migration_muts.push_back(std::move(status_mut));
|
||||
|
||||
service::write_mutations change {
|
||||
.mutations{migration_muts.begin(), migration_muts.end()},
|
||||
};
|
||||
auto group0_cmd = group0_client.prepare_command(change, guard, "migrate service levels to v2");
|
||||
co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), as);
|
||||
}
|
||||
|
||||
future<> service_level_controller::do_remove_service_level(sstring name, bool remove_static) {
|
||||
auto service_level_it = _service_levels_db.find(name);
|
||||
if (service_level_it != _service_levels_db.end()) {
|
||||
@@ -1171,21 +1070,4 @@ future<> service_level_controller::unregister_auth_integration() {
|
||||
co_await tmp->stop();
|
||||
}
|
||||
|
||||
future<shared_ptr<service_level_controller::service_level_distributed_data_accessor>>
|
||||
get_service_level_distributed_data_accessor_for_current_version(
|
||||
db::system_keyspace& sys_ks,
|
||||
db::system_distributed_keyspace& sys_dist_ks,
|
||||
cql3::query_processor& qp, service::raft_group0_client& group0_client
|
||||
) {
|
||||
auto sl_version = co_await sys_ks.get_service_levels_version();
|
||||
|
||||
if (sl_version && *sl_version == 2) {
|
||||
co_return static_pointer_cast<qos::service_level_controller::service_level_distributed_data_accessor>(
|
||||
make_shared<qos::raft_service_level_distributed_data_accessor>(qp, group0_client));
|
||||
} else {
|
||||
co_return static_pointer_cast<qos::service_level_controller::service_level_distributed_data_accessor>(
|
||||
make_shared<qos::standard_service_level_distributed_data_accessor>(sys_dist_ks));
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace qos
|
||||
|
||||
@@ -117,12 +117,9 @@ public:
|
||||
virtual future<> drop_service_level(sstring service_level_name, service::group0_batch& mc) const = 0;
|
||||
virtual future<> commit_mutations(service::group0_batch&& mc, abort_source& as) const = 0;
|
||||
|
||||
virtual bool is_v2() const = 0;
|
||||
// Returns whether effective service level cache can be populated and used.
|
||||
// This is equivalent to checking whether auth + raft have been migrated to raft.
|
||||
virtual bool can_use_effective_service_level_cache() const = 0;
|
||||
// Returns v2(raft) data accessor. If data accessor is already a raft one, returns nullptr.
|
||||
virtual ::shared_ptr<service_level_distributed_data_accessor> upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client) const = 0;
|
||||
};
|
||||
using service_level_distributed_data_accessor_ptr = ::shared_ptr<service_level_distributed_data_accessor>;
|
||||
|
||||
@@ -252,7 +249,7 @@ public:
|
||||
* Reloads data accessor, this is used to align it with service level version
|
||||
* stored in scylla_local table.
|
||||
*/
|
||||
future<> reload_distributed_data_accessor(cql3::query_processor&, service::raft_group0_client&, db::system_keyspace&, db::system_distributed_keyspace&);
|
||||
void reload_distributed_data_accessor(cql3::query_processor&, service::raft_group0_client&);
|
||||
|
||||
/**
|
||||
* Adds a service level configuration if it doesn't exists, and updates
|
||||
@@ -424,23 +421,9 @@ public:
|
||||
future<std::vector<cql3::description>> describe_service_levels();
|
||||
|
||||
future<> commit_mutations(::service::group0_batch&& mc) {
|
||||
if (_sl_data_accessor->is_v2()) {
|
||||
return _sl_data_accessor->commit_mutations(std::move(mc), _global_controller_db->group0_aborter);
|
||||
}
|
||||
return make_ready_future();
|
||||
return _sl_data_accessor->commit_mutations(std::move(mc), _global_controller_db->group0_aborter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if service levels module is running under raft
|
||||
*/
|
||||
bool is_v2() const;
|
||||
|
||||
void upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client);
|
||||
|
||||
/**
|
||||
* Migrate data from `system_distributed.service_levels` to `system.service_levels_v2`
|
||||
*/
|
||||
static future<> migrate_to_v2(size_t nodes_count, db::system_keyspace& sys_ks, cql3::query_processor& qp, service::raft_group0_client& group0_client, abort_source& as);
|
||||
private:
|
||||
/**
|
||||
* Adds a service level configuration if it doesn't exists, and updates
|
||||
@@ -508,11 +491,4 @@ public:
|
||||
virtual void on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) override;
|
||||
};
|
||||
|
||||
future<shared_ptr<service_level_controller::service_level_distributed_data_accessor>>
|
||||
get_service_level_distributed_data_accessor_for_current_version(
|
||||
db::system_keyspace& sys_ks,
|
||||
db::system_distributed_keyspace& sys_dist_ks,
|
||||
cql3::query_processor& qp, service::raft_group0_client& group0_client
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
@@ -1,49 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2020-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "standard_service_level_distributed_data_accessor.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "service/qos/raft_service_level_distributed_data_accessor.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
|
||||
namespace qos {
|
||||
|
||||
standard_service_level_distributed_data_accessor::standard_service_level_distributed_data_accessor(db::system_distributed_keyspace &sys_dist_ks):
|
||||
_sys_dist_ks(sys_dist_ks) {
|
||||
}
|
||||
|
||||
future<qos::service_levels_info> standard_service_level_distributed_data_accessor::get_service_levels(qos::query_context ctx) const {
|
||||
return _sys_dist_ks.get_service_levels(ctx);
|
||||
}
|
||||
|
||||
future<qos::service_levels_info> standard_service_level_distributed_data_accessor::get_service_level(sstring service_level_name) const {
|
||||
return _sys_dist_ks.get_service_level(service_level_name);
|
||||
}
|
||||
|
||||
future<> standard_service_level_distributed_data_accessor::set_service_level(sstring service_level_name, qos::service_level_options slo, service::group0_batch&) const {
|
||||
return _sys_dist_ks.set_service_level(service_level_name, slo);
|
||||
}
|
||||
|
||||
future<> standard_service_level_distributed_data_accessor::drop_service_level(sstring service_level_name, service::group0_batch&) const {
|
||||
return _sys_dist_ks.drop_service_level(service_level_name);
|
||||
}
|
||||
|
||||
bool standard_service_level_distributed_data_accessor::is_v2() const {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool standard_service_level_distributed_data_accessor::can_use_effective_service_level_cache() const {
|
||||
return false;
|
||||
}
|
||||
|
||||
::shared_ptr<service_level_controller::service_level_distributed_data_accessor> standard_service_level_distributed_data_accessor::upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client) const {
|
||||
return ::static_pointer_cast<service_level_controller::service_level_distributed_data_accessor>(
|
||||
::make_shared<raft_service_level_distributed_data_accessor>(qp, group0_client));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,37 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2020-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include "seastarx.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "service_level_controller.hh"
|
||||
|
||||
|
||||
namespace db {
|
||||
class system_distributed_keyspace;
|
||||
}
|
||||
namespace qos {
|
||||
class standard_service_level_distributed_data_accessor : public service_level_controller::service_level_distributed_data_accessor {
|
||||
private:
|
||||
db::system_distributed_keyspace& _sys_dist_ks;
|
||||
public:
|
||||
standard_service_level_distributed_data_accessor(db::system_distributed_keyspace &sys_dist_ks);
|
||||
virtual future<qos::service_levels_info> get_service_levels(qos::query_context ctx) const override;
|
||||
virtual future<qos::service_levels_info> get_service_level(sstring service_level_name) const override;
|
||||
virtual future<> set_service_level(sstring service_level_name, qos::service_level_options slo, service::group0_batch&) const override;
|
||||
virtual future<> drop_service_level(sstring service_level_name, service::group0_batch&) const override;
|
||||
virtual future<> commit_mutations(service::group0_batch&& mc, abort_source& as) const override { return make_ready_future(); }
|
||||
|
||||
virtual bool is_v2() const override;
|
||||
virtual bool can_use_effective_service_level_cache() const override;
|
||||
virtual ::shared_ptr<service_level_distributed_data_accessor> upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client) const override;
|
||||
};
|
||||
}
|
||||
@@ -24,7 +24,6 @@
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include "service/qos/raft_service_level_distributed_data_accessor.hh"
|
||||
#include "service/qos/service_level_controller.hh"
|
||||
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "service/topology_guard.hh"
|
||||
#include "service/session.hh"
|
||||
@@ -120,7 +119,6 @@
|
||||
#include "service/topology_mutation.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "service/qos/service_level_controller.hh"
|
||||
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
|
||||
#include <csignal>
|
||||
#include "utils/labels.hh"
|
||||
#include "view_info.hh"
|
||||
@@ -696,13 +694,6 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
|
||||
_topology_state_machine.reload_count++;
|
||||
auto& topology = _topology_state_machine._topology;
|
||||
|
||||
if (!_sl_controller.local().is_v2()) {
|
||||
co_await _sl_controller.invoke_on_all([this] (qos::service_level_controller& sl_controller) {
|
||||
sl_controller.upgrade_to_v2(_qp, _group0->client());
|
||||
});
|
||||
co_await _sl_controller.local().update_cache(qos::update_both_cache_levels::yes, qos::query_context::group0);
|
||||
}
|
||||
|
||||
// the view_builder is migrated to v2 in view_builder::migrate_to_v2.
|
||||
// it writes a v2 version mutation as topology_change, then we get here
|
||||
// to update the service to start using the v2 table.
|
||||
@@ -953,10 +944,7 @@ future<> storage_service::merge_topology_snapshot(raft_snapshot snp) {
|
||||
|
||||
future<> storage_service::update_service_levels_cache(qos::update_both_cache_levels update_only_effective_cache, qos::query_context ctx) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
if (_sl_controller.local().is_v2()) {
|
||||
// Skip cache update unless the topology upgrade is done
|
||||
co_await _sl_controller.local().update_cache(update_only_effective_cache, ctx);
|
||||
}
|
||||
co_await _sl_controller.local().update_cache(update_only_effective_cache, ctx);
|
||||
}
|
||||
|
||||
future<> storage_service::compression_dictionary_updated_callback_all() {
|
||||
|
||||
@@ -3877,7 +3877,7 @@ future<std::optional<group0_guard>> topology_coordinator::maybe_migrate_system_t
|
||||
co_return std::nullopt;
|
||||
}
|
||||
|
||||
if (_sl_controller.is_v2() && _feature_service.driver_service_level) {
|
||||
if (_feature_service.driver_service_level) {
|
||||
const auto sl_driver_created = co_await _sys_ks.get_service_level_driver_created();
|
||||
if (!sl_driver_created.value_or(false)) {
|
||||
co_return co_await _sl_controller.migrate_to_driver_service_level(std::move(guard), _sys_ks);
|
||||
|
||||
@@ -223,8 +223,7 @@ SEASTAR_TEST_CASE(test_alter_with_timeouts) {
|
||||
// Avoid reading from memtables, which does not check timeouts due to being too fast
|
||||
e.db().invoke_on_all([] (replica::database& db) { return db.flush_all_memtables(); }).get();
|
||||
|
||||
auto sl_is_v2 = e.local_client_state().get_service_level_controller().is_v2();
|
||||
auto msg = cquery_nofail(e, format("SELECT timeout FROM {}", sl_is_v2 ? "system.service_levels_v2" : "system_distributed.service_levels"));
|
||||
auto msg = cquery_nofail(e, format("SELECT timeout FROM {}", "system.service_levels_v2"));
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
{duration_type->from_string("5ms")},
|
||||
{{}}, // `sl:driver`
|
||||
@@ -232,7 +231,7 @@ SEASTAR_TEST_CASE(test_alter_with_timeouts) {
|
||||
|
||||
cquery_nofail(e, "ALTER SERVICE LEVEL sl WITH timeout = 35s");
|
||||
|
||||
msg = cquery_nofail(e, format("SELECT timeout FROM {} WHERE service_level = 'sl'", sl_is_v2 ? "system.service_levels_v2" : "system_distributed.service_levels"));
|
||||
msg = cquery_nofail(e, format("SELECT timeout FROM {} WHERE service_level = 'sl'", "system.service_levels_v2"));
|
||||
assert_that(msg).is_rows().with_rows({{
|
||||
duration_type->from_string("35s")
|
||||
}});
|
||||
@@ -311,8 +310,7 @@ SEASTAR_TEST_CASE(test_alter_with_workload_type) {
|
||||
cquery_nofail(e, "CREATE SERVICE LEVEL sl");
|
||||
cquery_nofail(e, "ATTACH SERVICE LEVEL sl TO user1");
|
||||
|
||||
auto sl_is_v2 = e.local_client_state().get_service_level_controller().is_v2();
|
||||
auto msg = cquery_nofail(e, format("SELECT workload_type FROM {}", sl_is_v2 ? "system.service_levels_v2" : "system_distributed.service_levels"));
|
||||
auto msg = cquery_nofail(e, format("SELECT workload_type FROM {}", "system.service_levels_v2"));
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
{{}},
|
||||
{"batch"}, // `sl:driver`
|
||||
|
||||
@@ -163,15 +163,9 @@ SEASTAR_THREAD_TEST_CASE(too_many_service_levels) {
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
virtual bool is_v2() const override {
|
||||
return true;
|
||||
}
|
||||
virtual bool can_use_effective_service_level_cache() const override {
|
||||
return true;
|
||||
}
|
||||
virtual ::shared_ptr<service_level_distributed_data_accessor> upgrade_to_v2(cql3::query_processor& qp, service::raft_group0_client& group0_client) const override {
|
||||
return make_shared<data_accessor>();
|
||||
}
|
||||
virtual future<> commit_mutations(service::group0_batch&& mc, abort_source& as) const override {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -2348,12 +2348,6 @@ future<> cql_server::update_connections_scheduling_group() {
|
||||
}
|
||||
|
||||
future<> cql_server::update_connections_service_level_params() {
|
||||
if (!_sl_controller.is_v2()) {
|
||||
// Auto update of connections' service level params requires
|
||||
// service levels in v2.
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
return for_each_gently([this] (generic_server::connection& conn) {
|
||||
connection& cql_conn = dynamic_cast<connection&>(conn);
|
||||
auto& cs = cql_conn.get_client_state();
|
||||
|
||||
Reference in New Issue
Block a user