Files
scylladb/alternator/server.cc
Nadav Har'El aad5eeab51 alternator: better error messages when Alternator port is taken
If Alternator is requested to be enabled on a specific port but the port is
already taken, the boot fails as expected - but the error log is confusing.
It currently looks something like this:

WARN  2019-12-24 11:22:57,303 [shard 0] alternator-server - Failed to set up Alternator HTTP server on 0.0.0.0 port 8000, TLS port 8043: std::system_error (error system:98, posix_listen failed for address 0.0.0.0:8000: Address already in use)
... (many more messages about the server shutting down)
INFO  2019-12-24 11:22:58,008 [shard 0] init - Startup failed: std::system_error (error system:98, posix_listen failed for address 0.0.0.0:8000: Address already in use)

There are two problems here. First, the "WARN" should really be an "ERROR",
because it causes the server to be shut down and the user must see this error.
Second, the final line in the log, which the user is likely to see first,
contains only the ultimate cause of the exception (an address already in use)
but does not say what this address was needed for.

This patch solves both issues, and the log now looks like:

ERROR 2019-12-24 14:00:54,496 [shard 0] alternator-server - Failed to set up Alternator HTTP server on 0.0.0.0 port 8000, TLS port 8043: std::system_error (error system:98, posix_listen failed for address 0.0.0.0:8000: Address already in use)
...
INFO  2019-12-24 14:00:55,056 [shard 0] init - Startup failed: std::_Nested_exception<std::runtime_error> (Failed to set up Alternator HTTP server on 0.0.0.0 port 8000, TLS port 8043): std::system_error (error system:98, posix_listen failed for address 0.0.0.0:8000: Address already in use)

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20191224124127.7093-1-nyh@scylladb.com>
2020-01-03 15:48:20 +02:00

314 lines
16 KiB
C++

