Merge "Tracing: change a back pressure scheme" from Vlad

"This series changes the tracing back pressure scheme from limiting the amount traces in
a single session by a fixed number to have a per-shard budget consumed by all active tracing
sessions.

It was really easy to cause the traces to be dropped even if there weren't too many
active traces: e.g. if there was a single active session which creates more traces
than a per-session limit (30) the traces above 30-th were going to be dropped. Namely
traces were dropped when there were only 30 active traces, which is ridiculous.

This series introduces two main changes:
   - Changes the records budgeting from being per-session to be per-shard. This substantially
     increases the amount of active records after which new records are going to be dropped.
   - Introduces a flow when events' records are written BEFORE the corresponding tracing
     session is over (right now traces are written to I/O back end only when the session object
     is destroyed).

The later is meant to virtually eliminate the traces drops in normal situations at all.
Of course, if a back end is slow or if there are a lot of small sessions that do not complete we would still have
to drop new sessions/records in order to avoid uncontrolled growth of a memory foot print of Tracing.

If we see the later case happening a lot in the future we may add lowres timers to each session that would
commit the cached records for writing every X time. But let's not try to optimize something that we
are not completely sure has to be optimized... "
This commit is contained in:
Avi Kivity
2016-08-16 12:21:02 +03:00
6 changed files with 495 additions and 306 deletions

View File

