alternator: guard pending alternator requests with a gate

In order to make sure that pending alternator requests are processed
during shutdown, a gate for each shard is introduced. On shutdown,
each gate will be closed and all in-progress operations will be waited upon.

Fixes #5781
This commit is contained in:
Piotr Sarna
2020-02-16 11:15:35 +01:00
parent c8ab9b3ae4
commit acfed880cc
2 changed files with 17 additions and 7 deletions

View File

@@ -263,12 +263,15 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
throw api_error("UnknownOperationException",
format("Unsupported operation {}", op));
}
//FIXME: Client state can provide more context, e.g. client's endpoint address
// We use unique_ptr because client_state cannot be moved or copied
return do_with(std::make_unique<executor::client_state>(executor::client_state::internal_tag()), [this, callback_it = std::move(callback_it), op = std::move(op), req = std::move(req)] (std::unique_ptr<executor::client_state>& client_state) mutable {
tracing::trace_state_ptr trace_state = executor::maybe_trace_query(*client_state, op, req->content);
tracing::trace(trace_state, op);
return callback_it->second(_executor.local(), *client_state, trace_state, std::move(req)).finally([trace_state] {});
return with_gate(_pending_requests.local(), [this, callback_it = std::move(callback_it), op = std::move(op), req = std::move(req)] () mutable {
//FIXME: Client state can provide more context, e.g. client's endpoint address
// We use unique_ptr because client_state cannot be moved or copied
return do_with(std::make_unique<executor::client_state>(executor::client_state::internal_tag()),
[this, callback_it = std::move(callback_it), op = std::move(op), req = std::move(req)] (std::unique_ptr<executor::client_state>& client_state) mutable {
tracing::trace_state_ptr trace_state = executor::maybe_trace_query(*client_state, op, req->content);
tracing::trace(trace_state, op);
return callback_it->second(_executor.local(), *client_state, trace_state, std::move(req)).finally([trace_state] {});
});
});
});
}
@@ -298,7 +301,7 @@ void server::set_routes(routes& r) {
// e.g. when the system table which stores the keys is changed.
// For now, this propagation may take up to 1 minute.
server::server(seastar::sharded<executor>& e)
: _executor(e), _key_cache(1024, 1min, slogger), _enforce_authorization(false), _enabled_servers{}
: _executor(e), _key_cache(1024, 1min, slogger), _enforce_authorization(false), _enabled_servers{}, _pending_requests{}
, _callbacks{
{"CreateTable", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.create_table(client_state, std::move(trace_state), req->content); }},
{"DescribeTable", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr<request> req) { return e.describe_table(client_state, std::move(trace_state), req->content); }},
@@ -326,6 +329,7 @@ future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std:
" must be specified in order to init an alternator HTTP server instance"));
}
return seastar::async([this, addr, port, https_port, creds] {
_pending_requests.start().get();
try {
_executor.invoke_on_all([] (executor& e) {
return e.start();
@@ -365,7 +369,12 @@ future<> server::stop() {
}
return parallel_for_each(_enabled_servers, [] (httpd::http_server_control& control) {
return control.server().stop();
}).then([this] {
return _pending_requests.invoke_on_all([] (seastar::gate& pending) {
return pending.close();
});
});
}
}

View File

@@ -41,6 +41,7 @@ class server {
key_cache _key_cache;
bool _enforce_authorization;
utils::small_vector<std::reference_wrapper<seastar::httpd::http_server_control>, 2> _enabled_servers;
seastar::sharded<seastar::gate> _pending_requests;
alternator_callbacks_map _callbacks;
public:
server(seastar::sharded<executor>& executor);