alternator: add client state

Keeping an instance of client_state is a convenient way of being able
to use tracing for alternator. It's also currently used in paging,
so adding a client state to executor removes the need of keeping
a dummy value.
This commit is contained in:
Piotr Sarna
2019-08-22 10:46:55 +02:00
committed by Avi Kivity
parent 1ca9dc5d47
commit 6c8c31bfc9
3 changed files with 57 additions and 48 deletions

View File

@@ -214,7 +214,7 @@ static std::string get_string_attribute(const rjson::value& value, rjson::string
}
future<json::json_return_type> executor::describe_table(std::string content) {
future<json::json_return_type> executor::describe_table(client_state& client_state, std::string content) {
_stats.api_operations.describe_table++;
rjson::value request = rjson::parse(content);
elogger.trace("Describing table {}", request);
@@ -242,7 +242,7 @@ future<json::json_return_type> executor::describe_table(std::string content) {
return make_ready_future<json::json_return_type>(make_jsonable(std::move(response)));
}
future<json::json_return_type> executor::delete_table(std::string content) {
future<json::json_return_type> executor::delete_table(client_state& client_state, std::string content) {
_stats.api_operations.delete_table++;
rjson::value request = rjson::parse(content);
elogger.trace("Deleting table {}", request);
@@ -336,7 +336,7 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
}
future<json::json_return_type> executor::create_table(std::string content) {
future<json::json_return_type> executor::create_table(client_state& client_state, std::string content) {
_stats.api_operations.create_table++;
rjson::value table_info = rjson::parse(content);
elogger.trace("Creating table {}", table_info);
@@ -514,7 +514,7 @@ static db::timeout_clock::time_point default_timeout() {
return db::timeout_clock::now() + 10s;
}
future<json::json_return_type> executor::put_item(std::string content) {
future<json::json_return_type> executor::put_item(client_state& client_state, std::string content) {
_stats.api_operations.put_item++;
rjson::value update_info = rjson::parse(content);
elogger.trace("Updating value {}", update_info);
@@ -550,7 +550,7 @@ static mutation make_delete_item_mutation(const rjson::value& key, schema_ptr sc
return m;
}
future<json::json_return_type> executor::delete_item(std::string content) {
future<json::json_return_type> executor::delete_item(client_state& client_state, std::string content) {
_stats.api_operations.delete_item++;
rjson::value update_info = rjson::parse(content);
@@ -591,7 +591,7 @@ struct primary_key_equal {
}
};
future<json::json_return_type> executor::batch_write_item(std::string content) {
future<json::json_return_type> executor::batch_write_item(client_state& client_state, std::string content) {
_stats.api_operations.batch_write_item++;
rjson::value batch_info = rjson::parse(content);
rjson::value& request_items = batch_info["RequestItems"];
@@ -1172,7 +1172,7 @@ static future<std::unique_ptr<rjson::value>> maybe_get_previous_item(service::st
});
}
future<json::json_return_type> executor::update_item(std::string content) {
future<json::json_return_type> executor::update_item(client_state& client_state, std::string content) {
_stats.api_operations.update_item++;
rjson::value update_info = rjson::parse(content);
elogger.trace("update_item {}", update_info);
@@ -1350,7 +1350,7 @@ static db::consistency_level get_read_consistency(const rjson::value& request) {
return consistent_read ? db::consistency_level::LOCAL_QUORUM : db::consistency_level::LOCAL_ONE;
}
future<json::json_return_type> executor::get_item(std::string content) {
future<json::json_return_type> executor::get_item(client_state& client_state, std::string content) {
_stats.api_operations.get_item++;
rjson::value table_info = rjson::parse(content);
elogger.trace("Getting item {}", table_info);
@@ -1388,7 +1388,7 @@ future<json::json_return_type> executor::get_item(std::string content) {
});
}
future<json::json_return_type> executor::batch_get_item(std::string content) {
future<json::json_return_type> executor::batch_get_item(client_state& client_state, std::string content) {
// FIXME: In this implementation, an unbounded batch size can cause
// unbounded response JSON object to be buffered in memory, unbounded
// parallelism of the requests, and unbounded amount of non-preemptable
@@ -1579,6 +1579,7 @@ static future<json::json_return_type> do_query(schema_ptr schema,
uint32_t limit,
db::consistency_level cl,
::shared_ptr<cql3::restrictions::statement_restrictions> filtering_restrictions,
service::client_state& client_state,
cql3::cql_stats& cql_stats) {
::shared_ptr<service::pager::paging_state> paging_state = nullptr;
@@ -1596,18 +1597,16 @@ static future<json::json_return_type> do_query(schema_ptr schema,
auto partition_slice = query::partition_slice(std::move(ck_bounds), {}, std::move(regular_columns), selection->get_query_options());
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, query::max_partitions);
//FIXME(sarna): This context will need to be provided once we start gathering statistics, authenticating, etc. Right now these are just stubs.
static thread_local service::client_state dummy_client_state{service::client_state::internal_tag()};
static thread_local service::query_state dummy_query_state(dummy_client_state, empty_service_permit());
auto query_state_ptr = std::make_unique<service::query_state>(client_state, empty_service_permit());
command->slice.options.set<query::partition_slice::option::allow_short_read>();
auto query_options = std::make_unique<cql3::query_options>(cl, infinite_timeout_config, std::vector<cql3::raw_value>{});
query_options = std::make_unique<cql3::query_options>(std::move(query_options), std::move(paging_state));
auto p = service::pager::query_pagers::pager(schema, selection, dummy_query_state, *query_options, command, std::move(partition_ranges), cql_stats, filtering_restrictions);
auto p = service::pager::query_pagers::pager(schema, selection, *query_state_ptr, *query_options, command, std::move(partition_ranges), cql_stats, filtering_restrictions);
return p->fetch_page(limit, gc_clock::now(), default_timeout()).then(
[p, schema, cql_stats, partition_slice = std::move(partition_slice),
selection = std::move(selection),
selection = std::move(selection), query_state_ptr = std::move(query_state_ptr),
attrs_to_get = std::move(attrs_to_get),
query_options = std::move(query_options),
filtering_restrictions = std::move(filtering_restrictions)] (std::unique_ptr<cql3::result_set> rs) mutable {
@@ -1630,7 +1629,7 @@ static future<json::json_return_type> do_query(schema_ptr schema,
// 2. Filtering - by passing appropriately created restrictions to pager as a last parameter
// 3. Proper timeouts instead of gc_clock::now() and db::no_timeout
// 4. Implement parallel scanning via Segments
future<json::json_return_type> executor::scan(std::string content) {
future<json::json_return_type> executor::scan(client_state& client_state, std::string content) {
_stats.api_operations.scan++;
rjson::value request_info = rjson::parse(content);
elogger.trace("Scanning {}", request_info);
@@ -1659,7 +1658,7 @@ future<json::json_return_type> executor::scan(std::string content) {
partition_ranges = filtering_restrictions->get_partition_key_ranges(query_options);
ck_bounds = filtering_restrictions->get_clustering_bounds(query_options);
}
return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), _stats.cql_stats);
return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats);
}
static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, comparison_operator_type op, const rjson::value& attrs) {
@@ -1782,7 +1781,7 @@ calculate_bounds(schema_ptr schema, const rjson::value& conditions) {
return {std::move(partition_ranges), std::move(ck_bounds)};
}
future<json::json_return_type> executor::query(std::string content) {
future<json::json_return_type> executor::query(client_state& client_state, std::string content) {
_stats.api_operations.query++;
rjson::value request_info = rjson::parse(content);
elogger.trace("Querying {}", request_info);
@@ -1818,7 +1817,7 @@ future<json::json_return_type> executor::query(std::string content) {
throw api_error("ValidationException", format("QueryFilter can only contain non-primary key attributes: Primary key attribute: {}", ck_defs.front()->name_as_text()));
}
}
return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), _stats.cql_stats);
return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats);
}
static void validate_limit(int limit) {
@@ -1827,7 +1826,7 @@ static void validate_limit(int limit) {
}
}
future<json::json_return_type> executor::list_tables(std::string content) {
future<json::json_return_type> executor::list_tables(client_state& client_state, std::string content) {
_stats.api_operations.list_tables++;
rjson::value table_info = rjson::parse(content);
elogger.trace("Listing tables {}", table_info);
@@ -1878,7 +1877,7 @@ future<json::json_return_type> executor::list_tables(std::string content) {
return make_ready_future<json::json_return_type>(make_jsonable(std::move(response)));
}
future<json::json_return_type> executor::describe_endpoints(std::string content, std::string host_header) {
future<json::json_return_type> executor::describe_endpoints(client_state& client_state, std::string content, std::string host_header) {
_stats.api_operations.describe_endpoints++;
rjson::value response = rjson::empty_object();
// Without having any configuration parameter to say otherwise, we tell

View File

@@ -17,6 +17,7 @@
#include "service/storage_proxy.hh"
#include "service/migration_manager.hh"
#include "service/client_state.hh"
#include "stats.hh"
@@ -25,26 +26,28 @@ namespace alternator {
class executor {
service::storage_proxy& _proxy;
service::migration_manager& _mm;
public:
using client_state = service::client_state;
stats _stats;
static constexpr auto ATTRS_COLUMN_NAME = "attrs";
static constexpr auto KEYSPACE_NAME = "alternator";
executor(service::storage_proxy& proxy, service::migration_manager& mm) : _proxy(proxy), _mm(mm) {}
future<json::json_return_type> create_table(std::string content);
future<json::json_return_type> describe_table(std::string content);
future<json::json_return_type> delete_table(std::string content);
future<json::json_return_type> put_item(std::string content);
future<json::json_return_type> get_item(std::string content);
future<json::json_return_type> delete_item(std::string content);
future<json::json_return_type> update_item(std::string content);
future<json::json_return_type> list_tables(std::string content);
future<json::json_return_type> scan(std::string content);
future<json::json_return_type> describe_endpoints(std::string content, std::string host_header);
future<json::json_return_type> batch_write_item(std::string content);
future<json::json_return_type> batch_get_item(std::string content);
future<json::json_return_type> query(std::string content);
future<json::json_return_type> create_table(client_state& client_state, std::string content);
future<json::json_return_type> describe_table(client_state& client_state, std::string content);
future<json::json_return_type> delete_table(client_state& client_state, std::string content);
future<json::json_return_type> put_item(client_state& client_state, std::string content);
future<json::json_return_type> get_item(client_state& client_state, std::string content);
future<json::json_return_type> delete_item(client_state& client_state, std::string content);
future<json::json_return_type> update_item(client_state& client_state, std::string content);
future<json::json_return_type> list_tables(client_state& client_state, std::string content);
future<json::json_return_type> scan(client_state& client_state, std::string content);
future<json::json_return_type> describe_endpoints(client_state& client_state, std::string content, std::string host_header);
future<json::json_return_type> batch_write_item(client_state& client_state, std::string content);
future<json::json_return_type> batch_get_item(client_state& client_state, std::string content);
future<json::json_return_type> query(client_state& client_state, std::string content);
future<> start();
future<> stop() { return make_ready_future<>(); }

View File

@@ -97,21 +97,23 @@ protected:
};
void server::set_routes(routes& r) {
using alternator_callback = std::function<future<json::json_return_type>(executor&, std::unique_ptr<request>)>;
using alternator_callback = std::function<future<json::json_return_type>(executor&, executor::client_state&, std::unique_ptr<request>)>;
std::unordered_map<std::string, alternator_callback> routes{
{"CreateTable", [] (executor& e, std::unique_ptr<request> req) { return e.maybe_create_keyspace().then([&e, req = std::move(req)] { return e.create_table(req->content); }); }},
{"DescribeTable", [] (executor& e, std::unique_ptr<request> req) { return e.describe_table(req->content); }},
{"DeleteTable", [] (executor& e, std::unique_ptr<request> req) { return e.delete_table(req->content); }},
{"PutItem", [] (executor& e, std::unique_ptr<request> req) { return e.put_item(req->content); }},
{"UpdateItem", [] (executor& e, std::unique_ptr<request> req) { return e.update_item(req->content); }},
{"GetItem", [] (executor& e, std::unique_ptr<request> req) { return e.get_item(req->content); }},
{"DeleteItem", [] (executor& e, std::unique_ptr<request> req) { return e.delete_item(req->content); }},
{"ListTables", [] (executor& e, std::unique_ptr<request> req) { return e.list_tables(req->content); }},
{"Scan", [] (executor& e, std::unique_ptr<request> req) { return e.scan(req->content); }},
{"DescribeEndpoints", [] (executor& e, std::unique_ptr<request> req) { return e.describe_endpoints(req->content, req->get_header("Host")); }},
{"BatchWriteItem", [] (executor& e, std::unique_ptr<request> req) { return e.batch_write_item(req->content); }},
{"BatchGetItem", [] (executor& e, std::unique_ptr<request> req) { return e.batch_get_item(req->content); }},
{"Query", [] (executor& e, std::unique_ptr<request> req) { return e.query(req->content); }},
{"CreateTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) {
return e.maybe_create_keyspace().then([&e, &client_state, req = std::move(req)] { return e.create_table(client_state, req->content); }); }
},
{"DescribeTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.describe_table(client_state, req->content); }},
{"DeleteTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.delete_table(client_state, req->content); }},
{"PutItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.put_item(client_state, req->content); }},
{"UpdateItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.update_item(client_state, req->content); }},
{"GetItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.get_item(client_state, req->content); }},
{"DeleteItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.delete_item(client_state, req->content); }},
{"ListTables", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.list_tables(client_state, req->content); }},
{"Scan", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.scan(client_state, req->content); }},
{"DescribeEndpoints", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.describe_endpoints(client_state, req->content, req->get_header("Host")); }},
{"BatchWriteItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.batch_write_item(client_state, req->content); }},
{"BatchGetItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.batch_get_item(client_state, req->content); }},
{"Query", [] (executor& e, executor::client_state& client_state, std::unique_ptr<request> req) { return e.query(client_state, req->content); }},
};
api_handler* handler = new api_handler([this, routes = std::move(routes)](std::unique_ptr<request> req) -> future<json::json_return_type> {
@@ -129,7 +131,12 @@ void server::set_routes(routes& r) {
throw api_error("UnknownOperationException",
format("Unsupported operation {}", op));
}
return callback_it->second(_executor.local(), std::move(req));
//FIXME: Client state can provide more context, e.g. client's endpoint address
// We use unique_ptr because client_state cannot be moved or copied
return do_with(std::make_unique<executor::client_state>(executor::client_state::internal_tag()), [this, callback_it = std::move(callback_it), req = std::move(req)] (std::unique_ptr<executor::client_state>& client_state) mutable {
client_state->set_raw_keyspace(executor::KEYSPACE_NAME);
return callback_it->second(_executor.local(), *client_state, std::move(req));
});
});
r.add(operation_type::POST, url("/"), handler);