mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Add table name to tracing in alternator
Add a table name to Alternator's tracing output, as some clients would like to consistently receive this information. - add missing `tracing::add_table_name` in `executor::scan` - add emitting tables' names in `trace_state::build_parameters_map` - update tests, so when tracing is looked for it is filtered by table's name, which confirms the table is being output. - change `struct one_session_records` declaration to `class one_session_records`, as `one_session_records` is later defined as a class. Refs #26618 Fixes #24031 Closes scylladb/scylladb#26634
This commit is contained in:
committed by
Botond Dénes
parent
22f22d183f
commit
ce8db6e19e
@@ -888,7 +888,7 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
|
||||
|
||||
schema_ptr schema = get_table(_proxy, request);
|
||||
get_stats_from_schema(_proxy, *schema)->api_operations.describe_table++;
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
tracing::add_alternator_table_name(trace_state, schema->cf_name());
|
||||
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::active, _proxy, client_state, trace_state, permit);
|
||||
rjson::value response = rjson::empty_object();
|
||||
@@ -989,7 +989,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
|
||||
std::string table_name = get_table_name(request);
|
||||
|
||||
std::string keyspace_name = executor::KEYSPACE_NAME_PREFIX + table_name;
|
||||
tracing::add_table_name(trace_state, keyspace_name, table_name);
|
||||
tracing::add_alternator_table_name(trace_state, table_name);
|
||||
auto& p = _proxy.container();
|
||||
|
||||
schema_ptr schema = get_table(_proxy, request);
|
||||
@@ -1583,7 +1583,7 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
|
||||
std::unordered_set<std::string> unused_attribute_definitions =
|
||||
validate_attribute_definitions("", *attribute_definitions);
|
||||
|
||||
tracing::add_table_name(trace_state, keyspace_name, table_name);
|
||||
tracing::add_alternator_table_name(trace_state, table_name);
|
||||
|
||||
schema_builder builder(keyspace_name, table_name);
|
||||
auto [hash_key, range_key] = parse_key_schema(request, "");
|
||||
@@ -1930,7 +1930,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
|
||||
schema_ptr tab = get_table(p.local(), request);
|
||||
|
||||
tracing::add_table_name(gt, tab->ks_name(), tab->cf_name());
|
||||
tracing::add_alternator_table_name(gt, tab->cf_name());
|
||||
|
||||
// the ugly but harmless conversion to string_view here is because
|
||||
// Seastar's sstring is missing a find(std::string_view) :-()
|
||||
@@ -2856,7 +2856,7 @@ future<executor::request_return_type> executor::put_item(client_state& client_st
|
||||
elogger.trace("put_item {}", request);
|
||||
|
||||
auto op = make_shared<put_item_operation>(*_parsed_expression_cache, _proxy, std::move(request));
|
||||
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
|
||||
tracing::add_alternator_table_name(trace_state, op->schema()->cf_name());
|
||||
const bool needs_read_before_write = op->needs_read_before_write();
|
||||
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
|
||||
@@ -2960,7 +2960,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
|
||||
|
||||
auto op = make_shared<delete_item_operation>(*_parsed_expression_cache, _proxy, std::move(request));
|
||||
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *(op->schema()));
|
||||
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
|
||||
tracing::add_alternator_table_name(trace_state, op->schema()->cf_name());
|
||||
const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write();
|
||||
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
|
||||
@@ -3204,7 +3204,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
per_table_stats->api_operations.batch_write_item++;
|
||||
per_table_stats->api_operations.batch_write_item_batch_total += it->value.Size();
|
||||
per_table_stats->api_operations.batch_write_item_histogram.add(it->value.Size());
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
tracing::add_alternator_table_name(trace_state, schema->cf_name());
|
||||
|
||||
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(
|
||||
1, primary_key_hash{schema}, primary_key_equal{schema});
|
||||
@@ -4464,7 +4464,7 @@ future<executor::request_return_type> executor::update_item(client_state& client
|
||||
elogger.trace("update_item {}", request);
|
||||
|
||||
auto op = make_shared<update_item_operation>(*_parsed_expression_cache, _proxy, std::move(request));
|
||||
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
|
||||
tracing::add_alternator_table_name(trace_state, op->schema()->cf_name());
|
||||
const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write();
|
||||
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
|
||||
@@ -4545,7 +4545,7 @@ future<executor::request_return_type> executor::get_item(client_state& client_st
|
||||
schema_ptr schema = get_table(_proxy, request);
|
||||
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *schema);
|
||||
per_table_stats->api_operations.get_item++;
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
tracing::add_alternator_table_name(trace_state, schema->cf_name());
|
||||
|
||||
rjson::value& query_key = request["Key"];
|
||||
db::consistency_level cl = get_read_consistency(request);
|
||||
@@ -4694,7 +4694,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
uint batch_size = 0;
|
||||
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
|
||||
table_requests rs(get_table_from_batch_request(_proxy, it));
|
||||
tracing::add_table_name(trace_state, sstring(executor::KEYSPACE_NAME_PREFIX) + rs.schema->cf_name(), rs.schema->cf_name());
|
||||
tracing::add_alternator_table_name(trace_state, rs.schema->cf_name());
|
||||
rs.cl = get_read_consistency(it->value);
|
||||
std::unordered_set<std::string> used_attribute_names;
|
||||
rs.attrs_to_get = ::make_shared<const std::optional<attrs_to_get>>(calculate_attrs_to_get(it->value, *_parsed_expression_cache, used_attribute_names));
|
||||
@@ -5296,6 +5296,7 @@ future<executor::request_return_type> executor::scan(client_state& client_state,
|
||||
elogger.trace("Scanning {}", request);
|
||||
|
||||
auto [schema, table_type] = get_table_or_view(_proxy, request);
|
||||
tracing::add_alternator_table_name(trace_state, schema->cf_name());
|
||||
get_stats_from_schema(_proxy, *schema)->api_operations.scan++;
|
||||
auto segment = get_int_attribute(request, "Segment");
|
||||
auto total_segments = get_int_attribute(request, "TotalSegments");
|
||||
@@ -5775,7 +5776,7 @@ future<executor::request_return_type> executor::query(client_state& client_state
|
||||
|
||||
auto [schema, table_type] = get_table_or_view(_proxy, request);
|
||||
get_stats_from_schema(_proxy, *schema)->api_operations.query++;
|
||||
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
||||
tracing::add_alternator_table_name(trace_state, schema->cf_name());
|
||||
|
||||
rjson::value* exclusive_start_key = rjson::find(request, "ExclusiveStartKey");
|
||||
db::consistency_level cl = get_read_consistency(request);
|
||||
|
||||
@@ -125,6 +125,12 @@ Note that there is no guaranteed way to know when the tracing of a particular se
|
||||
* `request`: a short string describing the current query, like "Execute CQL3 query"
|
||||
* `started_at`: is a timestamp taken when tracing session has began
|
||||
|
||||
##### Alternator specific
|
||||
Alternator commands will add following information to traces:
|
||||
* `alternator_op` key in `parameters` map - operation type, for example `CreateTable`
|
||||
* `table` key in `parameters` map - table used in given session if there was exactly one
|
||||
* `table[0]`, `table[1]`, ... in `parameters` map - tables used in given session, if there were more than one. Names will be sorted before inserting, names will not repeat
|
||||
|
||||
### Slow queries logging
|
||||
#### The motivation
|
||||
Many times in real life installations one of the most important parameters of the system is the longest response time. Naturally, the shorter it is - the better. Therefore capturing the request that take a long time and understanding why it took it so long is a very critical and challenging task.
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
import json
|
||||
import time
|
||||
import re
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
@@ -117,10 +118,15 @@ def iterate_tracing_sessions(dynamodb):
|
||||
time.sleep(0.3)
|
||||
last_scan = full_scan(trace_sessions_table, ConsistentRead=False)
|
||||
|
||||
|
||||
def find_tracing_session(dynamodb, str):
|
||||
def iterate_tracing_sessions_with_table_filter(dynamodb, table_name):
|
||||
expected_table_re = re.compile(fr'table(\[\d+\])? : {table_name}}}')
|
||||
for entry in iterate_tracing_sessions(dynamodb):
|
||||
if str in entry['parameters']:
|
||||
if expected_table_re.search(entry["parameters"]):
|
||||
yield entry
|
||||
|
||||
def find_tracing_session(dynamodb, session_key, table_name):
|
||||
for entry in iterate_tracing_sessions_with_table_filter(dynamodb, table_name):
|
||||
if session_key in entry['parameters']:
|
||||
return entry['session_id']
|
||||
pytest.fail("Couldn't find tracing session")
|
||||
|
||||
@@ -142,6 +148,10 @@ def get_tracing_events(dynamodb, session_id):
|
||||
ret.append(result['activity'])
|
||||
return ret
|
||||
|
||||
def expect_tracing_events(dynamodb, session_key, expected_events, table_name):
|
||||
session = find_tracing_session(dynamodb, session_key, table_name)
|
||||
expect_tracing_events_with_session(dynamodb, session, expected_events)
|
||||
|
||||
# We have no way to know whether the tracing events returned by
|
||||
# get_tracing_events() is the entire trace. Even though we have already seen
|
||||
# the tracing session, it doesn't guarantee that all the events were fully
|
||||
@@ -149,8 +159,7 @@ def get_tracing_events(dynamodb, session_id):
|
||||
# expect_tracing_events() has to retry the get_tracing_events() call until
|
||||
# a timeout. In the successful case, we'll finish very quickly (usually,
|
||||
# even immediately).
|
||||
def expect_tracing_events(dynamodb, str, expected_events):
|
||||
session = find_tracing_session(dynamodb, str)
|
||||
def expect_tracing_events_with_session(dynamodb, session, expected_events):
|
||||
start = time.time()
|
||||
while time.time() - start < 100:
|
||||
events = get_tracing_events(dynamodb, session)
|
||||
@@ -171,8 +180,7 @@ def expect_tracing_events(dynamodb, str, expected_events):
|
||||
assert event in events
|
||||
|
||||
# A test table based on test_table_s, but with isolation level defined to 'always'
|
||||
@pytest.fixture(scope="module")
|
||||
def test_table_s_isolation_always(dynamodb):
|
||||
def create_yield_destroy_test_table_s_isolation_always(dynamodb):
|
||||
table = create_test_table(dynamodb,
|
||||
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, ],
|
||||
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' } ],
|
||||
@@ -180,6 +188,14 @@ def test_table_s_isolation_always(dynamodb):
|
||||
yield table
|
||||
table.delete()
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def test_table_s_isolation_always(dynamodb):
|
||||
yield from create_yield_destroy_test_table_s_isolation_always(dynamodb)
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def test_table_s_isolation_always_2(dynamodb):
|
||||
yield from create_yield_destroy_test_table_s_isolation_always(dynamodb)
|
||||
|
||||
# Because tracing is asynchronous and usually appears as much as two
|
||||
# seconds after the request, it is inefficient to have separate tests
|
||||
# for separate request types - each of these tests will have a latency
|
||||
@@ -187,9 +203,14 @@ def test_table_s_isolation_always(dynamodb):
|
||||
# different request types. This test enables tracing, runs a bunch of
|
||||
# different requests - and only then looks for all of them in the trace
|
||||
# table.
|
||||
def test_tracing_all(with_tracing, test_table_s_isolation_always, dynamodb):
|
||||
def test_tracing_all(with_tracing, test_table_s_isolation_always, test_table_s_isolation_always_2, dynamodb):
|
||||
# Run the different requests, each one containing a long random string
|
||||
# that we can later use to find with find_tracing_session():
|
||||
# that we can later use to find with find_tracing_session()
|
||||
|
||||
if test_table_s_isolation_always.name > test_table_s_isolation_always_2.name:
|
||||
# we don't care about those names, but we care about the order name wise, as later we check
|
||||
# for `table[0]` and `table[1]` in tracing and tables in tracing are outputed sorted
|
||||
test_table_s_isolation_always, test_table_s_isolation_always_2 = test_table_s_isolation_always_2, test_table_s_isolation_always
|
||||
|
||||
table = test_table_s_isolation_always
|
||||
# PutItem:
|
||||
@@ -229,32 +250,67 @@ def test_tracing_all(with_tracing, test_table_s_isolation_always, dynamodb):
|
||||
with scylla_config_temporary(dynamodb, 'alternator_max_users_query_size_in_trace_output', str(len(p_putitem_really_long) + 1024 * 256)):
|
||||
table.put_item(Item={'p': '_' + p_putitem_really_long})
|
||||
|
||||
p_batchwriteitem1 = random_string(20)
|
||||
p_batchwriteitem2 = random_string(20)
|
||||
p_batchgetitem1 = random_string(20)
|
||||
p_batchgetitem2 = random_string(20)
|
||||
|
||||
table.meta.client.batch_write_item(RequestItems = {
|
||||
test_table_s_isolation_always.name: [{'PutRequest': {'Item': {'p': p_batchwriteitem1}}}],
|
||||
test_table_s_isolation_always_2.name: [{'PutRequest': {'Item': {'p': p_batchwriteitem2}}}],
|
||||
})
|
||||
table.meta.client.batch_get_item(RequestItems = {
|
||||
test_table_s_isolation_always.name: {'Keys': [{'p': p_batchgetitem1}]},
|
||||
test_table_s_isolation_always_2.name: {'Keys': [{'p': p_batchgetitem2}]},
|
||||
})
|
||||
|
||||
# Check the traces. NOTE: the following checks are fairly arbitrary, and
|
||||
# may break in the future if we change the tracing messages...
|
||||
expect_tracing_events(dynamodb, p_putitem, ['PutItem', 'CAS successful'])
|
||||
expect_tracing_events(dynamodb, p_getitem, ['GetItem', 'Querying is done'])
|
||||
expect_tracing_events(dynamodb, p_deleteitem, ['DeleteItem', 'CAS successful'])
|
||||
expect_tracing_events(dynamodb, p_updateitem, ['UpdateItem', 'CAS successful'])
|
||||
expect_tracing_events(dynamodb, p_batchgetitem, ['BatchGetItem', 'Querying is done'])
|
||||
expect_tracing_events(dynamodb, p_batchwriteitem, ['BatchWriteItem', 'CAS successful'])
|
||||
expect_tracing_events(dynamodb, p_query, ['Query', 'Querying is done'])
|
||||
expect_tracing_events(dynamodb, p_scan, ['Scan', 'Performing a database query'])
|
||||
expect_tracing_events(dynamodb, p_putitem, ['PutItem', 'CAS successful'], table.name)
|
||||
expect_tracing_events(dynamodb, p_getitem, ['GetItem', 'Querying is done'], table.name)
|
||||
expect_tracing_events(dynamodb, p_deleteitem, ['DeleteItem', 'CAS successful'], table.name)
|
||||
expect_tracing_events(dynamodb, p_updateitem, ['UpdateItem', 'CAS successful'], table.name)
|
||||
expect_tracing_events(dynamodb, p_batchgetitem, ['BatchGetItem', 'Querying is done'], table.name)
|
||||
expect_tracing_events(dynamodb, p_batchwriteitem, ['BatchWriteItem', 'CAS successful'], table.name)
|
||||
expect_tracing_events(dynamodb, p_query, ['Query', 'Querying is done'], table.name)
|
||||
expect_tracing_events(dynamodb, p_scan, ['Scan', 'Performing a database query'], table.name)
|
||||
|
||||
op = 'Alternator PutItem'
|
||||
op_1 = 'Alternator PutItem'
|
||||
op_2 = 'Alternator BatchWriteItem'
|
||||
op_3 = 'Alternator BatchGetItem'
|
||||
found_truncated = False
|
||||
found_not_truncated = False
|
||||
for entry in iterate_tracing_sessions(dynamodb):
|
||||
if entry['request'] == op:
|
||||
p = entry['parameters']
|
||||
batch_get_item_double_tables_session_id = None
|
||||
batch_write_item_double_tables_session_id = None
|
||||
for entry in iterate_tracing_sessions_with_table_filter(dynamodb, table.name):
|
||||
p = entry['parameters']
|
||||
if entry['request'] == op_1:
|
||||
if beginning_of_p_putitem_really_long in p:
|
||||
if '<truncated>' in p:
|
||||
found_truncated = True
|
||||
else:
|
||||
found_not_truncated = True
|
||||
if found_truncated and found_not_truncated:
|
||||
break
|
||||
assert found_truncated and found_not_truncated
|
||||
elif entry['request'] == op_2:
|
||||
if p_batchwriteitem1 in p and p_batchwriteitem2 in p:
|
||||
assert f'table[0] : {test_table_s_isolation_always.name}' in p
|
||||
assert f'table[1] : {test_table_s_isolation_always_2.name}' in p
|
||||
batch_write_item_double_tables_session_id = entry['session_id']
|
||||
elif entry['request'] == op_3:
|
||||
if p_batchgetitem1 in p and p_batchgetitem2 in p:
|
||||
assert f'table[0] : {test_table_s_isolation_always.name}' in p
|
||||
assert f'table[1] : {test_table_s_isolation_always_2.name}' in p
|
||||
batch_get_item_double_tables_session_id = entry['session_id']
|
||||
if found_truncated and found_not_truncated and batch_get_item_double_tables_session_id and batch_write_item_double_tables_session_id:
|
||||
break
|
||||
|
||||
assert found_truncated
|
||||
assert found_not_truncated
|
||||
assert batch_get_item_double_tables_session_id
|
||||
assert batch_write_item_double_tables_session_id
|
||||
|
||||
expect_tracing_events_with_session(dynamodb, batch_get_item_double_tables_session_id, ['BatchGetItem', 'Querying is done'])
|
||||
expect_tracing_events_with_session(dynamodb, batch_write_item_double_tables_session_id, ['BatchWriteItem', 'CAS successful'])
|
||||
|
||||
|
||||
# TODO:
|
||||
# We could use traces to show that the right things actually happen during a
|
||||
|
||||
@@ -114,6 +114,7 @@ void trace_state::build_parameters_map() {
|
||||
|
||||
auto& params_map = _records->session_rec.parameters;
|
||||
params_values& vals = *_params_ptr;
|
||||
const auto &tables = _records->session_rec.tables;
|
||||
|
||||
if (vals.batchlog_endpoints) {
|
||||
auto batch_endpoints = fmt::format("{}", fmt::join(*vals.batchlog_endpoints | std::views::transform([](locator::host_id ep) {return seastar::format("/{}", ep);}), ","));
|
||||
@@ -132,6 +133,15 @@ void trace_state::build_parameters_map() {
|
||||
params_map.emplace("page_size", seastar::format("{:d}", *vals.page_size));
|
||||
}
|
||||
|
||||
if (tables.size() == 1) {
|
||||
params_map.emplace("table", *tables.begin());
|
||||
} else {
|
||||
size_t index = 0;
|
||||
for (const auto& table : tables) {
|
||||
params_map.emplace(format("table[{:d}]", index++), table);
|
||||
}
|
||||
}
|
||||
|
||||
auto& queries = vals.queries;
|
||||
if (!queries.empty()) {
|
||||
if (queries.size() == 1) {
|
||||
|
||||
@@ -501,6 +501,7 @@ private:
|
||||
friend void add_prepared_statement(const trace_state_ptr& p, prepared_checked_weak_ptr& prepared);
|
||||
friend void set_username(const trace_state_ptr& p, const std::optional<auth::authenticated_user>& user);
|
||||
friend void add_table_name(const trace_state_ptr& p, const sstring& ks_name, const sstring& cf_name);
|
||||
friend void add_alternator_table_name(const trace_state_ptr& p, std::string_view table_name);
|
||||
friend void add_prepared_query_options(const trace_state_ptr& state, const cql3::query_options& prepared_options_ptr);
|
||||
friend void stop_foreground(const trace_state_ptr& state) noexcept;
|
||||
};
|
||||
@@ -677,6 +678,12 @@ inline void add_table_name(const trace_state_ptr& p, const sstring& ks_name, con
|
||||
}
|
||||
}
|
||||
|
||||
inline void add_alternator_table_name(const trace_state_ptr& p, std::string_view table_name) {
|
||||
if (p) {
|
||||
p->add_table_name(sstring{ table_name });
|
||||
}
|
||||
}
|
||||
|
||||
inline bool should_return_id_in_response(const trace_state_ptr& p) {
|
||||
if (p) {
|
||||
return p->write_on_close();
|
||||
|
||||
@@ -134,7 +134,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
struct one_session_records;
|
||||
class one_session_records;
|
||||
using records_bulk = std::deque<lw_shared_ptr<one_session_records>>;
|
||||
|
||||
struct backend_session_state_base {
|
||||
|
||||
Reference in New Issue
Block a user