alternator: yield during BatchGetItem operation

This commit is contained in:
Marcin Maliszkiewicz
2022-12-19 10:43:12 +01:00
parent 3d78dbd9f2
commit f2788f5391
2 changed files with 16 additions and 16 deletions

View File

@@ -2369,21 +2369,22 @@ std::optional<rjson::value> executor::describe_single_item(schema_ptr schema,
return item;
}
std::vector<rjson::value> executor::describe_multi_item(schema_ptr schema,
const query::partition_slice& slice,
const cql3::selection::selection& selection,
const query::result& query_result,
const std::optional<attrs_to_get>& attrs_to_get) {
cql3::selection::result_set_builder builder(selection, gc_clock::now());
query::result_view::consume(query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, selection));
future<std::vector<rjson::value>> executor::describe_multi_item(schema_ptr schema,
const query::partition_slice&& slice,
shared_ptr<cql3::selection::selection> selection,
foreign_ptr<lw_shared_ptr<query::result>> query_result,
shared_ptr<const std::optional<attrs_to_get>> attrs_to_get) {
cql3::selection::result_set_builder builder(*selection, gc_clock::now());
query::result_view::consume(*query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection));
auto result_set = builder.build();
std::vector<rjson::value> ret;
for (auto& result_row : result_set->rows()) {
rjson::value item = rjson::empty_object();
describe_single_item(selection, result_row, attrs_to_get, item);
describe_single_item(*selection, result_row, *attrs_to_get, item);
ret.push_back(std::move(item));
co_await coroutine::maybe_yield();
}
return ret;
co_return ret;
}
static bool check_needs_read_before_write(const parsed::value& v) {
@@ -3255,8 +3256,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then(
[schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get] (service::storage_proxy::coordinator_query_result qr) mutable {
utils::get_local_injector().inject("alternator_batch_get_item", [] { throw std::runtime_error("batch_get_item injection"); });
std::vector<rjson::value> jsons = describe_multi_item(schema, partition_slice, *selection, *qr.query_result, *attrs_to_get);
return make_ready_future<std::vector<rjson::value>>(std::move(jsons));
return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), std::move(qr.query_result), std::move(attrs_to_get));
});
response_futures.push_back(std::move(f));
}

View File

@@ -222,11 +222,11 @@ public:
const query::result&,
const std::optional<attrs_to_get>&);
static std::vector<rjson::value> describe_multi_item(schema_ptr schema,
const query::partition_slice& slice,
const cql3::selection::selection& selection,
const query::result& query_result,
const std::optional<attrs_to_get>& attrs_to_get);
static future<std::vector<rjson::value>> describe_multi_item(schema_ptr schema,
const query::partition_slice&& slice,
shared_ptr<cql3::selection::selection> selection,
foreign_ptr<lw_shared_ptr<query::result>> query_result,
shared_ptr<const std::optional<attrs_to_get>> attrs_to_get);
static void describe_single_item(const cql3::selection::selection&,
const std::vector<bytes_opt>&,