Files
scylladb/test/alternator/test_tracing.py
Radosław Cybulski ce8db6e19e Add table name to tracing in alternator
Add a table name to Alternator's tracing output, as some clients would
like to consistently receive this information.

- add missing `tracing::add_table_name` in `executor::scan`
- add emitting tables' names in `trace_state::build_parameters_map`
- update tests, so when tracing is looked for it is filtered by table's
  name, which confirms the table name is being output.
- change `struct one_session_records` declaration to `class one_session_records`,
  as `one_session_records` is later defined as class.

Refs #26618
Fixes #24031

Closes scylladb/scylladb#26634
2025-11-21 09:33:40 +02:00

341 lines
17 KiB
Python

# Copyright 2020-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
##############################################################################
# Tests for Scylla's "tracing" feature (see docs/dev/tracing.md) for Alternator
# queries. Reproduces issue #6891, where although tracing was implemented in
# Alternator, it only worked correctly for some operations but not others.
# In the tests here we attempt to ensure that the tracing feature continues
# to work for the relevant operations.
#
# Note that all tests in this file test Scylla-specific features, and are
# "skipped" when not running against Scylla, or when unable to enable tracing
# through out-of-band HTTP requests.
##############################################################################
import json
import time
import re
import pytest
import requests
from test.alternator.util import random_string, full_scan, full_query, create_test_table, scylla_config_temporary
# The "with_tracing" fixture ensures that tracing is enabled throughout
# the run of a test function, and disabled when it ends. If tracing cannot be
# enabled, the test is pytest.skip()ed. This will of course happen if we run
# the test with "--aws" (tracing is a Scylla-only feature).
# Note that to support (in the future) the ability for Alternator tests to
# run in parallel, the tests here need to be prepared that completely
# unrelated requests get traced during a test with with_tracing.
@pytest.fixture(scope="function")
def with_tracing(rest_api):
    """Enable tracing (probability=1) for the duration of a test function.

    Skips the test if tracing cannot be enabled or verified - this happens,
    e.g., when running with "--aws", as tracing is a Scylla-only feature.
    On teardown, restores the trace probability saved before the test and
    verifies the restoration took effect.
    """
    # Save the current probability so it can be restored on teardown.
    probability_resp = requests.get(rest_api+'/storage_service/trace_probability')
    if probability_resp.status_code != 200:
        pytest.skip('Failed to fetch tracing probability')
    probability = probability_resp.text
    response = requests.post(rest_api+'/storage_service/trace_probability?probability=1')
    if response.status_code != 200:
        pytest.skip('Failed to enable tracing')
    # verify that tracing is really enabled
    response = requests.get(rest_api+'/storage_service/trace_probability')
    if response.status_code != 200 or response.content.decode('utf-8') != '1':
        pytest.skip('Failed to verify tracing')
    yield
    print("with_tracing restoring tracing")
    response = requests.post(rest_api+'/storage_service/trace_probability?probability='+probability)
    if response.status_code != 200:
        pytest.fail('Failed to disable tracing after with_tracing test')
    # Verify the probability was restored to the value saved before the test.
    # (Previously this compared against the literal '0', which would spuriously
    # fail whenever the pre-test probability was not zero.)
    response = requests.get(rest_api+'/storage_service/trace_probability')
    if response.status_code != 200 or response.content.decode('utf-8') != probability:
        pytest.skip('Failed to verify tracing disabled')
