Merge 'Per-partition rate limiting' from Piotr Dulikowski

Due to its sharded and token-based architecture, Scylla works best when the user workload is more or less uniformly balanced across all nodes and shards. However, a common case when this assumption is broken is the "hot partition" - suddenly, a single partition starts getting a lot more reads and writes in comparison to other partitions. Because the shards owning the partition have only a fraction of the total cluster capacity, this quickly causes latency problems for other partitions within the same shard and vnode.

This PR introduces per-partition rate limiting feature. Now, users can choose to apply per-partition limits to their tables of choice using a schema extension:

```
ALTER TABLE ks.tbl
WITH per_partition_rate_limit = {
	'max_writes_per_second': 100,
	'max_reads_per_second': 200
};
```

Reads and writes which are detected to go over that quota are rejected to the client using a new RATE_LIMIT_ERROR CQL error code - existing error codes didn't really fit well with the rate limit error, so a new error code is added. This code is implemented as a part of a CQL protocol extension and returned to clients only if they requested the extension - if not, the existing CONFIG_ERROR will be used instead.

Limits are tracked and enforced on the replica side. If a write fails with some replicas reporting rate limit being reached, the rate limit error is propagated to the client. Additionally, the following optimization is implemented: if the coordinator shard/node is also a replica, we account the operation into the rate limit early and return an error in case of exceeding the rate limit before sending any messages to other replicas at all.

The PR covers regular, non-batch writes and single-partition reads. LWT and counters are not covered here.

Results of `perf_simple_query --smp=1 --operations-per-shard=1000000`:

- Write mode:
  ```
  8f690fdd47 (PR base):
  129644.11 tps ( 56.2 allocs/op,  13.2 tasks/op,   49785 insns/op)
  This PR:
  125564.01 tps ( 56.2 allocs/op,  13.2 tasks/op,   49825 insns/op)
  ```
- Read mode:
  ```
  8f690fdd47 (PR base):
  150026.63 tps ( 63.1 allocs/op,  12.1 tasks/op,   42806 insns/op)
  This PR:
  151043.00 tps ( 63.1 allocs/op,  12.1 tasks/op,   43075 insns/op)
  ```

Manual upgrade test:
- Start 3 nodes, 4 shards each, Scylla version 8f690fdd47
- Create a keyspace with scylla-bench, RF=3
- Start reading and writing with scylla-bench with CL=QUORUM
- Manually upgrade nodes one by one to the version from this PR
- Upgrade succeeded, apart from a small number of operations which failed when each node was being put down all reads/writes succeeded
- Successfully altered the scylla-bench table to have a read and write limit and those limits were enforced as expected

Fixes: #4703

Closes #9810

* github.com:scylladb/scylla:
  storage_proxy: metrics for per-partition rate limiting of reads
  storage_proxy: metrics for per-partition rate limiting of writes
  database: add stats for per partition rate limiting
  tests: add per_partition_rate_limit_test
  config: add add_per_partition_rate_limit_extension function for testing
  cf_prop_defs: guard per-partition rate limit with a feature
  query-request: add allow_limit flag
  storage_proxy: add allow rate limit flag to get_read_executor
  storage_proxy: resultize return type of get_read_executor
  storage_proxy: add per partition rate limit info to read RPC
  storage_proxy: add per partition rate limit info to query_result_local(_digest)
  storage_proxy: add allow rate limit flag to mutate/mutate_result
  storage_proxy: add allow rate limit flag to mutate_internal
  storage_proxy: add allow rate limit flag to mutate_begin
  storage_proxy: choose the right per partition rate limit info in write handler
  storage_proxy: resultize return types of write handler creation path
  storage_proxy: add per partition rate limit to mutation_holders
  storage_proxy: add per partition rate limit info to write RPC
  storage_proxy: add per partition rate limit info to mutate_locally
  database: apply per-partition rate limiting for reads/writes
  database: move and rename: classify_query -> classify_request
  schema: add per_partition_rate_limit schema extension
  db: add rate_limiter
  storage_proxy: propagate rate_limit_exception through read RPC
  gms: add TYPED_ERRORS_IN_READ_RPC cluster feature
  storage_proxy: pass rate_limit_exception through write RPC
  replica: add rate_limit_exception and a simple serialization framework
  docs: design doc for per-partition rate limiting
  transport: add rate_limit_error
This commit is contained in:
Avi Kivity
2022-06-23 19:39:22 +03:00
54 changed files with 2127 additions and 280 deletions

View File

