diff --git a/alternator/executor.cc b/alternator/executor.cc index 3106b4e83d..bd5521e8ef 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -2354,14 +2354,32 @@ std::optional executor::describe_single_item(schema_ptr schema, // object without an Item member - not one with an empty Item member return {}; } - // FIXME: I think this can't really be a loop, there should be exactly - // one result after above we handled the 0 result case - for (auto& result_row : result_set->rows()) { - describe_single_item(selection, result_row, attrs_to_get, item); + if (result_set->size() > 1) { + // If the result set contains multiple rows, the code should have + // called describe_multi_item(), not this function. + throw std::logic_error("describe_single_item() asked to describe multiple items"); } + describe_single_item(selection, *result_set->rows().begin(), attrs_to_get, item); return item; } +std::vector executor::describe_multi_item(schema_ptr schema, + const query::partition_slice& slice, + const cql3::selection::selection& selection, + const query::result& query_result, + const std::optional& attrs_to_get) { + cql3::selection::result_set_builder builder(selection, gc_clock::now(), cql_serialization_format::latest()); + query::result_view::consume(query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, selection)); + auto result_set = builder.build(); + std::vector ret; + for (auto& result_row : result_set->rows()) { + rjson::value item = rjson::empty_object(); + describe_single_item(selection, result_row, attrs_to_get, item); + ret.push_back(std::move(item)); + } + return ret; +} + static bool check_needs_read_before_write(const parsed::value& v) { return std::visit(overloaded_functor { [&] (const parsed::constant& c) -> bool { @@ -3167,123 +3185,145 @@ future executor::batch_get_item(client_state& cli // We need to validate all the parameters before starting any asynchronous // query, and fail the entire request on any parse 
error. So we parse all - // the input into our own vector "requests". + // the input into our own vector "requests", each element a table_requests + // listing all the requests aimed at a single table. For efficiency, inside + // each table_requests we further group together all reads going to the + // same partition, so we can later send them together. struct table_requests { schema_ptr schema; db::consistency_level cl; ::shared_ptr> attrs_to_get; - struct single_request { - partition_key pk; - clustering_key ck; - }; - std::vector requests; + // clustering_keys keeps a sorted set of clustering keys. It must + // be sorted for the read below (see #10827). Additionally each + // clustering key is mapped to the original rjson::value "Key". + using clustering_keys = std::map; + std::unordered_map requests; + table_requests(schema_ptr s) + : schema(std::move(s)) + , requests(8, partition_key::hashing(*schema), partition_key::equality(*schema)) + {} + void add(rjson::value& key) { + auto pk = pk_from_json(key, schema); + auto it = requests.find(pk); + if (it == requests.end()) { + it = requests.emplace(pk, clustering_key::less_compare(*schema)).first; + } + auto ck = ck_from_json(key, schema); + if (auto [_, inserted] = it->second.emplace(ck, &key); !inserted) { + throw api_error::validation("Provided list of item keys contains duplicates"); + } + } }; std::vector requests; for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) { - table_requests rs; - rs.schema = get_table_from_batch_request(_proxy, it); + table_requests rs(get_table_from_batch_request(_proxy, it)); tracing::add_table_name(trace_state, sstring(executor::KEYSPACE_NAME_PREFIX) + rs.schema->cf_name(), rs.schema->cf_name()); rs.cl = get_read_consistency(it->value); std::unordered_set used_attribute_names; rs.attrs_to_get = ::make_shared>(calculate_attrs_to_get(it->value, used_attribute_names)); verify_all_are_used(request, "ExpressionAttributeNames", used_attribute_names, 
"GetItem"); auto& keys = (it->value)["Keys"]; - for (const rjson::value& key : keys.GetArray()) { - rs.requests.push_back({pk_from_json(key, rs.schema), ck_from_json(key, rs.schema)}); + for (rjson::value& key : keys.GetArray()) { + rs.add(key); check_key(key, rs.schema); } requests.emplace_back(std::move(rs)); } - // If got here, all "requests" are valid, so let's start them all - // in parallel. The requests object are then immediately destroyed. - std::vector>>> response_futures; + // If we got here, all "requests" are valid, so let's start the + // requests for the different partitions all in parallel. + std::vector>> response_futures; for (const auto& rs : requests) { for (const auto &r : rs.requests) { - dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*rs.schema, std::move(r.pk)))}; + auto& pk = r.first; + auto& cks = r.second; + dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*rs.schema, pk))}; std::vector bounds; if (rs.schema->clustering_key_size() == 0) { bounds.push_back(query::clustering_range::make_open_ended_both_sides()); } else { - bounds.push_back(query::clustering_range::make_singular(std::move(r.ck))); + for (auto& ck : cks) { + bounds.push_back(query::clustering_range::make_singular(ck.first)); + } } auto regular_columns = boost::copy_range( rs.schema->regular_columns() | boost::adaptors::transformed([] (const column_definition& cdef) { return cdef.id; })); auto selection = cql3::selection::selection::wildcard(rs.schema); auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options()); auto command = ::make_lw_shared(rs.schema->id(), rs.schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice)); - future>> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl, + future> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl, 
service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then( [schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get] (service::storage_proxy::coordinator_query_result qr) mutable { utils::get_local_injector().inject("alternator_batch_get_item", [] { throw std::runtime_error("batch_get_item injection"); }); - std::optional json = describe_single_item(schema, partition_slice, *selection, *qr.query_result, *attrs_to_get); - return make_ready_future>>( - std::make_tuple(schema->cf_name(), std::move(json))); + std::vector jsons = describe_multi_item(schema, partition_slice, *selection, *qr.query_result, *attrs_to_get); + return make_ready_future>(std::move(jsons)); }); response_futures.push_back(std::move(f)); } } // Wait for all requests to complete, and then return the response. - return when_all(response_futures.begin(), response_futures.end()).then( - [request_items = std::move(request_items)] (std::vector>>> responses) mutable { - rjson::value response = rjson::empty_object(); - rjson::add(response, "Responses", rjson::empty_object()); - rjson::add(response, "UnprocessedKeys", rjson::empty_object()); - // In case of full failure (no reads succeeded), an arbitrary error - // from one of the operations will be returned. - bool some_succeeded = false; - std::exception_ptr eptr; - // These iterators are used to match keys from the requests with their corresponding responses. - // If any of the responses failed, the key iterator will be used to insert - // an entry into the UnprocessedKeys object, which will ultimately be returned - // to the user in case of partial success of BatchGetItem operation. 
- rjson::value::MemberIterator table_items_it = request_items.MemberBegin(); - rjson::value::ValueIterator key_it = table_items_it->value["Keys"].Begin(); + // In case of full failure (no reads succeeded), an arbitrary error + // from one of the operations will be returned. + bool some_succeeded = false; + std::exception_ptr eptr; - for (auto& fut : responses) { - if (fut.failed()) { - eptr = fut.get_exception(); - if (!response["UnprocessedKeys"].HasMember(table_items_it->name)) { - rjson::add_with_string_name(response["UnprocessedKeys"], rjson::to_string_view(table_items_it->name), rjson::empty_object()); - rjson::value& unprocessed_item = response["UnprocessedKeys"][table_items_it->name]; - for (auto it = table_items_it->value.MemberBegin(); it != table_items_it->value.MemberEnd(); ++it) { + rjson::value response = rjson::empty_object(); + rjson::add(response, "Responses", rjson::empty_object()); + rjson::add(response, "UnprocessedKeys", rjson::empty_object()); + + auto fut_it = response_futures.begin(); + for (const auto& rs : requests) { + auto table = table_name(*rs.schema); + for (const auto &r : rs.requests) { + auto& pk = r.first; + auto& cks = r.second; + auto& fut = *fut_it; + ++fut_it; + try { + std::vector results = co_await std::move(fut); + some_succeeded = true; + if (!response["Responses"].HasMember(table)) { + rjson::add_with_string_name(response["Responses"], table, rjson::empty_array()); + } + for (rjson::value& json : results) { + rjson::push_back(response["Responses"][table], std::move(json)); + } + } catch(...) { + eptr = std::current_exception(); + // This read of potentially several rows in one partition + // failed. We need to add the row key(s) to UnprocessedKeys. + if (!response["UnprocessedKeys"].HasMember(table)) { + // Add the table's entry in UnprocessedKeys. Need to copy + // all the table's parameters from the request except the + // Keys field, which we start empty and then build below. 
+ rjson::add_with_string_name(response["UnprocessedKeys"], table, rjson::empty_object()); + rjson::value& unprocessed_item = response["UnprocessedKeys"][table]; + rjson::value& request_item = request_items[table]; + for (auto it = request_item.MemberBegin(); it != request_item.MemberEnd(); ++it) { if (it->name != "Keys") { - rjson::add_with_string_name(unprocessed_item, rjson::to_string_view(it->name), rjson::copy(it->value)); + rjson::add_with_string_name(unprocessed_item, + rjson::to_string_view(it->name), rjson::copy(it->value)); } } rjson::add_with_string_name(unprocessed_item, "Keys", rjson::empty_array()); } - rjson::push_back(response["UnprocessedKeys"][table_items_it->name]["Keys"], std::move(*key_it)); - } else { - auto t = fut.get(); - some_succeeded = true; - if (!response["Responses"].HasMember(std::get<0>(t).c_str())) { - rjson::add_with_string_name(response["Responses"], std::get<0>(t), rjson::empty_array()); - } - if (std::get<1>(t)) { - rjson::push_back(response["Responses"][std::get<0>(t)], std::move(*std::get<1>(t))); - } - } - key_it++; - if (key_it == table_items_it->value["Keys"].End()) { - table_items_it++; - if (table_items_it != request_items.MemberEnd()) { - key_it = table_items_it->value["Keys"].Begin(); + for (auto& ck : cks) { + rjson::push_back(response["UnprocessedKeys"][table]["Keys"], std::move(*ck.second)); } } } - elogger.trace("Unprocessed keys: {}", response["UnprocessedKeys"]); - if (!some_succeeded && eptr) { - return make_exception_future(eptr); - } - if (is_big(response)) { - return make_ready_future(make_streamed(std::move(response))); - } else { - return make_ready_future(make_jsonable(std::move(response))); - } - }); + } + elogger.trace("Unprocessed keys: {}", response["UnprocessedKeys"]); + if (!some_succeeded && eptr) { + co_return coroutine::make_exception(std::move(eptr)); + } + if (is_big(response)) { + co_return make_streamed(std::move(response)); + } else { + co_return make_jsonable(std::move(response)); + } } // 
"filter" represents a condition that can be applied to individual items diff --git a/alternator/executor.hh b/alternator/executor.hh index 5a9734beda..7e9c89a9df 100644 --- a/alternator/executor.hh +++ b/alternator/executor.hh @@ -216,13 +216,19 @@ private: static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map * = nullptr); static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map&); -public: +public: static std::optional describe_single_item(schema_ptr, const query::partition_slice&, const cql3::selection::selection&, const query::result&, const std::optional&); + static std::vector describe_multi_item(schema_ptr schema, + const query::partition_slice& slice, + const cql3::selection::selection& selection, + const query::result& query_result, + const std::optional& attrs_to_get); + static void describe_single_item(const cql3::selection::selection&, const std::vector&, const std::optional&, diff --git a/test/alternator/test_batch.py b/test/alternator/test_batch.py index 2e9f241bca..3298353648 100644 --- a/test/alternator/test_batch.py +++ b/test/alternator/test_batch.py @@ -234,10 +234,28 @@ def test_batch_get_item(test_table): # We use the low-level batch_get_item API for lack of a more convenient # API. At least it spares us the need to encode the key's types... reply = test_table.meta.client.batch_get_item(RequestItems = {test_table.name: {'Keys': keys, 'ConsistentRead': True}}) - print(reply) got_items = reply['Responses'][test_table.name] assert multiset(got_items) == multiset(items) +# Like the previous test, schema has both hash and sort keys, this time +# we ask to fetch several sort keys in the same partition key. 
+def test_batch_get_item_same_partition_key(test_table): + p = random_string() + items = [{'p': p, 'c': random_string(), 'val': random_string()} for i in range(10)] + with test_table.batch_writer() as batch: + for item in items: + batch.put_item(item) + keys = [{k: x[k] for k in ('p', 'c')} for x in items] + reply = test_table.meta.client.batch_get_item(RequestItems = {test_table.name: {'Keys': keys, 'ConsistentRead': True}}) + got_items = reply['Responses'][test_table.name] + assert multiset(got_items) == multiset(items) + # Above we fetched all the keys, let's try now only half of them + keys_half = keys[::2] + items_half = items[::2] + reply = test_table.meta.client.batch_get_item(RequestItems = {test_table.name: {'Keys': keys_half, 'ConsistentRead': True}}) + got_items = reply['Responses'][test_table.name] + assert multiset(got_items) == multiset(items_half) + # Same, with schema has just hash key. def test_batch_get_item_hash(test_table_s): items = [{'p': random_string(), 'val': random_string()} for i in range(10)] @@ -311,7 +329,7 @@ def test_batch_unprocessed(test_table_s): # that confirm this. The BatchGetItem documentation does not mention this # constraint - but in fact it exists too: Trying to retrieve the same key # multiple times is not just wasteful - it is outright forbidden. 
-@pytest.mark.xfail(reason="Issue #10757") +# Reproduces issue #10757 def test_batch_get_item_duplicate(test_table, test_table_s): p = random_string() with pytest.raises(ClientError, match='ValidationException.*duplicates'): @@ -319,6 +337,9 @@ def test_batch_get_item_duplicate(test_table, test_table_s): c = random_string() with pytest.raises(ClientError, match='ValidationException.*duplicates'): test_table.meta.client.batch_get_item(RequestItems = {test_table.name: {'Keys': [{'p': p, 'c': c}, {'p': p, 'c': c}]}}) + # Not a duplicate: + c2 = random_string() + test_table.meta.client.batch_get_item(RequestItems = {test_table.name: {'Keys': [{'p': p, 'c': c}, {'p': p, 'c': c2}]}}) # According to the DynamoDB document, a single BatchWriteItem operation is # limited to 25 update requests, up to 400 KB each, or 16 MB total (25*400 @@ -395,26 +416,33 @@ def test_batch_get_item_large(test_table_sn): def test_batch_get_item_partial(scylla_only, dynamodb, rest_api, test_table_sn): p = random_string() content = random_string() + # prepare "count" rows in "partitions" partitions count = 10 + partitions = 3 with test_table_sn.batch_writer() as batch: for i in range(count): batch.put_item(Item={ - 'p': p, 'c': i, 'content': content}) + 'p': p + str(i % partitions), 'c': i, 'content': content}) responses = [] - to_read = { test_table_sn.name: {'Keys': [{'p': p, 'c': c} for c in range(count)], 'ConsistentRead': True } } + to_read = { test_table_sn.name: {'Keys': [{'p': p + str(c % partitions), 'c': c} for c in range(count)], 'ConsistentRead': True } } with scylla_inject_error(rest_api, "alternator_batch_get_item", one_shot=True): some_keys_were_unprocessed = False while to_read: reply = test_table_sn.meta.client.batch_get_item(RequestItems = to_read) assert 'UnprocessedKeys' in reply to_read = reply['UnprocessedKeys'] + # The UnprocessedKeys should not only list the keys, it should + # also copy the additional parameters used in the original read. 
+ # In this example this was "ConsistentRead". + for tbl in to_read: + assert 'ConsistentRead' in to_read[tbl] some_keys_were_unprocessed = some_keys_were_unprocessed or len(to_read) > 0 print("Left to read:", to_read) assert 'Responses' in reply assert test_table_sn.name in reply['Responses'] responses.extend(reply['Responses'][test_table_sn.name]) assert multiset(responses) == multiset( - [{'p': p, 'c': i, 'content': content} for i in range(count)]) + [{'p': p + str(i % partitions), 'c': i, 'content': content} for i in range(count)]) assert some_keys_were_unprocessed # Test that if the batch read failure is total, i.e. all read requests