From 0be06e0bdf93715ce1caf790d00d863b0d30f362 Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Wed, 15 Jun 2022 16:11:59 +0300 Subject: [PATCH 1/2] test/alternator: additional test for BatchGetItem Our simple test for BatchGetItem on a table with sort keys still has requests with just one sort key per partition, so if BatchGetItem has a bug with requesting multiple sort keys from the same partition, such bug won't be caught by the simple tests. So in this test we add a test that does. This will be useful for the next patch, we are planning to refactor BatchGetItem's handling of multiple sort keys in the same partition - so it will be useful to have more regression tests. The tests test_batch_get_item_large and test_batch_get_item_partial would actually also catch such bugs, but they are more elaborate tests and it's nice to have smaller tests more focused on checking specific features. Signed-off-by: Nadav Har'El --- test/alternator/test_batch.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/test/alternator/test_batch.py b/test/alternator/test_batch.py index 2e9f241bca..7b9f908347 100644 --- a/test/alternator/test_batch.py +++ b/test/alternator/test_batch.py @@ -234,10 +234,28 @@ def test_batch_get_item(test_table): # We use the low-level batch_get_item API for lack of a more convenient # API. At least it spares us the need to encode the key's types... reply = test_table.meta.client.batch_get_item(RequestItems = {test_table.name: {'Keys': keys, 'ConsistentRead': True}}) - print(reply) got_items = reply['Responses'][test_table.name] assert multiset(got_items) == multiset(items) +# Like the previous test, schema has both hash and sort keys, this time +# we ask to fetch several sort keys in the same partition key. +def test_batch_get_item_same_partition_key(test_table): + p = random_string() + items = [{'p': p, 'c': random_string(), 'val': random_string()} for i in range(10)] + with test_table.batch_writer() as batch: + for item in items: + batch.put_item(item) + keys = [{k: x[k] for k in ('p', 'c')} for x in items] + reply = test_table.meta.client.batch_get_item(RequestItems = {test_table.name: {'Keys': keys, 'ConsistentRead': True}}) + got_items = reply['Responses'][test_table.name] + assert multiset(got_items) == multiset(items) + # Above we fetched all the keys, let's try now only half of them + keys_half = keys[::2] + items_half = items[::2] + reply = test_table.meta.client.batch_get_item(RequestItems = {test_table.name: {'Keys': keys_half, 'ConsistentRead': True}}) + got_items = reply['Responses'][test_table.name] + assert multiset(got_items) == multiset(items_half) + # Same, with schema has just hash key. def test_batch_get_item_hash(test_table_s): items = [{'p': random_string(), 'val': random_string()} for i in range(10)] From 3aca1ca57218885aadcac1b104234f7898fdbc3f Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Thu, 16 Jun 2022 18:16:51 +0300 Subject: [PATCH 2/2] alternator: make BatchGetItem group reads by partition DynamoDB API's BatchGetItem invokes a number (up to 25) of read requests in parallel, returning when all results are available. Alternator naively implemented this by sending all read requests in parallel, no matter which requests these were. That implementation was inefficient when all the requests are to different items (clustering rows) of the same partition. In a multi-node setup this will end up sending 25 separate requests to the same remote node(s). Even on a single-node setup, this may result in reading from disk more than once, and even if the partition is cached - doing an O(logN) search in each multiple times. What we do in this patch, instead, is to group all the BatchGetItem requests that aimed at the same partition into a single read request asking for a (sorted) list of clustering keys. This is similar to an "IN" request in CQL. As an example of the performance benefit of this patch, I tried a BatchGetItem request asking for 20 random items from a 10-million item partition. I measured the latency of this request on a single-node Scylla. Before this patch, I saw a latency of 17-21 ms (the lower number is when the request is retried and the requested items are already in the cache). After this patch, the latency is 10-14 ms. The performance improvement on multi-node clusters are expected to be even higher. Unfortunately the patch is less trivial than I hoped it would be, because some of the old code was organized under the assumption that each read request only returned one item (and if it failed, it means only one item failed), so this part of the code had to be reorganized (and, for making the code more readable, coroutinized). An unintended benefit of the code reorganization is that it also gave me an opportunity to fail an attempt to ask BatchGetItem the same item more than once (issue #10757). The patch also adds a few more corner cases in the tests, to be even more sure that the code reorganization doesn't introduce a regression in BatchGetItem. Fixes #10753 Fixes #10757 Signed-off-by: Nadav Har'El --- alternator/executor.cc | 184 +++++++++++++++++++++------------- alternator/executor.hh | 8 +- test/alternator/test_batch.py | 18 +++- 3 files changed, 133 insertions(+), 77 deletions(-) diff --git a/alternator/executor.cc b/alternator/executor.cc index 3106b4e83d..bd5521e8ef 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -2354,14 +2354,32 @@ std::optional executor::describe_single_item(schema_ptr schema, // object without an Item member - not one with an empty Item member return {}; } - // FIXME: I think this can't really be a loop, there should be exactly - // one result after above we handled the 0 result case - for (auto& result_row : result_set->rows()) { - describe_single_item(selection, result_row, attrs_to_get, item); + if (result_set->size() > 1) { + // If the result set contains multiple rows, the code should have + // called describe_multi_item(), not this function. + throw std::logic_error("describe_single_item() asked to describe multiple items"); } + describe_single_item(selection, *result_set->rows().begin(), attrs_to_get, item); return item; } +std::vector executor::describe_multi_item(schema_ptr schema, + const query::partition_slice& slice, + const cql3::selection::selection& selection, + const query::result& query_result, + const std::optional& attrs_to_get) { + cql3::selection::result_set_builder builder(selection, gc_clock::now(), cql_serialization_format::latest()); + query::result_view::consume(query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, selection)); + auto result_set = builder.build(); + std::vector ret; + for (auto& result_row : result_set->rows()) { + rjson::value item = rjson::empty_object(); + describe_single_item(selection, result_row, attrs_to_get, item); + ret.push_back(std::move(item)); + } + return ret; +} + static bool check_needs_read_before_write(const parsed::value& v) { return std::visit(overloaded_functor { [&] (const parsed::constant& c) -> bool { @@ -3167,123 +3185,145 @@ future executor::batch_get_item(client_state& cli // We need to validate all the parameters before starting any asynchronous // query, and fail the entire request on any parse error. So we parse all - // the input into our own vector "requests". + // the input into our own vector "requests", each element a table_requests + // listing all the request aimed at a single table. For efficiency, inside + // each table_requests we further group together all reads going to the + // same partition, so we can later send them together. struct table_requests { schema_ptr schema; db::consistency_level cl; ::shared_ptr> attrs_to_get; - struct single_request { - partition_key pk; - clustering_key ck; - }; - std::vector requests; + // clustering_keys keeps a sorted set of clustering keys. It must + // be sorted for the read below (see #10827). Additionally each + // clustering key is mapped to the original rjson::value "Key". + using clustering_keys = std::map; + std::unordered_map requests; + table_requests(schema_ptr s) + : schema(std::move(s)) + , requests(8, partition_key::hashing(*schema), partition_key::equality(*schema)) + {} + void add(rjson::value& key) { + auto pk = pk_from_json(key, schema); + auto it = requests.find(pk); + if (it == requests.end()) { + it = requests.emplace(pk, clustering_key::less_compare(*schema)).first; + } + auto ck = ck_from_json(key, schema); + if (auto [_, inserted] = it->second.emplace(ck, &key); !inserted) { + throw api_error::validation("Provided list of item keys contains duplicates"); + } + } }; std::vector requests; for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) { - table_requests rs; - rs.schema = get_table_from_batch_request(_proxy, it); + table_requests rs(get_table_from_batch_request(_proxy, it)); tracing::add_table_name(trace_state, sstring(executor::KEYSPACE_NAME_PREFIX) + rs.schema->cf_name(), rs.schema->cf_name()); rs.cl = get_read_consistency(it->value); std::unordered_set used_attribute_names; rs.attrs_to_get = ::make_shared>(calculate_attrs_to_get(it->value, used_attribute_names)); verify_all_are_used(request, "ExpressionAttributeNames", used_attribute_names, "GetItem"); auto& keys = (it->value)["Keys"]; - for (const rjson::value& key : keys.GetArray()) { - rs.requests.push_back({pk_from_json(key, rs.schema), ck_from_json(key, rs.schema)}); + for (rjson::value& key : keys.GetArray()) { + rs.add(key); check_key(key, rs.schema); } requests.emplace_back(std::move(rs)); } - // If got here, all "requests" are valid, so let's start them all - // in parallel. The requests object are then immediately destroyed. - std::vector>>> response_futures; + // If we got here, all "requests" are valid, so let's start the + // requests for the different partitions all in parallel. + std::vector>> response_futures; for (const auto& rs : requests) { for (const auto &r : rs.requests) { - dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*rs.schema, std::move(r.pk)))}; + auto& pk = r.first; + auto& cks = r.second; + dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*rs.schema, pk))}; std::vector bounds; if (rs.schema->clustering_key_size() == 0) { bounds.push_back(query::clustering_range::make_open_ended_both_sides()); } else { - bounds.push_back(query::clustering_range::make_singular(std::move(r.ck))); + for (auto& ck : cks) { + bounds.push_back(query::clustering_range::make_singular(ck.first)); + } } auto regular_columns = boost::copy_range( rs.schema->regular_columns() | boost::adaptors::transformed([] (const column_definition& cdef) { return cdef.id; })); auto selection = cql3::selection::selection::wildcard(rs.schema); auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options()); auto command = ::make_lw_shared(rs.schema->id(), rs.schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice)); - future>> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl, + future> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl, service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then( [schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get] (service::storage_proxy::coordinator_query_result qr) mutable { utils::get_local_injector().inject("alternator_batch_get_item", [] { throw std::runtime_error("batch_get_item injection"); }); - std::optional json = describe_single_item(schema, partition_slice, *selection, *qr.query_result, *attrs_to_get); - return make_ready_future>>( - std::make_tuple(schema->cf_name(), std::move(json))); + std::vector jsons = describe_multi_item(schema, partition_slice, *selection, *qr.query_result, *attrs_to_get); + return make_ready_future>(std::move(jsons)); }); response_futures.push_back(std::move(f)); } } // Wait for all requests to complete, and then return the response. - return when_all(response_futures.begin(), response_futures.end()).then( - [request_items = std::move(request_items)] (std::vector>>> responses) mutable { - rjson::value response = rjson::empty_object(); - rjson::add(response, "Responses", rjson::empty_object()); - rjson::add(response, "UnprocessedKeys", rjson::empty_object()); - // In case of full failure (no reads succeeded), an arbitrary error - // from one of the operations will be returned. - bool some_succeeded = false; - std::exception_ptr eptr; - // These iterators are used to match keys from the requests with their corresponding responses. - // If any of the responses failed, the key iterator will be used to insert - // an entry into the UnprocessedKeys object, which will ultimately be returned - // to the user in case of partial success of BatchGetItem operation. - rjson::value::MemberIterator table_items_it = request_items.MemberBegin(); - rjson::value::ValueIterator key_it = table_items_it->value["Keys"].Begin(); + // In case of full failure (no reads succeeded), an arbitrary error + // from one of the operations will be returned. + bool some_succeeded = false; + std::exception_ptr eptr; - for (auto& fut : responses) { - if (fut.failed()) { - eptr = fut.get_exception(); - if (!response["UnprocessedKeys"].HasMember(table_items_it->name)) { - rjson::add_with_string_name(response["UnprocessedKeys"], rjson::to_string_view(table_items_it->name), rjson::empty_object()); - rjson::value& unprocessed_item = response["UnprocessedKeys"][table_items_it->name]; - for (auto it = table_items_it->value.MemberBegin(); it != table_items_it->value.MemberEnd(); ++it) { + rjson::value response = rjson::empty_object(); + rjson::add(response, "Responses", rjson::empty_object()); + rjson::add(response, "UnprocessedKeys", rjson::empty_object()); + + auto fut_it = response_futures.begin(); + for (const auto& rs : requests) { + auto table = table_name(*rs.schema); + for (const auto &r : rs.requests) { + auto& pk = r.first; + auto& cks = r.second; + auto& fut = *fut_it; + ++fut_it; + try { + std::vector results = co_await std::move(fut); + some_succeeded = true; + if (!response["Responses"].HasMember(table)) { + rjson::add_with_string_name(response["Responses"], table, rjson::empty_array()); + } + for (rjson::value& json : results) { + rjson::push_back(response["Responses"][table], std::move(json)); + } + } catch(...) { + eptr = std::current_exception(); + // This read of potentially several rows in one partition, + // failed. We need to add the row key(s) to UnprocessedKeys. + if (!response["UnprocessedKeys"].HasMember(table)) { + // Add the table's entry in UnprocessedKeys. Need to copy + // all the table's parameters from the request except the + // Keys field, which we start empty and then build below. + rjson::add_with_string_name(response["UnprocessedKeys"], table, rjson::empty_object()); + rjson::value& unprocessed_item = response["UnprocessedKeys"][table]; + rjson::value& request_item = request_items[table]; + for (auto it = request_item.MemberBegin(); it != request_item.MemberEnd(); ++it) { if (it->name != "Keys") { - rjson::add_with_string_name(unprocessed_item, rjson::to_string_view(it->name), rjson::copy(it->value)); + rjson::add_with_string_name(unprocessed_item, + rjson::to_string_view(it->name), rjson::copy(it->value)); } } rjson::add_with_string_name(unprocessed_item, "Keys", rjson::empty_array()); } - rjson::push_back(response["UnprocessedKeys"][table_items_it->name]["Keys"], std::move(*key_it)); - } else { - auto t = fut.get(); - some_succeeded = true; - if (!response["Responses"].HasMember(std::get<0>(t).c_str())) { - rjson::add_with_string_name(response["Responses"], std::get<0>(t), rjson::empty_array()); - } - if (std::get<1>(t)) { - rjson::push_back(response["Responses"][std::get<0>(t)], std::move(*std::get<1>(t))); - } - } - key_it++; - if (key_it == table_items_it->value["Keys"].End()) { - table_items_it++; - if (table_items_it != request_items.MemberEnd()) { - key_it = table_items_it->value["Keys"].Begin(); + for (auto& ck : cks) { + rjson::push_back(response["UnprocessedKeys"][table]["Keys"], std::move(*ck.second)); } } } - elogger.trace("Unprocessed keys: {}", response["UnprocessedKeys"]); - if (!some_succeeded && eptr) { - return make_exception_future(eptr); - } - if (is_big(response)) { - return make_ready_future(make_streamed(std::move(response))); - } else { - return make_ready_future(make_jsonable(std::move(response))); - } - }); + } + elogger.trace("Unprocessed keys: {}", response["UnprocessedKeys"]); + if (!some_succeeded && eptr) { + co_return coroutine::make_exception(std::move(eptr)); + } + if (is_big(response)) { + co_return make_streamed(std::move(response)); + } else { + co_return make_jsonable(std::move(response)); + } } // "filter" represents a condition that can be applied to individual items diff --git a/alternator/executor.hh b/alternator/executor.hh index 5a9734beda..7e9c89a9df 100644 --- a/alternator/executor.hh +++ b/alternator/executor.hh @@ -216,13 +216,19 @@ private: static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map * = nullptr); static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map&); -public: +public: static std::optional describe_single_item(schema_ptr, const query::partition_slice&, const cql3::selection::selection&, const query::result&, const std::optional&); + static std::vector describe_multi_item(schema_ptr schema, + const query::partition_slice& slice, + const cql3::selection::selection& selection, + const query::result& query_result, + const std::optional& attrs_to_get); + static void describe_single_item(const cql3::selection::selection&, const std::vector&, const std::optional&, diff --git a/test/alternator/test_batch.py b/test/alternator/test_batch.py index 7b9f908347..3298353648 100644 --- a/test/alternator/test_batch.py +++ b/test/alternator/test_batch.py @@ -329,7 +329,7 @@ def test_batch_unprocessed(test_table_s): # that confirm this. The BatchGetItem documentation does not mention this # constraint - but in fact it exists too: Trying to retrieve the same key # multiple times is not just wasteful - it is outright forbidden. -@pytest.mark.xfail(reason="Issue #10757") +# Reproduces issue #10757 def test_batch_get_item_duplicate(test_table, test_table_s): p = random_string() with pytest.raises(ClientError, match='ValidationException.*duplicates'): @@ -337,6 +337,9 @@ def test_batch_get_item_duplicate(test_table, test_table_s): c = random_string() with pytest.raises(ClientError, match='ValidationException.*duplicates'): test_table.meta.client.batch_get_item(RequestItems = {test_table.name: {'Keys': [{'p': p, 'c': c}, {'p': p, 'c': c}]}}) + # Not a duplicate: + c2 = random_string() + test_table.meta.client.batch_get_item(RequestItems = {test_table.name: {'Keys': [{'p': p, 'c': c}, {'p': p, 'c': c2}]}}) # According to the DynamoDB document, a single BatchWriteItem operation is # limited to 25 update requests, up to 400 KB each, or 16 MB total (25*400 @@ -413,26 +416,33 @@ def test_batch_get_item_large(test_table_sn): def test_batch_get_item_partial(scylla_only, dynamodb, rest_api, test_table_sn): p = random_string() content = random_string() + # prepare "count" rows in "partitions" partitions count = 10 + partitions = 3 with test_table_sn.batch_writer() as batch: for i in range(count): batch.put_item(Item={ - 'p': p, 'c': i, 'content': content}) + 'p': p + str(i % partitions), 'c': i, 'content': content}) responses = [] - to_read = { test_table_sn.name: {'Keys': [{'p': p, 'c': c} for c in range(count)], 'ConsistentRead': True } } + to_read = { test_table_sn.name: {'Keys': [{'p': p + str(c % partitions), 'c': c} for c in range(count)], 'ConsistentRead': True } } with scylla_inject_error(rest_api, "alternator_batch_get_item", one_shot=True): some_keys_were_unprocessed = False while to_read: reply = test_table_sn.meta.client.batch_get_item(RequestItems = to_read) assert 'UnprocessedKeys' in reply to_read = reply['UnprocessedKeys'] + # The UnprocessedKeys should not only list the keys, it should + # also copy the additional parameters used in the original read. + # In this example this was "ConsistentRead". + for tbl in to_read: + assert 'ConsistentRead' in to_read[tbl] some_keys_were_unprocessed = some_keys_were_unprocessed or len(to_read) > 0 print("Left to read:", to_read) assert 'Responses' in reply assert test_table_sn.name in reply['Responses'] responses.extend(reply['Responses'][test_table_sn.name]) assert multiset(responses) == multiset( - [{'p': p, 'c': i, 'content': content} for i in range(count)]) + [{'p': p + str(i % partitions), 'c': i, 'content': content} for i in range(count)]) assert some_keys_were_unprocessed # Test that if the batch read failure is total, i.e. all read requests