# Similarly to the fixture above, slow query logging is enabled only for the run of the
# test function. Slow logging is set up with threshold equal to 0 microseconds,
# which effectively means that each query will be qualified as slow and logged.
@pytest.fixture(scope="function")
def with_slow_query_logging(rest_api):
    """Enable slow-query logging with a 0-microsecond threshold for the
    duration of a test, so every request qualifies as "slow" and is logged.

    Skips the test if logging cannot be enabled or verified; restores the
    previous enable/threshold settings on teardown.
    """
    print("with_slow_query_logging enabling slow query logging")
    # Save the current settings so they can be restored on teardown.
    slow_query_info = requests.get(rest_api+'/storage_service/slow_query')
    if slow_query_info.status_code != 200:
        pytest.skip('Failed to fetch slow query logging info')
    slow_query_json = json.loads(slow_query_info.text)
    print(slow_query_json)
    response = requests.post(rest_api+'/storage_service/slow_query?enable=true')
    if response.status_code != 200:
        pytest.skip('Failed to enable slow query logging')
    response = requests.post(rest_api+'/storage_service/slow_query?threshold=0')
    if response.status_code != 200:
        pytest.skip('Failed to enable slow query logging threshold')
    # verify that logging is really enabled
    response = requests.get(rest_api+'/storage_service/slow_query')
    if response.status_code != 200:
        pytest.skip('Failed to verify slow query logging')
    response_json = json.loads(response.text)
    if response_json['enable'] != True or response_json['threshold'] != 0:
        pytest.skip('Failed to verify slow query logging values')
    print(response_json)
    yield
    print("with_slow_query_logging restoring slow query logging")
    # NOTE(review): str() of a Python bool yields 'True'/'False' (capitalized);
    # presumably Scylla's REST API parses booleans case-insensitively - confirm,
    # otherwise this restore could silently fail to apply the saved setting.
    response = requests.post(rest_api+'/storage_service/slow_query?enable='+str(slow_query_json['enable']))
    if response.status_code != 200:
        pytest.fail('Failed to restore slow query logging')
    response = requests.post(rest_api+'/storage_service/slow_query?threshold='+str(slow_query_json['threshold']))
    if response.status_code != 200:
        pytest.fail('Failed to restore slow query logging threshold')
# Unfortunately, we currently have no way of directly finding the tracing
# session of a specific Alternator request. We could have returned the
# tracing session ID as a field of the response - but currently we don't.
# What we do instead is to scan the entire tracing session table looking for
# a given string in one of the commands' parameters. If this string is a long
# random string unique to the specific command, we'll find the right session.
# This approach is inefficient on a real production system (we don't even have
# a way to clear the sessions table after we use it so it grows longer and
# longer), but is good enough for tests on a throwaway boot of Scylla as in
# test/alternator/run.
# Cache of the most recent full scan of the sessions table, shared between
# consecutive lookups so that several requests started together can often be
# found in a single scan (and without a sleep).
last_scan = None
def iterate_tracing_sessions(dynamodb):
    """Yield every entry of the system_traces.sessions table, repeatedly
    re-scanning it (for up to 100 seconds) because tracing sessions are
    written asynchronously and may appear only a couple of seconds after
    the traced request.
    """
    # The tracing session table does not get updated immediately - we may need
    # to sleep a bit until the requested string appears. We save the previous
    # session table in last_scan, so if we're looking for sessions of number of
    # different requests started together, it might be enough to read from disk
    # the session table just once, and not re-read (and of course, not sleep)
    # when looking for the other requests.
    global last_scan
    trace_sessions_table = dynamodb.Table('.scylla.alternator.system_traces.sessions')
    start = time.time()
    # Identity comparison with None is the idiomatic (and safe) test here.
    if last_scan is None:
        # The trace tables have RF=2, even on a one-node test setup, and
        # thus fail reads with ConsistentRead=True (Quorum)...
        last_scan = full_scan(trace_sessions_table, ConsistentRead=False)
    while time.time() - start < 100:
        for entry in last_scan:
            yield entry
        time.sleep(0.3)
        last_scan = full_scan(trace_sessions_table, ConsistentRead=False)
def iterate_tracing_sessions_with_table_filter(dynamodb, table_name):
    """Yield only those tracing sessions whose 'parameters' string mentions
    the given table name, as emitted by Alternator's tracing - either as
    'table : <name>}' or, for multi-table requests, 'table[<i>] : <name>}'.
    """
    # re.escape protects against regex metacharacters: '.' is legal in
    # DynamoDB table names and would otherwise match any character.
    expected_table_re = re.compile(fr'table(\[\d+\])? : {re.escape(table_name)}}}')
    for entry in iterate_tracing_sessions(dynamodb):
        if expected_table_re.search(entry["parameters"]):
            yield entry
