diff --git a/alternator/executor.cc b/alternator/executor.cc index 9253ecd03d..bf237da181 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -3139,25 +3139,58 @@ future executor::batch_get_item(client_state& cli } // Wait for all requests to complete, and then return the response. - // FIXME: If one of the requests failed this will fail the entire request. - // What we should do instead is to return the failed key in the array - // UnprocessedKeys (which the BatchGetItem API supports) and let the user - // try again. Note that simply a missing key is *not* an error (we already - // handled it above), but this case does include things like timeouts, - // unavailable CL, etc. - return when_all_succeed(response_futures.begin(), response_futures.end()).then( - [] (std::vector>> responses) { + return when_all(response_futures.begin(), response_futures.end()).then( + [request_items = std::move(request_items)] (std::vector>>> responses) mutable { rjson::value response = rjson::empty_object(); rjson::add(response, "Responses", rjson::empty_object()); rjson::add(response, "UnprocessedKeys", rjson::empty_object()); - for (auto& t : responses) { - if (!response["Responses"].HasMember(std::get<0>(t).c_str())) { - rjson::add_with_string_name(response["Responses"], std::get<0>(t), rjson::empty_array()); + // In case of full failure (no reads succeeded), an arbitrary error + // from one of the operations will be returned. + bool some_succeeded = false; + std::exception_ptr eptr; + // These iterators are used to match keys from the requests with their corresponding responses. + // If any of the responses failed, the key iterator will be used to insert + // an entry into the UnprocessedKeys object, which will ultimately be returned + // to the user in case of partial success of BatchGetItem operation. + rjson::value::MemberIterator table_items_it = request_items.MemberBegin(); + rjson::value::ValueIterator key_it = table_items_it->value["Keys"].Begin(); + + for (auto& fut : responses) { + if (fut.failed()) { + eptr = fut.get_exception(); + if (!response["UnprocessedKeys"].HasMember(table_items_it->name)) { + rjson::add_with_string_name(response["UnprocessedKeys"], rjson::to_string_view(table_items_it->name), rjson::empty_object()); + rjson::value& unprocessed_item = response["UnprocessedKeys"][table_items_it->name]; + for (auto it = table_items_it->value.MemberBegin(); it != table_items_it->value.MemberEnd(); ++it) { + if (it->name != "Keys") { + rjson::add_with_string_name(unprocessed_item, rjson::to_string_view(it->name), rjson::copy(it->value)); + } + } + rjson::add_with_string_name(unprocessed_item, "Keys", rjson::empty_array()); + } + rjson::push_back(response["UnprocessedKeys"][table_items_it->name]["Keys"], std::move(*key_it)); + } else { + auto t = fut.get(); + some_succeeded = true; + if (!response["Responses"].HasMember(std::get<0>(t).c_str())) { + rjson::add_with_string_name(response["Responses"], std::get<0>(t), rjson::empty_array()); + } + if (std::get<1>(t)) { + rjson::push_back(response["Responses"][std::get<0>(t)], std::move(*std::get<1>(t))); + } } - if (std::get<1>(t)) { - rjson::push_back(response["Responses"][std::get<0>(t)], std::move(*std::get<1>(t))); + key_it++; + if (key_it == table_items_it->value["Keys"].End()) { + table_items_it++; + if (table_items_it != request_items.MemberEnd()) { + key_it = table_items_it->value["Keys"].Begin(); + } } } + elogger.trace("Unprocessed keys: {}", response["UnprocessedKeys"]); + if (!some_succeeded && eptr) { + return make_exception_future(eptr); + } if (is_big(response)) { return make_ready_future(make_streamed(std::move(response))); } else {