diff --git a/alternator/consumed_capacity.cc b/alternator/consumed_capacity.cc index a306b1441a..753dd07980 100644 --- a/alternator/consumed_capacity.cc +++ b/alternator/consumed_capacity.cc @@ -24,7 +24,7 @@ static constexpr uint64_t KB = 1024ULL; static constexpr uint64_t RCU_BLOCK_SIZE_LENGTH = 4*KB; static constexpr uint64_t WCU_BLOCK_SIZE_LENGTH = 1*KB; -static bool should_add_capacity(const rjson::value& request) { +bool consumed_capacity_counter::should_add_capacity(const rjson::value& request) { const rjson::value* return_consumed = rjson::find(request, "ReturnConsumedCapacity"); if (!return_consumed) { return false; @@ -62,9 +62,12 @@ static uint64_t calculate_half_units(uint64_t unit_block_size, uint64_t total_by rcu_consumed_capacity_counter::rcu_consumed_capacity_counter(const rjson::value& request, bool is_quorum) : consumed_capacity_counter(should_add_capacity(request)),_is_quorum(is_quorum) { } +uint64_t rcu_consumed_capacity_counter::get_half_units(uint64_t total_bytes, bool is_quorum) noexcept { + return calculate_half_units(RCU_BLOCK_SIZE_LENGTH, total_bytes, is_quorum); +} uint64_t rcu_consumed_capacity_counter::get_half_units() const noexcept { - return calculate_half_units(RCU_BLOCK_SIZE_LENGTH, _total_bytes, _is_quorum); + return get_half_units(_total_bytes, _is_quorum); } uint64_t wcu_consumed_capacity_counter::get_half_units() const noexcept { diff --git a/alternator/consumed_capacity.hh b/alternator/consumed_capacity.hh index 3dcbb65355..688a8dbfb3 100644 --- a/alternator/consumed_capacity.hh +++ b/alternator/consumed_capacity.hh @@ -42,15 +42,18 @@ public: */ virtual uint64_t get_half_units() const noexcept = 0; uint64_t _total_bytes = 0; + static bool should_add_capacity(const rjson::value& request); protected: bool _should_add_to_reponse = false; }; class rcu_consumed_capacity_counter : public consumed_capacity_counter { - virtual uint64_t get_half_units() const noexcept; bool _is_quorum = false; public: 
rcu_consumed_capacity_counter(const rjson::value& request, bool is_quorum); + rcu_consumed_capacity_counter(): consumed_capacity_counter(false), _is_quorum(false){} + virtual uint64_t get_half_units() const noexcept; + static uint64_t get_half_units(uint64_t total_bytes, bool is_quorum) noexcept; }; class wcu_consumed_capacity_counter : public consumed_capacity_counter { diff --git a/alternator/executor.cc b/alternator/executor.cc index 5bd89e93a2..bbd5d04735 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -3206,14 +3206,17 @@ future> executor::describe_multi_item(schema_ptr schem const query::partition_slice&& slice, shared_ptr selection, foreign_ptr> query_result, - shared_ptr> attrs_to_get) { + shared_ptr> attrs_to_get, + uint64_t& rcu_half_units) { cql3::selection::result_set_builder builder(*selection, gc_clock::now()); query::result_view::consume(*query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection)); auto result_set = builder.build(); std::vector ret; for (auto& result_row : result_set->rows()) { rjson::value item = rjson::empty_object(); - describe_single_item(*selection, result_row, *attrs_to_get, item); + rcu_consumed_capacity_counter consumed_capacity; + describe_single_item(*selection, result_row, *attrs_to_get, item, &consumed_capacity._total_bytes); + rcu_half_units += consumed_capacity.get_half_units(); ret.push_back(std::move(item)); co_await coroutine::maybe_yield(); } @@ -4114,6 +4117,7 @@ future executor::batch_get_item(client_state& cli // listing all the request aimed at a single table. For efficiency, inside // each table_requests we further group together all reads going to the // same partition, so we can later send them together. 
+ bool should_add_rcu = rcu_consumed_capacity_counter::should_add_capacity(request); struct table_requests { schema_ptr schema; db::consistency_level cl; @@ -4140,6 +4144,7 @@ future executor::batch_get_item(client_state& cli } }; std::vector requests; + std::vector> responses_sizes; uint batch_size = 0; for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) { table_requests rs(get_table_from_batch_request(_proxy, it)); @@ -4166,7 +4171,11 @@ future executor::batch_get_item(client_state& cli // If we got here, all "requests" are valid, so let's start the // requests for the different partitions all in parallel. std::vector>> response_futures; + responses_sizes.resize(requests.size()); + size_t responses_sizes_pos = 0; for (const auto& rs : requests) { + responses_sizes[responses_sizes_pos].resize(rs.requests.size()); + size_t pos = 0; for (const auto &r : rs.requests) { auto& pk = r.first; auto& cks = r.second; @@ -4189,12 +4198,14 @@ future executor::batch_get_item(client_state& cli command->allow_limit = db::allow_per_partition_rate_limit::yes; future> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl, service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then( - [schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get] (service::storage_proxy::coordinator_query_result qr) mutable { + [schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get, &response_size = responses_sizes[responses_sizes_pos][pos]] (service::storage_proxy::coordinator_query_result qr) mutable { utils::get_local_injector().inject("alternator_batch_get_item", [] { throw std::runtime_error("batch_get_item injection"); }); - return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), 
std::move(qr.query_result), std::move(attrs_to_get)); + return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), std::move(qr.query_result), std::move(attrs_to_get), response_size); }); + pos++; response_futures.push_back(std::move(f)); } + responses_sizes_pos++; } // Wait for all requests to complete, and then return the response. @@ -4206,10 +4217,14 @@ future executor::batch_get_item(client_state& cli rjson::value response = rjson::empty_object(); rjson::add(response, "Responses", rjson::empty_object()); rjson::add(response, "UnprocessedKeys", rjson::empty_object()); - + size_t rcu_half_units; auto fut_it = response_futures.begin(); + responses_sizes_pos = 0; + rjson::value consumed_capacity = rjson::empty_array(); for (const auto& rs : requests) { - auto table = table_name(*rs.schema); + std::string table = table_name(*rs.schema); + size_t pos = 0; + rcu_half_units = 0; for (const auto &r : rs.requests) { auto& pk = r.first; auto& cks = r.second; @@ -4224,6 +4239,7 @@ future executor::batch_get_item(client_state& cli for (rjson::value& json : results) { rjson::push_back(response["Responses"][table], std::move(json)); } + rcu_half_units += rcu_consumed_capacity_counter::get_half_units(responses_sizes[responses_sizes_pos][pos], rs.cl == db::consistency_level::LOCAL_QUORUM); } catch(...) 
{ eptr = std::current_exception(); // This read of potentially several rows in one partition, @@ -4247,7 +4263,20 @@ future executor::batch_get_item(client_state& cli rjson::push_back(response["UnprocessedKeys"][table]["Keys"], std::move(*ck.second)); } } + pos++; } + _stats.rcu_total += rcu_half_units; + if (should_add_rcu) { + rjson::value entry = rjson::empty_object(); + rjson::add(entry, "TableName", table); + rjson::add(entry, "CapacityUnits", rcu_half_units*0.5); + rjson::push_back(consumed_capacity, std::move(entry)); + } + responses_sizes_pos++; + } + + if (should_add_rcu) { + rjson::add(response, "ConsumedCapacity", std::move(consumed_capacity)); } elogger.trace("Unprocessed keys: {}", response["UnprocessedKeys"]); if (!some_succeeded && eptr) { diff --git a/alternator/executor.hh b/alternator/executor.hh index 1e0d82a945..74a50e2bfb 100644 --- a/alternator/executor.hh +++ b/alternator/executor.hh @@ -241,7 +241,8 @@ public: const query::partition_slice&& slice, shared_ptr selection, foreign_ptr> query_result, - shared_ptr> attrs_to_get); + shared_ptr> attrs_to_get, + uint64_t& rcu_half_units); static void describe_single_item(const cql3::selection::selection&, const std::vector&, diff --git a/test/alternator/test_returnconsumedcapacity.py b/test/alternator/test_returnconsumedcapacity.py index 46f19a7b23..a08cc2c1f1 100644 --- a/test/alternator/test_returnconsumedcapacity.py +++ b/test/alternator/test_returnconsumedcapacity.py @@ -291,3 +291,52 @@ def test_long_update(test_table): ExpressionAttributeValues={':val1': val1}, ReturnConsumedCapacity='TOTAL', ReturnValues='ALL_OLD') assert 3 == response['ConsumedCapacity']["CapacityUnits"] + +# A simple batch getItem test +# This test validates that when two items are fetched from the same table using BatchGetItem, +# the ReturnConsumedCapacity field reflects the sum of independent RCU calculations for each item. 
+# Consistency is defined per table in the BatchGetItem request, so both items share the same
+# consistency setting. The test ensures that RCU is calculated independently for each item
+# according to that setting, and the total consumed capacity is their sum.
+def test_simple_batch_get_items(test_table_sb):
+    p1 = random_string()
+    val = random_string()
+    c1 = random_bytes()
+    test_table_sb.put_item(Item={'p': p1, 'c': c1})
+
+    p2 = random_string()
+    c2 = random_bytes()
+    test_table_sb.put_item(Item={'p': p2, 'c': c2})
+
+    response = test_table_sb.meta.client.batch_get_item(RequestItems = {
+        test_table_sb.name: {'Keys': [{'p': p1, 'c': c1}, {'p': p2, 'c': c2}], 'ConsistentRead': True}}, ReturnConsumedCapacity='TOTAL')
+    assert 'ConsumedCapacity' in response
+    assert 'TableName' in response['ConsumedCapacity'][0]
+    assert response['ConsumedCapacity'][0]['TableName'] == test_table_sb.name
+    assert 2 == response['ConsumedCapacity'][0]['CapacityUnits']
+
+# Validate that when getting a batch of requests
+# from multiple tables we get an RCU for each of the tables.
+# We also validate that eventual consistency returns half the units
+def test_multi_table_batch_get_items(test_table_s, test_table):
+    keys1 = []
+    for i in range(5):
+        p = random_string()
+        test_table_s.put_item(Item={'p': p})
+        keys1.append({'p': p})
+    keys2 = []
+    for i in range(3):
+        p = random_string()
+        c = random_string()
+        test_table.put_item(Item={'p': p, 'c': c}, ReturnConsumedCapacity='TOTAL')
+        keys2.append({'p': p, 'c': c})
+
+    response = test_table.meta.client.batch_get_item(RequestItems = {
+        test_table_s.name: {'Keys': keys1, 'ConsistentRead': True},
+        test_table.name: {'Keys': keys2, 'ConsistentRead': False}}, ReturnConsumedCapacity='TOTAL')
+    for cc in response['ConsumedCapacity']:
+        if cc['TableName'] == test_table_s.name:
+            assert cc["CapacityUnits"] == 5
+        else:
+            assert cc['TableName'] == test_table.name
+            assert cc["CapacityUnits"] == 1.5