Files
scylladb/tracing/trace_keyspace_helper.cc
Benny Halevy 3d87b67d0e tracing: trace_keyspace_helper: use named gate
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-04-12 11:29:48 +03:00

481 lines
28 KiB
C++

/*
* Copyright (C) 2016-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include <seastar/core/metrics.hh>
#include "types/types.hh"
#include "tracing/trace_keyspace_helper.hh"
#include "cql3/statements/batch_statement.hh"
#include "cql3/statements/modification_statement.hh"
#include "cql3/query_processor.hh"
#include "cql3/cql_config.hh"
#include "types/set.hh"
#include "types/map.hh"
#include "utils/assert.hh"
#include "utils/UUID_gen.hh"
#include "utils/class_registrator.hh"
#include "service/storage_proxy.hh"
namespace tracing {
// Brings in the `5s`-style duration literals used by tracing_client_state().
using namespace std::chrono_literals;
// Logger used by everything in this file, registered as "trace_keyspace_helper".
static logging::logger tlogger("trace_keyspace_helper");
// Returns the internal client_state used for all tracing reads/writes.
//
// All seven timeout_config entries are 5 seconds. The timeout_config itself is a
// plain function-local static. The client_state is thread_local, so each thread
// lazily builds its own instance on first use and every later call on that
// thread returns the same object.
static service::client_state& tracing_client_state() {
    static timeout_config five_second_timeouts {5s, 5s, 5s, 5s, 5s, 5s, 5s};
    static thread_local service::client_state per_thread_state(service::client_state::internal_tag{}, five_second_timeouts);
    return per_thread_state;
}
// Per-session state kept by the system_traces keyspace backend
// (created by trace_keyspace_helper::allocate_session_state()).
struct trace_keyspace_backend_sesssion_state final : public backend_session_state_base {
    // Last timestamp fed to table_helper::make_monotonic_UUID_tp() when generating
    // this session's event ids; starts at zero for a fresh session.
    int64_t last_nanos = 0;
    // Single-unit semaphore: flushes of the same session run one at a time, so
    // events created first are written first.
    semaphore write_sem {1};

    virtual ~trace_keyspace_backend_sesssion_state() = default;
};
// Constructs the system_traces tracing backend.
//
// Each table_helper member is configured with:
//  - the keyspace and table name;
//  - a "CREATE TABLE IF NOT EXISTS" statement;
//  - the INSERT statement used to write to that table.
// Every table is created with default_time_to_live = 86400 (seconds, i.e. one
// day), and every INSERT additionally binds an explicit TTL.
//
// The sessions table gets a second INSERT that omits the "username" column.
// NOTE(review): presumably a fallback for tables created with an older schema;
// confirm against table_helper.
//
// All INSERT strings keep a separating space before "USING TTL".
//
// The constructor also registers the "tracing_keyspace_helper" metrics group,
// which exposes the error counters kept in _stats.
trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
    : i_tracing_backend_helper(tr)
    , _pending_writes("trace_keyspace_helper::pending_writes")
    , _dummy_query_state(tracing_client_state(), empty_service_permit())
    , _sessions(KEYSPACE_NAME, SESSIONS,
        fmt::format("CREATE TABLE IF NOT EXISTS {}.{} ("
            "session_id uuid,"
            "command text,"
            "client inet,"
            "coordinator inet,"
            "duration int,"
            "parameters map<text, text>,"
            "request text,"
            "started_at timestamp,"
            "request_size int,"
            "response_size int,"
            "username text,"
            "PRIMARY KEY ((session_id))) "
            "WITH default_time_to_live = 86400", KEYSPACE_NAME, SESSIONS),
        fmt::format("INSERT INTO {}.{} ("
            "session_id,"
            "command,"
            "client,"
            "coordinator,"
            "duration,"
            "parameters,"
            "request,"
            "started_at,"
            "request_size,"
            "response_size,"
            "username) VALUES ("
            ":session_id,"
            ":command,"
            ":client,"
            ":coordinator,"
            ":duration,"
            ":parameters,"
            ":request,"
            ":started_at,"
            ":request_size,"
            ":response_size,"
            ":username) USING TTL :ttl", KEYSPACE_NAME, SESSIONS),
        // Same as above but without the "username" column.
        fmt::format("INSERT INTO {}.{} ("
            "session_id,"
            "command,"
            "client,"
            "coordinator,"
            "duration,"
            "parameters,"
            "request,"
            "started_at,"
            "request_size,"
            "response_size) VALUES ("
            ":session_id,"
            ":command,"
            ":client,"
            ":coordinator,"
            ":duration,"
            ":parameters,"
            ":request,"
            ":started_at,"
            ":request_size,"
            ":response_size) USING TTL :ttl", KEYSPACE_NAME, SESSIONS))
    , _sessions_time_idx(KEYSPACE_NAME, SESSIONS_TIME_IDX,
        fmt::format("CREATE TABLE IF NOT EXISTS {}.{} ("
            "minute timestamp,"
            "started_at timestamp,"
            "session_id uuid,"
            "PRIMARY KEY (minute, started_at, session_id)) "
            "WITH default_time_to_live = 86400", KEYSPACE_NAME, SESSIONS_TIME_IDX),
        fmt::format("INSERT INTO {}.{} ("
            "minute,"
            "started_at,"
            "session_id) VALUES (?, ?, ?) "
            "USING TTL ?", KEYSPACE_NAME, SESSIONS_TIME_IDX))
    , _events(KEYSPACE_NAME, EVENTS,
        fmt::format("CREATE TABLE IF NOT EXISTS {}.{} ("
            "session_id uuid,"
            "event_id timeuuid,"
            "activity text,"
            "source inet,"
            "source_elapsed int,"
            "thread text,"
            "scylla_parent_id bigint,"
            "scylla_span_id bigint,"
            "PRIMARY KEY ((session_id), event_id)) "
            "WITH default_time_to_live = 86400", KEYSPACE_NAME, EVENTS),
        fmt::format("INSERT INTO {}.{} ("
            "session_id, "
            "event_id, "
            "activity, "
            "source, "
            "source_elapsed, "
            "thread,"
            "scylla_parent_id,"
            "scylla_span_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?) "
            "USING TTL ?", KEYSPACE_NAME, EVENTS))
    , _slow_query_log(KEYSPACE_NAME, NODE_SLOW_QUERY_LOG,
        fmt::format("CREATE TABLE IF NOT EXISTS {}.{} ("
            "node_ip inet,"
            "shard int,"
            "session_id uuid,"
            "date timestamp,"
            "start_time timeuuid,"
            "command text,"
            "duration int,"
            "parameters map<text, text>,"
            "source_ip inet,"
            "table_names set<text>,"
            "username text,"
            "PRIMARY KEY (start_time, node_ip, shard)) "
            "WITH default_time_to_live = 86400", KEYSPACE_NAME, NODE_SLOW_QUERY_LOG),
        fmt::format("INSERT INTO {}.{} ("
            "node_ip,"
            "shard,"
            "session_id,"
            "date,"
            "start_time,"
            "command,"
            "duration,"
            "parameters,"
            "source_ip,"
            "table_names,"
            "username) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) "
            "USING TTL ?", KEYSPACE_NAME, NODE_SLOW_QUERY_LOG))
    , _slow_query_log_time_idx(KEYSPACE_NAME, NODE_SLOW_QUERY_LOG_TIME_IDX,
        fmt::format("CREATE TABLE IF NOT EXISTS {}.{} ("
            "minute timestamp,"
            "started_at timestamp,"
            "session_id uuid,"
            "start_time timeuuid,"
            "node_ip inet,"
            "shard int,"
            "PRIMARY KEY (minute, started_at, session_id)) "
            "WITH default_time_to_live = 86400", KEYSPACE_NAME, NODE_SLOW_QUERY_LOG_TIME_IDX),
        // The trailing space after "VALUES (...)" keeps the statement text
        // consistent with the other INSERTs ("...) USING TTL ?").
        fmt::format("INSERT INTO {}.{} ("
            "minute,"
            "started_at,"
            "session_id,"
            "start_time,"
            "node_ip,"
            "shard) VALUES (?, ?, ?, ?, ?, ?) "
            "USING TTL ?", KEYSPACE_NAME, NODE_SLOW_QUERY_LOG_TIME_IDX))
{
    namespace sm = seastar::metrics;
    _metrics.add_group("tracing_keyspace_helper", {
        sm::make_counter("tracing_errors", [this] { return _stats.tracing_errors; },
            sm::description("Counts a number of errors during writing to a system_traces keyspace. "
                "One error may cause one or more tracing records to be lost.")),
        sm::make_counter("bad_column_family_errors", [this] { return _stats.bad_column_family_errors; },
            sm::description("Counts a number of times write failed due to one of the tables in the system_traces keyspace has an incompatible schema. "
                "One error may result one or more tracing records to be lost. "
                "Non-zero value indicates that the administrator has to take immediate steps to fix the corresponding schema. "
                "The appropriate error message will be printed in the syslog.")),
    });
}
// Remembers the query processor and migration manager for later writes, then
// sets up the tracing keyspace and all five tracing tables.
//
// The keyspace uses SimpleStrategy with a replication factor of "2".
// The returned future resolves when table_helper::setup_keyspace() completes.
future<> trace_keyspace_helper::start(cql3::query_processor& qp, service::migration_manager& mm) {
    // These pointers are dereferenced by the flush path and by my_address().
    _qp_anchor = &qp;
    _mm_anchor = &mm;

    return table_helper::setup_keyspace(
            qp, mm, KEYSPACE_NAME,
            "org.apache.cassandra.locator.SimpleStrategy", "2",
            _dummy_query_state,
            { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx });
}
// Returns this node's address as reported by the storage proxy.
//
// Dereferences the query processor stored by start() without a null check.
// NOTE(review): callers are expected to invoke this only after start().
gms::inet_address trace_keyspace_helper::my_address() const noexcept {
    auto& qp = *_qp_anchor;
    return qp.proxy().my_address();
}
// Starts an asynchronous write of all records of a single tracing session.
//
// The write runs under the `_pending_writes` gate. Its future is deliberately
// not returned: it is waited on indirectly in `stop()` (via `_pending_writes`).
// Once flush_one_session_mutations() settles, successfully or not,
// `_local_tracing.write_complete()` is called with the number of records the
// session held.
//
// Any failure increments `_stats.tracing_errors` and is only logged; it is
// never propagated to the caller.
void trace_keyspace_helper::write_one_session_records(lw_shared_ptr<one_session_records> records) {
    // Future is waited on indirectly in `stop()` (via `_pending_writes`).
    (void)with_gate(_pending_writes, [this, records = std::move(records)] {
        auto num_records = records->size();
        return this->flush_one_session_mutations(std::move(records)).finally([this, num_records] {
            _local_tracing.write_complete(num_records);
        });
    }).handle_exception([this] (auto ep) {
        try {
            ++_stats.tracing_errors;
            std::rethrow_exception(ep);
        } catch (const exceptions::overloaded_exception&) {
            tlogger.warn("Too many nodes are overloaded to save trace events");
        } catch (const bad_column_family& e) {
            // Log only one out of every bad_column_family_message_period such errors
            // to avoid flooding the log; every error is still counted.
            if (_stats.bad_column_family_errors++ % bad_column_family_message_period == 0) {
                tlogger.warn("Tracing is enabled but {}", e.what());
            }
        } catch (const std::logic_error& e) {
            tlogger.error("{}", e.what());
        } catch (const std::exception& e) {
            // Tracing is best-effort: don't escalate, but leave a hint for debugging
            // instead of dropping the error without any trace.
            tlogger.debug("Failed to write tracing records: {}", e.what());
        } catch (...) {
            // Unknown exception type: already counted in tracing_errors.
            // TODO: Handle some more exceptions maybe?
        }
    }).discard_result();
}
// Kicks off an independent asynchronous write for every session in `bulk`.
//
// Each element is moved into write_one_session_records(), so on return the
// elements of `bulk` are left in their moved-from state.
void trace_keyspace_helper::write_records_bulk(records_bulk& bulk) {
    tlogger.trace("Writing {} sessions", bulk.size());
    for (records_bulk::value_type& session_ptr : bulk) {
        write_one_session_records(std::move(session_ptr));
    }
}
// Builds the named bind values for the sessions-table INSERT.
//
// The statement binds by name (":session_id" ... ":ttl"), so `names` and
// `values` below must stay index-aligned. The consistency level is ANY.
//
// @param my_address      this node's address, written to the "coordinator" column
// @param session_records the session whose session_rec is being stored
// @return query options carrying the names and values for the INSERT
cql3::query_options trace_keyspace_helper::make_session_mutation_data(gms::inet_address my_address, const one_session_records& session_records) {
    const session_record& rec = session_records.session_rec;
    auto started_at_ms = std::chrono::duration_cast<std::chrono::milliseconds>(rec.started_at.time_since_epoch()).count();

    // "parameters" is a map<text, text>: copy every entry into a data_value pair.
    std::vector<std::pair<data_value, data_value>> params;
    params.reserve(rec.parameters.size());
    for (auto& entry : rec.parameters) {
        params.emplace_back(entry.first, entry.second);
    }
    auto text_to_text_map = map_type_impl::get_instance(utf8_type, utf8_type, true);

    std::vector<std::string_view> names {
        "session_id", "command", "client", "coordinator",
        "duration", "parameters", "request", "started_at",
        "request_size", "response_size", "username", "ttl"
    };
    std::vector<cql3::raw_value> values {
        cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)),
        cql3::raw_value::make_value(utf8_type->decompose(type_to_string(rec.command))),
        cql3::raw_value::make_value(inet_addr_type->decompose(rec.client.addr())),
        cql3::raw_value::make_value(inet_addr_type->decompose(my_address.addr())),
        cql3::raw_value::make_value(int32_type->decompose(elapsed_to_micros(rec.elapsed))),
        cql3::raw_value::make_value(make_map_value(text_to_text_map, map_type_impl::native_type(std::move(params))).serialize()),
        cql3::raw_value::make_value(utf8_type->decompose(rec.request)),
        cql3::raw_value::make_value(timestamp_type->decompose(started_at_ms)),
        cql3::raw_value::make_value(int32_type->decompose(static_cast<int32_t>(rec.request_size))),
        cql3::raw_value::make_value(int32_type->decompose(static_cast<int32_t>(rec.response_size))),
        cql3::raw_value::make_value(utf8_type->decompose(rec.username)),
        cql3::raw_value::make_value(int32_type->decompose(static_cast<int32_t>(session_records.ttl.count())))
    };

    return cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY,
            std::move(names), std::move(values), false, cql3::query_options::specific_options::DEFAULT);
}
// Builds the positional bind values for the sessions time-index INSERT:
// (minute, started_at, session_id, TTL), with consistency level ANY.
// `my_address` is accepted for signature uniformity but is not used.
cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(gms::inet_address my_address, const one_session_records& session_records) {
    namespace chr = std::chrono;
    auto since_epoch = session_records.session_rec.started_at.time_since_epoch();
    // Start time truncated (duration_cast) to a whole minute, expressed in milliseconds.
    auto minute_ms = chr::duration_cast<chr::milliseconds>(chr::duration_cast<chr::minutes>(since_epoch)).count();
    // Exact start time in milliseconds.
    auto started_at_ms = chr::duration_cast<chr::milliseconds>(since_epoch).count();

    std::vector<cql3::raw_value> values {
        cql3::raw_value::make_value(timestamp_type->decompose(minute_ms)),
        cql3::raw_value::make_value(timestamp_type->decompose(started_at_ms)),
        cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)),
        cql3::raw_value::make_value(int32_type->decompose(static_cast<int32_t>(session_records.ttl.count())))
    };
    return cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY,
            std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT);
}
// Builds the positional bind values for the slow query log INSERT, in column
// order: node_ip, shard, session_id, date, start_time, command, duration,
// parameters, source_ip, table_names, username, TTL. Consistency level is ANY.
//
// The "command" column is taken from the "query" entry of the session's
// parameters map. It is bound as NULL, with a trace-level log line, when that
// entry is missing.
cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(gms::inet_address my_address, const one_session_records& session_records, const utils::UUID& start_time_id) {
    const session_record& rec = session_records.session_rec;
    auto started_at_ms = std::chrono::duration_cast<std::chrono::milliseconds>(rec.started_at.time_since_epoch()).count();

    const auto query_it = rec.parameters.find("query");
    const bool has_query = query_it != rec.parameters.end();
    if (!has_query) {
        tlogger.trace("No \"query\" parameter set for a session requesting a slow_query_log record");
    }

    // "parameters" column: map<text, text>.
    std::vector<std::pair<data_value, data_value>> params;
    params.reserve(rec.parameters.size());
    for (auto& entry : rec.parameters) {
        params.emplace_back(entry.first, entry.second);
    }
    auto text_to_text_map = map_type_impl::get_instance(utf8_type, utf8_type, true);

    // "table_names" column: set<text> of the tables the query touched.
    std::vector<data_value> table_names;
    table_names.reserve(rec.tables.size());
    for (auto& table : rec.tables) {
        table_names.emplace_back(table);
    }
    auto text_set = set_type_impl::get_instance(utf8_type, true);

    std::vector<cql3::raw_value> values {
        cql3::raw_value::make_value(inet_addr_type->decompose(my_address.addr())),
        cql3::raw_value::make_value(int32_type->decompose(static_cast<int32_t>(this_shard_id()))),
        cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)),
        cql3::raw_value::make_value(timestamp_type->decompose(started_at_ms)),
        cql3::raw_value::make_value(timeuuid_type->decompose(start_time_id)),
        has_query
            ? cql3::raw_value::make_value(utf8_type->decompose(query_it->second))
            : cql3::raw_value::make_null(),
        cql3::raw_value::make_value(int32_type->decompose(elapsed_to_micros(rec.elapsed))),
        cql3::raw_value::make_value(make_map_value(text_to_text_map, map_type_impl::native_type(std::move(params))).serialize()),
        cql3::raw_value::make_value(inet_addr_type->decompose(rec.client.addr())),
        cql3::raw_value::make_value(make_set_value(text_set, set_type_impl::native_type(std::move(table_names))).serialize()),
        cql3::raw_value::make_value(utf8_type->decompose(rec.username)),
        cql3::raw_value::make_value(int32_type->decompose(static_cast<int32_t>(rec.slow_query_record_ttl.count())))
    };
    return cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY,
            std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT);
}
// Builds the positional bind values for the slow query log time-index INSERT:
// (minute, started_at, session_id, start_time, node_ip, shard, TTL), with
// consistency level ANY. The TTL is the session's slow_query_record_ttl.
cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_data(gms::inet_address my_address, const one_session_records& session_records, const utils::UUID& start_time_id) {
    namespace chr = std::chrono;
    auto since_epoch = session_records.session_rec.started_at.time_since_epoch();
    // Start time truncated (duration_cast) to a whole minute, expressed in milliseconds.
    auto minute_ms = chr::duration_cast<chr::milliseconds>(chr::duration_cast<chr::minutes>(since_epoch)).count();
    // Exact start time in milliseconds.
    auto started_at_ms = chr::duration_cast<chr::milliseconds>(since_epoch).count();

    std::vector<cql3::raw_value> values {
        cql3::raw_value::make_value(timestamp_type->decompose(minute_ms)),
        cql3::raw_value::make_value(timestamp_type->decompose(started_at_ms)),
        cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)),
        cql3::raw_value::make_value(timeuuid_type->decompose(start_time_id)),
        cql3::raw_value::make_value(inet_addr_type->decompose(my_address.addr())),
        cql3::raw_value::make_value(int32_type->decompose(static_cast<int32_t>(this_shard_id()))),
        cql3::raw_value::make_value(int32_type->decompose(static_cast<int32_t>(session_records.session_rec.slow_query_record_ttl.count())))
    };
    return cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY,
            std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT);
}
// Builds the positional bind values for one events-table INSERT:
// (session_id, event_id, activity, source, source_elapsed, thread,
//  scylla_parent_id, scylla_span_id, TTL).
//
// The event id is a time UUID derived via table_helper::make_monotonic_UUID_tp().
// That call takes the session's backend `last_nanos` and updates it as a side
// effect. The "thread" column is formatted as "<thread name>/<scheduling group>".
std::vector<cql3::raw_value> trace_keyspace_helper::make_event_mutation_data(gms::inet_address my_address, one_session_records& session_records, const event_record& record) {
    auto& backend_state = *static_cast<trace_keyspace_backend_sesssion_state*>(session_records.backend_state_ptr.get());
    auto event_id = utils::UUID_gen::get_time_UUID(table_helper::make_monotonic_UUID_tp(backend_state.last_nanos, record.event_time_point));

    return {
        cql3::raw_value::make_value(uuid_type->decompose(session_records.session_id)),
        cql3::raw_value::make_value(timeuuid_type->decompose(event_id)),
        cql3::raw_value::make_value(utf8_type->decompose(record.message)),
        cql3::raw_value::make_value(inet_addr_type->decompose(my_address.addr())),
        cql3::raw_value::make_value(int32_type->decompose(elapsed_to_micros(record.elapsed))),
        cql3::raw_value::make_value(utf8_type->decompose(fmt::format("{}/{}", _local_tracing.get_thread_name(), record.scheduling_group_name))),
        cql3::raw_value::make_value(long_type->decompose(static_cast<int64_t>(session_records.parent_id.get_id()))),
        cql3::raw_value::make_value(long_type->decompose(static_cast<int64_t>(session_records.my_span_id.get_id()))),
        cql3::raw_value::make_value(int32_type->decompose(static_cast<int32_t>(session_records.ttl.count())))
    };
}
// Stores all buffered event records of one session as a single UNLOGGED batch.
//
// Resolves immediately when there are no events. Otherwise it:
//  1. makes sure the events table info is cached (_events.cache_table_info());
//  2. builds one INSERT per event, with the bind values produced by
//     make_event_mutation_data();
//  3. executes the batch with consistency level ANY.
//
// `events_records` is captured by reference, so the caller must keep it alive
// until the returned future resolves.
future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp, service::migration_manager& mm, lw_shared_ptr<one_session_records> records, std::deque<event_record>& events_records) {
    if (events_records.empty()) {
        return now();
    }
    return _events.cache_table_info(qp, mm, _dummy_query_state).then([this, &qp, records, &events_records] {
        tlogger.trace("{}: storing {} events records: parent_id {} span_id {}", records->session_id, events_records.size(), records->parent_id, records->my_span_id);
        using single_statement = cql3::statements::batch_statement::single_statement;
        std::vector<single_statement> modifications(events_records.size(), single_statement(_events.insert_stmt(), false));

        // The node address is the same for every event of the batch: look it up
        // once instead of once per event.
        const gms::inet_address address = my_address();
        std::vector<cql3::raw_value_vector_with_unset> values;
        values.reserve(events_records.size());
        for (event_record& one_event_record : events_records) {
            values.emplace_back(make_event_mutation_data(address, *records, one_event_record));
        }

        return do_with(
            cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, std::nullopt, std::vector<cql3::raw_value>{}, false, cql3::query_options::specific_options::DEFAULT), std::move(values)),
            cql3::statements::batch_statement(cql3::statements::batch_statement::type::UNLOGGED, std::move(modifications), cql3::attributes::none(), qp.get_cql_stats()),
            [this, &qp] (auto& batch_options, auto& batch) {
                return batch.execute(qp, _dummy_query_state, batch_options, std::nullopt).then([] (shared_ptr<cql_transport::messages::result_message>) {
                    return now();
                });
            }
        );
    });
}
// Writes the records accumulated so far for one tracing session.
//
// Steps:
//  - Moves the currently buffered event records out of `records->events_recs`
//    (then clears it).
//  - Samples whether the session record is ready.
//  - Marks the data as consumed, so anything produced afterwards is handled
//    by a later flush.
//
// Under the session's single-unit write_sem it then stores, in order:
//  1. the events batch (apply_events_mutation);
//  2. if the session record was ready at the start of this call, the sessions
//     entry and the sessions time-index entry;
//  3. if additionally records->do_log_slow_query is set, the slow query log
//     entry and its time-index entry.
future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr<one_session_records> records) {
// grab events records available so far
return do_with(std::move(records->events_recs), [this, records] (std::deque<event_record>& events_records) {
// Put the moved-from deque back into a well-defined (empty) state.
records->events_recs.clear();
// Check if a session's record is ready before handling events' records.
//
// New event's records and a session's record may become ready while a
// mutation with the current events' records is being written. We don't want
// to allow the situation when a session's record is written before the last
// event record from the same session.
bool session_record_is_ready = records->session_rec.ready();
// From this point on - all new data will have to be handled in the next write event
records->data_consumed();
// We want to serialize the creation of events mutations in order to ensure
// that mutations for events that were created first are going to be
// created first too.
auto backend_state_ptr = static_cast<trace_keyspace_backend_sesssion_state*>(records->backend_state_ptr.get());
semaphore& write_sem = backend_state_ptr->write_sem;
return with_semaphore(write_sem, 1, [this, records, session_record_is_ready, &events_records] {
// This code is inside the _pending_writes gate and the qp pointer
// is cleared on ::stop() after the gate is closed.
SCYLLA_ASSERT(_qp_anchor != nullptr && _mm_anchor != nullptr);
cql3::query_processor& qp = *_qp_anchor;
service::migration_manager& mm = *_mm_anchor;
return apply_events_mutation(qp, mm, records, events_records).then([this, &qp, &mm, session_record_is_ready, records] {
if (session_record_is_ready) {
// if session is finished - store a session and a session time index entries
tlogger.trace("{}: going to store a session event", records->session_id);
return _sessions.insert(qp, mm, _dummy_query_state, make_session_mutation_data, my_address(), std::ref(*records)).then([this, &qp, &mm, records] {
tlogger.trace("{}: going to store a {} entry", records->session_id, _sessions_time_idx.name());
return _sessions_time_idx.insert(qp, mm, _dummy_query_state, make_session_time_idx_mutation_data, my_address(), std::ref(*records));
}).then([this, &qp, &mm, records] {
if (!records->do_log_slow_query) {
return now();
}
// if slow query log is requested - store a slow query log and a slow query log time index entries
// start_time_id is a time UUID derived via make_monotonic_UUID_tp(), which
// takes (and updates) _slow_query_last_nanos.
auto start_time_id = utils::UUID_gen::get_time_UUID(table_helper::make_monotonic_UUID_tp(_slow_query_last_nanos, records->session_rec.started_at));
tlogger.trace("{}: going to store a slow query event", records->session_id);
return _slow_query_log.insert(qp, mm, _dummy_query_state, make_slow_query_mutation_data, my_address(), std::ref(*records), start_time_id).then([this, &qp, &mm, records, start_time_id] {
tlogger.trace("{}: going to store a {} entry", records->session_id, _slow_query_log_time_idx.name());
return _slow_query_log_time_idx.insert(qp, mm, _dummy_query_state, make_slow_query_time_idx_mutation_data, my_address(), std::ref(*records), start_time_id);
});
});
} else {
return now();
}
});
// The no-op finally() captures `records` by value, holding a reference to the
// session records until the whole chain above has completed.
}).finally([records] {});
});
}
// Creates the backend-specific per-session state: a
// trace_keyspace_backend_sesssion_state with last_nanos == 0 and a
// single-unit write semaphore, returned through its base-class pointer.
std::unique_ptr<backend_session_state_base> trace_keyspace_helper::allocate_session_state() const {
    std::unique_ptr<backend_session_state_base> fresh_state = std::make_unique<trace_keyspace_backend_sesssion_state>();
    return fresh_state;
}
// Registers trace_keyspace_helper with class_registrator as an implementation of
// i_tracing_backend_helper, constructed from a `tracing&`, under the name
// "trace_keyspace_helper" (presumably so the tracing backend can be chosen by name).
using registry_default = class_registrator<i_tracing_backend_helper, trace_keyspace_helper, tracing&>;
static registry_default registrator_default("trace_keyspace_helper");
}