From 2ec78164bc8122d72128a01acb8daa043aadd722 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Thu, 18 Apr 2019 11:19:56 +0200 Subject: [PATCH] alternator: add minimal HTTP interface The interface works on port 8000 by default and provides the most basic alternator operations - it's an incomplete set without validation, meant to allow testing as early as possible. --- alternator/executor.cc | 309 +++++++++++++++++++++++++++++++++++++++++ alternator/executor.hh | 40 ++++++ alternator/server.cc | 75 ++++++++++ alternator/server.hh | 34 +++++ configure.py | 2 + main.cc | 9 ++ 6 files changed, 469 insertions(+) create mode 100644 alternator/executor.cc create mode 100644 alternator/executor.hh create mode 100644 alternator/server.cc create mode 100644 alternator/server.hh diff --git a/alternator/executor.cc b/alternator/executor.cc new file mode 100644 index 0000000000..b550967148 --- /dev/null +++ b/alternator/executor.cc @@ -0,0 +1,309 @@ +/* + * Copyright 2019 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * See the LICENSE.PROPRIETARY file in the top-level directory for licensing information. + */ + +#include "alternator/executor.hh" +#include "log.hh" +#include "json.hh" +#include "schema_builder.hh" +#include "exceptions/exceptions.hh" +#include "timestamp.hh" +#include "database.hh" +#include "types/map.hh" +#include "schema.hh" +#include "query-request.hh" +#include "query-result-reader.hh" +#include "cql3/selection/selection.hh" +#include "cql3/result_set.hh" +#include "bytes.hh" +#include "cql3/update_parameters.hh" + +static logging::logger elogger("alternator-executor"); + +namespace alternator { + +static constexpr auto ATTRIBUTE_DEFINITIONS = "AttributeDefinitions"; +static constexpr auto KEY_SCHEMA = "KeySchema"; +static constexpr auto TABLE_NAME = "TableName"; +static constexpr auto TABLE_DESCRIPTION = "TableDescription"; +static constexpr auto ATTRIBUTE_NAME = "AttributeName"; +static constexpr auto ATTRIBUTE_TYPE = "AttributeType"; +static constexpr auto KEY_TYPE = "KeyType"; +static constexpr auto HASH = "HASH"; +static constexpr auto RANGE = "RANGE"; +static constexpr auto CREATION_DATE_TIME = "CreationDateTime"; +static constexpr auto TABLE_ID = "TableId"; +static constexpr auto TABLE_STATUS = "TableStatus"; +static constexpr auto ACTIVE = "ACTIVE"; +static constexpr auto ATTRIBUTES_TO_GET = "AttributesToGet"; +static constexpr auto KEY = "Key"; +static constexpr auto CONSISTENT_READ = "ConsistentRead"; +static constexpr auto ATTRS = "attrs"; +static constexpr auto ITEM = "Item"; + +static map_type attrs_type() { + static auto t = map_type_impl::get_instance(utf8_type, utf8_type, true); + return t; +} + +struct make_jsonable : public json::jsonable { + Json::Value _value; +public: + explicit make_jsonable(Json::Value&& value) : _value(std::move(value)) {} + virtual std::string to_json() const override { + return _value.toStyledString(); + } +}; + +/* + * Full representation should cover: + "B": blob, + "BOOL": boolean, + "BS": [ blob ], + "L": [ + "AttributeValue" + ], + "M": { + "string" : "AttributeValue" + }, + "N": "string", + "NS": [ "string" ], + "NULL": boolean, + "S": "string", + "SS": [ "string" ] + + TODO(sarna): boost::bimap + */ +static data_type parse_type(sstring type) { + static thread_local std::unordered_map types = { + {"S", utf8_type}, + {"B", bytes_type}, + {"BOOL", boolean_type}, + {"N", long_type}, //FIXME(sarna): It's actually a special generic number type, not long + }; + auto it = types.find(type); + if (it == types.end()) { + throw std::runtime_error(format("Unknown type {}", type)); + } + return it->second; +} + +static sstring type_to_sstring(data_type type) { + static thread_local std::unordered_map types = { + {utf8_type, "S"}, + {bytes_type, "B"}, + {boolean_type, "BOOL"}, + {long_type, "N"}, + }; + auto it = types.find(type); + if (it == types.end()) { + throw std::runtime_error(format("Unknown type {}", type->name())); + } + return it->second; +} + +static void add_column(schema_builder& builder, sstring name, sstring type, column_kind kind) { + builder.with_column(to_bytes(name), parse_type(type), kind); +} + +static void supplement_table_info(Json::Value& descr, const schema& schema) { + descr[CREATION_DATE_TIME] = std::chrono::duration_cast(gc_clock::now().time_since_epoch()).count(); + descr[TABLE_STATUS] = ACTIVE; + descr[TABLE_ID] = schema.id().to_sstring().c_str(); +} + +future executor::create_table(sstring content) { + Json::Value table_info = json::to_json_value(content); + elogger.warn("Creating table {}", table_info.toStyledString()); + + sstring table_name = table_info[TABLE_NAME].asString(); + const Json::Value& key_schema = table_info[KEY_SCHEMA]; + const Json::Value& attribute_definitions = table_info[ATTRIBUTE_DEFINITIONS]; + + schema_builder builder(KEYSPACE, table_name); + sstring pk_name; + sstring ck_name; + + for (const Json::Value& key_info : key_schema) { + if (key_info[KEY_TYPE] == HASH) { + if (!pk_name.empty()) { + throw std::runtime_error(format("Only one partition key can be specified in {}", key_info.toStyledString())); + } + pk_name = key_info[ATTRIBUTE_NAME].asString(); + } else if (key_info[KEY_TYPE] == RANGE) { + if (!ck_name.empty()) { + throw std::runtime_error(format("Only one clustering key can be specified in {}", key_info.toStyledString())); + } + ck_name = key_info[ATTRIBUTE_NAME].asString(); + } + } + + for (const Json::Value& attribute_info : attribute_definitions) { + sstring attr_name = attribute_info[ATTRIBUTE_NAME].asString(); + sstring attr_type = attribute_info[ATTRIBUTE_TYPE].asString(); + column_kind kind = (attr_name == pk_name) ? column_kind::partition_key : (attr_name == ck_name) ? column_kind::clustering_key : column_kind::regular_column; + if (kind != column_kind::regular_column) { + add_column(builder, attr_name, attr_type, kind); + } + } + + builder.with_column(bytes(ATTRS), attrs_type(), column_kind::regular_column); + + schema_ptr schema = builder.build(); + + return _mm.announce_new_column_family(schema, false).then([table_info = std::move(table_info), schema] () mutable { + Json::Value status(Json::objectValue); + supplement_table_info(table_info, *schema); + status[TABLE_DESCRIPTION] = std::move(table_info); + return make_ready_future(make_jsonable(std::move(status))); + }); +} + +static partition_key pk_from_json(const Json::Value& item, schema_ptr schema) { + std::vector raw_pk; + for (const column_definition& cdef : schema->partition_key_columns()) { + sstring value_str = item[cdef.name_as_text()][type_to_sstring(cdef.type)].asString(); + bytes raw_value = cdef.type->from_string(value_str); + raw_pk.push_back(std::move(raw_value)); + } + return partition_key::from_exploded(raw_pk); +} + +static clustering_key ck_from_json(const Json::Value& item, schema_ptr schema) { + assert(schema->clustering_key_size() > 0); + std::vector raw_ck; + for (const column_definition& cdef : schema->clustering_key_columns()) { + sstring value_str = item[cdef.name_as_text()][type_to_sstring(cdef.type)].asString(); + bytes raw_value = cdef.type->from_string(value_str); + raw_ck.push_back(std::move(raw_value)); + } + + return clustering_key::from_exploded(raw_ck); +} + +future executor::put_item(sstring content) { + Json::Value update_info = json::to_json_value(content); + elogger.debug("Updating value {}", update_info.toStyledString()); + + sstring table_name = update_info[TABLE_NAME].asString(); + const Json::Value& item = update_info[ITEM]; + schema_ptr schema = _proxy.get_db().local().find_schema(KEYSPACE, table_name); + + partition_key pk = pk_from_json(item, schema); + clustering_key ck = (schema->clustering_key_size() > 0) ? ck_from_json(item, schema) : clustering_key::make_empty(); + + mutation m(schema, pk); + collection_type_impl::mutation attrs_mut; + + for (auto it = item.begin(); it != item.end(); ++it) { + bytes column_name = to_bytes(it.key().asString()); + const column_definition* cdef = schema->get_column_definition(column_name); + if (!cdef || !cdef->is_primary_key()) { + bytes value = utf8_type->decompose(sstring(it->toStyledString())); + attrs_mut.cells.emplace_back(column_name, atomic_cell::make_live(*utf8_type, api::new_timestamp(), value, atomic_cell::collection_member::yes)); + } + elogger.warn("{}: {}", it.key().asString(), it->toStyledString()); + } + const column_definition* attrs_cdef = schema->get_column_definition(bytes(ATTRS)); + + auto serialized_map = attrs_type()->serialize_mutation_form(std::move(attrs_mut)); + m.set_cell(ck, *attrs_cdef, std::move(serialized_map)); + elogger.warn("Applying mutation {}", m); + + return _proxy.mutate(std::vector{std::move(m)}, db::consistency_level::QUORUM, db::no_timeout, tracing::trace_state_ptr(), empty_service_permit()).then([] () { + return make_ready_future("{}"); + }); + +} + +static Json::Value describe_item(schema_ptr schema, const query::partition_slice& slice, const cql3::selection::selection& selection, foreign_ptr> query_result, std::unordered_set&& attrs_to_get) { + Json::Value item_descr(Json::objectValue); + item_descr[ITEM] = Json::Value(Json::objectValue); + + cql3::selection::result_set_builder builder(selection, gc_clock::now(), cql_serialization_format::latest()); + query::result_view::consume(*query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, selection)); + + auto result_set = builder.build(); + for (auto& result_row : result_set->rows()) { + const auto& columns = selection.get_columns(); + auto column_it = columns.begin(); + for (const bytes_opt& cell : result_row) { + sstring column_name = (*column_it)->name_as_text(); + if (column_name != ATTRS) { + if (attrs_to_get.count(column_name) > 0) { + Json::Value& field = item_descr[ITEM][column_name.c_str()]; + field[type_to_sstring((*column_it)->type)] = json::to_json_value((*column_it)->type->to_json_string(cell)); + } + } else if (cell) { + auto deserialized = attrs_type()->deserialize(*cell, cql_serialization_format::latest()); + auto keys_and_values = value_cast(deserialized); + for (auto entry : keys_and_values) { + sstring attr_name = value_cast(entry.first); + if (attrs_to_get.count(attr_name) > 0) { + item_descr[ITEM][attr_name] = json::to_json_value(value_cast(entry.second)); + } + } + } + ++column_it; + } + } + return item_descr; +} + +future executor::get_item(sstring content) { + Json::Value table_info = json::to_json_value(content); + elogger.warn("Getting item {}", table_info.toStyledString()); + + sstring table_name = table_info[TABLE_NAME].asString(); + //FIXME(sarna): AttributesToGet is deprecated with more generic ProjectionExpression in the newest API + Json::Value attributes_to_get = table_info[ATTRIBUTES_TO_GET]; + Json::Value query_key = table_info[KEY]; + db::consistency_level cl = table_info[CONSISTENT_READ].asBool() ? db::consistency_level::QUORUM : db::consistency_level::ONE; + + schema_ptr schema = _proxy.get_db().local().find_schema(KEYSPACE, table_name); + + partition_key pk = pk_from_json(query_key, schema); + dht::partition_range_vector partition_ranges{dht::partition_range(dht::global_partitioner().decorate_key(*schema, pk))}; + + std::vector bounds; + if (schema->clustering_key_size() == 0) { + bounds.push_back(query::clustering_range::make_open_ended_both_sides()); + } else { + clustering_key ck = ck_from_json(query_key, schema); + bounds.push_back(query::clustering_range::make_singular(std::move(ck))); + } + + //TODO(sarna): It would be better to fetch only some attributes of the map, not all + query::column_id_vector regular_columns{schema->get_column_definition(bytes(ATTRS))->id}; + + auto selection = cql3::selection::selection::wildcard(schema); + + auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options()); + auto command = ::make_lw_shared(schema->id(), schema->version(), partition_slice, query::max_partitions); + + auto attrs_to_get = boost::copy_range>(attributes_to_get | boost::adaptors::transformed(std::bind(&Json::Value::asString, std::placeholders::_1))); + + return _proxy.query(schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(db::no_timeout, empty_service_permit())).then( + [schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = std::move(attrs_to_get)] (service::storage_proxy::coordinator_query_result qr) mutable { + return make_ready_future(make_jsonable(describe_item(schema, partition_slice, *selection, std::move(qr.query_result), std::move(attrs_to_get)))); + }); + + return make_ready_future(""); +} + +future<> executor::start() { + if (engine().cpu_id() != 0) { + return make_ready_future<>(); + } + + auto ksm = keyspace_metadata::new_keyspace(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", {{"replication_factor", "1"}}, true); + return _mm.announce_new_keyspace(ksm, api::min_timestamp, false).handle_exception_type([] (exceptions::already_exists_exception& ignored) {}); +} + +} diff --git a/alternator/executor.hh b/alternator/executor.hh new file mode 100644 index 0000000000..3b1b2da2fc --- /dev/null +++ b/alternator/executor.hh @@ -0,0 +1,40 @@ +/* + * Copyright 2019 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * See the LICENSE.PROPRIETARY file in the top-level directory for licensing information. + */ + +#pragma once + +#include +#include +#include "seastarx.hh" +#include + +#include "service/storage_proxy.hh" +#include "service/migration_manager.hh" + +namespace alternator { + +class executor { + service::storage_proxy& _proxy; + service::migration_manager& _mm; + +public: + static constexpr auto KEYSPACE = "alternator"; + + executor(service::storage_proxy& proxy, service::migration_manager& mm) : _proxy(proxy), _mm(mm) {} + + future create_table(sstring content); + future put_item(sstring content); + future get_item(sstring content); + + future<> start(); + future<> stop() { return make_ready_future<>(); } +}; + +} diff --git a/alternator/server.cc b/alternator/server.cc new file mode 100644 index 0000000000..e089d80c49 --- /dev/null +++ b/alternator/server.cc @@ -0,0 +1,75 @@ +/* + * Copyright 2019 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * See the LICENSE.PROPRIETARY file in the top-level directory for licensing information. + */ + +#include "alternator/server.hh" +#include "log.hh" +#include +#include +#include +#include +#include + +static logging::logger slogger("alternator-server"); + +using namespace httpd; + +namespace alternator { + +static constexpr auto TARGET = "X-Amz-Target"; + +inline std::vector split(const sstring& text, const char* separator) { + if (text == "") { + return std::vector(); + } + std::vector tokens; + return boost::split(tokens, text, boost::is_any_of(separator)); +} + + +void server::set_routes(routes& r) { + function_handler* handler = new function_handler([this](std::unique_ptr req) -> future { + slogger.warn("REQ {} {}", req->content, req->content_length); + sstring target = req->get_header(TARGET); + std::vector split_target = split(target, "."); + //NOTICE(sarna): Target consists of Dynamo API version folllowed by a dot '.' and operation type (e.g. CreateTable) + sstring op = split_target.empty() ? sstring() : split_target.back(); + + slogger.warn("Got Request <{}>", op); + if (op == "CreateTable") { + return _executor.local().create_table(req->content); + } else if (op == "PutItem") { + return _executor.local().put_item(req->content); + } else if (op == "GetItem") { + return _executor.local().get_item(req->content); + } + throw std::runtime_error(format("Operation not supported: {}", req->content)); + }); + + r.add(operation_type::POST, url("/"), handler); +} + +future<> server::init(uint16_t port) { + return _executor.invoke_on_all([] (executor& e) { + return e.start(); + }).then([this] { + return _control.start(); + }).then([this] { + return _control.set_routes(std::bind(&server::set_routes, this, std::placeholders::_1)); + }).then([this, port] { + return _control.listen(port); + }).then([port] { + slogger.info("Alternator HTTP server listening on port {}", port); + }).handle_exception([port] (std::exception_ptr e) { + slogger.warn("Failed to set up Alternator HTTP server on port {}: {}", port, e); + }); +} + +} + diff --git a/alternator/server.hh b/alternator/server.hh new file mode 100644 index 0000000000..a5117eec80 --- /dev/null +++ b/alternator/server.hh @@ -0,0 +1,34 @@ +/* + * Copyright 2019 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * See the LICENSE.PROPRIETARY file in the top-level directory for licensing information. + */ + +#pragma once + +#include "log.hh" +#include "alternator/executor.hh" +#include +#include + +namespace alternator { + +class server { + seastar::httpd::http_server_control _control; + seastar::sharded& _executor; +public: + static constexpr int DEFAULT_PORT = 8000; + + server(seastar::sharded& executor) : _executor(executor) {} + + seastar::future<> init(uint16_t port); +private: + void set_routes(seastar::httpd::routes& r); +}; + +} + diff --git a/configure.py b/configure.py index ece836c33c..3cce54ff31 100755 --- a/configure.py +++ b/configure.py @@ -771,6 +771,8 @@ api = ['api/api.cc', 'api/system.cc', 'api/config.cc', 'api/api-doc/config.json', + 'alternator/server.cc', + 'alternator/executor.cc', ] idls = ['idl/gossip_digest.idl.hh', diff --git a/main.cc b/main.cc index 2b7db02139..d3e462c392 100644 --- a/main.cc +++ b/main.cc @@ -71,6 +71,8 @@ #include "distributed_loader.hh" #include "cql3/cql_config.hh" +#include "alternator/server.hh" + namespace fs = std::filesystem; seastar::metrics::metric_groups app_metrics; @@ -1059,6 +1061,13 @@ int main(int ac, char** av) { return service::get_local_storage_service().start_rpc_server(); }).get(); } + + static sharded alternator_executor; + alternator_executor.start(std::ref(proxy), std::ref(mm)).get(); + static alternator::server alternator_server(alternator_executor); + alternator_server.init(alternator::server::DEFAULT_PORT).get(); + startlog.info("Alternator server listening on {}", alternator::server::DEFAULT_PORT); + if (cfg->defragment_memory_on_idle()) { smp::invoke_on_all([] () { engine().set_idle_cpu_handler([] (reactor::work_waiting_on_reactor check_for_work) {