mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-30 11:36:54 +00:00
Merge 'Introduce per-service-level workload types and their first use-case - shedding in interactive workloads' from Piotr Sarna
This draft extends and obsoletes #8123 by introducing a way of determining the workload type from service level parameters, and then using this context to qualify requests for shedding. The rough idea is that when the admission queue in the CQL server is hit, it might make more sense to start shedding surplus requests instead of accumulating them on the semaphore. The assumption that interactive workloads are more interested in the success rate of as many requests as possible, and hanging on a semaphore reduces the chances for a request to succeed. Thus, it may make sense to shed some requests to reduce the load on this coordinator and let the existing requests to finish. It's a draft, because I only performed local guided tests. #8123 was followed by some experiments on a multinode cluster which I want to rerun first. Closes #8680 * github.com:scylladb/scylla: test: add a case for conflicting workload types cql-pytest: add basic tests for service level workload types docs: describe workload types for service levels sys_dist_ks: fix redundant parsing in get_service_level sys_dist_ks: make get_service_level exception-safe transport: start shedding requests during potential overload client_state: hook workload type from service levels cql3: add listing service level workload type cql3: add persisting service level workload type qos: add workload_type service level parameter
This commit is contained in:
@@ -61,6 +61,7 @@ list_service_level_statement::execute(query_processor& qp,
|
||||
|
||||
static thread_local const std::vector<lw_shared_ptr<column_specification>> metadata({make_column("service_level", utf8_type),
|
||||
make_column("timeout", duration_type),
|
||||
make_column("workload_type", utf8_type)
|
||||
});
|
||||
|
||||
return make_ready_future().then([this, &state] () {
|
||||
@@ -88,7 +89,9 @@ list_service_level_statement::execute(query_processor& qp,
|
||||
auto rs = std::make_unique<result_set>(metadata);
|
||||
for (auto &&[sl_name, slo] : sl_info) {
|
||||
rs->add_row(std::vector<bytes_opt>{
|
||||
utf8_type->decompose(sl_name), d(slo.timeout)});
|
||||
utf8_type->decompose(sl_name),
|
||||
d(slo.timeout),
|
||||
utf8_type->decompose(qos::service_level_options::to_string(slo.workload))});
|
||||
}
|
||||
|
||||
auto rows = ::make_shared<cql_transport::messages::result_message::rows>(result(std::move(std::move(rs))));
|
||||
|
||||
@@ -31,7 +31,7 @@ namespace statements {
|
||||
|
||||
void sl_prop_defs::validate() {
|
||||
static std::set<sstring> timeout_props {
|
||||
"timeout"
|
||||
"timeout", "workload_type"
|
||||
};
|
||||
auto get_duration = [&] (const std::optional<sstring>& repr) -> qos::service_level_options::timeout_type {
|
||||
if (!repr) {
|
||||
@@ -56,6 +56,14 @@ void sl_prop_defs::validate() {
|
||||
|
||||
property_definitions::validate(timeout_props);
|
||||
_slo.timeout = get_duration(get_simple("timeout"));
|
||||
auto workload_string_opt = get_simple("workload_type");
|
||||
if (workload_string_opt) {
|
||||
auto workload = qos::service_level_options::parse_workload_type(*workload_string_opt);
|
||||
if (!workload) {
|
||||
throw exceptions::invalid_request_exception(format("Invalid workload type: {}", *workload_string_opt));
|
||||
}
|
||||
_slo.workload = *workload;
|
||||
}
|
||||
}
|
||||
|
||||
qos::service_level_options sl_prop_defs::get_service_level_options() const {
|
||||
|
||||
@@ -162,26 +162,28 @@ system_distributed_keyspace::system_distributed_keyspace(cql3::query_processor&
|
||||
}
|
||||
|
||||
static future<> add_new_columns_if_missing(database& db, ::service::migration_manager& mm) noexcept {
|
||||
static std::string_view new_columns[] {
|
||||
"timeout"
|
||||
static thread_local std::pair<std::string_view, data_type> new_columns[] {
|
||||
{"timeout", duration_type},
|
||||
{"workload_type", utf8_type}
|
||||
};
|
||||
try {
|
||||
auto schema = db.find_schema(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS);
|
||||
schema_builder b(schema);
|
||||
bool updated = false;
|
||||
for (const std::string_view& col_name : new_columns) {
|
||||
for (const auto& col : new_columns) {
|
||||
auto& [col_name, col_type] = col;
|
||||
bytes options_name = to_bytes(col_name.data());
|
||||
if (schema->get_column_definition(options_name)) {
|
||||
continue;
|
||||
}
|
||||
updated = true;
|
||||
b.with_column(options_name, duration_type, column_kind::regular_column);
|
||||
b.with_column(options_name, col_type, column_kind::regular_column);
|
||||
}
|
||||
if (!updated) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
schema_ptr table = b.build();
|
||||
return mm.announce_column_family_update(table, false, {}, api::timestamp_type(0)).handle_exception([] (const std::exception_ptr&) {});
|
||||
return mm.announce_column_family_update(table, false, {}, api::timestamp_type(1)).handle_exception([] (const std::exception_ptr&) {});
|
||||
} catch (...) {
|
||||
dlogger.warn("Failed to update options column in the role attributes table: {}", std::current_exception());
|
||||
return make_ready_future<>();
|
||||
@@ -581,11 +583,17 @@ future<qos::service_levels_info> system_distributed_keyspace::get_service_levels
|
||||
return _qp.execute_internal(prepared_query, db::consistency_level::ONE, internal_distributed_query_state(), {}).then([] (shared_ptr<cql3::untyped_result_set> result_set) {
|
||||
qos::service_levels_info service_levels;
|
||||
for (auto &&row : *result_set) {
|
||||
auto service_level_name = row.get_as<sstring>("service_level");
|
||||
qos::service_level_options slo{
|
||||
.timeout = get_duration(row, "timeout"),
|
||||
};
|
||||
service_levels.emplace(service_level_name, slo);
|
||||
try {
|
||||
auto service_level_name = row.get_as<sstring>("service_level");
|
||||
auto workload = qos::service_level_options::parse_workload_type(row.get_opt<sstring>("workload_type").value_or(""));
|
||||
qos::service_level_options slo{
|
||||
.timeout = get_duration(row, "timeout"),
|
||||
.workload = workload.value_or(qos::service_level_options::workload_type::unspecified),
|
||||
};
|
||||
service_levels.emplace(service_level_name, slo);
|
||||
} catch (...) {
|
||||
dlogger.warn("Failed to fetch data for service levels: {}", std::current_exception());
|
||||
}
|
||||
}
|
||||
return service_levels;
|
||||
});
|
||||
@@ -593,15 +601,21 @@ future<qos::service_levels_info> system_distributed_keyspace::get_service_levels
|
||||
|
||||
future<qos::service_levels_info> system_distributed_keyspace::get_service_level(sstring service_level_name) const {
|
||||
static sstring prepared_query = format("SELECT * FROM {}.{} WHERE service_level = ?;", NAME, SERVICE_LEVELS);
|
||||
return _qp.execute_internal(prepared_query, db::consistency_level::ONE, internal_distributed_query_state(), {service_level_name}).then([] (shared_ptr<cql3::untyped_result_set> result_set) {
|
||||
return _qp.execute_internal(prepared_query, db::consistency_level::ONE, internal_distributed_query_state(), {service_level_name}).then(
|
||||
[service_level_name = std::move(service_level_name)] (shared_ptr<cql3::untyped_result_set> result_set) {
|
||||
qos::service_levels_info service_levels;
|
||||
if (!result_set->empty()) {
|
||||
auto &&row = result_set->one();
|
||||
auto service_level_name = row.get_as<sstring>("service_level");
|
||||
qos::service_level_options slo{
|
||||
.timeout = get_duration(row, "timeout"),
|
||||
};
|
||||
service_levels.emplace(service_level_name, slo);
|
||||
try {
|
||||
auto &&row = result_set->one();
|
||||
auto workload = qos::service_level_options::parse_workload_type(row.get_opt<sstring>("workload_type").value_or(""));
|
||||
qos::service_level_options slo{
|
||||
.timeout = get_duration(row, "timeout"),
|
||||
.workload = workload.value_or(qos::service_level_options::workload_type::unspecified),
|
||||
};
|
||||
service_levels.emplace(service_level_name, slo);
|
||||
} catch (...) {
|
||||
dlogger.warn("Failed to fetch data for service level {}: {}", service_level_name, std::current_exception());
|
||||
}
|
||||
}
|
||||
return service_levels;
|
||||
});
|
||||
@@ -625,10 +639,12 @@ future<> system_distributed_keyspace::set_service_level(sstring service_level_na
|
||||
},
|
||||
}, tv);
|
||||
};
|
||||
co_await _qp.execute_internal(format("UPDATE {}.{} SET timeout = ? WHERE service_level = ?;", NAME, SERVICE_LEVELS),
|
||||
co_await _qp.execute_internal(format("UPDATE {}.{} SET timeout = ?, workload_type = ? WHERE service_level = ?;", NAME, SERVICE_LEVELS),
|
||||
db::consistency_level::ONE,
|
||||
internal_distributed_query_state(),
|
||||
{to_data_value(slo.timeout), service_level_name});
|
||||
{to_data_value(slo.timeout),
|
||||
data_value(qos::service_level_options::to_string(slo.workload)),
|
||||
service_level_name});
|
||||
}
|
||||
|
||||
future<> system_distributed_keyspace::drop_service_level(sstring service_level_name) const {
|
||||
|
||||
@@ -27,20 +27,23 @@ one can run the following query:
|
||||
### Service Level Configuration Table
|
||||
|
||||
```CREATE TABLE system_distributed.service_levels (
|
||||
service_level text PRIMARY KEY);
|
||||
service_level text PRIMARY KEY,
|
||||
timeout duration,
|
||||
workload_type text)
|
||||
```
|
||||
|
||||
The table is used to store and distribute the service levels configuration.
|
||||
The table column names meanings are:
|
||||
*service_level* - the name of the service level.
|
||||
*timeout* - timeout for operations performed by users under this service level
|
||||
*workload_type* - type of workload declared for this service level (unspecified, interactive or batch)
|
||||
|
||||
```
|
||||
select * from system_distributed.service_levels ;
|
||||
|
||||
service_level | timeout
|
||||
---------------+---------
|
||||
sl | 50ms
|
||||
service_level | timeout | workload_type
|
||||
---------------+---------+---------------
|
||||
sl | 500ms | interactive
|
||||
|
||||
```
|
||||
|
||||
@@ -99,3 +102,31 @@ role2: `timeout = 10ms`
|
||||
role3: `timeout = 2s`
|
||||
role4: `timeout = 10ms`
|
||||
|
||||
### Workload types
|
||||
|
||||
It's possible to declare a workload type for a service level, currently out of three available values:
|
||||
1. unspecified - generic workload without any specific characteristics; default
|
||||
2. interactive - workload sensitive to latency, expected to have high/unbounded concurrency,
|
||||
with dynamic characteristics, OLTP;
|
||||
example: users clicking on a website and generating events with their clicks
|
||||
3. batch - workload for processing large amounts of data, not sensitive to latency, expected to have
|
||||
fixed concurrency, OLAP, ETL;
|
||||
example: processing billions of historical sales records to generate useful statistics
|
||||
|
||||
Declaring a workload type provides more context for Scylla to decide how to handle the sessions.
|
||||
For instance, if a coordinator node receives requests with a rate higher than it can handle,
|
||||
it will make different decisions depending on the declared workload type:
|
||||
- for batch workloads it makes sense to apply backpressure - the concurrency is assumed to be fixed,
|
||||
so delaying a reply will likely also reduce the rate at which new requests are sent;
|
||||
- for interactive workloads, backpressure would only waste resources - delaying a reply does not
|
||||
decrease the rate of incoming requests, so it's reasonable for the coordinator to start shedding
|
||||
surplus requests.
|
||||
|
||||
If multiple workload types are applicable for a role, it makes sense if:
|
||||
- all the applicable workload types are identical
|
||||
- some of the service levels do not have any workload types specified
|
||||
|
||||
Otherwise, e.g. if a role has multiple workload types declared,
|
||||
the conflicts are resolved as follows:
|
||||
- `X` vs `unspecified` -> `X`
|
||||
- `batch` vs `interactive` -> `batch` - under the assumption that `batch` is safer, because it would not trigger load shedding as eagerly as `interactive`
|
||||
|
||||
@@ -300,5 +300,7 @@ future<> service::client_state::maybe_update_per_service_level_params() {
|
||||
_timeout_config.truncate_timeout = slo_timeout_or(_default_timeout_config.truncate_timeout);
|
||||
_timeout_config.cas_timeout = slo_timeout_or(_default_timeout_config.cas_timeout);
|
||||
_timeout_config.other_timeout = slo_timeout_or(_default_timeout_config.other_timeout);
|
||||
|
||||
_workload_type = slo_opt->workload;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,6 +72,7 @@ public:
|
||||
enum class auth_state : uint8_t {
|
||||
UNINITIALIZED, AUTHENTICATION, READY
|
||||
};
|
||||
using workload_type = qos::service_level_options::workload_type;
|
||||
|
||||
// This class is used to move client_state between shards
|
||||
// It is created on a shard that owns client_state than passed
|
||||
@@ -150,10 +151,16 @@ private:
|
||||
timeout_config _default_timeout_config;
|
||||
timeout_config _timeout_config;
|
||||
|
||||
workload_type _workload_type = workload_type::unspecified;
|
||||
|
||||
public:
|
||||
struct internal_tag {};
|
||||
struct external_tag {};
|
||||
|
||||
workload_type get_workload_type() const noexcept {
|
||||
return _workload_type;
|
||||
}
|
||||
|
||||
auth_state get_auth_state() const noexcept {
|
||||
return _auth_state;
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
|
||||
#include "qos_common.hh"
|
||||
#include "utils/overloaded_functor.hh"
|
||||
|
||||
namespace qos {
|
||||
|
||||
service_level_options service_level_options::replace_defaults(const service_level_options& default_values) const {
|
||||
@@ -38,6 +39,9 @@ service_level_options service_level_options::replace_defaults(const service_leve
|
||||
// leave the value as is
|
||||
},
|
||||
}, ret.timeout);
|
||||
if (ret.workload == workload_type::unspecified) {
|
||||
ret.workload = default_values.workload;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -56,7 +60,37 @@ service_level_options service_level_options::merge_with(const service_level_opti
|
||||
}
|
||||
},
|
||||
}, ret.timeout);
|
||||
// Specified workloads should be preferred over unspecified ones
|
||||
if (ret.workload == workload_type::unspecified || other.workload == workload_type::unspecified) {
|
||||
ret.workload = std::max(ret.workload, other.workload);
|
||||
} else {
|
||||
ret.workload = std::min(ret.workload, other.workload);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::string_view service_level_options::to_string(const workload_type& wt) {
|
||||
switch (wt) {
|
||||
case workload_type::unspecified: return "unspecified";
|
||||
case workload_type::batch: return "batch";
|
||||
case workload_type::interactive: return "interactive";
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const service_level_options::workload_type& wt) {
|
||||
return os << service_level_options::to_string(wt);
|
||||
}
|
||||
|
||||
std::optional<service_level_options::workload_type> service_level_options::parse_workload_type(std::string_view sv) {
|
||||
if (sv == "unspecified") {
|
||||
return workload_type::unspecified;
|
||||
} else if (sv == "interactive") {
|
||||
return workload_type::interactive;
|
||||
} else if (sv == "batch") {
|
||||
return workload_type::batch;
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -45,8 +45,13 @@ struct service_level_options {
|
||||
bool operator!=(const delete_marker&) const { return false; };
|
||||
};
|
||||
|
||||
enum class workload_type {
|
||||
unspecified, batch, interactive
|
||||
};
|
||||
|
||||
using timeout_type = std::variant<unset_marker, delete_marker, lowres_clock::duration>;
|
||||
timeout_type timeout = unset_marker{};
|
||||
workload_type workload = workload_type::unspecified;
|
||||
|
||||
service_level_options replace_defaults(const service_level_options& other) const;
|
||||
// Merges the values of two service level options. The semantics depends
|
||||
@@ -55,8 +60,13 @@ struct service_level_options {
|
||||
|
||||
bool operator==(const service_level_options& other) const = default;
|
||||
bool operator!=(const service_level_options& other) const = default;
|
||||
|
||||
static std::string_view to_string(const workload_type& wt);
|
||||
static std::optional<workload_type> parse_workload_type(std::string_view sv);
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const service_level_options::workload_type&);
|
||||
|
||||
using service_levels_info = std::map<sstring, service_level_options>;
|
||||
|
||||
///
|
||||
|
||||
@@ -288,7 +288,7 @@ SEASTAR_TEST_CASE(test_alter_with_timeouts) {
|
||||
// user1
|
||||
//
|
||||
// which means that user1 should inherit timeouts from all other users
|
||||
cquery_nofail(e, "GRANT user2 TO user1");
|
||||
cquery_nofail(e, "GRANT user2 TO user1");
|
||||
cquery_nofail(e, "GRANT user3 TO user2");
|
||||
cquery_nofail(e, "GRANT user4 TO user2");
|
||||
e.refresh_client_state().get();
|
||||
@@ -318,3 +318,69 @@ SEASTAR_TEST_CASE(test_alter_with_timeouts) {
|
||||
cquery_nofail(e, "INSERT INTO t (id, v) VALUES (1,2)");
|
||||
}, cfg);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_alter_with_workload_type) {
|
||||
auto cfg = make_shared<db::config>();
|
||||
cfg->authenticator(sstring(auth::password_authenticator_name));
|
||||
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
auth::role_config config {
|
||||
.can_login = true,
|
||||
};
|
||||
auth::authentication_options opts {
|
||||
.password = "pass"
|
||||
};
|
||||
auth::create_role(e.local_auth_service(), "user1", config, opts).get();
|
||||
auth::create_role(e.local_auth_service(), "user2", config, opts).get();
|
||||
auth::create_role(e.local_auth_service(), "user3", config, opts).get();
|
||||
auth::create_role(e.local_auth_service(), "user4", config, opts).get();
|
||||
authenticate(e, "user1", "pass").get();
|
||||
|
||||
cquery_nofail(e, "CREATE SERVICE LEVEL sl");
|
||||
cquery_nofail(e, "ATTACH SERVICE LEVEL sl TO user1");
|
||||
|
||||
auto msg = cquery_nofail(e, "SELECT workload_type FROM system_distributed.service_levels");
|
||||
assert_that(msg).is_rows().with_rows({{
|
||||
utf8_type->decompose("unspecified")
|
||||
}});
|
||||
|
||||
e.refresh_client_state().get();
|
||||
// Default workload type is `unspecified`
|
||||
BOOST_REQUIRE_EQUAL(e.local_client_state().get_workload_type(), service::client_state::workload_type::unspecified);
|
||||
|
||||
// When multiple per-role timeouts apply, the smallest value is always effective
|
||||
cquery_nofail(e, "CREATE SERVICE LEVEL sl2 WITH workload_type = 'unspecified'");
|
||||
cquery_nofail(e, "CREATE SERVICE LEVEL sl3 WITH workload_type = 'batch'");
|
||||
cquery_nofail(e, "CREATE SERVICE LEVEL sl4 WITH workload_type = 'interactive'");
|
||||
cquery_nofail(e, "ATTACH SERVICE LEVEL sl2 TO user2");
|
||||
cquery_nofail(e, "ATTACH SERVICE LEVEL sl3 TO user3");
|
||||
cquery_nofail(e, "ATTACH SERVICE LEVEL sl4 TO user4");
|
||||
cquery_nofail(e, "ALTER SERVICE LEVEL sl WITH workload_type = 'interactive'");
|
||||
// The roles are granted as follows:
|
||||
// user4 user3
|
||||
// \ /
|
||||
// user2
|
||||
// /
|
||||
// user1
|
||||
//
|
||||
// which means that user1 should inherit workload types from all other users
|
||||
cquery_nofail(e, "GRANT user2 TO user1");
|
||||
cquery_nofail(e, "GRANT user3 TO user2");
|
||||
cquery_nofail(e, "GRANT user4 TO user2");
|
||||
e.refresh_client_state().get();
|
||||
// For user1, the effective workload type should be batch
|
||||
BOOST_REQUIRE_EQUAL(e.local_client_state().get_workload_type(), service::client_state::workload_type::batch);
|
||||
// after switching to user2, still batch
|
||||
authenticate(e, "user2", "pass").get();
|
||||
e.refresh_client_state().get();
|
||||
BOOST_REQUIRE_EQUAL(e.local_client_state().get_workload_type(), service::client_state::workload_type::batch);
|
||||
// after switching to user3, batch again
|
||||
authenticate(e, "user3", "pass").get();
|
||||
e.refresh_client_state().get();
|
||||
BOOST_REQUIRE_EQUAL(e.local_client_state().get_workload_type(), service::client_state::workload_type::batch);
|
||||
// after switching to user4, the workload is interactive
|
||||
authenticate(e, "user4", "pass").get();
|
||||
e.refresh_client_state().get();
|
||||
BOOST_REQUIRE_EQUAL(e.local_client_state().get_workload_type(), service::client_state::workload_type::interactive);
|
||||
}, cfg);
|
||||
}
|
||||
@@ -4878,14 +4878,14 @@ SEASTAR_TEST_CASE(test_user_based_sla_queries) {
|
||||
e.execute_cql("CREATE SERVICE_LEVEL sl_1;").get();
|
||||
auto msg = e.execute_cql("LIST SERVICE_LEVEL sl_1;").get0();
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
{utf8_type->decompose("sl_1"), {}},
|
||||
{utf8_type->decompose("sl_1"), {}, utf8_type->decompose("unspecified")},
|
||||
});
|
||||
e.execute_cql("CREATE SERVICE_LEVEL sl_2;").get();
|
||||
//drop service levels
|
||||
e.execute_cql("DROP SERVICE_LEVEL sl_1;").get();
|
||||
msg = e.execute_cql("LIST ALL SERVICE_LEVELS;").get0();
|
||||
assert_that(msg).is_rows().with_rows({
|
||||
{utf8_type->decompose("sl_2"), {}},
|
||||
{utf8_type->decompose("sl_2"), {}, utf8_type->decompose("unspecified")},
|
||||
});
|
||||
|
||||
// validate exceptions (illegal requests)
|
||||
|
||||
@@ -68,3 +68,21 @@ def test_attached_service_level(scylla_only, cql):
|
||||
assert res_one.role == cql.cluster.auth_provider.username and res_one.service_level == sl
|
||||
res_one = cql.execute(f"LIST ALL ATTACHED SERVICE LEVELS").one()
|
||||
assert res_one.role == cql.cluster.auth_provider.username and res_one.service_level == sl
|
||||
|
||||
# Test that declaring service level workload types is possible
|
||||
def test_set_workload_type(scylla_only, cql):
|
||||
with new_service_level(cql) as sl:
|
||||
res = cql.execute(f"LIST SERVICE LEVEL {sl}")
|
||||
assert res.one().workload_type == 'unspecified'
|
||||
for wt in ['interactive', 'batch', 'unspecified']:
|
||||
cql.execute(f"ALTER SERVICE LEVEL {sl} WITH workload_type = '{wt}'")
|
||||
res = cql.execute(f"LIST SERVICE LEVEL {sl}")
|
||||
assert res.one().workload_type == wt
|
||||
|
||||
# Test that workload type input is validated
|
||||
def test_set_invalid_workload_types(scylla_only, cql):
|
||||
with new_service_level(cql) as sl:
|
||||
for incorrect in ['', 'null', 'i', 'b', 'dog', 'x'*256]:
|
||||
print(f"Checking {incorrect}")
|
||||
with pytest.raises(Exception):
|
||||
cql.execute(f"ALTER SERVICE LEVEL {sl} WITH workload_type = '{incorrect}'")
|
||||
@@ -521,6 +521,10 @@ cql_server::connection::connection(cql_server& server, socket_address server_add
|
||||
, _server_addr(server_addr)
|
||||
, _client_state(service::client_state::external_tag{}, server._auth_service, &server._sl_controller, server.timeout_config(), addr)
|
||||
{
|
||||
_shedding_timer.set_callback([this] {
|
||||
clogger.debug("Shedding all incoming requests due to overload");
|
||||
_shed_incoming_requests = true;
|
||||
});
|
||||
}
|
||||
|
||||
cql_server::connection::~connection() {
|
||||
@@ -577,6 +581,13 @@ future<> cql_server::connection::process_request() {
|
||||
}
|
||||
|
||||
auto& f = *maybe_frame;
|
||||
|
||||
const bool allow_shedding = _client_state.get_workload_type() == service::client_state::workload_type::interactive;
|
||||
if (allow_shedding && _shed_incoming_requests) {
|
||||
++_server._stats.requests_shed;
|
||||
return _read_buf.skip(f.length);
|
||||
}
|
||||
|
||||
tracing_request_type tracing_requested = tracing_request_type::not_requested;
|
||||
if (f.flags & cql_frame_flags::tracing) {
|
||||
// If tracing is requested for a specific CQL command - flush
|
||||
@@ -608,19 +619,47 @@ future<> cql_server::connection::process_request() {
|
||||
});
|
||||
}
|
||||
|
||||
auto fut = get_units(_server._memory_available, mem_estimate);
|
||||
const auto shedding_timeout = std::chrono::milliseconds(50);
|
||||
auto fut = allow_shedding
|
||||
? get_units(_server._memory_available, mem_estimate, shedding_timeout).then_wrapped([this, length = f.length] (auto f) {
|
||||
try {
|
||||
return make_ready_future<semaphore_units<>>(f.get0());
|
||||
} catch (semaphore_timed_out sto) {
|
||||
// Cancel shedding in case no more requests are going to do that on completion
|
||||
if (_pending_requests_gate.get_count() == 0) {
|
||||
_shed_incoming_requests = false;
|
||||
}
|
||||
return _read_buf.skip(length).then([sto = std::move(sto)] () mutable {
|
||||
return make_exception_future<semaphore_units<>>(std::move(sto));
|
||||
});
|
||||
}
|
||||
})
|
||||
: get_units(_server._memory_available, mem_estimate);
|
||||
if (_server._memory_available.waiters()) {
|
||||
if (allow_shedding && !_shedding_timer.armed()) {
|
||||
_shedding_timer.arm(shedding_timeout);
|
||||
}
|
||||
++_server._stats.requests_blocked_memory;
|
||||
}
|
||||
|
||||
return fut.then([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (semaphore_units<> mem_permit) {
|
||||
return fut.then_wrapped([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (auto mem_permit_fut) {
|
||||
if (mem_permit_fut.failed()) {
|
||||
// Ignore semaphore errors - they are expected if load shedding took place
|
||||
mem_permit_fut.ignore_ready_future();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
semaphore_units<> mem_permit = mem_permit_fut.get0();
|
||||
return this->read_and_decompress_frame(length, flags).then([this, op, stream, tracing_requested, mem_permit = make_service_permit(std::move(mem_permit))] (fragmented_temporary_buffer buf) mutable {
|
||||
|
||||
++_server._stats.requests_served;
|
||||
++_server._stats.requests_serving;
|
||||
|
||||
_pending_requests_gate.enter();
|
||||
auto leave = defer([this] { _pending_requests_gate.leave(); });
|
||||
auto leave = defer([this] {
|
||||
_shedding_timer.cancel();
|
||||
_shed_incoming_requests = false;
|
||||
_pending_requests_gate.leave();
|
||||
});
|
||||
auto istream = buf.get_istream();
|
||||
(void)_process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit)
|
||||
.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave)] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
|
||||
|
||||
@@ -184,6 +184,8 @@ private:
|
||||
cql_serialization_format _cql_serialization_format = cql_serialization_format::latest();
|
||||
service::client_state _client_state;
|
||||
std::unordered_map<uint16_t, cql_query_state> _query_states;
|
||||
timer<lowres_clock> _shedding_timer;
|
||||
bool _shed_incoming_requests = false;
|
||||
unsigned _request_cpu = 0;
|
||||
|
||||
enum class tracing_request_type : uint8_t {
|
||||
|
||||
Reference in New Issue
Block a user