@@ -52,6 +52,12 @@ const sstring trace_keyspace_helper::KEYSPACE_NAME("system_traces");
const sstring trace_keyspace_helper::SESSIONS("sessions");
const sstring trace_keyspace_helper::EVENTS("events");
struct trace_keyspace_backend_sesssion_state final : public backend_session_state_base {
int64_t last_nanos = 0;
semaphore write_sem;
virtual ~trace_keyspace_backend_sesssion_state() {}
};
trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
: i_tracing_backend_helper(tr)
, _registrations{
@@ -174,63 +180,55 @@ future<> trace_keyspace_helper::start() {
}
}
void trace_keyspace_helper::write_session_record(const utils::UUID& session_id,
gms::inet_address client,
std::unordered_map<sstring, sstring> parameters,
sstring request,
long started_at,
trace_type command,
int elapsed,
gc_clock::duration ttl) {
try {
_mutation_makers[session_id].first = [request = std::move(request), client, parameters = std::move(parameters), started_at, command, elapsed, ttl, this] (const utils::UUID& session_id) {
return make_session_mutation(session_id, client, parameters, request, started_at, type_to_string(command), elapsed, ttl);
};
} catch (...) {
// OOM: ignore
}
void trace_keyspace_helper::write_one_session_records(lw_shared_ptr<one_session_records> records) {
with_gate(_pending_writes, [this, records = std::move(records)] {
auto num_records = records->size();
return this->flush_one_session_mutations(std::move(records)).finally([this, num_records] { _local_tracing.write_complete(num_records); });
}).handle_exception([this] (auto ep) {
try {
++_stats.tracing_errors;
std::rethrow_exception(ep);
} catch (exceptions::overloaded_exception&) {
logger.warn("Too many nodes are overloaded to save trace events");
} catch (bad_column_family& e) {
if (_stats.bad_column_family_errors++ % bad_column_family_message_period == 0) {
logger.warn("Tracing is enabled but {}", e.what());
}
} catch (std::logic_error& e) {
logger.error(e.what());
} catch (...) {
// TODO: Handle some more exceptions maybe?
}
}).discard_result();
}
void trace_keyspace_helper::write_event_record(const utils::UUID& session_id,
sstring message,
int elapsed,
gc_clock::duration ttl,
wall_clock::time_point event_time_point) {
try {
_mutation_makers[session_id].second.makers.emplace_back([message = std::move(message), elapsed, ttl, event_time_point, this] (const utils::UUID& session_id) mutable {
return make_event_mutation(session_id, message, elapsed, _local_tracing.get_thread_name(), ttl, event_time_point);
});
} catch (...) {
// OOM: ignore
}
void trace_keyspace_helper::write_records_bulk(records_bulk& bulk) {
logger.trace("Writing {} sessions", bulk.size());
std::for_each(bulk.begin(), bulk.end(), [this] (records_bulk::value_type& one_session_records_ptr) {
write_one_session_records(std::move(one_session_records_ptr));
});
}
mutation trace_keyspace_helper::make_session_mutation(
const utils::UUID& session_id,
gms::inet_address client,
const std::unordered_map<sstring, sstring>& parameters,
const sstring& request,
long started_at,
const sstring& command,
int elapsed,
gc_clock::duration ttl) {
mutation trace_keyspace_helper::make_session_mutation(const one_session_records& session_records) {
schema_ptr schema = get_schema_ptr_or_create(_sessions_id, SESSIONS, _sessions_create_cql,
[this] (const schema_ptr& s) { return cache_sessions_table_handles(s); });
auto key = partition_key::from_singular(*schema, session_id);
auto key = partition_key::from_singular(*schema, session_records.session_id);
auto ttl = session_records.ttl;
const session_record& record = session_records.session_rec;
auto timestamp = api::new_timestamp();
mutation m(key, schema);
auto& cells = m.partition().clustered_row(clustering_key::make_empty(*schema)).cells();
cells.apply(*_client_column, atomic_cell::make_live(timestamp, inet_addr_type->decompose(client.addr()), ttl));
cells.apply(*_client_column, atomic_cell::make_live(timestamp, inet_addr_type->decompose(record.client.addr()), ttl));
cells.apply(*_coordinator_column, atomic_cell::make_live(timestamp, inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr()), ttl));
cells.apply(*_request_column, atomic_cell::make_live(timestamp, utf8_type->decompose(request), ttl));
cells.apply(*_started_at_column, atomic_cell::make_live(timestamp, timestamp_type->decompose(started_at), ttl));
cells.apply(*_command_column, atomic_cell::make_live(timestamp, utf8_type->decompose(command), ttl));
cells.apply(*_duration_column, atomic_cell::make_live(timestamp, int32_type->decompose((int32_t)elapsed), ttl));
cells.apply(*_request_column, atomic_cell::make_live(timestamp, utf8_type->decompose(record.request), ttl));
cells.apply(*_started_at_column, atomic_cell::make_live(timestamp, timestamp_type->decompose(record.started_at), ttl));
cells.apply(*_command_column, atomic_cell::make_live(timestamp, utf8_type->decompose(type_to_string(record.command)), ttl));
cells.apply(*_duration_column, atomic_cell::make_live(timestamp, int32_type->decompose((int32_t)record.elapsed), ttl));
std::vector<std::pair<bytes, atomic_cell>> map_cell;
for (auto& param_pair : parameters) {
for (auto& param_pair : record.parameters) {
map_cell.emplace_back(utf8_type->decompose(param_pair.first), atomic_cell::make_live(timestamp, utf8_type->decompose(param_pair.second), ttl));
}
@@ -241,80 +239,81 @@ mutation trace_keyspace_helper::make_session_mutation(
return m;
}
mutation trace_keyspace_helper::make_event_mutation(const utils::UUID& session_id,
const sstring& message,
int elapsed,
const sstring& thread_name,
gc_clock::duration ttl,
wall_clock::time_point event_time_point) {
mutation trace_keyspace_helper::make_event_mutation(one_session_records& session_records, const event_record& record) {
schema_ptr schema = get_schema_ptr_or_create(_events_id, EVENTS, _events_create_cql,
[this] (const schema_ptr& s) { return cache_events_table_handles(s); });
auto key = partition_key::from_singular(*schema, session_id);
auto key = partition_key::from_singular(*schema, session_records.session_id);
auto ttl = session_records.ttl;
auto backend_state_ptr = static_cast<trace_keyspace_backend_sesssion_state*>(session_records.backend_state_ptr.get());
int64_t& last_event_nanos = backend_state_ptr->last_nanos;
auto timestamp = api::new_timestamp();
mutation m(key, schema);
auto& cells = m.partition().clustered_row(clustering_key::from_singular(*schema, utils::UUID_gen::get_time_UUID(make_monotonic_UUID_tp(event_time_point)))).cells();
auto& cells = m.partition().clustered_row(clustering_key::from_singular(*schema, utils::UUID_gen::get_time_UUID(make_monotonic_UUID_tp(last_event_nanos, record.event_time_point)))).cells();
cells.apply(*_activity_column, atomic_cell::make_live(timestamp, utf8_type->decompose(message), ttl));
cells.apply(*_activity_column, atomic_cell::make_live(timestamp, utf8_type->decompose(record.message), ttl));
cells.apply(*_source_column, atomic_cell::make_live(timestamp, inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr()), ttl));
cells.apply(*_thread_column, atomic_cell::make_live(timestamp, utf8_type->decompose(thread_name), ttl));
assert(elapsed >= 0);
cells.apply(*_source_elapsed_column, atomic_cell::make_live(timestamp, int32_type->decompose(elapsed), ttl));
cells.apply(*_thread_column, atomic_cell::make_live(timestamp, utf8_type->decompose(_local_tracing.get_thread_name()), ttl));
cells.apply(*_source_elapsed_column, atomic_cell::make_live(timestamp, int32_type->decompose(record.elapsed), ttl));
return m;
}
future<> trace_keyspace_helper::flush_one_session_mutations(utils::UUID session_id, std::pair<mutation_maker, events_mutation_makers>& mutation_makers) {
return make_ready_future<>().then([this, session_id, events_makers = std::move(mutation_makers.second.makers)] {
if (events_makers.size()) {
// Reset the "monotinic time point" state machine since it's
// relevant in a context of a single tracing session only. The
// events from different sessions will differ by a session UUID.
reset_monotonic_tp();
logger.trace("{}: events number is {}", session_id, events_makers.size());
mutation m((*events_makers.begin())(session_id));
std::for_each(std::next(events_makers.begin()), events_makers.end(), [&m, &session_id] (const mutation_maker& maker) mutable { m.apply(maker(session_id)); });
future<> trace_keyspace_helper::apply_events_mutation(lw_shared_ptr<one_session_records> records, std::deque<event_record>& events_records) {
if (events_records.empty()) {
return make_ready_future<>();
}
logger.trace("{}: storing {} events records", records->session_id, events_records.size());
mutation m(make_event_mutation(*records, *events_records.begin()));
return do_with(std::move(m), std::move(events_records), [this, records] (mutation& m, std::deque<event_record>& events_records) {
return do_for_each(std::next(events_records.begin()), events_records.end(), [this, &m, &events_records, all_records = records] (event_record& one_event_record) {
m.apply(make_event_mutation(*all_records, one_event_record));
return make_ready_future<>();
}).then([&m] {
return service::get_local_storage_proxy().mutate({std::move(m)}, db::consistency_level::ANY, nullptr);
} else {
return make_ready_future<>();
}
}).then([session_id = std::move(session_id), session_maker = std::move(mutation_makers.first)] {
if (session_maker) {
logger.trace("{}: storing a session event", session_id);
return service::get_local_storage_proxy().mutate({session_maker(session_id)}, db::consistency_level::ANY, nullptr);
} else {
return make_ready_future<>();
}
});
});
}
void trace_keyspace_helper::kick() {
logger.trace("flushing {} sessions", _mutation_makers.size());
parallel_for_each(_mutation_makers,[this](decltype(_mutation_makers)::value_type& uuid_mutation_makers) {
return with_gate(_pending_writes, [this, &uuid_mutation_makers] {
logger.trace("{}: flushing traces", uuid_mutation_makers.first);
return this->flush_one_session_mutations(uuid_mutation_makers.first, uuid_mutation_makers.second).finally([this] { _local_tracing.write_complete(); });
}).handle_exception([this] (auto ep) {
try {
++_stats.tracing_errors;
std::rethrow_exception(ep);
} catch (exceptions::overloaded_exception&) {
logger.warn("Too many nodes are overloaded to save trace events");
} catch (bad_column_family& e) {
if (_stats.bad_column_family_errors++ % bad_column_family_message_period == 0) {
logger.warn("Tracing is enabled but {}", e.what());
}
} catch (...) {
// TODO: Handle some more exceptions maybe?
}
});
}).discard_result();
future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr<one_session_records> records) {
// grab events records available so far
return do_with(std::move(records->events_recs), [this, records] (std::deque<event_record>& events_records) {
records->events_recs.clear();
// We can clear the hash table here because we cared to capture all relevant
// data from it in lambdas' capture-lists inside
// flush_one_session_mutations() before any asynchronous call.
_mutation_makers.clear();
// Check if a session's record is ready before handling events' records.
//
// New event's records and a session's record may become ready while a
// mutation with the current events' records is being written. We don't want
// to allow the situation when a session's record is written before the last
// event record from the same session.
bool session_record_is_ready = records->session_rec.ready();
// From this point on - all new data will have to be handled in the next write event
records->data_consumed();
// We want to serialize the creation of events mutations in order to ensure
// that mutations the events that were created first are going to be created
// first too.
auto backend_state_ptr = static_cast<trace_keyspace_backend_sesssion_state*>(records->backend_state_ptr.get());
semaphore& write_sem = backend_state_ptr->write_sem;
return with_semaphore(write_sem, 1, [this, records, session_record_is_ready, &events_records] {
return apply_events_mutation(records, events_records).then([this, session_record_is_ready, records] {
if (session_record_is_ready) {
logger.trace("{}: storing a session event", records->session_id);
return service::get_local_storage_proxy().mutate({make_session_mutation(*records)}, db::consistency_level::ANY, nullptr);
} else {
return make_ready_future<>();
}
});
}).finally([records] {});
});
}
std::unique_ptr<backend_session_state_base> trace_keyspace_helper::allocate_session_state() const {
return std::make_unique<trace_keyspace_backend_sesssion_state>();
}
using registry = class_registrator<i_tracing_backend_helper, trace_keyspace_helper, tracing&>;

View File

@@ -47,28 +47,15 @@
namespace tracing {
class trace_keyspace_helper : public i_tracing_backend_helper {
class trace_keyspace_helper final : public i_tracing_backend_helper {
public:
static const sstring KEYSPACE_NAME;
static const sstring SESSIONS;
static const sstring EVENTS;
private:
using mutation_maker = std::function<mutation (const utils::UUID& session_id)>;
static constexpr int bad_column_family_message_period = 10000;
struct events_mutation_makers {
std::vector<mutation_maker> makers;
public:
events_mutation_makers() {
makers.reserve(tracing::max_trace_events_per_session);
}
};
// a hash table of session ID to one session mutation and a vector of events mutations
std::unordered_map<utils::UUID, std::pair<mutation_maker, events_mutation_makers>> _mutation_makers;
seastar::gate _pending_writes;
sstring _sessions_create_cql;
@@ -94,8 +81,6 @@ private:
uint64_t bad_column_family_errors = 0;
} _stats;
int64_t _last_event_nanos = 0;
scollectd::registrations _registrations;
public:
@@ -110,59 +95,46 @@ public:
virtual future<> start() override;
virtual future<> stop() override {
kick();
return _pending_writes.close();
};
virtual void write_session_record(const utils::UUID& session_id,
gms::inet_address client,
std::unordered_map<sstring, sstring> parameters,
sstring request,
long started_at,
trace_type command,
int elapsed,
gc_clock::duration ttl) override;
virtual void write_event_record(const utils::UUID& session_id,
sstring message,
int elapsed,
gc_clock::duration ttl,
wall_clock::time_point event_time_point) override;
virtual void write_records_bulk(records_bulk& bulk) override;
virtual std::unique_ptr<backend_session_state_base> allocate_session_state() const override;
private:
/**
* Makes a monotonically increasing value in 100ns based on the given time stamp.
* Write records of a single tracing session
*
* @param records records to write
*/
void write_one_session_records(lw_shared_ptr<one_session_records> records);
/**
* Makes a monotonically increasing value in 100ns ("nanos") based on the given time
* stamp and the "nanos" value of the previous event.
*
* If the amount of 100s of ns evaluated from the @param tp is equal to the
* value returned in the previouse call to get_next_nanos() (without a reset_monotonic_tp()
* call in between), increment it by one.
* given @param last_event_nanos increment @param last_event_nanos by one
* and return a time point based its new value.
*
* @param last_event_nanos a reference to the last nanos to align the given time point to.
* @param tp the amount of time passed since the Epoch that will be used for the calculation.
*
* @return the monotonically increasing vlaue in 100s of ns based on the
* given time stamp.
* given time stamp and on the "nanos" value of the previous event.
*/
wall_clock::time_point make_monotonic_UUID_tp(wall_clock::time_point tp) {
wall_clock::time_point make_monotonic_UUID_tp(int64_t& last_event_nanos, wall_clock::time_point tp) {
using namespace std::chrono;
auto tp_nanos = duration_cast<nanoseconds>(tp.time_since_epoch()).count() / 100;
if (tp_nanos > _last_event_nanos) {
_last_event_nanos = tp_nanos;
if (tp_nanos > last_event_nanos) {
last_event_nanos = tp_nanos;
return tp;
} else {
return wall_clock::time_point(nanoseconds((++_last_event_nanos) * 100));
return wall_clock::time_point(nanoseconds((++last_event_nanos) * 100));
}
}
/**
* Reset the make_monotonic_tp() state machine.
*/
void reset_monotonic_tp() {
_last_event_nanos = 0;
}
virtual void kick() override;
/**
* Tries to create a table with a given name and using the provided CQL
* command.
@@ -179,14 +151,17 @@ private:
* Flush mutations of one particular tracing session. First "events"
* mutations and then, when they are complete, a "sessions" mutation.
*
* @param session_id ID of a tracing session
* @param mutation_makers a pair of a "sessions" mutation maker and an array
* of "events" mutations makers.
* @note This function guaranties that it'll handle exactly the same number
* of records @param records had when the function was invoked.
*
* @param records records describing the session's records
*
* @return A future that resolves when applying of above mutations is
* complete.
*/
future<> flush_one_session_mutations(utils::UUID session_id, std::pair<mutation_maker, events_mutation_makers>& mutation_makers);
future<> flush_one_session_mutations(lw_shared_ptr<one_session_records> records);
future<> apply_events_mutation(lw_shared_ptr<one_session_records> records, std::deque<event_record>& events_records);
/**
* Get a schema_ptr by a table (UU)ID. If not found will try to get it by
@@ -230,33 +205,26 @@ private:
*/
bool cache_events_table_handles(const schema_ptr& s);
mutation make_session_mutation(const utils::UUID& session_id,
gms::inet_address client,
const std::unordered_map<sstring, sstring>& parameters,
const sstring& request,
long started_at,
const sstring& command,
int elapsed,
gc_clock::duration ttl);
/**
* Create a mutation for a new session record
*
* @param all_records_handle handle to access an object with all records of this session
*
* @return the relevant mutation
*/
mutation make_session_mutation(const one_session_records& all_records_handle);
/**
* Create a mutation for a new trace point record
*
* @param session_id tracing session ID
* @param message trace record message
* @param elapsed time elapsed since begin() till this trace point
* @param thread_name
* @param ttl
* @param event_time_stamp wall clock time point of this trace event
* @param session_records handle to access an object with all records of this session.
* It's needed here in order to update the last event's mutation
* timestamp value stored inside it.
* @param record data describing this trace event
*
* @return the relevant mutation
*/
mutation make_event_mutation(const utils::UUID& session_id,
const sstring& message,
int elapsed,
const sstring& thread_name,
gc_clock::duration ttl,
wall_clock::time_point event_time_stamp);
mutation make_event_mutation(one_session_records& session_records, const event_record& record);
};
struct bad_column_family : public std::exception {

View File

@@ -47,14 +47,14 @@
namespace tracing {
static logging::logger logger("trace_state");
logging::logger trace_state_logger("trace_state");
std::unordered_map<sstring, sstring> trace_state::get_params() {
void trace_state::build_parameters_map() {
if (!_params_ptr) {
return {};
return;
}
std::unordered_map<sstring, sstring> params_map;
auto& params_map = _records->session_rec.parameters;
params_values& vals = *_params_ptr;
if (vals.batchlog_endpoints) {
@@ -80,8 +80,6 @@ std::unordered_map<sstring, sstring> trace_state::get_params() {
if (vals.user_timestamp) {
params_map.emplace("user_timestamp", seastar::format("{:d}", *vals.user_timestamp));
}
return params_map;
}
trace_state::~trace_state() {
@@ -92,32 +90,29 @@ trace_state::~trace_state() {
// event and we don't want to cripple the primary session by
// "stealing" one trace() event from it.
//
// We do want to report it in statistics however. If for instance
// there are a lot of tracing sessions that only open itself and
// then do nothing - they will create a lot of session_record events
// and we do want to know about it.
++_pending_trace_events;
_local_backend.write_session_record(_session_id, _client, get_params(), std::move(_request), _started_at, _type, elapsed(), _ttl);
}
// We do want to account them however. If for instance there are a
// lot of tracing sessions that only open itself and then do nothing
// - they will create a lot of session_record events and we do want
// to handle this case properly.
_records->consume_from_budget();
_local_tracing_ptr->end_session();
if (_write_on_close) {
_local_tracing_ptr->write_pending_records();
}
// update some stats and get out...
auto& tracing_stats = _local_tracing_ptr->stats;
tracing_stats.trace_events_count += _pending_trace_events;
if (_pending_trace_events >= tracing::max_trace_events_per_session) {
logger.trace("{}: Maximum number of traces is reached. Some traces are going to be dropped", _session_id);
if (++tracing_stats.max_traces_threshold_hits % tracing::max_threshold_hits_warning_period == 1) {
logger.warn("Maximum traces per session limit is hit {} times", tracing_stats.max_traces_threshold_hits);
// build_parameters_map() may throw. We don't want to record the
// session's record in this case since its data may be incomplete.
// These events should be really rare however, therefore we don't
// want to optimize this flow (e.g. rollback the corresponding
// events' records that have already been sent to I/O).
try {
build_parameters_map();
_records->session_rec.elapsed = elapsed();
} catch (...) {
// Bump up an error counter, drop any pending events records and
// continue
++_local_tracing_ptr->stats.trace_errors;
_records->events_recs.clear();
}
}
_local_tracing_ptr->end_session(std::move(_records), _write_on_close);
}
}
}

View File

@@ -50,17 +50,17 @@
namespace tracing {
extern logging::logger trace_state_logger;
class trace_state final {
using clock_type = std::chrono::steady_clock;
private:
utils::UUID _session_id;
trace_type _type;
lw_shared_ptr<one_session_records> _records;
bool _write_on_close;
// Used for calculation of time passed since the beginning of a tracing
// session till each tracing event.
clock_type::time_point _start;
gc_clock::duration _ttl;
// TRUE for a primary trace_state object
bool _primary;
bool _tracing_began = false;
@@ -69,7 +69,6 @@ private:
sstring _request;
int _pending_trace_events = 0;
shared_ptr<tracing> _local_tracing_ptr;
i_tracing_backend_helper& _local_backend;
struct params_values {
std::experimental::optional<std::unordered_set<gms::inet_address>> batchlog_endpoints;
@@ -107,23 +106,24 @@ private:
public:
trace_state(trace_type type, bool write_on_close, const std::experimental::optional<utils::UUID>& session_id = std::experimental::nullopt)
: _session_id(session_id ? *session_id : utils::UUID_gen::get_time_UUID())
, _type(type)
, _write_on_close(write_on_close)
, _ttl(ttl_by_type(_type))
: _write_on_close(write_on_close)
, _primary(!session_id)
, _local_tracing_ptr(tracing::get_local_tracing_instance().shared_from_this())
, _local_backend(_local_tracing_ptr->backend_helper())
{ }
{
_records = make_lw_shared<one_session_records>();
_records->session_id = session_id ? *session_id : utils::UUID_gen::get_time_UUID();
_records->ttl = ttl_by_type(type);
_records->session_rec.command = type;
}
~trace_state();
const utils::UUID& get_session_id() const {
return _session_id;
return _records->session_id;
}
trace_type get_type() const {
return _type;
return _records->session_rec.command;
}
bool get_write_on_close() const {
@@ -163,9 +163,9 @@ private:
*/
void begin(sstring request, gms::inet_address client) {
begin();
_started_at = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
_request = std::move(request);
_client = std::move(client);
_records->session_rec.client = client;
_records->session_rec.request = std::move(request);
_records->session_rec.started_at = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
}
template <typename Func>
@@ -249,7 +249,12 @@ private:
_params_ptr->user_timestamp.emplace(val);
}
std::unordered_map<sstring, sstring> get_params();
/**
* Fill the map in a session's record with the values set so far.
*
* @param params_map the map to fill
*/
void build_parameters_map();
/**
* Add a single trace entry - a special case for a simple string.
@@ -298,12 +303,39 @@ inline void trace_state::trace(sstring message) {
throw std::logic_error("trying to use a trace() before begin() for \"" + message + "\" tracepoint");
}
if (_pending_trace_events >= tracing::max_trace_events_per_session) {
// We don't want the total amount of pending, active and flushing records to
// bypass two times the maximum number of pending records.
//
// If either records are being created too fast or a backend doesn't
// keep up we want to start dropping records.
// In any case, this should be rare, therefore we don't try to optimize this
// flow.
if (!_local_tracing_ptr->have_records_budget()) {
tracing_logger.trace("{}: Maximum number of traces is reached. Some traces are going to be dropped", get_session_id());
if ((++_local_tracing_ptr->stats.dropped_records) % tracing::log_warning_period == 1) {
tracing_logger.warn("Maximum records limit is hit {} times", _local_tracing_ptr->stats.dropped_records);
}
return;
}
_local_backend.write_event_record(_session_id, std::move(message), elapsed(), _ttl, i_tracing_backend_helper::wall_clock::now());
++_pending_trace_events;
try {
_records->events_recs.emplace_back(std::move(message), elapsed(), i_tracing_backend_helper::wall_clock::now());
_records->consume_from_budget();
// If we have aggregated enough records - schedule them for write already.
//
// We prefer the traces to be written after the session is over. However
// if there is a session that creates a lot of traces - we want to write
// them before we start to drop new records.
if (_records->events_recs.size() >= tracing::exp_trace_events_per_session) {
_local_tracing_ptr->schedule_for_write(_records);
_local_tracing_ptr->write_maybe();
}
} catch (...) {
// Bump up an error counter and ignore
++_local_tracing_ptr->stats.trace_errors;
}
}
template <typename... A>

View File

@@ -44,7 +44,7 @@
namespace tracing {
static logging::logger logger("tracing");
logging::logger tracing_logger("tracing");
const gc_clock::duration tracing::tracing::write_period = std::chrono::seconds(2);
@@ -60,16 +60,16 @@ tracing::tracing(const sstring& tracing_backend_helper_class_name)
, _registrations{
scollectd::add_polled_metric(scollectd::type_instance_id("tracing"
, scollectd::per_cpu_plugin_instance
, "total_operations", "max_sessions_threshold_hits")
, scollectd::make_typed(scollectd::data_type::DERIVE, stats.max_sessions_threshold_hits)),
, "total_operations", "dropped_sessions")
, scollectd::make_typed(scollectd::data_type::DERIVE, stats.dropped_sessions)),
scollectd::add_polled_metric(scollectd::type_instance_id("tracing"
, scollectd::per_cpu_plugin_instance
, "total_operations", "max_traces_threshold_hits")
, scollectd::make_typed(scollectd::data_type::DERIVE, stats.max_traces_threshold_hits)),
, "total_operations", "dropped_records")
, scollectd::make_typed(scollectd::data_type::DERIVE, stats.dropped_records)),
scollectd::add_polled_metric(scollectd::type_instance_id("tracing"
, scollectd::per_cpu_plugin_instance
, "total_operations", "trace_events_count")
, scollectd::make_typed(scollectd::data_type::DERIVE, stats.trace_events_count)),
, "total_operations", "trace_records_count")
, scollectd::make_typed(scollectd::data_type::DERIVE, stats.trace_records_count)),
scollectd::add_polled_metric(scollectd::type_instance_id("tracing"
, scollectd::per_cpu_plugin_instance
, "total_operations", "trace_errors")
@@ -80,17 +80,21 @@ tracing::tracing(const sstring& tracing_backend_helper_class_name)
, scollectd::make_typed(scollectd::data_type::GAUGE, _active_sessions)),
scollectd::add_polled_metric(scollectd::type_instance_id("tracing"
, scollectd::per_cpu_plugin_instance
, "queue_length", "pending_for_write_sessions")
, scollectd::make_typed(scollectd::data_type::GAUGE, _pending_for_write_sessions)),
, "queue_length", "cached_records")
, scollectd::make_typed(scollectd::data_type::GAUGE, _cached_records)),
scollectd::add_polled_metric(scollectd::type_instance_id("tracing"
, scollectd::per_cpu_plugin_instance
, "queue_length", "flushing_sessions")
, scollectd::make_typed(scollectd::data_type::GAUGE, _flushing_sessions))}
, "queue_length", "pending_for_write_records")
, scollectd::make_typed(scollectd::data_type::GAUGE, _pending_for_write_records_count)),
scollectd::add_polled_metric(scollectd::type_instance_id("tracing"
, scollectd::per_cpu_plugin_instance
, "queue_length", "flushing_records")
, scollectd::make_typed(scollectd::data_type::GAUGE, _flushing_records))}
, _gen(std::random_device()()) {
try {
_tracing_backend_helper_ptr = create_object<i_tracing_backend_helper>(tracing_backend_helper_class_name, *this);
} catch (no_such_class& e) {
logger.error("Can't create tracing backend helper {}: not supported", tracing_backend_helper_class_name);
tracing_logger.error("Can't create tracing backend helper {}: not supported", tracing_backend_helper_class_name);
throw;
} catch (...) {
throw;
@@ -108,16 +112,17 @@ future<> tracing::create_tracing(const sstring& tracing_backend_class_name) {
trace_state_ptr tracing::create_session(trace_type type, bool write_on_close, const std::experimental::optional<utils::UUID>& session_id) {
trace_state_ptr tstate;
try {
if (_active_sessions + _pending_for_write_sessions + _flushing_sessions > 2 * max_pending_for_write_sessions) {
// Don't create a session if its records are likely to be dropped
if (!have_records_budget(exp_trace_events_per_session) || _active_sessions >= max_pending_sessions + write_event_sessions_threshold) {
if (session_id) {
logger.trace("{}: Maximum sessions count is reached. Dropping a secondary session", session_id);
tracing_logger.trace("{}: Too many outstanding tracing records or sessions. Dropping a secondary session", session_id);
} else {
logger.trace("Maximum sessions count is reached. Dropping a primary session");
tracing_logger.trace("Too many outstanding tracing records or sessions. Dropping a primary session");
}
if (++stats.max_sessions_threshold_hits % tracing::max_threshold_hits_warning_period == 1) {
logger.warn("Maximum sessions limit is hit {} times: open_sessions {}, pending_for_flush_sessions {}, flushing_sessions {}",
stats.max_sessions_threshold_hits, _active_sessions, _pending_for_write_sessions, _flushing_sessions);
if (++stats.dropped_sessions % tracing::log_warning_period == 1) {
tracing_logger.warn("Dropped {} sessions: open_sessions {}, cached_records {} pending_for_write_records {}, flushing_records {}",
stats.dropped_sessions, _active_sessions, _cached_records, _pending_for_write_records_count, _flushing_records);
}
return trace_state_ptr();
@@ -142,18 +147,22 @@ void tracing::write_timer_callback() {
return;
}
logger.trace("Timer kicks in: {}", _pending_for_write_sessions ? "writing" : "not writing");
tracing_logger.trace("Timer kicks in: {}", _pending_for_write_records_bulk.size() ? "writing" : "not writing");
write_pending_records();
_write_timer.arm(write_period);
}
future<> tracing::shutdown() {
logger.info("Asked to shut down");
tracing_logger.info("Asked to shut down");
if (_down) {
throw std::logic_error("tracing: shutdown() called for the service that is already down");
}
write_pending_records();
_down = true;
_write_timer.cancel();
return _tracing_backend_helper_ptr->stop().then([] {
logger.info("Tracing is down");
tracing_logger.info("Tracing is down");
});
}
@@ -173,7 +182,11 @@ void tracing::set_trace_probability(double p) {
_trace_probability = p;
_normalized_trace_probability = std::llround(_trace_probability * (_gen.max() + 1));
logger.info("Setting tracing probability to {} (normalized {})", _trace_probability, _normalized_trace_probability);
}
tracing_logger.info("Setting tracing probability to {} (normalized {})", _trace_probability, _normalized_trace_probability);
}
one_session_records::one_session_records()
: backend_state_ptr(tracing::get_local_tracing_instance().allocate_backend_session_state())
, budget_ptr(tracing::get_local_tracing_instance().get_cached_records_ptr()) {}
}

View File

@@ -51,6 +51,8 @@
namespace tracing {
extern logging::logger tracing_logger;
class trace_state;
class tracing;
@@ -99,6 +101,13 @@ public:
{ }
};
struct one_session_records;
using records_bulk = std::deque<lw_shared_ptr<one_session_records>>;
struct backend_session_state_base {
virtual ~backend_session_state_base() {};
};
struct i_tracing_backend_helper {
using wall_clock = std::chrono::system_clock;
@@ -112,74 +121,170 @@ public:
virtual future<> stop() = 0;
/**
* Write a new tracing session record
* Write a bulk of tracing records.
*
* @param session_id tracing session ID
* @param client client IP
* @param parameters optional parameters
* @param request request we are tracing
* @param started_at amount of milliseconds passed since Epoch before this
* session is started (on a Coordinator Node)
* @param command a type of this trace
* @param elapsed number of microseconds this tracing session took
* @param ttl TTL of a session record
* This function has to clear a scheduled state of each one_session_records object
* in the @param bulk after it has been actually passed to the backend for writing.
*
* @param bulk a bulk of records
*/
virtual void write_session_record(const utils::UUID& session_id,
gms::inet_address client,
std::unordered_map<sstring, sstring> parameters,
sstring request,
long started_at,
trace_type command,
int elapsed,
gc_clock::duration ttl) = 0;
virtual void write_records_bulk(records_bulk& bulk) = 0;
/**
* Write a new tracing event record
* @param session_id tracing session ID
* @param message tracing message
* @param elapsed number of microseconds passed since a beginning of a
* corresponding tracing session till this event
* @param ttl TTL of the event record
* @param event_time_point time point when a record was taken
*/
virtual void write_event_record(const utils::UUID& session_id,
sstring message,
int elapsed,
gc_clock::duration ttl,
wall_clock::time_point event_time_point) = 0;
virtual std::unique_ptr<backend_session_state_base> allocate_session_state() const = 0;
private:
/**
* Commit all pending tracing records to the underlying storage.
* The implementation has to call tracing::tracing::write_complete(nr) for
* each "nr" completed records once they are written to the backend.
*/
virtual void kick() = 0;
friend class tracing;
};
struct event_record {
sstring message;
int elapsed;
i_tracing_backend_helper::wall_clock::time_point event_time_point;
event_record(sstring message_, int elapsed_, i_tracing_backend_helper::wall_clock::time_point event_time_point_)
: message(std::move(message_))
, elapsed(elapsed_)
, event_time_point(event_time_point_) {}
};
struct session_record {
gms::inet_address client;
std::unordered_map<sstring, sstring> parameters;
sstring request;
long started_at = 0;
trace_type command = trace_type::NONE;
int elapsed = -1;
bool ready() const {
return elapsed >= 0;
}
};
class one_session_records {
public:
utils::UUID session_id;
session_record session_rec;
gc_clock::duration ttl;
std::deque<event_record> events_recs;
std::unique_ptr<backend_session_state_base> backend_state_ptr;
// A pointer to the records counter of the corresponding state new records
// of this tracing session should consume from (e.g. "cached" or "pending
// for write").
uint64_t* budget_ptr;
one_session_records();
/**
* Consume a single record from the per-shard budget.
*/
void consume_from_budget() {
++(*budget_ptr);
}
/**
* Should be called when a record is scheduled for write.
* From that point till data_consumed() call all new records will be written
* in the next write event.
*/
inline void set_pending_for_write();
/**
* Should be called after all data pending to be written in this record has
* been processed.
* From that point on new records are cached internally and have to be
* explicitly committed for write in order to be written during the write event.
*/
inline void data_consumed();
bool is_pending_for_write() const {
return _is_pending_for_write;
}
uint64_t size() const {
return events_recs.size() + session_rec.ready();
}
private:
bool _is_pending_for_write = false;
};
using trace_state_ptr = lw_shared_ptr<trace_state>;
class tracing : public seastar::async_sharded_service<tracing> {
public:
static const gc_clock::duration write_period;
static constexpr int max_pending_for_write_sessions = 1000;
static constexpr int max_trace_events_per_session = 30;
// Number of max threshold XXX hits when an info message is printed
static constexpr int max_threshold_hits_warning_period = 10000;
// maximum number of sessions pending for write per shard
static constexpr int max_pending_sessions = 1000;
// expectation of an average number of trace records per session
static constexpr int exp_trace_events_per_session = 10;
// maximum allowed pending records per-shard
static constexpr int max_pending_trace_records = max_pending_sessions * exp_trace_events_per_session;
// number of pending sessions that would trigger a write event
static constexpr int write_event_sessions_threshold = 100;
// number of pending records that would trigger a write event
static constexpr int write_event_records_threshold = write_event_sessions_threshold * exp_trace_events_per_session;
// Number of events when an info message is printed
static constexpr int log_warning_period = 10000;
struct stats {
uint64_t max_sessions_threshold_hits = 0;
uint64_t max_traces_threshold_hits = 0;
uint64_t trace_events_count = 0;
uint64_t dropped_sessions = 0;
uint64_t dropped_records = 0;
uint64_t trace_records_count = 0;
uint64_t trace_errors = 0;
} stats;
private:
// A number of currently active tracing sessions
uint64_t _active_sessions = 0;
uint64_t _pending_for_write_sessions = 0;
uint64_t _flushing_sessions = 0;
// Below are 3 counters that describe the total amount of tracing records on
// this shard. Each counter describes a state in which a record may be.
//
// Each record may only be in a specific state at every point of time and
// thereby it must be accounted only in one and only one of the three
// counters below at any given time.
//
// The sum of all three counters should not be greater than
// (max_pending_trace_records + write_event_records_threshold) at any time
// (actually it can get as high as a value above plus (max_pending_sessions)
// if all sessions are primary but we won't take this into an account for
// simplicity).
//
// The same is about the number of outstanding sessions: it may not be
// greater than (max_pending_sessions + write_event_sessions_threshold) at
// any time.
//
// If total number of tracing records is greater or equal to the limit
// above, the new trace point is going to be dropped.
//
// If current number or records plus the expected number of trace records
// per session (exp_trace_events_per_session) is greater than the limit
// above new sessions will be dropped. A new session will also be dropped if
// there are too many active sessions.
//
// When the record or a session is dropped the appropriate statistics
// counters are updated and there is a rate-limited warning message printed
// to the log.
//
// Every time a number of records pending for write is greater or equal to
// (write_event_records_threshold) or a number of sessions pending for
// write is greater or equal to (write_event_sessions_threshold) a write
// event is issued.
//
// Every 2 seconds a timer would write all pending for write records
// available so far.
// Total number of records cached in the active sessions that are not going
// to be written in the next write event
uint64_t _cached_records = 0;
// Total number of records that are currently being written to I/O
uint64_t _flushing_records = 0;
// Total number of records in the _pending_for_write_records_bulk. All of
// them are going to be written to the I/O during the next write event.
uint64_t _pending_for_write_records_count = 0;
records_bulk _pending_for_write_records_bulk;
timer<lowres_clock> _write_timer;
bool _down = false;
std::unique_ptr<i_tracing_backend_helper> _tracing_backend_helper_ptr;
@@ -228,21 +333,20 @@ public:
future<> shutdown();
void write_pending_records() {
// if service is down - do nothing
if (_down) {
return;
if (_pending_for_write_records_bulk.size()) {
_flushing_records += _pending_for_write_records_count;
stats.trace_records_count += _pending_for_write_records_count;
_pending_for_write_records_count = 0;
_tracing_backend_helper_ptr->write_records_bulk(_pending_for_write_records_bulk);
_pending_for_write_records_bulk.clear();
}
_flushing_sessions += _pending_for_write_sessions;
_pending_for_write_sessions = 0;
_tracing_backend_helper_ptr->kick();
}
void write_complete(uint64_t nr = 1) {
if (nr > _flushing_sessions) {
throw std::logic_error("completing more sessions than there are pending");
if (nr > _flushing_records) {
throw std::logic_error(seastar::format("completing more records ({:d}) than there are pending ({:d})", nr, _flushing_records));
}
_flushing_sessions -= nr;
_flushing_records -= nr;
}
/**
@@ -256,14 +360,35 @@ public:
*/
trace_state_ptr create_session(trace_type type, bool write_on_close, const std::experimental::optional<utils::UUID>& session_id = std::experimental::nullopt);
void end_session() {
--_active_sessions;
++_pending_for_write_sessions;
if (_pending_for_write_sessions >= max_pending_for_write_sessions) {
void write_maybe() {
if (_pending_for_write_records_count >= write_event_records_threshold || _pending_for_write_records_bulk.size() >= write_event_sessions_threshold) {
write_pending_records();
}
}
void end_session(lw_shared_ptr<one_session_records> records, bool write_now) {
--_active_sessions;
// if service is down - drop the records and return
if (_down) {
return;
}
try {
schedule_for_write(std::move(records));
} catch (...) {
// OOM: bump up the error counter and ignore
++stats.trace_errors;
return;
}
if (write_now) {
write_pending_records();
} else {
write_maybe();
}
}
/**
* Sets a probability for tracing a CQL request.
*
@@ -281,7 +406,64 @@ public:
return _normalized_trace_probability != 0 && _gen() < _normalized_trace_probability;
}
std::unique_ptr<backend_session_state_base> allocate_backend_session_state() const {
return _tracing_backend_helper_ptr->allocate_session_state();
}
/**
* Checks if there is enough budget for the @param nr new records
* @param nr number of new records
*
* @return TRUE if there is enough budget, FLASE otherwise
*/
bool have_records_budget(uint64_t nr = 1) {
// We don't want the total amount of pending, active and flushing records to
// bypass the maximum number of pending records plus the number of
// records that are possibly being written write now.
//
// If either records are being created too fast or a backend doesn't
// keep up we want to start dropping records.
// In any case, this should be rare.
if (_pending_for_write_records_count + _cached_records + _flushing_records + nr > max_pending_trace_records + write_event_records_threshold) {
return false;
}
return true;
}
uint64_t* get_pending_records_ptr() {
return &_pending_for_write_records_count;
}
uint64_t* get_cached_records_ptr() {
return &_cached_records;
}
void schedule_for_write(lw_shared_ptr<one_session_records> records) {
if (records->is_pending_for_write()) {
return;
}
_pending_for_write_records_bulk.emplace_back(records);
records->set_pending_for_write();
// move the current records from a "cached" to "pending for write" state
auto current_records_num = records->size();
_cached_records -= current_records_num;
_pending_for_write_records_count += current_records_num;
}
private:
void write_timer_callback();
};
void one_session_records::set_pending_for_write() {
_is_pending_for_write = true;
budget_ptr = tracing::get_local_tracing_instance().get_pending_records_ptr();
}
void one_session_records::data_consumed() {
_is_pending_for_write = false;
budget_ptr = tracing::get_local_tracing_instance().get_cached_records_ptr();
}
}