scylladb/tracing/trace_state.cc
Avi Kivity 841481c202 Merge "move storage proxy and adjacent services to identify hosts by ids" from Gleb
"
This rather large patch series moves storage proxy and some adjacent
services (like migration manager) to use host ids to identify nodes rather
than ips. Messaging service gains a capability to address nodes by host
ids (which allows dropping translations from topology coordinator code
that worked on host ids already) and also makes sure that a node with
incorrect host id will reject a message (can happen during address
changes).

The series gets rid of the raft address map completely and replaces it with
the gossiper address map which is managed by the gossiper since translation
is now done in the layer below raft.

Fixes: scylladb/scylladb#6403

perf-simple-query -- smp 1 -m 1G output

Before:

enable-cache=1
Running test with config: {partitions=10000, concurrency=100, mode=read, frontend=cql, query_single_key=no, counters=no}
Disabling auto compaction
Creating 10000 partitions...
64336.82 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   41291 insns/op,   24485 cycles/op,        0 errors)
62669.58 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   41277 insns/op,   24695 cycles/op,        0 errors)
69172.12 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   41326 insns/op,   24463 cycles/op,        0 errors)
56706.60 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   41143 insns/op,   24513 cycles/op,        0 errors)
56416.65 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   41186 insns/op,   24851 cycles/op,        0 errors)

         throughput: mean=61860.35 standard-deviation=5395.48 median=62669.58 median-absolute-deviation=5153.75 maximum=69172.12 minimum=56416.65
instructions_per_op: mean=41244.62 standard-deviation=76.90 median=41276.94 median-absolute-deviation=58.55 maximum=41326.19 minimum=41142.80
  cpu_cycles_per_op: mean=24601.35 standard-deviation=167.39 median=24512.64 median-absolute-deviation=116.65 maximum=24851.45 minimum=24462.70

After:

enable-cache=1
Running test with config: {partitions=10000, concurrency=100, mode=read, frontend=cql, query_single_key=no, counters=no}
Disabling auto compaction
Creating 10000 partitions...
65237.35 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   40733 insns/op,   23145 cycles/op,        0 errors)
59283.09 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   40624 insns/op,   23948 cycles/op,        0 errors)
70851.03 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   40625 insns/op,   23027 cycles/op,        0 errors)
70549.61 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   40650 insns/op,   23266 cycles/op,        0 errors)
68634.96 tps ( 63.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   40622 insns/op,   22935 cycles/op,        0 errors)

         throughput: mean=66911.21 standard-deviation=4814.60 median=68634.96 median-absolute-deviation=3638.40 maximum=70851.03 minimum=59283.09
instructions_per_op: mean=40650.89 standard-deviation=47.55 median=40624.60 median-absolute-deviation=27.11 maximum=40733.37 minimum=40622.33
  cpu_cycles_per_op: mean=23264.16 standard-deviation=402.12 median=23145.29 median-absolute-deviation=237.63 maximum=23947.96 minimum=22934.59

CI: https://jenkins.scylladb.com/job/scylla-master/job/scylla-ci/13531/
SCT (longevity-100gb-4h with nemesis_selector: ['topology_changes']): https://jenkins.scylladb.com/view/staging/job/scylla-staging/job/gleb/job/move-to-host-id/3/

Tested mixed cluster manually.
"

* 'gleb/move-to-host-id-v2' of github.com:scylladb/scylla-dev: (55 commits)
  group0: drop unused field from replace_info struct
  test: rename raft_address_map_test to address_map_test and move if from raft tests
  raft_address_map: remove raft address map
  topology coordinator: do not modify expire state for left/new nodes any more in raft address map
  topology coordinator: drop expiring entries in gossiper address map on error injections since raft one is no longer used
  group0: drop raft address map dependency from raft_rpc
  group0: move raft_ticker_type definition from raft_address_map.hh
  storage_service: do not update raft address map on gossiper events
  group0: drop raft address map dependency from raft_server_with_timeouts
  group0: move group0 upgrade code to host ids
  repair: drop raft address map dependency
  group0: remove unused raft address map getter from raft_group0
  group0: drop raft address map from group0_state_machine dependency since it is not used there any more
  group0: remove dependency on raft address map from group0_state_id_handler
  gossiper: add get_application_state_ptr that searches by host_id
  gossiper: change get_live_token_owners to return host ids
  view: move view building to host id
  hints: use host id to send hints
  storage_proxy: remove id_vector_to_addr since it is no longer used
  db: consistency_level: change is_sufficient_live_nodes to work on host ids
  ...
2024-12-03 18:18:48 +02:00

284 lines
10 KiB
C++

/*
* Copyright (C) 2016-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#include <chrono>
#include "cql3/statements/prepared_statement.hh"
#include "tracing/trace_state.hh"
#include "timestamp.hh"
#include "cql3/values.hh"
#include "cql3/query_options.hh"
namespace tracing {
logging::logger trace_state_logger("trace_state");
struct trace_state::params_values {
    struct prepared_statement_info {
        prepared_checked_weak_ptr statement;
        std::optional<std::vector<std::string_view>> query_option_names;
        cql3::raw_value_view_vector_with_unset query_option_values;
        explicit prepared_statement_info(prepared_checked_weak_ptr statement) : statement(std::move(statement)) {}
    };

    std::optional<host_id_vector_replica_set> batchlog_endpoints;
    std::optional<api::timestamp_type> user_timestamp;
    std::vector<sstring> queries;
    std::optional<db::consistency_level> cl;
    std::optional<db::consistency_level> serial_cl;
    std::optional<int32_t> page_size;
    std::vector<prepared_statement_info> prepared_statements;
};

trace_state::params_values* trace_state::params_ptr::get_ptr_safe() {
    if (!_vals) {
        _vals = std::unique_ptr<params_values, params_values_deleter>(new params_values, params_values_deleter());
    }
    return _vals.get();
}

void trace_state::params_values_deleter::operator()(params_values* pv) {
    delete pv;
}

void trace_state::set_batchlog_endpoints(const host_id_vector_replica_set& val) {
    _params_ptr->batchlog_endpoints.emplace(val);
}

void trace_state::set_consistency_level(db::consistency_level val) {
    _params_ptr->cl.emplace(val);
}

void trace_state::set_optional_serial_consistency_level(const std::optional<db::consistency_level>& val) {
    if (val) {
        _params_ptr->serial_cl.emplace(*val);
    }
}

void trace_state::set_page_size(int32_t val) {
    if (val > 0) {
        _params_ptr->page_size.emplace(val);
    }
}

void trace_state::set_request_size(size_t s) noexcept {
    _records->session_rec.request_size = s;
}

void trace_state::set_response_size(size_t s) noexcept {
    _records->session_rec.response_size = s;
}

void trace_state::add_query(std::string_view val) {
    _params_ptr->queries.emplace_back(std::move(val));
}

void trace_state::add_session_param(std::string_view key, std::string_view val) {
    _records->session_rec.parameters.emplace(std::move(key), std::move(val));
}

void trace_state::set_user_timestamp(api::timestamp_type val) {
    _params_ptr->user_timestamp.emplace(val);
}

void trace_state::add_prepared_statement(prepared_checked_weak_ptr& prepared) {
    _params_ptr->prepared_statements.emplace_back(prepared->checked_weak_from_this());
}

void trace_state::add_prepared_query_options(const cql3::query_options& prepared_options_ptr) {
    if (_params_ptr->prepared_statements.empty()) {
        throw std::logic_error("Tracing a prepared statement but no prepared statement is stored");
    }

    for (size_t i = 0; i < _params_ptr->prepared_statements.size(); ++i) {
        const cql3::query_options& opts = prepared_options_ptr.for_statement(i);
        _params_ptr->prepared_statements[i].query_option_names = opts.get_names();
        _params_ptr->prepared_statements[i].query_option_values = opts.get_values();
    }
}

void trace_state::build_parameters_map() {
    if (!_params_ptr) {
        return;
    }

    auto& params_map = _records->session_rec.parameters;
    params_values& vals = *_params_ptr;

    if (vals.batchlog_endpoints) {
        auto batch_endpoints = fmt::format("{}", fmt::join(*vals.batchlog_endpoints | std::views::transform([](locator::host_id ep) {return seastar::format("/{}", ep);}), ","));
        params_map.emplace("batch_endpoints", std::move(batch_endpoints));
    }

    if (vals.cl) {
        params_map.emplace("consistency_level", seastar::format("{}", *vals.cl));
    }

    if (vals.serial_cl) {
        params_map.emplace("serial_consistency_level", seastar::format("{}", *vals.serial_cl));
    }

    if (vals.page_size) {
        params_map.emplace("page_size", seastar::format("{:d}", *vals.page_size));
    }

    auto& queries = vals.queries;
    if (!queries.empty()) {
        if (queries.size() == 1) {
            params_map.emplace("query", queries[0]);
        } else {
            // BATCH
            for (size_t i = 0; i < queries.size(); ++i) {
                params_map.emplace(format("query[{:d}]", i), queries[i]);
            }
        }
    }

    if (vals.user_timestamp) {
        params_map.emplace("user_timestamp", seastar::format("{:d}", *vals.user_timestamp));
    }

    auto& prepared_statements = vals.prepared_statements;
    if (!prepared_statements.empty()) {
        // A parameter's key in the map is "param[X]" for a single-query CQL command and "param[Y][X]" for a
        // multi-query CQL command, where X is the index of the parameter in its query and Y is the index of
        // that query in the BATCH.
        if (prepared_statements.size() == 1) {
            auto& stmt_info = prepared_statements[0];
            build_parameters_map_for_one_prepared(stmt_info.statement, stmt_info.query_option_names, stmt_info.query_option_values, "param");
        } else {
            // BATCH
            for (size_t i = 0; i < prepared_statements.size(); ++i) {
                auto& stmt_info = prepared_statements[i];
                build_parameters_map_for_one_prepared(stmt_info.statement, stmt_info.query_option_names, stmt_info.query_option_values, format("param[{:d}]", i));
            }
        }
    }
}

void trace_state::build_parameters_map_for_one_prepared(const prepared_checked_weak_ptr& prepared_ptr,
        std::optional<std::vector<std::string_view>>& names_opt,
        cql3::raw_value_view_vector_with_unset& values, const sstring& param_name_prefix) {
    auto& params_map = _records->session_rec.parameters;
    size_t i = 0;

    // Trace the native representations of the parameter values only if the current prepared statement
    // has not been evicted from the cache by the time we get here. Such an eviction is very unlikely;
    // if it does happen, we cannot recover the value types, so we trace the raw representations of the
    // values instead.
    if (names_opt) {
        if (names_opt->size() != values.values.size()) {
            throw std::logic_error(format("Number of \"names\" ({}) doesn't match the number of positional variables ({})", names_opt->size(), values.values.size()).c_str());
        }

        auto& names = names_opt.value();
        for (; i < values.values.size(); ++i) {
            params_map.emplace(seastar::format("{}[{:d}]({})", param_name_prefix, i, names[i]), raw_value_to_sstring(values.values[i], values.unset[i], prepared_ptr ? prepared_ptr->bound_names[i]->type : nullptr));
        }
    } else {
        for (; i < values.values.size(); ++i) {
            params_map.emplace(format("{}[{:d}]", param_name_prefix, i), raw_value_to_sstring(values.values[i], values.unset[i], prepared_ptr ? prepared_ptr->bound_names[i]->type : nullptr));
        }
    }
}

trace_state::~trace_state() {
    if (!is_primary() && is_in_state(state::background)) {
        trace_state_logger.error("Secondary session is in a background state! session_id: {}", session_id());
    }

    stop_foreground_and_write();
    _local_tracing_ptr->end_session();

    trace_state_logger.trace("{}: destructing", session_id());
}

void trace_state::stop_foreground_and_write() noexcept {
    // Do nothing if the state hasn't been initiated
    if (is_in_state(state::inactive)) {
        return;
    }

    if (is_in_state(state::foreground)) {
        auto e = elapsed();
        _records->do_log_slow_query = should_log_slow_query(e);

        if (is_primary()) {
            // We don't count the session_record event against the per-session
            // limit on the number of events: there can be only one such event,
            // and we don't want to cripple the primary session by "stealing"
            // one trace() event from it.
            //
            // We do want to account for these events in the budget, however.
            // If, for instance, many tracing sessions only open and then do
            // nothing, they will create a lot of session_record events, and we
            // want to handle this case properly.
            _records->consume_from_budget();

            _records->session_rec.elapsed = e;

            // build_parameters_map() may throw. We don't want to write the
            // session's record in this case since its data may be incomplete.
            // Such failures should be really rare, however, so we don't
            // optimize this flow (e.g. by rolling back the corresponding
            // events' records that have already been sent to I/O).
            if (should_write_records()) {
                try {
                    build_parameters_map();
                } catch (...) {
                    // Bump up an error counter, drop any pending records and
                    // continue
                    ++_local_tracing_ptr->stats.trace_errors;
                    _records->drop_records();
                }
            }
        }

        set_state(state::background);
    }

    trace_state_logger.trace("{}: Current records count is {}", session_id(), _records->size());

    if (should_write_records()) {
        _local_tracing_ptr->write_session_records(_records, write_on_close());
    } else {
        _records->drop_records();
    }
}

sstring trace_state::raw_value_to_sstring(const cql3::raw_value_view& v, bool is_unset, const data_type& t) {
    static constexpr int max_val_bytes = 64;

    if (is_unset) {
        return "unset value";
    }

    if (v.is_null()) {
        return "null";
    } else {
        return v.with_linearized([&] (bytes_view val) {
            sstring str_rep;

            if (t) {
                str_rep = t->to_string(to_bytes(val));
            } else {
                trace_state_logger.trace("{}: data types are unavailable - tracing a raw value", session_id());
                str_rep = to_hex(val);
            }

            if (str_rep.size() > max_val_bytes) {
                return format("{}...", str_rep.substr(0, max_val_bytes));
            } else {
                return str_rep;
            }
        });
    }
}
}
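
The key-naming scheme used by build_parameters_map() and the 64-byte cap applied in raw_value_to_sstring() can be illustrated outside of Scylla. The following is only a minimal standalone sketch, not code from this file: the helpers `param_prefix`, `param_key` and `truncate_for_trace` are hypothetical names introduced for illustration, and `std::string` stands in for `sstring`.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>

// Hypothetical helper: key prefix for statement `stmt_idx` out of `stmt_count`.
// A single statement uses "param"; statements in a BATCH use "param[Y]".
std::string param_prefix(std::size_t stmt_idx, std::size_t stmt_count) {
    if (stmt_count == 1) {
        return "param";
    }
    return "param[" + std::to_string(stmt_idx) + "]";
}

// Hypothetical helper: full key for parameter `param_idx`. When a bind-variable
// name is available it is appended in parentheses, e.g. "param[0](id)".
std::string param_key(const std::string& prefix, std::size_t param_idx,
                      const std::optional<std::string>& name = std::nullopt) {
    std::string key = prefix + "[" + std::to_string(param_idx) + "]";
    if (name) {
        key += "(" + *name + ")";
    }
    return key;
}

// Hypothetical helper: cap a value's string representation at `max_bytes`
// (64 mirrors max_val_bytes above) and mark the cut with "...".
std::string truncate_for_trace(const std::string& s, std::size_t max_bytes = 64) {
    if (s.size() > max_bytes) {
        return s.substr(0, max_bytes) + "...";
    }
    return s;
}
```

Under these assumptions, the third positional parameter of a single prepared statement would get the key `param[2]`, while the first parameter, named `id`, of the second statement in a three-statement BATCH would get `param[1][0](id)`.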