def find_tracing_session(dynamodb, session_key, table_name):
    """Return the session_id of the tracing session whose parameters contain
    the unique string session_key, considering only sessions that mention
    table_name. Fails the test if no such session shows up in time.
    """
    candidates = iterate_tracing_sessions_with_table_filter(dynamodb, table_name)
    for session in candidates:
        if session_key in session['parameters']:
            return session['session_id']
    pytest.fail("Couldn't find tracing session")
# For the given tracing session_id of a request, read the list of "activities"
# that happened during this request. We discard the other information we have
# about each activity such as time, shard, source, and so on, and only keep
# the textual "activity" strings, and return them sorted like they were in
# the events table (i.e., by time).
def get_tracing_events(dynamodb, session_id):
    """Return the list of 'activity' strings recorded for the given tracing
    session, in the order they appear in the events table (i.e., by time).

    All other per-event fields (time, shard, source, 'source_elapsed', ...)
    are deliberately discarded - only the textual activity is of interest.
    """
    trace_events_table = dynamodb.Table('.scylla.alternator.system_traces.events')
    rows = full_query(trace_events_table, ConsistentRead=False,
        KeyConditionExpression='session_id = :s',
        ExpressionAttributeValues={':s': session_id})
    return [row['activity'] for row in rows]
def expect_tracing_events(dynamodb, session_key, expected_events, table_name):
    """Locate the tracing session identified by the unique session_key
    (restricted to sessions mentioning table_name) and assert that its
    trace contains all of expected_events."""
    session_id = find_tracing_session(dynamodb, session_key, table_name)
    expect_tracing_events_with_session(dynamodb, session_id, expected_events)
# We have no way to know whether the tracing events returned by
# get_tracing_events() is the entire trace. Even though we have already seen
# the tracing session, it doesn't guarantee that all the events were fully
# written (both are written independently and asynchronously). So the
# expect_tracing_events() has to retry the get_tracing_events() call until
# a timeout. In the successful case, we'll finish very quickly (usually,
# even immediately).
def expect_tracing_events_with_session(dynamodb, session, expected_events):
    """Assert that the trace of the given session contains all of
    expected_events (and is not ridiculously short).

    Events are written asynchronously and independently of the session row,
    so the events may not all be visible yet even though the session is -
    keep re-reading until a ~100 second deadline. In the successful case
    this finishes quickly, usually on the first read.
    """
    deadline = time.time() + 100
    while time.time() < deadline:
        events = get_tracing_events(dynamodb, session)
        complete = all(expected in events for expected in expected_events)
        # Beyond the specific expected_events, also require the event list
        # not to be ridiculously short.
        if complete and len(events) > 3:
            return
        time.sleep(0.1)
    # Timed out. Repeat the last checks as assertions, so pytest produces a
    # useful error report.
    assert len(events) > 3
    for expected in expected_events:
        assert expected in events
# A test table based on test_table_s, but with isolation level defined to 'always'
def create_yield_destroy_test_table_s_isolation_always(dynamodb):
    """Create a table like test_table_s but tagged with write isolation
    'always', yield it to the consumer, and delete it afterwards."""
    tbl = create_test_table(
        dynamodb,
        KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}],
        Tags=[{'Key': 'system:write_isolation', 'Value': 'always'}])
    yield tbl
    tbl.delete()
@pytest.fixture(scope="module")
def test_table_s_isolation_always(dynamodb):
    """Module-scoped table with 'always' write isolation (first of two)."""
    for table in create_yield_destroy_test_table_s_isolation_always(dynamodb):
        yield table
@pytest.fixture(scope="module")
def test_table_s_isolation_always_2(dynamodb):
    """Module-scoped table with 'always' write isolation (second of two)."""
    for table in create_yield_destroy_test_table_s_isolation_always(dynamodb):
        yield table
