diff --git a/cql3/statements/list_service_level_statement.cc b/cql3/statements/list_service_level_statement.cc index c71e2e7f0f..3cb3a1a2ba 100644 --- a/cql3/statements/list_service_level_statement.cc +++ b/cql3/statements/list_service_level_statement.cc @@ -61,6 +61,7 @@ list_service_level_statement::execute(query_processor& qp, static thread_local const std::vector> metadata({make_column("service_level", utf8_type), make_column("timeout", duration_type), + make_column("workload_type", utf8_type) }); return make_ready_future().then([this, &state] () { @@ -88,7 +89,9 @@ list_service_level_statement::execute(query_processor& qp, auto rs = std::make_unique(metadata); for (auto &&[sl_name, slo] : sl_info) { rs->add_row(std::vector{ - utf8_type->decompose(sl_name), d(slo.timeout)}); + utf8_type->decompose(sl_name), + d(slo.timeout), + utf8_type->decompose(qos::service_level_options::to_string(slo.workload))}); } auto rows = ::make_shared(result(std::move(std::move(rs)))); diff --git a/cql3/statements/sl_prop_defs.cc b/cql3/statements/sl_prop_defs.cc index 473d7e48ec..5655ce4771 100644 --- a/cql3/statements/sl_prop_defs.cc +++ b/cql3/statements/sl_prop_defs.cc @@ -31,7 +31,7 @@ namespace statements { void sl_prop_defs::validate() { static std::set timeout_props { - "timeout" + "timeout", "workload_type" }; auto get_duration = [&] (const std::optional& repr) -> qos::service_level_options::timeout_type { if (!repr) { @@ -56,6 +56,14 @@ void sl_prop_defs::validate() { property_definitions::validate(timeout_props); _slo.timeout = get_duration(get_simple("timeout")); + auto workload_string_opt = get_simple("workload_type"); + if (workload_string_opt) { + auto workload = qos::service_level_options::parse_workload_type(*workload_string_opt); + if (!workload) { + throw exceptions::invalid_request_exception(format("Invalid workload type: {}", *workload_string_opt)); + } + _slo.workload = *workload; + } } qos::service_level_options sl_prop_defs::get_service_level_options() const { diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index cee2b11760..f6962c43be 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -162,26 +162,28 @@ system_distributed_keyspace::system_distributed_keyspace(cql3::query_processor& } static future<> add_new_columns_if_missing(database& db, ::service::migration_manager& mm) noexcept { - static std::string_view new_columns[] { - "timeout" + static thread_local std::pair new_columns[] { + {"timeout", duration_type}, + {"workload_type", utf8_type} }; try { auto schema = db.find_schema(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS); schema_builder b(schema); bool updated = false; - for (const std::string_view& col_name : new_columns) { + for (const auto& col : new_columns) { + auto& [col_name, col_type] = col; bytes options_name = to_bytes(col_name.data()); if (schema->get_column_definition(options_name)) { continue; } updated = true; - b.with_column(options_name, duration_type, column_kind::regular_column); + b.with_column(options_name, col_type, column_kind::regular_column); } if (!updated) { return make_ready_future<>(); } schema_ptr table = b.build(); - return mm.announce_column_family_update(table, false, {}, api::timestamp_type(0)).handle_exception([] (const std::exception_ptr&) {}); + return mm.announce_column_family_update(table, false, {}, api::timestamp_type(1)).handle_exception([] (const std::exception_ptr&) {}); } catch (...) { dlogger.warn("Failed to update options column in the role attributes table: {}", std::current_exception()); return make_ready_future<>(); @@ -581,11 +583,17 @@ future system_distributed_keyspace::get_service_levels return _qp.execute_internal(prepared_query, db::consistency_level::ONE, internal_distributed_query_state(), {}).then([] (shared_ptr result_set) { qos::service_levels_info service_levels; for (auto &&row : *result_set) { - auto service_level_name = row.get_as("service_level"); - qos::service_level_options slo{ - .timeout = get_duration(row, "timeout"), - }; - service_levels.emplace(service_level_name, slo); + try { + auto service_level_name = row.get_as("service_level"); + auto workload = qos::service_level_options::parse_workload_type(row.get_opt("workload_type").value_or("")); + qos::service_level_options slo{ + .timeout = get_duration(row, "timeout"), + .workload = workload.value_or(qos::service_level_options::workload_type::unspecified), + }; + service_levels.emplace(service_level_name, slo); + } catch (...) { + dlogger.warn("Failed to fetch data for service levels: {}", std::current_exception()); + } } return service_levels; }); @@ -593,15 +601,21 @@ future system_distributed_keyspace::get_service_levels future system_distributed_keyspace::get_service_level(sstring service_level_name) const { static sstring prepared_query = format("SELECT * FROM {}.{} WHERE service_level = ?;", NAME, SERVICE_LEVELS); - return _qp.execute_internal(prepared_query, db::consistency_level::ONE, internal_distributed_query_state(), {service_level_name}).then([] (shared_ptr result_set) { + return _qp.execute_internal(prepared_query, db::consistency_level::ONE, internal_distributed_query_state(), {service_level_name}).then( + [service_level_name = std::move(service_level_name)] (shared_ptr result_set) { qos::service_levels_info service_levels; if (!result_set->empty()) { - auto &&row = result_set->one(); - auto service_level_name = row.get_as("service_level"); - qos::service_level_options slo{ - .timeout = get_duration(row, "timeout"), - }; - service_levels.emplace(service_level_name, slo); + try { + auto &&row = result_set->one(); + auto workload = qos::service_level_options::parse_workload_type(row.get_opt("workload_type").value_or("")); + qos::service_level_options slo{ + .timeout = get_duration(row, "timeout"), + .workload = workload.value_or(qos::service_level_options::workload_type::unspecified), + }; + service_levels.emplace(service_level_name, slo); + } catch (...) { + dlogger.warn("Failed to fetch data for service level {}: {}", service_level_name, std::current_exception()); + } } return service_levels; }); @@ -625,10 +639,12 @@ future<> system_distributed_keyspace::set_service_level(sstring service_level_na }, }, tv); }; - co_await _qp.execute_internal(format("UPDATE {}.{} SET timeout = ? WHERE service_level = ?;", NAME, SERVICE_LEVELS), + co_await _qp.execute_internal(format("UPDATE {}.{} SET timeout = ?, workload_type = ? WHERE service_level = ?;", NAME, SERVICE_LEVELS), db::consistency_level::ONE, internal_distributed_query_state(), - {to_data_value(slo.timeout), service_level_name}); + {to_data_value(slo.timeout), + data_value(qos::service_level_options::to_string(slo.workload)), + service_level_name}); } future<> system_distributed_keyspace::drop_service_level(sstring service_level_name) const { diff --git a/docs/service_levels.md b/docs/service_levels.md index 4def1d4031..838617236a 100644 --- a/docs/service_levels.md +++ b/docs/service_levels.md @@ -27,20 +27,23 @@ one can run the following query: ### Service Level Configuration Table ```CREATE TABLE system_distributed.service_levels ( - service_level text PRIMARY KEY); + service_level text PRIMARY KEY, + timeout duration, + workload_type text) ``` The table is used to store and distribute the service levels configuration. The table column names meanings are: *service_level* - the name of the service level. *timeout* - timeout for operations performed by users under this service level +*workload_type* - type of workload declared for this service level (unspecified, interactive or batch) ``` select * from system_distributed.service_levels ; - service_level | timeout ----------------+--------- - sl | 50ms + service_level | timeout | workload_type +---------------+---------+--------------- + sl | 500ms | interactive ``` @@ -99,3 +102,31 @@ role2: `timeout = 10ms` role3: `timeout = 2s` role4: `timeout = 10ms` +### Workload types + +It's possible to declare a workload type for a service level, currently out of three available values: + 1. unspecified - generic workload without any specific characteristics; default + 2. interactive - workload sensitive to latency, expected to have high/unbounded concurrency, + with dynamic characteristics, OLTP; + example: users clicking on a website and generating events with their clicks + 3. batch - workload for processing large amounts of data, not sensitive to latency, expected to have + fixed concurrency, OLAP, ETL; + example: processing billions of historical sales records to generate useful statistics + +Declaring a workload type provides more context for Scylla to decide how to handle the sessions. +For instance, if a coordinator node receives requests with a rate higher than it can handle, +it will make different decisions depending on the declared workload type: + - for batch workloads it makes sense to apply backpressure - the concurrency is assumed to be fixed, + so delaying a reply will likely also reduce the rate at which new requests are sent; + - for interactive workloads, backpressure would only waste resources - delaying a reply does not + decrease the rate of incoming requests, so it's reasonable for the coordinator to start shedding + surplus requests. + +If multiple workload types are applicable for a role, it makes sense if: + - all the applicable workload types are identical + - some of the service levels do not have any workload types specified + +Otherwise, e.g. if a role has multiple workload types declared, +the conflicts are resolved as follows: + - `X` vs `unspecified` -> `X` + - `batch` vs `interactive` -> `batch` - under the assumption that `batch` is safer, because it would not trigger load shedding as eagerly as `interactive` diff --git a/service/client_state.cc b/service/client_state.cc index c401b2ea32..e5e0acedb8 100644 --- a/service/client_state.cc +++ b/service/client_state.cc @@ -300,5 +300,7 @@ future<> service::client_state::maybe_update_per_service_level_params() { _timeout_config.truncate_timeout = slo_timeout_or(_default_timeout_config.truncate_timeout); _timeout_config.cas_timeout = slo_timeout_or(_default_timeout_config.cas_timeout); _timeout_config.other_timeout = slo_timeout_or(_default_timeout_config.other_timeout); + + _workload_type = slo_opt->workload; } } diff --git a/service/client_state.hh b/service/client_state.hh index fd25f71ec7..f0219e4320 100644 --- a/service/client_state.hh +++ b/service/client_state.hh @@ -72,6 +72,7 @@ public: enum class auth_state : uint8_t { UNINITIALIZED, AUTHENTICATION, READY }; + using workload_type = qos::service_level_options::workload_type; // This class is used to move client_state between shards // It is created on a shard that owns client_state than passed @@ -150,10 +151,16 @@ private: timeout_config _default_timeout_config; timeout_config _timeout_config; + workload_type _workload_type = workload_type::unspecified; + public: struct internal_tag {}; struct external_tag {}; + workload_type get_workload_type() const noexcept { + return _workload_type; + } + auth_state get_auth_state() const noexcept { return _auth_state; } diff --git a/service/qos/qos_common.cc b/service/qos/qos_common.cc index 26be4391da..8b57f92897 100644 --- a/service/qos/qos_common.cc +++ b/service/qos/qos_common.cc @@ -21,6 +21,7 @@ #include "qos_common.hh" #include "utils/overloaded_functor.hh" + namespace qos { service_level_options service_level_options::replace_defaults(const service_level_options& default_values) const { @@ -38,6 +39,9 @@ service_level_options service_level_options::replace_defaults(const service_leve // leave the value as is }, }, ret.timeout); + if (ret.workload == workload_type::unspecified) { + ret.workload = default_values.workload; + } return ret; } @@ -56,7 +60,37 @@ service_level_options service_level_options::merge_with(const service_level_opti } }, }, ret.timeout); + // Specified workloads should be preferred over unspecified ones + if (ret.workload == workload_type::unspecified || other.workload == workload_type::unspecified) { + ret.workload = std::max(ret.workload, other.workload); + } else { + ret.workload = std::min(ret.workload, other.workload); + } return ret; } +std::string_view service_level_options::to_string(const workload_type& wt) { + switch (wt) { + case workload_type::unspecified: return "unspecified"; + case workload_type::batch: return "batch"; + case workload_type::interactive: return "interactive"; + } + abort(); +} + +std::ostream& operator<<(std::ostream& os, const service_level_options::workload_type& wt) { + return os << service_level_options::to_string(wt); +} + +std::optional service_level_options::parse_workload_type(std::string_view sv) { + if (sv == "unspecified") { + return workload_type::unspecified; + } else if (sv == "interactive") { + return workload_type::interactive; + } else if (sv == "batch") { + return workload_type::batch; + } + return std::nullopt; +} + } diff --git a/service/qos/qos_common.hh b/service/qos/qos_common.hh index 54845a17c5..0a4d6a0bef 100644 --- a/service/qos/qos_common.hh +++ b/service/qos/qos_common.hh @@ -45,8 +45,13 @@ struct service_level_options { bool operator!=(const delete_marker&) const { return false; }; }; + enum class workload_type { + unspecified, batch, interactive + }; + using timeout_type = std::variant; timeout_type timeout = unset_marker{}; + workload_type workload = workload_type::unspecified; service_level_options replace_defaults(const service_level_options& other) const; // Merges the values of two service level options. The semantics depends @@ -55,8 +60,13 @@ struct service_level_options { bool operator==(const service_level_options& other) const = default; bool operator!=(const service_level_options& other) const = default; + + static std::string_view to_string(const workload_type& wt); + static std::optional parse_workload_type(std::string_view sv); }; +std::ostream& operator<<(std::ostream& os, const service_level_options::workload_type&); + using service_levels_info = std::map; /// diff --git a/test/boost/auth_test.cc b/test/boost/auth_test.cc index e07816978c..2770c97a54 100644 --- a/test/boost/auth_test.cc +++ b/test/boost/auth_test.cc @@ -288,7 +288,7 @@ SEASTAR_TEST_CASE(test_alter_with_timeouts) { // user1 // // which means that user1 should inherit timeouts from all other users - cquery_nofail(e, "GRANT user2 TO user1"); + cquery_nofail(e, "GRANT user2 TO user1"); cquery_nofail(e, "GRANT user3 TO user2"); cquery_nofail(e, "GRANT user4 TO user2"); e.refresh_client_state().get(); @@ -318,3 +318,69 @@ SEASTAR_TEST_CASE(test_alter_with_timeouts) { cquery_nofail(e, "INSERT INTO t (id, v) VALUES (1,2)"); }, cfg); } + +SEASTAR_TEST_CASE(test_alter_with_workload_type) { + auto cfg = make_shared(); + cfg->authenticator(sstring(auth::password_authenticator_name)); + + return do_with_cql_env_thread([] (cql_test_env& e) { + auth::role_config config { + .can_login = true, + }; + auth::authentication_options opts { + .password = "pass" + }; + auth::create_role(e.local_auth_service(), "user1", config, opts).get(); + auth::create_role(e.local_auth_service(), "user2", config, opts).get(); + auth::create_role(e.local_auth_service(), "user3", config, opts).get(); + auth::create_role(e.local_auth_service(), "user4", config, opts).get(); + authenticate(e, "user1", "pass").get(); + + cquery_nofail(e, "CREATE SERVICE LEVEL sl"); + cquery_nofail(e, "ATTACH SERVICE LEVEL sl TO user1"); + + auto msg = cquery_nofail(e, "SELECT workload_type FROM system_distributed.service_levels"); + assert_that(msg).is_rows().with_rows({{ + utf8_type->decompose("unspecified") + }}); + + e.refresh_client_state().get(); + // Default workload type is `unspecified` + BOOST_REQUIRE_EQUAL(e.local_client_state().get_workload_type(), service::client_state::workload_type::unspecified); + + // When multiple per-role timeouts apply, the smallest value is always effective + cquery_nofail(e, "CREATE SERVICE LEVEL sl2 WITH workload_type = 'unspecified'"); + cquery_nofail(e, "CREATE SERVICE LEVEL sl3 WITH workload_type = 'batch'"); + cquery_nofail(e, "CREATE SERVICE LEVEL sl4 WITH workload_type = 'interactive'"); + cquery_nofail(e, "ATTACH SERVICE LEVEL sl2 TO user2"); + cquery_nofail(e, "ATTACH SERVICE LEVEL sl3 TO user3"); + cquery_nofail(e, "ATTACH SERVICE LEVEL sl4 TO user4"); + cquery_nofail(e, "ALTER SERVICE LEVEL sl WITH workload_type = 'interactive'"); + // The roles are granted as follows: + // user4 user3 + // \ / + // user2 + // / + // user1 + // + // which means that user1 should inherit workload types from all other users + cquery_nofail(e, "GRANT user2 TO user1"); + cquery_nofail(e, "GRANT user3 TO user2"); + cquery_nofail(e, "GRANT user4 TO user2"); + e.refresh_client_state().get(); + // For user1, the effective workload type should be batch + BOOST_REQUIRE_EQUAL(e.local_client_state().get_workload_type(), service::client_state::workload_type::batch); + // after switching to user2, still batch + authenticate(e, "user2", "pass").get(); + e.refresh_client_state().get(); + BOOST_REQUIRE_EQUAL(e.local_client_state().get_workload_type(), service::client_state::workload_type::batch); + // after switching to user3, batch again + authenticate(e, "user3", "pass").get(); + e.refresh_client_state().get(); + BOOST_REQUIRE_EQUAL(e.local_client_state().get_workload_type(), service::client_state::workload_type::batch); + // after switching to user4, the workload is interactive + authenticate(e, "user4", "pass").get(); + e.refresh_client_state().get(); + BOOST_REQUIRE_EQUAL(e.local_client_state().get_workload_type(), service::client_state::workload_type::interactive); + }, cfg); +} \ No newline at end of file diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index f84f117bbc..48098179a0 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -4878,14 +4878,14 @@ SEASTAR_TEST_CASE(test_user_based_sla_queries) { e.execute_cql("CREATE SERVICE_LEVEL sl_1;").get(); auto msg = e.execute_cql("LIST SERVICE_LEVEL sl_1;").get0(); assert_that(msg).is_rows().with_rows({ - {utf8_type->decompose("sl_1"), {}}, + {utf8_type->decompose("sl_1"), {}, utf8_type->decompose("unspecified")}, }); e.execute_cql("CREATE SERVICE_LEVEL sl_2;").get(); //drop service levels e.execute_cql("DROP SERVICE_LEVEL sl_1;").get(); msg = e.execute_cql("LIST ALL SERVICE_LEVELS;").get0(); assert_that(msg).is_rows().with_rows({ - {utf8_type->decompose("sl_2"), {}}, + {utf8_type->decompose("sl_2"), {}, utf8_type->decompose("unspecified")}, }); // validate exceptions (illegal requests) diff --git a/test/cql-pytest/test_service_levels.py b/test/cql-pytest/test_service_levels.py index 9b0685d1d2..038fd8ee9b 100644 --- a/test/cql-pytest/test_service_levels.py +++ b/test/cql-pytest/test_service_levels.py @@ -68,3 +68,21 @@ def test_attached_service_level(scylla_only, cql): assert res_one.role == cql.cluster.auth_provider.username and res_one.service_level == sl res_one = cql.execute(f"LIST ALL ATTACHED SERVICE LEVELS").one() assert res_one.role == cql.cluster.auth_provider.username and res_one.service_level == sl + +# Test that declaring service level workload types is possible +def test_set_workload_type(scylla_only, cql): + with new_service_level(cql) as sl: + res = cql.execute(f"LIST SERVICE LEVEL {sl}") + assert res.one().workload_type == 'unspecified' + for wt in ['interactive', 'batch', 'unspecified']: + cql.execute(f"ALTER SERVICE LEVEL {sl} WITH workload_type = '{wt}'") + res = cql.execute(f"LIST SERVICE LEVEL {sl}") + assert res.one().workload_type == wt + + # Test that workload type input is validated +def test_set_invalid_workload_types(scylla_only, cql): + with new_service_level(cql) as sl: + for incorrect in ['', 'null', 'i', 'b', 'dog', 'x'*256]: + print(f"Checking {incorrect}") + with pytest.raises(Exception): + cql.execute(f"ALTER SERVICE LEVEL {sl} WITH workload_type = '{incorrect}'") \ No newline at end of file diff --git a/transport/server.cc b/transport/server.cc index c867934b06..afd7b07345 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -521,6 +521,10 @@ cql_server::connection::connection(cql_server& server, socket_address server_add , _server_addr(server_addr) , _client_state(service::client_state::external_tag{}, server._auth_service, &server._sl_controller, server.timeout_config(), addr) { + _shedding_timer.set_callback([this] { + clogger.debug("Shedding all incoming requests due to overload"); + _shed_incoming_requests = true; + }); } cql_server::connection::~connection() { @@ -577,6 +581,13 @@ future<> cql_server::connection::process_request() { } auto& f = *maybe_frame; + + const bool allow_shedding = _client_state.get_workload_type() == service::client_state::workload_type::interactive; + if (allow_shedding && _shed_incoming_requests) { + ++_server._stats.requests_shed; + return _read_buf.skip(f.length); + } + tracing_request_type tracing_requested = tracing_request_type::not_requested; if (f.flags & cql_frame_flags::tracing) { // If tracing is requested for a specific CQL command - flush @@ -608,19 +619,47 @@ future<> cql_server::connection::process_request() { }); } - auto fut = get_units(_server._memory_available, mem_estimate); + const auto shedding_timeout = std::chrono::milliseconds(50); + auto fut = allow_shedding + ? get_units(_server._memory_available, mem_estimate, shedding_timeout).then_wrapped([this, length = f.length] (auto f) { + try { + return make_ready_future>(f.get0()); + } catch (semaphore_timed_out sto) { + // Cancel shedding in case no more requests are going to do that on completion + if (_pending_requests_gate.get_count() == 0) { + _shed_incoming_requests = false; + } + return _read_buf.skip(length).then([sto = std::move(sto)] () mutable { + return make_exception_future>(std::move(sto)); + }); + } + }) + : get_units(_server._memory_available, mem_estimate); if (_server._memory_available.waiters()) { + if (allow_shedding && !_shedding_timer.armed()) { + _shedding_timer.arm(shedding_timeout); + } ++_server._stats.requests_blocked_memory; } - return fut.then([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (semaphore_units<> mem_permit) { + return fut.then_wrapped([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (auto mem_permit_fut) { + if (mem_permit_fut.failed()) { + // Ignore semaphore errors - they are expected if load shedding took place + mem_permit_fut.ignore_ready_future(); + return make_ready_future<>(); + } + semaphore_units<> mem_permit = mem_permit_fut.get0(); return this->read_and_decompress_frame(length, flags).then([this, op, stream, tracing_requested, mem_permit = make_service_permit(std::move(mem_permit))] (fragmented_temporary_buffer buf) mutable { ++_server._stats.requests_served; ++_server._stats.requests_serving; _pending_requests_gate.enter(); - auto leave = defer([this] { _pending_requests_gate.leave(); }); + auto leave = defer([this] { + _shedding_timer.cancel(); + _shed_incoming_requests = false; + _pending_requests_gate.leave(); + }); auto istream = buf.get_istream(); (void)_process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit) .then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave)] (future>> response_f) mutable { diff --git a/transport/server.hh b/transport/server.hh index 1955d894df..5fceec99d9 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -184,6 +184,8 @@ private: cql_serialization_format _cql_serialization_format = cql_serialization_format::latest(); service::client_state _client_state; std::unordered_map _query_states; + timer _shedding_timer; + bool _shed_incoming_requests = false; unsigned _request_cpu = 0; enum class tracing_request_type : uint8_t {