/*
* Copyright 2019 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "alternator/server.hh"
#include "log.hh"
#include <seastar/http/function_handlers.hh>
#include <seastar/json/json_elements.hh>
#include <seastarx.hh>
#include "error.hh"
#include "rjson.hh"
#include "auth.hh"
#include <cctype>
#include "cql3/query_processor.hh"
// Logger used by everything in this file; constructed with the name "alternator-server".
static logging::logger slogger("alternator-server");
// NOTE(review): presumably this is what makes names such as request, reply,
// routes and handler_base usable unqualified below - confirm against the
// seastar httpd headers.
using namespace httpd;
namespace alternator {
// Name of the HTTP header that handle_api_request() reads to determine
// which operation the client requested.
static constexpr auto TARGET = "X-Amz-Target";
// Breaks `text` into the pieces found between occurrences of `separator`.
// The returned views point into the memory `text` refers to, so that memory
// must outlive the result. An empty `text` produces no pieces at all, while
// leading, trailing or adjacent separators produce empty pieces
// (e.g. "a," yields {"a", ""}).
inline std::vector<std::string_view> split(std::string_view text, char separator) {
    std::vector<std::string_view> pieces;
    if (text.empty()) {
        return pieces;
    }
    // Peel off one piece per separator found; `text` always holds the
    // not-yet-split remainder.
    for (auto cut = text.find(separator); cut != std::string_view::npos; cut = text.find(separator)) {
        pieces.push_back(text.substr(0, cut));
        text = text.substr(cut + 1);
    }
    // Whatever follows the last separator (possibly nothing) is the final piece.
    pieces.push_back(text);
    return pieces;
}
// DynamoDB HTTP error responses are structured as follows
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html
// Our handlers throw an exception to report an error. If the exception
// is of type alternator::api_error, it unwrapped and properly reported to
// the user directly. Other exceptions are unexpected, and reported as
// Internal Server Error.
class api_handler : public handler_base {
public:
api_handler(const future_json_function& _handle) : _f_handle(
[_handle](std::unique_ptr<request> req, std::unique_ptr<reply> rep) {
return seastar::futurize_apply(_handle, std::move(req)).then_wrapped([rep = std::move(rep)](future<json::json_return_type> resf) mutable {
if (resf.failed()) {
// Exceptions of type api_error are wrapped as JSON and
// returned to the client as expected. Other types of
// exceptions are unexpected, and returned to the user
// as an internal server error:
api_error ret;
try {
resf.get();
} catch (api_error &ae) {
ret = ae;
} catch (rjson::error & re) {
ret = api_error("ValidationException", re.what());
} catch (...) {
ret = api_error(
"Internal Server Error",
format("Internal server error: {}", std::current_exception()),
reply::status_type::internal_server_error);
}
// FIXME: what is this version number?
rep->_content += "{\"__type\":\"com.amazonaws.dynamodb.v20120810#" + ret._type + "\"," +
"\"message\":\"" + ret._msg + "\"}";
rep->_status = ret._http_code;
slogger.trace("api_handler error case: {}", rep->_content);
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
}
slogger.trace("api_handler success case");
auto res = resf.get0();
if (res._body_writer) {
rep->write_body("json", std::move(res._body_writer));
} else {
rep->_content += res._res;
}
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
});
}), _type("json") { }
api_handler(const api_handler&) = default;
future<std::unique_ptr<reply>> handle(const sstring& path,
std::unique_ptr<request> req, std::unique_ptr<reply> rep) override {
return _f_handle(std::move(req), std::move(rep)).then(
[this](std::unique_ptr<reply> rep) {
rep->done(_type);
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
});
}
protected:
future_handler_function _f_handle;
sstring _type;
};
// Handler that answers every request with status OK and a plain-text body
// of the form "healthy: <Host header of the request>".
class health_handler : public handler_base {
    virtual future<std::unique_ptr<reply>> handle(const sstring& path, std::unique_ptr<request> req, std::unique_ptr<reply> rep) override {
        // The body echoes the Host header the client sent.
        auto health_report = format("healthy: {}", req->get_header("Host"));
        rep->set_status(reply::status_type::ok);
        rep->write_body("txt", health_report);
        return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
    }
};
// Checks the signature carried in the request's Authorization header.
//
// When authorization is not enforced this is a no-op. Otherwise the
// Authorization header is parsed for its Credential, SignedHeaders and
// Signature fields, the key for the user named in Credential is obtained
// through _key_cache, and the signature is recomputed with get_signature()
// and compared with the one the client sent.
//
// Malformed or missing headers are reported by throwing api_error directly
// from this function; a signature mismatch is reported through the returned
// future (also as api_error) and additionally evicts the user's cached key.
//
// `req` is captured by reference in the continuation, so it must stay alive
// until the returned future resolves.
future<> server::verify_signature(const request& req) {
    if (!_enforce_authorization) {
        slogger.debug("Skipping authorization");
        return make_ready_future<>();
    }
    auto host_it = req._headers.find("Host");
    if (host_it == req._headers.end()) {
        throw api_error("InvalidSignatureException", "Host header is mandatory for signature verification");
    }
    auto authorization_it = req._headers.find("Authorization");
    // Must test authorization_it here: it is dereferenced just below.
    if (authorization_it == req._headers.end()) {
        throw api_error("InvalidSignatureException", "Authorization header is mandatory for signature verification");
    }
    std::string host = host_it->second;
    // The header is a space-separated list: the algorithm name followed by
    // key=value entries.
    std::vector<std::string_view> credentials_raw = split(authorization_it->second, ' ');
    std::string credential;
    std::string user_signature;
    std::string signed_headers_str;
    std::vector<std::string_view> signed_headers;
    for (std::string_view entry : credentials_raw) {
        std::vector<std::string_view> entry_split = split(entry, '=');
        if (entry_split.size() != 2) {
            // The only entry allowed not to be a key=value pair is the
            // algorithm name.
            if (entry != "AWS4-HMAC-SHA256") {
                throw api_error("InvalidSignatureException", format("Only AWS4-HMAC-SHA256 algorithm is supported. Found: {}", entry));
            }
            continue;
        }
        std::string_view auth_value = entry_split[1];
        // Commas appear as an additional (quite redundant) delimiter.
        // The value may be empty (e.g. "Signature="), in which case there is
        // no last character to inspect.
        if (!auth_value.empty() && auth_value.back() == ',') {
            auth_value.remove_suffix(1);
        }
        if (entry_split[0] == "Credential") {
            credential = std::string(auth_value);
        } else if (entry_split[0] == "Signature") {
            user_signature = std::string(auth_value);
        } else if (entry_split[0] == "SignedHeaders") {
            signed_headers_str = std::string(auth_value);
            signed_headers = split(auth_value, ';');
            std::sort(signed_headers.begin(), signed_headers.end());
        }
    }
    // Credential is expected to have exactly five '/'-separated components;
    // the first four are user, date stamp, region and service.
    std::vector<std::string_view> credential_split = split(credential, '/');
    if (credential_split.size() != 5) {
        throw api_error("ValidationException", format("Incorrect credential information format: {}", credential));
    }
    std::string user(credential_split[0]);
    std::string datestamp(credential_split[1]);
    std::string region(credential_split[2]);
    std::string service(credential_split[3]);

    // Map every signed header name to its value in this request. The keys
    // and values are views into the request's headers, not copies.
    std::map<std::string_view, std::string_view> signed_headers_map;
    for (const auto& header : signed_headers) {
        signed_headers_map.emplace(header, std::string_view());
    }
    for (auto& header : req._headers) {
        // Request header names are lower-cased before being looked up among
        // the signed header names.
        std::string header_str;
        header_str.resize(header.first.size());
        // std::tolower() has undefined behavior for negative char values,
        // so each byte goes through unsigned char.
        std::transform(header.first.begin(), header.first.end(), header_str.begin(),
                [] (unsigned char c) { return static_cast<char>(std::tolower(c)); });
        auto it = signed_headers_map.find(header_str);
        if (it != signed_headers_map.end()) {
            it->second = std::string_view(header.second);
        }
    }

    auto cache_getter = [] (std::string username) {
        return get_key_from_roles(cql3::get_query_processor().local(), std::move(username));
    };
    return _key_cache.get_ptr(user, cache_getter).then([this, &req,
            user = std::move(user),
            host = std::move(host),
            datestamp = std::move(datestamp),
            signed_headers_str = std::move(signed_headers_str),
            signed_headers_map = std::move(signed_headers_map),
            region = std::move(region),
            service = std::move(service),
            user_signature = std::move(user_signature)] (key_cache::value_ptr key_ptr) {
        std::string signature = get_signature(user, *key_ptr, std::string_view(host), req._method,
                datestamp, signed_headers_str, signed_headers_map, req.content, region, service, "");

        if (signature != std::string_view(user_signature)) {
            // Drop the cached key before reporting the mismatch.
            _key_cache.remove(user);
            throw api_error("UnrecognizedClientException", "The security token included in the request is invalid.");
        }
    });
}
// Serves one API request: determines the operation from the X-Amz-Target
// header, verifies the request signature, and then dispatches to the matching
// entry of _callbacks. An operation with no registered callback is counted in
// the executor's unsupported_operations statistic and reported as
// UnknownOperationException.
future<json::json_return_type> server::handle_api_request(std::unique_ptr<request>&& req) {
    // The target consists of the Dynamo API version followed by a dot '.'
    // and the operation type (e.g. CreateTable); only the part after the
    // last dot is used.
    sstring target = req->get_header(TARGET);
    std::vector<std::string_view> target_parts = split(target, '.');
    std::string op;
    if (!target_parts.empty()) {
        op = std::string(target_parts.back());
    }
    slogger.trace("Request: {} {}", op, req->content);

    return verify_signature(*req).then([this, op, req = std::move(req)] () mutable {
        auto handler = _callbacks.find(op);
        if (handler == _callbacks.end()) {
            _executor.local()._stats.unsupported_operations++;
            throw api_error("UnknownOperationException",
                    format("Unsupported operation {}", op));
        }
        //FIXME: Client state can provide more context, e.g. client's endpoint address
        // client_state cannot be moved or copied, so it is held through a
        // unique_ptr that do_with() keeps alive while the callback runs.
        auto state = std::make_unique<executor::client_state>(executor::client_state::internal_tag());
        return do_with(std::move(state), [this, handler, op = std::move(op), req = std::move(req)] (std::unique_ptr<executor::client_state>& client_state) mutable {
            client_state->set_raw_keyspace(executor::KEYSPACE_NAME);
            executor::maybe_trace_query(*client_state, op, req->content);
            tracing::trace(client_state->get_trace_state(), op);
            return handler->second(_executor.local(), *client_state, std::move(req));
        });
    });
}
// Registers this server's two routes on `r`: POST "/" is served by an
// api_handler that forwards to handle_api_request(), and GET "/" is served
// by a health_handler.
// NOTE(review): the handlers are passed as raw pointers; presumably `r`
// takes ownership of them - confirm against the routes API.
void server::set_routes(routes& r) {
    r.add(operation_type::POST, url("/"), new api_handler([this] (std::unique_ptr<request> req) mutable {
        return handle_api_request(std::move(req));
    }));
    r.add(operation_type::GET, url("/"), new health_handler);
}
// Constructs the server around the sharded executor `e` and builds the
// dispatch table that maps operation names (as extracted from the
// X-Amz-Target header by handle_api_request()) to the executor call
// serving them.
//FIXME: A way to immediately invalidate the cache should be considered,
// e.g. when the system table which stores the keys is changed.
// For now, this propagation may take up to 1 minute.
server::server(seastar::sharded<executor>& e)
// Authorization enforcement starts disabled; init() sets the requested value.
// NOTE(review): 1024 and 1min are presumably the key cache's capacity and
// entry expiry - confirm against the key_cache type.
: _executor(e), _key_cache(1024, 1min, slogger), _enforce_authorization(false)
, _callbacks{
// CreateTable is the only operation that first calls maybe_create_keyspace();
// the request is moved into the continuation so its content stays available.
{"CreateTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) {
return e.maybe_create_keyspace().then([&e, &client_state, req = std::move(req)] { return e.create_table(client_state, req->content); }); }
},
// The remaining entries forward the request content to the executor method
// of the corresponding name.
{"DescribeTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.describe_table(client_state, req->content); }},
{"DeleteTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.delete_table(client_state, req->content); }},
{"PutItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.put_item(client_state, req->content); }},
{"UpdateItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.update_item(client_state, req->content); }},
{"GetItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.get_item(client_state, req->content); }},
{"DeleteItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.delete_item(client_state, req->content); }},
{"ListTables", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.list_tables(client_state, req->content); }},
{"Scan", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.scan(client_state, req->content); }},
// DescribeEndpoints additionally receives the request's Host header.
{"DescribeEndpoints", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.describe_endpoints(client_state, req->content, req->get_header("Host")); }},
{"BatchWriteItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.batch_write_item(client_state, req->content); }},
{"BatchGetItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.batch_get_item(client_state, req->content); }},
{"Query", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.query(client_state, req->content); }},
} {
}
// Starts the executor on all shards and brings up the HTTP and/or HTTPS
// listeners on `addr`.
//
// addr                   address to listen on
// port                   plain HTTP port; no HTTP listener if not set
// https_port             TLS port; no HTTPS listener if not set
// creds                  TLS credentials builder; required when https_port is set
// enforce_authorization  whether verify_signature() checks request signatures
//
// At least one of `port` and `https_port` must be given, and `https_port`
// requires `creds`; otherwise the returned future fails with
// std::runtime_error. Any failure during setup is logged at error level and
// rethrown nested inside a std::runtime_error that names the address and
// ports, so that the final "startup failed" message explains what the
// address was needed for.
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds, bool enforce_authorization) {
    _enforce_authorization = enforce_authorization;
    if (!port && !https_port) {
        return make_exception_future<>(std::runtime_error("Either regular port or TLS port"
                " must be specified in order to init an alternator HTTP server instance"));
    }
    // creds is dereferenced when setting up the TLS listener, so an empty
    // optional must be rejected here rather than dereferenced later.
    if (https_port && !creds) {
        return make_exception_future<>(std::runtime_error("TLS credentials"
                " must be specified in order to init an alternator HTTPS server instance"));
    }
    // seastar::async() lets the setup steps below wait with .get().
    return seastar::async([this, addr, port, https_port, creds] {
        try {
            _executor.invoke_on_all([] (executor& e) {
                return e.start();
            }).get();

            if (port) {
                _control.start().get();
                _control.set_routes(std::bind(&server::set_routes, this, std::placeholders::_1)).get();
                _control.listen(socket_address{addr, *port}).get();
                slogger.info("Alternator HTTP server listening on {} port {}", addr, *port);
            }
            if (https_port) {
                _https_control.start().get();
                _https_control.set_routes(std::bind(&server::set_routes, this, std::placeholders::_1)).get();
                _https_control.server().invoke_on_all([creds] (http_server& serv) {
                    return serv.set_tls_credentials(creds->build_server_credentials());
                }).get();

                _https_control.listen(socket_address{addr, *https_port}).get();
                slogger.info("Alternator HTTPS server listening on {} port {}", addr, *https_port);
            }
        } catch (...) {
            // Build the description once; it is used both for the log line
            // and for the exception that wraps the original cause.
            const auto failure = format("Failed to set up Alternator HTTP server on {} port {}, TLS port {}",
                    addr, port ? std::to_string(*port) : "OFF", https_port ? std::to_string(*https_port) : "OFF");
            slogger.error("{}: {}", failure, std::current_exception());
            std::throw_with_nested(std::runtime_error(failure));
        }
    });
}
}