From ce8db6e19eb1acc4a5a61c2ae5ffceb2000225db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Cybulski?= Date: Tue, 28 Oct 2025 15:30:37 +0100 Subject: [PATCH] Add table name to tracing in alternator Add a table name to Alternator's tracing output, as some clients would like to consistently receive this information. - add missing `tracing::add_table_name` in `executor::scan` - add emitting tables' names in `trace_state::build_parameters_map` - update tests, so when tracing is looked for it is filtered by table's name, which confirms the table name is being output. - change `struct one_session_records` declaration to `class one_session_records`, as `one_session_records` is later defined as a class. Refs #26618 Fixes #24031 Closes scylladb/scylladb#26634 --- alternator/executor.cc | 23 +++---- docs/dev/tracing.md | 6 ++ test/alternator/test_tracing.py | 104 ++++++++++++++++++++++++-------- tracing/trace_state.cc | 10 +++ tracing/trace_state.hh | 7 +++ tracing/tracing.hh | 2 +- 6 files changed, 116 insertions(+), 36 deletions(-) diff --git a/alternator/executor.cc b/alternator/executor.cc index cd281c3933..409430d6c2 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -888,7 +888,7 @@ future executor::describe_table(client_state& cli schema_ptr schema = get_table(_proxy, request); get_stats_from_schema(_proxy, *schema)->api_operations.describe_table++; - tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name()); + tracing::add_alternator_table_name(trace_state, schema->cf_name()); rjson::value table_description = co_await fill_table_description(schema, table_status::active, _proxy, client_state, trace_state, permit); rjson::value response = rjson::empty_object(); @@ -989,7 +989,7 @@ future executor::delete_table(client_state& clien std::string table_name = get_table_name(request); std::string keyspace_name = executor::KEYSPACE_NAME_PREFIX + table_name; - tracing::add_table_name(trace_state, keyspace_name, table_name); + 
tracing::add_alternator_table_name(trace_state, table_name); auto& p = _proxy.container(); schema_ptr schema = get_table(_proxy, request); @@ -1583,7 +1583,7 @@ static future create_table_on_shard0(service::cli std::unordered_set unused_attribute_definitions = validate_attribute_definitions("", *attribute_definitions); - tracing::add_table_name(trace_state, keyspace_name, table_name); + tracing::add_alternator_table_name(trace_state, table_name); schema_builder builder(keyspace_name, table_name); auto [hash_key, range_key] = parse_key_schema(request, ""); @@ -1930,7 +1930,7 @@ future executor::update_table(client_state& clien schema_ptr tab = get_table(p.local(), request); - tracing::add_table_name(gt, tab->ks_name(), tab->cf_name()); + tracing::add_alternator_table_name(gt, tab->cf_name()); // the ugly but harmless conversion to string_view here is because // Seastar's sstring is missing a find(std::string_view) :-() @@ -2856,7 +2856,7 @@ future executor::put_item(client_state& client_st elogger.trace("put_item {}", request); auto op = make_shared(*_parsed_expression_cache, _proxy, std::move(request)); - tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name()); + tracing::add_alternator_table_name(trace_state, op->schema()->cf_name()); const bool needs_read_before_write = op->needs_read_before_write(); co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats); @@ -2960,7 +2960,7 @@ future executor::delete_item(client_state& client auto op = make_shared(*_parsed_expression_cache, _proxy, std::move(request)); lw_shared_ptr per_table_stats = get_stats_from_schema(_proxy, *(op->schema())); - tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name()); + tracing::add_alternator_table_name(trace_state, op->schema()->cf_name()); const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() 
|| op->needs_read_before_write(); co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats); @@ -3204,7 +3204,7 @@ future executor::batch_write_item(client_state& c per_table_stats->api_operations.batch_write_item++; per_table_stats->api_operations.batch_write_item_batch_total += it->value.Size(); per_table_stats->api_operations.batch_write_item_histogram.add(it->value.Size()); - tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name()); + tracing::add_alternator_table_name(trace_state, schema->cf_name()); std::unordered_set used_keys( 1, primary_key_hash{schema}, primary_key_equal{schema}); @@ -4464,7 +4464,7 @@ future executor::update_item(client_state& client elogger.trace("update_item {}", request); auto op = make_shared(*_parsed_expression_cache, _proxy, std::move(request)); - tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name()); + tracing::add_alternator_table_name(trace_state, op->schema()->cf_name()); const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write(); co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats); @@ -4545,7 +4545,7 @@ future executor::get_item(client_state& client_st schema_ptr schema = get_table(_proxy, request); lw_shared_ptr per_table_stats = get_stats_from_schema(_proxy, *schema); per_table_stats->api_operations.get_item++; - tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name()); + tracing::add_alternator_table_name(trace_state, schema->cf_name()); rjson::value& query_key = request["Key"]; db::consistency_level cl = get_read_consistency(request); @@ -4694,7 +4694,7 @@ future executor::batch_get_item(client_state& cli uint batch_size = 0; for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) { 
table_requests rs(get_table_from_batch_request(_proxy, it)); - tracing::add_table_name(trace_state, sstring(executor::KEYSPACE_NAME_PREFIX) + rs.schema->cf_name(), rs.schema->cf_name()); + tracing::add_alternator_table_name(trace_state, rs.schema->cf_name()); rs.cl = get_read_consistency(it->value); std::unordered_set used_attribute_names; rs.attrs_to_get = ::make_shared>(calculate_attrs_to_get(it->value, *_parsed_expression_cache, used_attribute_names)); @@ -5296,6 +5296,7 @@ future executor::scan(client_state& client_state, elogger.trace("Scanning {}", request); auto [schema, table_type] = get_table_or_view(_proxy, request); + tracing::add_alternator_table_name(trace_state, schema->cf_name()); get_stats_from_schema(_proxy, *schema)->api_operations.scan++; auto segment = get_int_attribute(request, "Segment"); auto total_segments = get_int_attribute(request, "TotalSegments"); @@ -5775,7 +5776,7 @@ future executor::query(client_state& client_state auto [schema, table_type] = get_table_or_view(_proxy, request); get_stats_from_schema(_proxy, *schema)->api_operations.query++; - tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name()); + tracing::add_alternator_table_name(trace_state, schema->cf_name()); rjson::value* exclusive_start_key = rjson::find(request, "ExclusiveStartKey"); db::consistency_level cl = get_read_consistency(request); diff --git a/docs/dev/tracing.md b/docs/dev/tracing.md index 15e4453c46..e1af5bfa9f 100644 --- a/docs/dev/tracing.md +++ b/docs/dev/tracing.md @@ -125,6 +125,12 @@ Note that there is no guaranteed way to know when the tracing of a particular se * `request`: a short string describing the current query, like "Execute CQL3 query" * `started_at`: is a timestamp taken when tracing session has began +##### Alternator specific +Alternator commands will add following information to traces: +* `alternator_op` key in `parameters` map - operation type, for example `CreateTable` +* `table` key in `parameters` map - table used in 
a given session if there was exactly one +* `table[0]`, `table[1]`, ... in `parameters` map - tables used in a given session, if there were more than one. Names will be sorted before being inserted and will not repeat + ### Slow queries logging #### The motivation Many times in real life installations one of the most important parameters of the system is the longest response time. Naturally, the shorter it is - the better. Therefore capturing the request that take a long time and understanding why it took it so long is a very critical and challenging task. diff --git a/test/alternator/test_tracing.py b/test/alternator/test_tracing.py index 19ecaf26ad..cdc3177460 100644 --- a/test/alternator/test_tracing.py +++ b/test/alternator/test_tracing.py @@ -16,6 +16,7 @@ import json import time +import re import pytest import requests @@ -117,10 +118,15 @@ def iterate_tracing_sessions(dynamodb): time.sleep(0.3) last_scan = full_scan(trace_sessions_table, ConsistentRead=False) - -def find_tracing_session(dynamodb, str): +def iterate_tracing_sessions_with_table_filter(dynamodb, table_name): + expected_table_re = re.compile(fr'table(\[\d+\])? 
: {table_name}}}') for entry in iterate_tracing_sessions(dynamodb): - if str in entry['parameters']: + if expected_table_re.search(entry["parameters"]): + yield entry + +def find_tracing_session(dynamodb, session_key, table_name): + for entry in iterate_tracing_sessions_with_table_filter(dynamodb, table_name): + if session_key in entry['parameters']: return entry['session_id'] pytest.fail("Couldn't find tracing session") @@ -142,6 +148,10 @@ def get_tracing_events(dynamodb, session_id): ret.append(result['activity']) return ret +def expect_tracing_events(dynamodb, session_key, expected_events, table_name): + session = find_tracing_session(dynamodb, session_key, table_name) + expect_tracing_events_with_session(dynamodb, session, expected_events) + # We have no way to know whether the tracing events returned by # get_tracing_events() is the entire trace. Even though we have already seen # the tracing session, it doesn't guarantee that all the events were fully @@ -149,8 +159,7 @@ def get_tracing_events(dynamodb, session_id): # expect_tracing_events() has to retry the get_tracing_events() call until # a timeout. In the successful case, we'll finish very quickly (usually, # even immediately). 
-def expect_tracing_events(dynamodb, str, expected_events): - session = find_tracing_session(dynamodb, str) +def expect_tracing_events_with_session(dynamodb, session, expected_events): start = time.time() while time.time() - start < 100: events = get_tracing_events(dynamodb, session) @@ -171,8 +180,7 @@ def expect_tracing_events(dynamodb, str, expected_events): assert event in events # A test table based on test_table_s, but with isolation level defined to 'always' -@pytest.fixture(scope="module") -def test_table_s_isolation_always(dynamodb): +def create_yield_destroy_test_table_s_isolation_always(dynamodb): table = create_test_table(dynamodb, KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, ], AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' } ], @@ -180,6 +188,14 @@ def test_table_s_isolation_always(dynamodb): yield table table.delete() +@pytest.fixture(scope="module") +def test_table_s_isolation_always(dynamodb): + yield from create_yield_destroy_test_table_s_isolation_always(dynamodb) + +@pytest.fixture(scope="module") +def test_table_s_isolation_always_2(dynamodb): + yield from create_yield_destroy_test_table_s_isolation_always(dynamodb) + # Because tracing is asynchronous and usually appears as much as two # seconds after the request, it is inefficient to have separate tests # for separate request types - each of these tests will have a latency @@ -187,9 +203,14 @@ def test_table_s_isolation_always(dynamodb): # different request types. This test enables tracing, runs a bunch of # different requests - and only then looks for all of them in the trace # table. 
-def test_tracing_all(with_tracing, test_table_s_isolation_always, dynamodb): +def test_tracing_all(with_tracing, test_table_s_isolation_always, test_table_s_isolation_always_2, dynamodb): # Run the different requests, each one containing a long random string - # that we can later use to find with find_tracing_session(): + # that we can later use to find with find_tracing_session() + + if test_table_s_isolation_always.name > test_table_s_isolation_always_2.name: + # we don't care about those names, but we care about the order name-wise, as later we check + # for `table[0]` and `table[1]` in tracing and tables in tracing are output sorted + test_table_s_isolation_always, test_table_s_isolation_always_2 = test_table_s_isolation_always_2, test_table_s_isolation_always table = test_table_s_isolation_always # PutItem: @@ -229,32 +250,67 @@ def test_tracing_all(with_tracing, test_table_s_isolation_always, dynamodb): with scylla_config_temporary(dynamodb, 'alternator_max_users_query_size_in_trace_output', str(len(p_putitem_really_long) + 1024 * 256)): table.put_item(Item={'p': '_' + p_putitem_really_long}) + p_batchwriteitem1 = random_string(20) + p_batchwriteitem2 = random_string(20) + p_batchgetitem1 = random_string(20) + p_batchgetitem2 = random_string(20) + + table.meta.client.batch_write_item(RequestItems = { + test_table_s_isolation_always.name: [{'PutRequest': {'Item': {'p': p_batchwriteitem1}}}], + test_table_s_isolation_always_2.name: [{'PutRequest': {'Item': {'p': p_batchwriteitem2}}}], + }) + table.meta.client.batch_get_item(RequestItems = { + test_table_s_isolation_always.name: {'Keys': [{'p': p_batchgetitem1}]}, + test_table_s_isolation_always_2.name: {'Keys': [{'p': p_batchgetitem2}]}, + }) # Check the traces. NOTE: the following checks are fairly arbitrary, and # may break in the future if we change the tracing messages... 
- expect_tracing_events(dynamodb, p_putitem, ['PutItem', 'CAS successful']) - expect_tracing_events(dynamodb, p_getitem, ['GetItem', 'Querying is done']) - expect_tracing_events(dynamodb, p_deleteitem, ['DeleteItem', 'CAS successful']) - expect_tracing_events(dynamodb, p_updateitem, ['UpdateItem', 'CAS successful']) - expect_tracing_events(dynamodb, p_batchgetitem, ['BatchGetItem', 'Querying is done']) - expect_tracing_events(dynamodb, p_batchwriteitem, ['BatchWriteItem', 'CAS successful']) - expect_tracing_events(dynamodb, p_query, ['Query', 'Querying is done']) - expect_tracing_events(dynamodb, p_scan, ['Scan', 'Performing a database query']) + expect_tracing_events(dynamodb, p_putitem, ['PutItem', 'CAS successful'], table.name) + expect_tracing_events(dynamodb, p_getitem, ['GetItem', 'Querying is done'], table.name) + expect_tracing_events(dynamodb, p_deleteitem, ['DeleteItem', 'CAS successful'], table.name) + expect_tracing_events(dynamodb, p_updateitem, ['UpdateItem', 'CAS successful'], table.name) + expect_tracing_events(dynamodb, p_batchgetitem, ['BatchGetItem', 'Querying is done'], table.name) + expect_tracing_events(dynamodb, p_batchwriteitem, ['BatchWriteItem', 'CAS successful'], table.name) + expect_tracing_events(dynamodb, p_query, ['Query', 'Querying is done'], table.name) + expect_tracing_events(dynamodb, p_scan, ['Scan', 'Performing a database query'], table.name) - op = 'Alternator PutItem' + op_1 = 'Alternator PutItem' + op_2 = 'Alternator BatchWriteItem' + op_3 = 'Alternator BatchGetItem' found_truncated = False found_not_truncated = False - for entry in iterate_tracing_sessions(dynamodb): - if entry['request'] == op: - p = entry['parameters'] + batch_get_item_double_tables_session_id = None + batch_write_item_double_tables_session_id = None + for entry in iterate_tracing_sessions_with_table_filter(dynamodb, table.name): + p = entry['parameters'] + if entry['request'] == op_1: if beginning_of_p_putitem_really_long in p: if '' in p: found_truncated 
= True else: found_not_truncated = True - if found_truncated and found_not_truncated: - break - assert found_truncated and found_not_truncated + elif entry['request'] == op_2: + if p_batchwriteitem1 in p and p_batchwriteitem2 in p: + assert f'table[0] : {test_table_s_isolation_always.name}' in p + assert f'table[1] : {test_table_s_isolation_always_2.name}' in p + batch_write_item_double_tables_session_id = entry['session_id'] + elif entry['request'] == op_3: + if p_batchgetitem1 in p and p_batchgetitem2 in p: + assert f'table[0] : {test_table_s_isolation_always.name}' in p + assert f'table[1] : {test_table_s_isolation_always_2.name}' in p + batch_get_item_double_tables_session_id = entry['session_id'] + if found_truncated and found_not_truncated and batch_get_item_double_tables_session_id and batch_write_item_double_tables_session_id: + break + + assert found_truncated + assert found_not_truncated + assert batch_get_item_double_tables_session_id + assert batch_write_item_double_tables_session_id + + expect_tracing_events_with_session(dynamodb, batch_get_item_double_tables_session_id, ['BatchGetItem', 'Querying is done']) + expect_tracing_events_with_session(dynamodb, batch_write_item_double_tables_session_id, ['BatchWriteItem', 'CAS successful']) + # TODO: # We could use traces to show that the right things actually happen during a diff --git a/tracing/trace_state.cc b/tracing/trace_state.cc index a9f82005f7..5a36f7c5ca 100644 --- a/tracing/trace_state.cc +++ b/tracing/trace_state.cc @@ -114,6 +114,7 @@ void trace_state::build_parameters_map() { auto& params_map = _records->session_rec.parameters; params_values& vals = *_params_ptr; + const auto &tables = _records->session_rec.tables; if (vals.batchlog_endpoints) { auto batch_endpoints = fmt::format("{}", fmt::join(*vals.batchlog_endpoints | std::views::transform([](locator::host_id ep) {return seastar::format("/{}", ep);}), ",")); @@ -132,6 +133,15 @@ void trace_state::build_parameters_map() { 
params_map.emplace("page_size", seastar::format("{:d}", *vals.page_size)); } + if (tables.size() == 1) { + params_map.emplace("table", *tables.begin()); + } else { + size_t index = 0; + for (const auto& table : tables) { + params_map.emplace(format("table[{:d}]", index++), table); + } + } + auto& queries = vals.queries; if (!queries.empty()) { if (queries.size() == 1) { diff --git a/tracing/trace_state.hh b/tracing/trace_state.hh index 867ba0842d..d17b43db22 100644 --- a/tracing/trace_state.hh +++ b/tracing/trace_state.hh @@ -501,6 +501,7 @@ private: friend void add_prepared_statement(const trace_state_ptr& p, prepared_checked_weak_ptr& prepared); friend void set_username(const trace_state_ptr& p, const std::optional& user); friend void add_table_name(const trace_state_ptr& p, const sstring& ks_name, const sstring& cf_name); + friend void add_alternator_table_name(const trace_state_ptr& p, std::string_view table_name); friend void add_prepared_query_options(const trace_state_ptr& state, const cql3::query_options& prepared_options_ptr); friend void stop_foreground(const trace_state_ptr& state) noexcept; }; @@ -677,6 +678,12 @@ inline void add_table_name(const trace_state_ptr& p, const sstring& ks_name, con } } +inline void add_alternator_table_name(const trace_state_ptr& p, std::string_view table_name) { + if (p) { + p->add_table_name(sstring{ table_name }); + } +} + inline bool should_return_id_in_response(const trace_state_ptr& p) { if (p) { return p->write_on_close(); diff --git a/tracing/tracing.hh b/tracing/tracing.hh index 1209c384bf..b15101a132 100644 --- a/tracing/tracing.hh +++ b/tracing/tracing.hh @@ -134,7 +134,7 @@ public: } }; -struct one_session_records; +class one_session_records; using records_bulk = std::deque>; struct backend_session_state_base {