@@ -450,6 +450,7 @@ set(scylla_sources
db/large_data_handler.cc
db/legacy_schema_migrator.cc
db/marshal/type_parser.cc
db/rate_limiter.cc
db/schema_tables.cc
db/size_estimates_virtual_reader.cc
db/snapshot-ctl.cc

View File

@@ -1510,6 +1510,7 @@ static future<std::unique_ptr<rjson::value>> get_previous_item(
stats.reads_before_write++;
auto selection = cql3::selection::selection::wildcard(schema);
auto command = previous_item_read_command(proxy, schema, ck, selection);
command->allow_limit = db::allow_per_partition_rate_limit::yes;
auto cl = db::consistency_level::LOCAL_QUORUM;
return proxy.query(schema, command, to_partition_ranges(*schema, pk), cl, service::storage_proxy::coordinator_query_options(executor::default_timeout(), std::move(permit), client_state)).then(
@@ -1543,7 +1544,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
if (!m) {
return make_ready_future<executor::request_return_type>(api_error::conditional_check_failed("Failed condition."));
}
return proxy.mutate(std::vector<mutation>{std::move(*m)}, db::consistency_level::LOCAL_QUORUM, executor::default_timeout(), trace_state, std::move(permit)).then([this] () mutable {
return proxy.mutate(std::vector<mutation>{std::move(*m)}, db::consistency_level::LOCAL_QUORUM, executor::default_timeout(), trace_state, std::move(permit), db::allow_per_partition_rate_limit::yes).then([this] () mutable {
return rmw_operation_return(std::move(_return_attributes));
});
});
@@ -1551,7 +1552,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
} else if (_write_isolation != write_isolation::LWT_ALWAYS) {
std::optional<mutation> m = apply(nullptr, api::new_timestamp());
assert(m); // !needs_read_before_write, so apply() did not check a condition
return proxy.mutate(std::vector<mutation>{std::move(*m)}, db::consistency_level::LOCAL_QUORUM, executor::default_timeout(), trace_state, std::move(permit)).then([this] () mutable {
return proxy.mutate(std::vector<mutation>{std::move(*m)}, db::consistency_level::LOCAL_QUORUM, executor::default_timeout(), trace_state, std::move(permit), db::allow_per_partition_rate_limit::yes).then([this] () mutable {
return rmw_operation_return(std::move(_return_attributes));
});
}
@@ -1896,7 +1897,8 @@ static future<> do_batch_write(service::storage_proxy& proxy,
db::consistency_level::LOCAL_QUORUM,
executor::default_timeout(),
trace_state,
std::move(permit));
std::move(permit),
db::allow_per_partition_rate_limit::yes);
} else {
// Do the write via LWT:
// Multiple mutations may be destined for the same partition, adding
@@ -3252,6 +3254,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
auto selection = cql3::selection::selection::wildcard(rs.schema);
auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options());
auto command = ::make_lw_shared<query::read_command>(rs.schema->id(), rs.schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice));
command->allow_limit = db::allow_per_partition_rate_limit::yes;
future<std::vector<rjson::value>> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl,
service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then(
[schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get] (service::storage_proxy::coordinator_query_result qr) mutable {

View File

@@ -284,7 +284,8 @@ static future<> expire_item(service::storage_proxy& proxy,
return proxy.mutate(std::vector<mutation>{std::move(m)},
db::consistency_level::LOCAL_QUORUM,
executor::default_timeout(), // FIXME - which timeout?
qs.get_trace_state(), qs.get_permit());
qs.get_trace_state(), qs.get_permit(),
db::allow_per_partition_rate_limit::no);
}
static size_t random_offset(size_t min, size_t max) {

View File

@@ -505,6 +505,8 @@ scylla_tests = set([
'test/boost/group0_test',
'test/boost/exception_container_test',
'test/boost/result_utils_test',
'test/boost/rate_limiter_test',
'test/boost/per_partition_rate_limit_test',
'test/boost/expr_test',
'test/manual/ec2_snitch_test',
'test/manual/enormous_table_scan_test',
@@ -668,6 +670,7 @@ scylla_core = (['replica/database.cc',
'replica/table.cc',
'replica/distributed_loader.cc',
'replica/memtable.cc',
'replica/exceptions.cc',
'absl-flat_hash_map.cc',
'atomic_cell.cc',
'caching_options.cc',
@@ -886,6 +889,8 @@ scylla_core = (['replica/database.cc',
'db/view/row_locking.cc',
'db/sstables-format-selector.cc',
'db/snapshot-ctl.cc',
'db/rate_limiter.cc',
'db/per_partition_rate_limit_options.cc',
'index/secondary_index_manager.cc',
'index/secondary_index.cc',
'utils/UUID_gen.cc',
@@ -1128,6 +1133,8 @@ idls = ['idl/gossip_digest.idl.hh',
'idl/storage_proxy.idl.hh',
'idl/group0_state_machine.idl.hh',
'idl/forward_request.idl.hh',
'idl/replica_exception.idl.hh',
'idl/per_partition_rate_limit_info.idl.hh',
]
rusts = [
@@ -1279,6 +1286,7 @@ deps['test/boost/linearizing_input_stream_test'] = [
"test/lib/log.cc",
]
deps['test/boost/expr_test'] = ['test/boost/expr_test.cc'] + scylla_core
deps['test/boost/rate_limiter_test'] = ['test/boost/rate_limiter_test.cc', 'db/rate_limiter.cc']
deps['test/boost/duration_test'] += ['test/lib/exception_utils.cc']
deps['test/boost/schema_loader_test'] += ['tools/schema_loader.cc']

View File

@@ -317,7 +317,7 @@ future<coordinator_result<>> batch_statement::execute_without_conditions(
mutate_atomic = false;
}
}
return qp.proxy().mutate_with_triggers(std::move(mutations), cl, timeout, mutate_atomic, std::move(tr_state), std::move(permit));
return qp.proxy().mutate_with_triggers(std::move(mutations), cl, timeout, mutate_atomic, std::move(tr_state), std::move(permit), db::allow_per_partition_rate_limit::yes);
}
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute_with_conditions(

View File

@@ -17,6 +17,8 @@
#include "gms/feature_service.hh"
#include "tombstone_gc_extension.hh"
#include "tombstone_gc.hh"
#include "db/per_partition_rate_limit_extension.hh"
#include "db/per_partition_rate_limit_options.hh"
#include <boost/algorithm/string/predicate.hpp>
@@ -127,6 +129,11 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
throw exceptions::configuration_exception("CDC not supported by the cluster");
}
auto per_partition_rate_limit_options = get_per_partition_rate_limit_options(schema_extensions);
if (per_partition_rate_limit_options && !db.features().typed_errors_in_read_rpc) {
throw exceptions::configuration_exception("Per-partition rate limit is not supported yet by the whole cluster");
}
auto tombstone_gc_options = get_tombstone_gc_options(schema_extensions);
validate_tombstone_gc_options(tombstone_gc_options, db, ks_name);
@@ -219,6 +226,16 @@ const tombstone_gc_options* cf_prop_defs::get_tombstone_gc_options(const schema:
return &ext->get_options();
}
const db::per_partition_rate_limit_options* cf_prop_defs::get_per_partition_rate_limit_options(const schema::extensions_map& schema_exts) const {
auto it = schema_exts.find(db::per_partition_rate_limit_extension::NAME);
if (it == schema_exts.end()) {
return nullptr;
}
auto ext = dynamic_pointer_cast<db::per_partition_rate_limit_extension>(it->second);
return &ext->get_options();
}
void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_map schema_extensions) const {
if (has_property(KW_COMMENT)) {
builder.set_comment(get_string(KW_COMMENT, ""));

View File

@@ -78,6 +78,7 @@ public:
const cdc::options* get_cdc_options(const schema::extensions_map&) const;
std::optional<caching_options> get_caching_options() const;
const tombstone_gc_options* get_tombstone_gc_options(const schema::extensions_map&) const;
const db::per_partition_rate_limit_options* get_per_partition_rate_limit_options(const schema::extensions_map&) const;
#if 0
public CachingOptions getCachingOptions() throws SyntaxException, ConfigurationException
{

View File

@@ -284,7 +284,7 @@ modification_statement::execute_without_condition(query_processor& qp, service::
return make_ready_future<coordinator_result<>>(bo::success());
}
return qp.proxy().mutate_with_triggers(std::move(mutations), cl, timeout, false, qs.get_trace_state(), qs.get_permit(), this->is_raw_counter_shard_write());
return qp.proxy().mutate_with_triggers(std::move(mutations), cl, timeout, false, qs.get_trace_state(), qs.get_permit(), db::allow_per_partition_rate_limit::yes, this->is_raw_counter_shard_write());
});
}

View File

@@ -360,6 +360,7 @@ select_statement::do_execute(query_processor& qp,
utils::UUID(),
query::is_first_page::no,
options.get_timestamp(state));
command->allow_limit = db::allow_per_partition_rate_limit::yes;
int32_t page_size = options.get_page_size();
@@ -530,6 +531,7 @@ indexed_table_select_statement::prepare_command_for_base_query(query_processor&
utils::UUID(),
query::is_first_page::no,
options.get_timestamp(state));
cmd->allow_limit = db::allow_per_partition_rate_limit::yes;
return cmd;
}

View File

@@ -243,7 +243,7 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
// send to partially or wholly fail in actually sending stuff. Since we don't
// have hints (yet), send with CL=ALL, and hope we can re-do this soon.
// See below, we use retry on write failure.
return _qp.proxy().mutate(mutations, db::consistency_level::ALL, db::no_timeout, nullptr, empty_service_permit());
return _qp.proxy().mutate(mutations, db::consistency_level::ALL, db::no_timeout, nullptr, empty_service_permit(), db::allow_per_partition_rate_limit::no);
});
}).then_wrapped([this, id](future<> batch_result) {
try {

View File

@@ -23,6 +23,7 @@
#include "cdc/cdc_extension.hh"
#include "tombstone_gc_extension.hh"
#include "db/per_partition_rate_limit_extension.hh"
#include "config.hh"
#include "extensions.hh"
#include "log.hh"
@@ -910,6 +911,10 @@ void db::config::add_cdc_extension() {
_extensions->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
}
void db::config::add_per_partition_rate_limit_extension() {
_extensions->add_schema_extension<db::per_partition_rate_limit_extension>(db::per_partition_rate_limit_extension::NAME);
}
void db::config::setup_directories() {
maybe_in_workdir(commitlog_directory, "commitlog");
maybe_in_workdir(data_file_directories, "data");

View File

@@ -107,6 +107,7 @@ public:
// For testing only
void add_cdc_extension();
void add_per_partition_rate_limit_extension();
/// True iff the feature is enabled.
bool check_experimental(experimental_features_t::feature f) const;

23
db/operation_type.hh Normal file
View File

@@ -0,0 +1,23 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <cstdint>
#include <iosfwd>
namespace db {
enum class operation_type : uint8_t {
read = 0,
write = 1
};
std::ostream& operator<<(std::ostream& os, operation_type op_type);
}

View File

@@ -0,0 +1,42 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "db/per_partition_rate_limit_options.hh"
namespace db {
class per_partition_rate_limit_extension : public schema_extension {
per_partition_rate_limit_options _options;
public:
static constexpr auto NAME = "per_partition_rate_limit";
per_partition_rate_limit_extension() = default;
per_partition_rate_limit_extension(const per_partition_rate_limit_options& opts) : _options(opts) {}
explicit per_partition_rate_limit_extension(const std::map<sstring, sstring>& tags) : _options(tags) {}
explicit per_partition_rate_limit_extension(const bytes& b) : _options(deserialize(b)) {}
explicit per_partition_rate_limit_extension(const sstring& s) {
throw std::logic_error("Cannot create per partition rate limit info from string");
}
bytes serialize() const override {
return ser::serialize_to_buffer<bytes>(_options.to_map());
}
static std::map<sstring, sstring> deserialize(const bytes_view& buffer) {
return ser::deserialize_from_buffer(buffer, boost::type<std::map<sstring, sstring>>());
}
const per_partition_rate_limit_options& get_options() const {
return _options;
}
};
}

View File

@@ -0,0 +1,49 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <cstdint>
#include <variant>
#include <seastar/util/bool_class.hh>
namespace db {
using allow_per_partition_rate_limit = seastar::bool_class<class allow_per_partition_rate_limit_tag>;
namespace per_partition_rate_limit {
// Tells the replica to account the operation (increase the corresponding counter)
// and accept it regardless from the value of the counter.
//
// Used when the coordinator IS a replica (correct node and shard).
struct account_only {};
// Tells the replica to account the operation and decide whether to reject
// or not, based on the random variable sent by the coordinator.
//
// Used when the coordinator IS NOT a replica (wrong node or shard).
struct account_and_enforce {
// A random 32-bit number generated by the coordinator.
// Replicas are supposed to use it in order to decide whether
// to accept or reject.
uint32_t random_variable;
inline double get_random_variable_as_double() const {
return double(random_variable) / double(1LL << 32);
}
};
// std::monostate -> do not count to the rate limit and never reject
// account_and_enforce -> account to the rate limit and optionally reject
using info = std::variant<std::monostate, account_only, account_and_enforce>;
} // namespace per_partition_rate_limit
} // namespace db

View File

@@ -0,0 +1,63 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <optional>
#include <boost/range/adaptor/map.hpp>
#include "serializer.hh"
#include "schema.hh"
#include "log.hh"
namespace db {
const char* per_partition_rate_limit_options::max_writes_per_second_key = "max_writes_per_second";
const char* per_partition_rate_limit_options::max_reads_per_second_key = "max_reads_per_second";
per_partition_rate_limit_options::per_partition_rate_limit_options(std::map<sstring, sstring> map) {
auto handle_uint32_arg = [&] (const char* key) -> std::optional<uint32_t> {
auto it = map.find(key);
if (it == map.end()) {
return std::nullopt;
}
try {
const uint32_t ret = std::stol(it->second);
map.erase(it);
return ret;
} catch (std::invalid_argument&) {
throw exceptions::configuration_exception(format(
"Invalid value for {} option: expected a non-negative number",
key));
} catch (std::out_of_range&) {
throw exceptions::configuration_exception(format(
"Value for {} is out of range accepted by 32-bit numbers",
key));
}
};
_max_writes_per_second = handle_uint32_arg(max_writes_per_second_key);
_max_reads_per_second = handle_uint32_arg(max_reads_per_second_key);
if (!map.empty()) {
throw exceptions::configuration_exception(format(
"Unknown keys in map for per_partition_rate_limit extension: {}",
::join(", ", map | boost::adaptors::map_keys)));
}
}
std::map<sstring, sstring> per_partition_rate_limit_options::to_map() const {
std::map<sstring, sstring> ret;
if (_max_writes_per_second) {
ret.insert_or_assign(max_writes_per_second_key, std::to_string(*_max_writes_per_second));
}
if (_max_reads_per_second) {
ret.insert_or_assign(max_reads_per_second_key, std::to_string(*_max_reads_per_second));
}
return ret;
}
}

View File

@@ -0,0 +1,61 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <optional>
#include "operation_type.hh"
#include "serializer.hh"
#include "schema.hh"
#include "log.hh"
namespace db {
class per_partition_rate_limit_options final {
private:
static const char* max_writes_per_second_key;
static const char* max_reads_per_second_key;
private:
std::optional<uint32_t> _max_writes_per_second;
std::optional<uint32_t> _max_reads_per_second;
public:
per_partition_rate_limit_options() = default;
per_partition_rate_limit_options(std::map<sstring, sstring> map);
std::map<sstring, sstring> to_map() const;
inline std::optional<uint32_t> get_max_ops_per_second(operation_type op_type) const {
switch (op_type) {
case operation_type::write:
return _max_writes_per_second;
case operation_type::read:
return _max_reads_per_second;
}
}
inline void set_max_writes_per_second(std::optional<uint32_t> v) {
_max_writes_per_second = v;
}
inline std::optional<uint32_t> get_max_writes_per_second() const {
return _max_writes_per_second;
}
inline void set_max_reads_per_second(std::optional<uint32_t> v) {
_max_reads_per_second = v;
}
inline std::optional<uint32_t> get_max_reads_per_second() const {
return _max_reads_per_second;
}
};
}

305
db/rate_limiter.cc Normal file
View File

@@ -0,0 +1,305 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <cmath>
#include <numbers>
#include <array>
#include <random>
#include <variant>
#include <chrono>
#include <seastar/core/metrics.hh>
#include "utils/small_vector.hh"
#include "utils/murmur_hash.hh"
#include "db/rate_limiter.hh"
// The rate limiter keeps a hashmap of counters differentiated by operation type
// (e.g. read or write) and the partition token. On each operation,
// the corresponding counter is increased by 1.
//
// The counters are decremented via two mechanisms:
//
// 1. Every `time_window_duration`, all counters are halved.
// 2. Within a time window, on every `bucket_size` operations all counters
// are decremented by 1.
//
// The mechanism 1) makes sure that we do not forget about very frequent
// operations too quick and makes it possible to reject in a probabilistic
// manner (this is described in more detail in design notes).
//
// The mechanism 2) protects the internal hashmap from being flooded with
// counters with low values. This causes the rate limiter to underestimate
// the counter values by the current number of "buckets" within this
// time window. This strategy is also known as "lossy counting".
//
// Both mechanisms 1) and 2) are implemented in a lazy manner.
namespace db {
static constexpr size_t hash_bits = 16;
static constexpr size_t entry_count = 1 << hash_bits;
static constexpr size_t bucket_size = 10000;
void rate_limiter_base::on_timer() noexcept {
_time_window_history.pop_back();
_time_window_history.insert(_time_window_history.begin(), time_window_entry {
.entries_active = _current_entries_in_time_window,
.lossy_counting_decrease = _current_bucket,
});
_current_bucket = 0;
_current_ops_in_bucket = 0;
_current_entries_in_time_window = 0;
_current_time_window = (_current_time_window + 1) % (1 << time_window_bits);
// Because time window ids are 12 bit numbers and we increase the current
// time window number by 1 every second, it wraps around every 4096
// seconds (more than an hour). Because of this, some very old entry
// updated last 4096 seconds may accidentally become valid again.
//
// In order to prevent this, we make sure to update the entries
// more frequently. We do this by refreshing all the entries within half
// of the wraparound period (2048 seconds).
//
// Instead of clearing everything at once, we divide this operation
// into many small steps and perform them during time window change.
//
// All of this should make sure that each entry's time window is not
// older than 2048 seconds from the current generation.
constexpr size_t period = 1 << (time_window_bits - 1);
constexpr size_t entries_per_step = entry_count / period;
const size_t begin = _current_time_window * entries_per_step;
for (size_t i = 0; i < entries_per_step; i++) {
entry_refresh(_entries[(begin + i) % entry_count]);
}
}
rate_limiter_base::entry* rate_limiter_base::get_entry(uint32_t label, uint64_t token) noexcept {
// We need to either find the existing entry for this (label, token) combination
// or otherwise find an invalid entry which we can initialize and use.
//
// We start by looking at the entry corresponding to the computed hash,
// if it's occupied by another (label, token) try other entries using
// the quadratic probing strategy.
//
// We limit ourselves to 32 attempts - if no suitable entry is found
// then we return nullptr and admit the operation unconditionally.
// Because we use quadratic probing and entries can be deleted (lazily),
// a situation can occur where an entry A suddenly becomes inaccessible
// because another entry B which is earlier on the probe chain is deleted.
// One of the following will happen:
//
// 1. Either we will allocate a new entry over B and A becomes accessible
// again,
// 2. Or we will allocate a new entry for the same operation/partition as A
// and A will eventually expire.
//
// In the worst case, A might be a "hot" entry and be actively rate limited
// and the described situation will cause a large number of operations
// to be admitted. Fortunately, this will move the entry earlier in the
// probe chain, so this situation will happen a limited number of times (if
// any at all) for a single "hot" entry.
size_t hash = compute_hash(label, token);
static constexpr size_t max_probes = 32;
for (size_t i = 0; i < max_probes; i++) {
// Quadratic probing - every iteration jumps further than the previous one
hash = (hash + i) % entry_count;
entry& b = _entries[hash];
++_metrics.probe_count;
entry_refresh(b);
if (entry_is_empty(b)) {
++_metrics.allocations_on_empty;
b.token = token;
b.label = label;
b.op_count = _current_bucket;
return &b;
} else if (b.token == token && b.label == label) {
++_metrics.successful_lookups;
return &b;
}
}
++_metrics.failed_allocations;
return nullptr;
}
size_t rate_limiter_base::compute_hash(uint32_t label, uint64_t token) noexcept {
// The map key is a tuple (token, key) + salt
// The key is hashed with murmur hash for good hash quality
static constexpr size_t key_length = sizeof(token) + sizeof(label) + sizeof(_salt);
std::array<uint8_t, key_length> key;
uint8_t* ptr = key.data();
memcpy(ptr, &token, sizeof(token));
ptr += sizeof(token);
memcpy(ptr, &label, sizeof(label));
ptr += sizeof(label);
memcpy(ptr, &_salt, sizeof(_salt));
std::array<uint64_t, 2> out;
utils::murmur_hash::hash3_x64_128(key.data(), key_length, 0, out);
return out[0];
}
void rate_limiter_base::entry_refresh(rate_limiter_base::entry& b) noexcept {
uint32_t window_delta = _current_time_window - b.time_window;
if (window_delta == 0) {
// The entry is fresh, it was allocated in this time window
return;
}
if (window_delta < _time_window_history.size()) {
// The entry is not that old so we have to apply the effects
// of lossy counting and halving on time window switch
--_time_window_history[window_delta - 1].entries_active;
while (window_delta > 0) {
if (b.op_count > _time_window_history[window_delta - 1].lossy_counting_decrease) {
b.op_count -= _time_window_history[window_delta - 1].lossy_counting_decrease;
} else {
b.op_count = 0;
}
b.op_count /= 2;
--window_delta;
}
} else {
// The entry is very old and the op_count can be safely decreased to zero
b.op_count = 0;
}
++_current_entries_in_time_window;
b.time_window = _current_time_window;
}
bool rate_limiter_base::entry_is_empty(const rate_limiter_base::entry& b) noexcept {
return b.op_count <= _current_bucket;
}
void rate_limiter_base::register_metrics() {
namespace sm = seastar::metrics;
_metric_group.add_group("per_partition_rate_limiter", {
// TODO: Most of the following metrics are pretty low-level and not useful for users,
// perhaps they should be hidden behind a configuration flag
sm::make_counter("allocations", _metrics.allocations_on_empty,
sm::description("Number of times a entry was allocated over an empty/expired entry.")),
sm::make_counter("successful_lookups", _metrics.successful_lookups,
sm::description("Number of times a lookup returned an already allocated entry.")),
sm::make_counter("failed_allocations", _metrics.failed_allocations,
sm::description("Number of times the rate limiter gave up trying to allocate.")),
sm::make_counter("probe_count", _metrics.probe_count,
sm::description("Number of probes made during lookups.")),
sm::make_gauge("load_factor", [&] {
uint32_t occupied_entry_count = _current_entries_in_time_window;
for (const auto& twe : _time_window_history) {
occupied_entry_count += twe.entries_active;
}
return double(occupied_entry_count) / double(entry_count);
},
sm::description("Current load factor of the hash table (upper bound, may be overestimated).")),
});
}
rate_limiter_base::rate_limiter_base()
: _salt(std::random_device{}())
, _entries(entry_count)
, _time_window_history(op_count_bits - 1) {
register_metrics();
}
uint64_t rate_limiter_base::increase_and_get_counter(label& l, uint64_t token) noexcept {
// Assign a label if not done yet
if (l._label == 0) {
l._label = _next_label++;
}
entry* b = get_entry(l._label, token);
if (!b) {
// We failed to allocate a entry for this partition. This means that
// we won't track hit count for this partition during this time window.
// Assume that it's OK to admit the operation.
return 0;
}
// Protect from wrap-around
b->op_count = std::min<uint32_t>((1 << op_count_bits) - 1, b->op_count + 1);
++_current_ops_in_bucket;
if (_current_ops_in_bucket >= bucket_size) {
// Every `bucket_size` operations, virtually decrement all entries
// by one. We implement it by always subtracting the `_current_bucket`
// when comparing the count in the entry with the limit.
++_current_bucket;
_current_ops_in_bucket -= bucket_size;
}
return b->op_count - _current_bucket;
}
rate_limiter_base::can_proceed rate_limiter_base::account_operation(
label& l, uint64_t token, uint64_t limit,
const db::per_partition_rate_limit::info& rate_limit_info) noexcept {
if (std::holds_alternative<std::monostate>(rate_limit_info)) {
// Rate limiting turned off
return can_proceed::yes;
}
const uint64_t count = increase_and_get_counter(l, token);
if (auto* info = std::get_if<db::per_partition_rate_limit::account_and_enforce>(&rate_limit_info)) {
// On each time window change we halve the entry counts, therefore
// a partition with X ops/s will stabilize at 2X hits at the end
// of each time window.
if (count <= 2 * limit) {
return can_proceed::yes;
} else {
// As mentioned before, assuming a fixed operation rate, the operation
// count in a entry will oscillate between X at the beginning of the
// time window and 2X at the end. In order to only accept `limit`
// operations within a time window, we need to reject with probability
// P_c(x), where P_c(x) is a function such that, integrated over [X, 2X]
// will be equal to `limit`. `P_c(x) = limit / (x * ln 2)` satisfies
// this criterion.
//
// All replicas get the same value for the random variable X, with an
// expectation that all replicas' counters oscillate between the same
// values. Because of that, most of the time replicas will agree
// and either all accept or reject.
if (info->get_random_variable_as_double() * double(count) * std::numbers::ln2 < double(limit)) {
return can_proceed::yes;
} else {
return can_proceed::no;
}
}
} else {
return can_proceed::yes;
}
}
template class generic_rate_limiter<seastar::lowres_clock>;
}

178
db/rate_limiter.hh Normal file
View File

@@ -0,0 +1,178 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <cstdint>
#include <cstddef>
#include <chrono>
#include <limits>
#include <concepts>
#include <vector>
#include <optional>
#include <random>
#include <seastar/core/future.hh>
#include <seastar/core/timer.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/util/bool_class.hh>
#include "utils/chunked_vector.hh"
#include "db/per_partition_rate_limit_info.hh"
// A data structure used to implement per-partition rate limiting. It accounts
// operations and enforces limits when it is detected that the operation rate
// is too high.
namespace db {
class rate_limiter_base {
public:
static constexpr size_t op_count_bits = 20;
static constexpr size_t time_window_bits = 12;
private:
struct metrics {
uint64_t allocations_on_empty = 0;
uint64_t successful_lookups = 0;
uint64_t failed_allocations = 0;
uint64_t probe_count = 0;
};
// Represents a piece of the hashmap storage.
struct entry {
public:
// The partition key token of the operation which allocated this entry.
uint64_t token = 0;
// The label of the operation which allocated this entry.
// Labels are used to differentiate operations which should be counted
// separately, e.g. reads and writes to the same table or writes
// to two different tables.
uint32_t label = 0;
// The number of operations counted for given token/label.
// It is virtually decremented on each window change, so the real
// operation count is actually `op_count - _current_bucket`.
// If the number drops to zero or below, the entry is considered
// "expired" and may be overwritten by another operation.
uint32_t op_count : op_count_bits = 0;
// ID of the time window in which the entry was allocated.
uint32_t time_window : time_window_bits = 0;
};
struct time_window_entry {
// How many entries are there active within this time window?
uint32_t entries_active = 0;
// By how much should the counter should be decreased within
// this time window?
uint32_t lossy_counting_decrease = 0;
};
public:
struct can_proceed_tag{};
using can_proceed = seastar::bool_class<can_proceed_tag>;
// Identifies a type of operation which is counted separately from other
// operations. For example, reads and writes for given table should have
// separate labels.
struct label {
private:
// The current ID used to identify the label in the rate limiter.
// It is assigned on first use.
uint32_t _label = 0;
friend class rate_limiter_base;
};
private:
uint32_t _current_bucket = 0;
uint32_t _current_ops_in_bucket = 0;
uint32_t _current_entries_in_time_window = 0;
uint32_t _next_label = 1;
uint32_t _current_time_window = 0;
const uint32_t _salt;
utils::chunked_vector<entry> _entries;
std::vector<time_window_entry> _time_window_history;
metrics _metrics;
seastar::metrics::metric_groups _metric_group;
private:
entry* get_entry(uint32_t label, uint64_t token) noexcept;
size_t compute_hash(uint32_t label, uint64_t token) noexcept;
void entry_refresh(entry& b) noexcept;
bool entry_is_empty(const entry& b) noexcept;
void register_metrics();
protected:
void on_timer() noexcept;
public:
rate_limiter_base();
rate_limiter_base(const rate_limiter_base&) = delete;
rate_limiter_base(rate_limiter_base&&) = delete;
rate_limiter_base& operator=(const rate_limiter_base&) = delete;
rate_limiter_base& operator=(rate_limiter_base&&) = delete;
// (For testing purposes only)
// Increments the counter for given (label, token) and returns
// the new value of the counter.
uint64_t increase_and_get_counter(label& l, uint64_t token) noexcept;
// Increments the counter for given (label, token).
// If the counter indicates that the partition is over the limit,
// returns can_proceed::no with some probability.
//
// The `random_variable` parameter should be a value from range [0, 1).
// It is used as the source of randomness - the function chooses a threshold
// and accepts if and only if `random_variable` is below it.
//
// The probability is calculated in such a way that statistically
// only `limit` operations per second are admitted.
can_proceed account_operation(label& l, uint64_t token, uint64_t limit,
const db::per_partition_rate_limit::info& rate_limit_info) noexcept;
};
template<typename ClockType>
class generic_rate_limiter : public rate_limiter_base {
private:
seastar::timer<ClockType> _timer;
public:
generic_rate_limiter()
: rate_limiter_base() {
// Rate limiting is more accurate when the rate limiter timers
// on all nodes are synchronized. Assume that the nodes' clocks
// are synchronized and schedule the first tick on the beginning
// of the closest second.
const auto period = std::chrono::seconds(1);
const auto now = std::chrono::system_clock::now();
const auto initial_delay = period - now.time_since_epoch() % period;
_timer.set_callback([this] { on_timer(); });
_timer.arm(ClockType::now() + initial_delay, period);
}
};
extern template class generic_rate_limiter<seastar::lowres_clock>;
using rate_limiter = generic_rate_limiter<seastar::lowres_clock>;
}

View File

@@ -565,6 +565,7 @@ system_distributed_keyspace::insert_cdc_generation(
db::timeout_clock::now() + 60s,
nullptr, // trace_state
empty_service_permit(),
db::allow_per_partition_rate_limit::no,
false // raw_counters
);
});
@@ -661,6 +662,7 @@ system_distributed_keyspace::create_cdc_desc(
db::timeout_clock::now() + 30s,
nullptr, // trace_state
empty_service_permit(),
db::allow_per_partition_rate_limit::no,
false // raw_counters
);
});
@@ -704,6 +706,7 @@ system_distributed_keyspace::cdc_desc_exists(
db::timeout_clock::now() + 10s,
nullptr, // trace_state
empty_service_permit(),
db::allow_per_partition_rate_limit::no,
false // raw_counters
);

View File

@@ -2253,7 +2253,7 @@ void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const q
auto& row = m.partition().clustered_row(*_view, ck);
row.apply(tombstone(api::new_timestamp(), gc_clock::now()));
timeout = db::timeout_clock::now() + _timeout_duration;
_proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit()).get();
_proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no).get();
}
}

