transport: add rate_limit_error

Adds a CQL protocol extension which introduces the rate_limit_error. The
new error code will be used to indicate that the operation failed due to
it exceeding the allowed per-partition rate limit.

The error code is supposed to be returned only if the corresponding CQL
extension is enabled by the client - if it's not enabled, then
Config_error will be returned in its stead.
This commit is contained in:
Piotr Dulikowski
2021-12-13 19:11:18 +01:00
parent c59a730c1e
commit efc3953c0a
10 changed files with 124 additions and 6 deletions

23
db/operation_type.hh Normal file
View File

@@ -0,0 +1,23 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <cstdint>
#include <iosfwd>
// Classification of database operations as reads or writes.
namespace db {
// Kind of operation. The underlying type is a single byte; the value is cast
// to uint8_t and written as the `op_type` field of the rate limit error body,
// where the protocol extension documents 0 as read and 1 as write.
enum class operation_type : uint8_t {
read = 0,
write = 1
};
// Writes a textual name of the operation type to `os` and returns `os`.
std::ostream& operator<<(std::ostream& os, operation_type op_type);
}

View File

@@ -146,3 +146,37 @@ parameters:
the bit mask that should be used by the client to test against when checking
prepared statement metadata flags to see if the current query is conditional
or not.
## Rate limit error
This extension allows Scylla to send the driver a new type of error in case the operation
goes over the allowed per-partition rate limit. This kind of error does not fit
other existing error codes well, hence the need for the protocol extension.
On receiving this error, the driver should not retry the request; instead,
the error should be propagated to the user so that they can decide what to do
with it - sometimes it might make sense to propagate the error, in other cases
it might make sense to retry with backoff.
The body of the error consists of the usual error code, error message and then
the following fields: `<op_type><rejected_by_coordinator>`, where:
- `op_type` is a byte which identifies the operation which is the origin
of the rate limit.
- 0: read
- 1: write
- `rejected_by_coordinator` is a byte which is 1 if the operation was rejected
on the coordinator and 0 if it was rejected by replicas.
If the driver does not understand this extension and does not enable it,
the Config_error will be used instead of the new error code.
In order to be forward compatible with error codes added in the future protocol
versions, this extension doesn't reserve a fixed error code - instead, it
advertises the integer value used as the error code in the SUPPORTED response.
This extension is identified by the `SCYLLA_RATE_LIMIT_ERROR` key.
The string map in the SUPPORTED response will contain the following parameters:
- `ERROR_CODE`: a 32-bit signed decimal integer which Scylla
will use as the error code for the rate limit exception.

View File

@@ -31,7 +31,8 @@ namespace exceptions {
using coordinator_exception_container = utils::exception_container<
mutation_write_timeout_exception,
read_timeout_exception,
read_failure_exception
read_failure_exception,
rate_limit_exception
>;
template<typename T = void>

View File

@@ -39,7 +39,8 @@ const std::unordered_map<exception_code, sstring>& exception_map() {
{exception_code::INVALID, "invalid"},
{exception_code::CONFIG_ERROR, "config_error"},
{exception_code::ALREADY_EXISTS, "already_exists"},
{exception_code::UNPREPARED, "unprepared"}
{exception_code::UNPREPARED, "unprepared"},
{exception_code::RATE_LIMIT_ERROR, "rate_limit_error"}
};
return map;
}
@@ -77,6 +78,12 @@ overloaded_exception::overloaded_exception(size_t c) noexcept
: cassandra_exception(exception_code::OVERLOADED, prepare_message("Too many in flight hints: {}", c))
{}
// Builds the exception reported when an operation exceeds the per-partition
// rate limit.
//
//   ks, cf                    - keyspace and table named in the error message.
//   op_type_                  - kind of operation that hit the limit.
//   rejected_by_coordinator_  - true if the coordinator rejected the operation,
//                               false if the replicas did.
//
// The exception itself carries exception_code::CONFIG_ERROR.
rate_limit_exception::rate_limit_exception(const sstring& ks, const sstring& cf, db::operation_type op_type_, bool rejected_by_coordinator_) noexcept
    // The base class is constructed before any data member, so the message
    // has to be formatted from the constructor argument `op_type_`: the
    // `op_type` member is still uninitialized at this point.
    : cassandra_exception(exception_code::CONFIG_ERROR, prepare_message("Per-partition rate limit reached for {} in table {}.{}, rejected by {}", op_type_, ks, cf, rejected_by_coordinator_ ? "coordinator" : "replicas"))
    , op_type(op_type_)
    , rejected_by_coordinator(rejected_by_coordinator_)
{ }
prepared_query_not_found_exception::prepared_query_not_found_exception(bytes id) noexcept
: request_validation_exception{exception_code::UNPREPARED, prepare_message("No prepared statement with ID {} found.", id)}
, id{id}

