From e1b2926a8d8e63f09b8d7844f63a81e695fa6462 Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Tue, 26 Jul 2016 11:31:59 +0300 Subject: [PATCH 1/7] tracing: add a missing try-catch in params building Signed-off-by: Vlad Zolotarov --- tracing/trace_state.cc | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/tracing/trace_state.cc b/tracing/trace_state.cc index 55cc30cd97..25e3ebabe9 100644 --- a/tracing/trace_state.cc +++ b/tracing/trace_state.cc @@ -97,7 +97,18 @@ trace_state::~trace_state() { // then do nothing - they will create a lot of session_record events // and we do want to know about it. ++_pending_trace_events; - _local_backend.write_session_record(_session_id, _client, get_params(), std::move(_request), _started_at, _type, elapsed(), _ttl); + + // get_params() may throw. We don't want to record the session + // record in this case since its data may be incomplete. These + // events should be really rare however, therefore we don't want to + // optimize this flow (e.g. rollback the corresponding events' + // records). + try { + _local_backend.write_session_record(_session_id, _client, get_params(), std::move(_request), _started_at, _type, elapsed(), _ttl); + } catch (...) 
{ + // Bump up an error counter and ignore + ++_local_tracing_ptr->stats.trace_errors; + } } _local_tracing_ptr->end_session(); From 960b423ce085b466b036dc2fc5f88cc6f6c36b8f Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Tue, 2 Aug 2016 17:16:57 +0300 Subject: [PATCH 2/7] tracing/tracing.cc: rename a logger object s/logger/tracing_logger/ Signed-off-by: Vlad Zolotarov --- tracing/tracing.cc | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tracing/tracing.cc b/tracing/tracing.cc index 54eeb56bba..b34a62b258 100644 --- a/tracing/tracing.cc +++ b/tracing/tracing.cc @@ -44,7 +44,7 @@ namespace tracing { -static logging::logger logger("tracing"); +static logging::logger tracing_logger("tracing"); const gc_clock::duration tracing::tracing::write_period = std::chrono::seconds(2); @@ -90,7 +90,7 @@ tracing::tracing(const sstring& tracing_backend_helper_class_name) try { _tracing_backend_helper_ptr = create_object(tracing_backend_helper_class_name, *this); } catch (no_such_class& e) { - logger.error("Can't create tracing backend helper {}: not supported", tracing_backend_helper_class_name); + tracing_logger.error("Can't create tracing backend helper {}: not supported", tracing_backend_helper_class_name); throw; } catch (...) { throw; @@ -110,13 +110,13 @@ trace_state_ptr tracing::create_session(trace_type type, bool write_on_close, co try { if (_active_sessions + _pending_for_write_sessions + _flushing_sessions > 2 * max_pending_for_write_sessions) { if (session_id) { - logger.trace("{}: Maximum sessions count is reached. Dropping a secondary session", session_id); + tracing_logger.trace("{}: Maximum sessions count is reached. Dropping a secondary session", session_id); } else { - logger.trace("Maximum sessions count is reached. Dropping a primary session"); + tracing_logger.trace("Maximum sessions count is reached. 
Dropping a primary session"); } if (++stats.max_sessions_threshold_hits % tracing::max_threshold_hits_warning_period == 1) { - logger.warn("Maximum sessions limit is hit {} times: open_sessions {}, pending_for_flush_sessions {}, flushing_sessions {}", + tracing_logger.warn("Maximum sessions limit is hit {} times: open_sessions {}, pending_for_flush_sessions {}, flushing_sessions {}", stats.max_sessions_threshold_hits, _active_sessions, _pending_for_write_sessions, _flushing_sessions); } @@ -142,18 +142,18 @@ void tracing::write_timer_callback() { return; } - logger.trace("Timer kicks in: {}", _pending_for_write_sessions ? "writing" : "not writing"); + tracing_logger.trace("Timer kicks in: {}", _pending_for_write_sessions ? "writing" : "not writing"); write_pending_records(); _write_timer.arm(write_period); } future<> tracing::shutdown() { - logger.info("Asked to shut down"); + tracing_logger.info("Asked to shut down"); write_pending_records(); _down = true; _write_timer.cancel(); return _tracing_backend_helper_ptr->stop().then([] { - logger.info("Tracing is down"); + tracing_logger.info("Tracing is down"); }); } @@ -173,7 +173,7 @@ void tracing::set_trace_probability(double p) { _trace_probability = p; _normalized_trace_probability = std::llround(_trace_probability * (_gen.max() + 1)); - logger.info("Setting tracing probability to {} (normalized {})", _trace_probability, _normalized_trace_probability); + tracing_logger.info("Setting tracing probability to {} (normalized {})", _trace_probability, _normalized_trace_probability); } } From 63a0502ed165dfd1a9e5684ed8e7824acaf64743 Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Fri, 22 Jul 2016 20:41:47 +0300 Subject: [PATCH 3/7] tracing: rework the interface between the tracing/trace_state and the backend Before this patch the interaction between the layers above was as follows: - trace_state was passing the trace event data to a backend object every time trace() method was called. 
- trace_state was passing the session data to a backend object in its destructor. - A backend object was storing this data in the form of a lambda whose capture list held all of the data above. This was primarily done in order to delay the call to make_xxx_mutation(). Lambdas were stored in a map keyed by a session ID and they were executed when a kick() method was called. - A tracing::tracing object was periodically calling a kick() method of a backend, which initiated a write of all pending data to the storage. All backend methods used in the interactions described above were virtual. As a result, for each and every trace record we were calling a virtual method that received a significant number of parameters, stored a lambda in a map and returned. This is clearly a suboptimal way of using virtual functions since we prevent the compiler from inlining obviously inlinable operations. This patch changes the interaction scheme to be as follows: - Trace events and session data are stored and passed around in the form of structs that hold all relevant information (no more lambdas). - As long as a trace session is active its data is aggregated inside the corresponding trace_state object. - The object containing all records is passed and stored as a lw_shared_ptr to save extra copies and to shorten capture lists. - All aggregated data is passed to a tracing::tracing object in the trace_state destructor. The data is stored in a std::deque in a tracing::tracing object (instead of a map keyed by a session ID). - A single backend's virtual method call writes all data aggregated so far (kick() method is not needed any more), every time a write event occurs. - Backend has only one virtual method now: - Write a bulk of sessions' data aggregated so far. - Backend's virtual method receives a records bulk object by reference. As a result: - The latency of a single trace event that has no formatting improved from 0.2us to 0.1us. 
Signed-off-by: Vlad Zolotarov --- tracing/trace_keyspace_helper.cc | 146 +++++++++++-------------------- tracing/trace_keyspace_helper.hh | 79 +++++------------ tracing/trace_state.cc | 29 +++--- tracing/trace_state.hh | 44 ++++++---- tracing/tracing.cc | 14 +-- tracing/tracing.hh | 105 +++++++++++----------- 6 files changed, 177 insertions(+), 240 deletions(-) diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index a8add2b0f7..72fb77c5d3 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -174,63 +174,51 @@ future<> trace_keyspace_helper::start() { } } -void trace_keyspace_helper::write_session_record(const utils::UUID& session_id, - gms::inet_address client, - std::unordered_map parameters, - sstring request, - long started_at, - trace_type command, - int elapsed, - gc_clock::duration ttl) { - try { - _mutation_makers[session_id].first = [request = std::move(request), client, parameters = std::move(parameters), started_at, command, elapsed, ttl, this] (const utils::UUID& session_id) { - return make_session_mutation(session_id, client, parameters, request, started_at, type_to_string(command), elapsed, ttl); - }; - } catch (...) { - // OOM: ignore - } +void trace_keyspace_helper::write_one_session_records(lw_shared_ptr records) { + with_gate(_pending_writes, [this, records = std::move(records)] { + return this->flush_one_session_mutations(std::move(records)).finally([this] { _local_tracing.write_complete(); }); + }).handle_exception([this] (auto ep) { + try { + ++_stats.tracing_errors; + std::rethrow_exception(ep); + } catch (exceptions::overloaded_exception&) { + logger.warn("Too many nodes are overloaded to save trace events"); + } catch (bad_column_family& e) { + if (_stats.bad_column_family_errors++ % bad_column_family_message_period == 0) { + logger.warn("Tracing is enabled but {}", e.what()); + } + } catch (...) { + // TODO: Handle some more exceptions maybe? 
+ } + }).discard_result(); } -void trace_keyspace_helper::write_event_record(const utils::UUID& session_id, - sstring message, - int elapsed, - gc_clock::duration ttl, - wall_clock::time_point event_time_point) { - try { - _mutation_makers[session_id].second.makers.emplace_back([message = std::move(message), elapsed, ttl, event_time_point, this] (const utils::UUID& session_id) mutable { - return make_event_mutation(session_id, message, elapsed, _local_tracing.get_thread_name(), ttl, event_time_point); - }); - } catch (...) { - // OOM: ignore - } +void trace_keyspace_helper::write_records_bulk(records_bulk& bulk) { + std::for_each(bulk.begin(), bulk.end(), [this] (records_bulk::value_type& one_session_records_ptr) { + write_one_session_records(std::move(one_session_records_ptr)); + }); } -mutation trace_keyspace_helper::make_session_mutation( - const utils::UUID& session_id, - gms::inet_address client, - const std::unordered_map& parameters, - const sstring& request, - long started_at, - const sstring& command, - int elapsed, - gc_clock::duration ttl) { +mutation trace_keyspace_helper::make_session_mutation(const one_session_records& session_records) { schema_ptr schema = get_schema_ptr_or_create(_sessions_id, SESSIONS, _sessions_create_cql, [this] (const schema_ptr& s) { return cache_sessions_table_handles(s); }); - auto key = partition_key::from_singular(*schema, session_id); + auto key = partition_key::from_singular(*schema, session_records.session_id); + auto ttl = session_records.ttl; + const session_record& record = session_records.session_rec; auto timestamp = api::new_timestamp(); mutation m(key, schema); auto& cells = m.partition().clustered_row(clustering_key::make_empty(*schema)).cells(); - cells.apply(*_client_column, atomic_cell::make_live(timestamp, inet_addr_type->decompose(client.addr()), ttl)); + cells.apply(*_client_column, atomic_cell::make_live(timestamp, inet_addr_type->decompose(record.client.addr()), ttl)); cells.apply(*_coordinator_column, 
atomic_cell::make_live(timestamp, inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr()), ttl)); - cells.apply(*_request_column, atomic_cell::make_live(timestamp, utf8_type->decompose(request), ttl)); - cells.apply(*_started_at_column, atomic_cell::make_live(timestamp, timestamp_type->decompose(started_at), ttl)); - cells.apply(*_command_column, atomic_cell::make_live(timestamp, utf8_type->decompose(command), ttl)); - cells.apply(*_duration_column, atomic_cell::make_live(timestamp, int32_type->decompose((int32_t)elapsed), ttl)); + cells.apply(*_request_column, atomic_cell::make_live(timestamp, utf8_type->decompose(record.request), ttl)); + cells.apply(*_started_at_column, atomic_cell::make_live(timestamp, timestamp_type->decompose(record.started_at), ttl)); + cells.apply(*_command_column, atomic_cell::make_live(timestamp, utf8_type->decompose(type_to_string(record.command)), ttl)); + cells.apply(*_duration_column, atomic_cell::make_live(timestamp, int32_type->decompose((int32_t)record.elapsed), ttl)); std::vector> map_cell; - for (auto& param_pair : parameters) { + for (auto& param_pair : record.parameters) { map_cell.emplace_back(utf8_type->decompose(param_pair.first), atomic_cell::make_live(timestamp, utf8_type->decompose(param_pair.second), ttl)); } @@ -241,82 +229,52 @@ mutation trace_keyspace_helper::make_session_mutation( return m; } -mutation trace_keyspace_helper::make_event_mutation(const utils::UUID& session_id, - const sstring& message, - int elapsed, - const sstring& thread_name, - gc_clock::duration ttl, - wall_clock::time_point event_time_point) { +mutation trace_keyspace_helper::make_event_mutation(one_session_records& session_records, const event_record& record) { schema_ptr schema = get_schema_ptr_or_create(_events_id, EVENTS, _events_create_cql, [this] (const schema_ptr& s) { return cache_events_table_handles(s); }); - auto key = partition_key::from_singular(*schema, session_id); + auto key = 
partition_key::from_singular(*schema, session_records.session_id); + auto ttl = session_records.ttl; auto timestamp = api::new_timestamp(); mutation m(key, schema); - auto& cells = m.partition().clustered_row(clustering_key::from_singular(*schema, utils::UUID_gen::get_time_UUID(make_monotonic_UUID_tp(event_time_point)))).cells(); + auto& cells = m.partition().clustered_row(clustering_key::from_singular(*schema, utils::UUID_gen::get_time_UUID(make_monotonic_UUID_tp(record.event_time_point)))).cells(); - cells.apply(*_activity_column, atomic_cell::make_live(timestamp, utf8_type->decompose(message), ttl)); + cells.apply(*_activity_column, atomic_cell::make_live(timestamp, utf8_type->decompose(record.message), ttl)); cells.apply(*_source_column, atomic_cell::make_live(timestamp, inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr()), ttl)); - cells.apply(*_thread_column, atomic_cell::make_live(timestamp, utf8_type->decompose(thread_name), ttl)); - - assert(elapsed >= 0); - cells.apply(*_source_elapsed_column, atomic_cell::make_live(timestamp, int32_type->decompose(elapsed), ttl)); + cells.apply(*_thread_column, atomic_cell::make_live(timestamp, utf8_type->decompose(_local_tracing.get_thread_name()), ttl)); + cells.apply(*_source_elapsed_column, atomic_cell::make_live(timestamp, int32_type->decompose(record.elapsed), ttl)); return m; } -future<> trace_keyspace_helper::flush_one_session_mutations(utils::UUID session_id, std::pair& mutation_makers) { - return make_ready_future<>().then([this, session_id, events_makers = std::move(mutation_makers.second.makers)] { - if (events_makers.size()) { +future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr records) { + return futurize::apply([this, records] { + auto& events_records = records->events_recs; + + if (events_records.size()) { // Reset the "monotinic time point" state machine since it's // relevant in a context of a single tracing session only. 
The // events from different sessions will differ by a session UUID. reset_monotonic_tp(); - logger.trace("{}: events number is {}", session_id, events_makers.size()); - mutation m((*events_makers.begin())(session_id)); - std::for_each(std::next(events_makers.begin()), events_makers.end(), [&m, &session_id] (const mutation_maker& maker) mutable { m.apply(maker(session_id)); }); + + logger.trace("{}: events number is {}", records->session_id, events_records.size()); + mutation m(make_event_mutation(*records, *events_records.begin())); + std::for_each(std::next(events_records.begin()), events_records.end(), [this, &m, &all_records = *records] (const event_record& record) { m.apply(make_event_mutation(all_records, record)); }); + return service::get_local_storage_proxy().mutate({std::move(m)}, db::consistency_level::ANY, nullptr); } else { return make_ready_future<>(); } - }).then([session_id = std::move(session_id), session_maker = std::move(mutation_makers.first)] { - if (session_maker) { - logger.trace("{}: storing a session event", session_id); - return service::get_local_storage_proxy().mutate({session_maker(session_id)}, db::consistency_level::ANY, nullptr); + }).then([this, records] { + if (records->session_rec.elapsed >= 0) { + logger.trace("{}: storing a session event", records->session_id); + return service::get_local_storage_proxy().mutate({make_session_mutation(*records)}, db::consistency_level::ANY, nullptr); } else { return make_ready_future<>(); } }); } -void trace_keyspace_helper::kick() { - logger.trace("flushing {} sessions", _mutation_makers.size()); - parallel_for_each(_mutation_makers,[this](decltype(_mutation_makers)::value_type& uuid_mutation_makers) { - return with_gate(_pending_writes, [this, &uuid_mutation_makers] { - logger.trace("{}: flushing traces", uuid_mutation_makers.first); - return this->flush_one_session_mutations(uuid_mutation_makers.first, uuid_mutation_makers.second).finally([this] { _local_tracing.write_complete(); }); - 
}).handle_exception([this] (auto ep) { - try { - ++_stats.tracing_errors; - std::rethrow_exception(ep); - } catch (exceptions::overloaded_exception&) { - logger.warn("Too many nodes are overloaded to save trace events"); - } catch (bad_column_family& e) { - if (_stats.bad_column_family_errors++ % bad_column_family_message_period == 0) { - logger.warn("Tracing is enabled but {}", e.what()); - } - } catch (...) { - // TODO: Handle some more exceptions maybe? - } - }); - }).discard_result(); - - // We can clear the hash table here because we cared to capture all relevant - // data from it in lambdas' capture-lists inside - // flush_one_session_mutations() before any asynchronous call. - _mutation_makers.clear(); -} - using registry = class_registrator; static registry registrator1("trace_keyspace_helper"); diff --git a/tracing/trace_keyspace_helper.hh b/tracing/trace_keyspace_helper.hh index 02b5632e7c..19034c402f 100644 --- a/tracing/trace_keyspace_helper.hh +++ b/tracing/trace_keyspace_helper.hh @@ -47,28 +47,15 @@ namespace tracing { -class trace_keyspace_helper : public i_tracing_backend_helper { +class trace_keyspace_helper final : public i_tracing_backend_helper { public: static const sstring KEYSPACE_NAME; static const sstring SESSIONS; static const sstring EVENTS; private: - using mutation_maker = std::function; - static constexpr int bad_column_family_message_period = 10000; - struct events_mutation_makers { - std::vector makers; - public: - events_mutation_makers() { - makers.reserve(tracing::max_trace_events_per_session); - } - }; - - // a hash table of session ID to one session mutation and a vector of events mutations - std::unordered_map> _mutation_makers; - seastar::gate _pending_writes; sstring _sessions_create_cql; @@ -110,26 +97,19 @@ public: virtual future<> start() override; virtual future<> stop() override { - kick(); return _pending_writes.close(); }; - virtual void write_session_record(const utils::UUID& session_id, - gms::inet_address client, - 
std::unordered_map parameters, - sstring request, - long started_at, - trace_type command, - int elapsed, - gc_clock::duration ttl) override; - - virtual void write_event_record(const utils::UUID& session_id, - sstring message, - int elapsed, - gc_clock::duration ttl, - wall_clock::time_point event_time_point) override; + virtual void write_records_bulk(records_bulk& bulk) override; private: + /** + * Write records of a single tracing session + * + * @param records records to write + */ + void write_one_session_records(lw_shared_ptr records); + /** * Makes a monotonically increasing value in 100ns based on the given time stamp. * @@ -161,8 +141,6 @@ private: _last_event_nanos = 0; } - virtual void kick() override; - /** * Tries to create a table with a given name and using the provided CQL * command. @@ -179,14 +157,12 @@ private: * Flush mutations of one particular tracing session. First "events" * mutations and then, when they are complete, a "sessions" mutation. * - * @param session_id ID of a tracing session - * @param mutation_makers a pair of a "sessions" mutation maker and an array - * of "events" mutations makers. + * @param records records describing the session's records * * @return A future that resolves when applying of above mutations is * complete. */ - future<> flush_one_session_mutations(utils::UUID session_id, std::pair& mutation_makers); + future<> flush_one_session_mutations(lw_shared_ptr records); /** * Get a schema_ptr by a table (UU)ID. 
If not found will try to get it by @@ -230,33 +206,26 @@ private: */ bool cache_events_table_handles(const schema_ptr& s); - mutation make_session_mutation(const utils::UUID& session_id, - gms::inet_address client, - const std::unordered_map& parameters, - const sstring& request, - long started_at, - const sstring& command, - int elapsed, - gc_clock::duration ttl); + /** + * Create a mutation for a new session record + * + * @param all_records_handle handle to access an object with all records of this session + * + * @return the relevant mutation + */ + mutation make_session_mutation(const one_session_records& all_records_handle); /** * Create a mutation for a new trace point record * - * @param session_id tracing session ID - * @param message trace record message - * @param elapsed time elapsed since begin() till this trace point - * @param thread_name - * @param ttl - * @param event_time_stamp wall clock time point of this trace event + * @param session_records handle to access an object with all records of this session. + * It's needed here in order to update the last event's mutation + * timestamp value stored inside it. 
+ * @param record data describing this trace event * * @return the relevant mutation */ - mutation make_event_mutation(const utils::UUID& session_id, - const sstring& message, - int elapsed, - const sstring& thread_name, - gc_clock::duration ttl, - wall_clock::time_point event_time_stamp); + mutation make_event_mutation(one_session_records& session_records, const event_record& record); }; struct bad_column_family : public std::exception { diff --git a/tracing/trace_state.cc b/tracing/trace_state.cc index 25e3ebabe9..41b902029d 100644 --- a/tracing/trace_state.cc +++ b/tracing/trace_state.cc @@ -49,12 +49,12 @@ namespace tracing { static logging::logger logger("trace_state"); -std::unordered_map trace_state::get_params() { +void trace_state::build_parameters_map() { if (!_params_ptr) { - return {}; + return; } - std::unordered_map params_map; + auto& params_map = _records->session_rec.parameters; params_values& vals = *_params_ptr; if (vals.batchlog_endpoints) { @@ -80,8 +80,6 @@ std::unordered_map trace_state::get_params() { if (vals.user_timestamp) { params_map.emplace("user_timestamp", seastar::format("{:d}", *vals.user_timestamp)); } - - return params_map; } trace_state::~trace_state() { @@ -98,24 +96,21 @@ trace_state::~trace_state() { // and we do want to know about it. ++_pending_trace_events; - // get_params() may throw. We don't want to record the session - // record in this case since its data may be incomplete. These - // events should be really rare however, therefore we don't want to - // optimize this flow (e.g. rollback the corresponding events' - // records). + // build_parameters_map() may throw. We don't want to record the + // session record in this case since its data may be incomplete. + // These events should be really rare however, therefore we don't + // want to optimize this flow (e.g. rollback the corresponding + // events' records). 
try { - _local_backend.write_session_record(_session_id, _client, get_params(), std::move(_request), _started_at, _type, elapsed(), _ttl); + build_parameters_map(); + _records->session_rec.elapsed = elapsed(); } catch (...) { // Bump up an error counter and ignore ++_local_tracing_ptr->stats.trace_errors; } } - _local_tracing_ptr->end_session(); - - if (_write_on_close) { - _local_tracing_ptr->write_pending_records(); - } + _local_tracing_ptr->end_session(_records, _write_on_close); // update some stats and get out... auto& tracing_stats = _local_tracing_ptr->stats; @@ -123,7 +118,7 @@ trace_state::~trace_state() { tracing_stats.trace_events_count += _pending_trace_events; if (_pending_trace_events >= tracing::max_trace_events_per_session) { - logger.trace("{}: Maximum number of traces is reached. Some traces are going to be dropped", _session_id); + logger.trace("{}: Maximum number of traces is reached. Some traces are going to be dropped", _records->session_id); if (++tracing_stats.max_traces_threshold_hits % tracing::max_threshold_hits_warning_period == 1) { logger.warn("Maximum traces per session limit is hit {} times", tracing_stats.max_traces_threshold_hits); diff --git a/tracing/trace_state.hh b/tracing/trace_state.hh index 6d30fa8ceb..c9b977c2d6 100644 --- a/tracing/trace_state.hh +++ b/tracing/trace_state.hh @@ -54,13 +54,11 @@ class trace_state final { using clock_type = std::chrono::steady_clock; private: - utils::UUID _session_id; - trace_type _type; + lw_shared_ptr _records; bool _write_on_close; // Used for calculation of time passed since the beginning of a tracing // session till each tracing event. 
clock_type::time_point _start; - gc_clock::duration _ttl; // TRUE for a primary trace_state object bool _primary; bool _tracing_began = false; @@ -69,7 +67,6 @@ private: sstring _request; int _pending_trace_events = 0; shared_ptr _local_tracing_ptr; - i_tracing_backend_helper& _local_backend; struct params_values { std::experimental::optional> batchlog_endpoints; @@ -107,23 +104,24 @@ private: public: trace_state(trace_type type, bool write_on_close, const std::experimental::optional& session_id = std::experimental::nullopt) - : _session_id(session_id ? *session_id : utils::UUID_gen::get_time_UUID()) - , _type(type) - , _write_on_close(write_on_close) - , _ttl(ttl_by_type(_type)) + : _write_on_close(write_on_close) , _primary(!session_id) , _local_tracing_ptr(tracing::get_local_tracing_instance().shared_from_this()) - , _local_backend(_local_tracing_ptr->backend_helper()) - { } + { + _records = make_lw_shared(); + _records->session_id = session_id ? *session_id : utils::UUID_gen::get_time_UUID(); + _records->ttl = ttl_by_type(type); + _records->session_rec.command = type; + } ~trace_state(); const utils::UUID& get_session_id() const { - return _session_id; + return _records->session_id; } trace_type get_type() const { - return _type; + return _records->session_rec.command; } bool get_write_on_close() const { @@ -163,9 +161,9 @@ private: */ void begin(sstring request, gms::inet_address client) { begin(); - _started_at = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - _request = std::move(request); - _client = std::move(client); + _records->session_rec.client = client; + _records->session_rec.request = std::move(request); + _records->session_rec.started_at = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); } template @@ -249,7 +247,12 @@ private: _params_ptr->user_timestamp.emplace(val); } - std::unordered_map get_params(); + /** + * Fill the map in a session's record with the values 
set so far. + * + * @param params_map the map to fill + */ + void build_parameters_map(); /** * Add a single trace entry - a special case for a simple string. @@ -302,8 +305,13 @@ inline void trace_state::trace(sstring message) { return; } - _local_backend.write_event_record(_session_id, std::move(message), elapsed(), _ttl, i_tracing_backend_helper::wall_clock::now()); - ++_pending_trace_events; + try { + _records->events_recs.emplace_back(std::move(message), elapsed(), i_tracing_backend_helper::wall_clock::now()); + ++_pending_trace_events; + } catch (...) { + // Bump up an error counter and ignore + ++_local_tracing_ptr->stats.trace_errors; + } } template diff --git a/tracing/tracing.cc b/tracing/tracing.cc index b34a62b258..9fbca4e292 100644 --- a/tracing/tracing.cc +++ b/tracing/tracing.cc @@ -81,7 +81,7 @@ tracing::tracing(const sstring& tracing_backend_helper_class_name) scollectd::add_polled_metric(scollectd::type_instance_id("tracing" , scollectd::per_cpu_plugin_instance , "queue_length", "pending_for_write_sessions") - , scollectd::make_typed(scollectd::data_type::GAUGE, _pending_for_write_sessions)), + , scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return _pending_for_write_records_bulk.size(); })), scollectd::add_polled_metric(scollectd::type_instance_id("tracing" , scollectd::per_cpu_plugin_instance , "queue_length", "flushing_sessions") @@ -108,7 +108,7 @@ future<> tracing::create_tracing(const sstring& tracing_backend_class_name) { trace_state_ptr tracing::create_session(trace_type type, bool write_on_close, const std::experimental::optional& session_id) { trace_state_ptr tstate; try { - if (_active_sessions + _pending_for_write_sessions + _flushing_sessions > 2 * max_pending_for_write_sessions) { + if (_active_sessions + _pending_for_write_records_bulk.size() + _flushing_sessions > 2 * max_pending_for_write_sessions) { if (session_id) { tracing_logger.trace("{}: Maximum sessions count is reached. 
Dropping a secondary session", session_id); } else { @@ -116,8 +116,8 @@ trace_state_ptr tracing::create_session(trace_type type, bool write_on_close, co } if (++stats.max_sessions_threshold_hits % tracing::max_threshold_hits_warning_period == 1) { - tracing_logger.warn("Maximum sessions limit is hit {} times: open_sessions {}, pending_for_flush_sessions {}, flushing_sessions {}", - stats.max_sessions_threshold_hits, _active_sessions, _pending_for_write_sessions, _flushing_sessions); + tracing_logger.warn("Maximum sessions limit is hit {} times: open_sessions {}, pending_for_write_sessions {}, flushing_sessions {}", + stats.max_sessions_threshold_hits, _active_sessions, _pending_for_write_records_bulk.size(), _flushing_sessions); } return trace_state_ptr(); @@ -142,13 +142,17 @@ void tracing::write_timer_callback() { return; } - tracing_logger.trace("Timer kicks in: {}", _pending_for_write_sessions ? "writing" : "not writing"); + tracing_logger.trace("Timer kicks in: {}", _pending_for_write_records_bulk.size() ? 
"writing" : "not writing"); write_pending_records(); _write_timer.arm(write_period); } future<> tracing::shutdown() { tracing_logger.info("Asked to shut down"); + if (_down) { + throw std::logic_error("tracing: shutdown() called for the service that is already down"); + } + write_pending_records(); _down = true; _write_timer.cancel(); diff --git a/tracing/tracing.hh b/tracing/tracing.hh index 8b720e0dcc..f8c965f0f0 100644 --- a/tracing/tracing.hh +++ b/tracing/tracing.hh @@ -99,6 +99,9 @@ public: { } }; +struct one_session_records; +using records_bulk = std::deque>; + struct i_tracing_backend_helper { using wall_clock = std::chrono::system_clock; @@ -112,53 +115,43 @@ public: virtual future<> stop() = 0; /** - * Write a new tracing session record + * Write a bulk of tracing records * - * @param session_id tracing session ID - * @param client client IP - * @param parameters optional parameters - * @param request request we are tracing - * @param started_at amount of milliseconds passed since Epoch before this - * session is started (on a Coordinator Node) - * @param command a type of this trace - * @param elapsed number of microseconds this tracing session took - * @param ttl TTL of a session record + * @param bulk a bulk of records */ - virtual void write_session_record(const utils::UUID& session_id, - gms::inet_address client, - std::unordered_map parameters, - sstring request, - long started_at, - trace_type command, - int elapsed, - gc_clock::duration ttl) = 0; - - /** - * Write a new tracing event record - * @param session_id tracing session ID - * @param message tracing message - * @param elapsed number of microseconds passed since a beginning of a - * corresponding tracing session till this event - * @param ttl TTL of the event record - * @param event_time_point time point when a record was taken - */ - virtual void write_event_record(const utils::UUID& session_id, - sstring message, - int elapsed, - gc_clock::duration ttl, - wall_clock::time_point 
event_time_point) = 0; + virtual void write_records_bulk(records_bulk& bulk) = 0; private: - /** - * Commit all pending tracing records to the underlying storage. - * The implementation has to call tracing::tracing::write_complete(nr) for - * each "nr" completed records once they are written to the backend. - */ - virtual void kick() = 0; - friend class tracing; }; +struct event_record { + sstring message; + int elapsed; + i_tracing_backend_helper::wall_clock::time_point event_time_point; + + event_record(sstring message_, int elapsed_, i_tracing_backend_helper::wall_clock::time_point event_time_point_) + : message(std::move(message_)) + , elapsed(elapsed_) + , event_time_point(event_time_point_) {} +}; + +struct session_record { + gms::inet_address client; + std::unordered_map parameters; + sstring request; + long started_at = 0; + trace_type command = trace_type::NONE; + int elapsed = -1; +}; + +struct one_session_records { + utils::UUID session_id; + session_record session_rec; + gc_clock::duration ttl; + std::deque events_recs; +}; + using trace_state_ptr = lw_shared_ptr; class tracing : public seastar::async_sharded_service { @@ -178,8 +171,8 @@ public: private: uint64_t _active_sessions = 0; - uint64_t _pending_for_write_sessions = 0; uint64_t _flushing_sessions = 0; + records_bulk _pending_for_write_records_bulk; timer _write_timer; bool _down = false; std::unique_ptr _tracing_backend_helper_ptr; @@ -228,14 +221,11 @@ public: future<> shutdown(); void write_pending_records() { - // if service is down - do nothing - if (_down) { - return; + if (_pending_for_write_records_bulk.size()) { + _flushing_sessions += _pending_for_write_records_bulk.size(); + _tracing_backend_helper_ptr->write_records_bulk(_pending_for_write_records_bulk); + _pending_for_write_records_bulk.clear(); } - - _flushing_sessions += _pending_for_write_sessions; - _pending_for_write_sessions = 0; - _tracing_backend_helper_ptr->kick(); } void write_complete(uint64_t nr = 1) { @@ -256,10 
+246,23 @@ public: */ trace_state_ptr create_session(trace_type type, bool write_on_close, const std::experimental::optional& session_id = std::experimental::nullopt); - void end_session() { + void end_session(lw_shared_ptr records, bool write_now) { --_active_sessions; - ++_pending_for_write_sessions; - if (_pending_for_write_sessions >= max_pending_for_write_sessions) { + + // if service is down - drop the records and return + if (_down) { + return; + } + + try { + _pending_for_write_records_bulk.emplace_back(std::move(records)); + } catch (...) { + // OOM: bump up the error counter and ignore + ++stats.trace_errors; + return; + } + + if (write_now || _pending_for_write_records_bulk.size() >= max_pending_for_write_sessions) { write_pending_records(); } } From d8fe5317d18b97594a49e146e8214196e1ff6f4f Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Tue, 2 Aug 2016 19:02:36 +0300 Subject: [PATCH 4/7] tracing::trace_keyspace_helper: make events' mutations applying loop interruptible When building events' mutation don't apply them in a tight loop but rather apply each of them in a separate continuation to allow reactor to interrupt this loop if it takes too long for it to complete (e.g. where there are a lot of mutations to apply). Since building all events' mutations is asynchronous now we can no longer keep the "nanos" state in a global trace_keyspace_helper object but rather have to move it into the per-session backend_session_state class. backend_session_state class is a backend-specific implementation of a tracing::backend_session_state_base class. An instance of the above object is created by a tracing::i_tracing_backend_helper::allocate_session_state() virtual method and is stored in a tracing::one_session_records object. 
Signed-off-by: Vlad Zolotarov --- tracing/trace_keyspace_helper.cc | 51 +++++++++++++++++++++----------- tracing/trace_keyspace_helper.hh | 30 ++++++++----------- tracing/tracing.cc | 3 ++ tracing/tracing.hh | 17 +++++++++++ 4 files changed, 67 insertions(+), 34 deletions(-) diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index 72fb77c5d3..b9252125e5 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -52,6 +52,11 @@ const sstring trace_keyspace_helper::KEYSPACE_NAME("system_traces"); const sstring trace_keyspace_helper::SESSIONS("sessions"); const sstring trace_keyspace_helper::EVENTS("events"); +struct trace_keyspace_backend_sesssion_state final : public backend_session_state_base { + int64_t last_nanos = 0; + virtual ~trace_keyspace_backend_sesssion_state() {} +}; + trace_keyspace_helper::trace_keyspace_helper(tracing& tr) : i_tracing_backend_helper(tr) , _registrations{ @@ -235,9 +240,11 @@ mutation trace_keyspace_helper::make_event_mutation(one_session_records& session auto key = partition_key::from_singular(*schema, session_records.session_id); auto ttl = session_records.ttl; + auto backend_state_ptr = static_cast(session_records.backend_state_ptr.get()); + int64_t& last_event_nanos = backend_state_ptr->last_nanos; auto timestamp = api::new_timestamp(); mutation m(key, schema); - auto& cells = m.partition().clustered_row(clustering_key::from_singular(*schema, utils::UUID_gen::get_time_UUID(make_monotonic_UUID_tp(record.event_time_point)))).cells(); + auto& cells = m.partition().clustered_row(clustering_key::from_singular(*schema, utils::UUID_gen::get_time_UUID(make_monotonic_UUID_tp(last_event_nanos, record.event_time_point)))).cells(); cells.apply(*_activity_column, atomic_cell::make_live(timestamp, utf8_type->decompose(record.message), ttl)); cells.apply(*_source_column, atomic_cell::make_live(timestamp, inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr()), ttl)); 
@@ -247,24 +254,30 @@ mutation trace_keyspace_helper::make_event_mutation(one_session_records& session return m; } +future<> trace_keyspace_helper::apply_events_mutation(lw_shared_ptr records) { + std::deque& events_records = records->events_recs; + + if (events_records.empty()) { + return make_ready_future<>(); + } + + logger.trace("{}: storing {} events records", records->session_id, events_records.size()); + + mutation m(make_event_mutation(*records, *events_records.begin())); + + return do_with(std::move(m), std::move(events_records), [this, records] (mutation& m, std::deque& events_records) { + return do_for_each(std::next(events_records.begin()), events_records.end(), [this, &m, &events_records, all_records = records] (event_record& one_event_record) { + m.apply(make_event_mutation(*all_records, one_event_record)); + return make_ready_future<>(); + }).then([&m] { + return service::get_local_storage_proxy().mutate({std::move(m)}, db::consistency_level::ANY, nullptr); + }); + }); +} + future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr records) { return futurize::apply([this, records] { - auto& events_records = records->events_recs; - - if (events_records.size()) { - // Reset the "monotinic time point" state machine since it's - // relevant in a context of a single tracing session only. The - // events from different sessions will differ by a session UUID. 
- reset_monotonic_tp(); - - logger.trace("{}: events number is {}", records->session_id, events_records.size()); - mutation m(make_event_mutation(*records, *events_records.begin())); - std::for_each(std::next(events_records.begin()), events_records.end(), [this, &m, &all_records = *records] (const event_record& record) { m.apply(make_event_mutation(all_records, record)); }); - - return service::get_local_storage_proxy().mutate({std::move(m)}, db::consistency_level::ANY, nullptr); - } else { - return make_ready_future<>(); - } + return apply_events_mutation(records); }).then([this, records] { if (records->session_rec.elapsed >= 0) { logger.trace("{}: storing a session event", records->session_id); @@ -275,6 +288,10 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr trace_keyspace_helper::allocate_session_state() const { + return std::make_unique(); +} + using registry = class_registrator; static registry registrator1("trace_keyspace_helper"); diff --git a/tracing/trace_keyspace_helper.hh b/tracing/trace_keyspace_helper.hh index 19034c402f..3cbcec5fcc 100644 --- a/tracing/trace_keyspace_helper.hh +++ b/tracing/trace_keyspace_helper.hh @@ -81,8 +81,6 @@ private: uint64_t bad_column_family_errors = 0; } _stats; - int64_t _last_event_nanos = 0; - scollectd::registrations _registrations; public: @@ -101,6 +99,7 @@ public: }; virtual void write_records_bulk(records_bulk& bulk) override; + virtual std::unique_ptr allocate_session_state() const override; private: /** @@ -111,36 +110,31 @@ private: void write_one_session_records(lw_shared_ptr records); /** - * Makes a monotonically increasing value in 100ns based on the given time stamp. + * Makes a monotonically increasing value in 100ns ("nanos") based on the given time + * stamp and the "nanos" value of the previous event. 
* * If the amount of 100s of ns evaluated from the @param tp is equal to the - * value returned in the previouse call to get_next_nanos() (without a reset_monotonic_tp() - * call in between), increment it by one. + * given @param last_event_nanos increment @param last_event_nanos by one + * and return a time point based its new value. * + * @param last_event_nanos a reference to the last nanos to align the given time point to. * @param tp the amount of time passed since the Epoch that will be used for the calculation. * * @return the monotonically increasing vlaue in 100s of ns based on the - * given time stamp. + * given time stamp and on the "nanos" value of the previous event. */ - wall_clock::time_point make_monotonic_UUID_tp(wall_clock::time_point tp) { + wall_clock::time_point make_monotonic_UUID_tp(int64_t& last_event_nanos, wall_clock::time_point tp) { using namespace std::chrono; auto tp_nanos = duration_cast(tp.time_since_epoch()).count() / 100; - if (tp_nanos > _last_event_nanos) { - _last_event_nanos = tp_nanos; + if (tp_nanos > last_event_nanos) { + last_event_nanos = tp_nanos; return tp; } else { - return wall_clock::time_point(nanoseconds((++_last_event_nanos) * 100)); + return wall_clock::time_point(nanoseconds((++last_event_nanos) * 100)); } } - /** - * Reset the make_monotonic_tp() state machine. - */ - void reset_monotonic_tp() { - _last_event_nanos = 0; - } - /** * Tries to create a table with a given name and using the provided CQL * command. @@ -164,6 +158,8 @@ private: */ future<> flush_one_session_mutations(lw_shared_ptr records); + future<> apply_events_mutation(lw_shared_ptr records); + /** * Get a schema_ptr by a table (UU)ID. If not found will try to get it by * name. 
If not found will issue a table creation and throw diff --git a/tracing/tracing.cc b/tracing/tracing.cc index 9fbca4e292..f786868e71 100644 --- a/tracing/tracing.cc +++ b/tracing/tracing.cc @@ -179,5 +179,8 @@ void tracing::set_trace_probability(double p) { tracing_logger.info("Setting tracing probability to {} (normalized {})", _trace_probability, _normalized_trace_probability); } + +one_session_records::one_session_records() + : backend_state_ptr(tracing::get_local_tracing_instance().allocate_backend_session_state()) {} } diff --git a/tracing/tracing.hh b/tracing/tracing.hh index f8c965f0f0..d93c3fbffe 100644 --- a/tracing/tracing.hh +++ b/tracing/tracing.hh @@ -102,6 +102,10 @@ public: struct one_session_records; using records_bulk = std::deque>; +struct backend_session_state_base { + virtual ~backend_session_state_base() {}; +}; + struct i_tracing_backend_helper { using wall_clock = std::chrono::system_clock; @@ -121,6 +125,8 @@ public: */ virtual void write_records_bulk(records_bulk& bulk) = 0; + virtual std::unique_ptr allocate_session_state() const = 0; + private: friend class tracing; }; @@ -150,6 +156,13 @@ struct one_session_records { session_record session_rec; gc_clock::duration ttl; std::deque events_recs; + std::unique_ptr backend_state_ptr; + + one_session_records(); + + uint64_t size() const { + return events_recs.size() + (session_rec.elapsed >= 0); + } }; using trace_state_ptr = lw_shared_ptr; @@ -284,6 +297,10 @@ public: return _normalized_trace_probability != 0 && _gen() < _normalized_trace_probability; } + std::unique_ptr allocate_backend_session_state() const { + return _tracing_backend_helper_ptr->allocate_session_state(); + } + private: void write_timer_callback(); }; From 5391bcc5a90f6f7747dba44b61411306a08285bf Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Mon, 25 Jul 2016 10:55:39 +0300 Subject: [PATCH 5/7] tracing: improve a back pressure policy Use a per-shard tracing records budget instead of maintaining a fixed-size per-session 
records budget and a per-shard sessions budget. The original policy could lead to some irrational situations: for example, when a single tracing session creates a substantial amount of records that we can handle, we would still start dropping new records once it surpasses the per-session limit. The new policy maintains a per-shard trace records budget that is consumed by each trace() call and by a primary session destructor when a session record is created. Each active record may only be in one of the following states: - cached: stored in its session's object. When a record is in this state it's not going to be written to I/O during the next write event. - pending for write: when a record is in this state it's going to be written to I/O during the next write event. - flushing: the record is currently being written to the I/O. There are counters of the total amount of records in each state above. Each record may only be in one specific state at any point in time and therefore it must be accounted for in exactly one of the three counters. The sum of all three counters should not be greater than (max_pending_trace_records + write_event_records_threshold) at any time (actually it can get as high as the value above plus (max_pending_sessions) if all sessions are primary but we won't take this into account for simplicity). The same applies to the number of outstanding sessions: it may not be greater than (max_pending_sessions + write_event_sessions_threshold) at any time. If the total number of tracing records is greater than or equal to the limit above, the new trace point is going to be dropped. If the current number of records plus the expected number of trace records per session (exp_trace_events_per_session) is greater than the limit above, new sessions will be dropped. A new session will also be dropped if there are too many active sessions. 
When the record or a session is dropped the appropriate statistics counters are updated and there is a rate-limited warning message printed to the log. Every time a number of records pending for write is greater or equal to (write_event_records_threshold) or a number of sessions pending for write is greater or equal to (write_event_sessions_threshold) a write event is issued. Every 2 seconds a timer would write all pending for write records available so far. Signed-off-by: Vlad Zolotarov --- tracing/trace_keyspace_helper.cc | 6 +- tracing/trace_keyspace_helper.hh | 3 + tracing/trace_state.cc | 35 +++----- tracing/trace_state.hh | 18 +++- tracing/tracing.cc | 42 +++++---- tracing/tracing.hh | 148 +++++++++++++++++++++++++++---- 6 files changed, 192 insertions(+), 60 deletions(-) diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index b9252125e5..1708a0c541 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -181,7 +181,8 @@ future<> trace_keyspace_helper::start() { void trace_keyspace_helper::write_one_session_records(lw_shared_ptr records) { with_gate(_pending_writes, [this, records = std::move(records)] { - return this->flush_one_session_mutations(std::move(records)).finally([this] { _local_tracing.write_complete(); }); + auto num_records = records->size(); + return this->flush_one_session_mutations(std::move(records)).finally([this, num_records] { _local_tracing.write_complete(num_records); }); }).handle_exception([this] (auto ep) { try { ++_stats.tracing_errors; @@ -199,6 +200,7 @@ void trace_keyspace_helper::write_one_session_records(lw_shared_ptr trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr::apply([this, records] { return apply_events_mutation(records); }).then([this, records] { - if (records->session_rec.elapsed >= 0) { + if (records->session_rec.ready()) { logger.trace("{}: storing a session event", records->session_id); return 
service::get_local_storage_proxy().mutate({make_session_mutation(*records)}, db::consistency_level::ANY, nullptr); } else { diff --git a/tracing/trace_keyspace_helper.hh b/tracing/trace_keyspace_helper.hh index 3cbcec5fcc..794df0ec39 100644 --- a/tracing/trace_keyspace_helper.hh +++ b/tracing/trace_keyspace_helper.hh @@ -151,6 +151,9 @@ private: * Flush mutations of one particular tracing session. First "events" * mutations and then, when they are complete, a "sessions" mutation. * + * @note This function guaranties that it'll handle exactly the same number + * of records @param records had when the function was invoked. + * * @param records records describing the session's records * * @return A future that resolves when applying of above mutations is diff --git a/tracing/trace_state.cc b/tracing/trace_state.cc index 41b902029d..cc06ad63f5 100644 --- a/tracing/trace_state.cc +++ b/tracing/trace_state.cc @@ -47,7 +47,7 @@ namespace tracing { -static logging::logger logger("trace_state"); +logging::logger trace_state_logger("trace_state"); void trace_state::build_parameters_map() { if (!_params_ptr) { @@ -90,40 +90,29 @@ trace_state::~trace_state() { // event and we don't want to cripple the primary session by // "stealing" one trace() event from it. // - // We do want to report it in statistics however. If for instance - // there are a lot of tracing sessions that only open itself and - // then do nothing - they will create a lot of session_record events - // and we do want to know about it. - ++_pending_trace_events; + // We do want to account them however. If for instance there are a + // lot of tracing sessions that only open itself and then do nothing + // - they will create a lot of session_record events and we do want + // to handle this case properly. + _records->consume_from_budget(); // build_parameters_map() may throw. We don't want to record the - // session record in this case since its data may be incomplete. 
+ // session's record in this case since its data may be incomplete. // These events should be really rare however, therefore we don't // want to optimize this flow (e.g. rollback the corresponding - // events' records). + // events' records that have already been sent to I/O). try { build_parameters_map(); _records->session_rec.elapsed = elapsed(); } catch (...) { - // Bump up an error counter and ignore + // Bump up an error counter, drop any pending events records and + // continue ++_local_tracing_ptr->stats.trace_errors; + _records->events_recs.clear(); } } - _local_tracing_ptr->end_session(_records, _write_on_close); - - // update some stats and get out... - auto& tracing_stats = _local_tracing_ptr->stats; - - tracing_stats.trace_events_count += _pending_trace_events; - - if (_pending_trace_events >= tracing::max_trace_events_per_session) { - logger.trace("{}: Maximum number of traces is reached. Some traces are going to be dropped", _records->session_id); - - if (++tracing_stats.max_traces_threshold_hits % tracing::max_threshold_hits_warning_period == 1) { - logger.warn("Maximum traces per session limit is hit {} times", tracing_stats.max_traces_threshold_hits); - } - } + _local_tracing_ptr->end_session(std::move(_records), _write_on_close); } } } diff --git a/tracing/trace_state.hh b/tracing/trace_state.hh index c9b977c2d6..29ca3f49bb 100644 --- a/tracing/trace_state.hh +++ b/tracing/trace_state.hh @@ -50,6 +50,8 @@ namespace tracing { +extern logging::logger trace_state_logger; + class trace_state final { using clock_type = std::chrono::steady_clock; @@ -301,13 +303,25 @@ inline void trace_state::trace(sstring message) { throw std::logic_error("trying to use a trace() before begin() for \"" + message + "\" tracepoint"); } - if (_pending_trace_events >= tracing::max_trace_events_per_session) { + // We don't want the total amount of pending, active and flushing records to + // bypass two times the maximum number of pending records. 
+ // + // If either records are being created too fast or a backend doesn't + // keep up we want to start dropping records. + // In any case, this should be rare, therefore we don't try to optimize this + // flow. + if (!_local_tracing_ptr->have_records_budget()) { + tracing_logger.trace("{}: Maximum number of traces is reached. Some traces are going to be dropped", get_session_id()); + if ((++_local_tracing_ptr->stats.dropped_records) % tracing::log_warning_period == 1) { + tracing_logger.warn("Maximum records limit is hit {} times", _local_tracing_ptr->stats.dropped_records); + } + return; } try { _records->events_recs.emplace_back(std::move(message), elapsed(), i_tracing_backend_helper::wall_clock::now()); - ++_pending_trace_events; + _records->consume_from_budget(); } catch (...) { // Bump up an error counter and ignore ++_local_tracing_ptr->stats.trace_errors; diff --git a/tracing/tracing.cc b/tracing/tracing.cc index f786868e71..53e0553562 100644 --- a/tracing/tracing.cc +++ b/tracing/tracing.cc @@ -44,7 +44,7 @@ namespace tracing { -static logging::logger tracing_logger("tracing"); +logging::logger tracing_logger("tracing"); const gc_clock::duration tracing::tracing::write_period = std::chrono::seconds(2); @@ -60,16 +60,16 @@ tracing::tracing(const sstring& tracing_backend_helper_class_name) , _registrations{ scollectd::add_polled_metric(scollectd::type_instance_id("tracing" , scollectd::per_cpu_plugin_instance - , "total_operations", "max_sessions_threshold_hits") - , scollectd::make_typed(scollectd::data_type::DERIVE, stats.max_sessions_threshold_hits)), + , "total_operations", "dropped_sessions") + , scollectd::make_typed(scollectd::data_type::DERIVE, stats.dropped_sessions)), scollectd::add_polled_metric(scollectd::type_instance_id("tracing" , scollectd::per_cpu_plugin_instance - , "total_operations", "max_traces_threshold_hits") - , scollectd::make_typed(scollectd::data_type::DERIVE, stats.max_traces_threshold_hits)), + , "total_operations", 
"dropped_records") + , scollectd::make_typed(scollectd::data_type::DERIVE, stats.dropped_records)), scollectd::add_polled_metric(scollectd::type_instance_id("tracing" , scollectd::per_cpu_plugin_instance - , "total_operations", "trace_events_count") - , scollectd::make_typed(scollectd::data_type::DERIVE, stats.trace_events_count)), + , "total_operations", "trace_records_count") + , scollectd::make_typed(scollectd::data_type::DERIVE, stats.trace_records_count)), scollectd::add_polled_metric(scollectd::type_instance_id("tracing" , scollectd::per_cpu_plugin_instance , "total_operations", "trace_errors") @@ -80,12 +80,16 @@ tracing::tracing(const sstring& tracing_backend_helper_class_name) , scollectd::make_typed(scollectd::data_type::GAUGE, _active_sessions)), scollectd::add_polled_metric(scollectd::type_instance_id("tracing" , scollectd::per_cpu_plugin_instance - , "queue_length", "pending_for_write_sessions") - , scollectd::make_typed(scollectd::data_type::GAUGE, [this] { return _pending_for_write_records_bulk.size(); })), + , "queue_length", "cached_records") + , scollectd::make_typed(scollectd::data_type::GAUGE, _cached_records)), scollectd::add_polled_metric(scollectd::type_instance_id("tracing" , scollectd::per_cpu_plugin_instance - , "queue_length", "flushing_sessions") - , scollectd::make_typed(scollectd::data_type::GAUGE, _flushing_sessions))} + , "queue_length", "pending_for_write_records") + , scollectd::make_typed(scollectd::data_type::GAUGE, _pending_for_write_records_count)), + scollectd::add_polled_metric(scollectd::type_instance_id("tracing" + , scollectd::per_cpu_plugin_instance + , "queue_length", "flushing_records") + , scollectd::make_typed(scollectd::data_type::GAUGE, _flushing_records))} , _gen(std::random_device()()) { try { _tracing_backend_helper_ptr = create_object(tracing_backend_helper_class_name, *this); @@ -108,16 +112,17 @@ future<> tracing::create_tracing(const sstring& tracing_backend_class_name) { trace_state_ptr 
tracing::create_session(trace_type type, bool write_on_close, const std::experimental::optional& session_id) { trace_state_ptr tstate; try { - if (_active_sessions + _pending_for_write_records_bulk.size() + _flushing_sessions > 2 * max_pending_for_write_sessions) { + // Don't create a session if its records are likely to be dropped + if (!have_records_budget(exp_trace_events_per_session) || _active_sessions >= max_pending_sessions + write_event_sessions_threshold) { if (session_id) { - tracing_logger.trace("{}: Maximum sessions count is reached. Dropping a secondary session", session_id); + tracing_logger.trace("{}: Too many outstanding tracing records or sessions. Dropping a secondary session", session_id); } else { - tracing_logger.trace("Maximum sessions count is reached. Dropping a primary session"); + tracing_logger.trace("Too many outstanding tracing records or sessions. Dropping a primary session"); } - if (++stats.max_sessions_threshold_hits % tracing::max_threshold_hits_warning_period == 1) { - tracing_logger.warn("Maximum sessions limit is hit {} times: open_sessions {}, pending_for_write_sessions {}, flushing_sessions {}", - stats.max_sessions_threshold_hits, _active_sessions, _pending_for_write_records_bulk.size(), _flushing_sessions); + if (++stats.dropped_sessions % tracing::log_warning_period == 1) { + tracing_logger.warn("Dropped {} sessions: open_sessions {}, cached_records {} pending_for_write_records {}, flushing_records {}", + stats.dropped_sessions, _active_sessions, _cached_records, _pending_for_write_records_count, _flushing_records); } return trace_state_ptr(); @@ -181,6 +186,7 @@ void tracing::set_trace_probability(double p) { } one_session_records::one_session_records() - : backend_state_ptr(tracing::get_local_tracing_instance().allocate_backend_session_state()) {} + : backend_state_ptr(tracing::get_local_tracing_instance().allocate_backend_session_state()) + , budget_ptr(tracing::get_local_tracing_instance().get_cached_records_ptr()) {} } 
diff --git a/tracing/tracing.hh b/tracing/tracing.hh index d93c3fbffe..4c0a118893 100644 --- a/tracing/tracing.hh +++ b/tracing/tracing.hh @@ -51,6 +51,8 @@ namespace tracing { +extern logging::logger tracing_logger; + class trace_state; class tracing; @@ -149,6 +151,10 @@ struct session_record { long started_at = 0; trace_type command = trace_type::NONE; int elapsed = -1; + + bool ready() const { + return elapsed >= 0; + } }; struct one_session_records { @@ -158,10 +164,22 @@ struct one_session_records { std::deque events_recs; std::unique_ptr backend_state_ptr; + // A pointer to the records counter of the corresponding state new records + // of this tracing session should consume from (e.g. "cached" or "pending + // for write"). + uint64_t* budget_ptr; + one_session_records(); + /** + * Consume a single record from the per-shard budget. + */ + void consume_from_budget() { + ++(*budget_ptr); + } + uint64_t size() const { - return events_recs.size() + (session_rec.elapsed >= 0); + return events_recs.size() + session_rec.ready(); } }; @@ -170,21 +188,76 @@ using trace_state_ptr = lw_shared_ptr; class tracing : public seastar::async_sharded_service { public: static const gc_clock::duration write_period; - static constexpr int max_pending_for_write_sessions = 1000; - static constexpr int max_trace_events_per_session = 30; - // Number of max threshold XXX hits when an info message is printed - static constexpr int max_threshold_hits_warning_period = 10000; + // maximum number of sessions pending for write per shard + static constexpr int max_pending_sessions = 1000; + // expectation of an average number of trace records per session + static constexpr int exp_trace_events_per_session = 10; + // maximum allowed pending records per-shard + static constexpr int max_pending_trace_records = max_pending_sessions * exp_trace_events_per_session; + // number of pending sessions that would trigger a write event + static constexpr int write_event_sessions_threshold = 100; + // 
number of pending records that would trigger a write event + static constexpr int write_event_records_threshold = write_event_sessions_threshold * exp_trace_events_per_session; + // Number of events when an info message is printed + static constexpr int log_warning_period = 10000; struct stats { - uint64_t max_sessions_threshold_hits = 0; - uint64_t max_traces_threshold_hits = 0; - uint64_t trace_events_count = 0; + uint64_t dropped_sessions = 0; + uint64_t dropped_records = 0; + uint64_t trace_records_count = 0; uint64_t trace_errors = 0; } stats; private: + // A number of currently active tracing sessions uint64_t _active_sessions = 0; - uint64_t _flushing_sessions = 0; + + // Below are 3 counters that describe the total amount of tracing records on + // this shard. Each counter describes a state in which a record may be. + // + // Each record may only be in a specific state at every point of time and + // thereby it must be accounted only in one and only one of the three + // counters below at any given time. + // + // The sum of all three counters should not be greater than + // (max_pending_trace_records + write_event_records_threshold) at any time + // (actually it can get as high as a value above plus (max_pending_sessions) + // if all sessions are primary but we won't take this into an account for + // simplicity). + // + // The same is about the number of outstanding sessions: it may not be + // greater than (max_pending_sessions + write_event_sessions_threshold) at + // any time. + // + // If total number of tracing records is greater or equal to the limit + // above, the new trace point is going to be dropped. + // + // If current number or records plus the expected number of trace records + // per session (exp_trace_events_per_session) is greater than the limit + // above new sessions will be dropped. A new session will also be dropped if + // there are too many active sessions. 
+ // + // When the record or a session is dropped the appropriate statistics + // counters are updated and there is a rate-limited warning message printed + // to the log. + // + // Every time a number of records pending for write is greater or equal to + // (write_event_records_threshold) or a number of sessions pending for + // write is greater or equal to (write_event_sessions_threshold) a write + // event is issued. + // + // Every 2 seconds a timer would write all pending for write records + // available so far. + + // Total number of records cached in the active sessions that are not going + // to be written in the next write event + uint64_t _cached_records = 0; + // Total number of records that are currently being written to I/O + uint64_t _flushing_records = 0; + // Total number of records in the _pending_for_write_records_bulk. All of + // them are going to be written to the I/O during the next write event. + uint64_t _pending_for_write_records_count = 0; + records_bulk _pending_for_write_records_bulk; timer _write_timer; bool _down = false; @@ -235,17 +308,19 @@ public: void write_pending_records() { if (_pending_for_write_records_bulk.size()) { - _flushing_sessions += _pending_for_write_records_bulk.size(); + _flushing_records += _pending_for_write_records_count; + stats.trace_records_count += _pending_for_write_records_count; + _pending_for_write_records_count = 0; _tracing_backend_helper_ptr->write_records_bulk(_pending_for_write_records_bulk); _pending_for_write_records_bulk.clear(); } } void write_complete(uint64_t nr = 1) { - if (nr > _flushing_sessions) { - throw std::logic_error("completing more sessions than there are pending"); + if (nr > _flushing_records) { + throw std::logic_error("completing more records than there are pending"); } - _flushing_sessions -= nr; + _flushing_records -= nr; } /** @@ -259,6 +334,12 @@ public: */ trace_state_ptr create_session(trace_type type, bool write_on_close, const std::experimental::optional& session_id = 
std::experimental::nullopt); + void write_maybe() { + if (_pending_for_write_records_count >= write_event_records_threshold || _pending_for_write_records_bulk.size() >= write_event_sessions_threshold) { + write_pending_records(); + } + } + void end_session(lw_shared_ptr records, bool write_now) { --_active_sessions; @@ -268,15 +349,17 @@ public: } try { - _pending_for_write_records_bulk.emplace_back(std::move(records)); + schedule_for_write(std::move(records)); } catch (...) { // OOM: bump up the error counter and ignore ++stats.trace_errors; return; } - if (write_now || _pending_for_write_records_bulk.size() >= max_pending_for_write_sessions) { + if (write_now) { write_pending_records(); + } else { + write_maybe(); } } @@ -301,6 +384,41 @@ public: return _tracing_backend_helper_ptr->allocate_session_state(); } + /** + * Checks if there is enough budget for the @param nr new records + * @param nr number of new records + * + * @return TRUE if there is enough budget, FALSE otherwise + */ + bool have_records_budget(uint64_t nr = 1) { + // We don't want the total amount of pending, active and flushing records to + // bypass the maximum number of pending records plus the number of + // records that are possibly being written right now. + // + // If either records are being created too fast or a backend doesn't + // keep up we want to start dropping records. + // In any case, this should be rare.
+ if (_pending_for_write_records_count + _cached_records + _flushing_records + nr > max_pending_trace_records + write_event_records_threshold) { + return false; + } + + return true; + } + + + uint64_t* get_cached_records_ptr() { + return &_cached_records; + } + + void schedule_for_write(lw_shared_ptr records) { + _pending_for_write_records_bulk.emplace_back(records); + + // move the current records from a "cached" to "pending for write" state + auto current_records_num = records->size(); + _cached_records -= current_records_num; + _pending_for_write_records_count += current_records_num; + } + private: void write_timer_callback(); }; From 67d537ecb558645681186270abb3fe177479026b Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Wed, 3 Aug 2016 18:11:38 +0300 Subject: [PATCH 6/7] tracing: issue a write event if a single session creates a lot of events Currently write events are issued every time a trace session is closed. However if a single session creates a lot of events we will start dropping them after the total amount of pending records bypasses the limit. This patch will issue a write event before the session end in that case. Since now new events may be added to the active tracing session while it's scheduled for write we have to ensure the following: - Not to add the already pending for write session to the pending bulk. - Grab all pending data in a specific session in a synchronous way during the write event. - Serialize creation of events mutations - otherwise the "monotonic nanos" logic won't work. 
Signed-off-by: Vlad Zolotarov --- tracing/trace_keyspace_helper.cc | 44 +++++++++++++++++++++-------- tracing/trace_keyspace_helper.hh | 2 +- tracing/trace_state.hh | 10 +++++++ tracing/tracing.hh | 48 ++++++++++++++++++++++++++++++-- 4 files changed, 89 insertions(+), 15 deletions(-) diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index 1708a0c541..c37fde710e 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -54,6 +54,7 @@ const sstring trace_keyspace_helper::EVENTS("events"); struct trace_keyspace_backend_sesssion_state final : public backend_session_state_base { int64_t last_nanos = 0; + semaphore write_sem; virtual ~trace_keyspace_backend_sesssion_state() {} }; @@ -256,9 +257,7 @@ mutation trace_keyspace_helper::make_event_mutation(one_session_records& session return m; } -future<> trace_keyspace_helper::apply_events_mutation(lw_shared_ptr records) { - std::deque& events_records = records->events_recs; - +future<> trace_keyspace_helper::apply_events_mutation(lw_shared_ptr records, std::deque& events_records) { if (events_records.empty()) { return make_ready_future<>(); } @@ -278,15 +277,36 @@ future<> trace_keyspace_helper::apply_events_mutation(lw_shared_ptr trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr records) { - return futurize::apply([this, records] { - return apply_events_mutation(records); - }).then([this, records] { - if (records->session_rec.ready()) { - logger.trace("{}: storing a session event", records->session_id); - return service::get_local_storage_proxy().mutate({make_session_mutation(*records)}, db::consistency_level::ANY, nullptr); - } else { - return make_ready_future<>(); - } + // grab events records available so far + return do_with(std::move(records->events_recs), [this, records] (std::deque& events_records) { + records->events_recs.clear(); + + // Check if a session's record is ready before handling events' records. 
+ // + // New event's records and a session's record may become ready while a + // mutation with the current events' records is being written. We don't want + // to allow the situation when a session's record is written before the last + // event record from the same session. + bool session_record_is_ready = records->session_rec.ready(); + + // From this point on - all new data will have to be handled in the next write event + records->data_consumed(); + + // We want to serialize the creation of events mutations in order to ensure + // that mutations for the events that were created first are going to be created + // first too. + auto backend_state_ptr = static_cast(records->backend_state_ptr.get()); + semaphore& write_sem = backend_state_ptr->write_sem; + return with_semaphore(write_sem, 1, [this, records, session_record_is_ready, &events_records] { + return apply_events_mutation(records, events_records).then([this, session_record_is_ready, records] { + if (session_record_is_ready) { + logger.trace("{}: storing a session event", records->session_id); + return service::get_local_storage_proxy().mutate({make_session_mutation(*records)}, db::consistency_level::ANY, nullptr); + } else { + return make_ready_future<>(); + } + }); + }).finally([records] {}); }); } diff --git a/tracing/trace_keyspace_helper.hh b/tracing/trace_keyspace_helper.hh index 794df0ec39..ad975f0088 100644 --- a/tracing/trace_keyspace_helper.hh +++ b/tracing/trace_keyspace_helper.hh @@ -161,7 +161,7 @@ private: */ future<> flush_one_session_mutations(lw_shared_ptr records); - future<> apply_events_mutation(lw_shared_ptr records); + future<> apply_events_mutation(lw_shared_ptr records, std::deque& events_records); /** * Get a schema_ptr by a table (UU)ID.
If not found will try to get it by diff --git a/tracing/trace_state.hh b/tracing/trace_state.hh index 29ca3f49bb..c04bc88ea6 100644 --- a/tracing/trace_state.hh +++ b/tracing/trace_state.hh @@ -322,6 +322,16 @@ inline void trace_state::trace(sstring message) { try { _records->events_recs.emplace_back(std::move(message), elapsed(), i_tracing_backend_helper::wall_clock::now()); _records->consume_from_budget(); + + // If we have aggregated enough records - schedule them for write already. + // + // We prefer the traces to be written after the session is over. However + // if there is a session that creates a lot of traces - we want to write + // them before we start to drop new records. + if (_records->events_recs.size() >= tracing::exp_trace_events_per_session) { + _local_tracing_ptr->schedule_for_write(_records); + _local_tracing_ptr->write_maybe(); + } } catch (...) { // Bump up an error counter and ignore ++_local_tracing_ptr->stats.trace_errors; diff --git a/tracing/tracing.hh b/tracing/tracing.hh index 4c0a118893..99cf0d36d0 100644 --- a/tracing/tracing.hh +++ b/tracing/tracing.hh @@ -121,7 +121,10 @@ public: virtual future<> stop() = 0; /** - * Write a bulk of tracing records + * Write a bulk of tracing records. + * + * This function has to clear a scheduled state of each one_session_records object + * in the @param bulk after it has been actually passed to the backend for writing. * * @param bulk a bulk of records */ @@ -157,7 +160,8 @@ struct session_record { } }; -struct one_session_records { +class one_session_records { +public: utils::UUID session_id; session_record session_rec; gc_clock::duration ttl; @@ -178,9 +182,31 @@ struct one_session_records { ++(*budget_ptr); } + /** + * Should be called when a record is scheduled for write. + * From that point till data_consumed() call all new records will be written + * in the next write event. 
+ */ + inline void set_pending_for_write(); + + /** + * Should be called after all data pending to be written in this record has + * been processed. + * From that point on new records are cached internally and have to be + * explicitly committed for write in order to be written during the write event. + */ + inline void data_consumed(); + + bool is_pending_for_write() const { + return _is_pending_for_write; + } + uint64_t size() const { return events_recs.size() + session_rec.ready(); } + +private: + bool _is_pending_for_write = false; }; using trace_state_ptr = lw_shared_ptr; @@ -405,13 +431,21 @@ public: return true; } + uint64_t* get_pending_records_ptr() { + return &_pending_for_write_records_count; + } uint64_t* get_cached_records_ptr() { return &_cached_records; } void schedule_for_write(lw_shared_ptr records) { + if (records->is_pending_for_write()) { + return; + } + _pending_for_write_records_bulk.emplace_back(records); + records->set_pending_for_write(); // move the current records from a "cached" to "pending for write" state auto current_records_num = records->size(); @@ -422,4 +456,14 @@ public: private: void write_timer_callback(); }; + +void one_session_records::set_pending_for_write() { + _is_pending_for_write = true; + budget_ptr = tracing::get_local_tracing_instance().get_pending_records_ptr(); +} + +void one_session_records::data_consumed() { + _is_pending_for_write = false; + budget_ptr = tracing::get_local_tracing_instance().get_cached_records_ptr(); +} } From 5deec0e327d3ccd87ac01f21626fedc7bcc31ff9 Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Tue, 2 Aug 2016 17:21:38 +0300 Subject: [PATCH 7/7] tracing::write_complete(): improve a message in case of a logic error Improve a message if there is a logic error and add logging of such errors. 
Signed-off-by: Vlad Zolotarov --- tracing/trace_keyspace_helper.cc | 2 ++ tracing/tracing.hh | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index c37fde710e..bd20bec348 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -194,6 +194,8 @@ void trace_keyspace_helper::write_one_session_records(lw_shared_ptr _flushing_records) { - throw std::logic_error("completing more records than there are pending"); + throw std::logic_error(seastar::format("completing more records ({:d}) than there are pending ({:d})", nr, _flushing_records)); } _flushing_records -= nr; }