transport: CQL tracing: instrument EXECUTE command

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
This commit is contained in:
Vlad Zolotarov
2016-06-29 16:19:52 +03:00
parent 89a49c346c
commit d21eaabcfe

View File

@@ -465,7 +465,9 @@ future<response_type>
auto cqlop = static_cast<cql_binary_opcode>(op);
if (tracing_request != tracing_request_type::not_requested) {
if (cqlop == cql_binary_opcode::QUERY || cqlop == cql_binary_opcode::PREPARE) {
if (cqlop == cql_binary_opcode::QUERY ||
cqlop == cql_binary_opcode::PREPARE ||
cqlop == cql_binary_opcode::EXECUTE) {
client_state.create_tracing_session(tracing::trace_type::QUERY, tracing_request == tracing_request_type::write_on_close);
}
}
@@ -810,16 +812,30 @@ future<response_type> cql_server::connection::process_execute(uint16_t stream, b
if (!prepared) {
throw exceptions::prepared_query_not_found_exception(id);
}
auto q_state = std::make_unique<cql_query_state>(client_state);
auto& query_state = q_state->query_state;
q_state->options = read_options(buf);
auto& options = *q_state->options;
options.prepare(prepared->bound_names);
tracing::set_page_size(client_state.get_trace_state(), options.get_page_size());
tracing::set_consistency_level(client_state.get_trace_state(), options.get_consistency());
tracing::set_optional_serial_consistency_level(client_state.get_trace_state(), options.get_serial_consistency());
tracing::set_query(client_state.get_trace_state(), prepared->raw_cql_statement);
tracing::begin(client_state.get_trace_state(), seastar::value_of([&id] { return seastar::format("Execute CQL3 prepared query [{}]", id); }),
client_state.get_client_address());
auto stmt = prepared->statement;
tracing::trace(query_state.get_trace_state(), "Checking bounds");
if (stmt->get_bound_terms() != options.get_values_count()) {
tracing::trace(query_state.get_trace_state(), "Invalid amount of bind variables: expected {:d} received {:d}", stmt->get_bound_terms(), options.get_values_count());
throw exceptions::invalid_request_exception("Invalid amount of bind variables");
}
return _server._query_processor.local().process_statement(stmt, query_state, options).then([this, stream, buf = std::move(buf)] (auto msg) {
tracing::trace(query_state.get_trace_state(), "Processing a statement");
return _server._query_processor.local().process_statement(stmt, query_state, options).then([this, stream, buf = std::move(buf), &query_state] (auto msg) {
tracing::trace(query_state.get_trace_state(), "Done processing - preparing a result");
return this->make_result(stream, msg);
}).then([&query_state, q_state = std::move(q_state), this] (auto&& response) {
/* Keep q_state alive. */