From 31f4f062a26bfcbf8a0f9bf85ce9d8d1bbfd115a Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Fri, 28 Jan 2022 10:20:54 +0100 Subject: [PATCH] alternator: fill UnprocessedKeys for failed batch reads DynamoDB protocol specifies that when getting items in a batch failed only partially, unprocessed keys can be returned so that the user can perform a retry. Alternator used to fail the whole request if any of the reads failed, but right now it instead produces the list of unprocessed keys and returns them to the user, as long as at least 1 read was successful. NOTE: tested manually by compiling Scylla with error injection, which fails every nth request. It's rather hard to figure out an automatic test case for this scenario. Fixes #9984 --- alternator/executor.cc | 59 ++++++++++++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 13 deletions(-) diff --git a/alternator/executor.cc b/alternator/executor.cc index 9253ecd03d..bf237da181 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -3139,25 +3139,58 @@ future executor::batch_get_item(client_state& cli } // Wait for all requests to complete, and then return the response. - // FIXME: If one of the requests failed this will fail the entire request. - // What we should do instead is to return the failed key in the array - // UnprocessedKeys (which the BatchGetItem API supports) and let the user - // try again. Note that simply a missing key is *not* an error (we already - // handled it above), but this case does include things like timeouts, - // unavailable CL, etc. - return when_all_succeed(response_futures.begin(), response_futures.end()).then( - [] (std::vector>> responses) { + return when_all(response_futures.begin(), response_futures.end()).then( + [request_items = std::move(request_items)] (std::vector>>> responses) mutable { rjson::value response = rjson::empty_object(); rjson::add(response, "Responses", rjson::empty_object()); rjson::add(response, "UnprocessedKeys", rjson::empty_object()); - for (auto& t : responses) { - if (!response["Responses"].HasMember(std::get<0>(t).c_str())) { - rjson::add_with_string_name(response["Responses"], std::get<0>(t), rjson::empty_array()); + // In case of full failure (no reads succeeded), an arbitrary error + // from one of the operations will be returned. + bool some_succeeded = false; + std::exception_ptr eptr; + // These iterators are used to match keys from the requests with their corresponding responses. + // If any of the responses failed, the key iterator will be used to insert + // an entry into the UnprocessedKeys object, which will ultimately be returned + // to the user in case of partial success of BatchGetItem operation. + rjson::value::MemberIterator table_items_it = request_items.MemberBegin(); + rjson::value::ValueIterator key_it = table_items_it->value["Keys"].Begin(); + + for (auto& fut : responses) { + if (fut.failed()) { + eptr = fut.get_exception(); + if (!response["UnprocessedKeys"].HasMember(table_items_it->name)) { + rjson::add_with_string_name(response["UnprocessedKeys"], rjson::to_string_view(table_items_it->name), rjson::empty_object()); + rjson::value& unprocessed_item = response["UnprocessedKeys"][table_items_it->name]; + for (auto it = table_items_it->value.MemberBegin(); it != table_items_it->value.MemberEnd(); ++it) { + if (it->name != "Keys") { + rjson::add_with_string_name(unprocessed_item, rjson::to_string_view(it->name), rjson::copy(it->value)); + } + } + rjson::add_with_string_name(unprocessed_item, "Keys", rjson::empty_array()); + } + rjson::push_back(response["UnprocessedKeys"][table_items_it->name]["Keys"], std::move(*key_it)); + } else { + auto t = fut.get(); + some_succeeded = true; + if (!response["Responses"].HasMember(std::get<0>(t).c_str())) { + rjson::add_with_string_name(response["Responses"], std::get<0>(t), rjson::empty_array()); + } + if (std::get<1>(t)) { + rjson::push_back(response["Responses"][std::get<0>(t)], std::move(*std::get<1>(t))); + } } - if (std::get<1>(t)) { - rjson::push_back(response["Responses"][std::get<0>(t)], std::move(*std::get<1>(t))); + key_it++; + if (key_it == table_items_it->value["Keys"].End()) { + table_items_it++; + if (table_items_it != request_items.MemberEnd()) { + key_it = table_items_it->value["Keys"].Begin(); + } } } + elogger.trace("Unprocessed keys: {}", response["UnprocessedKeys"]); + if (!some_succeeded && eptr) { + return make_exception_future(eptr); + } if (is_big(response)) { return make_ready_future(make_streamed(std::move(response))); } else {