View File

@@ -140,3 +140,43 @@ Subscripting a list in a WHERE clause is supported as are maps.
```cql
WHERE some_list[:index] = :value
```
## Per-partition rate limit
The `per_partition_rate_limit` option can be used to limit the allowed
rate of requests to each partition in a given table. When the cluster detects
that the rate of requests exceeds configured limit, the cluster will start
rejecting some of them in order to bring the throughput back to the configured
limit. Rejected requests are less costly which can help reduce overload.
_NOTE_: Due to Scylla's distributed nature, tracking per-partition request rates
is not perfect and the actual rate of accepted requests may be higher up to
a factor of keyspace's `RF`. This feature should not be used to enforce precise
limits but rather serve as an overload protection feature.
_NOTE): This feature works best when shard-aware drivers are used (rejected
requests have the least cost).
Limits are configured separately for reads and writes. Some examples:
```cql
ALTER TABLE t WITH per_partition_rate_limit = {
'max_reads_per_second': 100,
'max_writes_per_second': 200
};
```
Limit reads only, no limit for writes:
```cql
ALTER TABLE t WITH per_partition_rate_limit = {
'max_reads_per_second': 200
};
```
Rejected requests receive the scylla-specific "Rate limit exceeded" error.
If the driver doesn't support it, `Config_error` will be sent instead.
For more details, see:
- Detailed [`design notes`](./per-partition-rate-limit.md)
- Description of the [rate limit exceeded](./protocol-extensions.md#rate-limit-error) error

View File

@@ -0,0 +1,153 @@
# Per-partition rate limiting
Scylla clusters operate best when the data is spread across a large number
of small partitions, and reads/writes are spread uniformly across all shards
and nodes. Due to various reasons (bugs, malicious end users etc.) this
assumption may suddenly not hold anymore and one partition may start getting
a disproportionate number of requests. In turn, this usually leads to the owning
shards being overloaded - a scenario called "hot partition" - and the total
cluster latency becoming worse.
The _per partition rate limit_ feature allows users to limit the rate
of accepted requests on a per-partition basis. When a partition exceeds
the configured limit of operations of given type (reads/writes) per second,
the cluster will start responding with errors to some of the operations for that
partition so that, statistically, the rate of accepted requests is kept
at the configured limit. Rejected operations use less resources, therefore
this feature can help in the "hot partition" situation.
_NOTE_: this is an overload protection mechanism and may not be used to reliably
enforce limits in some situations. Due to Scylla's distributed nature,
the actual number of accepted requests depends on the cluster and driver
configuration and may be larger by a factor of RF (keyspace's replication
factor). It is recommended to set the limit to a value an order of magnitude
larger than the maximum expected per-partition throughput. See the
[Inaccurracies](#inaccurracies) section for more information.
## Usage
### Server-side configuration
Per-partition limits are set separately for reads and writes, on a per-table
basis. Limits can be set with the `per_partition_rate_limit` extension when
CREATE'ing or ALTER'ing a table using a schema extension:
```cql
ALTER TABLE ks.tbl WITH per_partition_rate_limit = {
'max_reads_per_second': 123,
'max_writes_per_second': 456
};
```
Both `max_reads_per_second` and `max_writes_per_second` are optional - omitting
one of them means "no limit" for that type of operation.
### Driver response
Rejected operations are reported as an ERROR response to the driver.
If the driver supports it, the response contains a scylla-specific error code
indicating that the operation was rejected. For more details about the error
code, see the [Rate limit error](./protocol-extensions.md#Rate%20limit%20error)
section in the `protocol-extensions.md` doc.
If the driver doesn't support the new error code, the `Config_error` code
is returned instead. The code was chosen in order for the retry policies
of the drivers not to retry the requests and instead propagate them directly
to the users.
## How it works
Accounting related to tracking per-partition limits is done by replicas.
Each replica keeps a map of counters which are identified by a combination
of (token, table, operation type). When the replica accounts an operation,
it increments the relevant counter. All counters are halved every second.
Depending on whether the coordinator is a replica or not, the flow is
a bit different. Here, "coordinator == replica" requirement also means
that the operation is handled on the correct shard.
Only reads and writes explicitly issued by the user are counted to the limit.
Read repair, hints, batch replay, CDC preimage query and internal system queries
are _not_ counted to the limit.
Paxos and counters are not covered in current implemenation.
### Coordinator is not a replica
Coordinator generates a random number from range `[0, 1)` with uniform
distribution and sends it to replicas along with the operation request.
Each replica accounts the operation and then calculates a rejection threshold
based on the local counter value. If the number received from the coordinator
is above the threshold, the operation is rejected.
The assumption is that all replicas will converge to similar counter values.
Most of the time they will agree on the decision and not much work
will be wasted due to some replicas accepting and other rejecting.
### Coordinator is a replica
As before, the coordinator generates a random number. However, it does not
send requests to replicas immediately but rather calculates local rejection
threshold. If the number is above threshold, the whole operation is skipped
and the operation is only accounted on the coordinator. Otherwise, coordinator
proceeds with sending the requests, and replicas are told only to account
the operation but never reject it.
This strategy leads to no wasted replica work. However, when the coordinator
rejects the operation other replicas do not account it, so it may lead to
a bit more requests being accepted (but still not more than `RF * limit`).
### How to calculate rejection threshold
Let's assume the simplest case where there is only one replica. It will
increment its counter on every operation. Because all counters are halved
every second, assuming the rate of `V` ops/s the counter will eventually
oscillate between `V` and `2V`. If the limit is `L` ops/s, then we would
like to admit only `L` operation within each second - therefore the probability
should satisfy the following:
```
L = Sum(i = V..2V) { P(i) }
```
This can be approximated with a definite integral:
```
L = Int(x = V..2V) { P(x) }
```
A solution to this integral is:
```
P(x) = L / (x * ln 2)
```
where `x` is the current value of the counter. This is the formula used
in the current implementation.
### Inaccurracies
In practice, RF is rarely 1 so there is more than one replica. Depending on
the type of the operation, this introduces some inaccurracies in counting.
- Writes are counted relatively well because all live replicas participate
in a write operation, so all replicas should have an up-to-date counter
value. Because of the "coordinator is replica" case, rejected writes
will not be accounted on all replicas. In tests, the amount of accepted
operations was quite close to the limit and much less than the theoretical
`RF * limit`.
- Reads are less accurate because not all replicas may participate in a given
read operation (this depends on CL). In the worst case of CL=ONE and
round-robin strategy, up to `RF * limit` ops/s will be accepted. Higher
consistencies are counted better, e.g. CL=ALL - although they are also
susceptible to the inaccurracy introduced by "coordinator is replica" case.
- In case of non-shard-aware drivers, it is best to keep the clocks in sync.
When the coordinator is not a replica, each replica decides whether to accept
or not, based on the random number sent by coordinator. If the replicas have
their clocks in sync, then their per-partition counters should have close
values and they will agree on the decision whether to reject or not most of
the time. If not, they will disagree more frequently which will result in
wasted replica work and the effective rate limit will be lower or higher,
depending on the consistency. In the worst case, it might be 30% lower or
45% higher than the real limit.

View File

@@ -146,3 +146,37 @@ parameters:
the bit mask that should be used by the client to test against when checking
prepared statement metadata flags to see if the current query is conditional
or not.
## Rate limit error
This extension allows the driver to send a new type of error in case the operation
goes over the allowed per-partition rate limit. This kind of error does not fit
other existing error codes well, hence the need for the protocol extension.
On receiving this error, the driver should not retry the request; instead,
the error should be propagated to the user so that they can decide what to do
with it - sometimes it might make sense to propagate the error, in other cases
it might make sense to retry with backoff.
The body of the error consists of the usual error code, error message and then
the following fields: `<op_type><rejected_by_coordinator>`, where:
- `op_type` is a byte which identifies the operation which is the origin
of the rate limit.
- 0: read
- 1: write
- `rejected_by_coordinator` is a byte which is 1 if the operation was rejected
on the coordinator and 0 if it was rejected by replicas.
If the driver does not understand this extension and does not enable it,
the Config_error will be used instead of the new error code.
In order to be forward compatible with error codes added in the future protocol
versions, this extension doesn't reserve a fixed error code - instead, it
advertises the integer value used as the error code in the SUPPORTED response.
This extension is identified by the `SCYLLA_RATE_LIMIT_ERROR` key.
The string map in the SUPPORTED response will contain the following parameters:
- `ERROR_CODE`: a 32-bit signed decimal integer which Scylla
will use as the error code for the rate limit exception.

View File

@@ -31,7 +31,8 @@ namespace exceptions {
using coordinator_exception_container = utils::exception_container<
mutation_write_timeout_exception,
read_timeout_exception,
read_failure_exception
read_failure_exception,
rate_limit_exception
>;
template<typename T = void>

View File

@@ -39,7 +39,8 @@ const std::unordered_map<exception_code, sstring>& exception_map() {
{exception_code::INVALID, "invalid"},
{exception_code::CONFIG_ERROR, "config_error"},
{exception_code::ALREADY_EXISTS, "already_exists"},
{exception_code::UNPREPARED, "unprepared"}
{exception_code::UNPREPARED, "unprepared"},
{exception_code::RATE_LIMIT_ERROR, "rate_limit_error"}
};
return map;
}
@@ -77,6 +78,12 @@ overloaded_exception::overloaded_exception(size_t c) noexcept
: cassandra_exception(exception_code::OVERLOADED, prepare_message("Too many in flight hints: {}", c))
{}
rate_limit_exception::rate_limit_exception(const sstring& ks, const sstring& cf, db::operation_type op_type_, bool rejected_by_coordinator_) noexcept
: cassandra_exception(exception_code::CONFIG_ERROR, prepare_message("Per-partition rate limit reached for {} in table {}.{}, rejected by {}", op_type_, ks, cf, rejected_by_coordinator_ ? "coordinator" : "replicas"))
, op_type(op_type_)
, rejected_by_coordinator(rejected_by_coordinator_)
{ }
prepared_query_not_found_exception::prepared_query_not_found_exception(bytes id) noexcept
: request_validation_exception{exception_code::UNPREPARED, prepare_message("No prepared statement with ID {} found.", id)}
, id{id}

View File

@@ -12,6 +12,7 @@
#include "db/consistency_level_type.hh"
#include "db/write_type.hh"
#include "db/operation_type.hh"
#include <stdexcept>
#include <seastar/core/sstring.hh>
#include "bytes.hh"
@@ -42,7 +43,17 @@ enum class exception_code : int32_t {
INVALID = 0x2200,
CONFIG_ERROR = 0x2300,
ALREADY_EXISTS = 0x2400,
UNPREPARED = 0x2500
UNPREPARED = 0x2500,
// Scylla-specific error codes
// The error codes below are advertised to the drivers during connection
// handshake using the protocol extension negotiation, and are only
// enabled if the drivers explicitly enable them. Therefore it's perfectly
// fine to change them in case some new error codes are introduced
// in Cassandra.
// NOTE TO DRIVER DEVELOPERS: These constants must not be relied upon,
// they must be learned from protocol extensions instead.
RATE_LIMIT_ERROR = 0xF000
};
const std::unordered_map<exception_code, sstring>& exception_map();
@@ -183,6 +194,13 @@ struct overloaded_exception : public cassandra_exception {
cassandra_exception(exception_code::OVERLOADED, std::move(msg)) {}
};
struct rate_limit_exception : public cassandra_exception {
db::operation_type op_type;
bool rejected_by_coordinator;
rate_limit_exception(const sstring& ks, const sstring& cf, db::operation_type op_type_, bool rejected_by_coordinator_) noexcept;
};
class request_validation_exception : public cassandra_exception {
public:
using cassandra_exception::cassandra_exception;

View File

@@ -108,6 +108,7 @@ public:
gms::feature tombstone_gc_options { *this, "TOMBSTONE_GC_OPTIONS"sv };
gms::feature parallelized_aggregation { *this, "PARALLELIZED_AGGREGATION"sv };
gms::feature keyspace_storage_options { *this, "KEYSPACE_STORAGE_OPTIONS"sv };
gms::feature typed_errors_in_read_rpc { *this, "TYPED_ERRORS_IN_READ_RPC"sv };
public:

View File

@@ -0,0 +1,23 @@
/*
* Copyright 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace db {
namespace per_partition_rate_limit {
struct account_only {};
struct account_and_enforce {
uint32_t random_variable;
};
// using info = std::variant<std::monostate, account_only, account_and_enforce>;
} // namespace per_partition_rate_limit
} // namespace db

View File

@@ -0,0 +1,25 @@
/*
* Copyright 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace replica {
struct unknown_exception {};
struct no_exception {};
class rate_limit_exception {
};
struct exception_variant {
std::variant<replica::unknown_exception,
replica::no_exception,
replica::rate_limit_exception
> reason;
};
}

View File

@@ -6,14 +6,14 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
verb [[with_client_info, with_timeout, one_way]] mutation (frozen_mutation fm, inet_address_vector_replica_set forward, gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional<tracing::trace_info> trace_info [[version 1.3.0]]);
verb [[with_client_info, with_timeout, one_way]] mutation (frozen_mutation fm, inet_address_vector_replica_set forward, gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional<tracing::trace_info> trace_info [[version 1.3.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]]);
verb [[with_client_info, one_way]] mutation_done (unsigned shard, uint64_t response_id, db::view::update_backlog backlog [[version 3.1.0]]);
verb [[with_client_info, one_way]] mutation_failed (unsigned shard, uint64_t response_id, size_t num_failed, db::view::update_backlog backlog [[version 3.1.0]]);
verb [[with_client_info, one_way]] mutation_failed (unsigned shard, uint64_t response_id, size_t num_failed, db::view::update_backlog backlog [[version 3.1.0]], replica::exception_variant exception [[version 5.1.0]]);
verb [[with_client_info, with_timeout]] counter_mutation (std::vector<frozen_mutation> fms, db::consistency_level cl, std::optional<tracing::trace_info> trace_info);
verb [[with_client_info, with_timeout, one_way]] hint_mutation (frozen_mutation fm, inet_address_vector_replica_set forward, gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional<tracing::trace_info> trace_info [[version 1.3.0]] /* this verb was mistakenly introduced with optional trace_info */);
verb [[with_client_info, with_timeout]] read_data (query::read_command cmd, ::compat::wrapping_partition_range pr, query::digest_algorithm digest [[version 3.0.0]]) -> query::result [[lw_shared_ptr]], cache_temperature [[version 2.0.0]];
verb [[with_client_info, with_timeout]] read_mutation_data (query::read_command cmd, ::compat::wrapping_partition_range pr) -> reconcilable_result [[lw_shared_ptr]], cache_temperature [[version 2.0.0]];
verb [[with_client_info, with_timeout]] read_digest (query::read_command cmd, ::compat::wrapping_partition_range pr, query::digest_algorithm digest [[version 3.0.0]]) -> query::result_digest, api::timestamp_type [[version 1.2.0]], cache_temperature [[version 2.0.0]];
verb [[with_client_info, with_timeout]] read_data (query::read_command cmd, ::compat::wrapping_partition_range pr, query::digest_algorithm digest [[version 3.0.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]]) -> query::result [[lw_shared_ptr]], cache_temperature [[version 2.0.0]], replica::exception_variant [[version 5.1.0]];
verb [[with_client_info, with_timeout]] read_mutation_data (query::read_command cmd, ::compat::wrapping_partition_range pr) -> reconcilable_result [[lw_shared_ptr]], cache_temperature [[version 2.0.0]], replica::exception_variant [[version 5.1.0]];
verb [[with_client_info, with_timeout]] read_digest (query::read_command cmd, ::compat::wrapping_partition_range pr, query::digest_algorithm digest [[version 3.0.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]]) -> query::result_digest, api::timestamp_type [[version 1.2.0]], cache_temperature [[version 2.0.0]], replica::exception_variant [[version 5.1.0]];
verb [[with_timeout]] truncate (sstring, sstring);
verb [[with_client_info, with_timeout]] paxos_prepare (query::read_command cmd, partition_key key, utils::UUID ballot, bool only_digest, query::digest_algorithm da, std::optional<tracing::trace_info> trace_info) -> service::paxos::prepare_response [[unique_ptr]];
verb [[with_client_info, with_timeout]] paxos_accept (service::paxos::proposal proposal [[ref]], std::optional<tracing::trace_info> trace_info) -> bool;

View File

@@ -86,6 +86,7 @@
#include "alternator/controller.hh"
#include "alternator/ttl.hh"
#include "tools/entry_point.hh"
#include "db/per_partition_rate_limit_extension.hh"
#include "service/raft/raft_group_registry.hh"
#include "service/raft/raft_group0_client.hh"
@@ -459,6 +460,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
ext->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
ext->add_schema_extension<db::paxos_grace_seconds_extension>(db::paxos_grace_seconds_extension::NAME);
ext->add_schema_extension<tombstone_gc_extension>(tombstone_gc_extension::NAME);
ext->add_schema_extension<db::per_partition_rate_limit_extension>(db::per_partition_rate_limit_extension::NAME);
auto cfg = make_lw_shared<db::config>(ext);
auto init = app.get_options_description().add_options();

View File

@@ -42,7 +42,9 @@
#include "cache_temperature.hh"
#include "raft/raft.hh"
#include "service/raft/messaging.hh"
#include "replica/exceptions.hh"
#include "serializer.hh"
#include "db/per_partition_rate_limit_info.hh"
#include "idl/consistency_level.dist.hh"
#include "idl/tracing.dist.hh"
#include "idl/result.dist.hh"
@@ -67,6 +69,8 @@
#include "idl/raft_storage.dist.hh"
#include "idl/raft.dist.hh"
#include "idl/group0.dist.hh"
#include "idl/replica_exception.dist.hh"
#include "idl/per_partition_rate_limit_info.dist.hh"
#include "idl/storage_proxy.dist.hh"
#include "serializer_impl.hh"
#include "serialization_visitors.hh"
@@ -94,6 +98,8 @@
#include "idl/raft.dist.impl.hh"
#include "idl/group0.dist.impl.hh"
#include "idl/view.dist.impl.hh"
#include "idl/replica_exception.dist.impl.hh"
#include "idl/per_partition_rate_limit_info.dist.impl.hh"
#include "idl/storage_proxy.dist.impl.hh"
#include <seastar/rpc/lz4_compressor.hh>
#include <seastar/rpc/lz4_fragmented_compressor.hh>

View File

@@ -18,6 +18,7 @@
#include "tracing/tracing.hh"
#include "utils/small_vector.hh"
#include "query_class_config.hh"
#include "db/per_partition_rate_limit_info.hh"
#include "bytes.hh"
@@ -298,6 +299,7 @@ public:
std::optional<query::max_result_size> max_result_size;
uint32_t row_limit_high_bits;
api::timestamp_type read_timestamp; // not serialized
db::allow_per_partition_rate_limit allow_limit; // not serialized
public:
// IDL constructor
read_command(utils::UUID cf_id,
@@ -323,6 +325,7 @@ public:
, max_result_size(max_result_size)
, row_limit_high_bits(row_limit_high_bits)
, read_timestamp(api::new_timestamp())
, allow_limit(db::allow_per_partition_rate_limit::no)
{ }
read_command(utils::UUID cf_id,
@@ -335,7 +338,8 @@ public:
std::optional<tracing::trace_info> ti = std::nullopt,
utils::UUID query_uuid = utils::UUID(),
query::is_first_page is_first_page = query::is_first_page::no,
api::timestamp_type rt = api::new_timestamp())
api::timestamp_type rt = api::new_timestamp(),
db::allow_per_partition_rate_limit allow_limit = db::allow_per_partition_rate_limit::no)
: cf_id(std::move(cf_id))
, schema_version(std::move(schema_version))
, slice(std::move(slice))
@@ -348,6 +352,7 @@ public:
, max_result_size(max_result_size)
, row_limit_high_bits(static_cast<uint32_t>(static_cast<uint64_t>(row_limit) >> 32))
, read_timestamp(rt)
, allow_limit(allow_limit)
{ }

View File

@@ -50,7 +50,7 @@ future<> write_hashes(service::storage_proxy& proxy, redis::redis_options& optio
m.set_clustered_cell(ckey, column, std::move(cell));
auto write_consistency_level = options.get_write_consistency_level();
return proxy.mutate(std::vector<mutation> {std::move(m)}, write_consistency_level, timeout, nullptr, permit);
return proxy.mutate(std::vector<mutation> {std::move(m)}, write_consistency_level, timeout, nullptr, permit, db::allow_per_partition_rate_limit::yes);
}
@@ -68,7 +68,7 @@ future<> write_strings(service::storage_proxy& proxy, redis::redis_options& opti
db::timeout_clock::time_point timeout = db::timeout_clock::now() + options.get_write_timeout();
auto m = make_mutation(proxy, options, std::move(key), std::move(data), ttl);
auto write_consistency_level = options.get_write_consistency_level();
return proxy.mutate(std::vector<mutation> {std::move(m)}, write_consistency_level, timeout, nullptr, permit);
return proxy.mutate(std::vector<mutation> {std::move(m)}, write_consistency_level, timeout, nullptr, permit, db::allow_per_partition_rate_limit::yes);
}
@@ -87,7 +87,7 @@ future<> delete_objects(service::storage_proxy& proxy, redis::redis_options& opt
auto remove = [&proxy, timeout, write_consistency_level, permit, &options, keys = std::move(keys)] (const sstring& cf_name) {
return parallel_for_each(keys.begin(), keys.end(), [&proxy, timeout, write_consistency_level, &options, permit, cf_name] (const bytes& key) {
auto m = make_tombstone(proxy, options, cf_name, key);
return proxy.mutate(std::vector<mutation> {std::move(m)}, write_consistency_level, timeout, nullptr, permit);
return proxy.mutate(std::vector<mutation> {std::move(m)}, write_consistency_level, timeout, nullptr, permit, db::allow_per_partition_rate_limit::yes);
});
};
return parallel_for_each(tables.begin(), tables.end(), remove);
@@ -107,7 +107,7 @@ future<> delete_fields(service::storage_proxy& proxy, redis::redis_options& opti
m.partition().apply_delete(*schema, ckey, tombstone { ts, clk });
mutations.push_back(m);
}
return proxy.mutate(mutations, write_consistency_level, timeout, nullptr, permit);
return proxy.mutate(mutations, write_consistency_level, timeout, nullptr, permit, db::allow_per_partition_rate_limit::yes);
}
}

View File

@@ -8,6 +8,7 @@
#include "redis/query_utils.hh"
#include "db/per_partition_rate_limit_info.hh"
#include "redis/options.hh"
#include "timeout_config.hh"
#include "service/client_state.hh"

View File

@@ -7,6 +7,7 @@
*/
#include "log.hh"
#include "replica/database_fwd.hh"
#include "utils/lister.hh"
#include "replica/database.hh"
#include <seastar/core/future-util.hh>
@@ -42,6 +43,7 @@
#include "gms/feature_service.hh"
#include "timeout_config.hh"
#include "service/storage_proxy.hh"
#include "db/operation_type.hh"
#include "utils/human_readable.hh"
#include "utils/fb_utilities.hh"
@@ -62,6 +64,7 @@
#include "tombstone_gc.hh"
#include "replica/data_dictionary_impl.hh"
#include "replica/exceptions.hh"
#include "readers/multi_range.hh"
#include "readers/multishard.hh"
@@ -540,6 +543,9 @@ database::setup_metrics() {
sm::make_counter("total_writes_timedout", _stats->total_writes_timedout,
sm::description("Counts write operations failed due to a timeout. A positive value is a sign of storage being overloaded.")),
sm::make_counter("total_writes_rate_limited", _stats->total_writes_rate_limited,
sm::description("Counts write operations which were rejected on the replica side because the per-partition limit was reached.")),
sm::make_counter("total_reads", _read_concurrency_sem.get_stats().total_successful_reads,
sm::description("Counts the total number of successful user reads on this shard."),
{user_label_instance}),
@@ -558,6 +564,9 @@ database::setup_metrics() {
"Add the total_reads to this value to get the total amount of reads issued on this shard."),
{system_label_instance}),
sm::make_counter("total_reads_rate_limited", _stats->total_reads_rate_limited,
sm::description("Counts read operations which were rejected on the replica side because the per-partition limit was reached.")),
sm::make_current_bytes("view_update_backlog", [this] { return get_view_update_backlog().current; },
sm::description("Holds the current size in bytes of the pending view updates for all tables")),
@@ -1283,15 +1292,103 @@ database::existing_index_names(const sstring& ks_name, const sstring& cf_to_excl
return names;
}
namespace {
enum class request_class {
user,
system,
maintenance,
};
request_class classify_request(const database_config& _dbcfg) {
const auto current_group = current_scheduling_group();
// Everything running in the statement group is considered a user request
if (current_group == _dbcfg.statement_scheduling_group) {
return request_class::user;
// System requests run in the default (main) scheduling group
// All requests executed on behalf of internal work also uses the system semaphore
} else if (current_group == default_scheduling_group()
|| current_group == _dbcfg.compaction_scheduling_group
|| current_group == _dbcfg.gossip_scheduling_group
|| current_group == _dbcfg.memory_compaction_scheduling_group
|| current_group == _dbcfg.memtable_scheduling_group
|| current_group == _dbcfg.memtable_to_cache_scheduling_group) {
return request_class::system;
// Requests done on behalf of view update generation run in the streaming group
} else if (current_scheduling_group() == _dbcfg.streaming_scheduling_group) {
return request_class::maintenance;
// Everything else is considered a user request
} else {
return request_class::user;
}
}
} // anonymous namespace
static bool can_apply_per_partition_rate_limit(const schema& s, const database_config& dbcfg, db::operation_type op_type) {
return s.per_partition_rate_limit_options().get_max_ops_per_second(op_type).has_value()
&& classify_request(dbcfg) == request_class::user;
}
bool database::can_apply_per_partition_rate_limit(const schema& s, db::operation_type op_type) const {
return replica::can_apply_per_partition_rate_limit(s, _dbcfg, op_type);
}
std::optional<db::rate_limiter::can_proceed> database::account_coordinator_operation_to_rate_limit(table& tbl, const dht::token& token,
db::per_partition_rate_limit::account_and_enforce account_and_enforce_info,
db::operation_type op_type) {
std::optional<uint32_t> table_limit = tbl.schema()->per_partition_rate_limit_options().get_max_ops_per_second(op_type);
db::rate_limiter::label& lbl = tbl.get_rate_limiter_label_for_op_type(op_type);
return _rate_limiter.account_operation(lbl, dht::token::to_int64(token), *table_limit, account_and_enforce_info);
}
static db::rate_limiter::can_proceed account_singular_ranges_to_rate_limit(
db::rate_limiter& limiter, column_family& cf,
const dht::partition_range_vector& ranges,
const database_config& dbcfg,
db::per_partition_rate_limit::info rate_limit_info) {
using can_proceed = db::rate_limiter::can_proceed;
if (std::holds_alternative<std::monostate>(rate_limit_info) || !can_apply_per_partition_rate_limit(*cf.schema(), dbcfg, db::operation_type::read)) {
// Rate limiting is disabled for this query
return can_proceed::yes;
}
auto table_limit = *cf.schema()->per_partition_rate_limit_options().get_max_reads_per_second();
can_proceed ret = can_proceed::yes;
auto& read_label = cf.get_rate_limiter_label_for_reads();
for (const auto& range : ranges) {
if (!range.is_singular()) {
continue;
}
auto token = dht::token::to_int64(ranges.front().start()->value().token());
if (limiter.account_operation(read_label, token, table_limit, rate_limit_info) == db::rate_limiter::can_proceed::no) {
// Don't return immediately - account all ranges first
ret = can_proceed::no;
}
}
return ret;
}
future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>>
database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) {
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) {
const auto reversed = cmd.slice.is_reversed();
if (reversed) {
s = s->make_reversed();
}
column_family& cf = find_column_family(cmd.cf_id);
if (account_singular_ranges_to_rate_limit(_rate_limiter, cf, ranges, _dbcfg, rate_limit_info) == db::rate_limiter::can_proceed::no) {
++_stats->total_reads_rate_limited;
co_await coroutine::return_exception(replica::rate_limit_exception());
}
auto& semaphore = get_reader_concurrency_semaphore();
auto max_result_size = cmd.max_result_size ? *cmd.max_result_size : get_unlimited_query_max_result_size();
@@ -1404,56 +1501,22 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
co_return std::tuple(std::move(result), hit_rate);
}
namespace {
enum class query_class {
user,
system,
maintenance,
};
query_class classify_query(const database_config& _dbcfg) {
const auto current_group = current_scheduling_group();
// Everything running in the statement group is considered a user query
if (current_group == _dbcfg.statement_scheduling_group) {
return query_class::user;
// System queries run in the default (main) scheduling group
// All queries executed on behalf of internal work also uses the system semaphore
} else if (current_group == default_scheduling_group()
|| current_group == _dbcfg.compaction_scheduling_group
|| current_group == _dbcfg.gossip_scheduling_group
|| current_group == _dbcfg.memory_compaction_scheduling_group
|| current_group == _dbcfg.memtable_scheduling_group
|| current_group == _dbcfg.memtable_to_cache_scheduling_group) {
return query_class::system;
// Reads done on behalf of view update generation run in the streaming group
} else if (current_scheduling_group() == _dbcfg.streaming_scheduling_group) {
return query_class::maintenance;
// Everything else is considered a user query
} else {
return query_class::user;
}
}
} // anonymous namespace
query::max_result_size database::get_unlimited_query_max_result_size() const {
switch (classify_query(_dbcfg)) {
case query_class::user:
switch (classify_request(_dbcfg)) {
case request_class::user:
return query::max_result_size(_cfg.max_memory_for_unlimited_query_soft_limit(), _cfg.max_memory_for_unlimited_query_hard_limit());
case query_class::system: [[fallthrough]];
case query_class::maintenance:
case request_class::system: [[fallthrough]];
case request_class::maintenance:
return query::max_result_size(query::result_memory_limiter::unlimited_result_size);
}
std::abort();
}
reader_concurrency_semaphore& database::get_reader_concurrency_semaphore() {
switch (classify_query(_dbcfg)) {
case query_class::user: return _read_concurrency_sem;
case query_class::system: return _system_read_concurrency_sem;
case query_class::maintenance: return _streaming_concurrency_sem;
switch (classify_request(_dbcfg)) {
case request_class::user: return _read_concurrency_sem;
case request_class::system: return _system_read_concurrency_sem;
case request_class::maintenance: return _streaming_concurrency_sem;
}
std::abort();
}
@@ -1769,13 +1832,22 @@ future<> database::apply_with_commitlog(column_family& cf, const mutation& m, db
}
}
future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync) {
future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog::force_sync sync, db::per_partition_rate_limit::info rate_limit_info) {
// I'm doing a nullcheck here since the init code path for db etc
// is a little in flux and commitlog is created only when db is
// initied from datadir.
auto uuid = m.column_family_id();
auto& cf = find_column_family(uuid);
if (!std::holds_alternative<std::monostate>(rate_limit_info) && can_apply_per_partition_rate_limit(*s, db::operation_type::write)) {
auto table_limit = *s->per_partition_rate_limit_options().get_max_writes_per_second();
auto& write_label = cf.get_rate_limiter_label_for_writes();
auto token = dht::token::to_int64(dht::get_token(*s, m.key()));
if (_rate_limiter.account_operation(write_label, token, table_limit, rate_limit_info) == db::rate_limiter::can_proceed::no) {
co_await coroutine::return_exception(replica::rate_limit_exception());
}
}
sync = sync || db::commitlog::force_sync(s->wait_for_sync_to_commitlog());
// Signal to view building code that a write is in progress,
@@ -1819,6 +1891,12 @@ Future database::update_write_metrics(Future&& f) {
if (is_timeout_exception(ep)) {
++s->total_writes_timedout;
}
try {
std::rethrow_exception(ep);
} catch (replica::rate_limit_exception&) {
++s->total_writes_rate_limited;
} catch (...) {
}
return futurize<Future>::make_exception_future(std::move(ep));
}
++s->total_writes;
@@ -1832,7 +1910,7 @@ void database::update_write_metrics_for_timed_out_write() {
++_stats->total_writes_timedout;
}
future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout) {
future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info) {
if (dblog.is_enabled(logging::log_level::trace)) {
dblog.trace("apply {}", m.pretty_printer(s));
}
@@ -1843,7 +1921,7 @@ future<> database::apply(schema_ptr s, const frozen_mutation& m, tracing::trace_
if (!s->is_synced()) {
on_internal_error(dblog, format("attempted to apply mutation using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version()));
}
return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, sync));
return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, sync, rate_limit_info));
}
future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout) {
@@ -1854,7 +1932,7 @@ future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::t
on_internal_error(dblog, format("attempted to apply hint using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version()));
}
return with_scheduling_group(_dbcfg.streaming_scheduling_group, [this, s = std::move(s), &m, tr_state = std::move(tr_state), timeout] () mutable {
return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no));
return update_write_metrics(_apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no, std::monostate{}));
});
}
@@ -1933,6 +2011,14 @@ std::ostream& operator<<(std::ostream& os, db::consistency_level cl) {
}
}
std::ostream& operator<<(std::ostream& os, operation_type op_type) {
switch (op_type) {
case operation_type::read: return os << "read";
case operation_type::write: return os << "write";
}
abort();
}
}
std::ostream&

View File

@@ -66,6 +66,9 @@
#include "absl-flat_hash_map.hh"
#include "utils/cross-shard-barrier.hh"
#include "sstables/generation_type.hh"
#include "db/rate_limiter.hh"
#include "db/per_partition_rate_limit_info.hh"
#include "db/operation_type.hh"
class cell_locker;
class cell_locker_stats;
@@ -452,6 +455,11 @@ private:
std::vector<view_ptr> _views;
std::unique_ptr<cell_locker> _counter_cell_locks; // Memory-intensive; allocate only when needed.
// Labels used to identify writes and reads for this table in the rate_limiter structure.
db::rate_limiter::label _rate_limiter_label_for_writes;
db::rate_limiter::label _rate_limiter_label_for_reads;
void set_metrics();
seastar::metrics::metric_groups _metrics;
@@ -744,6 +752,23 @@ public:
return _cache;
}
db::rate_limiter::label& get_rate_limiter_label_for_op_type(db::operation_type op_type) {
switch (op_type) {
case db::operation_type::write:
return _rate_limiter_label_for_writes;
case db::operation_type::read:
return _rate_limiter_label_for_reads;
}
}
db::rate_limiter::label& get_rate_limiter_label_for_writes() {
return _rate_limiter_label_for_writes;
}
db::rate_limiter::label& get_rate_limiter_label_for_reads() {
return _rate_limiter_label_for_reads;
}
future<std::vector<locked_cell>> lock_counter_cells(const mutation& m, db::timeout_clock::time_point timeout);
logalloc::occupancy_stats occupancy() const;
@@ -1252,8 +1277,10 @@ private:
uint64_t total_writes = 0;
uint64_t total_writes_failed = 0;
uint64_t total_writes_timedout = 0;
uint64_t total_writes_rate_limited = 0;
uint64_t total_reads = 0;
uint64_t total_reads_failed = 0;
uint64_t total_reads_rate_limited = 0;
uint64_t short_data_queries = 0;
uint64_t short_mutation_queries = 0;
@@ -1294,7 +1321,8 @@ private:
const frozen_mutation&,
tracing::trace_state_ptr,
db::timeout_clock::time_point,
db::commitlog_force_sync> _apply_stage;
db::commitlog_force_sync,
db::per_partition_rate_limit::info> _apply_stage;
flat_hash_map<sstring, keyspace> _keyspaces;
std::unordered_map<utils::UUID, lw_shared_ptr<column_family>> _column_families;
@@ -1332,6 +1360,8 @@ private:
std::unique_ptr<wasm::engine> _wasm_engine;
utils::cross_shard_barrier _stop_barrier;
db::rate_limiter _rate_limiter;
public:
data_dictionary::database as_data_dictionary() const;
std::shared_ptr<data_dictionary::user_types_storage> as_user_types_storage() const noexcept;
@@ -1361,7 +1391,7 @@ private:
void setup_metrics();
void setup_scylla_memory_diagnostics_producer();
future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync);
future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync, db::per_partition_rate_limit::info rate_limit_info);
future<> apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout);
future<mutation> do_apply_counter_update(column_family& cf, const frozen_mutation& fm, schema_ptr m_schema, db::timeout_clock::time_point timeout,
@@ -1485,14 +1515,28 @@ public:
future<> stop();
future<> close_tables(table_kind kind_to_close);
/// Checks whether per-partition rate limit can be applied to the operation or not.
bool can_apply_per_partition_rate_limit(const schema& s, db::operation_type op_type) const;
/// Tries to account given operation to the rate limit when the coordinator is a replica.
/// This function can be called ONLY when rate limiting can be applied to the operation (see `can_apply_per_partition_rate_limit`)
/// AND the current node/shard is a replica for the given operation.
///
/// nullopt -> the decision should be delegated to replicas
/// can_proceed::no -> operation should be rejected
/// can_proceed::yes -> operation should be accepted
std::optional<db::rate_limiter::can_proceed> account_coordinator_operation_to_rate_limit(table& tbl, const dht::token& token,
db::per_partition_rate_limit::account_and_enforce account_and_enforce_info,
db::operation_type op_type);
future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>> query(schema_ptr, const query::read_command& cmd, query::result_options opts,
const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout);
db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info = std::monostate{});
future<std::tuple<reconcilable_result, cache_temperature>> query_mutations(schema_ptr, const query::read_command& cmd, const dht::partition_range& range,
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout);
// Apply the mutation atomically.
// Throws timed_out_error when timeout is reached.
future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog_force_sync sync, db::timeout_clock::time_point timeout);
future<> apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::commitlog_force_sync sync, db::timeout_clock::time_point timeout, db::per_partition_rate_limit::info rate_limit_info = std::monostate{});
future<> apply_hint(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout);
future<mutation> apply_counter_update(schema_ptr, const frozen_mutation& m, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
keyspace::config make_keyspace_config(const keyspace_metadata& ksm);

40
replica/exceptions.cc Normal file
View File

@@ -0,0 +1,40 @@
/*
* Copyright 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <concepts>
#include <sstream>
#include <stdexcept>
#include <type_traits>
#include "replica/exceptions.hh"
#include "utils/exceptions.hh"
namespace replica {
exception_variant try_encode_replica_exception(std::exception_ptr eptr) {
try {
std::rethrow_exception(std::move(eptr));
} catch (rate_limit_exception&) {
return rate_limit_exception();
} catch (...) {
return no_exception{};
}
}
std::exception_ptr exception_variant::into_exception_ptr() noexcept {
return std::visit([] <typename Ex> (Ex&& ex) {
if constexpr (std::is_same_v<Ex, unknown_exception>) {
return std::make_exception_ptr(std::runtime_error("unknown exception"));
} else {
return std::make_exception_ptr(std::move(ex));
}
}, std::move(reason));
}
}

73
replica/exceptions.hh Normal file
View File

@@ -0,0 +1,73 @@
/*
* Copyright 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <cstdint>
#include <exception>
#include <optional>
#include <variant>
#include "seastar/core/sstring.hh"
#include "seastar/core/timed_out_error.hh"
#include "utils/exception_container.hh"
#include "utils/result.hh"
namespace replica {
// A marker indicating that the exception_variant holds an unknown exception.
// For example, replica sends a new type of error and coordinator does not
// understand it because it wasn't upgraded to a newer version yet.
struct unknown_exception {};
// A marker indicating that the exception variant doesn't hold any exception.
struct no_exception {};
class replica_exception : public std::exception {
public:
replica_exception() noexcept {};
};
class rate_limit_exception final : public replica_exception {
public:
rate_limit_exception() noexcept
: replica_exception()
{ }
virtual const char* what() const noexcept override { return "rate limit exceeded"; }
};
struct exception_variant {
std::variant<unknown_exception,
no_exception,
rate_limit_exception
> reason;
exception_variant()
: reason(no_exception{})
{ }
template<typename Ex>
exception_variant(Ex&& ex)
: reason(std::move(ex))
{ }
std::exception_ptr into_exception_ptr() noexcept;
inline operator bool() const noexcept {
return !std::holds_alternative<no_exception>(reason);
}
};
// Tries to encode the exception into an exception_variant.
// If given exception cannot be encoded into one of the replica exception types,
// returns no_exception.
exception_variant try_encode_replica_exception(std::exception_ptr eptr);
}

View File

@@ -31,6 +31,7 @@
#include "db/paxos_grace_seconds_extension.hh"
#include "utils/rjson.hh"
#include "tombstone_gc_options.hh"
#include "db/per_partition_rate_limit_extension.hh"
constexpr int32_t schema::NAME_LENGTH;
@@ -1269,6 +1270,12 @@ schema_ptr schema_builder::build() {
dynamic_pointer_cast<db::paxos_grace_seconds_extension>(it->second)->get_paxos_grace_seconds();
}
// cache the `per_partition_rate_limit` parameters for fast access through the schema object.
if (auto it = new_raw._extensions.find(db::per_partition_rate_limit_extension::NAME); it != new_raw._extensions.end()) {
new_raw._per_partition_rate_limit_options =
dynamic_pointer_cast<db::per_partition_rate_limit_extension>(it->second)->get_options();
}
return make_lw_shared<schema>(schema::private_tag{}, new_raw, _view_info);
}
@@ -1302,6 +1309,11 @@ schema_builder& schema_builder::with_tombstone_gc_options(const tombstone_gc_opt
return *this;
}
schema_builder& schema_builder::with_per_partition_rate_limit_options(const db::per_partition_rate_limit_options& opts) {
add_extension(db::per_partition_rate_limit_extension::NAME, ::make_shared<db::per_partition_rate_limit_extension>(opts));
return *this;
}
schema_builder& schema_builder::set_paxos_grace_seconds(int32_t seconds) {
add_extension(db::paxos_grace_seconds_extension::NAME, ::make_shared<db::paxos_grace_seconds_extension>(seconds));
return *this;

View File

@@ -29,6 +29,7 @@
#include "column_computation.hh"
#include "timestamp.hh"
#include "tombstone_gc_options.hh"
#include "db/per_partition_rate_limit_options.hh"
namespace dht {
@@ -621,6 +622,7 @@ private:
double _dc_local_read_repair_chance = 0.0;
double _read_repair_chance = 0.0;
double _crc_check_chance = 1;
db::per_partition_rate_limit_options _per_partition_rate_limit_options;
int32_t _min_compaction_threshold = DEFAULT_MIN_COMPACTION_THRESHOLD;
int32_t _max_compaction_threshold = DEFAULT_MAX_COMPACTION_THRESHOLD;
int32_t _min_index_interval = DEFAULT_MIN_INDEX_INTERVAL;
@@ -813,6 +815,10 @@ public:
const ::tombstone_gc_options& tombstone_gc_options() const;
const db::per_partition_rate_limit_options& per_partition_rate_limit_options() const {
return _raw._per_partition_rate_limit_options;
}
const ::speculative_retry& speculative_retry() const {
return _raw._speculative_retry;
}

View File

@@ -14,6 +14,10 @@
#include "dht/i_partitioner.hh"
#include "tombstone_gc_options.hh"
namespace db {
class per_partition_rate_limit_options;
}
struct schema_builder {
public:
enum class compact_storage { no, yes };
@@ -280,6 +284,7 @@ public:
schema_builder& with_cdc_options(const cdc::options&);
schema_builder& with_tombstone_gc_options(const tombstone_gc_options& opts);
schema_builder& with_per_partition_rate_limit_options(const db::per_partition_rate_limit_options&);
default_names get_default_names() const {
return default_names(_raw);

File diff suppressed because it is too large Load Diff

View File

@@ -46,6 +46,8 @@
#include "partition_range_compat.hh"
#include "exceptions/exceptions.hh"
#include "exceptions/coordinator_result.hh"
#include "replica/exceptions.hh"
#include "db/per_partition_rate_limit_info.hh"
class reconcilable_result;
class frozen_mutation_and_schema;
@@ -126,6 +128,7 @@ public:
NONE,
TIMEOUT,
FAILURE,
RATE_LIMIT,
};
template<typename T = void>
using result = exceptions::coordinator_result<T>;
@@ -268,6 +271,7 @@ private:
tracing::trace_state_ptr,
service_permit,
bool,
db::allow_per_partition_rate_limit,
lw_shared_ptr<cdc::operation_result_tracker>> _mutate_stage;
netw::connection_drop_slot_t _connection_dropped;
netw::connection_drop_registration_t _condrop_registration;
@@ -307,18 +311,18 @@ private:
void got_failure_response(response_id_type id, gms::inet_address from, size_t count, std::optional<db::view::update_backlog> backlog, error err, std::optional<sstring> msg);
future<result<>> response_wait(response_id_type id, clock_type::time_point timeout);
::shared_ptr<abstract_write_response_handler>& get_write_response_handler(storage_proxy::response_id_type id);
response_id_type create_write_response_handler_helper(schema_ptr s, const dht::token& token,
result<response_id_type> create_write_response_handler_helper(schema_ptr s, const dht::token& token,
std::unique_ptr<mutation_holder> mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state,
service_permit permit);
response_id_type create_write_response_handler(replica::keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m, inet_address_vector_replica_set targets,
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit);
response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::unordered_map<gms::inet_address, std::optional<mutation>>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& proposal,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, dht::token, inet_address_vector_replica_set>& meta,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(replica::keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m, inet_address_vector_replica_set targets,
const inet_address_vector_topology_change& pending_endpoints, inet_address_vector_topology_change, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats, service_permit permit, db::per_partition_rate_limit::info rate_limit_info);
result<response_id_type> create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const std::unordered_map<gms::inet_address, std::optional<mutation>>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& proposal,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
result<response_id_type> create_write_response_handler(const std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, dht::token, inet_address_vector_replica_set>& meta,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
void register_cdc_operation_result_tracker(const storage_proxy::unique_response_handler_vector& ids, lw_shared_ptr<cdc::operation_result_tracker> tracker);
void send_to_live_endpoints(response_id_type response_id, clock_type::time_point timeout);
template<typename Range>
@@ -331,7 +335,7 @@ private:
static void sort_endpoints_by_proximity(inet_address_vector_replica_set& eps);
inet_address_vector_replica_set get_live_sorted_endpoints(replica::keyspace& ks, const dht::token& token) const;
db::read_repair_decision new_read_repair_decision(const schema& s);
::shared_ptr<abstract_read_executor> get_read_executor(lw_shared_ptr<query::read_command> cmd,
result<::shared_ptr<abstract_read_executor>> get_read_executor(lw_shared_ptr<query::read_command> cmd,
schema_ptr schema,
dht::partition_range pr,
db::consistency_level cl,
@@ -343,11 +347,13 @@ private:
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> query_result_local(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr,
query::result_options opts,
tracing::trace_state_ptr trace_state,
clock_type::time_point timeout);
clock_type::time_point timeout,
db::per_partition_rate_limit::info rate_limit_info);
future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature>> query_result_local_digest(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr,
tracing::trace_state_ptr trace_state,
clock_type::time_point timeout,
query::digest_algorithm da);
query::digest_algorithm da,
db::per_partition_rate_limit::info rate_limit_info);
future<result<coordinator_query_result>> query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector partition_ranges,
db::consistency_level cl,
@@ -376,9 +382,9 @@ private:
db::consistency_level cl,
coordinator_query_options optional_params);
template<typename Range, typename CreateWriteHandler>
future<unique_response_handler_vector> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, service_permit permit, CreateWriteHandler handler);
future<result<unique_response_handler_vector>> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, service_permit permit, CreateWriteHandler handler);
template<typename Range>
future<unique_response_handler_vector> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
future<result<unique_response_handler_vector>> mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit);
future<result<>> mutate_begin(unique_response_handler_vector ids, db::consistency_level cl, tracing::trace_state_ptr trace_state, std::optional<clock_type::time_point> timeout_opt = { });
future<result<>> mutate_end(future<result<>> mutate_result, utils::latency_counter, write_stats& stats, tracing::trace_state_ptr trace_state);
future<result<>> schedule_repair(std::unordered_map<dht::token, std::unordered_map<gms::inet_address, std::optional<mutation>>> diffs, db::consistency_level cl, tracing::trace_state_ptr trace_state, service_permit permit);
@@ -386,7 +392,7 @@ private:
void unthrottle();
void handle_read_error(std::variant<exceptions::coordinator_exception_container, std::exception_ptr> failure, bool range);
template<typename Range>
future<result<>> mutate_internal(Range mutations, db::consistency_level cl, bool counter_write, tracing::trace_state_ptr tr_state, service_permit permit, std::optional<clock_type::time_point> timeout_opt = { }, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker = { });
future<result<>> mutate_internal(Range mutations, db::consistency_level cl, bool counter_write, tracing::trace_state_ptr tr_state, service_permit permit, std::optional<clock_type::time_point> timeout_opt = { }, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker = { }, db::allow_per_partition_rate_limit allow_limit = db::allow_per_partition_rate_limit::no);
future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> query_nonsingular_mutations_locally(
schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range_vector&& pr, tracing::trace_state_ptr trace_state,
clock_type::time_point timeout);
@@ -401,7 +407,7 @@ private:
gms::inet_address find_leader_for_counter_update(const mutation& m, db::consistency_level cl);
future<result<>> do_mutate(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, bool, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker);
future<result<>> do_mutate(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, bool, db::allow_per_partition_rate_limit allow_limit, lw_shared_ptr<cdc::operation_result_tracker> cdc_tracker);
future<> send_to_endpoint(
std::unique_ptr<mutation_holder> m,
@@ -424,21 +430,24 @@ private:
void retire_view_response_handlers(noncopyable_function<bool(const abstract_write_response_handler&)> filter_fun);
void connection_dropped(gms::inet_address);
private:
template<typename... Elements>
future<rpc::tuple<Elements..., replica::exception_variant>> encode_replica_exception_for_rpc(future<rpc::tuple<Elements...>>&& f, auto&& default_tuple_maker);
future<> handle_counter_mutation(const rpc::client_info& cinfo, rpc::opt_time_point t, std::vector<frozen_mutation> fms, db::consistency_level cl, std::optional<tracing::trace_info> trace_info);
future<rpc::no_wait_type> handle_write(netw::msg_addr src_addr, rpc::opt_time_point t,
utils::UUID schema_version, auto in, inet_address_vector_replica_set forward, gms::inet_address reply_to,
unsigned shard, storage_proxy::response_id_type response_id, std::optional<tracing::trace_info> trace_info,
auto&& apply_fn, auto&& forward_fn);
future<rpc::no_wait_type> receive_mutation_handler (smp_service_group smp_grp, const rpc::client_info& cinfo, rpc::opt_time_point t, frozen_mutation in, inet_address_vector_replica_set forward,
gms::inet_address reply_to, unsigned shard, storage_proxy::response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info);
gms::inet_address reply_to, unsigned shard, storage_proxy::response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info, rpc::optional<db::per_partition_rate_limit::info> rate_limit_info_opt);
future<rpc::no_wait_type> handle_paxos_learn(const rpc::client_info& cinfo, rpc::opt_time_point t, paxos::proposal decision,
inet_address_vector_replica_set forward, gms::inet_address reply_to, unsigned shard,
storage_proxy::response_id_type response_id, std::optional<tracing::trace_info> trace_info);
future<rpc::no_wait_type> handle_mutation_done(const rpc::client_info& cinfo, unsigned shard, storage_proxy::response_id_type response_id, rpc::optional<db::view::update_backlog> backlog);
future<rpc::no_wait_type> handle_mutation_failed(const rpc::client_info& cinfo, unsigned shard, storage_proxy::response_id_type response_id, size_t num_failed, rpc::optional<db::view::update_backlog> backlog);
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> handle_read_data(const rpc::client_info& cinfo, rpc::opt_time_point t, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> oda);
future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> handle_read_mutation_data(const rpc::client_info& cinfo, rpc::opt_time_point t, query::read_command cmd, ::compat::wrapping_partition_range pr);
future<rpc::tuple<query::result_digest, long, cache_temperature>> handle_read_digest(const rpc::client_info& cinfo, rpc::opt_time_point t, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> oda);
future<rpc::no_wait_type> handle_mutation_failed(const rpc::client_info& cinfo, unsigned shard, storage_proxy::response_id_type response_id, size_t num_failed, rpc::optional<db::view::update_backlog> backlog, rpc::optional<replica::exception_variant> exception);
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, replica::exception_variant>> handle_read_data(const rpc::client_info& cinfo, rpc::opt_time_point t, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> oda, rpc::optional<db::per_partition_rate_limit::info> rate_limit_info_opt);
future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature, replica::exception_variant>> handle_read_mutation_data(const rpc::client_info& cinfo, rpc::opt_time_point t, query::read_command cmd, ::compat::wrapping_partition_range pr);
future<rpc::tuple<query::result_digest, long, cache_temperature, replica::exception_variant>> handle_read_digest(const rpc::client_info& cinfo, rpc::opt_time_point t, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> oda, rpc::optional<db::per_partition_rate_limit::info> rate_limit_info_opt);
future<> handle_truncate(rpc::opt_time_point timeout, sstring ksname, sstring cfname);
future<foreign_ptr<std::unique_ptr<service::paxos::prepare_response>>> handle_paxos_prepare(const rpc::client_info& cinfo, rpc::opt_time_point timeout,
query::read_command cmd, partition_key key, utils::UUID ballot, bool only_digest, query::digest_algorithm da,
@@ -495,29 +504,29 @@ public:
private:
// Applies mutation on this node.
// Resolves with timed_out_error when timeout is reached.
future<> mutate_locally(const mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout, smp_service_group smp_grp);
future<> mutate_locally(const mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout, smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info);
// Applies mutation on this node.
// Resolves with timed_out_error when timeout is reached.
future<> mutate_locally(const schema_ptr&, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout,
smp_service_group smp_grp);
smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info);
// Applies mutations on this node.
// Resolves with timed_out_error when timeout is reached.
future<> mutate_locally(std::vector<mutation> mutation, tracing::trace_state_ptr tr_state, clock_type::time_point timeout, smp_service_group smp_grp);
future<> mutate_locally(std::vector<mutation> mutation, tracing::trace_state_ptr tr_state, clock_type::time_point timeout, smp_service_group smp_grp, db::per_partition_rate_limit::info rate_limit_info);
public:
// Applies mutation on this node.
// Resolves with timed_out_error when timeout is reached.
future<> mutate_locally(const mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max()) {
return mutate_locally(m, tr_state, sync, timeout, _write_smp_service_group);
future<> mutate_locally(const mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max(), db::per_partition_rate_limit::info rate_limit_info = std::monostate()) {
return mutate_locally(m, tr_state, sync, timeout, _write_smp_service_group, rate_limit_info);
}
// Applies mutation on this node.
// Resolves with timed_out_error when timeout is reached.
future<> mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max()) {
return mutate_locally(s, m, tr_state, sync, timeout, _write_smp_service_group);
future<> mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max(), db::per_partition_rate_limit::info rate_limit_info = std::monostate()) {
return mutate_locally(s, m, tr_state, sync, timeout, _write_smp_service_group, rate_limit_info);
}
// Applies mutations on this node.
// Resolves with timed_out_error when timeout is reached.
future<> mutate_locally(std::vector<mutation> mutation, tracing::trace_state_ptr tr_state, clock_type::time_point timeout = clock_type::time_point::max());
future<> mutate_locally(std::vector<mutation> mutation, tracing::trace_state_ptr tr_state, clock_type::time_point timeout = clock_type::time_point::max(), db::per_partition_rate_limit::info rate_limit_info = std::monostate());
future<> mutate_hint(const schema_ptr&, const frozen_mutation& m, tracing::trace_state_ptr tr_state, clock_type::time_point timeout = clock_type::time_point::max());
@@ -531,14 +540,14 @@ public:
* @param consistency_level the consistency level for the operation
* @param tr_state trace state handle
*/
future<> mutate(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, bool raw_counters = false);
future<> mutate(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, bool raw_counters = false);
/**
* See mutate. Does the same, but returns some exceptions
* through the result<>, which allows for efficient inspection
* of the exception on the exception handling path.
*/
future<result<>> mutate_result(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, bool raw_counters = false);
future<result<>> mutate_result(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit, bool raw_counters = false);
paxos_participants
get_paxos_participants(const sstring& ks_name, const dht::token& token, db::consistency_level consistency_for_paxos);
@@ -547,7 +556,8 @@ public:
clock_type::time_point timeout, service_permit permit);
future<result<>> mutate_with_triggers(std::vector<mutation> mutations, db::consistency_level cl, clock_type::time_point timeout,
bool should_mutate_atomically, tracing::trace_state_ptr tr_state, service_permit permit, bool raw_counters = false);
bool should_mutate_atomically, tracing::trace_state_ptr tr_state, service_permit permit,
db::allow_per_partition_rate_limit allow_limit, bool raw_counters = false);
/**
* See mutate. Adds additional steps before and after writing a batch.

View File

@@ -77,6 +77,8 @@ struct write_stats {
utils::timed_rate_moving_average write_unavailables;
utils::timed_rate_moving_average write_timeouts;
utils::timed_rate_moving_average write_rate_limited_by_replicas;
utils::timed_rate_moving_average write_rate_limited_by_coordinator;
utils::timed_rate_moving_average_and_histogram write;
utils::time_estimated_histogram estimated_write;
@@ -127,6 +129,8 @@ struct stats : public write_stats {
seastar::metrics::metric_groups _metrics;
utils::timed_rate_moving_average read_timeouts;
utils::timed_rate_moving_average read_unavailables;
utils::timed_rate_moving_average read_rate_limited_by_replicas;
utils::timed_rate_moving_average read_rate_limited_by_coordinator;
utils::timed_rate_moving_average range_slice_timeouts;
utils::timed_rate_moving_average range_slice_unavailables;

View File

@@ -0,0 +1,109 @@
#include <chrono>
#include <cstdint>
#include <seastar/core/coroutine.hh>
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_test_env.hh"
#include "test/lib/cql_assertions.hh"
#include "mutation.hh"
#include "service/storage_proxy.hh"
SEASTAR_TEST_CASE(test_internal_operation_filtering) {
return do_with_cql_env_thread([] (cql_test_env& e) -> future<> {
// The test requires at least two shards
// so that it can test the shard!=coordinator case
BOOST_REQUIRE_GT(smp::count, 1);
cquery_nofail(e, "CREATE TABLE ks.tbl (pk int PRIMARY KEY) \
WITH per_partition_rate_limit = {'max_reads_per_second': 1, 'max_writes_per_second': 1}");
auto& db = e.db();
auto& qp = e.qp();
const auto sptr = db.local().find_schema("ks", "tbl");
auto pk = partition_key::from_singular(*sptr, int32_t(0));
unsigned local_shard = dht::shard_of(*sptr, dht::get_token(*sptr, pk.view()));
unsigned foreign_shard = (local_shard + 1) % smp::count;
auto run_writes = [&qp, &db, pk] (db::allow_per_partition_rate_limit allow_limit) -> future<> {
BOOST_TEST_MESSAGE("Testing writes");
const auto sptr = db.local().find_schema("ks", "tbl");
auto m = mutation(sptr, partition_key(pk));
// Rejection is probabilistic, so try many times
for (int i = 0; i < 100; i++) {
qp.local().proxy().mutate({m},
db::consistency_level::ALL,
service::storage_proxy::clock_type::now() + std::chrono::seconds(10),
nullptr,
empty_service_permit(),
allow_limit).get();
}
return make_ready_future<>();
};
auto run_reads = [&qp, &db, pk] (db::allow_per_partition_rate_limit allow_limit) -> future<> {
BOOST_TEST_MESSAGE("Testing reads");
const auto sptr = db.local().find_schema("ks", "tbl");
auto pk_def = sptr->get_column_definition("pk");
auto dk = dht::decorate_key(*sptr, partition_key(pk));
auto selection = cql3::selection::selection::for_columns(sptr, {pk_def});
auto opts = selection->get_query_options();
auto partition_slice = query::partition_slice(
{query::clustering_range::make_open_ended_both_sides()}, {}, {}, std::move(opts));
auto cmd = make_lw_shared<query::read_command>(sptr->id(), sptr->version(), partition_slice, query::max_result_size(1), query::row_limit(1));
cmd->allow_limit = allow_limit;
// Rejection is probabilistic, so try many times
for (int i = 0; i < 100; i++) {
qp.local().proxy().query(sptr,
cmd,
{dht::partition_range(dk)},
db::consistency_level::ALL,
service::storage_proxy::coordinator_query_options(
db::timeout_clock::now() + std::chrono::seconds(10),
empty_service_permit(),
service::client_state::for_internal_calls())).get();
}
return make_ready_future<>();
};
auto sgroups = get_scheduling_groups().get();
for (unsigned shard : {local_shard, foreign_shard}) {
for (scheduling_group sg : {sgroups.statement_scheduling_group, sgroups.streaming_scheduling_group}) {
for (db::allow_per_partition_rate_limit allow_limit : {db::allow_per_partition_rate_limit::yes, db::allow_per_partition_rate_limit::no}) {
// Rate limiting must be explicitly enabled and handled on the correct scheduling group.
const bool expect_limiting = (sg == sgroups.statement_scheduling_group) && bool(allow_limit);
BOOST_TEST_MESSAGE(format("Test config, shard: {}, scheduling_group: {}, allow_limit: {}, expect_limiting: {}",
(shard == local_shard) ? "local" : "foreign",
(sg == sgroups.statement_scheduling_group) ? "statement" : "streaming",
allow_limit,
expect_limiting));
smp::submit_to(shard, [&] () mutable {
return seastar::async(thread_attributes{sg}, [&] {
if (expect_limiting) {
BOOST_REQUIRE_THROW(run_writes(allow_limit).get(), exceptions::rate_limit_exception);
BOOST_REQUIRE_THROW(run_reads(allow_limit).get(), exceptions::rate_limit_exception);
} else {
BOOST_REQUIRE_NO_THROW(run_writes(allow_limit).get());
BOOST_REQUIRE_NO_THROW(run_reads(allow_limit).get());
}
});
}).get();
}
}
}
return make_ready_future<>();
});
}

View File

@@ -0,0 +1,130 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <chrono>
#include <cstdint>
#include <seastar/core/manual_clock.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/util/later.hh>
#include <seastar/testing/test_case.hh>
#include "db/rate_limiter.hh"
using namespace seastar;
using test_rate_limiter = db::generic_rate_limiter<seastar::manual_clock>;
future<> step_seconds(int seconds) {
for (int i = 0; i < seconds; i++) {
// The rate limiter's timer executes periodically every second
// and we want the timer to run `seconds` times.
// Because `manual_clock::advance` executes each timer only once
// even if they reschedule, we cannot just advance by requested
// number of seconds - instead, we must advance multiple times
// by one second.
manual_clock::advance(std::chrono::seconds(1));
co_await yield();
}
}
SEASTAR_TEST_CASE(test_rate_limiter_no_rejections_on_sequential) {
const uint64_t token_count = 1000 * 1000;
const uint64_t limit = 1;
test_rate_limiter::label lbl;
test_rate_limiter limiter;
for (uint64_t token = 0; token < token_count; token++) {
BOOST_REQUIRE_LE(limiter.increase_and_get_counter(lbl, token), 1);
co_await maybe_yield();
}
}
SEASTAR_TEST_CASE(test_rate_limiter_partition_label_separation) {
const uint64_t token_count = 30;
const uint64_t repeat_count = 10;
std::vector<test_rate_limiter::label> labels{3};
test_rate_limiter limiter;
for (uint64_t i = 0; i < repeat_count; i++) {
for (uint64_t token = 0; token < token_count; token++) {
for (auto& l : labels) {
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(l, token), i + 1);
co_await maybe_yield();
}
}
}
}
SEASTAR_TEST_CASE(test_rate_limiter_halving_over_time) {
test_rate_limiter::label lbl;
test_rate_limiter limiter;
for (int i = 0; i < 16; i++) {
limiter.increase_and_get_counter(lbl, 0);
}
// Should be cut in half
co_await step_seconds(1);
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), (16 / 2) + 1);
// Should decrease four times (9 -> 2)
co_await step_seconds(2);
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), (9 / 4) + 1);
// Should be reset
co_await step_seconds(10);
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), 1);
}
SEASTAR_TEST_CASE(test_rate_limiter_time_window_wraparound_handling) {
test_rate_limiter::label lbl;
test_rate_limiter limiter;
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), 1);
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), 2);
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), 3);
// Advance far into the future so that the time window wraps around
co_await step_seconds(1 << test_rate_limiter::time_window_bits);
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), 1);
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), 2);
BOOST_REQUIRE_EQUAL(limiter.increase_and_get_counter(lbl, 0), 3);
// TODO: Workaround for seastar#1072. Calling `manual_clock::advance`
// multiple times and then quitting the test immediately causes
// the test framework to hang. I didn't have the time to debug it, but I
// suspect there are some pending tasks which need to finish before exiting
// from the main test task.
co_await seastar::sleep(std::chrono::seconds(1));
}
SEASTAR_TEST_CASE(test_rate_limiter_account_operation) {
const uint64_t limit = 1;
const int ops_per_loop = 1000;
test_rate_limiter::label lbl;
test_rate_limiter limiter;
// We use UINT_MAX as the random parameter so that we get rejected quickly
db::per_partition_rate_limit::account_and_enforce info {
.random_variable = UINT32_MAX,
};
bool encountered_rejection = false;
for (int i = 0; i < ops_per_loop; i++) {
if (limiter.account_operation(lbl, 0, limit, info) == test_rate_limiter::can_proceed::no) {
encountered_rejection = true;
break;
}
co_await maybe_yield();
}
BOOST_REQUIRE(encountered_rejection);
}

View File

@@ -101,6 +101,7 @@ cql_test_config::cql_test_config(shared_ptr<db::config> cfg)
db_config->commitlog_use_o_dsync.set(false);
db_config->add_cdc_extension();
db_config->add_per_partition_rate_limit_extension();
db_config->flush_schema_tables_after_modification.set(false);
}

View File

@@ -405,8 +405,10 @@ public:
clustering_ranges.emplace_back(query::clustering_range::make_open_ended_both_sides());
auto slice = query::partition_slice(std::move(clustering_ranges), { }, std::move(regular_columns), opts,
std::move(specific_ranges), cql_serialization_format::internal());
return make_lw_shared<query::read_command>(s.id(), s.version(), std::move(slice), proxy.get_max_result_size(slice),
auto cmd = make_lw_shared<query::read_command>(s.id(), s.version(), std::move(slice), proxy.get_max_result_size(slice),
query::row_limit(row_limit), query::partition_limit(partition_limit));
cmd->allow_limit = db::allow_per_partition_rate_limit::yes;
return cmd;
}
static future<> do_get_paged_slice(
@@ -511,7 +513,7 @@ public:
add_to_mutation(*schema, column, m_to_apply);
return _query_state.get_client_state().has_schema_access(_db, *schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit), db::allow_per_partition_rate_limit::yes);
});
});
}
@@ -527,7 +529,7 @@ public:
add_to_mutation(*schema, column, m_to_apply);
return _query_state.get_client_state().has_schema_access(_db, *schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit), db::allow_per_partition_rate_limit::yes);
});
});
}
@@ -564,7 +566,7 @@ public:
return _query_state.get_client_state().has_schema_access(_db, *schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit), db::allow_per_partition_rate_limit::yes);
});
});
}
@@ -591,7 +593,7 @@ public:
return _query_state.get_client_state().has_schema_access(_db, *schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
// This mutation contains only counter tombstones so it can be applied like non-counter mutations.
auto timeout = db::timeout_clock::now() + _timeout_config.counter_write_timeout;
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
return _proxy.local().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit), db::allow_per_partition_rate_limit::yes);
});
});
}
@@ -604,7 +606,7 @@ public:
return _query_state.get_client_state().has_schema_access(_db, *schema, auth::permission::MODIFY);
}).then([this, muts = std::move(p.first), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return _proxy.local().mutate(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
return _proxy.local().mutate(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit), db::allow_per_partition_rate_limit::yes);
});
});
}
@@ -692,6 +694,7 @@ public:
auto& proxy = _proxy.local();
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.get_max_result_size(slice),
query::row_limit(row_limit));
cmd->allow_limit = db::allow_per_partition_rate_limit::yes;
auto f = _query_state.get_client_state().has_schema_access(_db, *schema, auth::permission::SELECT);
return f.then([this, &proxy, dk = std::move(dk), cmd, schema, column_limit = request.count, cl = request.consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.read_timeout;
@@ -1611,7 +1614,9 @@ private:
}
auto slice = query::partition_slice(std::move(clustering_ranges), {}, std::move(regular_columns), opts,
nullptr, cql_serialization_format::internal(), per_partition_row_limit);
return make_lw_shared<query::read_command>(s.id(), s.version(), std::move(slice), proxy.get_max_result_size(slice));
auto cmd = make_lw_shared<query::read_command>(s.id(), s.version(), std::move(slice), proxy.get_max_result_size(slice));
cmd->allow_limit = db::allow_per_partition_rate_limit::yes;
return cmd;
}
static ColumnParent column_path_to_column_parent(const ColumnPath& column_path) {
ColumnParent ret;

View File

@@ -9,13 +9,15 @@
#include <seastar/core/print.hh>
#include "transport/cql_protocol_extension.hh"
#include "cql3/result_set.hh"
#include "exceptions/exceptions.hh"
#include <map>
namespace cql_transport {
static const std::map<cql_protocol_extension, seastar::sstring> EXTENSION_NAMES = {
{cql_protocol_extension::LWT_ADD_METADATA_MARK, "SCYLLA_LWT_ADD_METADATA_MARK"}
{cql_protocol_extension::LWT_ADD_METADATA_MARK, "SCYLLA_LWT_ADD_METADATA_MARK"},
{cql_protocol_extension::RATE_LIMIT_ERROR, "SCYLLA_RATE_LIMIT_ERROR"}
};
cql_protocol_extension_enum_set supported_cql_protocol_extensions() {
@@ -30,6 +32,8 @@ std::vector<seastar::sstring> additional_options_for_proto_ext(cql_protocol_exte
switch (ext) {
case cql_protocol_extension::LWT_ADD_METADATA_MARK:
return {format("LWT_OPTIMIZATION_META_BIT_MASK={:d}", cql3::prepared_metadata::LWT_FLAG_MASK)};
case cql_protocol_extension::RATE_LIMIT_ERROR:
return {format("ERROR_CODE={:d}", exceptions::exception_code::RATE_LIMIT_ERROR)};
default:
return {};
}

View File

@@ -28,11 +28,13 @@ namespace cql_transport {
* `docs/protocol-extensions.md`.
*/
enum class cql_protocol_extension {
LWT_ADD_METADATA_MARK
LWT_ADD_METADATA_MARK,
RATE_LIMIT_ERROR
};
using cql_protocol_extension_enum = super_enum<cql_protocol_extension,
cql_protocol_extension::LWT_ADD_METADATA_MARK>;
cql_protocol_extension::LWT_ADD_METADATA_MARK,
cql_protocol_extension::RATE_LIMIT_ERROR>;
using cql_protocol_extension_enum_set = enum_set<cql_protocol_extension_enum>;

