alternator: add handling max_concurrent_requests_per_shard

The config value is already used to set an upper limit of concurrent
CQL requests, and now it's also abided by alternator.
Excessive requests result in returning RequestLimitExceeded error
to the client.

Tests: manual
Running multiple concurrent requests via the test suite results in:
botocore.errorfactory.RequestLimitExceeded: An error occurred (RequestLimitExceeded) when calling the CreateTable operation: too many in-flight requests: 17
This commit is contained in:
Piotr Sarna
2020-09-29 12:03:27 +02:00
parent 32dc692b8b
commit 1b8c946ad7
3 changed files with 13 additions and 5 deletions

View File

@@ -278,6 +278,10 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
_executor._stats.unsupported_operations++;
throw api_error::unknown_operation(format("Unsupported operation {}", op));
}
if (_pending_requests.get_count() >= _max_concurrent_requests) {
return make_ready_future<executor::request_return_type>(
api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count())));
}
return with_gate(_pending_requests, [this, callback_it = std::move(callback_it), op = std::move(op), req = std::move(req)] () mutable {
//FIXME: Client state can provide more context, e.g. client's endpoint address
// We use unique_ptr because client_state cannot be moved or copied
@@ -405,9 +409,10 @@ server::server(executor& exec, cql3::query_processor& qp)
}
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
bool enforce_authorization, semaphore* memory_limiter) {
bool enforce_authorization, semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
_memory_limiter = memory_limiter;
_enforce_authorization = enforce_authorization;
_max_concurrent_requests = std::move(max_concurrent_requests);
if (!port && !https_port) {
return make_exception_future<>(std::runtime_error("Either regular port or TLS port"
" must be specified in order to init an alternator HTTP server instance"));

View File

@@ -28,6 +28,7 @@
#include <optional>
#include "alternator/auth.hh"
#include "utils/small_vector.hh"
#include "utils/updateable_value.hh"
#include <seastar/core/units.hh>
namespace alternator {
@@ -50,6 +51,7 @@ class server {
alternator_callbacks_map _callbacks;
semaphore* _memory_limiter;
utils::updateable_value<uint32_t> _max_concurrent_requests;
class json_parser {
static constexpr size_t yieldable_parsing_threshold = 16*KB;
@@ -72,7 +74,7 @@ public:
server(executor& executor, cql3::query_processor& qp);
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
bool enforce_authorization, semaphore* memory_limiter);
bool enforce_authorization, semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
future<> stop();
private:
void set_routes(seastar::httpd::routes& r);

View File

@@ -1284,11 +1284,12 @@ int main(int ac, char** av) {
}
bool alternator_enforce_authorization = cfg->alternator_enforce_authorization();
with_scheduling_group(dbcfg.statement_scheduling_group,
[addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization] () mutable {
[addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization, cfg] () mutable {
return alternator_server.invoke_on_all(
[addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization] (alternator::server& server) mutable {
[addr, alternator_port, alternator_https_port, creds = std::move(creds), alternator_enforce_authorization, cfg] (alternator::server& server) mutable {
auto& ss = service::get_local_storage_service();
return server.init(addr, alternator_port, alternator_https_port, creds, alternator_enforce_authorization, &ss.service_memory_limiter());
return server.init(addr, alternator_port, alternator_https_port, creds, alternator_enforce_authorization, &ss.service_memory_limiter(),
ss.db().local().get_config().max_concurrent_requests_per_shard);
}).then([addr, alternator_port, alternator_https_port] {
startlog.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}",
addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF");