diff --git a/alternator/server.cc b/alternator/server.cc index 6d5e61a85e..8f8c0cbf1a 100644 --- a/alternator/server.cc +++ b/alternator/server.cc @@ -278,6 +278,10 @@ future server::handle_api_request(std::unique_ptr _executor._stats.unsupported_operations++; throw api_error::unknown_operation(format("Unsupported operation {}", op)); } + if (_pending_requests.get_count() >= _max_concurrent_requests) { + return make_ready_future( + api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()))); + } return with_gate(_pending_requests, [this, callback_it = std::move(callback_it), op = std::move(op), req = std::move(req)] () mutable { //FIXME: Client state can provide more context, e.g. client's endpoint address // We use unique_ptr because client_state cannot be moved or copied @@ -405,9 +409,10 @@ server::server(executor& exec, cql3::query_processor& qp) } future<> server::init(net::inet_address addr, std::optional port, std::optional https_port, std::optional creds, - bool enforce_authorization, semaphore* memory_limiter) { + bool enforce_authorization, semaphore* memory_limiter, utils::updateable_value max_concurrent_requests) { _memory_limiter = memory_limiter; _enforce_authorization = enforce_authorization; + _max_concurrent_requests = std::move(max_concurrent_requests); if (!port && !https_port) { return make_exception_future<>(std::runtime_error("Either regular port or TLS port" " must be specified in order to init an alternator HTTP server instance")); diff --git a/alternator/server.hh b/alternator/server.hh index b5f1c55279..3c43ca4997 100644 --- a/alternator/server.hh +++ b/alternator/server.hh @@ -28,6 +28,7 @@ #include #include "alternator/auth.hh" #include "utils/small_vector.hh" +#include "utils/updateable_value.hh" #include namespace alternator { @@ -50,6 +51,7 @@ class server { alternator_callbacks_map _callbacks; semaphore* _memory_limiter; + utils::updateable_value _max_concurrent_requests; class json_parser { static constexpr size_t yieldable_parsing_threshold = 16*KB; @@ -72,7 +74,7 @@ public: server(executor& executor, cql3::query_processor& qp); future<> init(net::inet_address addr, std::optional port, std::optional https_port, std::optional creds, - bool enforce_authorization, semaphore* memory_limiter); + bool enforce_authorization, semaphore* memory_limiter, utils::updateable_value max_concurrent_requests); future<> stop(); private: void set_routes(seastar::httpd::routes& r); diff --git a/main.cc b/main.cc index aadeb51fab..232fb5cf6e 100644 --- a/main.cc +++ b/main.cc @@ -1284,11 +1284,12 @@ int main(int ac, char** av) { } bool alternator_enforce_authorization = cfg->alternator_enforce_authorization(); with_scheduling_group(dbcfg.statement_scheduling_group, - [addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization] () mutable { + [addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization, cfg] () mutable { return alternator_server.invoke_on_all( - [addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization] (alternator::server& server) mutable { + [addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization, cfg] (alternator::server& server) mutable { auto& ss = service::get_local_storage_service(); - return server.init(addr, alternator_port, alternator_https_port, creds, alternator_enforce_authorization, &ss.service_memory_limiter()); + return server.init(addr, alternator_port, alternator_https_port, creds, alternator_enforce_authorization, &ss.service_memory_limiter(), + ss.db().local().get_config().max_concurrent_requests_per_shard); }).then([addr, alternator_port, alternator_https_port] { startlog.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}", addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF");