Merge "Use local reference on query_processor in tracing" from Pavel E

"
There are few places left that call for global query processor instance,
the tracing is one of them.

The query pressor is used mainly in table_helper, so this set mostly
shuffles its methods' arguments to deliver the needed reference. At the
end the main.cc code is patched to provide the query processor, which
is still global and not stopped, and is thus safe to be used anywhere.

tests: unit(dev), dtest(cql_tracing:dev)
"

* 'br-tracing-vs-query-processor' of https://github.com/xemul/scylla:
  tracing: Keep qp anchor on backend
  tracing: Push query processor through init methods
  main: Start tracing in main
  table_helper: Require local query processor in calls
  table_helper: Use local qp as setup_table argument
  table_helper: Use local db variable
This commit is contained in:
Avi Kivity
2020-10-06 18:04:24 +03:00
8 changed files with 63 additions and 48 deletions

10
main.cc
View File

@@ -1065,6 +1065,16 @@ int main(int ac, char** av) {
sst_format_selector.sync();
ss.join_cluster().get();
supervisor::notify("starting tracing");
tracing::tracing::start_tracing(qp).get();
/*
* FIXME -- tracing is stopped inside drain_on_shutdown, which
* is deferred later on. If the start aborts before it, the
* tracing will remain started and will continue referencing
* the query processor. Nowadays the latter is not stopped
* either, but when it will, this place shold be fixed too.
*/
startlog.info("SSTable data integrity checker is {}.",
cfg->enable_sstable_data_integrity_check() ? "enabled" : "disabled");

View File

@@ -600,9 +600,6 @@ void storage_service::join_token_ring(int delay) {
// Retrieve the latest CDC generation seen in gossip (if any).
scan_cdc_generations();
supervisor::notify("starting tracing");
tracing::tracing::start_tracing().get();
}
void storage_service::mark_existing_views_as_built() {

View File

@@ -26,8 +26,7 @@
#include "cql3/statements/modification_statement.hh"
#include "database.hh"
future<> table_helper::setup_table() const {
auto& qp = cql3::get_local_query_processor();
future<> table_helper::setup_table(cql3::query_processor& qp) const {
auto& db = qp.db();
if (db.has_schema(_keyspace, _name)) {
@@ -58,7 +57,7 @@ future<> table_helper::setup_table() const {
return service::get_local_migration_manager().announce_new_column_family(b.build(), false).discard_result().handle_exception([this] (auto ep) {});;
}
future<> table_helper::cache_table_info(service::query_state& qs) {
future<> table_helper::cache_table_info(cql3::query_processor& qp, service::query_state& qs) {
if (!_prepared_stmt) {
// if prepared statement has been invalidated - drop cached pointers
_insert_stmt = nullptr;
@@ -67,13 +66,13 @@ future<> table_helper::cache_table_info(service::query_state& qs) {
return now();
}
return cql3::get_local_query_processor().prepare(_insert_cql, qs.get_client_state(), false)
return qp.prepare(_insert_cql, qs.get_client_state(), false)
.then([this] (shared_ptr<cql_transport::messages::result_message::prepared> msg_ptr) noexcept {
_prepared_stmt = std::move(msg_ptr->get_prepared());
shared_ptr<cql3::cql_statement> cql_stmt = _prepared_stmt->statement;
_insert_stmt = dynamic_pointer_cast<cql3::statements::modification_statement>(cql_stmt);
_is_fallback_stmt = false;
}).handle_exception_type([this, &qs] (exceptions::invalid_request_exception& eptr) {
}).handle_exception_type([this, &qs, &qp] (exceptions::invalid_request_exception& eptr) {
// the non-fallback statement can't be prepared
if (!_insert_cql_fallback) {
return make_exception_future(eptr);
@@ -82,17 +81,17 @@ future<> table_helper::cache_table_info(service::query_state& qs) {
// we have already prepared the fallback statement
return now();
}
return cql3::get_local_query_processor().prepare(_insert_cql_fallback.value(), qs.get_client_state(), false)
return qp.prepare(_insert_cql_fallback.value(), qs.get_client_state(), false)
.then([this] (shared_ptr<cql_transport::messages::result_message::prepared> msg_ptr) noexcept {
_prepared_stmt = std::move(msg_ptr->get_prepared());
shared_ptr<cql3::cql_statement> cql_stmt = _prepared_stmt->statement;
_insert_stmt = dynamic_pointer_cast<cql3::statements::modification_statement>(cql_stmt);
_is_fallback_stmt = true;
});
}).handle_exception([this] (auto eptr) {
}).handle_exception([this, &qp] (auto eptr) {
// One of the possible causes for an error here could be the table that doesn't exist.
//FIXME: discarded future.
(void)this->setup_table().discard_result();
(void)this->setup_table(qp).discard_result();
// We throw the bad_column_family exception because the caller
// expects and accounts this type of errors.
@@ -106,8 +105,8 @@ future<> table_helper::cache_table_info(service::query_state& qs) {
});
}
future<> table_helper::insert(service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker) {
return cache_table_info(qs).then([this, &qs, opt_maker = std::move(opt_maker)] () mutable {
future<> table_helper::insert(cql3::query_processor& qp, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker) {
return cache_table_info(qp, qs).then([this, &qs, opt_maker = std::move(opt_maker)] () mutable {
return do_with(opt_maker(), [this, &qs] (auto& opts) {
opts.prepare(_prepared_stmt->bound_names);
return _insert_stmt->execute(service::get_storage_proxy().local(), qs, opts);
@@ -115,7 +114,7 @@ future<> table_helper::insert(service::query_state& qs, noncopyable_function<cql
}).discard_result();
}
future<> table_helper::setup_keyspace(const sstring& keyspace_name, sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables) {
future<> table_helper::setup_keyspace(cql3::query_processor& qp, const sstring& keyspace_name, sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables) {
if (this_shard_id() == 0) {
size_t n = tables.size();
for (size_t i = 0; i < n; ++i) {
@@ -123,8 +122,8 @@ future<> table_helper::setup_keyspace(const sstring& keyspace_name, sstring repl
throw std::invalid_argument("setup_keyspace called with table_helper for different keyspace");
}
}
return seastar::async([&keyspace_name, replication_factor, &qs, tables] {
auto& db = cql3::get_local_query_processor().db();
return seastar::async([&qp, &keyspace_name, replication_factor, &qs, tables] {
database& db = qp.db();
// Create a keyspace
if (!db.has_keyspace(keyspace_name)) {
@@ -135,13 +134,13 @@ future<> table_helper::setup_keyspace(const sstring& keyspace_name, sstring repl
service::get_local_migration_manager().announce_new_keyspace(ksm, api::min_timestamp, false).get();
}
qs.get_client_state().set_keyspace(cql3::get_local_query_processor().db(), keyspace_name);
qs.get_client_state().set_keyspace(db, keyspace_name);
// Create tables
size_t n = tables.size();
for (size_t i = 0; i < n; ++i) {
tables[i]->setup_table().get();
tables[i]->setup_table(qp).get();
}
});
} else {

View File

@@ -26,9 +26,11 @@
#include "service/migration_manager.hh"
namespace cql3::statements {
namespace cql3 {
class query_processor;
namespace statements {
class modification_statement;
}
}}
/**
* \class table_helper
@@ -64,13 +66,13 @@ public:
* @return A future that resolves when the operation is complete. Any
* possible errors are ignored.
*/
future<> setup_table() const;
future<> setup_table(cql3::query_processor& qp) const;
/**
* @return a future that resolves when the given t_helper is ready to be used for
* data insertion.
*/
future<> cache_table_info(service::query_state&);
future<> cache_table_info(cql3::query_processor& qp, service::query_state&);
/**
* @return The table name
@@ -96,15 +98,15 @@ public:
*/
template <typename OptMaker, typename... Args>
requires seastar::CanInvoke<OptMaker, Args...>
future<> insert(service::query_state& qs, OptMaker opt_maker, Args... opt_maker_args) {
return insert(qs, noncopyable_function<cql3::query_options ()>([opt_maker = std::move(opt_maker), args = std::make_tuple(std::move(opt_maker_args)...)] () mutable {
future<> insert(cql3::query_processor& qp, service::query_state& qs, OptMaker opt_maker, Args... opt_maker_args) {
return insert(qp, qs, noncopyable_function<cql3::query_options ()>([opt_maker = std::move(opt_maker), args = std::make_tuple(std::move(opt_maker_args)...)] () mutable {
return apply(opt_maker, std::move(args));
}));
}
future<> insert(service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker);
future<> insert(cql3::query_processor& qp, service::query_state& qs, noncopyable_function<cql3::query_options ()> opt_maker);
static future<> setup_keyspace(const sstring& keyspace_name, sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables);
static future<> setup_keyspace(cql3::query_processor& qp, const sstring& keyspace_name, sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables);
/**
* Makes a monotonically increasing value in 100ns ("nanos") based on the given time

View File

@@ -241,8 +241,9 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
});
}
future<> trace_keyspace_helper::start() {
return table_helper::setup_keyspace(KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx });
future<> trace_keyspace_helper::start(cql3::query_processor& qp) {
_qp_anchor = &qp;
return table_helper::setup_keyspace(qp, KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx });
}
void trace_keyspace_helper::write_one_session_records(lw_shared_ptr<one_session_records> records) {
@@ -415,17 +416,16 @@ std::vector<cql3::raw_value> trace_keyspace_helper::make_event_mutation_data(one
return values;
}
future<> trace_keyspace_helper::apply_events_mutation(lw_shared_ptr<one_session_records> records, std::deque<event_record>& events_records) {
future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp, lw_shared_ptr<one_session_records> records, std::deque<event_record>& events_records) {
if (events_records.empty()) {
return now();
}
return _events.cache_table_info(_dummy_query_state).then([this, records, &events_records] {
return _events.cache_table_info(qp, _dummy_query_state).then([this, &qp, records, &events_records] {
tlogger.trace("{}: storing {} events records: parent_id {} span_id {}", records->session_id, events_records.size(), records->parent_id, records->my_span_id);
std::vector<cql3::statements::batch_statement::single_statement> modifications(events_records.size(), cql3::statements::batch_statement::single_statement(_events.insert_stmt(), false));
std::vector<std::vector<cql3::raw_value>> values;
auto& qp = cql3::get_local_query_processor();
values.reserve(events_records.size());
std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(*all_records, one_event_record)); });
@@ -462,15 +462,19 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr<one_se
auto backend_state_ptr = static_cast<trace_keyspace_backend_sesssion_state*>(records->backend_state_ptr.get());
semaphore& write_sem = backend_state_ptr->write_sem;
return with_semaphore(write_sem, 1, [this, records, session_record_is_ready, &events_records] {
return apply_events_mutation(records, events_records).then([this, session_record_is_ready, records] {
// This code is inside the _pending_writes gate and the qp pointer
// is cleared on ::stop() after the gate is closed.
assert(_qp_anchor != nullptr);
cql3::query_processor& qp = *_qp_anchor;
return apply_events_mutation(qp, records, events_records).then([this, &qp, session_record_is_ready, records] {
if (session_record_is_ready) {
// if session is finished - store a session and a session time index entries
tlogger.trace("{}: going to store a session event", records->session_id);
return _sessions.insert(_dummy_query_state, make_session_mutation_data, std::ref(*records)).then([this, records] {
return _sessions.insert(qp, _dummy_query_state, make_session_mutation_data, std::ref(*records)).then([this, &qp, records] {
tlogger.trace("{}: going to store a {} entry", records->session_id, _sessions_time_idx.name());
return _sessions_time_idx.insert(_dummy_query_state, make_session_time_idx_mutation_data, std::ref(*records));
}).then([this, records] {
return _sessions_time_idx.insert(qp, _dummy_query_state, make_session_time_idx_mutation_data, std::ref(*records));
}).then([this, &qp, records] {
if (!records->do_log_slow_query) {
return now();
}
@@ -478,9 +482,9 @@ future<> trace_keyspace_helper::flush_one_session_mutations(lw_shared_ptr<one_se
// if slow query log is requested - store a slow query log and a slow query log time index entries
auto start_time_id = utils::UUID_gen::get_time_UUID(table_helper::make_monotonic_UUID_tp(_slow_query_last_nanos, records->session_rec.started_at));
tlogger.trace("{}: going to store a slow query event", records->session_id);
return _slow_query_log.insert(_dummy_query_state, make_slow_query_mutation_data, std::ref(*records), start_time_id).then([this, records, start_time_id] {
return _slow_query_log.insert(qp, _dummy_query_state, make_slow_query_mutation_data, std::ref(*records), start_time_id).then([this, &qp, records, start_time_id] {
tlogger.trace("{}: going to store a {} entry", records->session_id, _slow_query_log_time_idx.name());
return _slow_query_log_time_idx.insert(_dummy_query_state, make_slow_query_time_idx_mutation_data, std::ref(*records), start_time_id);
return _slow_query_log_time_idx.insert(qp, _dummy_query_state, make_slow_query_time_idx_mutation_data, std::ref(*records), start_time_id);
});
});
} else {

View File

@@ -66,6 +66,7 @@ private:
int64_t _slow_query_last_nanos = 0;
service::query_state _dummy_query_state;
cql3::query_processor* _qp_anchor;
table_helper _sessions;
table_helper _sessions_time_idx;
table_helper _events;
@@ -88,10 +89,10 @@ public:
//
// TODO: Create a stub_tracing_session object to discard the traces
// requested during the initialization phase.
virtual future<> start() override;
virtual future<> start(cql3::query_processor& qp) override;
virtual future<> stop() override {
return _pending_writes.close();
return _pending_writes.close().then([this] { _qp_anchor = nullptr; });
};
virtual void write_records_bulk(records_bulk& bulk) override;
@@ -131,7 +132,7 @@ private:
* @note A caller must ensure that @param events_records is alive till the
* returned future resolves.
*/
future<> apply_events_mutation(lw_shared_ptr<one_session_records> records, std::deque<event_record>& events_records);
future<> apply_events_mutation(cql3::query_processor& qp, lw_shared_ptr<one_session_records> records, std::deque<event_record>& events_records);
/**
* Create a mutation data for a new session record

View File

@@ -102,9 +102,9 @@ future<> tracing::create_tracing(const backend_registry& br, sstring tracing_bac
return tracing_instance().start(std::ref(br), std::move(tracing_backend_class_name));
}
future<> tracing::start_tracing() {
return tracing_instance().invoke_on_all([] (tracing& local_tracing) {
return local_tracing.start();
future<> tracing::start_tracing(sharded<cql3::query_processor>& qp) {
return tracing_instance().invoke_on_all([&qp] (tracing& local_tracing) {
return local_tracing.start(qp.local());
});
}
@@ -146,7 +146,7 @@ trace_state_ptr tracing::create_session(const trace_info& secondary_session_info
}
}
future<> tracing::start() {
future<> tracing::start(cql3::query_processor& qp) {
try {
_tracing_backend_helper_ptr = _backend_registry.create_backend(_tracing_backend_helper_class_name, *this);
} catch (no_such_tracing_backend& e) {
@@ -156,7 +156,7 @@ future<> tracing::start() {
throw;
}
return _tracing_backend_helper_ptr->start().then([this] {
return _tracing_backend_helper_ptr->start(qp).then([this] {
_down = false;
_write_timer.arm(write_period);
});

View File

@@ -53,6 +53,8 @@
#include "log.hh"
#include "seastarx.hh"
namespace cql3 { class query_processor; }
namespace tracing {
using elapsed_clock = std::chrono::steady_clock;
@@ -176,7 +178,7 @@ protected:
public:
i_tracing_backend_helper(tracing& tr) : _local_tracing(tr) {}
virtual ~i_tracing_backend_helper() {}
virtual future<> start() = 0;
virtual future<> start(cql3::query_processor& qp) = 0;
virtual future<> stop() = 0;
/**
@@ -431,11 +433,11 @@ public:
}
static future<> create_tracing(const backend_registry& br, sstring tracing_backend_helper_class_name);
static future<> start_tracing();
static future<> start_tracing(sharded<cql3::query_processor>& qp);
tracing(const backend_registry& br, sstring tracing_backend_helper_class_name);
// Initialize a tracing backend (e.g. tracing_keyspace or logstash)
future<> start();
future<> start(cql3::query_processor& qp);
future<> stop();