Merge 'alternator: make BatchGetItem group reads by partition'

from Nadav Har'El

This small series improves Alternator's BatchGetItem performance by
grouping requests to the same partition together (Fixes #10753) and also
improves error checking when the same item is requested more than once
(Fixes #10757).

Closes #10834

* github.com:scylladb/scylla:
  alternator: make BatchGetItem group reads by partition
  test/alternator: additional test for BatchGetItem
This commit is contained in:
Piotr Sarna
2022-06-20 10:07:19 +02:00
3 changed files with 152 additions and 78 deletions

View File

@@ -2354,14 +2354,32 @@ std::optional<rjson::value> executor::describe_single_item(schema_ptr schema,
// object without an Item member - not one with an empty Item member
return {};
}
// FIXME: I think this can't really be a loop, there should be exactly
// one result after above we handled the 0 result case
for (auto& result_row : result_set->rows()) {
describe_single_item(selection, result_row, attrs_to_get, item);
if (result_set->size() > 1) {
// If the result set contains multiple rows, the code should have
// called describe_multi_item(), not this function.
throw std::logic_error("describe_single_item() asked to describe multiple items");
}
describe_single_item(selection, *result_set->rows().begin(), attrs_to_get, item);
return item;
}
std::vector<rjson::value> executor::describe_multi_item(schema_ptr schema,
const query::partition_slice& slice,
const cql3::selection::selection& selection,
const query::result& query_result,
const std::optional<attrs_to_get>& attrs_to_get) {
cql3::selection::result_set_builder builder(selection, gc_clock::now(), cql_serialization_format::latest());
query::result_view::consume(query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, selection));
auto result_set = builder.build();
std::vector<rjson::value> ret;
for (auto& result_row : result_set->rows()) {
rjson::value item = rjson::empty_object();
describe_single_item(selection, result_row, attrs_to_get, item);
ret.push_back(std::move(item));
}
return ret;
}
static bool check_needs_read_before_write(const parsed::value& v) {
return std::visit(overloaded_functor {
[&] (const parsed::constant& c) -> bool {
@@ -3167,123 +3185,145 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
// We need to validate all the parameters before starting any asynchronous
// query, and fail the entire request on any parse error. So we parse all
// the input into our own vector "requests".
// the input into our own vector "requests", each element a table_requests
// listing all the request aimed at a single table. For efficiency, inside
// each table_requests we further group together all reads going to the
// same partition, so we can later send them together.
struct table_requests {
schema_ptr schema;
db::consistency_level cl;
::shared_ptr<const std::optional<alternator::attrs_to_get>> attrs_to_get;
struct single_request {
partition_key pk;
clustering_key ck;
};
std::vector<single_request> requests;
// clustering_keys keeps a sorted set of clustering keys. It must
// be sorted for the read below (see #10827). Additionally each
// clustering key is mapped to the original rjson::value "Key".
using clustering_keys = std::map<clustering_key, rjson::value*, clustering_key::less_compare>;
std::unordered_map<partition_key, clustering_keys, partition_key::hashing, partition_key::equality> requests;
table_requests(schema_ptr s)
: schema(std::move(s))
, requests(8, partition_key::hashing(*schema), partition_key::equality(*schema))
{}
void add(rjson::value& key) {
auto pk = pk_from_json(key, schema);
auto it = requests.find(pk);
if (it == requests.end()) {
it = requests.emplace(pk, clustering_key::less_compare(*schema)).first;
}
auto ck = ck_from_json(key, schema);
if (auto [_, inserted] = it->second.emplace(ck, &key); !inserted) {
throw api_error::validation("Provided list of item keys contains duplicates");
}
}
};
std::vector<table_requests> requests;
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
table_requests rs;
rs.schema = get_table_from_batch_request(_proxy, it);
table_requests rs(get_table_from_batch_request(_proxy, it));
tracing::add_table_name(trace_state, sstring(executor::KEYSPACE_NAME_PREFIX) + rs.schema->cf_name(), rs.schema->cf_name());
rs.cl = get_read_consistency(it->value);
std::unordered_set<std::string> used_attribute_names;
rs.attrs_to_get = ::make_shared<const std::optional<attrs_to_get>>(calculate_attrs_to_get(it->value, used_attribute_names));
verify_all_are_used(request, "ExpressionAttributeNames", used_attribute_names, "GetItem");
auto& keys = (it->value)["Keys"];
for (const rjson::value& key : keys.GetArray()) {
rs.requests.push_back({pk_from_json(key, rs.schema), ck_from_json(key, rs.schema)});
for (rjson::value& key : keys.GetArray()) {
rs.add(key);
check_key(key, rs.schema);
}
requests.emplace_back(std::move(rs));
}
// If got here, all "requests" are valid, so let's start them all
// in parallel. The requests object are then immediately destroyed.
std::vector<future<std::tuple<std::string, std::optional<rjson::value>>>> response_futures;
// If we got here, all "requests" are valid, so let's start the
// requests for the different partitions all in parallel.
std::vector<future<std::vector<rjson::value>>> response_futures;
for (const auto& rs : requests) {
for (const auto &r : rs.requests) {
dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*rs.schema, std::move(r.pk)))};
auto& pk = r.first;
auto& cks = r.second;
dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*rs.schema, pk))};
std::vector<query::clustering_range> bounds;
if (rs.schema->clustering_key_size() == 0) {
bounds.push_back(query::clustering_range::make_open_ended_both_sides());
} else {
bounds.push_back(query::clustering_range::make_singular(std::move(r.ck)));
for (auto& ck : cks) {
bounds.push_back(query::clustering_range::make_singular(ck.first));
}
}
auto regular_columns = boost::copy_range<query::column_id_vector>(
rs.schema->regular_columns() | boost::adaptors::transformed([] (const column_definition& cdef) { return cdef.id; }));
auto selection = cql3::selection::selection::wildcard(rs.schema);
auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options());
auto command = ::make_lw_shared<query::read_command>(rs.schema->id(), rs.schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice));
future<std::tuple<std::string, std::optional<rjson::value>>> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl,
future<std::vector<rjson::value>> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl,
service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then(
[schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get] (service::storage_proxy::coordinator_query_result qr) mutable {
utils::get_local_injector().inject("alternator_batch_get_item", [] { throw std::runtime_error("batch_get_item injection"); });
std::optional<rjson::value> json = describe_single_item(schema, partition_slice, *selection, *qr.query_result, *attrs_to_get);
return make_ready_future<std::tuple<std::string, std::optional<rjson::value>>>(
std::make_tuple(schema->cf_name(), std::move(json)));
std::vector<rjson::value> jsons = describe_multi_item(schema, partition_slice, *selection, *qr.query_result, *attrs_to_get);
return make_ready_future<std::vector<rjson::value>>(std::move(jsons));
});
response_futures.push_back(std::move(f));
}
}
// Wait for all requests to complete, and then return the response.
return when_all(response_futures.begin(), response_futures.end()).then(
[request_items = std::move(request_items)] (std::vector<future<std::tuple<std::string, std::optional<rjson::value>>>> responses) mutable {
rjson::value response = rjson::empty_object();
rjson::add(response, "Responses", rjson::empty_object());
rjson::add(response, "UnprocessedKeys", rjson::empty_object());
// In case of full failure (no reads succeeded), an arbitrary error
// from one of the operations will be returned.
bool some_succeeded = false;
std::exception_ptr eptr;
// These iterators are used to match keys from the requests with their corresponding responses.
// If any of the responses failed, the key iterator will be used to insert
// an entry into the UnprocessedKeys object, which will ultimately be returned
// to the user in case of partial success of BatchGetItem operation.
rjson::value::MemberIterator table_items_it = request_items.MemberBegin();
rjson::value::ValueIterator key_it = table_items_it->value["Keys"].Begin();
// In case of full failure (no reads succeeded), an arbitrary error
// from one of the operations will be returned.
bool some_succeeded = false;
std::exception_ptr eptr;
for (auto& fut : responses) {
if (fut.failed()) {
eptr = fut.get_exception();
if (!response["UnprocessedKeys"].HasMember(table_items_it->name)) {
rjson::add_with_string_name(response["UnprocessedKeys"], rjson::to_string_view(table_items_it->name), rjson::empty_object());
rjson::value& unprocessed_item = response["UnprocessedKeys"][table_items_it->name];
for (auto it = table_items_it->value.MemberBegin(); it != table_items_it->value.MemberEnd(); ++it) {
rjson::value response = rjson::empty_object();
rjson::add(response, "Responses", rjson::empty_object());
rjson::add(response, "UnprocessedKeys", rjson::empty_object());
auto fut_it = response_futures.begin();
for (const auto& rs : requests) {
auto table = table_name(*rs.schema);
for (const auto &r : rs.requests) {
auto& pk = r.first;
auto& cks = r.second;
auto& fut = *fut_it;
++fut_it;
try {
std::vector<rjson::value> results = co_await std::move(fut);
some_succeeded = true;
if (!response["Responses"].HasMember(table)) {
rjson::add_with_string_name(response["Responses"], table, rjson::empty_array());
}
for (rjson::value& json : results) {
rjson::push_back(response["Responses"][table], std::move(json));
}
} catch(...) {
eptr = std::current_exception();
// This read of potentially several rows in one partition,
// failed. We need to add the row key(s) to UnprocessedKeys.
if (!response["UnprocessedKeys"].HasMember(table)) {
// Add the table's entry in UnprocessedKeys. Need to copy
// all the table's parameters from the request except the
// Keys field, which we start empty and then build below.
rjson::add_with_string_name(response["UnprocessedKeys"], table, rjson::empty_object());
rjson::value& unprocessed_item = response["UnprocessedKeys"][table];
rjson::value& request_item = request_items[table];
for (auto it = request_item.MemberBegin(); it != request_item.MemberEnd(); ++it) {
if (it->name != "Keys") {
rjson::add_with_string_name(unprocessed_item, rjson::to_string_view(it->name), rjson::copy(it->value));
rjson::add_with_string_name(unprocessed_item,
rjson::to_string_view(it->name), rjson::copy(it->value));
}
}
rjson::add_with_string_name(unprocessed_item, "Keys", rjson::empty_array());
}
rjson::push_back(response["UnprocessedKeys"][table_items_it->name]["Keys"], std::move(*key_it));
} else {
auto t = fut.get();
some_succeeded = true;
if (!response["Responses"].HasMember(std::get<0>(t).c_str())) {
rjson::add_with_string_name(response["Responses"], std::get<0>(t), rjson::empty_array());
}
if (std::get<1>(t)) {
rjson::push_back(response["Responses"][std::get<0>(t)], std::move(*std::get<1>(t)));
}
}
key_it++;
if (key_it == table_items_it->value["Keys"].End()) {
table_items_it++;
if (table_items_it != request_items.MemberEnd()) {
key_it = table_items_it->value["Keys"].Begin();
for (auto& ck : cks) {
rjson::push_back(response["UnprocessedKeys"][table]["Keys"], std::move(*ck.second));
}
}
}
elogger.trace("Unprocessed keys: {}", response["UnprocessedKeys"]);
if (!some_succeeded && eptr) {
return make_exception_future<executor::request_return_type>(eptr);
}
if (is_big(response)) {
return make_ready_future<executor::request_return_type>(make_streamed(std::move(response)));
} else {
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(response)));
}
});
}
elogger.trace("Unprocessed keys: {}", response["UnprocessedKeys"]);
if (!some_succeeded && eptr) {
co_return coroutine::make_exception(std::move(eptr));
}
if (is_big(response)) {
co_return make_streamed(std::move(response));
} else {
co_return make_jsonable(std::move(response));
}
}
// "filter" represents a condition that can be applied to individual items

View File

@@ -216,13 +216,19 @@ private:
static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr);
static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>&);
public:
public:
static std::optional<rjson::value> describe_single_item(schema_ptr,
const query::partition_slice&,
const cql3::selection::selection&,
const query::result&,
const std::optional<attrs_to_get>&);
static std::vector<rjson::value> describe_multi_item(schema_ptr schema,
const query::partition_slice& slice,
const cql3::selection::selection& selection,
const query::result& query_result,
const std::optional<attrs_to_get>& attrs_to_get);
static void describe_single_item(const cql3::selection::selection&,
const std::vector<bytes_opt>&,
const std::optional<attrs_to_get>&,

View File

@@ -234,10 +234,28 @@ def test_batch_get_item(test_table):
# We use the low-level batch_get_item API for lack of a more convenient
# API. At least it spares us the need to encode the key's types...
reply = test_table.meta.client.batch_get_item(RequestItems = {test_table.name: {'Keys': keys, 'ConsistentRead': True}})
print(reply)
got_items = reply['Responses'][test_table.name]
assert multiset(got_items) == multiset(items)
# Like the previous test, schema has both hash and sort keys, this time
# we ask to fetch several sort keys in the same partition key.
def test_batch_get_item_same_partition_key(test_table):
p = random_string()
items = [{'p': p, 'c': random_string(), 'val': random_string()} for i in range(10)]
with test_table.batch_writer() as batch:
for item in items:
batch.put_item(item)
keys = [{k: x[k] for k in ('p', 'c')} for x in items]
reply = test_table.meta.client.batch_get_item(RequestItems = {test_table.name: {'Keys': keys, 'ConsistentRead': True}})
got_items = reply['Responses'][test_table.name]
assert multiset(got_items) == multiset(items)
# Above we fetched all the keys, let's try now only half of them
keys_half = keys[::2]
items_half = items[::2]
reply = test_table.meta.client.batch_get_item(RequestItems = {test_table.name: {'Keys': keys_half, 'ConsistentRead': True}})
got_items = reply['Responses'][test_table.name]
assert multiset(got_items) == multiset(items_half)
# Same, with schema has just hash key.
def test_batch_get_item_hash(test_table_s):
items = [{'p': random_string(), 'val': random_string()} for i in range(10)]
@@ -311,7 +329,7 @@ def test_batch_unprocessed(test_table_s):
# that confirm this. The BatchGetItem documentation does not mention this
# constraint - but in fact it exists too: Trying to retrieve the same key
# multiple times is not just wasteful - it is outright forbidden.
@pytest.mark.xfail(reason="Issue #10757")
# Reproduces issue #10757
def test_batch_get_item_duplicate(test_table, test_table_s):
p = random_string()
with pytest.raises(ClientError, match='ValidationException.*duplicates'):
@@ -319,6 +337,9 @@ def test_batch_get_item_duplicate(test_table, test_table_s):
c = random_string()
with pytest.raises(ClientError, match='ValidationException.*duplicates'):
test_table.meta.client.batch_get_item(RequestItems = {test_table.name: {'Keys': [{'p': p, 'c': c}, {'p': p, 'c': c}]}})
# Not a duplicate:
c2 = random_string()
test_table.meta.client.batch_get_item(RequestItems = {test_table.name: {'Keys': [{'p': p, 'c': c}, {'p': p, 'c': c2}]}})
# According to the DynamoDB document, a single BatchWriteItem operation is
# limited to 25 update requests, up to 400 KB each, or 16 MB total (25*400
@@ -395,26 +416,33 @@ def test_batch_get_item_large(test_table_sn):
def test_batch_get_item_partial(scylla_only, dynamodb, rest_api, test_table_sn):
p = random_string()
content = random_string()
# prepare "count" rows in "partitions" partitions
count = 10
partitions = 3
with test_table_sn.batch_writer() as batch:
for i in range(count):
batch.put_item(Item={
'p': p, 'c': i, 'content': content})
'p': p + str(i % partitions), 'c': i, 'content': content})
responses = []
to_read = { test_table_sn.name: {'Keys': [{'p': p, 'c': c} for c in range(count)], 'ConsistentRead': True } }
to_read = { test_table_sn.name: {'Keys': [{'p': p + str(c % partitions), 'c': c} for c in range(count)], 'ConsistentRead': True } }
with scylla_inject_error(rest_api, "alternator_batch_get_item", one_shot=True):
some_keys_were_unprocessed = False
while to_read:
reply = test_table_sn.meta.client.batch_get_item(RequestItems = to_read)
assert 'UnprocessedKeys' in reply
to_read = reply['UnprocessedKeys']
# The UnprocessedKeys should not only list the keys, it should
# also copy the additional parameters used in the original read.
# In this example this was "ConsistentRead".
for tbl in to_read:
assert 'ConsistentRead' in to_read[tbl]
some_keys_were_unprocessed = some_keys_were_unprocessed or len(to_read) > 0
print("Left to read:", to_read)
assert 'Responses' in reply
assert test_table_sn.name in reply['Responses']
responses.extend(reply['Responses'][test_table_sn.name])
assert multiset(responses) == multiset(
[{'p': p, 'c': i, 'content': content} for i in range(count)])
[{'p': p + str(i % partitions), 'c': i, 'content': content} for i in range(count)])
assert some_keys_were_unprocessed
# Test that if the batch read failure is total, i.e. all read requests