View File

@@ -37,6 +37,7 @@
#include <seastar/core/execution_stage.hh>
#include "utils/result_try.hh"
#include "utils/result_combinators.hh"
#include "db/operation_type.hh"
#include "enum_set.hh"
#include "service/query_state.hh"
@@ -483,6 +484,9 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
}), utils::result_catch<exceptions::function_execution_exception>([&] (const auto& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_function_failure_error(stream, ex.code(), ex.what(), ex.ks_name, ex.func_name, ex.args, trace_state);
}), utils::result_catch<exceptions::rate_limit_exception>([&] (const auto& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_rate_limit_error(stream, ex.code(), ex.what(), ex.op_type, ex.rejected_by_coordinator, trace_state, client_state);
}), utils::result_catch<exceptions::cassandra_exception>([&] (const auto& ex) {
// Note: the CQL protocol specifies that many types of errors have
// mandatory parameters. These cassandra_exception subclasses MUST
@@ -1275,6 +1279,20 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_function_fail
return response;
}
std::unique_ptr<cql_server::response> cql_server::connection::make_rate_limit_error(int16_t stream, exceptions::exception_code err, sstring msg, db::operation_type op_type, bool rejected_by_coordinator, const tracing::trace_state_ptr& tr_state, const service::client_state& client_state) const
{
if (!client_state.is_protocol_extension_set(cql_protocol_extension::RATE_LIMIT_ERROR)) {
return make_error(stream, exceptions::exception_code::CONFIG_ERROR, std::move(msg), tr_state);
}
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
response->write_int(static_cast<int32_t>(err));
response->write_string(msg);
response->write_byte(static_cast<uint8_t>(op_type));
response->write_byte(static_cast<uint8_t>(rejected_by_coordinator));
return response;
}
std::unique_ptr<cql_server::response> cql_server::connection::make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state) const
{
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);

View File

@@ -31,6 +31,7 @@
#include "transport/messages/result_message.hh"
#include "utils/chunked_vector.hh"
#include "exceptions/coordinator_result.hh"
#include "db/operation_type.hh"
namespace cql3 {
@@ -247,6 +248,7 @@ private:
std::unique_ptr<cql_server::response> make_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_unprepared_error(int16_t stream, exceptions::exception_code err, sstring msg, bytes id, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_function_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring func_name, std::vector<sstring> args, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_rate_limit_error(int16_t stream, exceptions::exception_code err, sstring msg, db::operation_type op_type, bool rejected_by_coordinator, const tracing::trace_state_ptr& tr_state, const service::client_state& client_state) const;
std::unique_ptr<cql_server::response> make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_ready(int16_t stream, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_supported(int16_t stream, const tracing::trace_state_ptr& tr_state) const;