diff --git a/main.cc b/main.cc index 29c637d726..08cff921fd 100644 --- a/main.cc +++ b/main.cc @@ -1065,6 +1065,16 @@ int main(int ac, char** av) { sst_format_selector.sync(); ss.join_cluster().get(); + supervisor::notify("starting tracing"); + tracing::tracing::start_tracing(qp).get(); + /* + * FIXME -- tracing is stopped inside drain_on_shutdown, which + * is deferred later on. If the start aborts before it, the + * tracing will remain started and will continue referencing + * the query processor. Nowadays the latter is not stopped + * either, but when it will, this place shold be fixed too. + */ + startlog.info("SSTable data integrity checker is {}.", cfg->enable_sstable_data_integrity_check() ? "enabled" : "disabled"); diff --git a/service/storage_service.cc b/service/storage_service.cc index fcb7777e8f..950c475783 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -600,9 +600,6 @@ void storage_service::join_token_ring(int delay) { // Retrieve the latest CDC generation seen in gossip (if any). scan_cdc_generations(); - - supervisor::notify("starting tracing"); - tracing::tracing::start_tracing().get(); } void storage_service::mark_existing_views_as_built() { diff --git a/table_helper.cc b/table_helper.cc index 483b0660b2..e8cd190b0b 100644 --- a/table_helper.cc +++ b/table_helper.cc @@ -26,8 +26,7 @@ #include "cql3/statements/modification_statement.hh" #include "database.hh" -future<> table_helper::setup_table() const { - auto& qp = cql3::get_local_query_processor(); +future<> table_helper::setup_table(cql3::query_processor& qp) const { auto& db = qp.db(); if (db.has_schema(_keyspace, _name)) { @@ -58,7 +57,7 @@ future<> table_helper::setup_table() const { return service::get_local_migration_manager().announce_new_column_family(b.build(), false).discard_result().handle_exception([this] (auto ep) {});; } -future<> table_helper::cache_table_info(service::query_state& qs) { +future<> table_helper::cache_table_info(cql3::query_processor& qp, service::query_state& qs) { if (!_prepared_stmt) { // if prepared statement has been invalidated - drop cached pointers _insert_stmt = nullptr; @@ -67,13 +66,13 @@ future<> table_helper::cache_table_info(service::query_state& qs) { return now(); } - return cql3::get_local_query_processor().prepare(_insert_cql, qs.get_client_state(), false) + return qp.prepare(_insert_cql, qs.get_client_state(), false) .then([this] (shared_ptr msg_ptr) noexcept { _prepared_stmt = std::move(msg_ptr->get_prepared()); shared_ptr cql_stmt = _prepared_stmt->statement; _insert_stmt = dynamic_pointer_cast(cql_stmt); _is_fallback_stmt = false; - }).handle_exception_type([this, &qs] (exceptions::invalid_request_exception& eptr) { + }).handle_exception_type([this, &qs, &qp] (exceptions::invalid_request_exception& eptr) { // the non-fallback statement can't be prepared if (!_insert_cql_fallback) { return make_exception_future(eptr); @@ -82,17 +81,17 @@ future<> table_helper::cache_table_info(service::query_state& qs) { // we have already prepared the fallback statement return now(); } - return cql3::get_local_query_processor().prepare(_insert_cql_fallback.value(), qs.get_client_state(), false) + return qp.prepare(_insert_cql_fallback.value(), qs.get_client_state(), false) .then([this] (shared_ptr msg_ptr) noexcept { _prepared_stmt = std::move(msg_ptr->get_prepared()); shared_ptr cql_stmt = _prepared_stmt->statement; _insert_stmt = dynamic_pointer_cast(cql_stmt); _is_fallback_stmt = true; }); - }).handle_exception([this] (auto eptr) { + }).handle_exception([this, &qp] (auto eptr) { // One of the possible causes for an error here could be the table that doesn't exist. //FIXME: discarded future. - (void)this->setup_table().discard_result(); + (void)this->setup_table(qp).discard_result(); // We throw the bad_column_family exception because the caller // expects and accounts this type of errors. @@ -106,8 +105,8 @@ future<> table_helper::cache_table_info(service::query_state& qs) { }); } -future<> table_helper::insert(service::query_state& qs, noncopyable_function opt_maker) { - return cache_table_info(qs).then([this, &qs, opt_maker = std::move(opt_maker)] () mutable { +future<> table_helper::insert(cql3::query_processor& qp, service::query_state& qs, noncopyable_function opt_maker) { + return cache_table_info(qp, qs).then([this, &qs, opt_maker = std::move(opt_maker)] () mutable { return do_with(opt_maker(), [this, &qs] (auto& opts) { opts.prepare(_prepared_stmt->bound_names); return _insert_stmt->execute(service::get_storage_proxy().local(), qs, opts); @@ -115,7 +114,7 @@ future<> table_helper::insert(service::query_state& qs, noncopyable_function table_helper::setup_keyspace(const sstring& keyspace_name, sstring replication_factor, service::query_state& qs, std::vector tables) { +future<> table_helper::setup_keyspace(cql3::query_processor& qp, const sstring& keyspace_name, sstring replication_factor, service::query_state& qs, std::vector tables) { if (this_shard_id() == 0) { size_t n = tables.size(); for (size_t i = 0; i < n; ++i) { @@ -123,8 +122,8 @@ future<> table_helper::setup_keyspace(const sstring& keyspace_name, sstring repl throw std::invalid_argument("setup_keyspace called with table_helper for different keyspace"); } } - return seastar::async([&keyspace_name, replication_factor, &qs, tables] { - auto& db = cql3::get_local_query_processor().db(); + return seastar::async([&qp, &keyspace_name, replication_factor, &qs, tables] { + database& db = qp.db(); // Create a keyspace if (!db.has_keyspace(keyspace_name)) { @@ -135,13 +134,13 @@ future<> table_helper::setup_keyspace(const sstring& keyspace_name, sstring repl service::get_local_migration_manager().announce_new_keyspace(ksm, api::min_timestamp, false).get(); } - qs.get_client_state().set_keyspace(cql3::get_local_query_processor().db(), keyspace_name); + qs.get_client_state().set_keyspace(db, keyspace_name); // Create tables size_t n = tables.size(); for (size_t i = 0; i < n; ++i) { - tables[i]->setup_table().get(); + tables[i]->setup_table(qp).get(); } }); } else { diff --git a/table_helper.hh b/table_helper.hh index 4ebf2a55fc..ad043a88e8 100644 --- a/table_helper.hh +++ b/table_helper.hh @@ -26,9 +26,11 @@ #include "service/migration_manager.hh" -namespace cql3::statements { +namespace cql3 { +class query_processor; +namespace statements { class modification_statement; -} +}} /** * \class table_helper @@ -64,13 +66,13 @@ public: * @return A future that resolves when the operation is complete. Any * possible errors are ignored. */ - future<> setup_table() const; + future<> setup_table(cql3::query_processor& qp) const; /** * @return a future that resolves when the given t_helper is ready to be used for * data insertion. */ - future<> cache_table_info(service::query_state&); + future<> cache_table_info(cql3::query_processor& qp, service::query_state&); /** * @return The table name @@ -96,15 +98,15 @@ public: */ template requires seastar::CanInvoke - future<> insert(service::query_state& qs, OptMaker opt_maker, Args... opt_maker_args) { - return insert(qs, noncopyable_function([opt_maker = std::move(opt_maker), args = std::make_tuple(std::move(opt_maker_args)...)] () mutable { + future<> insert(cql3::query_processor& qp, service::query_state& qs, OptMaker opt_maker, Args... opt_maker_args) { + return insert(qp, qs, noncopyable_function([opt_maker = std::move(opt_maker), args = std::make_tuple(std::move(opt_maker_args)...)] () mutable { return apply(opt_maker, std::move(args)); })); } - future<> insert(service::query_state& qs, noncopyable_function opt_maker); + future<> insert(cql3::query_processor& qp, service::query_state& qs, noncopyable_function opt_maker); - static future<> setup_keyspace(const sstring& keyspace_name, sstring replication_factor, service::query_state& qs, std::vector tables); + static future<> setup_keyspace(cql3::query_processor& qp, const sstring& keyspace_name, sstring replication_factor, service::query_state& qs, std::vector tables); /** * Makes a monotonically increasing value in 100ns ("nanos") based on the given time diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index a3124a0eef..ef8499a18d 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -241,8 +241,9 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr) }); } -future<> trace_keyspace_helper::start() { - return table_helper::setup_keyspace(KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx }); +future<> trace_keyspace_helper::start(cql3::query_processor& qp) { + _qp_anchor = &qp; + return table_helper::setup_keyspace(qp, KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx }); } void trace_keyspace_helper::write_one_session_records(lw_shared_ptr records) { @@ -415,17 +416,16 @@ std::vector trace_keyspace_helper::make_event_mutation_data(one return values; } -future<> trace_keyspace_helper::apply_events_mutation(lw_shared_ptr records, std::deque& events_records) { +future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp, lw_shared_ptr records, std::deque& events_records) { if (events_records.empty()) { return now(); } - return _events.cache_table_info(_dummy_query_state).then([this, records, &events_records] { + return _events.cache_table_info(qp, _dummy_query_state).then([this, &qp, records, &events_records] { tlogger.trace("{}: storing {} events records: parent_id {} span_id {}", records->session_id, events_records.size(), records->parent_id, records->my_span_id); std::vector modifications(events_records.size(), cql3::statements::batch_statement::single_statement(_events.insert_stmt(), false)); std::vector> values; - auto& qp = cql3::get_local_query_processor(); values.reserve(events_records.size()); std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(*all_records, one_event_record)); }); @@ -462,15 +462,19 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr(records->backend_state_ptr.get()); semaphore& write_sem = backend_state_ptr->write_sem; return with_semaphore(write_sem, 1, [this, records, session_record_is_ready, &events_records] { - return apply_events_mutation(records, events_records).then([this, session_record_is_ready, records] { + // This code is inside the _pending_writes gate and the qp pointer + // is cleared on ::stop() after the gate is closed. + assert(_qp_anchor != nullptr); + cql3::query_processor& qp = *_qp_anchor; + return apply_events_mutation(qp, records, events_records).then([this, &qp, session_record_is_ready, records] { if (session_record_is_ready) { // if session is finished - store a session and a session time index entries tlogger.trace("{}: going to store a session event", records->session_id); - return _sessions.insert(_dummy_query_state, make_session_mutation_data, std::ref(*records)).then([this, records] { + return _sessions.insert(qp, _dummy_query_state, make_session_mutation_data, std::ref(*records)).then([this, &qp, records] { tlogger.trace("{}: going to store a {} entry", records->session_id, _sessions_time_idx.name()); - return _sessions_time_idx.insert(_dummy_query_state, make_session_time_idx_mutation_data, std::ref(*records)); - }).then([this, records] { + return _sessions_time_idx.insert(qp, _dummy_query_state, make_session_time_idx_mutation_data, std::ref(*records)); + }).then([this, &qp, records] { if (!records->do_log_slow_query) { return now(); } @@ -478,9 +482,9 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptrsession_rec.started_at)); tlogger.trace("{}: going to store a slow query event", records->session_id); - return _slow_query_log.insert(_dummy_query_state, make_slow_query_mutation_data, std::ref(*records), start_time_id).then([this, records, start_time_id] { + return _slow_query_log.insert(qp, _dummy_query_state, make_slow_query_mutation_data, std::ref(*records), start_time_id).then([this, &qp, records, start_time_id] { tlogger.trace("{}: going to store a {} entry", records->session_id, _slow_query_log_time_idx.name()); - return _slow_query_log_time_idx.insert(_dummy_query_state, make_slow_query_time_idx_mutation_data, std::ref(*records), start_time_id); + return _slow_query_log_time_idx.insert(qp, _dummy_query_state, make_slow_query_time_idx_mutation_data, std::ref(*records), start_time_id); }); }); } else { diff --git a/tracing/trace_keyspace_helper.hh b/tracing/trace_keyspace_helper.hh index b8ec8ce27c..29d06c1eec 100644 --- a/tracing/trace_keyspace_helper.hh +++ b/tracing/trace_keyspace_helper.hh @@ -66,6 +66,7 @@ private: int64_t _slow_query_last_nanos = 0; service::query_state _dummy_query_state; + cql3::query_processor* _qp_anchor; table_helper _sessions; table_helper _sessions_time_idx; table_helper _events; @@ -88,10 +89,10 @@ public: // // TODO: Create a stub_tracing_session object to discard the traces // requested during the initialization phase. - virtual future<> start() override; + virtual future<> start(cql3::query_processor& qp) override; virtual future<> stop() override { - return _pending_writes.close(); + return _pending_writes.close().then([this] { _qp_anchor = nullptr; }); }; virtual void write_records_bulk(records_bulk& bulk) override; @@ -131,7 +132,7 @@ private: * @note A caller must ensure that @param events_records is alive till the * returned future resolves. */ - future<> apply_events_mutation(lw_shared_ptr records, std::deque& events_records); + future<> apply_events_mutation(cql3::query_processor& qp, lw_shared_ptr records, std::deque& events_records); /** * Create a mutation data for a new session record diff --git a/tracing/tracing.cc b/tracing/tracing.cc index 024d252394..e4cecfe47d 100644 --- a/tracing/tracing.cc +++ b/tracing/tracing.cc @@ -102,9 +102,9 @@ future<> tracing::create_tracing(const backend_registry& br, sstring tracing_bac return tracing_instance().start(std::ref(br), std::move(tracing_backend_class_name)); } -future<> tracing::start_tracing() { - return tracing_instance().invoke_on_all([] (tracing& local_tracing) { - return local_tracing.start(); +future<> tracing::start_tracing(sharded& qp) { + return tracing_instance().invoke_on_all([&qp] (tracing& local_tracing) { + return local_tracing.start(qp.local()); }); } @@ -146,7 +146,7 @@ trace_state_ptr tracing::create_session(const trace_info& secondary_session_info } } -future<> tracing::start() { +future<> tracing::start(cql3::query_processor& qp) { try { _tracing_backend_helper_ptr = _backend_registry.create_backend(_tracing_backend_helper_class_name, *this); } catch (no_such_tracing_backend& e) { @@ -156,7 +156,7 @@ future<> tracing::start() { throw; } - return _tracing_backend_helper_ptr->start().then([this] { + return _tracing_backend_helper_ptr->start(qp).then([this] { _down = false; _write_timer.arm(write_period); }); diff --git a/tracing/tracing.hh b/tracing/tracing.hh index c62f0c0a15..4fb86213e5 100644 --- a/tracing/tracing.hh +++ b/tracing/tracing.hh @@ -53,6 +53,8 @@ #include "log.hh" #include "seastarx.hh" +namespace cql3 { class query_processor; } + namespace tracing { using elapsed_clock = std::chrono::steady_clock; @@ -176,7 +178,7 @@ protected: public: i_tracing_backend_helper(tracing& tr) : _local_tracing(tr) {} virtual ~i_tracing_backend_helper() {} - virtual future<> start() = 0; + virtual future<> start(cql3::query_processor& qp) = 0; virtual future<> stop() = 0; /** @@ -431,11 +433,11 @@ public: } static future<> create_tracing(const backend_registry& br, sstring tracing_backend_helper_class_name); - static future<> start_tracing(); + static future<> start_tracing(sharded& qp); tracing(const backend_registry& br, sstring tracing_backend_helper_class_name); // Initialize a tracing backend (e.g. tracing_keyspace or logstash) - future<> start(); + future<> start(cql3::query_processor& qp); future<> stop();