mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 11:10:40 +00:00
alternator: add minimal HTTP interface
The interface works on port 8000 by default and provides the most basic alternator operations - it's an incomplete set without validation, meant to allow testing as early as possible.
This commit is contained in:
committed by
Nadav Har'El
parent
a09479e63c
commit
2ec78164bc
309
alternator/executor.cc
Normal file
309
alternator/executor.cc
Normal file
@@ -0,0 +1,309 @@
|
||||
/*
|
||||
* Copyright 2019 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* See the LICENSE.PROPRIETARY file in the top-level directory for licensing information.
|
||||
*/
|
||||
|
||||
#include "alternator/executor.hh"
|
||||
#include "log.hh"
|
||||
#include "json.hh"
|
||||
#include "schema_builder.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "timestamp.hh"
|
||||
#include "database.hh"
|
||||
#include "types/map.hh"
|
||||
#include "schema.hh"
|
||||
#include "query-request.hh"
|
||||
#include "query-result-reader.hh"
|
||||
#include "cql3/selection/selection.hh"
|
||||
#include "cql3/result_set.hh"
|
||||
#include "bytes.hh"
|
||||
#include "cql3/update_parameters.hh"
|
||||
|
||||
static logging::logger elogger("alternator-executor");
|
||||
|
||||
namespace alternator {
|
||||
|
||||
static constexpr auto ATTRIBUTE_DEFINITIONS = "AttributeDefinitions";
|
||||
static constexpr auto KEY_SCHEMA = "KeySchema";
|
||||
static constexpr auto TABLE_NAME = "TableName";
|
||||
static constexpr auto TABLE_DESCRIPTION = "TableDescription";
|
||||
static constexpr auto ATTRIBUTE_NAME = "AttributeName";
|
||||
static constexpr auto ATTRIBUTE_TYPE = "AttributeType";
|
||||
static constexpr auto KEY_TYPE = "KeyType";
|
||||
static constexpr auto HASH = "HASH";
|
||||
static constexpr auto RANGE = "RANGE";
|
||||
static constexpr auto CREATION_DATE_TIME = "CreationDateTime";
|
||||
static constexpr auto TABLE_ID = "TableId";
|
||||
static constexpr auto TABLE_STATUS = "TableStatus";
|
||||
static constexpr auto ACTIVE = "ACTIVE";
|
||||
static constexpr auto ATTRIBUTES_TO_GET = "AttributesToGet";
|
||||
static constexpr auto KEY = "Key";
|
||||
static constexpr auto CONSISTENT_READ = "ConsistentRead";
|
||||
static constexpr auto ATTRS = "attrs";
|
||||
static constexpr auto ITEM = "Item";
|
||||
|
||||
static map_type attrs_type() {
|
||||
static auto t = map_type_impl::get_instance(utf8_type, utf8_type, true);
|
||||
return t;
|
||||
}
|
||||
|
||||
struct make_jsonable : public json::jsonable {
|
||||
Json::Value _value;
|
||||
public:
|
||||
explicit make_jsonable(Json::Value&& value) : _value(std::move(value)) {}
|
||||
virtual std::string to_json() const override {
|
||||
return _value.toStyledString();
|
||||
}
|
||||
};
|
||||
|
||||
/*
|
||||
* Full representation should cover:
|
||||
"B": blob,
|
||||
"BOOL": boolean,
|
||||
"BS": [ blob ],
|
||||
"L": [
|
||||
"AttributeValue"
|
||||
],
|
||||
"M": {
|
||||
"string" : "AttributeValue"
|
||||
},
|
||||
"N": "string",
|
||||
"NS": [ "string" ],
|
||||
"NULL": boolean,
|
||||
"S": "string",
|
||||
"SS": [ "string" ]
|
||||
|
||||
TODO(sarna): boost::bimap
|
||||
*/
|
||||
static data_type parse_type(sstring type) {
|
||||
static thread_local std::unordered_map<sstring, data_type> types = {
|
||||
{"S", utf8_type},
|
||||
{"B", bytes_type},
|
||||
{"BOOL", boolean_type},
|
||||
{"N", long_type}, //FIXME(sarna): It's actually a special generic number type, not long
|
||||
};
|
||||
auto it = types.find(type);
|
||||
if (it == types.end()) {
|
||||
throw std::runtime_error(format("Unknown type {}", type));
|
||||
}
|
||||
return it->second;
|
||||
}
|
||||
|
||||
static sstring type_to_sstring(data_type type) {
|
||||
static thread_local std::unordered_map<data_type, sstring> types = {
|
||||
{utf8_type, "S"},
|
||||
{bytes_type, "B"},
|
||||
{boolean_type, "BOOL"},
|
||||
{long_type, "N"},
|
||||
};
|
||||
auto it = types.find(type);
|
||||
if (it == types.end()) {
|
||||
throw std::runtime_error(format("Unknown type {}", type->name()));
|
||||
}
|
||||
return it->second;
|
||||
}
|
||||
|
||||
static void add_column(schema_builder& builder, sstring name, sstring type, column_kind kind) {
|
||||
builder.with_column(to_bytes(name), parse_type(type), kind);
|
||||
}
|
||||
|
||||
static void supplement_table_info(Json::Value& descr, const schema& schema) {
|
||||
descr[CREATION_DATE_TIME] = std::chrono::duration_cast<std::chrono::milliseconds>(gc_clock::now().time_since_epoch()).count();
|
||||
descr[TABLE_STATUS] = ACTIVE;
|
||||
descr[TABLE_ID] = schema.id().to_sstring().c_str();
|
||||
}
|
||||
|
||||
future<json::json_return_type> executor::create_table(sstring content) {
|
||||
Json::Value table_info = json::to_json_value(content);
|
||||
elogger.warn("Creating table {}", table_info.toStyledString());
|
||||
|
||||
sstring table_name = table_info[TABLE_NAME].asString();
|
||||
const Json::Value& key_schema = table_info[KEY_SCHEMA];
|
||||
const Json::Value& attribute_definitions = table_info[ATTRIBUTE_DEFINITIONS];
|
||||
|
||||
schema_builder builder(KEYSPACE, table_name);
|
||||
sstring pk_name;
|
||||
sstring ck_name;
|
||||
|
||||
for (const Json::Value& key_info : key_schema) {
|
||||
if (key_info[KEY_TYPE] == HASH) {
|
||||
if (!pk_name.empty()) {
|
||||
throw std::runtime_error(format("Only one partition key can be specified in {}", key_info.toStyledString()));
|
||||
}
|
||||
pk_name = key_info[ATTRIBUTE_NAME].asString();
|
||||
} else if (key_info[KEY_TYPE] == RANGE) {
|
||||
if (!ck_name.empty()) {
|
||||
throw std::runtime_error(format("Only one clustering key can be specified in {}", key_info.toStyledString()));
|
||||
}
|
||||
ck_name = key_info[ATTRIBUTE_NAME].asString();
|
||||
}
|
||||
}
|
||||
|
||||
for (const Json::Value& attribute_info : attribute_definitions) {
|
||||
sstring attr_name = attribute_info[ATTRIBUTE_NAME].asString();
|
||||
sstring attr_type = attribute_info[ATTRIBUTE_TYPE].asString();
|
||||
column_kind kind = (attr_name == pk_name) ? column_kind::partition_key : (attr_name == ck_name) ? column_kind::clustering_key : column_kind::regular_column;
|
||||
if (kind != column_kind::regular_column) {
|
||||
add_column(builder, attr_name, attr_type, kind);
|
||||
}
|
||||
}
|
||||
|
||||
builder.with_column(bytes(ATTRS), attrs_type(), column_kind::regular_column);
|
||||
|
||||
schema_ptr schema = builder.build();
|
||||
|
||||
return _mm.announce_new_column_family(schema, false).then([table_info = std::move(table_info), schema] () mutable {
|
||||
Json::Value status(Json::objectValue);
|
||||
supplement_table_info(table_info, *schema);
|
||||
status[TABLE_DESCRIPTION] = std::move(table_info);
|
||||
return make_ready_future<json::json_return_type>(make_jsonable(std::move(status)));
|
||||
});
|
||||
}
|
||||
|
||||
static partition_key pk_from_json(const Json::Value& item, schema_ptr schema) {
|
||||
std::vector<bytes> raw_pk;
|
||||
for (const column_definition& cdef : schema->partition_key_columns()) {
|
||||
sstring value_str = item[cdef.name_as_text()][type_to_sstring(cdef.type)].asString();
|
||||
bytes raw_value = cdef.type->from_string(value_str);
|
||||
raw_pk.push_back(std::move(raw_value));
|
||||
}
|
||||
return partition_key::from_exploded(raw_pk);
|
||||
}
|
||||
|
||||
static clustering_key ck_from_json(const Json::Value& item, schema_ptr schema) {
|
||||
assert(schema->clustering_key_size() > 0);
|
||||
std::vector<bytes> raw_ck;
|
||||
for (const column_definition& cdef : schema->clustering_key_columns()) {
|
||||
sstring value_str = item[cdef.name_as_text()][type_to_sstring(cdef.type)].asString();
|
||||
bytes raw_value = cdef.type->from_string(value_str);
|
||||
raw_ck.push_back(std::move(raw_value));
|
||||
}
|
||||
|
||||
return clustering_key::from_exploded(raw_ck);
|
||||
}
|
||||
|
||||
future<json::json_return_type> executor::put_item(sstring content) {
|
||||
Json::Value update_info = json::to_json_value(content);
|
||||
elogger.debug("Updating value {}", update_info.toStyledString());
|
||||
|
||||
sstring table_name = update_info[TABLE_NAME].asString();
|
||||
const Json::Value& item = update_info[ITEM];
|
||||
schema_ptr schema = _proxy.get_db().local().find_schema(KEYSPACE, table_name);
|
||||
|
||||
partition_key pk = pk_from_json(item, schema);
|
||||
clustering_key ck = (schema->clustering_key_size() > 0) ? ck_from_json(item, schema) : clustering_key::make_empty();
|
||||
|
||||
mutation m(schema, pk);
|
||||
collection_type_impl::mutation attrs_mut;
|
||||
|
||||
for (auto it = item.begin(); it != item.end(); ++it) {
|
||||
bytes column_name = to_bytes(it.key().asString());
|
||||
const column_definition* cdef = schema->get_column_definition(column_name);
|
||||
if (!cdef || !cdef->is_primary_key()) {
|
||||
bytes value = utf8_type->decompose(sstring(it->toStyledString()));
|
||||
attrs_mut.cells.emplace_back(column_name, atomic_cell::make_live(*utf8_type, api::new_timestamp(), value, atomic_cell::collection_member::yes));
|
||||
}
|
||||
elogger.warn("{}: {}", it.key().asString(), it->toStyledString());
|
||||
}
|
||||
const column_definition* attrs_cdef = schema->get_column_definition(bytes(ATTRS));
|
||||
|
||||
auto serialized_map = attrs_type()->serialize_mutation_form(std::move(attrs_mut));
|
||||
m.set_cell(ck, *attrs_cdef, std::move(serialized_map));
|
||||
elogger.warn("Applying mutation {}", m);
|
||||
|
||||
return _proxy.mutate(std::vector<mutation>{std::move(m)}, db::consistency_level::QUORUM, db::no_timeout, tracing::trace_state_ptr(), empty_service_permit()).then([] () {
|
||||
return make_ready_future<json::json_return_type>("{}");
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
static Json::Value describe_item(schema_ptr schema, const query::partition_slice& slice, const cql3::selection::selection& selection, foreign_ptr<lw_shared_ptr<query::result>> query_result, std::unordered_set<sstring>&& attrs_to_get) {
|
||||
Json::Value item_descr(Json::objectValue);
|
||||
item_descr[ITEM] = Json::Value(Json::objectValue);
|
||||
|
||||
cql3::selection::result_set_builder builder(selection, gc_clock::now(), cql_serialization_format::latest());
|
||||
query::result_view::consume(*query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, selection));
|
||||
|
||||
auto result_set = builder.build();
|
||||
for (auto& result_row : result_set->rows()) {
|
||||
const auto& columns = selection.get_columns();
|
||||
auto column_it = columns.begin();
|
||||
for (const bytes_opt& cell : result_row) {
|
||||
sstring column_name = (*column_it)->name_as_text();
|
||||
if (column_name != ATTRS) {
|
||||
if (attrs_to_get.count(column_name) > 0) {
|
||||
Json::Value& field = item_descr[ITEM][column_name.c_str()];
|
||||
field[type_to_sstring((*column_it)->type)] = json::to_json_value((*column_it)->type->to_json_string(cell));
|
||||
}
|
||||
} else if (cell) {
|
||||
auto deserialized = attrs_type()->deserialize(*cell, cql_serialization_format::latest());
|
||||
auto keys_and_values = value_cast<map_type_impl::native_type>(deserialized);
|
||||
for (auto entry : keys_and_values) {
|
||||
sstring attr_name = value_cast<sstring>(entry.first);
|
||||
if (attrs_to_get.count(attr_name) > 0) {
|
||||
item_descr[ITEM][attr_name] = json::to_json_value(value_cast<sstring>(entry.second));
|
||||
}
|
||||
}
|
||||
}
|
||||
++column_it;
|
||||
}
|
||||
}
|
||||
return item_descr;
|
||||
}
|
||||
|
||||
future<json::json_return_type> executor::get_item(sstring content) {
|
||||
Json::Value table_info = json::to_json_value(content);
|
||||
elogger.warn("Getting item {}", table_info.toStyledString());
|
||||
|
||||
sstring table_name = table_info[TABLE_NAME].asString();
|
||||
//FIXME(sarna): AttributesToGet is deprecated with more generic ProjectionExpression in the newest API
|
||||
Json::Value attributes_to_get = table_info[ATTRIBUTES_TO_GET];
|
||||
Json::Value query_key = table_info[KEY];
|
||||
db::consistency_level cl = table_info[CONSISTENT_READ].asBool() ? db::consistency_level::QUORUM : db::consistency_level::ONE;
|
||||
|
||||
schema_ptr schema = _proxy.get_db().local().find_schema(KEYSPACE, table_name);
|
||||
|
||||
partition_key pk = pk_from_json(query_key, schema);
|
||||
dht::partition_range_vector partition_ranges{dht::partition_range(dht::global_partitioner().decorate_key(*schema, pk))};
|
||||
|
||||
std::vector<query::clustering_range> bounds;
|
||||
if (schema->clustering_key_size() == 0) {
|
||||
bounds.push_back(query::clustering_range::make_open_ended_both_sides());
|
||||
} else {
|
||||
clustering_key ck = ck_from_json(query_key, schema);
|
||||
bounds.push_back(query::clustering_range::make_singular(std::move(ck)));
|
||||
}
|
||||
|
||||
//TODO(sarna): It would be better to fetch only some attributes of the map, not all
|
||||
query::column_id_vector regular_columns{schema->get_column_definition(bytes(ATTRS))->id};
|
||||
|
||||
auto selection = cql3::selection::selection::wildcard(schema);
|
||||
|
||||
auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options());
|
||||
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, query::max_partitions);
|
||||
|
||||
auto attrs_to_get = boost::copy_range<std::unordered_set<sstring>>(attributes_to_get | boost::adaptors::transformed(std::bind(&Json::Value::asString, std::placeholders::_1)));
|
||||
|
||||
return _proxy.query(schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(db::no_timeout, empty_service_permit())).then(
|
||||
[schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = std::move(attrs_to_get)] (service::storage_proxy::coordinator_query_result qr) mutable {
|
||||
return make_ready_future<json::json_return_type>(make_jsonable(describe_item(schema, partition_slice, *selection, std::move(qr.query_result), std::move(attrs_to_get))));
|
||||
});
|
||||
|
||||
return make_ready_future<json::json_return_type>("");
|
||||
}
|
||||
|
||||
future<> executor::start() {
|
||||
if (engine().cpu_id() != 0) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto ksm = keyspace_metadata::new_keyspace(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", {{"replication_factor", "1"}}, true);
|
||||
return _mm.announce_new_keyspace(ksm, api::min_timestamp, false).handle_exception_type([] (exceptions::already_exists_exception& ignored) {});
|
||||
}
|
||||
|
||||
}
|
||||
40
alternator/executor.hh
Normal file
40
alternator/executor.hh
Normal file
@@ -0,0 +1,40 @@
|
||||
/*
|
||||
* Copyright 2019 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* See the LICENSE.PROPRIETARY file in the top-level directory for licensing information.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/http/httpd.hh>
|
||||
#include "seastarx.hh"
|
||||
#include <seastar/json/json_elements.hh>
|
||||
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
|
||||
namespace alternator {
|
||||
|
||||
class executor {
|
||||
service::storage_proxy& _proxy;
|
||||
service::migration_manager& _mm;
|
||||
|
||||
public:
|
||||
static constexpr auto KEYSPACE = "alternator";
|
||||
|
||||
executor(service::storage_proxy& proxy, service::migration_manager& mm) : _proxy(proxy), _mm(mm) {}
|
||||
|
||||
future<json::json_return_type> create_table(sstring content);
|
||||
future<json::json_return_type> put_item(sstring content);
|
||||
future<json::json_return_type> get_item(sstring content);
|
||||
|
||||
future<> start();
|
||||
future<> stop() { return make_ready_future<>(); }
|
||||
};
|
||||
|
||||
}
|
||||
75
alternator/server.cc
Normal file
75
alternator/server.cc
Normal file
@@ -0,0 +1,75 @@
|
||||
/*
|
||||
* Copyright 2019 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* See the LICENSE.PROPRIETARY file in the top-level directory for licensing information.
|
||||
*/
|
||||
|
||||
#include "alternator/server.hh"
|
||||
#include "log.hh"
|
||||
#include <seastar/http/function_handlers.hh>
|
||||
#include <seastar/json/json_elements.hh>
|
||||
#include <seastarx.hh>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
|
||||
static logging::logger slogger("alternator-server");
|
||||
|
||||
using namespace httpd;
|
||||
|
||||
namespace alternator {
|
||||
|
||||
static constexpr auto TARGET = "X-Amz-Target";
|
||||
|
||||
inline std::vector<sstring> split(const sstring& text, const char* separator) {
|
||||
if (text == "") {
|
||||
return std::vector<sstring>();
|
||||
}
|
||||
std::vector<sstring> tokens;
|
||||
return boost::split(tokens, text, boost::is_any_of(separator));
|
||||
}
|
||||
|
||||
|
||||
void server::set_routes(routes& r) {
|
||||
function_handler* handler = new function_handler([this](std::unique_ptr<request> req) -> future<json::json_return_type> {
|
||||
slogger.warn("REQ {} {}", req->content, req->content_length);
|
||||
sstring target = req->get_header(TARGET);
|
||||
std::vector<sstring> split_target = split(target, ".");
|
||||
//NOTICE(sarna): Target consists of Dynamo API version folllowed by a dot '.' and operation type (e.g. CreateTable)
|
||||
sstring op = split_target.empty() ? sstring() : split_target.back();
|
||||
|
||||
slogger.warn("Got Request <{}>", op);
|
||||
if (op == "CreateTable") {
|
||||
return _executor.local().create_table(req->content);
|
||||
} else if (op == "PutItem") {
|
||||
return _executor.local().put_item(req->content);
|
||||
} else if (op == "GetItem") {
|
||||
return _executor.local().get_item(req->content);
|
||||
}
|
||||
throw std::runtime_error(format("Operation not supported: {}", req->content));
|
||||
});
|
||||
|
||||
r.add(operation_type::POST, url("/"), handler);
|
||||
}
|
||||
|
||||
future<> server::init(uint16_t port) {
|
||||
return _executor.invoke_on_all([] (executor& e) {
|
||||
return e.start();
|
||||
}).then([this] {
|
||||
return _control.start();
|
||||
}).then([this] {
|
||||
return _control.set_routes(std::bind(&server::set_routes, this, std::placeholders::_1));
|
||||
}).then([this, port] {
|
||||
return _control.listen(port);
|
||||
}).then([port] {
|
||||
slogger.info("Alternator HTTP server listening on port {}", port);
|
||||
}).handle_exception([port] (std::exception_ptr e) {
|
||||
slogger.warn("Failed to set up Alternator HTTP server on port {}: {}", port, e);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
34
alternator/server.hh
Normal file
34
alternator/server.hh
Normal file
@@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Copyright 2019 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* See the LICENSE.PROPRIETARY file in the top-level directory for licensing information.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "log.hh"
|
||||
#include "alternator/executor.hh"
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/http/httpd.hh>
|
||||
|
||||
namespace alternator {
|
||||
|
||||
class server {
|
||||
seastar::httpd::http_server_control _control;
|
||||
seastar::sharded<executor>& _executor;
|
||||
public:
|
||||
static constexpr int DEFAULT_PORT = 8000;
|
||||
|
||||
server(seastar::sharded<executor>& executor) : _executor(executor) {}
|
||||
|
||||
seastar::future<> init(uint16_t port);
|
||||
private:
|
||||
void set_routes(seastar::httpd::routes& r);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -771,6 +771,8 @@ api = ['api/api.cc',
|
||||
'api/system.cc',
|
||||
'api/config.cc',
|
||||
'api/api-doc/config.json',
|
||||
'alternator/server.cc',
|
||||
'alternator/executor.cc',
|
||||
]
|
||||
|
||||
idls = ['idl/gossip_digest.idl.hh',
|
||||
|
||||
9
main.cc
9
main.cc
@@ -71,6 +71,8 @@
|
||||
#include "distributed_loader.hh"
|
||||
#include "cql3/cql_config.hh"
|
||||
|
||||
#include "alternator/server.hh"
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
seastar::metrics::metric_groups app_metrics;
|
||||
@@ -1059,6 +1061,13 @@ int main(int ac, char** av) {
|
||||
return service::get_local_storage_service().start_rpc_server();
|
||||
}).get();
|
||||
}
|
||||
|
||||
static sharded<alternator::executor> alternator_executor;
|
||||
alternator_executor.start(std::ref(proxy), std::ref(mm)).get();
|
||||
static alternator::server alternator_server(alternator_executor);
|
||||
alternator_server.init(alternator::server::DEFAULT_PORT).get();
|
||||
startlog.info("Alternator server listening on {}", alternator::server::DEFAULT_PORT);
|
||||
|
||||
if (cfg->defragment_memory_on_idle()) {
|
||||
smp::invoke_on_all([] () {
|
||||
engine().set_idle_cpu_handler([] (reactor::work_waiting_on_reactor check_for_work) {
|
||||
|
||||
Reference in New Issue
Block a user