View File

@@ -12,6 +12,7 @@
#include "db/consistency_level_type.hh"
#include "db/write_type.hh"
#include "db/operation_type.hh"
#include <stdexcept>
#include <seastar/core/sstring.hh>
#include "bytes.hh"
@@ -42,7 +43,17 @@ enum class exception_code : int32_t {
INVALID = 0x2200,
CONFIG_ERROR = 0x2300,
ALREADY_EXISTS = 0x2400,
UNPREPARED = 0x2500
UNPREPARED = 0x2500,
// Scylla-specific error codes
// The error codes below are advertised to the drivers during connection
// handshake using the protocol extension negotiation, and are only
// enabled if the drivers explicitly enable them. Therefore it's perfectly
// fine to change them in case some new error codes are introduced
// in Cassandra.
// NOTE TO DRIVER DEVELOPERS: These constants must not be relied upon,
// they must be learned from protocol extensions instead.
RATE_LIMIT_ERROR = 0xF000
};
const std::unordered_map<exception_code, sstring>& exception_map();
@@ -183,6 +194,13 @@ struct overloaded_exception : public cassandra_exception {
cassandra_exception(exception_code::OVERLOADED, std::move(msg)) {}
};
// Exception describing an operation that was rejected because it reached the
// per-partition rate limit.
struct rate_limit_exception : public cassandra_exception {
// Kind of operation that was rejected.
db::operation_type op_type;
// True if the operation was rejected by the coordinator, false if by replicas.
bool rejected_by_coordinator;
// `ks` and `cf` are the keyspace and table named in the error message;
// `op_type_` and `rejected_by_coordinator_` initialize the members above.
rate_limit_exception(const sstring& ks, const sstring& cf, db::operation_type op_type_, bool rejected_by_coordinator_) noexcept;
};
class request_validation_exception : public cassandra_exception {
public:
using cassandra_exception::cassandra_exception;

View File

@@ -42,6 +42,7 @@
#include "gms/feature_service.hh"
#include "timeout_config.hh"
#include "service/storage_proxy.hh"
#include "db/operation_type.hh"
#include "utils/human_readable.hh"
#include "utils/fb_utilities.hh"
@@ -1933,6 +1934,14 @@ std::ostream& operator<<(std::ostream& os, db::consistency_level cl) {
}
}
// Prints the name of the operation type ("read" or "write") to `os`.
// Aborts the process if `op_type` holds a value outside the enumeration.
std::ostream& operator<<(std::ostream& os, operation_type op_type) {
    const char* label = nullptr;
    switch (op_type) {
    case operation_type::read:
        label = "read";
        break;
    case operation_type::write:
        label = "write";
        break;
    }
    if (label == nullptr) {
        // Not a known enumerator: nothing sensible to print.
        abort();
    }
    return os << label;
}
}
std::ostream&

View File