# Because tracing is asynchronous and usually appears as much as two
# seconds after the request, it is inefficient to have separate tests
# for separate request types - each of these tests will have a latency
# of as much as one second. Instead, we write just one test for all the
# different request types. This test enables tracing, runs a bunch of
# different requests - and only then looks for all of them in the trace
# table.
def test_tracing_all(with_tracing, test_table_s_isolation_always, test_table_s_isolation_always_2, dynamodb):
    """Exercise one request of each type with tracing enabled, then verify
    each request's tracing session (found via a unique random string in its
    parameters) contains the expected events, and that the session's
    parameters include the table name(s).

    All requests are issued up front and the traces checked afterwards,
    because traces appear asynchronously (up to ~2s later) - one combined
    test avoids paying that latency once per request type.
    """
    # Run the different requests, each one containing a long random string
    # that we can later use to find with find_tracing_session()
    if test_table_s_isolation_always.name > test_table_s_isolation_always_2.name:
        # we don't care about those names, but we care about the order name wise, as later we check
        # for `table[0]` and `table[1]` in tracing and tables in tracing are output sorted
        test_table_s_isolation_always, test_table_s_isolation_always_2 = test_table_s_isolation_always_2, test_table_s_isolation_always
    table = test_table_s_isolation_always
    # PutItem:
    p_putitem = random_string(20)
    table.put_item(Item={'p': p_putitem})
    # GetItem:
    p_getitem = random_string(20)
    table.get_item(Key={'p': p_getitem})
    # DeleteItem:
    p_deleteitem = random_string(20)
    table.delete_item(Key={'p': p_deleteitem})
    # UpdateItem:
    p_updateitem = random_string(20)
    table.update_item(Key={'p': p_updateitem}, AttributeUpdates={})
    # BatchGetItem:
    p_batchgetitem = random_string(20)
    table.meta.client.batch_get_item(RequestItems = {table.name: {'Keys': [{'p': p_batchgetitem}]}})
    # BatchWriteItem:
    p_batchwriteitem = random_string(20)
    table.meta.client.batch_write_item(RequestItems = {table.name: [{'PutRequest': {'Item': {'p': p_batchwriteitem}}}]})
    # Query:
    p_query = random_string(20)
    full_query(table, KeyConditionExpression='p = :p', ExpressionAttributeValues={':p': p_query})
    # Scan:
    p_scan = random_string(20)
    full_scan(table, FilterExpression='p = :p', ExpressionAttributeValues={':p': p_scan})
    # we will fetch tracing sessions based on operation and beginning of the long unique string
    # we have set max trace output size to the length of the string, so we can expect truncation
    # to happen at the end of the string and the beginning should be present
    p_putitem_really_long = random_string(1024 * 8)
    beginning_of_p_putitem_really_long = p_putitem_really_long[:1024]
    # With the trace-output cap equal to the string's length, the full request
    # (string plus surrounding JSON) exceeds the cap, so the trace is truncated.
    with scylla_config_temporary(dynamodb, 'alternator_max_users_query_size_in_trace_output', str(len(p_putitem_really_long))):
        table.put_item(Item={'p': p_putitem_really_long})
    # With a much larger cap, the same-sized request is traced untruncated.
    with scylla_config_temporary(dynamodb, 'alternator_max_users_query_size_in_trace_output', str(len(p_putitem_really_long) + 1024 * 256)):
        table.put_item(Item={'p': '_' + p_putitem_really_long})
    # Multi-table batch requests, to check that *both* table names appear in
    # the trace parameters, sorted, as table[0] and table[1].
    p_batchwriteitem1 = random_string(20)
    p_batchwriteitem2 = random_string(20)
    p_batchgetitem1 = random_string(20)
    p_batchgetitem2 = random_string(20)
    table.meta.client.batch_write_item(RequestItems = {
        test_table_s_isolation_always.name: [{'PutRequest': {'Item': {'p': p_batchwriteitem1}}}],
        test_table_s_isolation_always_2.name: [{'PutRequest': {'Item': {'p': p_batchwriteitem2}}}],
    })
    table.meta.client.batch_get_item(RequestItems = {
        test_table_s_isolation_always.name: {'Keys': [{'p': p_batchgetitem1}]},
        test_table_s_isolation_always_2.name: {'Keys': [{'p': p_batchgetitem2}]},
    })
    # Check the traces. NOTE: the following checks are fairly arbitrary, and
    # may break in the future if we change the tracing messages...
    expect_tracing_events(dynamodb, p_putitem, ['PutItem', 'CAS successful'], table.name)
    expect_tracing_events(dynamodb, p_getitem, ['GetItem', 'Querying is done'], table.name)
    expect_tracing_events(dynamodb, p_deleteitem, ['DeleteItem', 'CAS successful'], table.name)
    expect_tracing_events(dynamodb, p_updateitem, ['UpdateItem', 'CAS successful'], table.name)
    expect_tracing_events(dynamodb, p_batchgetitem, ['BatchGetItem', 'Querying is done'], table.name)
    expect_tracing_events(dynamodb, p_batchwriteitem, ['BatchWriteItem', 'CAS successful'], table.name)
    expect_tracing_events(dynamodb, p_query, ['Query', 'Querying is done'], table.name)
    expect_tracing_events(dynamodb, p_scan, ['Scan', 'Performing a database query'], table.name)
    # Scan all sessions once, looking for the truncated and untruncated long
    # PutItem traces and for the two-table batch sessions in a single pass.
    op_1 = 'Alternator PutItem'
    op_2 = 'Alternator BatchWriteItem'
    op_3 = 'Alternator BatchGetItem'
    found_truncated = False
    found_not_truncated = False
    batch_get_item_double_tables_session_id = None
    batch_write_item_double_tables_session_id = None
    for entry in iterate_tracing_sessions_with_table_filter(dynamodb, table.name):
        p = entry['parameters']
        if entry['request'] == op_1:
            if beginning_of_p_putitem_really_long in p:
                # The 1024-char prefix is always present; '<truncated>'
                # distinguishes the capped trace from the uncapped one.
                if '<truncated>' in p:
                    found_truncated = True
                else:
                    found_not_truncated = True
        elif entry['request'] == op_2:
            if p_batchwriteitem1 in p and p_batchwriteitem2 in p:
                assert f'table[0] : {test_table_s_isolation_always.name}' in p
                assert f'table[1] : {test_table_s_isolation_always_2.name}' in p
                batch_write_item_double_tables_session_id = entry['session_id']
        elif entry['request'] == op_3:
            if p_batchgetitem1 in p and p_batchgetitem2 in p:
                assert f'table[0] : {test_table_s_isolation_always.name}' in p
                assert f'table[1] : {test_table_s_isolation_always_2.name}' in p
                batch_get_item_double_tables_session_id = entry['session_id']
        # Stop iterating (and re-scanning) as soon as everything is found.
        if found_truncated and found_not_truncated and batch_get_item_double_tables_session_id and batch_write_item_double_tables_session_id:
            break
    assert found_truncated
    assert found_not_truncated
    assert batch_get_item_double_tables_session_id
    assert batch_write_item_double_tables_session_id
    expect_tracing_events_with_session(dynamodb, batch_get_item_double_tables_session_id, ['BatchGetItem', 'Querying is done'])
    expect_tracing_events_with_session(dynamodb, batch_write_item_double_tables_session_id, ['BatchWriteItem', 'CAS successful'])
