mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
alternator: fill UnprocessedKeys for failed batch reads
DynamoDB protocol specifies that when getting items in a batch failed only partially, unprocessed keys can be returned so that the user can perform a retry. Alternator used to fail the whole request if any of the reads failed, but right now it instead produces the list of unprocessed keys and returns them to the user, as long as at least 1 read was successful. NOTE: tested manually by compiling Scylla with error injection, which fails every nth request. It's rather hard to figure out an automatic test case for this scenario. Fixes #9984
This commit is contained in:
@@ -3139,25 +3139,58 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
}
|
||||
|
||||
// Wait for all requests to complete, and then return the response.
|
||||
// FIXME: If one of the requests failed this will fail the entire request.
|
||||
// What we should do instead is to return the failed key in the array
|
||||
// UnprocessedKeys (which the BatchGetItem API supports) and let the user
|
||||
// try again. Note that simply a missing key is *not* an error (we already
|
||||
// handled it above), but this case does include things like timeouts,
|
||||
// unavailable CL, etc.
|
||||
return when_all_succeed(response_futures.begin(), response_futures.end()).then(
|
||||
[] (std::vector<std::tuple<std::string, std::optional<rjson::value>>> responses) {
|
||||
return when_all(response_futures.begin(), response_futures.end()).then(
|
||||
[request_items = std::move(request_items)] (std::vector<future<std::tuple<std::string, std::optional<rjson::value>>>> responses) mutable {
|
||||
rjson::value response = rjson::empty_object();
|
||||
rjson::add(response, "Responses", rjson::empty_object());
|
||||
rjson::add(response, "UnprocessedKeys", rjson::empty_object());
|
||||
for (auto& t : responses) {
|
||||
if (!response["Responses"].HasMember(std::get<0>(t).c_str())) {
|
||||
rjson::add_with_string_name(response["Responses"], std::get<0>(t), rjson::empty_array());
|
||||
// In case of full failure (no reads succeeded), an arbitrary error
|
||||
// from one of the operations will be returned.
|
||||
bool some_succeeded = false;
|
||||
std::exception_ptr eptr;
|
||||
// These iterators are used to match keys from the requests with their corresponding responses.
|
||||
// If any of the responses failed, the key iterator will be used to insert
|
||||
// an entry into the UnprocessedKeys object, which will ultimately be returned
|
||||
// to the user in case of partial success of BatchGetItem operation.
|
||||
rjson::value::MemberIterator table_items_it = request_items.MemberBegin();
|
||||
rjson::value::ValueIterator key_it = table_items_it->value["Keys"].Begin();
|
||||
|
||||
for (auto& fut : responses) {
|
||||
if (fut.failed()) {
|
||||
eptr = fut.get_exception();
|
||||
if (!response["UnprocessedKeys"].HasMember(table_items_it->name)) {
|
||||
rjson::add_with_string_name(response["UnprocessedKeys"], rjson::to_string_view(table_items_it->name), rjson::empty_object());
|
||||
rjson::value& unprocessed_item = response["UnprocessedKeys"][table_items_it->name];
|
||||
for (auto it = table_items_it->value.MemberBegin(); it != table_items_it->value.MemberEnd(); ++it) {
|
||||
if (it->name != "Keys") {
|
||||
rjson::add_with_string_name(unprocessed_item, rjson::to_string_view(it->name), rjson::copy(it->value));
|
||||
}
|
||||
}
|
||||
rjson::add_with_string_name(unprocessed_item, "Keys", rjson::empty_array());
|
||||
}
|
||||
rjson::push_back(response["UnprocessedKeys"][table_items_it->name]["Keys"], std::move(*key_it));
|
||||
} else {
|
||||
auto t = fut.get();
|
||||
some_succeeded = true;
|
||||
if (!response["Responses"].HasMember(std::get<0>(t).c_str())) {
|
||||
rjson::add_with_string_name(response["Responses"], std::get<0>(t), rjson::empty_array());
|
||||
}
|
||||
if (std::get<1>(t)) {
|
||||
rjson::push_back(response["Responses"][std::get<0>(t)], std::move(*std::get<1>(t)));
|
||||
}
|
||||
}
|
||||
if (std::get<1>(t)) {
|
||||
rjson::push_back(response["Responses"][std::get<0>(t)], std::move(*std::get<1>(t)));
|
||||
key_it++;
|
||||
if (key_it == table_items_it->value["Keys"].End()) {
|
||||
table_items_it++;
|
||||
if (table_items_it != request_items.MemberEnd()) {
|
||||
key_it = table_items_it->value["Keys"].Begin();
|
||||
}
|
||||
}
|
||||
}
|
||||
elogger.trace("Unprocessed keys: {}", response["UnprocessedKeys"]);
|
||||
if (!some_succeeded && eptr) {
|
||||
return make_exception_future<executor::request_return_type>(eptr);
|
||||
}
|
||||
if (is_big(response)) {
|
||||
return make_ready_future<executor::request_return_type>(make_streamed(std::move(response)));
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user