From 2f69e90fc90cf0c70eb5a768c932ef57da9eedde Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 18 Sep 2020 15:48:40 +0300 Subject: [PATCH 1/6] table_helper: Use local db variable The .setup_keyspace() method already has the db variable in this continuation lambda. Signed-off-by: Pavel Emelyanov --- table_helper.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/table_helper.cc b/table_helper.cc index 483b0660b2..61c6b0e95b 100644 --- a/table_helper.cc +++ b/table_helper.cc @@ -135,7 +135,7 @@ future<> table_helper::setup_keyspace(const sstring& keyspace_name, sstring repl service::get_local_migration_manager().announce_new_keyspace(ksm, api::min_timestamp, false).get(); } - qs.get_client_state().set_keyspace(cql3::get_local_query_processor().db(), keyspace_name); + qs.get_client_state().set_keyspace(db, keyspace_name); // Create tables From f5d39b963840375a55803b5a096b7a6dddfd68c7 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 18 Sep 2020 15:52:23 +0300 Subject: [PATCH 2/6] table_helper: Use local qp as setup_table argument The goal is to make table_helper API require the query_processor reference and use it where needed. The .setup_table() is private method, and still grabs the query processor reference itself. Since its futures do noth reshard, it's safe to carry the query processor reference through. Signed-off-by: Pavel Emelyanov --- table_helper.cc | 19 ++++++++++--------- table_helper.hh | 8 +++++--- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/table_helper.cc b/table_helper.cc index 61c6b0e95b..1fc14cb512 100644 --- a/table_helper.cc +++ b/table_helper.cc @@ -26,8 +26,7 @@ #include "cql3/statements/modification_statement.hh" #include "database.hh" -future<> table_helper::setup_table() const { - auto& qp = cql3::get_local_query_processor(); +future<> table_helper::setup_table(cql3::query_processor& qp) const { auto& db = qp.db(); if (db.has_schema(_keyspace, _name)) { @@ -67,13 +66,14 @@ future<> table_helper::cache_table_info(service::query_state& qs) { return now(); } - return cql3::get_local_query_processor().prepare(_insert_cql, qs.get_client_state(), false) + cql3::query_processor& qp = cql3::get_local_query_processor(); + return qp.prepare(_insert_cql, qs.get_client_state(), false) .then([this] (shared_ptr msg_ptr) noexcept { _prepared_stmt = std::move(msg_ptr->get_prepared()); shared_ptr cql_stmt = _prepared_stmt->statement; _insert_stmt = dynamic_pointer_cast(cql_stmt); _is_fallback_stmt = false; - }).handle_exception_type([this, &qs] (exceptions::invalid_request_exception& eptr) { + }).handle_exception_type([this, &qs, &qp] (exceptions::invalid_request_exception& eptr) { // the non-fallback statement can't be prepared if (!_insert_cql_fallback) { return make_exception_future(eptr); @@ -82,17 +82,17 @@ future<> table_helper::cache_table_info(service::query_state& qs) { // we have already prepared the fallback statement return now(); } - return cql3::get_local_query_processor().prepare(_insert_cql_fallback.value(), qs.get_client_state(), false) + return qp.prepare(_insert_cql_fallback.value(), qs.get_client_state(), false) .then([this] (shared_ptr msg_ptr) noexcept { _prepared_stmt = std::move(msg_ptr->get_prepared()); shared_ptr cql_stmt = _prepared_stmt->statement; _insert_stmt = dynamic_pointer_cast(cql_stmt); _is_fallback_stmt = true; }); - }).handle_exception([this] (auto eptr) { + }).handle_exception([this, &qp] (auto eptr) { // One of the possible causes for an error here could be the table that doesn't exist. //FIXME: discarded future. - (void)this->setup_table().discard_result(); + (void)this->setup_table(qp).discard_result(); // We throw the bad_column_family exception because the caller // expects and accounts this type of errors. @@ -124,7 +124,8 @@ future<> table_helper::setup_keyspace(const sstring& keyspace_name, sstring repl } } return seastar::async([&keyspace_name, replication_factor, &qs, tables] { - auto& db = cql3::get_local_query_processor().db(); + cql3::query_processor& qp = cql3::get_local_query_processor(); + database& db = qp.db(); // Create a keyspace if (!db.has_keyspace(keyspace_name)) { @@ -141,7 +142,7 @@ future<> table_helper::setup_keyspace(const sstring& keyspace_name, sstring repl // Create tables size_t n = tables.size(); for (size_t i = 0; i < n; ++i) { - tables[i]->setup_table().get(); + tables[i]->setup_table(qp).get(); } }); } else { diff --git a/table_helper.hh b/table_helper.hh index 4ebf2a55fc..1ceed24d5a 100644 --- a/table_helper.hh +++ b/table_helper.hh @@ -26,9 +26,11 @@ #include "service/migration_manager.hh" -namespace cql3::statements { +namespace cql3 { +class query_processor; +namespace statements { class modification_statement; -} +}} /** * \class table_helper @@ -64,7 +66,7 @@ public: * @return A future that resolves when the operation is complete. Any * possible errors are ignored. */ - future<> setup_table() const; + future<> setup_table(cql3::query_processor& qp) const; /** * @return a future that resolves when the given t_helper is ready to be used for From b18522a7abfe9585a61274b533da7c53ba180694 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 18 Sep 2020 16:01:36 +0300 Subject: [PATCH 3/6] table_helper: Require local query processor in calls Keeping the query processor reference on the table_helper in raii manner seems waistful, the only user of it -- the trace_keyspace_helper -- has a bunch of helpers on board, each would then keep its own copy for no gain. At the same time the trace_keyspace_helper already gets the query processor for its needs, so it can share one with table_helper-s. Signed-off-by: Pavel Emelyanov --- table_helper.cc | 12 +++++------- table_helper.hh | 10 +++++----- tracing/trace_keyspace_helper.cc | 21 +++++++++++---------- tracing/trace_keyspace_helper.hh | 2 +- 4 files changed, 22 insertions(+), 23 deletions(-) diff --git a/table_helper.cc b/table_helper.cc index 1fc14cb512..e8cd190b0b 100644 --- a/table_helper.cc +++ b/table_helper.cc @@ -57,7 +57,7 @@ future<> table_helper::setup_table(cql3::query_processor& qp) const { return service::get_local_migration_manager().announce_new_column_family(b.build(), false).discard_result().handle_exception([this] (auto ep) {});; } -future<> table_helper::cache_table_info(service::query_state& qs) { +future<> table_helper::cache_table_info(cql3::query_processor& qp, service::query_state& qs) { if (!_prepared_stmt) { // if prepared statement has been invalidated - drop cached pointers _insert_stmt = nullptr; @@ -66,7 +66,6 @@ future<> table_helper::cache_table_info(service::query_state& qs) { return now(); } - cql3::query_processor& qp = cql3::get_local_query_processor(); return qp.prepare(_insert_cql, qs.get_client_state(), false) .then([this] (shared_ptr msg_ptr) noexcept { _prepared_stmt = std::move(msg_ptr->get_prepared()); @@ -106,8 +105,8 @@ future<> table_helper::cache_table_info(service::query_state& qs) { }); } -future<> table_helper::insert(service::query_state& qs, noncopyable_function opt_maker) { - return cache_table_info(qs).then([this, &qs, opt_maker = std::move(opt_maker)] () mutable { +future<> table_helper::insert(cql3::query_processor& qp, service::query_state& qs, noncopyable_function opt_maker) { + return cache_table_info(qp, qs).then([this, &qs, opt_maker = std::move(opt_maker)] () mutable { return do_with(opt_maker(), [this, &qs] (auto& opts) { opts.prepare(_prepared_stmt->bound_names); return _insert_stmt->execute(service::get_storage_proxy().local(), qs, opts); @@ -115,7 +114,7 @@ future<> table_helper::insert(service::query_state& qs, noncopyable_function table_helper::setup_keyspace(const sstring& keyspace_name, sstring replication_factor, service::query_state& qs, std::vector tables) { +future<> table_helper::setup_keyspace(cql3::query_processor& qp, const sstring& keyspace_name, sstring replication_factor, service::query_state& qs, std::vector tables) { if (this_shard_id() == 0) { size_t n = tables.size(); for (size_t i = 0; i < n; ++i) { @@ -123,8 +122,7 @@ future<> table_helper::setup_keyspace(const sstring& keyspace_name, sstring repl throw std::invalid_argument("setup_keyspace called with table_helper for different keyspace"); } } - return seastar::async([&keyspace_name, replication_factor, &qs, tables] { - cql3::query_processor& qp = cql3::get_local_query_processor(); + return seastar::async([&qp, &keyspace_name, replication_factor, &qs, tables] { database& db = qp.db(); // Create a keyspace diff --git a/table_helper.hh b/table_helper.hh index 1ceed24d5a..ad043a88e8 100644 --- a/table_helper.hh +++ b/table_helper.hh @@ -72,7 +72,7 @@ public: * @return a future that resolves when the given t_helper is ready to be used for * data insertion. */ - future<> cache_table_info(service::query_state&); + future<> cache_table_info(cql3::query_processor& qp, service::query_state&); /** * @return The table name @@ -98,15 +98,15 @@ public: */ template requires seastar::CanInvoke - future<> insert(service::query_state& qs, OptMaker opt_maker, Args... opt_maker_args) { - return insert(qs, noncopyable_function([opt_maker = std::move(opt_maker), args = std::make_tuple(std::move(opt_maker_args)...)] () mutable { + future<> insert(cql3::query_processor& qp, service::query_state& qs, OptMaker opt_maker, Args... opt_maker_args) { + return insert(qp, qs, noncopyable_function([opt_maker = std::move(opt_maker), args = std::make_tuple(std::move(opt_maker_args)...)] () mutable { return apply(opt_maker, std::move(args)); })); } - future<> insert(service::query_state& qs, noncopyable_function opt_maker); + future<> insert(cql3::query_processor& qp, service::query_state& qs, noncopyable_function opt_maker); - static future<> setup_keyspace(const sstring& keyspace_name, sstring replication_factor, service::query_state& qs, std::vector tables); + static future<> setup_keyspace(cql3::query_processor& qp, const sstring& keyspace_name, sstring replication_factor, service::query_state& qs, std::vector tables); /** * Makes a monotonically increasing value in 100ns ("nanos") based on the given time diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index a3124a0eef..a6628cb462 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -242,7 +242,8 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr) } future<> trace_keyspace_helper::start() { - return table_helper::setup_keyspace(KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx }); + cql3::query_processor& qp = cql3::get_local_query_processor(); + return table_helper::setup_keyspace(qp, KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx }); } void trace_keyspace_helper::write_one_session_records(lw_shared_ptr records) { @@ -415,17 +416,16 @@ std::vector trace_keyspace_helper::make_event_mutation_data(one return values; } -future<> trace_keyspace_helper::apply_events_mutation(lw_shared_ptr records, std::deque& events_records) { +future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp, lw_shared_ptr records, std::deque& events_records) { if (events_records.empty()) { return now(); } - return _events.cache_table_info(_dummy_query_state).then([this, records, &events_records] { + return _events.cache_table_info(qp, _dummy_query_state).then([this, &qp, records, &events_records] { tlogger.trace("{}: storing {} events records: parent_id {} span_id {}", records->session_id, events_records.size(), records->parent_id, records->my_span_id); std::vector modifications(events_records.size(), cql3::statements::batch_statement::single_statement(_events.insert_stmt(), false)); std::vector> values; - auto& qp = cql3::get_local_query_processor(); values.reserve(events_records.size()); std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(*all_records, one_event_record)); }); @@ -462,15 +462,16 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr(records->backend_state_ptr.get()); semaphore& write_sem = backend_state_ptr->write_sem; return with_semaphore(write_sem, 1, [this, records, session_record_is_ready, &events_records] { - return apply_events_mutation(records, events_records).then([this, session_record_is_ready, records] { + cql3::query_processor& qp = cql3::get_local_query_processor(); + return apply_events_mutation(qp, records, events_records).then([this, &qp, session_record_is_ready, records] { if (session_record_is_ready) { // if session is finished - store a session and a session time index entries tlogger.trace("{}: going to store a session event", records->session_id); - return _sessions.insert(_dummy_query_state, make_session_mutation_data, std::ref(*records)).then([this, records] { + return _sessions.insert(qp, _dummy_query_state, make_session_mutation_data, std::ref(*records)).then([this, &qp, records] { tlogger.trace("{}: going to store a {} entry", records->session_id, _sessions_time_idx.name()); - return _sessions_time_idx.insert(_dummy_query_state, make_session_time_idx_mutation_data, std::ref(*records)); - }).then([this, records] { + return _sessions_time_idx.insert(qp, _dummy_query_state, make_session_time_idx_mutation_data, std::ref(*records)); + }).then([this, &qp, records] { if (!records->do_log_slow_query) { return now(); } @@ -478,9 +479,9 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptrsession_rec.started_at)); tlogger.trace("{}: going to store a slow query event", records->session_id); - return _slow_query_log.insert(_dummy_query_state, make_slow_query_mutation_data, std::ref(*records), start_time_id).then([this, records, start_time_id] { + return _slow_query_log.insert(qp, _dummy_query_state, make_slow_query_mutation_data, std::ref(*records), start_time_id).then([this, &qp, records, start_time_id] { tlogger.trace("{}: going to store a {} entry", records->session_id, _slow_query_log_time_idx.name()); - return _slow_query_log_time_idx.insert(_dummy_query_state, make_slow_query_time_idx_mutation_data, std::ref(*records), start_time_id); + return _slow_query_log_time_idx.insert(qp, _dummy_query_state, make_slow_query_time_idx_mutation_data, std::ref(*records), start_time_id); }); }); } else { diff --git a/tracing/trace_keyspace_helper.hh b/tracing/trace_keyspace_helper.hh index b8ec8ce27c..dcc0ae822c 100644 --- a/tracing/trace_keyspace_helper.hh +++ b/tracing/trace_keyspace_helper.hh @@ -131,7 +131,7 @@ private: * @note A caller must ensure that @param events_records is alive till the * returned future resolves. */ - future<> apply_events_mutation(lw_shared_ptr records, std::deque& events_records); + future<> apply_events_mutation(cql3::query_processor& qp, lw_shared_ptr records, std::deque& events_records); /** * Create a mutation data for a new session record From b5f136c65141ba7d475e30edd69021bd9af3f5de Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 18 Sep 2020 16:10:39 +0300 Subject: [PATCH 4/6] main: Start tracing in main Move the tracing::start_tracing() out of the storage_service::join_cluster. It anyway happens at the end of the join, so the logic is not changed, but it becomes possible to patch tracing further. Signed-off-by: Pavel Emelyanov --- main.cc | 3 +++ service/storage_service.cc | 3 --- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/main.cc b/main.cc index 29c637d726..66d60fb9eb 100644 --- a/main.cc +++ b/main.cc @@ -1065,6 +1065,9 @@ int main(int ac, char** av) { sst_format_selector.sync(); ss.join_cluster().get(); + supervisor::notify("starting tracing"); + tracing::tracing::start_tracing().get(); + startlog.info("SSTable data integrity checker is {}.", cfg->enable_sstable_data_integrity_check() ? "enabled" : "disabled"); diff --git a/service/storage_service.cc b/service/storage_service.cc index fcb7777e8f..950c475783 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -600,9 +600,6 @@ void storage_service::join_token_ring(int delay) { // Retrieve the latest CDC generation seen in gossip (if any). scan_cdc_generations(); - - supervisor::notify("starting tracing"); - tracing::tracing::start_tracing().get(); } void storage_service::mark_existing_views_as_built() { From 87f1223965a43a5b04ec31d9af6110fcbe2f63a1 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 22 Sep 2020 16:10:32 +0300 Subject: [PATCH 5/6] tracing: Push query processor through init methods The goal is to make tracing keyspace helper reference query processor, so this patch adds the needed arguments through the initialization stack. Signed-off-by: Pavel Emelyanov --- main.cc | 2 +- tracing/trace_keyspace_helper.cc | 2 +- tracing/trace_keyspace_helper.hh | 2 +- tracing/tracing.cc | 10 +++++----- tracing/tracing.hh | 8 +++++--- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/main.cc b/main.cc index 66d60fb9eb..2f52440b30 100644 --- a/main.cc +++ b/main.cc @@ -1066,7 +1066,7 @@ int main(int ac, char** av) { ss.join_cluster().get(); supervisor::notify("starting tracing"); - tracing::tracing::start_tracing().get(); + tracing::tracing::start_tracing(qp).get(); startlog.info("SSTable data integrity checker is {}.", cfg->enable_sstable_data_integrity_check() ? "enabled" : "disabled"); diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index a6628cb462..d3149077b6 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -241,7 +241,7 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr) }); } -future<> trace_keyspace_helper::start() { +future<> trace_keyspace_helper::start(cql3::query_processor& _qp) { cql3::query_processor& qp = cql3::get_local_query_processor(); return table_helper::setup_keyspace(qp, KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx }); } diff --git a/tracing/trace_keyspace_helper.hh b/tracing/trace_keyspace_helper.hh index dcc0ae822c..fb8197668d 100644 --- a/tracing/trace_keyspace_helper.hh +++ b/tracing/trace_keyspace_helper.hh @@ -88,7 +88,7 @@ public: // // TODO: Create a stub_tracing_session object to discard the traces // requested during the initialization phase. - virtual future<> start() override; + virtual future<> start(cql3::query_processor& qp) override; virtual future<> stop() override { return _pending_writes.close(); diff --git a/tracing/tracing.cc b/tracing/tracing.cc index 024d252394..e4cecfe47d 100644 --- a/tracing/tracing.cc +++ b/tracing/tracing.cc @@ -102,9 +102,9 @@ future<> tracing::create_tracing(const backend_registry& br, sstring tracing_bac return tracing_instance().start(std::ref(br), std::move(tracing_backend_class_name)); } -future<> tracing::start_tracing() { - return tracing_instance().invoke_on_all([] (tracing& local_tracing) { - return local_tracing.start(); +future<> tracing::start_tracing(sharded& qp) { + return tracing_instance().invoke_on_all([&qp] (tracing& local_tracing) { + return local_tracing.start(qp.local()); }); } @@ -146,7 +146,7 @@ trace_state_ptr tracing::create_session(const trace_info& secondary_session_info } } -future<> tracing::start() { +future<> tracing::start(cql3::query_processor& qp) { try { _tracing_backend_helper_ptr = _backend_registry.create_backend(_tracing_backend_helper_class_name, *this); } catch (no_such_tracing_backend& e) { @@ -156,7 +156,7 @@ future<> tracing::start() { throw; } - return _tracing_backend_helper_ptr->start().then([this] { + return _tracing_backend_helper_ptr->start(qp).then([this] { _down = false; _write_timer.arm(write_period); }); diff --git a/tracing/tracing.hh b/tracing/tracing.hh index c62f0c0a15..4fb86213e5 100644 --- a/tracing/tracing.hh +++ b/tracing/tracing.hh @@ -53,6 +53,8 @@ #include "log.hh" #include "seastarx.hh" +namespace cql3 { class query_processor; } + namespace tracing { using elapsed_clock = std::chrono::steady_clock; @@ -176,7 +178,7 @@ protected: public: i_tracing_backend_helper(tracing& tr) : _local_tracing(tr) {} virtual ~i_tracing_backend_helper() {} - virtual future<> start() = 0; + virtual future<> start(cql3::query_processor& qp) = 0; virtual future<> stop() = 0; /** @@ -431,11 +433,11 @@ public: } static future<> create_tracing(const backend_registry& br, sstring tracing_backend_helper_class_name); - static future<> start_tracing(); + static future<> start_tracing(sharded& qp); tracing(const backend_registry& br, sstring tracing_backend_helper_class_name); // Initialize a tracing backend (e.g. tracing_keyspace or logstash) - future<> start(); + future<> start(cql3::query_processor& qp); future<> stop(); From e7f74449a62cf0bedc1c3380ad5b0808b2e1e51c Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Tue, 22 Sep 2020 16:14:36 +0300 Subject: [PATCH 6/6] tracing: Keep qp anchor on backend The query processor is required in table_helper's used by tracing. Now everything is ready to push the query processor reference from main down to the table helpers. Because of the current initialization sequence it's only possible to have the started query processor at the .start_tracing() time. Earlier, when the sharded is started the query processor is not yet started, so tracing keeps a pointer on local query processor. When tracing is stopped, the pointer is null-ed. This is safe (but an assert is put when dereferencing it), because on stop trace writes' gate is closed and the query processor is only used in them. Also there's still a chance that tracing remains started in case of start abort, but this is on-par with the current code -- sharded query processor is not stopped, so the memory is not freed. Signed-off-by: Pavel Emelyanov --- main.cc | 7 +++++++ tracing/trace_keyspace_helper.cc | 9 ++++++--- tracing/trace_keyspace_helper.hh | 3 ++- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/main.cc b/main.cc index 2f52440b30..08cff921fd 100644 --- a/main.cc +++ b/main.cc @@ -1067,6 +1067,13 @@ int main(int ac, char** av) { supervisor::notify("starting tracing"); tracing::tracing::start_tracing(qp).get(); + /* + * FIXME -- tracing is stopped inside drain_on_shutdown, which + * is deferred later on. If the start aborts before it, the + * tracing will remain started and will continue referencing + * the query processor. Nowadays the latter is not stopped + * either, but when it will, this place shold be fixed too. + */ startlog.info("SSTable data integrity checker is {}.", cfg->enable_sstable_data_integrity_check() ? "enabled" : "disabled"); diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index d3149077b6..ef8499a18d 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -241,8 +241,8 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr) }); } -future<> trace_keyspace_helper::start(cql3::query_processor& _qp) { - cql3::query_processor& qp = cql3::get_local_query_processor(); +future<> trace_keyspace_helper::start(cql3::query_processor& qp) { + _qp_anchor = &qp; return table_helper::setup_keyspace(qp, KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx }); } @@ -462,7 +462,10 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr(records->backend_state_ptr.get()); semaphore& write_sem = backend_state_ptr->write_sem; return with_semaphore(write_sem, 1, [this, records, session_record_is_ready, &events_records] { - cql3::query_processor& qp = cql3::get_local_query_processor(); + // This code is inside the _pending_writes gate and the qp pointer + // is cleared on ::stop() after the gate is closed. + assert(_qp_anchor != nullptr); + cql3::query_processor& qp = *_qp_anchor; return apply_events_mutation(qp, records, events_records).then([this, &qp, session_record_is_ready, records] { if (session_record_is_ready) { diff --git a/tracing/trace_keyspace_helper.hh b/tracing/trace_keyspace_helper.hh index fb8197668d..29d06c1eec 100644 --- a/tracing/trace_keyspace_helper.hh +++ b/tracing/trace_keyspace_helper.hh @@ -66,6 +66,7 @@ private: int64_t _slow_query_last_nanos = 0; service::query_state _dummy_query_state; + cql3::query_processor* _qp_anchor; table_helper _sessions; table_helper _sessions_time_idx; table_helper _events; @@ -91,7 +92,7 @@ public: virtual future<> start(cql3::query_processor& qp) override; virtual future<> stop() override { - return _pending_writes.close(); + return _pending_writes.close().then([this] { _qp_anchor = nullptr; }); }; virtual void write_records_bulk(records_bulk& bulk) override;