# TODO:
# We could use traces to show that the right things actually happen during a
# request. In issue #6747 we suspected that maybe GetItem doesn't read just
# the requested item, and a tracing test can prove or disprove that hunch.
def test_slow_query_log(with_slow_query_logging, test_table_s, dynamodb):
    """With the slow-query threshold set to 0 microseconds (see the
    with_slow_query_logging fixture), every request is logged as slow.
    Run a PutItem and a DeleteItem and verify both appear in the
    node_slow_log table within a minute."""
    key = random_string(20)
    print(f"Traced key: {key}")
    test_table_s.put_item(Item={'p': key})
    test_table_s.delete_item(Key={'p': key})
    # Verify that the operations got logged. Each operation taking more than
    # 0 microseconds is logged, which effectively logs all requests as slow.
    slow_query_table = dynamodb.Table('.scylla.alternator.system_traces.node_slow_log')
    deadline = time.time() + 60
    while time.time() < deadline:
        rows = full_scan(slow_query_table, ConsistentRead=False)
        def logged(op):
            # An entry matches when it names the operation, contains our
            # unique key, and was issued by the "cassandra" user.
            return any(op in row['parameters'] and key in row['parameters']
                       and row['username'] == "cassandra" for row in rows)
        if logged("PutItem") and logged("DeleteItem"):
            return
        time.sleep(0.5)
    pytest.fail("Slow query entries not found")