From 578543603d5aa42afcfa44b9c880754e60331332 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Wed, 19 May 2021 10:19:30 +0200 Subject: [PATCH 01/10] qos: add workload_type service level parameter The workload type is currently one of three values: - unspecified - interactive - batch By defining the workload type, the service level makes it easier for other components to decide what to do in overload scenarios. E.g. if the workload is interactive, requests can be shed earlier, while if it's batched (or unspecified), shedding does not take place. Conversely, batch workloads could accept long full scan operations. --- service/qos/qos_common.cc | 34 ++++++++++++++++++++++++++++++++++ service/qos/qos_common.hh | 10 ++++++++++ 2 files changed, 44 insertions(+) diff --git a/service/qos/qos_common.cc b/service/qos/qos_common.cc index 26be4391da..8b57f92897 100644 --- a/service/qos/qos_common.cc +++ b/service/qos/qos_common.cc @@ -21,6 +21,7 @@ #include "qos_common.hh" #include "utils/overloaded_functor.hh" + namespace qos { service_level_options service_level_options::replace_defaults(const service_level_options& default_values) const { @@ -38,6 +39,9 @@ service_level_options service_level_options::replace_defaults(const service_leve // leave the value as is }, }, ret.timeout); + if (ret.workload == workload_type::unspecified) { + ret.workload = default_values.workload; + } return ret; } @@ -56,7 +60,37 @@ service_level_options service_level_options::merge_with(const service_level_opti } }, }, ret.timeout); + // Specified workloads should be preferred over unspecified ones + if (ret.workload == workload_type::unspecified || other.workload == workload_type::unspecified) { + ret.workload = std::max(ret.workload, other.workload); + } else { + ret.workload = std::min(ret.workload, other.workload); + } return ret; } +std::string_view service_level_options::to_string(const workload_type& wt) { + switch (wt) { + case workload_type::unspecified: return "unspecified"; + case workload_type::batch: return "batch"; + case workload_type::interactive: return "interactive"; + } + abort(); +} + +std::ostream& operator<<(std::ostream& os, const service_level_options::workload_type& wt) { + return os << service_level_options::to_string(wt); +} + +std::optional service_level_options::parse_workload_type(std::string_view sv) { + if (sv == "unspecified") { + return workload_type::unspecified; + } else if (sv == "interactive") { + return workload_type::interactive; + } else if (sv == "batch") { + return workload_type::batch; + } + return std::nullopt; +} + } diff --git a/service/qos/qos_common.hh b/service/qos/qos_common.hh index 54845a17c5..0a4d6a0bef 100644 --- a/service/qos/qos_common.hh +++ b/service/qos/qos_common.hh @@ -45,8 +45,13 @@ struct service_level_options { bool operator!=(const delete_marker&) const { return false; }; }; + enum class workload_type { + unspecified, batch, interactive + }; + using timeout_type = std::variant; timeout_type timeout = unset_marker{}; + workload_type workload = workload_type::unspecified; service_level_options replace_defaults(const service_level_options& other) const; // Merges the values of two service level options. The semantics depends @@ -55,8 +60,13 @@ struct service_level_options { bool operator==(const service_level_options& other) const = default; bool operator!=(const service_level_options& other) const = default; + + static std::string_view to_string(const workload_type& wt); + static std::optional parse_workload_type(std::string_view sv); }; +std::ostream& operator<<(std::ostream& os, const service_level_options::workload_type&); + using service_levels_info = std::map; /// From 4816678eb6e2ecdd6490f4a64a2e7b72a2a7f4d8 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Wed, 19 May 2021 10:33:07 +0200 Subject: [PATCH 02/10] cql3: add persisting service level workload type The workload type information can now be set via CQL and it's persisted in the distributed system table. --- cql3/statements/sl_prop_defs.cc | 10 +++++++++- db/system_distributed_keyspace.cc | 22 +++++++++++++++------- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/cql3/statements/sl_prop_defs.cc b/cql3/statements/sl_prop_defs.cc index 473d7e48ec..5655ce4771 100644 --- a/cql3/statements/sl_prop_defs.cc +++ b/cql3/statements/sl_prop_defs.cc @@ -31,7 +31,7 @@ namespace statements { void sl_prop_defs::validate() { static std::set timeout_props { - "timeout" + "timeout", "workload_type" }; auto get_duration = [&] (const std::optional& repr) -> qos::service_level_options::timeout_type { if (!repr) { @@ -56,6 +56,14 @@ void sl_prop_defs::validate() { property_definitions::validate(timeout_props); _slo.timeout = get_duration(get_simple("timeout")); + auto workload_string_opt = get_simple("workload_type"); + if (workload_string_opt) { + auto workload = qos::service_level_options::parse_workload_type(*workload_string_opt); + if (!workload) { + throw exceptions::invalid_request_exception(format("Invalid workload type: {}", *workload_string_opt)); + } + _slo.workload = *workload; + } } qos::service_level_options sl_prop_defs::get_service_level_options() const { diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index cee2b11760..6edbf54148 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -162,26 +162,28 @@ system_distributed_keyspace::system_distributed_keyspace(cql3::query_processor& } static future<> add_new_columns_if_missing(database& db, ::service::migration_manager& mm) noexcept { - static std::string_view new_columns[] { - "timeout" + static thread_local std::pair new_columns[] { + {"timeout", duration_type}, + {"workload_type", utf8_type} }; try { auto schema = db.find_schema(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS); schema_builder b(schema); bool updated = false; - for (const std::string_view& col_name : new_columns) { + for (const auto& col : new_columns) { + auto& [col_name, col_type] = col; bytes options_name = to_bytes(col_name.data()); if (schema->get_column_definition(options_name)) { continue; } updated = true; - b.with_column(options_name, duration_type, column_kind::regular_column); + b.with_column(options_name, col_type, column_kind::regular_column); } if (!updated) { return make_ready_future<>(); } schema_ptr table = b.build(); - return mm.announce_column_family_update(table, false, {}, api::timestamp_type(0)).handle_exception([] (const std::exception_ptr&) {}); + return mm.announce_column_family_update(table, false, {}, api::timestamp_type(1)).handle_exception([] (const std::exception_ptr&) {}); } catch (...) { dlogger.warn("Failed to update options column in the role attributes table: {}", std::current_exception()); return make_ready_future<>(); @@ -582,8 +584,10 @@ future system_distributed_keyspace::get_service_levels qos::service_levels_info service_levels; for (auto &&row : *result_set) { auto service_level_name = row.get_as("service_level"); + auto workload = qos::service_level_options::parse_workload_type(row.get_opt("workload_type").value_or("")); qos::service_level_options slo{ .timeout = get_duration(row, "timeout"), + .workload = workload.value_or(qos::service_level_options::workload_type::unspecified), }; service_levels.emplace(service_level_name, slo); } @@ -598,8 +602,10 @@ future system_distributed_keyspace::get_service_level( if (!result_set->empty()) { auto &&row = result_set->one(); auto service_level_name = row.get_as("service_level"); + auto workload = qos::service_level_options::parse_workload_type(row.get_opt("workload_type").value_or("")); qos::service_level_options slo{ .timeout = get_duration(row, "timeout"), + .workload = workload.value_or(qos::service_level_options::workload_type::unspecified), }; service_levels.emplace(service_level_name, slo); } @@ -625,10 +631,12 @@ future<> system_distributed_keyspace::set_service_level(sstring service_level_na }, }, tv); }; - co_await _qp.execute_internal(format("UPDATE {}.{} SET timeout = ? WHERE service_level = ?;", NAME, SERVICE_LEVELS), + co_await _qp.execute_internal(format("UPDATE {}.{} SET timeout = ?, workload_type = ? WHERE service_level = ?;", NAME, SERVICE_LEVELS), db::consistency_level::ONE, internal_distributed_query_state(), - {to_data_value(slo.timeout), service_level_name}); + {to_data_value(slo.timeout), + data_value(qos::service_level_options::to_string(slo.workload)), + service_level_name}); } future<> system_distributed_keyspace::drop_service_level(sstring service_level_name) const { From 762e2f48f222b4c8f78ce2f3508d795ccf713de6 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Wed, 19 May 2021 10:34:21 +0200 Subject: [PATCH 03/10] cql3: add listing service level workload type The workload type information is now presented in the output of LIST SERVICE LEVEL and LIST ALL SERVICE LEVELS statements. --- cql3/statements/list_service_level_statement.cc | 5 ++++- test/boost/cql_query_test.cc | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/cql3/statements/list_service_level_statement.cc b/cql3/statements/list_service_level_statement.cc index c71e2e7f0f..3cb3a1a2ba 100644 --- a/cql3/statements/list_service_level_statement.cc +++ b/cql3/statements/list_service_level_statement.cc @@ -61,6 +61,7 @@ list_service_level_statement::execute(query_processor& qp, static thread_local const std::vector> metadata({make_column("service_level", utf8_type), make_column("timeout", duration_type), + make_column("workload_type", utf8_type) }); return make_ready_future().then([this, &state] () { @@ -88,7 +89,9 @@ list_service_level_statement::execute(query_processor& qp, auto rs = std::make_unique(metadata); for (auto &&[sl_name, slo] : sl_info) { rs->add_row(std::vector{ - utf8_type->decompose(sl_name), d(slo.timeout)}); + utf8_type->decompose(sl_name), + d(slo.timeout), + utf8_type->decompose(qos::service_level_options::to_string(slo.workload))}); } auto rows = ::make_shared(result(std::move(std::move(rs)))); diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index f84f117bbc..48098179a0 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -4878,14 +4878,14 @@ SEASTAR_TEST_CASE(test_user_based_sla_queries) { e.execute_cql("CREATE SERVICE_LEVEL sl_1;").get(); auto msg = e.execute_cql("LIST SERVICE_LEVEL sl_1;").get0(); assert_that(msg).is_rows().with_rows({ - {utf8_type->decompose("sl_1"), {}}, + {utf8_type->decompose("sl_1"), {}, utf8_type->decompose("unspecified")}, }); e.execute_cql("CREATE SERVICE_LEVEL sl_2;").get(); //drop service levels e.execute_cql("DROP SERVICE_LEVEL sl_1;").get(); msg = e.execute_cql("LIST ALL SERVICE_LEVELS;").get0(); assert_that(msg).is_rows().with_rows({ - {utf8_type->decompose("sl_2"), {}}, + {utf8_type->decompose("sl_2"), {}, utf8_type->decompose("unspecified")}, }); // validate exceptions (illegal requests) From 409c67b1b41690d1c668ece227dbcc450004340a Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Wed, 19 May 2021 11:00:47 +0200 Subject: [PATCH 04/10] client_state: hook workload type from service levels The client state is now aware of its workload type derived from its attached service level. --- service/client_state.cc | 2 ++ service/client_state.hh | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/service/client_state.cc b/service/client_state.cc index 9a1753f6a8..5c2a48b32d 100644 --- a/service/client_state.cc +++ b/service/client_state.cc @@ -300,5 +300,7 @@ future<> service::client_state::maybe_update_per_service_level_params() { _timeout_config.truncate_timeout = slo_timeout_or(_default_timeout_config.truncate_timeout); _timeout_config.cas_timeout = slo_timeout_or(_default_timeout_config.cas_timeout); _timeout_config.other_timeout = slo_timeout_or(_default_timeout_config.other_timeout); + + _workload_type = slo_opt->workload; } } diff --git a/service/client_state.hh b/service/client_state.hh index fd25f71ec7..f0219e4320 100644 --- a/service/client_state.hh +++ b/service/client_state.hh @@ -72,6 +72,7 @@ public: enum class auth_state : uint8_t { UNINITIALIZED, AUTHENTICATION, READY }; + using workload_type = qos::service_level_options::workload_type; // This class is used to move client_state between shards // It is created on a shard that owns client_state than passed @@ -150,10 +151,16 @@ private: timeout_config _default_timeout_config; timeout_config _timeout_config; + workload_type _workload_type = workload_type::unspecified; + public: struct internal_tag {}; struct external_tag {}; + workload_type get_workload_type() const noexcept { + return _workload_type; + } + auth_state get_auth_state() const noexcept { return _auth_state; } From cb27ebe61de527219d244a2ec3771b13705ad4ce Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Wed, 24 Feb 2021 17:18:20 +0100 Subject: [PATCH 05/10] transport: start shedding requests during potential overload This commit implements the following overload prevention heuristics: if the admission queue becomes full, a timer is armed for 50ms. If any of the ongoing requests finishes, the timer is disarmed, but if that doesn't happen, the server goes into shedding mode, which means that it reads new requests from the socket and immediately drops them until one of the ongoing requests finishes. This heuristics is not recommended for OLAP workloads, so it is applied only if the session declared itself as interactive (via service level's workload_type parameter). --- transport/server.cc | 45 ++++++++++++++++++++++++++++++++++++++++++--- transport/server.hh | 2 ++ 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/transport/server.cc b/transport/server.cc index c867934b06..afd7b07345 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -521,6 +521,10 @@ cql_server::connection::connection(cql_server& server, socket_address server_add , _server_addr(server_addr) , _client_state(service::client_state::external_tag{}, server._auth_service, &server._sl_controller, server.timeout_config(), addr) { + _shedding_timer.set_callback([this] { + clogger.debug("Shedding all incoming requests due to overload"); + _shed_incoming_requests = true; + }); } cql_server::connection::~connection() { @@ -577,6 +581,13 @@ future<> cql_server::connection::process_request() { } auto& f = *maybe_frame; + + const bool allow_shedding = _client_state.get_workload_type() == service::client_state::workload_type::interactive; + if (allow_shedding && _shed_incoming_requests) { + ++_server._stats.requests_shed; + return _read_buf.skip(f.length); + } + tracing_request_type tracing_requested = tracing_request_type::not_requested; if (f.flags & cql_frame_flags::tracing) { // If tracing is requested for a specific CQL command - flush @@ -608,19 +619,47 @@ future<> cql_server::connection::process_request() { }); } - auto fut = get_units(_server._memory_available, mem_estimate); + const auto shedding_timeout = std::chrono::milliseconds(50); + auto fut = allow_shedding + ? get_units(_server._memory_available, mem_estimate, shedding_timeout).then_wrapped([this, length = f.length] (auto f) { + try { + return make_ready_future>(f.get0()); + } catch (semaphore_timed_out sto) { + // Cancel shedding in case no more requests are going to do that on completion + if (_pending_requests_gate.get_count() == 0) { + _shed_incoming_requests = false; + } + return _read_buf.skip(length).then([sto = std::move(sto)] () mutable { + return make_exception_future>(std::move(sto)); + }); + } + }) + : get_units(_server._memory_available, mem_estimate); if (_server._memory_available.waiters()) { + if (allow_shedding && !_shedding_timer.armed()) { + _shedding_timer.arm(shedding_timeout); + } ++_server._stats.requests_blocked_memory; } - return fut.then([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (semaphore_units<> mem_permit) { + return fut.then_wrapped([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (auto mem_permit_fut) { + if (mem_permit_fut.failed()) { + // Ignore semaphore errors - they are expected if load shedding took place + mem_permit_fut.ignore_ready_future(); + return make_ready_future<>(); + } + semaphore_units<> mem_permit = mem_permit_fut.get0(); return this->read_and_decompress_frame(length, flags).then([this, op, stream, tracing_requested, mem_permit = make_service_permit(std::move(mem_permit))] (fragmented_temporary_buffer buf) mutable { ++_server._stats.requests_served; ++_server._stats.requests_serving; _pending_requests_gate.enter(); - auto leave = defer([this] { _pending_requests_gate.leave(); }); + auto leave = defer([this] { + _shedding_timer.cancel(); + _shed_incoming_requests = false; + _pending_requests_gate.leave(); + }); auto istream = buf.get_istream(); (void)_process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit) .then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave)] (future>> response_f) mutable { diff --git a/transport/server.hh b/transport/server.hh index 1955d894df..5fceec99d9 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -184,6 +184,8 @@ private: cql_serialization_format _cql_serialization_format = cql_serialization_format::latest(); service::client_state _client_state; std::unordered_map _query_states; + timer _shedding_timer; + bool _shed_incoming_requests = false; unsigned _request_cpu = 0; enum class tracing_request_type : uint8_t { From 7faba196052ddb7246c5affacafe67b86c74966c Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 25 May 2021 09:54:30 +0200 Subject: [PATCH 06/10] sys_dist_ks: make get_service_level exception-safe In order to avoid killing the node if a parsing error occurs, the routine which fetches service level information is made exception-safe. --- db/system_distributed_keyspace.cc | 41 +++++++++++++++++++------------ 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index 6edbf54148..50b3919c36 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -583,13 +583,17 @@ future system_distributed_keyspace::get_service_levels return _qp.execute_internal(prepared_query, db::consistency_level::ONE, internal_distributed_query_state(), {}).then([] (shared_ptr result_set) { qos::service_levels_info service_levels; for (auto &&row : *result_set) { - auto service_level_name = row.get_as("service_level"); - auto workload = qos::service_level_options::parse_workload_type(row.get_opt("workload_type").value_or("")); - qos::service_level_options slo{ - .timeout = get_duration(row, "timeout"), - .workload = workload.value_or(qos::service_level_options::workload_type::unspecified), - }; - service_levels.emplace(service_level_name, slo); + try { + auto service_level_name = row.get_as("service_level"); + auto workload = qos::service_level_options::parse_workload_type(row.get_opt("workload_type").value_or("")); + qos::service_level_options slo{ + .timeout = get_duration(row, "timeout"), + .workload = workload.value_or(qos::service_level_options::workload_type::unspecified), + }; + service_levels.emplace(service_level_name, slo); + } catch (...) { + dlogger.warn("Failed to fetch data for service levels: {}", std::current_exception()); + } } return service_levels; }); @@ -597,17 +601,22 @@ future system_distributed_keyspace::get_service_levels future system_distributed_keyspace::get_service_level(sstring service_level_name) const { static sstring prepared_query = format("SELECT * FROM {}.{} WHERE service_level = ?;", NAME, SERVICE_LEVELS); - return _qp.execute_internal(prepared_query, db::consistency_level::ONE, internal_distributed_query_state(), {service_level_name}).then([] (shared_ptr result_set) { + return _qp.execute_internal(prepared_query, db::consistency_level::ONE, internal_distributed_query_state(), {service_level_name}).then( + [service_level_name = std::move(service_level_name)] (shared_ptr result_set) { qos::service_levels_info service_levels; if (!result_set->empty()) { - auto &&row = result_set->one(); - auto service_level_name = row.get_as("service_level"); - auto workload = qos::service_level_options::parse_workload_type(row.get_opt("workload_type").value_or("")); - qos::service_level_options slo{ - .timeout = get_duration(row, "timeout"), - .workload = workload.value_or(qos::service_level_options::workload_type::unspecified), - }; - service_levels.emplace(service_level_name, slo); + try { + auto &&row = result_set->one(); + auto service_level_name = row.get_as("service_level"); + auto workload = qos::service_level_options::parse_workload_type(row.get_opt("workload_type").value_or("")); + qos::service_level_options slo{ + .timeout = get_duration(row, "timeout"), + .workload = workload.value_or(qos::service_level_options::workload_type::unspecified), + }; + service_levels.emplace(service_level_name, slo); + } catch (...) { + dlogger.warn("Failed to fetch data for service level {}: {}", service_level_name, std::current_exception()); + } } return service_levels; }); From d45574ed28cb74b1de1ea079dfd86fe29c1abab1 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 25 May 2021 09:56:01 +0200 Subject: [PATCH 07/10] sys_dist_ks: fix redundant parsing in get_service_level The routine used for getting service level information already operates on the service level name, but the same information is also parsed once more from a row from an internal table. This parsing is redundant, so it's hereby removed. --- db/system_distributed_keyspace.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index 50b3919c36..f6962c43be 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -607,7 +607,6 @@ future system_distributed_keyspace::get_service_level( if (!result_set->empty()) { try { auto &&row = result_set->one(); - auto service_level_name = row.get_as("service_level"); auto workload = qos::service_level_options::parse_workload_type(row.get_opt("workload_type").value_or("")); qos::service_level_options slo{ .timeout = get_duration(row, "timeout"), From 54a5d4516c67f031f52a637a38441fffe3237b3b Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 25 May 2021 11:10:30 +0200 Subject: [PATCH 08/10] docs: describe workload types for service levels A paragraph about workload types is added to docs/service_levels.md --- docs/service_levels.md | 39 +++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/docs/service_levels.md b/docs/service_levels.md index 4def1d4031..838617236a 100644 --- a/docs/service_levels.md +++ b/docs/service_levels.md @@ -27,20 +27,23 @@ one can run the following query: ### Service Level Configuration Table ```CREATE TABLE system_distributed.service_levels ( - service_level text PRIMARY KEY); + service_level text PRIMARY KEY, + timeout duration, + workload_type text) ``` The table is used to store and distribute the service levels configuration. The table column names meanings are: *service_level* - the name of the service level. *timeout* - timeout for operations performed by users under this service level +*workload_type* - type of workload declared for this service level (unspecified, interactive or batch) ``` select * from system_distributed.service_levels ; - service_level | timeout ----------------+--------- - sl | 50ms + service_level | timeout | workload_type +---------------+---------+--------------- + sl | 500ms | interactive ``` @@ -99,3 +102,31 @@ role2: `timeout = 10ms` role3: `timeout = 2s` role4: `timeout = 10ms` +### Workload types + +It's possible to declare a workload type for a service level, currently out of three available values: + 1. unspecified - generic workload without any specific characteristics; default + 2. interactive - workload sensitive to latency, expected to have high/unbounded concurrency, + with dynamic characteristics, OLTP; + example: users clicking on a website and generating events with their clicks + 3. batch - workload for processing large amounts of data, not sensitive to latency, expected to have + fixed concurrency, OLAP, ETL; + example: processing billions of historical sales records to generate useful statistics + +Declaring a workload type provides more context for Scylla to decide how to handle the sessions. +For instance, if a coordinator node receives requests with a rate higher than it can handle, +it will make different decisions depending on the declared workload type: + - for batch workloads it makes sense to apply backpressure - the concurrency is assumed to be fixed, + so delaying a reply will likely also reduce the rate at which new requests are sent; + - for interactive workloads, backpressure would only waste resources - delaying a reply does not + decrease the rate of incoming requests, so it's reasonable for the coordinator to start shedding + surplus requests. + +If multiple workload types are applicable for a role, it makes sense if: + - all the applicable workload types are identical + - some of the service levels do not have any workload types specified + +Otherwise, e.g. if a role has multiple workload types declared, +the conflicts are resolved as follows: + - `X` vs `unspecified` -> `X` + - `batch` vs `interactive` -> `batch` - under the assumption that `batch` is safer, because it would not trigger load shedding as eagerly as `interactive` From 01b7e445f9e511bbbcabe94a2e9990c9c15d969f Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 25 May 2021 11:55:57 +0200 Subject: [PATCH 09/10] cql-pytest: add basic tests for service level workload types The test cases check whether it's possible to declare workload type for a service level and if its input is validated. --- test/cql-pytest/test_service_levels.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/cql-pytest/test_service_levels.py b/test/cql-pytest/test_service_levels.py index 9b0685d1d2..038fd8ee9b 100644 --- a/test/cql-pytest/test_service_levels.py +++ b/test/cql-pytest/test_service_levels.py @@ -68,3 +68,21 @@ def test_attached_service_level(scylla_only, cql): assert res_one.role == cql.cluster.auth_provider.username and res_one.service_level == sl res_one = cql.execute(f"LIST ALL ATTACHED SERVICE LEVELS").one() assert res_one.role == cql.cluster.auth_provider.username and res_one.service_level == sl + +# Test that declaring service level workload types is possible +def test_set_workload_type(scylla_only, cql): + with new_service_level(cql) as sl: + res = cql.execute(f"LIST SERVICE LEVEL {sl}") + assert res.one().workload_type == 'unspecified' + for wt in ['interactive', 'batch', 'unspecified']: + cql.execute(f"ALTER SERVICE LEVEL {sl} WITH workload_type = '{wt}'") + res = cql.execute(f"LIST SERVICE LEVEL {sl}") + assert res.one().workload_type == wt + + # Test that workload type input is validated +def test_set_invalid_workload_types(scylla_only, cql): + with new_service_level(cql) as sl: + for incorrect in ['', 'null', 'i', 'b', 'dog', 'x'*256]: + print(f"Checking {incorrect}") + with pytest.raises(Exception): + cql.execute(f"ALTER SERVICE LEVEL {sl} WITH workload_type = '{incorrect}'") \ No newline at end of file From 99f356d764218511636e2bd2d9024235b4e57813 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 25 May 2021 12:38:26 +0200 Subject: [PATCH 10/10] test: add a case for conflicting workload types The test case verifies that if several workload types are effective for a single role, the conflict resolution is well defined. --- test/boost/auth_test.cc | 68 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/test/boost/auth_test.cc b/test/boost/auth_test.cc index e07816978c..2770c97a54 100644 --- a/test/boost/auth_test.cc +++ b/test/boost/auth_test.cc @@ -288,7 +288,7 @@ SEASTAR_TEST_CASE(test_alter_with_timeouts) { // user1 // // which means that user1 should inherit timeouts from all other users - cquery_nofail(e, "GRANT user2 TO user1"); + cquery_nofail(e, "GRANT user2 TO user1"); cquery_nofail(e, "GRANT user3 TO user2"); cquery_nofail(e, "GRANT user4 TO user2"); e.refresh_client_state().get(); @@ -318,3 +318,69 @@ SEASTAR_TEST_CASE(test_alter_with_timeouts) { cquery_nofail(e, "INSERT INTO t (id, v) VALUES (1,2)"); }, cfg); } + +SEASTAR_TEST_CASE(test_alter_with_workload_type) { + auto cfg = make_shared(); + cfg->authenticator(sstring(auth::password_authenticator_name)); + + return do_with_cql_env_thread([] (cql_test_env& e) { + auth::role_config config { + .can_login = true, + }; + auth::authentication_options opts { + .password = "pass" + }; + auth::create_role(e.local_auth_service(), "user1", config, opts).get(); + auth::create_role(e.local_auth_service(), "user2", config, opts).get(); + auth::create_role(e.local_auth_service(), "user3", config, opts).get(); + auth::create_role(e.local_auth_service(), "user4", config, opts).get(); + authenticate(e, "user1", "pass").get(); + + cquery_nofail(e, "CREATE SERVICE LEVEL sl"); + cquery_nofail(e, "ATTACH SERVICE LEVEL sl TO user1"); + + auto msg = cquery_nofail(e, "SELECT workload_type FROM system_distributed.service_levels"); + assert_that(msg).is_rows().with_rows({{ + utf8_type->decompose("unspecified") + }}); + + e.refresh_client_state().get(); + // Default workload type is `unspecified` + BOOST_REQUIRE_EQUAL(e.local_client_state().get_workload_type(), service::client_state::workload_type::unspecified); + + // When multiple per-role timeouts apply, the smallest value is always effective + cquery_nofail(e, "CREATE SERVICE LEVEL sl2 WITH workload_type = 'unspecified'"); + cquery_nofail(e, "CREATE SERVICE LEVEL sl3 WITH workload_type = 'batch'"); + cquery_nofail(e, "CREATE SERVICE LEVEL sl4 WITH workload_type = 'interactive'"); + cquery_nofail(e, "ATTACH SERVICE LEVEL sl2 TO user2"); + cquery_nofail(e, "ATTACH SERVICE LEVEL sl3 TO user3"); + cquery_nofail(e, "ATTACH SERVICE LEVEL sl4 TO user4"); + cquery_nofail(e, "ALTER SERVICE LEVEL sl WITH workload_type = 'interactive'"); + // The roles are granted as follows: + // user4 user3 + // \ / + // user2 + // / + // user1 + // + // which means that user1 should inherit workload types from all other users + cquery_nofail(e, "GRANT user2 TO user1"); + cquery_nofail(e, "GRANT user3 TO user2"); + cquery_nofail(e, "GRANT user4 TO user2"); + e.refresh_client_state().get(); + // For user1, the effective workload type should be batch + BOOST_REQUIRE_EQUAL(e.local_client_state().get_workload_type(), service::client_state::workload_type::batch); + // after switching to user2, still batch + authenticate(e, "user2", "pass").get(); + e.refresh_client_state().get(); + BOOST_REQUIRE_EQUAL(e.local_client_state().get_workload_type(), service::client_state::workload_type::batch); + // after switching to user3, batch again + authenticate(e, "user3", "pass").get(); + e.refresh_client_state().get(); + BOOST_REQUIRE_EQUAL(e.local_client_state().get_workload_type(), service::client_state::workload_type::batch); + // after switching to user4, the workload is interactive + authenticate(e, "user4", "pass").get(); + e.refresh_client_state().get(); + BOOST_REQUIRE_EQUAL(e.local_client_state().get_workload_type(), service::client_state::workload_type::interactive); + }, cfg); +} \ No newline at end of file