mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-19 16:15:07 +00:00
alternator: add handling max_concurrent_requests_per_shard
The config value is already used to set an upper limit of concurrent CQL requests, and now it's also abided by alternator. Excessive requests result in returning RequestLimitExceeded error to the client. Tests: manual Running multiple concurrent requests via the test suite results in: botocore.errorfactory.RequestLimitExceeded: An error occurred (RequestLimitExceeded) when calling the CreateTable operation: too many in-flight requests: 17
This commit is contained in:
@@ -278,6 +278,10 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
_executor._stats.unsupported_operations++;
|
||||
throw api_error::unknown_operation(format("Unsupported operation {}", op));
|
||||
}
|
||||
if (_pending_requests.get_count() >= _max_concurrent_requests) {
|
||||
return make_ready_future<executor::request_return_type>(
|
||||
api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count())));
|
||||
}
|
||||
return with_gate(_pending_requests, [this, callback_it = std::move(callback_it), op = std::move(op), req = std::move(req)] () mutable {
|
||||
//FIXME: Client state can provide more context, e.g. client's endpoint address
|
||||
// We use unique_ptr because client_state cannot be moved or copied
|
||||
@@ -405,9 +409,10 @@ server::server(executor& exec, cql3::query_processor& qp)
|
||||
}
|
||||
|
||||
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
|
||||
bool enforce_authorization, semaphore* memory_limiter) {
|
||||
bool enforce_authorization, semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
|
||||
_memory_limiter = memory_limiter;
|
||||
_enforce_authorization = enforce_authorization;
|
||||
_max_concurrent_requests = std::move(max_concurrent_requests);
|
||||
if (!port && !https_port) {
|
||||
return make_exception_future<>(std::runtime_error("Either regular port or TLS port"
|
||||
" must be specified in order to init an alternator HTTP server instance"));
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
#include <optional>
|
||||
#include "alternator/auth.hh"
|
||||
#include "utils/small_vector.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
#include <seastar/core/units.hh>
|
||||
|
||||
namespace alternator {
|
||||
@@ -50,6 +51,7 @@ class server {
|
||||
alternator_callbacks_map _callbacks;
|
||||
|
||||
semaphore* _memory_limiter;
|
||||
utils::updateable_value<uint32_t> _max_concurrent_requests;
|
||||
|
||||
class json_parser {
|
||||
static constexpr size_t yieldable_parsing_threshold = 16*KB;
|
||||
@@ -72,7 +74,7 @@ public:
|
||||
server(executor& executor, cql3::query_processor& qp);
|
||||
|
||||
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
|
||||
bool enforce_authorization, semaphore* memory_limiter);
|
||||
bool enforce_authorization, semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
|
||||
future<> stop();
|
||||
private:
|
||||
void set_routes(seastar::httpd::routes& r);
|
||||
|
||||
7
main.cc
7
main.cc
@@ -1284,11 +1284,12 @@ int main(int ac, char** av) {
|
||||
}
|
||||
bool alternator_enforce_authorization = cfg->alternator_enforce_authorization();
|
||||
with_scheduling_group(dbcfg.statement_scheduling_group,
|
||||
[addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization] () mutable {
|
||||
[addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization, cfg] () mutable {
|
||||
return alternator_server.invoke_on_all(
|
||||
[addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization] (alternator::server& server) mutable {
|
||||
[addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization, cfg] (alternator::server& server) mutable {
|
||||
auto& ss = service::get_local_storage_service();
|
||||
return server.init(addr, alternator_port, alternator_https_port, creds, alternator_enforce_authorization, &ss.service_memory_limiter());
|
||||
return server.init(addr, alternator_port, alternator_https_port, creds, alternator_enforce_authorization, &ss.service_memory_limiter(),
|
||||
ss.db().local().get_config().max_concurrent_requests_per_shard);
|
||||
}).then([addr, alternator_port, alternator_https_port] {
|
||||
startlog.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}",
|
||||
addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF");
|
||||
|
||||
Reference in New Issue
Block a user