diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index a8add2b0f7..bd20bec348 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -52,6 +52,12 @@ const sstring trace_keyspace_helper::KEYSPACE_NAME("system_traces"); const sstring trace_keyspace_helper::SESSIONS("sessions"); const sstring trace_keyspace_helper::EVENTS("events"); +struct trace_keyspace_backend_sesssion_state final : public backend_session_state_base { + int64_t last_nanos = 0; + semaphore write_sem; + virtual ~trace_keyspace_backend_sesssion_state() {} +}; + trace_keyspace_helper::trace_keyspace_helper(tracing& tr) : i_tracing_backend_helper(tr) , _registrations{ @@ -174,63 +180,55 @@ future<> trace_keyspace_helper::start() { } } -void trace_keyspace_helper::write_session_record(const utils::UUID& session_id, - gms::inet_address client, - std::unordered_map parameters, - sstring request, - long started_at, - trace_type command, - int elapsed, - gc_clock::duration ttl) { - try { - _mutation_makers[session_id].first = [request = std::move(request), client, parameters = std::move(parameters), started_at, command, elapsed, ttl, this] (const utils::UUID& session_id) { - return make_session_mutation(session_id, client, parameters, request, started_at, type_to_string(command), elapsed, ttl); - }; - } catch (...) 
{ - // OOM: ignore - } +void trace_keyspace_helper::write_one_session_records(lw_shared_ptr records) { + with_gate(_pending_writes, [this, records = std::move(records)] { + auto num_records = records->size(); + return this->flush_one_session_mutations(std::move(records)).finally([this, num_records] { _local_tracing.write_complete(num_records); }); + }).handle_exception([this] (auto ep) { + try { + ++_stats.tracing_errors; + std::rethrow_exception(ep); + } catch (exceptions::overloaded_exception&) { + logger.warn("Too many nodes are overloaded to save trace events"); + } catch (bad_column_family& e) { + if (_stats.bad_column_family_errors++ % bad_column_family_message_period == 0) { + logger.warn("Tracing is enabled but {}", e.what()); + } + } catch (std::logic_error& e) { + logger.error(e.what()); + } catch (...) { + // TODO: Handle some more exceptions maybe? + } + }).discard_result(); } -void trace_keyspace_helper::write_event_record(const utils::UUID& session_id, - sstring message, - int elapsed, - gc_clock::duration ttl, - wall_clock::time_point event_time_point) { - try { - _mutation_makers[session_id].second.makers.emplace_back([message = std::move(message), elapsed, ttl, event_time_point, this] (const utils::UUID& session_id) mutable { - return make_event_mutation(session_id, message, elapsed, _local_tracing.get_thread_name(), ttl, event_time_point); - }); - } catch (...) 
{ - // OOM: ignore - } +void trace_keyspace_helper::write_records_bulk(records_bulk& bulk) { + logger.trace("Writing {} sessions", bulk.size()); + std::for_each(bulk.begin(), bulk.end(), [this] (records_bulk::value_type& one_session_records_ptr) { + write_one_session_records(std::move(one_session_records_ptr)); + }); } -mutation trace_keyspace_helper::make_session_mutation( - const utils::UUID& session_id, - gms::inet_address client, - const std::unordered_map& parameters, - const sstring& request, - long started_at, - const sstring& command, - int elapsed, - gc_clock::duration ttl) { +mutation trace_keyspace_helper::make_session_mutation(const one_session_records& session_records) { schema_ptr schema = get_schema_ptr_or_create(_sessions_id, SESSIONS, _sessions_create_cql, [this] (const schema_ptr& s) { return cache_sessions_table_handles(s); }); - auto key = partition_key::from_singular(*schema, session_id); + auto key = partition_key::from_singular(*schema, session_records.session_id); + auto ttl = session_records.ttl; + const session_record& record = session_records.session_rec; auto timestamp = api::new_timestamp(); mutation m(key, schema); auto& cells = m.partition().clustered_row(clustering_key::make_empty(*schema)).cells(); - cells.apply(*_client_column, atomic_cell::make_live(timestamp, inet_addr_type->decompose(client.addr()), ttl)); + cells.apply(*_client_column, atomic_cell::make_live(timestamp, inet_addr_type->decompose(record.client.addr()), ttl)); cells.apply(*_coordinator_column, atomic_cell::make_live(timestamp, inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr()), ttl)); - cells.apply(*_request_column, atomic_cell::make_live(timestamp, utf8_type->decompose(request), ttl)); - cells.apply(*_started_at_column, atomic_cell::make_live(timestamp, timestamp_type->decompose(started_at), ttl)); - cells.apply(*_command_column, atomic_cell::make_live(timestamp, utf8_type->decompose(command), ttl)); - cells.apply(*_duration_column, 
atomic_cell::make_live(timestamp, int32_type->decompose((int32_t)elapsed), ttl)); + cells.apply(*_request_column, atomic_cell::make_live(timestamp, utf8_type->decompose(record.request), ttl)); + cells.apply(*_started_at_column, atomic_cell::make_live(timestamp, timestamp_type->decompose(record.started_at), ttl)); + cells.apply(*_command_column, atomic_cell::make_live(timestamp, utf8_type->decompose(type_to_string(record.command)), ttl)); + cells.apply(*_duration_column, atomic_cell::make_live(timestamp, int32_type->decompose((int32_t)record.elapsed), ttl)); std::vector> map_cell; - for (auto& param_pair : parameters) { + for (auto& param_pair : record.parameters) { map_cell.emplace_back(utf8_type->decompose(param_pair.first), atomic_cell::make_live(timestamp, utf8_type->decompose(param_pair.second), ttl)); } @@ -241,80 +239,81 @@ mutation trace_keyspace_helper::make_session_mutation( return m; } -mutation trace_keyspace_helper::make_event_mutation(const utils::UUID& session_id, - const sstring& message, - int elapsed, - const sstring& thread_name, - gc_clock::duration ttl, - wall_clock::time_point event_time_point) { +mutation trace_keyspace_helper::make_event_mutation(one_session_records& session_records, const event_record& record) { schema_ptr schema = get_schema_ptr_or_create(_events_id, EVENTS, _events_create_cql, [this] (const schema_ptr& s) { return cache_events_table_handles(s); }); - auto key = partition_key::from_singular(*schema, session_id); + auto key = partition_key::from_singular(*schema, session_records.session_id); + auto ttl = session_records.ttl; + auto backend_state_ptr = static_cast(session_records.backend_state_ptr.get()); + int64_t& last_event_nanos = backend_state_ptr->last_nanos; auto timestamp = api::new_timestamp(); mutation m(key, schema); - auto& cells = m.partition().clustered_row(clustering_key::from_singular(*schema, utils::UUID_gen::get_time_UUID(make_monotonic_UUID_tp(event_time_point)))).cells(); + auto& cells = 
m.partition().clustered_row(clustering_key::from_singular(*schema, utils::UUID_gen::get_time_UUID(make_monotonic_UUID_tp(last_event_nanos, record.event_time_point)))).cells(); - cells.apply(*_activity_column, atomic_cell::make_live(timestamp, utf8_type->decompose(message), ttl)); + cells.apply(*_activity_column, atomic_cell::make_live(timestamp, utf8_type->decompose(record.message), ttl)); cells.apply(*_source_column, atomic_cell::make_live(timestamp, inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr()), ttl)); - cells.apply(*_thread_column, atomic_cell::make_live(timestamp, utf8_type->decompose(thread_name), ttl)); - - assert(elapsed >= 0); - cells.apply(*_source_elapsed_column, atomic_cell::make_live(timestamp, int32_type->decompose(elapsed), ttl)); + cells.apply(*_thread_column, atomic_cell::make_live(timestamp, utf8_type->decompose(_local_tracing.get_thread_name()), ttl)); + cells.apply(*_source_elapsed_column, atomic_cell::make_live(timestamp, int32_type->decompose(record.elapsed), ttl)); return m; } -future<> trace_keyspace_helper::flush_one_session_mutations(utils::UUID session_id, std::pair& mutation_makers) { - return make_ready_future<>().then([this, session_id, events_makers = std::move(mutation_makers.second.makers)] { - if (events_makers.size()) { - // Reset the "monotinic time point" state machine since it's - // relevant in a context of a single tracing session only. The - // events from different sessions will differ by a session UUID. 
- reset_monotonic_tp(); - logger.trace("{}: events number is {}", session_id, events_makers.size()); - mutation m((*events_makers.begin())(session_id)); - std::for_each(std::next(events_makers.begin()), events_makers.end(), [&m, &session_id] (const mutation_maker& maker) mutable { m.apply(maker(session_id)); }); +future<> trace_keyspace_helper::apply_events_mutation(lw_shared_ptr records, std::deque& events_records) { + if (events_records.empty()) { + return make_ready_future<>(); + } + + logger.trace("{}: storing {} events records", records->session_id, events_records.size()); + + mutation m(make_event_mutation(*records, *events_records.begin())); + + return do_with(std::move(m), std::move(events_records), [this, records] (mutation& m, std::deque& events_records) { + return do_for_each(std::next(events_records.begin()), events_records.end(), [this, &m, &events_records, all_records = records] (event_record& one_event_record) { + m.apply(make_event_mutation(*all_records, one_event_record)); + return make_ready_future<>(); + }).then([&m] { return service::get_local_storage_proxy().mutate({std::move(m)}, db::consistency_level::ANY, nullptr); - } else { - return make_ready_future<>(); - } - }).then([session_id = std::move(session_id), session_maker = std::move(mutation_makers.first)] { - if (session_maker) { - logger.trace("{}: storing a session event", session_id); - return service::get_local_storage_proxy().mutate({session_maker(session_id)}, db::consistency_level::ANY, nullptr); - } else { - return make_ready_future<>(); - } + }); }); } -void trace_keyspace_helper::kick() { - logger.trace("flushing {} sessions", _mutation_makers.size()); - parallel_for_each(_mutation_makers,[this](decltype(_mutation_makers)::value_type& uuid_mutation_makers) { - return with_gate(_pending_writes, [this, &uuid_mutation_makers] { - logger.trace("{}: flushing traces", uuid_mutation_makers.first); - return this->flush_one_session_mutations(uuid_mutation_makers.first, 
uuid_mutation_makers.second).finally([this] { _local_tracing.write_complete(); }); - }).handle_exception([this] (auto ep) { - try { - ++_stats.tracing_errors; - std::rethrow_exception(ep); - } catch (exceptions::overloaded_exception&) { - logger.warn("Too many nodes are overloaded to save trace events"); - } catch (bad_column_family& e) { - if (_stats.bad_column_family_errors++ % bad_column_family_message_period == 0) { - logger.warn("Tracing is enabled but {}", e.what()); - } - } catch (...) { - // TODO: Handle some more exceptions maybe? - } - }); - }).discard_result(); +future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr records) { + // grab events records available so far + return do_with(std::move(records->events_recs), [this, records] (std::deque& events_records) { + records->events_recs.clear(); - // We can clear the hash table here because we cared to capture all relevant - // data from it in lambdas' capture-lists inside - // flush_one_session_mutations() before any asynchronous call. - _mutation_makers.clear(); + // Check if a session's record is ready before handling events' records. + // + // New event's records and a session's record may become ready while a + // mutation with the current events' records is being written. We don't want + // to allow the situation when a session's record is written before the last + // event record from the same session. + bool session_record_is_ready = records->session_rec.ready(); + + // From this point on - all new data will have to be handled in the next write event + records->data_consumed(); + + // We want to serialize the creation of events mutations in order to ensure + // that mutations the events that were created first are going to be created + // first too. 
+ auto backend_state_ptr = static_cast(records->backend_state_ptr.get()); + semaphore& write_sem = backend_state_ptr->write_sem; + return with_semaphore(write_sem, 1, [this, records, session_record_is_ready, &events_records] { + return apply_events_mutation(records, events_records).then([this, session_record_is_ready, records] { + if (session_record_is_ready) { + logger.trace("{}: storing a session event", records->session_id); + return service::get_local_storage_proxy().mutate({make_session_mutation(*records)}, db::consistency_level::ANY, nullptr); + } else { + return make_ready_future<>(); + } + }); + }).finally([records] {}); + }); +} + +std::unique_ptr trace_keyspace_helper::allocate_session_state() const { + return std::make_unique(); } using registry = class_registrator; diff --git a/tracing/trace_keyspace_helper.hh b/tracing/trace_keyspace_helper.hh index 02b5632e7c..ad975f0088 100644 --- a/tracing/trace_keyspace_helper.hh +++ b/tracing/trace_keyspace_helper.hh @@ -47,28 +47,15 @@ namespace tracing { -class trace_keyspace_helper : public i_tracing_backend_helper { +class trace_keyspace_helper final : public i_tracing_backend_helper { public: static const sstring KEYSPACE_NAME; static const sstring SESSIONS; static const sstring EVENTS; private: - using mutation_maker = std::function; - static constexpr int bad_column_family_message_period = 10000; - struct events_mutation_makers { - std::vector makers; - public: - events_mutation_makers() { - makers.reserve(tracing::max_trace_events_per_session); - } - }; - - // a hash table of session ID to one session mutation and a vector of events mutations - std::unordered_map> _mutation_makers; - seastar::gate _pending_writes; sstring _sessions_create_cql; @@ -94,8 +81,6 @@ private: uint64_t bad_column_family_errors = 0; } _stats; - int64_t _last_event_nanos = 0; - scollectd::registrations _registrations; public: @@ -110,59 +95,46 @@ public: virtual future<> start() override; virtual future<> stop() override { - 
kick(); return _pending_writes.close(); }; - virtual void write_session_record(const utils::UUID& session_id, - gms::inet_address client, - std::unordered_map parameters, - sstring request, - long started_at, - trace_type command, - int elapsed, - gc_clock::duration ttl) override; - - virtual void write_event_record(const utils::UUID& session_id, - sstring message, - int elapsed, - gc_clock::duration ttl, - wall_clock::time_point event_time_point) override; + virtual void write_records_bulk(records_bulk& bulk) override; + virtual std::unique_ptr allocate_session_state() const override; private: /** - * Makes a monotonically increasing value in 100ns based on the given time stamp. + * Write records of a single tracing session + * + * @param records records to write + */ + void write_one_session_records(lw_shared_ptr records); + + /** + * Makes a monotonically increasing value in 100ns ("nanos") based on the given time + * stamp and the "nanos" value of the previous event. * * If the amount of 100s of ns evaluated from the @param tp is equal to the - * value returned in the previouse call to get_next_nanos() (without a reset_monotonic_tp() - * call in between), increment it by one. + * given @param last_event_nanos increment @param last_event_nanos by one + * and return a time point based its new value. * + * @param last_event_nanos a reference to the last nanos to align the given time point to. * @param tp the amount of time passed since the Epoch that will be used for the calculation. * * @return the monotonically increasing vlaue in 100s of ns based on the - * given time stamp. + * given time stamp and on the "nanos" value of the previous event. 
*/ - wall_clock::time_point make_monotonic_UUID_tp(wall_clock::time_point tp) { + wall_clock::time_point make_monotonic_UUID_tp(int64_t& last_event_nanos, wall_clock::time_point tp) { using namespace std::chrono; auto tp_nanos = duration_cast(tp.time_since_epoch()).count() / 100; - if (tp_nanos > _last_event_nanos) { - _last_event_nanos = tp_nanos; + if (tp_nanos > last_event_nanos) { + last_event_nanos = tp_nanos; return tp; } else { - return wall_clock::time_point(nanoseconds((++_last_event_nanos) * 100)); + return wall_clock::time_point(nanoseconds((++last_event_nanos) * 100)); } } - /** - * Reset the make_monotonic_tp() state machine. - */ - void reset_monotonic_tp() { - _last_event_nanos = 0; - } - - virtual void kick() override; - /** * Tries to create a table with a given name and using the provided CQL * command. @@ -179,14 +151,17 @@ private: * Flush mutations of one particular tracing session. First "events" * mutations and then, when they are complete, a "sessions" mutation. * - * @param session_id ID of a tracing session - * @param mutation_makers a pair of a "sessions" mutation maker and an array - * of "events" mutations makers. + * @note This function guaranties that it'll handle exactly the same number + * of records @param records had when the function was invoked. + * + * @param records records describing the session's records * * @return A future that resolves when applying of above mutations is * complete. */ - future<> flush_one_session_mutations(utils::UUID session_id, std::pair& mutation_makers); + future<> flush_one_session_mutations(lw_shared_ptr records); + + future<> apply_events_mutation(lw_shared_ptr records, std::deque& events_records); /** * Get a schema_ptr by a table (UU)ID. 
If not found will try to get it by @@ -230,33 +205,26 @@ private: */ bool cache_events_table_handles(const schema_ptr& s); - mutation make_session_mutation(const utils::UUID& session_id, - gms::inet_address client, - const std::unordered_map& parameters, - const sstring& request, - long started_at, - const sstring& command, - int elapsed, - gc_clock::duration ttl); + /** + * Create a mutation for a new session record + * + * @param all_records_handle handle to access an object with all records of this session + * + * @return the relevant mutation + */ + mutation make_session_mutation(const one_session_records& all_records_handle); /** * Create a mutation for a new trace point record * - * @param session_id tracing session ID - * @param message trace record message - * @param elapsed time elapsed since begin() till this trace point - * @param thread_name - * @param ttl - * @param event_time_stamp wall clock time point of this trace event + * @param session_records handle to access an object with all records of this session. + * It's needed here in order to update the last event's mutation + * timestamp value stored inside it. 
+ * @param record data describing this trace event * * @return the relevant mutation */ - mutation make_event_mutation(const utils::UUID& session_id, - const sstring& message, - int elapsed, - const sstring& thread_name, - gc_clock::duration ttl, - wall_clock::time_point event_time_stamp); + mutation make_event_mutation(one_session_records& session_records, const event_record& record); }; struct bad_column_family : public std::exception { diff --git a/tracing/trace_state.cc b/tracing/trace_state.cc index 55cc30cd97..cc06ad63f5 100644 --- a/tracing/trace_state.cc +++ b/tracing/trace_state.cc @@ -47,14 +47,14 @@ namespace tracing { -static logging::logger logger("trace_state"); +logging::logger trace_state_logger("trace_state"); -std::unordered_map trace_state::get_params() { +void trace_state::build_parameters_map() { if (!_params_ptr) { - return {}; + return; } - std::unordered_map params_map; + auto& params_map = _records->session_rec.parameters; params_values& vals = *_params_ptr; if (vals.batchlog_endpoints) { @@ -80,8 +80,6 @@ std::unordered_map trace_state::get_params() { if (vals.user_timestamp) { params_map.emplace("user_timestamp", seastar::format("{:d}", *vals.user_timestamp)); } - - return params_map; } trace_state::~trace_state() { @@ -92,32 +90,29 @@ trace_state::~trace_state() { // event and we don't want to cripple the primary session by // "stealing" one trace() event from it. // - // We do want to report it in statistics however. If for instance - // there are a lot of tracing sessions that only open itself and - // then do nothing - they will create a lot of session_record events - // and we do want to know about it. - ++_pending_trace_events; - _local_backend.write_session_record(_session_id, _client, get_params(), std::move(_request), _started_at, _type, elapsed(), _ttl); - } + // We do want to account them however. 
If for instance there are a + // lot of tracing sessions that only open itself and then do nothing + // - they will create a lot of session_record events and we do want + // to handle this case properly. + _records->consume_from_budget(); - _local_tracing_ptr->end_session(); - - if (_write_on_close) { - _local_tracing_ptr->write_pending_records(); - } - - // update some stats and get out... - auto& tracing_stats = _local_tracing_ptr->stats; - - tracing_stats.trace_events_count += _pending_trace_events; - - if (_pending_trace_events >= tracing::max_trace_events_per_session) { - logger.trace("{}: Maximum number of traces is reached. Some traces are going to be dropped", _session_id); - - if (++tracing_stats.max_traces_threshold_hits % tracing::max_threshold_hits_warning_period == 1) { - logger.warn("Maximum traces per session limit is hit {} times", tracing_stats.max_traces_threshold_hits); + // build_parameters_map() may throw. We don't want to record the + // session's record in this case since its data may be incomplete. + // These events should be really rare however, therefore we don't + // want to optimize this flow (e.g. rollback the corresponding + // events' records that have already been sent to I/O). + try { + build_parameters_map(); + _records->session_rec.elapsed = elapsed(); + } catch (...) 
{ + // Bump up an error counter, drop any pending events records and + // continue + ++_local_tracing_ptr->stats.trace_errors; + _records->events_recs.clear(); } } + + _local_tracing_ptr->end_session(std::move(_records), _write_on_close); } } } diff --git a/tracing/trace_state.hh b/tracing/trace_state.hh index 6d30fa8ceb..c04bc88ea6 100644 --- a/tracing/trace_state.hh +++ b/tracing/trace_state.hh @@ -50,17 +50,17 @@ namespace tracing { +extern logging::logger trace_state_logger; + class trace_state final { using clock_type = std::chrono::steady_clock; private: - utils::UUID _session_id; - trace_type _type; + lw_shared_ptr _records; bool _write_on_close; // Used for calculation of time passed since the beginning of a tracing // session till each tracing event. clock_type::time_point _start; - gc_clock::duration _ttl; // TRUE for a primary trace_state object bool _primary; bool _tracing_began = false; @@ -69,7 +69,6 @@ private: sstring _request; int _pending_trace_events = 0; shared_ptr _local_tracing_ptr; - i_tracing_backend_helper& _local_backend; struct params_values { std::experimental::optional> batchlog_endpoints; @@ -107,23 +106,24 @@ private: public: trace_state(trace_type type, bool write_on_close, const std::experimental::optional& session_id = std::experimental::nullopt) - : _session_id(session_id ? *session_id : utils::UUID_gen::get_time_UUID()) - , _type(type) - , _write_on_close(write_on_close) - , _ttl(ttl_by_type(_type)) + : _write_on_close(write_on_close) , _primary(!session_id) , _local_tracing_ptr(tracing::get_local_tracing_instance().shared_from_this()) - , _local_backend(_local_tracing_ptr->backend_helper()) - { } + { + _records = make_lw_shared(); + _records->session_id = session_id ? 
*session_id : utils::UUID_gen::get_time_UUID(); + _records->ttl = ttl_by_type(type); + _records->session_rec.command = type; + } ~trace_state(); const utils::UUID& get_session_id() const { - return _session_id; + return _records->session_id; } trace_type get_type() const { - return _type; + return _records->session_rec.command; } bool get_write_on_close() const { @@ -163,9 +163,9 @@ private: */ void begin(sstring request, gms::inet_address client) { begin(); - _started_at = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - _request = std::move(request); - _client = std::move(client); + _records->session_rec.client = client; + _records->session_rec.request = std::move(request); + _records->session_rec.started_at = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); } template @@ -249,7 +249,12 @@ private: _params_ptr->user_timestamp.emplace(val); } - std::unordered_map get_params(); + /** + * Fill the map in a session's record with the values set so far. + * + * @param params_map the map to fill + */ + void build_parameters_map(); /** * Add a single trace entry - a special case for a simple string. @@ -298,12 +303,39 @@ inline void trace_state::trace(sstring message) { throw std::logic_error("trying to use a trace() before begin() for \"" + message + "\" tracepoint"); } - if (_pending_trace_events >= tracing::max_trace_events_per_session) { + // We don't want the total amount of pending, active and flushing records to + // bypass two times the maximum number of pending records. + // + // If either records are being created too fast or a backend doesn't + // keep up we want to start dropping records. + // In any case, this should be rare, therefore we don't try to optimize this + // flow. + if (!_local_tracing_ptr->have_records_budget()) { + tracing_logger.trace("{}: Maximum number of traces is reached. 
Some traces are going to be dropped", get_session_id()); + if ((++_local_tracing_ptr->stats.dropped_records) % tracing::log_warning_period == 1) { + tracing_logger.warn("Maximum records limit is hit {} times", _local_tracing_ptr->stats.dropped_records); + } + return; } - _local_backend.write_event_record(_session_id, std::move(message), elapsed(), _ttl, i_tracing_backend_helper::wall_clock::now()); - ++_pending_trace_events; + try { + _records->events_recs.emplace_back(std::move(message), elapsed(), i_tracing_backend_helper::wall_clock::now()); + _records->consume_from_budget(); + + // If we have aggregated enough records - schedule them for write already. + // + // We prefer the traces to be written after the session is over. However + // if there is a session that creates a lot of traces - we want to write + // them before we start to drop new records. + if (_records->events_recs.size() >= tracing::exp_trace_events_per_session) { + _local_tracing_ptr->schedule_for_write(_records); + _local_tracing_ptr->write_maybe(); + } + } catch (...) 
{ + // Bump up an error counter and ignore + ++_local_tracing_ptr->stats.trace_errors; + } } template diff --git a/tracing/tracing.cc b/tracing/tracing.cc index 54eeb56bba..53e0553562 100644 --- a/tracing/tracing.cc +++ b/tracing/tracing.cc @@ -44,7 +44,7 @@ namespace tracing { -static logging::logger logger("tracing"); +logging::logger tracing_logger("tracing"); const gc_clock::duration tracing::tracing::write_period = std::chrono::seconds(2); @@ -60,16 +60,16 @@ tracing::tracing(const sstring& tracing_backend_helper_class_name) , _registrations{ scollectd::add_polled_metric(scollectd::type_instance_id("tracing" , scollectd::per_cpu_plugin_instance - , "total_operations", "max_sessions_threshold_hits") - , scollectd::make_typed(scollectd::data_type::DERIVE, stats.max_sessions_threshold_hits)), + , "total_operations", "dropped_sessions") + , scollectd::make_typed(scollectd::data_type::DERIVE, stats.dropped_sessions)), scollectd::add_polled_metric(scollectd::type_instance_id("tracing" , scollectd::per_cpu_plugin_instance - , "total_operations", "max_traces_threshold_hits") - , scollectd::make_typed(scollectd::data_type::DERIVE, stats.max_traces_threshold_hits)), + , "total_operations", "dropped_records") + , scollectd::make_typed(scollectd::data_type::DERIVE, stats.dropped_records)), scollectd::add_polled_metric(scollectd::type_instance_id("tracing" , scollectd::per_cpu_plugin_instance - , "total_operations", "trace_events_count") - , scollectd::make_typed(scollectd::data_type::DERIVE, stats.trace_events_count)), + , "total_operations", "trace_records_count") + , scollectd::make_typed(scollectd::data_type::DERIVE, stats.trace_records_count)), scollectd::add_polled_metric(scollectd::type_instance_id("tracing" , scollectd::per_cpu_plugin_instance , "total_operations", "trace_errors") @@ -80,17 +80,21 @@ tracing::tracing(const sstring& tracing_backend_helper_class_name) , scollectd::make_typed(scollectd::data_type::GAUGE, _active_sessions)), 
scollectd::add_polled_metric(scollectd::type_instance_id("tracing" , scollectd::per_cpu_plugin_instance - , "queue_length", "pending_for_write_sessions") - , scollectd::make_typed(scollectd::data_type::GAUGE, _pending_for_write_sessions)), + , "queue_length", "cached_records") + , scollectd::make_typed(scollectd::data_type::GAUGE, _cached_records)), scollectd::add_polled_metric(scollectd::type_instance_id("tracing" , scollectd::per_cpu_plugin_instance - , "queue_length", "flushing_sessions") - , scollectd::make_typed(scollectd::data_type::GAUGE, _flushing_sessions))} + , "queue_length", "pending_for_write_records") + , scollectd::make_typed(scollectd::data_type::GAUGE, _pending_for_write_records_count)), + scollectd::add_polled_metric(scollectd::type_instance_id("tracing" + , scollectd::per_cpu_plugin_instance + , "queue_length", "flushing_records") + , scollectd::make_typed(scollectd::data_type::GAUGE, _flushing_records))} , _gen(std::random_device()()) { try { _tracing_backend_helper_ptr = create_object(tracing_backend_helper_class_name, *this); } catch (no_such_class& e) { - logger.error("Can't create tracing backend helper {}: not supported", tracing_backend_helper_class_name); + tracing_logger.error("Can't create tracing backend helper {}: not supported", tracing_backend_helper_class_name); throw; } catch (...) { throw; @@ -108,16 +112,17 @@ future<> tracing::create_tracing(const sstring& tracing_backend_class_name) { trace_state_ptr tracing::create_session(trace_type type, bool write_on_close, const std::experimental::optional& session_id) { trace_state_ptr tstate; try { - if (_active_sessions + _pending_for_write_sessions + _flushing_sessions > 2 * max_pending_for_write_sessions) { + // Don't create a session if its records are likely to be dropped + if (!have_records_budget(exp_trace_events_per_session) || _active_sessions >= max_pending_sessions + write_event_sessions_threshold) { if (session_id) { - logger.trace("{}: Maximum sessions count is reached. 
Dropping a secondary session", session_id); + tracing_logger.trace("{}: Too many outstanding tracing records or sessions. Dropping a secondary session", session_id); } else { - logger.trace("Maximum sessions count is reached. Dropping a primary session"); + tracing_logger.trace("Too many outstanding tracing records or sessions. Dropping a primary session"); } - if (++stats.max_sessions_threshold_hits % tracing::max_threshold_hits_warning_period == 1) { - logger.warn("Maximum sessions limit is hit {} times: open_sessions {}, pending_for_flush_sessions {}, flushing_sessions {}", - stats.max_sessions_threshold_hits, _active_sessions, _pending_for_write_sessions, _flushing_sessions); + if (++stats.dropped_sessions % tracing::log_warning_period == 1) { + tracing_logger.warn("Dropped {} sessions: open_sessions {}, cached_records {} pending_for_write_records {}, flushing_records {}", + stats.dropped_sessions, _active_sessions, _cached_records, _pending_for_write_records_count, _flushing_records); } return trace_state_ptr(); @@ -142,18 +147,22 @@ void tracing::write_timer_callback() { return; } - logger.trace("Timer kicks in: {}", _pending_for_write_sessions ? "writing" : "not writing"); + tracing_logger.trace("Timer kicks in: {}", _pending_for_write_records_bulk.size() ? 
"writing" : "not writing"); write_pending_records(); _write_timer.arm(write_period); } future<> tracing::shutdown() { - logger.info("Asked to shut down"); + tracing_logger.info("Asked to shut down"); + if (_down) { + throw std::logic_error("tracing: shutdown() called for the service that is already down"); + } + write_pending_records(); _down = true; _write_timer.cancel(); return _tracing_backend_helper_ptr->stop().then([] { - logger.info("Tracing is down"); + tracing_logger.info("Tracing is down"); }); } @@ -173,7 +182,11 @@ void tracing::set_trace_probability(double p) { _trace_probability = p; _normalized_trace_probability = std::llround(_trace_probability * (_gen.max() + 1)); - logger.info("Setting tracing probability to {} (normalized {})", _trace_probability, _normalized_trace_probability); -} + tracing_logger.info("Setting tracing probability to {} (normalized {})", _trace_probability, _normalized_trace_probability); +} + +one_session_records::one_session_records() + : backend_state_ptr(tracing::get_local_tracing_instance().allocate_backend_session_state()) + , budget_ptr(tracing::get_local_tracing_instance().get_cached_records_ptr()) {} } diff --git a/tracing/tracing.hh b/tracing/tracing.hh index 8b720e0dcc..b032c1372a 100644 --- a/tracing/tracing.hh +++ b/tracing/tracing.hh @@ -51,6 +51,8 @@ namespace tracing { +extern logging::logger tracing_logger; + class trace_state; class tracing; @@ -99,6 +101,13 @@ public: { } }; +struct one_session_records; +using records_bulk = std::deque>; + +struct backend_session_state_base { + virtual ~backend_session_state_base() {}; +}; + struct i_tracing_backend_helper { using wall_clock = std::chrono::system_clock; @@ -112,74 +121,170 @@ public: virtual future<> stop() = 0; /** - * Write a new tracing session record + * Write a bulk of tracing records. 
* - * @param session_id tracing session ID - * @param client client IP - * @param parameters optional parameters - * @param request request we are tracing - * @param started_at amount of milliseconds passed since Epoch before this - * session is started (on a Coordinator Node) - * @param command a type of this trace - * @param elapsed number of microseconds this tracing session took - * @param ttl TTL of a session record + * This function has to clear a scheduled state of each one_session_records object + * in the @param bulk after it has been actually passed to the backend for writing. + * + * @param bulk a bulk of records */ - virtual void write_session_record(const utils::UUID& session_id, - gms::inet_address client, - std::unordered_map parameters, - sstring request, - long started_at, - trace_type command, - int elapsed, - gc_clock::duration ttl) = 0; + virtual void write_records_bulk(records_bulk& bulk) = 0; - /** - * Write a new tracing event record - * @param session_id tracing session ID - * @param message tracing message - * @param elapsed number of microseconds passed since a beginning of a - * corresponding tracing session till this event - * @param ttl TTL of the event record - * @param event_time_point time point when a record was taken - */ - virtual void write_event_record(const utils::UUID& session_id, - sstring message, - int elapsed, - gc_clock::duration ttl, - wall_clock::time_point event_time_point) = 0; + virtual std::unique_ptr allocate_session_state() const = 0; private: - /** - * Commit all pending tracing records to the underlying storage. - * The implementation has to call tracing::tracing::write_complete(nr) for - * each "nr" completed records once they are written to the backend. 
- */ - virtual void kick() = 0; - friend class tracing; }; +struct event_record { + sstring message; + int elapsed; + i_tracing_backend_helper::wall_clock::time_point event_time_point; + + event_record(sstring message_, int elapsed_, i_tracing_backend_helper::wall_clock::time_point event_time_point_) + : message(std::move(message_)) + , elapsed(elapsed_) + , event_time_point(event_time_point_) {} +}; + +struct session_record { + gms::inet_address client; + std::unordered_map parameters; + sstring request; + long started_at = 0; + trace_type command = trace_type::NONE; + int elapsed = -1; + + bool ready() const { + return elapsed >= 0; + } +}; + +class one_session_records { +public: + utils::UUID session_id; + session_record session_rec; + gc_clock::duration ttl; + std::deque events_recs; + std::unique_ptr backend_state_ptr; + + // A pointer to the records counter of the corresponding state new records + // of this tracing session should consume from (e.g. "cached" or "pending + // for write"). + uint64_t* budget_ptr; + + one_session_records(); + + /** + * Consume a single record from the per-shard budget. + */ + void consume_from_budget() { + ++(*budget_ptr); + } + + /** + * Should be called when a record is scheduled for write. + * From that point till data_consumed() call all new records will be written + * in the next write event. + */ + inline void set_pending_for_write(); + + /** + * Should be called after all data pending to be written in this record has + * been processed. + * From that point on new records are cached internally and have to be + * explicitly committed for write in order to be written during the write event. 
+ */ + inline void data_consumed(); + + bool is_pending_for_write() const { + return _is_pending_for_write; + } + + uint64_t size() const { + return events_recs.size() + session_rec.ready(); + } + +private: + bool _is_pending_for_write = false; +}; + using trace_state_ptr = lw_shared_ptr; class tracing : public seastar::async_sharded_service { public: static const gc_clock::duration write_period; - static constexpr int max_pending_for_write_sessions = 1000; - static constexpr int max_trace_events_per_session = 30; - // Number of max threshold XXX hits when an info message is printed - static constexpr int max_threshold_hits_warning_period = 10000; + // maximum number of sessions pending for write per shard + static constexpr int max_pending_sessions = 1000; + // expectation of an average number of trace records per session + static constexpr int exp_trace_events_per_session = 10; + // maximum allowed pending records per-shard + static constexpr int max_pending_trace_records = max_pending_sessions * exp_trace_events_per_session; + // number of pending sessions that would trigger a write event + static constexpr int write_event_sessions_threshold = 100; + // number of pending records that would trigger a write event + static constexpr int write_event_records_threshold = write_event_sessions_threshold * exp_trace_events_per_session; + // Number of events when an info message is printed + static constexpr int log_warning_period = 10000; struct stats { - uint64_t max_sessions_threshold_hits = 0; - uint64_t max_traces_threshold_hits = 0; - uint64_t trace_events_count = 0; + uint64_t dropped_sessions = 0; + uint64_t dropped_records = 0; + uint64_t trace_records_count = 0; uint64_t trace_errors = 0; } stats; private: + // A number of currently active tracing sessions uint64_t _active_sessions = 0; - uint64_t _pending_for_write_sessions = 0; - uint64_t _flushing_sessions = 0; + + // Below are 3 counters that describe the total amount of tracing records on + // this shard. 
Each counter describes a state in which a record may be. + // + // Each record may only be in a specific state at every point of time and + // thereby it must be accounted only in one and only one of the three + // counters below at any given time. + // + // The sum of all three counters should not be greater than + // (max_pending_trace_records + write_event_records_threshold) at any time + // (actually it can get as high as the value above plus (max_pending_sessions) + // if all sessions are primary but we won't take this into account for + // simplicity). + // + // The same applies to the number of outstanding sessions: it may not be + // greater than (max_pending_sessions + write_event_sessions_threshold) at + // any time. + // + // If the total number of tracing records is greater than or equal to the limit + // above, the new trace point is going to be dropped. + // + // If the current number of records plus the expected number of trace records + // per session (exp_trace_events_per_session) is greater than the limit + // above new sessions will be dropped. A new session will also be dropped if + // there are too many active sessions. + // + // When the record or a session is dropped the appropriate statistics + // counters are updated and there is a rate-limited warning message printed + // to the log. + // + // Every time a number of records pending for write is greater than or equal to + // (write_event_records_threshold) or a number of sessions pending for + // write is greater than or equal to (write_event_sessions_threshold) a write + // event is issued. + // + // Every 2 seconds a timer would write all pending for write records + // available so far. + + // Total number of records cached in the active sessions that are not going + // to be written in the next write event + uint64_t _cached_records = 0; + // Total number of records that are currently being written to I/O + uint64_t _flushing_records = 0; + // Total number of records in the _pending_for_write_records_bulk. 
All of + // them are going to be written to the I/O during the next write event. + uint64_t _pending_for_write_records_count = 0; + + records_bulk _pending_for_write_records_bulk; timer _write_timer; bool _down = false; std::unique_ptr _tracing_backend_helper_ptr; @@ -228,21 +333,20 @@ public: future<> shutdown(); void write_pending_records() { - // if service is down - do nothing - if (_down) { - return; + if (_pending_for_write_records_bulk.size()) { + _flushing_records += _pending_for_write_records_count; + stats.trace_records_count += _pending_for_write_records_count; + _pending_for_write_records_count = 0; + _tracing_backend_helper_ptr->write_records_bulk(_pending_for_write_records_bulk); + _pending_for_write_records_bulk.clear(); } - - _flushing_sessions += _pending_for_write_sessions; - _pending_for_write_sessions = 0; - _tracing_backend_helper_ptr->kick(); } void write_complete(uint64_t nr = 1) { - if (nr > _flushing_sessions) { - throw std::logic_error("completing more sessions than there are pending"); + if (nr > _flushing_records) { + throw std::logic_error(seastar::format("completing more records ({:d}) than there are pending ({:d})", nr, _flushing_records)); } - _flushing_sessions -= nr; + _flushing_records -= nr; } /** @@ -256,14 +360,35 @@ public: */ trace_state_ptr create_session(trace_type type, bool write_on_close, const std::experimental::optional& session_id = std::experimental::nullopt); - void end_session() { - --_active_sessions; - ++_pending_for_write_sessions; - if (_pending_for_write_sessions >= max_pending_for_write_sessions) { + void write_maybe() { + if (_pending_for_write_records_count >= write_event_records_threshold || _pending_for_write_records_bulk.size() >= write_event_sessions_threshold) { write_pending_records(); } } + void end_session(lw_shared_ptr records, bool write_now) { + --_active_sessions; + + // if service is down - drop the records and return + if (_down) { + return; + } + + try { + 
schedule_for_write(std::move(records)); + } catch (...) { + // OOM: bump up the error counter and ignore + ++stats.trace_errors; + return; + } + + if (write_now) { + write_pending_records(); + } else { + write_maybe(); + } + } + /** * Sets a probability for tracing a CQL request. * @@ -281,7 +406,64 @@ public: return _normalized_trace_probability != 0 && _gen() < _normalized_trace_probability; } + std::unique_ptr allocate_backend_session_state() const { + return _tracing_backend_helper_ptr->allocate_session_state(); + } + + /** + * Checks if there is enough budget for the @param nr new records + * @param nr number of new records + * + * @return TRUE if there is enough budget, FALSE otherwise + */ + bool have_records_budget(uint64_t nr = 1) { + // We don't want the total amount of pending, active and flushing records to + // exceed the maximum number of pending records plus the number of + // records that are possibly being written right now. + // + // If either records are being created too fast or a backend doesn't + // keep up we want to start dropping records. + // In any case, this should be rare. 
+ if (_pending_for_write_records_count + _cached_records + _flushing_records + nr > max_pending_trace_records + write_event_records_threshold) { + return false; + } + + return true; + } + + uint64_t* get_pending_records_ptr() { + return &_pending_for_write_records_count; + } + + uint64_t* get_cached_records_ptr() { + return &_cached_records; + } + + void schedule_for_write(lw_shared_ptr records) { + if (records->is_pending_for_write()) { + return; + } + + _pending_for_write_records_bulk.emplace_back(records); + records->set_pending_for_write(); + + // move the current records from a "cached" to "pending for write" state + auto current_records_num = records->size(); + _cached_records -= current_records_num; + _pending_for_write_records_count += current_records_num; + } + private: void write_timer_callback(); }; + +void one_session_records::set_pending_for_write() { + _is_pending_for_write = true; + budget_ptr = tracing::get_local_tracing_instance().get_pending_records_ptr(); +} + +void one_session_records::data_consumed() { + _is_pending_for_write = false; + budget_ptr = tracing::get_local_tracing_instance().get_cached_records_ptr(); +} }