@@ -9,13 +9,15 @@
#include <seastar/core/print.hh>
#include "transport/cql_protocol_extension.hh"
#include "cql3/result_set.hh"
#include "exceptions/exceptions.hh"
#include <map>
namespace cql_transport {
static const std::map<cql_protocol_extension, seastar::sstring> EXTENSION_NAMES = {
{cql_protocol_extension::LWT_ADD_METADATA_MARK, "SCYLLA_LWT_ADD_METADATA_MARK"}
{cql_protocol_extension::LWT_ADD_METADATA_MARK, "SCYLLA_LWT_ADD_METADATA_MARK"},
{cql_protocol_extension::RATE_LIMIT_ERROR, "SCYLLA_RATE_LIMIT_ERROR"}
};
cql_protocol_extension_enum_set supported_cql_protocol_extensions() {
@@ -30,6 +32,8 @@ std::vector<seastar::sstring> additional_options_for_proto_ext(cql_protocol_exte
switch (ext) {
case cql_protocol_extension::LWT_ADD_METADATA_MARK:
return {format("LWT_OPTIMIZATION_META_BIT_MASK={:d}", cql3::prepared_metadata::LWT_FLAG_MASK)};
case cql_protocol_extension::RATE_LIMIT_ERROR:
return {format("ERROR_CODE={:d}", exceptions::exception_code::RATE_LIMIT_ERROR)};
default:
return {};
}

View File

@@ -28,11 +28,13 @@ namespace cql_transport {
* `docs/protocol-extensions.md`.
*/
enum class cql_protocol_extension {
LWT_ADD_METADATA_MARK
LWT_ADD_METADATA_MARK,
RATE_LIMIT_ERROR
};
using cql_protocol_extension_enum = super_enum<cql_protocol_extension,
cql_protocol_extension::LWT_ADD_METADATA_MARK>;
cql_protocol_extension::LWT_ADD_METADATA_MARK,
cql_protocol_extension::RATE_LIMIT_ERROR>;
using cql_protocol_extension_enum_set = enum_set<cql_protocol_extension_enum>;

View File

@@ -37,6 +37,7 @@
#include <seastar/core/execution_stage.hh>
#include "utils/result_try.hh"
#include "utils/result_combinators.hh"
#include "db/operation_type.hh"
#include "enum_set.hh"
#include "service/query_state.hh"
@@ -483,6 +484,9 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
}), utils::result_catch<exceptions::function_execution_exception>([&] (const auto& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_function_failure_error(stream, ex.code(), ex.what(), ex.ks_name, ex.func_name, ex.args, trace_state);
}), utils::result_catch<exceptions::rate_limit_exception>([&] (const auto& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_rate_limit_error(stream, ex.code(), ex.what(), ex.op_type, ex.rejected_by_coordinator, trace_state, client_state);
}), utils::result_catch<exceptions::cassandra_exception>([&] (const auto& ex) {
// Note: the CQL protocol specifies that many types of errors have
// mandatory parameters. These cassandra_exception subclasses MUST
@@ -1275,6 +1279,20 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_function_fail
return response;
}
// Builds the ERROR response for a rate_limit_exception.
//
//   stream                   - stream id of the request being answered.
//   err                      - code carried by the exception; kept for interface
//                              compatibility, the code put on the wire is chosen
//                              below based on the negotiated extensions.
//   msg                      - human-readable error message.
//   op_type                  - kind of operation that was rate limited.
//   rejected_by_coordinator  - true if rejected on the coordinator, false if
//                              rejected by replicas.
//   tr_state                 - tracing state attached to the response.
//   client_state             - consulted for the negotiated protocol extensions.
//
// If the client did not enable the RATE_LIMIT_ERROR extension, a plain
// CONFIG_ERROR is returned. Otherwise the body is
// <error code><message><op_type byte><rejected_by_coordinator byte>.
std::unique_ptr<cql_server::response> cql_server::connection::make_rate_limit_error(int16_t stream, exceptions::exception_code err, sstring msg, db::operation_type op_type, bool rejected_by_coordinator, const tracing::trace_state_ptr& tr_state, const service::client_state& client_state) const
{
    if (!client_state.is_protocol_extension_set(cql_protocol_extension::RATE_LIMIT_ERROR)) {
        return make_error(stream, exceptions::exception_code::CONFIG_ERROR, std::move(msg), tr_state);
    }

    auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
    // The exception is constructed with CONFIG_ERROR as its code, so `err`
    // must not be echoed here: a client that negotiated the extension expects
    // the error code advertised in SUPPORTED, i.e. RATE_LIMIT_ERROR.
    response->write_int(static_cast<int32_t>(exceptions::exception_code::RATE_LIMIT_ERROR));
    response->write_string(msg);
    response->write_byte(static_cast<uint8_t>(op_type));
    response->write_byte(static_cast<uint8_t>(rejected_by_coordinator));
    return response;
}
std::unique_ptr<cql_server::response> cql_server::connection::make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state) const
{
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);

View File

@@ -31,6 +31,7 @@
#include "transport/messages/result_message.hh"
#include "utils/chunked_vector.hh"
#include "exceptions/coordinator_result.hh"
#include "db/operation_type.hh"
namespace cql3 {
@@ -247,6 +248,7 @@ private:
std::unique_ptr<cql_server::response> make_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_unprepared_error(int16_t stream, exceptions::exception_code err, sstring msg, bytes id, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_function_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring func_name, std::vector<sstring> args, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_rate_limit_error(int16_t stream, exceptions::exception_code err, sstring msg, db::operation_type op_type, bool rejected_by_coordinator, const tracing::trace_state_ptr& tr_state, const service::client_state& client_state) const;
std::unique_ptr<cql_server::response> make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_ready(int16_t stream, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_supported(int16_t stream, const tracing::trace_state_ptr& tr_state) const;