transport: add rate_limit_error
Adds a CQL protocol extension which introduces the rate_limit_error. The new error code will be used to indicate that the operation failed due to it exceeding the allowed per-partition rate limit. The error code is supposed to be returned only if the corresponding CQL extension is enabled by the client - if it's not enabled, then Config_error will be returned in its stead.
This commit is contained in:
23
db/operation_type.hh
Normal file
23
db/operation_type.hh
Normal file
@@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Copyright (C) 2022-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <iosfwd>
|
||||
|
||||
namespace db {
|
||||
|
||||
// Kind of operation that a per-partition rate limit was applied to.
// The numeric values are what the rate limit error body carries in its
// one-byte `op_type` field (0: read, 1: write).
enum class operation_type : uint8_t {
|
||||
// A read operation.
read = 0,
|
||||
// A write operation.
write = 1
|
||||
};
|
||||
|
||||
// Stream-output operator for operation_type; writes a textual name of
// `op_type` to `os` and returns `os`.
std::ostream& operator<<(std::ostream& os, operation_type op_type);
|
||||
|
||||
}
|
||||
@@ -146,3 +146,37 @@ parameters:
|
||||
the bit mask that should be used by the client to test against when checking
|
||||
prepared statement metadata flags to see if the current query is conditional
|
||||
or not.
|
||||
|
||||
## Rate limit error
|
||||
|
||||
This extension allows Scylla to send the driver a new type of error in case the operation
|
||||
goes over the allowed per-partition rate limit. This kind of error does not fit
|
||||
other existing error codes well, hence the need for the protocol extension.
|
||||
|
||||
On receiving this error, the driver should not retry the request; instead,
|
||||
the error should be propagated to the user so that they can decide what to do
|
||||
with it - sometimes it might make sense to propagate the error, in other cases
|
||||
it might make sense to retry with backoff.
|
||||
|
||||
The body of the error consists of the usual error code, error message and then
|
||||
the following fields: `<op_type><rejected_by_coordinator>`, where:
|
||||
|
||||
- `op_type` is a byte which identifies the operation which is the origin
|
||||
of the rate limit.
|
||||
- 0: read
|
||||
- 1: write
|
||||
- `rejected_by_coordinator` is a byte which is 1 if the operation was rejected
|
||||
on the coordinator and 0 if it was rejected by replicas.
|
||||
|
||||
If the driver does not understand this extension and does not enable it,
|
||||
the Config_error will be used instead of the new error code.
|
||||
|
||||
In order to be forward compatible with error codes added in the future protocol
|
||||
versions, this extension doesn't reserve a fixed error code - instead, it
|
||||
advertises the integer value used as the error code in the SUPPORTED response.
|
||||
|
||||
This extension is identified by the `SCYLLA_RATE_LIMIT_ERROR` key.
|
||||
The string map in the SUPPORTED response will contain the following parameters:
|
||||
|
||||
- `ERROR_CODE`: a 32-bit signed decimal integer which Scylla
|
||||
will use as the error code for the rate limit exception.
|
||||
|
||||
@@ -31,7 +31,8 @@ namespace exceptions {
|
||||
using coordinator_exception_container = utils::exception_container<
|
||||
mutation_write_timeout_exception,
|
||||
read_timeout_exception,
|
||||
read_failure_exception
|
||||
read_failure_exception,
|
||||
rate_limit_exception
|
||||
>;
|
||||
|
||||
template<typename T = void>
|
||||
|
||||
@@ -39,7 +39,8 @@ const std::unordered_map<exception_code, sstring>& exception_map() {
|
||||
{exception_code::INVALID, "invalid"},
|
||||
{exception_code::CONFIG_ERROR, "config_error"},
|
||||
{exception_code::ALREADY_EXISTS, "already_exists"},
|
||||
{exception_code::UNPREPARED, "unprepared"}
|
||||
{exception_code::UNPREPARED, "unprepared"},
|
||||
{exception_code::RATE_LIMIT_ERROR, "rate_limit_error"}
|
||||
};
|
||||
return map;
|
||||
}
|
||||
@@ -77,6 +78,12 @@ overloaded_exception::overloaded_exception(size_t c) noexcept
|
||||
: cassandra_exception(exception_code::OVERLOADED, prepare_message("Too many in flight hints: {}", c))
|
||||
{}
|
||||
|
||||
// Builds the exception raised when an operation exceeds the per-partition
// rate limit.
//
// ks, cf                   - keyspace and table names, used in the message.
// op_type_                 - kind of operation (read or write) that was limited.
// rejected_by_coordinator_ - true if the coordinator rejected the operation,
//                            false if the replicas did.
//
// NOTE(review): the exception itself carries CONFIG_ERROR, which is the code
// meant for clients that did not enable the rate limit protocol extension;
// confirm that the transport layer substitutes the advertised
// RATE_LIMIT_ERROR code for clients that did enable it.
rate_limit_exception::rate_limit_exception(const sstring& ks, const sstring& cf, db::operation_type op_type_, bool rejected_by_coordinator_) noexcept
    // The base class is initialized before any data member, so the message
    // has to be formatted from the constructor argument `op_type_`: at this
    // point the `op_type` member has not been initialized yet.
    : cassandra_exception(exception_code::CONFIG_ERROR, prepare_message("Per-partition rate limit reached for {} in table {}.{}, rejected by {}", op_type_, ks, cf, rejected_by_coordinator_ ? "coordinator" : "replicas"))
    , op_type(op_type_)
    , rejected_by_coordinator(rejected_by_coordinator_)
{ }
|
||||
|
||||
prepared_query_not_found_exception::prepared_query_not_found_exception(bytes id) noexcept
|
||||
: request_validation_exception{exception_code::UNPREPARED, prepare_message("No prepared statement with ID {} found.", id)}
|
||||
, id{id}
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
|
||||
#include "db/consistency_level_type.hh"
|
||||
#include "db/write_type.hh"
|
||||
#include "db/operation_type.hh"
|
||||
#include <stdexcept>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include "bytes.hh"
|
||||
@@ -42,7 +43,17 @@ enum class exception_code : int32_t {
|
||||
INVALID = 0x2200,
|
||||
CONFIG_ERROR = 0x2300,
|
||||
ALREADY_EXISTS = 0x2400,
|
||||
UNPREPARED = 0x2500
|
||||
UNPREPARED = 0x2500,
|
||||
|
||||
// Scylla-specific error codes
|
||||
// The error codes below are advertised to the drivers during connection
|
||||
// handshake using the protocol extension negotiation, and are only
|
||||
// enabled if the drivers explicitly enable them. Therefore it's perfectly
|
||||
// fine to change them in case some new error codes are introduced
|
||||
// in Cassandra.
|
||||
// NOTE TO DRIVER DEVELOPERS: These constants must not be relied upon,
|
||||
// they must be learned from protocol extensions instead.
|
||||
RATE_LIMIT_ERROR = 0xF000
|
||||
};
|
||||
|
||||
const std::unordered_map<exception_code, sstring>& exception_map();
|
||||
@@ -183,6 +194,13 @@ struct overloaded_exception : public cassandra_exception {
|
||||
cassandra_exception(exception_code::OVERLOADED, std::move(msg)) {}
|
||||
};
|
||||
|
||||
// Exception reporting that an operation was rejected because the
// per-partition rate limit was reached.
struct rate_limit_exception : public cassandra_exception {
|
||||
// Kind of operation (read or write) that hit the limit.
db::operation_type op_type;
|
||||
// True if the operation was rejected by the coordinator, false if it was
// rejected by replicas.
bool rejected_by_coordinator;
|
||||
|
||||
// `ks` and `cf` are the keyspace and table names included in the error
// message; the remaining arguments initialize the fields above.
rate_limit_exception(const sstring& ks, const sstring& cf, db::operation_type op_type_, bool rejected_by_coordinator_) noexcept;
|
||||
};
|
||||
|
||||
class request_validation_exception : public cassandra_exception {
|
||||
public:
|
||||
using cassandra_exception::cassandra_exception;
|
||||
|
||||
@@ -42,6 +42,7 @@
|
||||
#include "gms/feature_service.hh"
|
||||
#include "timeout_config.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "db/operation_type.hh"
|
||||
|
||||
#include "utils/human_readable.hh"
|
||||
#include "utils/fb_utilities.hh"
|
||||
@@ -1933,6 +1934,14 @@ std::ostream& operator<<(std::ostream& os, db::consistency_level cl) {
|
||||
}
|
||||
}
|
||||
|
||||
// Writes the lowercase name of the operation type ("read" or "write") to the
// stream. A value outside the enumeration aborts the process.
std::ostream& operator<<(std::ostream& os, operation_type op_type) {
    const char* label = nullptr;
    switch (op_type) {
    case operation_type::read:
        label = "read";
        break;
    case operation_type::write:
        label = "write";
        break;
    }
    if (label == nullptr) {
        // Not one of the known enumerators.
        abort();
    }
    return os << label;
}
|
||||
|
||||
}
|
||||
|
||||
std::ostream&
|
||||
|
||||
@@ -9,13 +9,15 @@
|
||||
#include <seastar/core/print.hh>
|
||||
#include "transport/cql_protocol_extension.hh"
|
||||
#include "cql3/result_set.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
|
||||
#include <map>
|
||||
|
||||
namespace cql_transport {
|
||||
|
||||
static const std::map<cql_protocol_extension, seastar::sstring> EXTENSION_NAMES = {
|
||||
{cql_protocol_extension::LWT_ADD_METADATA_MARK, "SCYLLA_LWT_ADD_METADATA_MARK"}
|
||||
{cql_protocol_extension::LWT_ADD_METADATA_MARK, "SCYLLA_LWT_ADD_METADATA_MARK"},
|
||||
{cql_protocol_extension::RATE_LIMIT_ERROR, "SCYLLA_RATE_LIMIT_ERROR"}
|
||||
};
|
||||
|
||||
cql_protocol_extension_enum_set supported_cql_protocol_extensions() {
|
||||
@@ -30,6 +32,8 @@ std::vector<seastar::sstring> additional_options_for_proto_ext(cql_protocol_exte
|
||||
switch (ext) {
|
||||
case cql_protocol_extension::LWT_ADD_METADATA_MARK:
|
||||
return {format("LWT_OPTIMIZATION_META_BIT_MASK={:d}", cql3::prepared_metadata::LWT_FLAG_MASK)};
|
||||
case cql_protocol_extension::RATE_LIMIT_ERROR:
|
||||
return {format("ERROR_CODE={:d}", exceptions::exception_code::RATE_LIMIT_ERROR)};
|
||||
default:
|
||||
return {};
|
||||
}
|
||||
|
||||
@@ -28,11 +28,13 @@ namespace cql_transport {
|
||||
* `docs/protocol-extensions.md`.
|
||||
*/
|
||||
enum class cql_protocol_extension {
|
||||
LWT_ADD_METADATA_MARK
|
||||
LWT_ADD_METADATA_MARK,
|
||||
RATE_LIMIT_ERROR
|
||||
};
|
||||
|
||||
using cql_protocol_extension_enum = super_enum<cql_protocol_extension,
|
||||
cql_protocol_extension::LWT_ADD_METADATA_MARK>;
|
||||
cql_protocol_extension::LWT_ADD_METADATA_MARK,
|
||||
cql_protocol_extension::RATE_LIMIT_ERROR>;
|
||||
|
||||
using cql_protocol_extension_enum_set = enum_set<cql_protocol_extension_enum>;
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@
|
||||
#include <seastar/core/execution_stage.hh>
|
||||
#include "utils/result_try.hh"
|
||||
#include "utils/result_combinators.hh"
|
||||
#include "db/operation_type.hh"
|
||||
|
||||
#include "enum_set.hh"
|
||||
#include "service/query_state.hh"
|
||||
@@ -483,6 +484,9 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
}), utils::result_catch<exceptions::function_execution_exception>([&] (const auto& ex) {
|
||||
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
|
||||
return make_function_failure_error(stream, ex.code(), ex.what(), ex.ks_name, ex.func_name, ex.args, trace_state);
|
||||
}), utils::result_catch<exceptions::rate_limit_exception>([&] (const auto& ex) {
|
||||
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
|
||||
return make_rate_limit_error(stream, ex.code(), ex.what(), ex.op_type, ex.rejected_by_coordinator, trace_state, client_state);
|
||||
}), utils::result_catch<exceptions::cassandra_exception>([&] (const auto& ex) {
|
||||
// Note: the CQL protocol specifies that many types of errors have
|
||||
// mandatory parameters. These cassandra_exception subclasses MUST
|
||||
@@ -1275,6 +1279,20 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_function_fail
|
||||
return response;
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_rate_limit_error(int16_t stream, exceptions::exception_code err, sstring msg, db::operation_type op_type, bool rejected_by_coordinator, const tracing::trace_state_ptr& tr_state, const service::client_state& client_state) const
|
||||
{
|
||||
if (!client_state.is_protocol_extension_set(cql_protocol_extension::RATE_LIMIT_ERROR)) {
|
||||
return make_error(stream, exceptions::exception_code::CONFIG_ERROR, std::move(msg), tr_state);
|
||||
}
|
||||
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
||||
response->write_int(static_cast<int32_t>(err));
|
||||
response->write_string(msg);
|
||||
response->write_byte(static_cast<uint8_t>(op_type));
|
||||
response->write_byte(static_cast<uint8_t>(rejected_by_coordinator));
|
||||
return response;
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include "exceptions/coordinator_result.hh"
|
||||
#include "db/operation_type.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
@@ -247,6 +248,7 @@ private:
|
||||
std::unique_ptr<cql_server::response> make_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_unprepared_error(int16_t stream, exceptions::exception_code err, sstring msg, bytes id, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_function_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring func_name, std::vector<sstring> args, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_rate_limit_error(int16_t stream, exceptions::exception_code err, sstring msg, db::operation_type op_type, bool rejected_by_coordinator, const tracing::trace_state_ptr& tr_state, const service::client_state& client_state) const;
|
||||
std::unique_ptr<cql_server::response> make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_ready(int16_t stream, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_supported(int16_t stream, const tracing::trace_state_ptr& tr_state) const;
|
||||
|
||||
Reference in New Issue
Block a user