diff --git a/alternator/executor.cc b/alternator/executor.cc index 9b98b781ed..d612a3fd75 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -214,7 +214,7 @@ static std::string get_string_attribute(const rjson::value& value, rjson::string } -future executor::describe_table(std::string content) { +future executor::describe_table(client_state& client_state, std::string content) { _stats.api_operations.describe_table++; rjson::value request = rjson::parse(content); elogger.trace("Describing table {}", request); @@ -242,7 +242,7 @@ future executor::describe_table(std::string content) { return make_ready_future(make_jsonable(std::move(response))); } -future executor::delete_table(std::string content) { +future executor::delete_table(client_state& client_state, std::string content) { _stats.api_operations.delete_table++; rjson::value request = rjson::parse(content); elogger.trace("Deleting table {}", request); @@ -336,7 +336,7 @@ static std::pair parse_key_schema(const rjson::value& } -future executor::create_table(std::string content) { +future executor::create_table(client_state& client_state, std::string content) { _stats.api_operations.create_table++; rjson::value table_info = rjson::parse(content); elogger.trace("Creating table {}", table_info); @@ -514,7 +514,7 @@ static db::timeout_clock::time_point default_timeout() { return db::timeout_clock::now() + 10s; } -future executor::put_item(std::string content) { +future executor::put_item(client_state& client_state, std::string content) { _stats.api_operations.put_item++; rjson::value update_info = rjson::parse(content); elogger.trace("Updating value {}", update_info); @@ -550,7 +550,7 @@ static mutation make_delete_item_mutation(const rjson::value& key, schema_ptr sc return m; } -future executor::delete_item(std::string content) { +future executor::delete_item(client_state& client_state, std::string content) { _stats.api_operations.delete_item++; rjson::value update_info = rjson::parse(content); @@ -591,7 +591,7 @@ struct primary_key_equal { } }; -future executor::batch_write_item(std::string content) { +future executor::batch_write_item(client_state& client_state, std::string content) { _stats.api_operations.batch_write_item++; rjson::value batch_info = rjson::parse(content); rjson::value& request_items = batch_info["RequestItems"]; @@ -1172,7 +1172,7 @@ static future> maybe_get_previous_item(service::st }); } -future executor::update_item(std::string content) { +future executor::update_item(client_state& client_state, std::string content) { _stats.api_operations.update_item++; rjson::value update_info = rjson::parse(content); elogger.trace("update_item {}", update_info); @@ -1350,7 +1350,7 @@ static db::consistency_level get_read_consistency(const rjson::value& request) { return consistent_read ? db::consistency_level::LOCAL_QUORUM : db::consistency_level::LOCAL_ONE; } -future executor::get_item(std::string content) { +future executor::get_item(client_state& client_state, std::string content) { _stats.api_operations.get_item++; rjson::value table_info = rjson::parse(content); elogger.trace("Getting item {}", table_info); @@ -1388,7 +1388,7 @@ future executor::get_item(std::string content) { }); } -future executor::batch_get_item(std::string content) { +future executor::batch_get_item(client_state& client_state, std::string content) { // FIXME: In this implementation, an unbounded batch size can cause // unbounded response JSON object to be buffered in memory, unbounded // parallelism of the requests, and unbounded amount of non-preemptable @@ -1579,6 +1579,7 @@ static future do_query(schema_ptr schema, uint32_t limit, db::consistency_level cl, ::shared_ptr filtering_restrictions, + service::client_state& client_state, cql3::cql_stats& cql_stats) { ::shared_ptr paging_state = nullptr; @@ -1596,18 +1597,16 @@ static future do_query(schema_ptr schema, auto partition_slice = query::partition_slice(std::move(ck_bounds), {}, std::move(regular_columns), selection->get_query_options()); auto command = ::make_lw_shared(schema->id(), schema->version(), partition_slice, query::max_partitions); - //FIXME(sarna): This context will need to be provided once we start gathering statistics, authenticating, etc. Right now these are just stubs. - static thread_local service::client_state dummy_client_state{service::client_state::internal_tag()}; - static thread_local service::query_state dummy_query_state(dummy_client_state, empty_service_permit()); + auto query_state_ptr = std::make_unique(client_state, empty_service_permit()); command->slice.options.set(); auto query_options = std::make_unique(cl, infinite_timeout_config, std::vector{}); query_options = std::make_unique(std::move(query_options), std::move(paging_state)); - auto p = service::pager::query_pagers::pager(schema, selection, dummy_query_state, *query_options, command, std::move(partition_ranges), cql_stats, filtering_restrictions); + auto p = service::pager::query_pagers::pager(schema, selection, *query_state_ptr, *query_options, command, std::move(partition_ranges), cql_stats, filtering_restrictions); return p->fetch_page(limit, gc_clock::now(), default_timeout()).then( [p, schema, cql_stats, partition_slice = std::move(partition_slice), - selection = std::move(selection), + selection = std::move(selection), query_state_ptr = std::move(query_state_ptr), attrs_to_get = std::move(attrs_to_get), query_options = std::move(query_options), filtering_restrictions = std::move(filtering_restrictions)] (std::unique_ptr rs) mutable { @@ -1630,7 +1629,7 @@ static future do_query(schema_ptr schema, // 2. Filtering - by passing appropriately created restrictions to pager as a last parameter // 3. Proper timeouts instead of gc_clock::now() and db::no_timeout // 4. Implement parallel scanning via Segments -future executor::scan(std::string content) { +future executor::scan(client_state& client_state, std::string content) { _stats.api_operations.scan++; rjson::value request_info = rjson::parse(content); elogger.trace("Scanning {}", request_info); @@ -1659,7 +1658,7 @@ future executor::scan(std::string content) { partition_ranges = filtering_restrictions->get_partition_key_ranges(query_options); ck_bounds = filtering_restrictions->get_clustering_bounds(query_options); } - return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), _stats.cql_stats); + return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats); } static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, comparison_operator_type op, const rjson::value& attrs) { @@ -1782,7 +1781,7 @@ calculate_bounds(schema_ptr schema, const rjson::value& conditions) { return {std::move(partition_ranges), std::move(ck_bounds)}; } -future executor::query(std::string content) { +future executor::query(client_state& client_state, std::string content) { _stats.api_operations.query++; rjson::value request_info = rjson::parse(content); elogger.trace("Querying {}", request_info); @@ -1818,7 +1817,7 @@ future executor::query(std::string content) { throw api_error("ValidationException", format("QueryFilter can only contain non-primary key attributes: Primary key attribute: {}", ck_defs.front()->name_as_text())); } } - return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), _stats.cql_stats); + return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats); } static void validate_limit(int limit) { @@ -1827,7 +1826,7 @@ static void validate_limit(int limit) { } } -future executor::list_tables(std::string content) { +future executor::list_tables(client_state& client_state, std::string content) { _stats.api_operations.list_tables++; rjson::value table_info = rjson::parse(content); elogger.trace("Listing tables {}", table_info); @@ -1878,7 +1877,7 @@ future executor::list_tables(std::string content) { return make_ready_future(make_jsonable(std::move(response))); } -future executor::describe_endpoints(std::string content, std::string host_header) { +future executor::describe_endpoints(client_state& client_state, std::string content, std::string host_header) { _stats.api_operations.describe_endpoints++; rjson::value response = rjson::empty_object(); // Without having any configuration parameter to say otherwise, we tell diff --git a/alternator/executor.hh b/alternator/executor.hh index 4cdcb69236..b64da17c72 100644 --- a/alternator/executor.hh +++ b/alternator/executor.hh @@ -17,6 +17,7 @@ #include "service/storage_proxy.hh" #include "service/migration_manager.hh" +#include "service/client_state.hh" #include "stats.hh" @@ -25,26 +26,28 @@ namespace alternator { class executor { service::storage_proxy& _proxy; service::migration_manager& _mm; + public: + using client_state = service::client_state; stats _stats; static constexpr auto ATTRS_COLUMN_NAME = "attrs"; static constexpr auto KEYSPACE_NAME = "alternator"; executor(service::storage_proxy& proxy, service::migration_manager& mm) : _proxy(proxy), _mm(mm) {} - future create_table(std::string content); - future describe_table(std::string content); - future delete_table(std::string content); - future put_item(std::string content); - future get_item(std::string content); - future delete_item(std::string content); - future update_item(std::string content); - future list_tables(std::string content); - future scan(std::string content); - future describe_endpoints(std::string content, std::string host_header); - future batch_write_item(std::string content); - future batch_get_item(std::string content); - future query(std::string content); + future create_table(client_state& client_state, std::string content); + future describe_table(client_state& client_state, std::string content); + future delete_table(client_state& client_state, std::string content); + future put_item(client_state& client_state, std::string content); + future get_item(client_state& client_state, std::string content); + future delete_item(client_state& client_state, std::string content); + future update_item(client_state& client_state, std::string content); + future list_tables(client_state& client_state, std::string content); + future scan(client_state& client_state, std::string content); + future describe_endpoints(client_state& client_state, std::string content, std::string host_header); + future batch_write_item(client_state& client_state, std::string content); + future batch_get_item(client_state& client_state, std::string content); + future query(client_state& client_state, std::string content); future<> start(); future<> stop() { return make_ready_future<>(); } diff --git a/alternator/server.cc b/alternator/server.cc index b3d8531c05..637be238f4 100644 --- a/alternator/server.cc +++ b/alternator/server.cc @@ -97,21 +97,23 @@ protected: }; void server::set_routes(routes& r) { - using alternator_callback = std::function(executor&, std::unique_ptr)>; + using alternator_callback = std::function(executor&, executor::client_state&, std::unique_ptr)>; std::unordered_map routes{ - {"CreateTable", [] (executor& e, std::unique_ptr req) { return e.maybe_create_keyspace().then([&e, req = std::move(req)] { return e.create_table(req->content); }); }}, - {"DescribeTable", [] (executor& e, std::unique_ptr req) { return e.describe_table(req->content); }}, - {"DeleteTable", [] (executor& e, std::unique_ptr req) { return e.delete_table(req->content); }}, - {"PutItem", [] (executor& e, std::unique_ptr req) { return e.put_item(req->content); }}, - {"UpdateItem", [] (executor& e, std::unique_ptr req) { return e.update_item(req->content); }}, - {"GetItem", [] (executor& e, std::unique_ptr req) { return e.get_item(req->content); }}, - {"DeleteItem", [] (executor& e, std::unique_ptr req) { return e.delete_item(req->content); }}, - {"ListTables", [] (executor& e, std::unique_ptr req) { return e.list_tables(req->content); }}, - {"Scan", [] (executor& e, std::unique_ptr req) { return e.scan(req->content); }}, - {"DescribeEndpoints", [] (executor& e, std::unique_ptr req) { return e.describe_endpoints(req->content, req->get_header("Host")); }}, - {"BatchWriteItem", [] (executor& e, std::unique_ptr req) { return e.batch_write_item(req->content); }}, - {"BatchGetItem", [] (executor& e, std::unique_ptr req) { return e.batch_get_item(req->content); }}, - {"Query", [] (executor& e, std::unique_ptr req) { return e.query(req->content); }}, + {"CreateTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { + return e.maybe_create_keyspace().then([&e, &client_state, req = std::move(req)] { return e.create_table(client_state, req->content); }); } + }, + {"DescribeTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.describe_table(client_state, req->content); }}, + {"DeleteTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.delete_table(client_state, req->content); }}, + {"PutItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.put_item(client_state, req->content); }}, + {"UpdateItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.update_item(client_state, req->content); }}, + {"GetItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.get_item(client_state, req->content); }}, + {"DeleteItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.delete_item(client_state, req->content); }}, + {"ListTables", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.list_tables(client_state, req->content); }}, + {"Scan", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.scan(client_state, req->content); }}, + {"DescribeEndpoints", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.describe_endpoints(client_state, req->content, req->get_header("Host")); }}, + {"BatchWriteItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.batch_write_item(client_state, req->content); }}, + {"BatchGetItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.batch_get_item(client_state, req->content); }}, + {"Query", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.query(client_state, req->content); }}, }; api_handler* handler = new api_handler([this, routes = std::move(routes)](std::unique_ptr req) -> future { @@ -129,7 +131,12 @@ void server::set_routes(routes& r) { throw api_error("UnknownOperationException", format("Unsupported operation {}", op)); } - return callback_it->second(_executor.local(), std::move(req)); + //FIXME: Client state can provide more context, e.g. client's endpoint address + // We use unique_ptr because client_state cannot be moved or copied + return do_with(std::make_unique(executor::client_state::internal_tag()), [this, callback_it = std::move(callback_it), req = std::move(req)] (std::unique_ptr& client_state) mutable { + client_state->set_raw_keyspace(executor::KEYSPACE_NAME); + return callback_it->second(_executor.local(), *client_state, std::move(req)); + }); }); r.add(operation_type::POST, url("/"), handler);