From 1b8c946ad7a9672e5746f67ff06dc707db6cd593 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 29 Sep 2020 12:03:27 +0200 Subject: [PATCH] alternator: add handling max_concurrent_requests_per_shard The config value is already used to set an upper limit of concurrent CQL requests, and now it's also abided by alternator. Excessive requests result in returning RequestLimitExceeded error to the client. Tests: manual Running multiple concurrent requests via the test suite results in: botocore.errorfactory.RequestLimitExceeded: An error occurred (RequestLimitExceeded) when calling the CreateTable operation: too many in-flight requests: 17 --- alternator/server.cc | 7 ++++++- alternator/server.hh | 4 +++- main.cc | 7 ++++--- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/alternator/server.cc b/alternator/server.cc index 6d5e61a85e..8f8c0cbf1a 100644 --- a/alternator/server.cc +++ b/alternator/server.cc @@ -278,6 +278,10 @@ future server::handle_api_request(std::unique_ptr _executor._stats.unsupported_operations++; throw api_error::unknown_operation(format("Unsupported operation {}", op)); } + if (_pending_requests.get_count() >= _max_concurrent_requests) { + return make_ready_future( + api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()))); + } return with_gate(_pending_requests, [this, callback_it = std::move(callback_it), op = std::move(op), req = std::move(req)] () mutable { //FIXME: Client state can provide more context, e.g. client's endpoint address // We use unique_ptr because client_state cannot be moved or copied @@ -405,9 +409,10 @@ server::server(executor& exec, cql3::query_processor& qp) } future<> server::init(net::inet_address addr, std::optional port, std::optional https_port, std::optional creds, - bool enforce_authorization, semaphore* memory_limiter) { + bool enforce_authorization, semaphore* memory_limiter, utils::updateable_value max_concurrent_requests) { _memory_limiter = memory_limiter; _enforce_authorization = enforce_authorization; + _max_concurrent_requests = std::move(max_concurrent_requests); if (!port && !https_port) { return make_exception_future<>(std::runtime_error("Either regular port or TLS port" " must be specified in order to init an alternator HTTP server instance")); diff --git a/alternator/server.hh b/alternator/server.hh index b5f1c55279..3c43ca4997 100644 --- a/alternator/server.hh +++ b/alternator/server.hh @@ -28,6 +28,7 @@ #include #include "alternator/auth.hh" #include "utils/small_vector.hh" +#include "utils/updateable_value.hh" #include namespace alternator { @@ -50,6 +51,7 @@ class server { alternator_callbacks_map _callbacks; semaphore* _memory_limiter; + utils::updateable_value _max_concurrent_requests; class json_parser { static constexpr size_t yieldable_parsing_threshold = 16*KB; @@ -72,7 +74,7 @@ public: server(executor& executor, cql3::query_processor& qp); future<> init(net::inet_address addr, std::optional port, std::optional https_port, std::optional creds, - bool enforce_authorization, semaphore* memory_limiter); + bool enforce_authorization, semaphore* memory_limiter, utils::updateable_value max_concurrent_requests); future<> stop(); private: void set_routes(seastar::httpd::routes& r); diff --git a/main.cc b/main.cc index aadeb51fab..232fb5cf6e 100644 --- a/main.cc +++ b/main.cc @@ -1284,11 +1284,12 @@ int main(int ac, char** av) { } bool alternator_enforce_authorization = cfg->alternator_enforce_authorization(); with_scheduling_group(dbcfg.statement_scheduling_group, - [addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization] () mutable { + [addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization, cfg] () mutable { return alternator_server.invoke_on_all( - [addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization] (alternator::server& server) mutable { + [addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization, cfg] (alternator::server& server) mutable { auto& ss = service::get_local_storage_service(); - return server.init(addr, alternator_port, alternator_https_port, creds, alternator_enforce_authorization, &ss.service_memory_limiter()); + return server.init(addr, alternator_port, alternator_https_port, creds, alternator_enforce_authorization, &ss.service_memory_limiter(), + ss.db().local().get_config().max_concurrent_requests_per_shard); }).then([addr, alternator_port, alternator_https_port] { startlog.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}", addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF");