From 8bb4ee49da92c8e26b039ac690980e5a5538a35b Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Wed, 9 Apr 2025 01:10:29 +0300 Subject: [PATCH 1/3] alternator/consumed_capacity: make functionality public The consumed_capacity_counter is not completely applicable for batch operations. This patch makes some of its functionality public so that batch get item can use the components to decide if it needs to send consumed capacity in the reply, to get the half units used by the metrics and returned result, and to allow an empty constructor for the RCU counter. (cherry picked from commit 0eabf8b38888680b477df4c90f551534842386b1) --- alternator/consumed_capacity.cc | 7 +++++-- alternator/consumed_capacity.hh | 5 ++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/alternator/consumed_capacity.cc b/alternator/consumed_capacity.cc index a306b1441a..753dd07980 100644 --- a/alternator/consumed_capacity.cc +++ b/alternator/consumed_capacity.cc @@ -24,7 +24,7 @@ static constexpr uint64_t KB = 1024ULL; static constexpr uint64_t RCU_BLOCK_SIZE_LENGTH = 4*KB; static constexpr uint64_t WCU_BLOCK_SIZE_LENGTH = 1*KB; -static bool should_add_capacity(const rjson::value& request) { +bool consumed_capacity_counter::should_add_capacity(const rjson::value& request) { const rjson::value* return_consumed = rjson::find(request, "ReturnConsumedCapacity"); if (!return_consumed) { return false; @@ -62,9 +62,12 @@ static uint64_t calculate_half_units(uint64_t unit_block_size, uint64_t total_by rcu_consumed_capacity_counter::rcu_consumed_capacity_counter(const rjson::value& request, bool is_quorum) : consumed_capacity_counter(should_add_capacity(request)),_is_quorum(is_quorum) { } +uint64_t rcu_consumed_capacity_counter::get_half_units(uint64_t total_bytes, bool is_quorum) noexcept { + return calculate_half_units(RCU_BLOCK_SIZE_LENGTH, total_bytes, is_quorum); +} uint64_t rcu_consumed_capacity_counter::get_half_units() const noexcept { - return 
calculate_half_units(RCU_BLOCK_SIZE_LENGTH, _total_bytes, _is_quorum); + return get_half_units(_total_bytes, _is_quorum); } uint64_t wcu_consumed_capacity_counter::get_half_units() const noexcept { diff --git a/alternator/consumed_capacity.hh b/alternator/consumed_capacity.hh index 3dcbb65355..688a8dbfb3 100644 --- a/alternator/consumed_capacity.hh +++ b/alternator/consumed_capacity.hh @@ -42,15 +42,18 @@ public: */ virtual uint64_t get_half_units() const noexcept = 0; uint64_t _total_bytes = 0; + static bool should_add_capacity(const rjson::value& request); protected: bool _should_add_to_reponse = false; }; class rcu_consumed_capacity_counter : public consumed_capacity_counter { - virtual uint64_t get_half_units() const noexcept; bool _is_quorum = false; public: rcu_consumed_capacity_counter(const rjson::value& request, bool is_quorum); + rcu_consumed_capacity_counter(): consumed_capacity_counter(false), _is_quorum(false){} + virtual uint64_t get_half_units() const noexcept; + static uint64_t get_half_units(uint64_t total_bytes, bool is_quorum) noexcept; }; class wcu_consumed_capacity_counter : public consumed_capacity_counter { From 0761eacf68025e2f2440ec2494311bf907b3d7cc Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Wed, 9 Apr 2025 01:02:09 +0300 Subject: [PATCH 2/3] alternator/executor: Add RCU support for batch get items MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch adds RCU support for batch get items. With batch requests, multiple objects are read from multiple tables. While the criterion for adding the units is per the batch request, the units are calculated per table—and so is the read consistency. 
(cherry picked from commit 88095919d0f5d4075c9d4d448d0e4939b07dc4a9) --- alternator/executor.cc | 41 +++++++++++++++++++++++++++++++++++------ alternator/executor.hh | 3 ++- 2 files changed, 37 insertions(+), 7 deletions(-) diff --git a/alternator/executor.cc b/alternator/executor.cc index 5bd89e93a2..bbd5d04735 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -3206,14 +3206,17 @@ future> executor::describe_multi_item(schema_ptr schem const query::partition_slice&& slice, shared_ptr selection, foreign_ptr> query_result, - shared_ptr> attrs_to_get) { + shared_ptr> attrs_to_get, + uint64_t& rcu_half_units) { cql3::selection::result_set_builder builder(*selection, gc_clock::now()); query::result_view::consume(*query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection)); auto result_set = builder.build(); std::vector ret; for (auto& result_row : result_set->rows()) { rjson::value item = rjson::empty_object(); - describe_single_item(*selection, result_row, *attrs_to_get, item); + rcu_consumed_capacity_counter consumed_capacity; + describe_single_item(*selection, result_row, *attrs_to_get, item, &consumed_capacity._total_bytes); + rcu_half_units += consumed_capacity.get_half_units(); ret.push_back(std::move(item)); co_await coroutine::maybe_yield(); } @@ -4114,6 +4117,7 @@ future executor::batch_get_item(client_state& cli // listing all the request aimed at a single table. For efficiency, inside // each table_requests we further group together all reads going to the // same partition, so we can later send them together. 
+ bool should_add_rcu = rcu_consumed_capacity_counter::should_add_capacity(request); struct table_requests { schema_ptr schema; db::consistency_level cl; @@ -4140,6 +4144,7 @@ future executor::batch_get_item(client_state& cli } }; std::vector requests; + std::vector> responses_sizes; uint batch_size = 0; for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) { table_requests rs(get_table_from_batch_request(_proxy, it)); @@ -4166,7 +4171,11 @@ future executor::batch_get_item(client_state& cli // If we got here, all "requests" are valid, so let's start the // requests for the different partitions all in parallel. std::vector>> response_futures; + responses_sizes.resize(requests.size()); + size_t responses_sizes_pos = 0; for (const auto& rs : requests) { + responses_sizes[responses_sizes_pos].resize(rs.requests.size()); + size_t pos = 0; for (const auto &r : rs.requests) { auto& pk = r.first; auto& cks = r.second; @@ -4189,12 +4198,14 @@ future executor::batch_get_item(client_state& cli command->allow_limit = db::allow_per_partition_rate_limit::yes; future> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl, service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then( - [schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get] (service::storage_proxy::coordinator_query_result qr) mutable { + [schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get, &response_size = responses_sizes[responses_sizes_pos][pos]] (service::storage_proxy::coordinator_query_result qr) mutable { utils::get_local_injector().inject("alternator_batch_get_item", [] { throw std::runtime_error("batch_get_item injection"); }); - return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), 
std::move(qr.query_result), std::move(attrs_to_get)); + return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), std::move(qr.query_result), std::move(attrs_to_get), response_size); }); + pos++; response_futures.push_back(std::move(f)); } + responses_sizes_pos++; } // Wait for all requests to complete, and then return the response. @@ -4206,10 +4217,14 @@ future executor::batch_get_item(client_state& cli rjson::value response = rjson::empty_object(); rjson::add(response, "Responses", rjson::empty_object()); rjson::add(response, "UnprocessedKeys", rjson::empty_object()); - + size_t rcu_half_units; auto fut_it = response_futures.begin(); + responses_sizes_pos = 0; + rjson::value consumed_capacity = rjson::empty_array(); for (const auto& rs : requests) { - auto table = table_name(*rs.schema); + std::string table = table_name(*rs.schema); + size_t pos = 0; + rcu_half_units = 0; for (const auto &r : rs.requests) { auto& pk = r.first; auto& cks = r.second; @@ -4224,6 +4239,7 @@ future executor::batch_get_item(client_state& cli for (rjson::value& json : results) { rjson::push_back(response["Responses"][table], std::move(json)); } + rcu_half_units += rcu_consumed_capacity_counter::get_half_units(responses_sizes[responses_sizes_pos][pos], rs.cl == db::consistency_level::LOCAL_QUORUM); } catch(...) 
{ eptr = std::current_exception(); // This read of potentially several rows in one partition, @@ -4247,7 +4263,20 @@ future executor::batch_get_item(client_state& cli rjson::push_back(response["UnprocessedKeys"][table]["Keys"], std::move(*ck.second)); } } + pos++; } + _stats.rcu_total += rcu_half_units; + if (should_add_rcu) { + rjson::value entry = rjson::empty_object(); + rjson::add(entry, "TableName", table); + rjson::add(entry, "CapacityUnits", rcu_half_units*0.5); + rjson::push_back(consumed_capacity, std::move(entry)); + } + responses_sizes_pos++; + } + + if (should_add_rcu) { + rjson::add(response, "ConsumedCapacity", std::move(consumed_capacity)); } elogger.trace("Unprocessed keys: {}", response["UnprocessedKeys"]); if (!some_succeeded && eptr) { diff --git a/alternator/executor.hh b/alternator/executor.hh index 1e0d82a945..74a50e2bfb 100644 --- a/alternator/executor.hh +++ b/alternator/executor.hh @@ -241,7 +241,8 @@ public: const query::partition_slice&& slice, shared_ptr selection, foreign_ptr> query_result, - shared_ptr> attrs_to_get); + shared_ptr> attrs_to_get, + uint64_t& rcu_half_units); static void describe_single_item(const cql3::selection::selection&, const std::vector&, From 9434bd81b3cd12293fec2060d94d0affc0a56108 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 7 Apr 2025 18:00:55 +0300 Subject: [PATCH 3/3] test_returnconsumedcapacity.py: test RCU for batch get item This patch adds tests for consumed capacity in batch get item. It tests both the simple case and the multi-item, multi-table case that combines consistent and non-consistent reads. 
(cherry picked from commit 3acde5f904d7bb5c97c086bf84214042ac86f2ce) --- .../alternator/test_returnconsumedcapacity.py | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/test/alternator/test_returnconsumedcapacity.py b/test/alternator/test_returnconsumedcapacity.py index 46f19a7b23..a08cc2c1f1 100644 --- a/test/alternator/test_returnconsumedcapacity.py +++ b/test/alternator/test_returnconsumedcapacity.py @@ -291,3 +291,52 @@ def test_long_update(test_table): ExpressionAttributeValues={':val1': val1}, ReturnConsumedCapacity='TOTAL', ReturnValues='ALL_OLD') assert 3 == response['ConsumedCapacity']["CapacityUnits"] + +# A simple batch getItem test +# This test validates that when two items are fetched from the same table using BatchGetItem, +# the ReturnConsumedCapacity field reflects the sum of independent RCU calculations for each item. +# Consistency is defined per table in the BatchGetItem request, so both items share the same +# consistency setting. The test ensures that RCU is calculated independently for each item +# according to that setting, and the total consumed capacity is their sum. 
+def test_simple_batch_get_items(test_table_sb):
+    p1 = random_string()
+    val = random_string()
+    c1 = random_bytes()
+    test_table_sb.put_item(Item={'p': p1, 'c': c1})
+
+    p2 = random_string()
+    c2 = random_bytes()
+    test_table_sb.put_item(Item={'p': p2, 'c': c2})
+
+    response = test_table_sb.meta.client.batch_get_item(RequestItems = {
+        test_table_sb.name: {'Keys': [{'p': p1, 'c': c1}, {'p': p2, 'c': c2}], 'ConsistentRead': True}}, ReturnConsumedCapacity='TOTAL')
+    assert 'ConsumedCapacity' in response
+    assert 'TableName' in response['ConsumedCapacity'][0]
+    assert response['ConsumedCapacity'][0]['TableName'] == test_table_sb.name
+    assert 2 == response['ConsumedCapacity'][0]['CapacityUnits']
+
+# Validate that when getting a batch of requests
+# from multiple tables we get an RCU entry for each of the tables.
+# We also validate that eventually-consistent reads return half the units.
+def test_multi_table_batch_get_items(test_table_s, test_table):
+    keys1 = []
+    for i in range(5):
+        p = random_string()
+        test_table_s.put_item(Item={'p': p})
+        keys1.append({'p': p})
+    keys2 = []
+    for i in range(3):
+        p = random_string()
+        c = random_string()
+        test_table.put_item(Item={'p': p, 'c': c}, ReturnConsumedCapacity='TOTAL')
+        keys2.append({'p': p, 'c': c})
+
+    response = test_table.meta.client.batch_get_item(RequestItems = {
+        test_table_s.name: {'Keys': keys1, 'ConsistentRead': True},
+        test_table.name: {'Keys': keys2, 'ConsistentRead': False}}, ReturnConsumedCapacity='TOTAL')
+    for cc in response['ConsumedCapacity']:
+        if cc['TableName'] == test_table_s.name:
+            assert cc["CapacityUnits"] == 5
+        else:
+            assert cc['TableName'] == test_table.name
+            assert cc["CapacityUnits"] == 1.5