diff --git a/alternator/server.cc b/alternator/server.cc index 3685e16bdf..d84c928c01 100644 --- a/alternator/server.cc +++ b/alternator/server.cc @@ -263,12 +263,15 @@ future server::handle_api_request(std::unique_ptr throw api_error("UnknownOperationException", format("Unsupported operation {}", op)); } - //FIXME: Client state can provide more context, e.g. client's endpoint address - // We use unique_ptr because client_state cannot be moved or copied - return do_with(std::make_unique(executor::client_state::internal_tag()), [this, callback_it = std::move(callback_it), op = std::move(op), req = std::move(req)] (std::unique_ptr& client_state) mutable { - tracing::trace_state_ptr trace_state = executor::maybe_trace_query(*client_state, op, req->content); - tracing::trace(trace_state, op); - return callback_it->second(_executor.local(), *client_state, trace_state, std::move(req)).finally([trace_state] {}); + return with_gate(_pending_requests.local(), [this, callback_it = std::move(callback_it), op = std::move(op), req = std::move(req)] () mutable { + //FIXME: Client state can provide more context, e.g. client's endpoint address + // We use unique_ptr because client_state cannot be moved or copied + return do_with(std::make_unique(executor::client_state::internal_tag()), + [this, callback_it = std::move(callback_it), op = std::move(op), req = std::move(req)] (std::unique_ptr& client_state) mutable { + tracing::trace_state_ptr trace_state = executor::maybe_trace_query(*client_state, op, req->content); + tracing::trace(trace_state, op); + return callback_it->second(_executor.local(), *client_state, trace_state, std::move(req)).finally([trace_state] {}); + }); }); }); } @@ -298,7 +301,7 @@ void server::set_routes(routes& r) { // e.g. when the system table which stores the keys is changed. // For now, this propagation may take up to 1 minute. server::server(seastar::sharded& e) - : _executor(e), _key_cache(1024, 1min, slogger), _enforce_authorization(false), _enabled_servers{} + : _executor(e), _key_cache(1024, 1min, slogger), _enforce_authorization(false), _enabled_servers{}, _pending_requests{} , _callbacks{ {"CreateTable", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr req) { return e.create_table(client_state, std::move(trace_state), req->content); }}, {"DescribeTable", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr req) { return e.describe_table(client_state, std::move(trace_state), req->content); }}, @@ -326,6 +329,7 @@ future<> server::init(net::inet_address addr, std::optional port, std: " must be specified in order to init an alternator HTTP server instance")); } return seastar::async([this, addr, port, https_port, creds] { + _pending_requests.start().get(); try { _executor.invoke_on_all([] (executor& e) { return e.start(); @@ -365,7 +369,12 @@ future<> server::stop() { } return parallel_for_each(_enabled_servers, [] (httpd::http_server_control& control) { return control.server().stop(); + }).then([this] { + return _pending_requests.invoke_on_all([] (seastar::gate& pending) { + return pending.close(); + }); }); + } } diff --git a/alternator/server.hh b/alternator/server.hh index bce3def5aa..dc335bb8ee 100644 --- a/alternator/server.hh +++ b/alternator/server.hh @@ -41,6 +41,7 @@ class server { key_cache _key_cache; bool _enforce_authorization; utils::small_vector, 2> _enabled_servers; + seastar::sharded _pending_requests; alternator_callbacks_map _callbacks; public: server(seastar::sharded& executor);