From dc588e6e7ba47aaa75df8e844fcf9122be80e2e5 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Mon, 10 Feb 2020 13:38:53 +0200 Subject: [PATCH 1/3] alternator: pass tracing state explicitly instead of relying on it been in the client_state Multiple requests can use the same client_state simultaneously, so it is not safe to use it as a container for a tracing state which is per request. This is not yet an issue for the alternator since it creates new client_state object for each request, but first of all it should not and second trace state will be dropped from the client_state, by later patch. (cherry picked from commit 38fcab3db465954421641ea13ca397a39413948c) --- alternator/executor.cc | 82 ++++++++++++++++++++++-------------------- alternator/executor.hh | 24 ++++++------- alternator/server.cc | 34 +++++++++--------- alternator/server.hh | 2 +- 4 files changed, 73 insertions(+), 69 deletions(-) diff --git a/alternator/executor.cc b/alternator/executor.cc index 35f6749617..d79a51b22c 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -299,14 +299,14 @@ static void describe_key_schema(rjson::value& parent, const schema& schema, std: } -future executor::describe_table(client_state& client_state, std::string content) { +future executor::describe_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) { _stats.api_operations.describe_table++; rjson::value request = rjson::parse(content); elogger.trace("Describing table {}", request); schema_ptr schema = get_table(_proxy, request); - tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name()); + tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name()); rjson::value table_description = rjson::empty_object(); rjson::set(table_description, "TableName", rjson::from_string(schema->cf_name())); @@ -378,13 +378,13 @@ future executor::describe_table(client_state& client_sta return make_ready_future(make_jsonable(std::move(response))); } -future executor::delete_table(client_state& client_state, std::string content) { +future executor::delete_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) { _stats.api_operations.delete_table++; rjson::value request = rjson::parse(content); elogger.trace("Deleting table {}", request); std::string table_name = get_table_name(request); - tracing::add_table_name(client_state.get_trace_state(), KEYSPACE_NAME, table_name); + tracing::add_table_name(trace_state, KEYSPACE_NAME, table_name); if (!_proxy.get_db().local().has_schema(KEYSPACE_NAME, table_name)) { throw api_error("ResourceNotFoundException", @@ -481,14 +481,14 @@ static std::pair parse_key_schema(const rjson::value& } -future executor::create_table(client_state& client_state, std::string content) { +future executor::create_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) { _stats.api_operations.create_table++; rjson::value table_info = rjson::parse(content); elogger.trace("Creating table {}", table_info); std::string table_name = get_table_name(table_info); const rjson::value& attribute_definitions = table_info["AttributeDefinitions"]; - tracing::add_table_name(client_state.get_trace_state(), KEYSPACE_NAME, table_name); + tracing::add_table_name(trace_state, KEYSPACE_NAME, table_name); schema_builder builder(KEYSPACE_NAME, table_name); auto [hash_key, range_key] = parse_key_schema(table_info); @@ -749,14 +749,14 @@ static future> maybe_get_previous_item( bool need_read_before_write, alternator::stats& stats); -future executor::put_item(client_state& client_state, std::string content) { +future executor::put_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) { _stats.api_operations.put_item++; auto start_time = std::chrono::steady_clock::now(); rjson::value update_info = rjson::parse(content); elogger.trace("Updating value {}", update_info); schema_ptr schema = get_table(_proxy, update_info); - tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name()); + tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name()); if (rjson::find(update_info, "ConditionExpression")) { throw api_error("ValidationException", "ConditionExpression is not yet implemented in alternator"); @@ -766,6 +766,7 @@ future executor::put_item(client_state& client_state, st // FIXME: Need to support also the ALL_OLD option. See issue #5053. throw api_error("ValidationException", format("Unsupported ReturnValues={} for PutItem operation", return_values)); } + const bool has_expected = update_info.HasMember("Expected"); const rjson::value& item = update_info["Item"]; @@ -774,11 +775,11 @@ future executor::put_item(client_state& client_state, st return maybe_get_previous_item(_proxy, client_state, schema, item, has_expected, _stats).then( [this, schema, has_expected, update_info = rjson::copy(update_info), m = std::move(m), - &client_state, start_time] (std::unique_ptr previous_item) mutable { + &client_state, start_time, trace_state] (std::unique_ptr previous_item) mutable { if (has_expected) { verify_expected(update_info, previous_item); } - return _proxy.mutate(std::vector{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), client_state.get_trace_state(), empty_service_permit()).then([this, start_time] () { + return _proxy.mutate(std::vector{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), trace_state, empty_service_permit()).then([this, start_time] () { _stats.api_operations.put_item_latency.add(std::chrono::steady_clock::now() - start_time, _stats.api_operations.put_item_latency._count + 1); // Without special options on what to return, PutItem returns nothing. return make_ready_future(json_string("")); @@ -806,13 +807,13 @@ static mutation make_delete_item_mutation(const rjson::value& key, schema_ptr sc return m; } -future executor::delete_item(client_state& client_state, std::string content) { +future executor::delete_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) { _stats.api_operations.delete_item++; auto start_time = std::chrono::steady_clock::now(); rjson::value update_info = rjson::parse(content); schema_ptr schema = get_table(_proxy, update_info); - tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name()); + tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name()); if (rjson::find(update_info, "ConditionExpression")) { throw api_error("ValidationException", "ConditionExpression is not yet implemented in alternator"); @@ -831,11 +832,11 @@ future executor::delete_item(client_state& client_state, return maybe_get_previous_item(_proxy, client_state, schema, key, has_expected, _stats).then( [this, schema, has_expected, update_info = rjson::copy(update_info), m = std::move(m), - &client_state, start_time] (std::unique_ptr previous_item) mutable { + &client_state, start_time, trace_state] (std::unique_ptr previous_item) mutable { if (has_expected) { verify_expected(update_info, previous_item); } - return _proxy.mutate(std::vector{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), client_state.get_trace_state(), empty_service_permit()).then([this, start_time] () { + return _proxy.mutate(std::vector{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), trace_state, empty_service_permit()).then([this, start_time] () { _stats.api_operations.delete_item_latency.add(std::chrono::steady_clock::now() - start_time, _stats.api_operations.delete_item_latency._count + 1); // Without special options on what to return, DeleteItem returns nothing. return make_ready_future(json_string("")); @@ -868,7 +869,7 @@ struct primary_key_equal { } }; -future executor::batch_write_item(client_state& client_state, std::string content) { +future executor::batch_write_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) { _stats.api_operations.batch_write_item++; rjson::value batch_info = rjson::parse(content); rjson::value& request_items = batch_info["RequestItems"]; @@ -878,7 +879,7 @@ future executor::batch_write_item(client_state& client_s for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) { schema_ptr schema = get_table_from_batch_request(_proxy, it); - tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name()); + tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name()); std::unordered_set used_keys(1, primary_key_hash{schema}, primary_key_equal{schema}); for (auto& request : it->value.GetArray()) { if (!request.IsObject() || request.MemberCount() != 1) { @@ -911,7 +912,7 @@ future executor::batch_write_item(client_state& client_s } } - return _proxy.mutate(std::move(mutations), db::consistency_level::LOCAL_QUORUM, default_timeout(), client_state.get_trace_state(), empty_service_permit()).then([] () { + return _proxy.mutate(std::move(mutations), db::consistency_level::LOCAL_QUORUM, default_timeout(), trace_state, empty_service_permit()).then([] () { // Without special options on what to return, BatchWriteItem returns nothing, // unless there are UnprocessedItems - it's possible to just stop processing a batch // due to throttling. TODO(sarna): Consider UnprocessedItems when returning. @@ -1410,13 +1411,13 @@ static future> maybe_get_previous_item( } -future executor::update_item(client_state& client_state, std::string content) { +future executor::update_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) { _stats.api_operations.update_item++; auto start_time = std::chrono::steady_clock::now(); rjson::value update_info = rjson::parse(content); elogger.trace("update_item {}", update_info); schema_ptr schema = get_table(_proxy, update_info); - tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name()); + tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name()); if (rjson::find(update_info, "ConditionExpression")) { throw api_error("ValidationException", "ConditionExpression is not yet implemented in alternator"); @@ -1472,7 +1473,7 @@ future executor::update_item(client_state& client_state, return maybe_get_previous_item(_proxy, client_state, schema, pk, ck, has_update_expression, expression, has_expected, _stats).then( [this, schema, expression = std::move(expression), has_update_expression, ck = std::move(ck), has_expected, update_info = rjson::copy(update_info), m = std::move(m), attrs_collector = std::move(attrs_collector), - attribute_updates = rjson::copy(attribute_updates), ts, &client_state, start_time] (std::unique_ptr previous_item) mutable { + attribute_updates = rjson::copy(attribute_updates), ts, &client_state, start_time, trace_state] (std::unique_ptr previous_item) mutable { if (has_expected) { verify_expected(update_info, previous_item); } @@ -1603,7 +1604,7 @@ future executor::update_item(client_state& client_state, row.apply(row_marker(ts)); elogger.trace("Applying mutation {}", m); - return _proxy.mutate(std::vector{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), client_state.get_trace_state(), empty_service_permit()).then([this, start_time] () { + return _proxy.mutate(std::vector{std::move(m)}, db::consistency_level::LOCAL_QUORUM, default_timeout(), trace_state, empty_service_permit()).then([this, start_time] () { // Without special options on what to return, UpdateItem returns nothing. _stats.api_operations.update_item_latency.add(std::chrono::steady_clock::now() - start_time, _stats.api_operations.update_item_latency._count + 1); return make_ready_future(json_string("")); @@ -1630,7 +1631,7 @@ static db::consistency_level get_read_consistency(const rjson::value& request) { return consistent_read ? db::consistency_level::LOCAL_QUORUM : db::consistency_level::LOCAL_ONE; } -future executor::get_item(client_state& client_state, std::string content) { +future executor::get_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) { _stats.api_operations.get_item++; auto start_time = std::chrono::steady_clock::now(); rjson::value table_info = rjson::parse(content); @@ -1638,7 +1639,7 @@ future executor::get_item(client_state& client_state, st schema_ptr schema = get_table(_proxy, table_info); - tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name()); + tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name()); rjson::value& query_key = table_info["Key"]; db::consistency_level cl = get_read_consistency(table_info); @@ -1673,7 +1674,7 @@ future executor::get_item(client_state& client_state, st }); } -future executor::batch_get_item(client_state& client_state, std::string content) { +future executor::batch_get_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) { // FIXME: In this implementation, an unbounded batch size can cause // unbounded response JSON object to be buffered in memory, unbounded // parallelism of the requests, and unbounded amount of non-preemptable @@ -1701,7 +1702,7 @@ future executor::batch_get_item(client_state& client_sta for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) { table_requests rs; rs.schema = get_table_from_batch_request(_proxy, it); - tracing::add_table_name(client_state.get_trace_state(), KEYSPACE_NAME, rs.schema->cf_name()); + tracing::add_table_name(trace_state, KEYSPACE_NAME, rs.schema->cf_name()); rs.cl = get_read_consistency(it->value); rs.attrs_to_get = calculate_attrs_to_get(it->value); auto& keys = (it->value)["Keys"]; @@ -1867,10 +1868,11 @@ static future do_query(schema_ptr schema, db::consistency_level cl, ::shared_ptr filtering_restrictions, service::client_state& client_state, - cql3::cql_stats& cql_stats) { + cql3::cql_stats& cql_stats, + tracing::trace_state_ptr trace_state) { ::shared_ptr paging_state = nullptr; - tracing::trace(client_state.get_trace_state(), "Performing a database query"); + tracing::trace(trace_state, "Performing a database query"); if (exclusive_start_key) { partition_key pk = pk_from_json(*exclusive_start_key, schema); @@ -1887,7 +1889,7 @@ static future do_query(schema_ptr schema, auto partition_slice = query::partition_slice(std::move(ck_bounds), {}, std::move(regular_columns), selection->get_query_options()); auto command = ::make_lw_shared(schema->id(), schema->version(), partition_slice, query::max_partitions); - auto query_state_ptr = std::make_unique(client_state, empty_service_permit()); + auto query_state_ptr = std::make_unique(client_state, trace_state, empty_service_permit()); command->slice.options.set(); auto query_options = std::make_unique(cl, infinite_timeout_config, std::vector{}); @@ -1919,7 +1921,7 @@ static future do_query(schema_ptr schema, // 2. Filtering - by passing appropriately created restrictions to pager as a last parameter // 3. Proper timeouts instead of gc_clock::now() and db::no_timeout // 4. Implement parallel scanning via Segments -future executor::scan(client_state& client_state, std::string content) { +future executor::scan(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) { _stats.api_operations.scan++; rjson::value request_info = rjson::parse(content); elogger.trace("Scanning {}", request_info); @@ -1956,7 +1958,7 @@ future executor::scan(client_state& client_state, std::s partition_ranges = filtering_restrictions->get_partition_key_ranges(query_options); ck_bounds = filtering_restrictions->get_clustering_bounds(query_options); } - return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats); + return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats, trace_state); } static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, comparison_operator_type op, const rjson::value& attrs) { @@ -2079,14 +2081,14 @@ calculate_bounds(schema_ptr schema, const rjson::value& conditions) { return {std::move(partition_ranges), std::move(ck_bounds)}; } -future executor::query(client_state& client_state, std::string content) { +future executor::query(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content) { _stats.api_operations.query++; rjson::value request_info = rjson::parse(content); elogger.trace("Querying {}", request_info); schema_ptr schema = get_table_or_view(_proxy, request_info); - tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name()); + tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name()); rjson::value* exclusive_start_key = rjson::find(request_info, "ExclusiveStartKey"); db::consistency_level cl = get_read_consistency(request_info); @@ -2129,7 +2131,7 @@ future executor::query(client_state& client_state, std:: throw api_error("ValidationException", format("QueryFilter can only contain non-primary key attributes: Primary key attribute: {}", ck_defs.front()->name_as_text())); } } - return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats); + return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions), client_state, _stats.cql_stats, std::move(trace_state)); } static void validate_limit(int limit) { @@ -2238,18 +2240,20 @@ future<> executor::maybe_create_keyspace() { }); } -static void create_tracing_session(executor::client_state& client_state) { +static tracing::trace_state_ptr create_tracing_session() { tracing::trace_state_props_set props; props.set(); - client_state.create_tracing_session(tracing::trace_type::QUERY, props); + return tracing::tracing::get_local_tracing_instance().create_session(tracing::trace_type::QUERY, props); } -void executor::maybe_trace_query(client_state& client_state, sstring_view op, sstring_view query) { +tracing::trace_state_ptr executor::maybe_trace_query(client_state& client_state, sstring_view op, sstring_view query) { + tracing::trace_state_ptr trace_state; if (tracing::tracing::get_local_tracing_instance().trace_next_query()) { - create_tracing_session(client_state); - tracing::add_query(client_state.get_trace_state(), query); - tracing::begin(client_state.get_trace_state(), format("Alternator {}", op), client_state.get_client_address()); + trace_state = create_tracing_session(); + tracing::add_query(trace_state, query); + tracing::begin(trace_state, format("Alternator {}", op), client_state.get_client_address()); } + return trace_state; } future<> executor::start() { diff --git a/alternator/executor.hh b/alternator/executor.hh index 142d3f996d..f46aa03ff9 100644 --- a/alternator/executor.hh +++ b/alternator/executor.hh @@ -46,26 +46,26 @@ public: executor(service::storage_proxy& proxy, service::migration_manager& mm) : _proxy(proxy), _mm(mm) {} - future create_table(client_state& client_state, std::string content); - future describe_table(client_state& client_state, std::string content); - future delete_table(client_state& client_state, std::string content); - future put_item(client_state& client_state, std::string content); - future get_item(client_state& client_state, std::string content); - future delete_item(client_state& client_state, std::string content); - future update_item(client_state& client_state, std::string content); + future create_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content); + future describe_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content); + future delete_table(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content); + future put_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content); + future get_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content); + future delete_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content); + future update_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content); future list_tables(client_state& client_state, std::string content); - future scan(client_state& client_state, std::string content); + future scan(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content); future describe_endpoints(client_state& client_state, std::string content, std::string host_header); - future batch_write_item(client_state& client_state, std::string content); - future batch_get_item(client_state& client_state, std::string content); - future query(client_state& client_state, std::string content); + future batch_write_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content); + future batch_get_item(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content); + future query(client_state& client_state, tracing::trace_state_ptr trace_state, std::string content); future<> start(); future<> stop() { return make_ready_future<>(); } future<> maybe_create_keyspace(); - static void maybe_trace_query(client_state& client_state, sstring_view op, sstring_view query); + static tracing::trace_state_ptr maybe_trace_query(client_state& client_state, sstring_view op, sstring_view query); }; } diff --git a/alternator/server.cc b/alternator/server.cc index c8cf35d7bf..7f54a66352 100644 --- a/alternator/server.cc +++ b/alternator/server.cc @@ -231,9 +231,9 @@ future server::handle_api_request(std::unique_ptr(executor::client_state::internal_tag()), [this, callback_it = std::move(callback_it), op = std::move(op), req = std::move(req)] (std::unique_ptr& client_state) mutable { client_state->set_raw_keyspace(executor::KEYSPACE_NAME); - executor::maybe_trace_query(*client_state, op, req->content); - tracing::trace(client_state->get_trace_state(), op); - return callback_it->second(_executor.local(), *client_state, std::move(req)); + tracing::trace_state_ptr trace_state = executor::maybe_trace_query(*client_state, op, req->content); + tracing::trace(trace_state, op); + return callback_it->second(_executor.local(), *client_state, trace_state, std::move(req)).finally([trace_state] {}); }); }); } @@ -253,21 +253,21 @@ void server::set_routes(routes& r) { server::server(seastar::sharded& e) : _executor(e), _key_cache(1024, 1min, slogger), _enforce_authorization(false) , _callbacks{ - {"CreateTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { - return e.maybe_create_keyspace().then([&e, &client_state, req = std::move(req)] { return e.create_table(client_state, req->content); }); } + {"CreateTable", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr req) { + return e.maybe_create_keyspace().then([&e, &client_state, req = std::move(req), trace_state = std::move(trace_state)] () mutable { return e.create_table(client_state, std::move(trace_state), req->content); }); } }, - {"DescribeTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.describe_table(client_state, req->content); }}, - {"DeleteTable", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.delete_table(client_state, req->content); }}, - {"PutItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.put_item(client_state, req->content); }}, - {"UpdateItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.update_item(client_state, req->content); }}, - {"GetItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.get_item(client_state, req->content); }}, - {"DeleteItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.delete_item(client_state, req->content); }}, - {"ListTables", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.list_tables(client_state, req->content); }}, - {"Scan", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.scan(client_state, req->content); }}, - {"DescribeEndpoints", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.describe_endpoints(client_state, req->content, req->get_header("Host")); }}, - {"BatchWriteItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.batch_write_item(client_state, req->content); }}, - {"BatchGetItem", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.batch_get_item(client_state, req->content); }}, - {"Query", [] (executor& e, executor::client_state& client_state, std::unique_ptr req) { return e.query(client_state, req->content); }}, + {"DescribeTable", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr req) { return e.describe_table(client_state, std::move(trace_state), req->content); }}, + {"DeleteTable", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr req) { return e.delete_table(client_state, std::move(trace_state), req->content); }}, + {"PutItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr req) { return e.put_item(client_state, std::move(trace_state), req->content); }}, + {"UpdateItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr req) { return e.update_item(client_state, std::move(trace_state), req->content); }}, + {"GetItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr req) { return e.get_item(client_state, std::move(trace_state), req->content); }}, + {"DeleteItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr req) { return e.delete_item(client_state, std::move(trace_state), req->content); }}, + {"ListTables", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr req) { return e.list_tables(client_state, req->content); }}, + {"Scan", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr req) { return e.scan(client_state, std::move(trace_state), req->content); }}, + {"DescribeEndpoints", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr req) { return e.describe_endpoints(client_state, req->content, req->get_header("Host")); }}, + {"BatchWriteItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr req) { return e.batch_write_item(client_state, std::move(trace_state), req->content); }}, + {"BatchGetItem", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr req) { return e.batch_get_item(client_state, std::move(trace_state), req->content); }}, + {"Query", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, std::unique_ptr req) { return e.query(client_state, std::move(trace_state), req->content); }}, } { } diff --git a/alternator/server.hh b/alternator/server.hh index ec9a14a4d6..9c5e2edfaf 100644 --- a/alternator/server.hh +++ b/alternator/server.hh @@ -31,7 +31,7 @@ namespace alternator { class server { - using alternator_callback = std::function(executor&, executor::client_state&, std::unique_ptr)>; + using alternator_callback = std::function(executor&, executor::client_state&, tracing::trace_state_ptr, std::unique_ptr)>; using alternator_callbacks_map = std::unordered_map; seastar::httpd::http_server_control _control; From 866c04dd6440cb51e752dd072019426aa5192659 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Mon, 10 Feb 2020 14:18:42 +0200 Subject: [PATCH 2/3] transport: pass tracing state explicitly instead of relying on it been in the client_state Multiple requests can use the same client_state simultaneously, so it is not safe to use it as a container for a tracing state which is per request. Currently next request may overwrite tracing state for previous one causing, in a best case, wrong trace to be taken or crash if overwritten pointer is freed prematurely. Fixes #5700 (cherry picked from commit 9f1f60fc380c0b51aae650dc623ad50ecd5e3ae6) --- transport/server.cc | 203 +++++++++++++++++++++++--------------------- transport/server.hh | 24 +++--- 2 files changed, 115 insertions(+), 112 deletions(-) diff --git a/transport/server.cc b/transport/server.cc index f481fa8bdf..4c74096cf5 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -372,6 +372,7 @@ future>> trace_props.set_if(tracing::tracing::get_local_tracing_instance().slow_query_tracing_enabled()); trace_props.set_if(tracing_request != tracing_request_type::not_requested); + tracing::trace_state_ptr trace_state; if (trace_props) { if (cqlop == cql_binary_opcode::QUERY || @@ -379,15 +380,15 @@ future>> cqlop == cql_binary_opcode::EXECUTE || cqlop == cql_binary_opcode::BATCH) { trace_props.set_if(tracing_request == tracing_request_type::write_on_close); - client_state.create_tracing_session(tracing::trace_type::QUERY, trace_props); + trace_state = tracing::tracing::get_local_tracing_instance().create_session(tracing::trace_type::QUERY, trace_props); } } - tracing::set_request_size(client_state.get_trace_state(), fbuf.bytes_left()); + tracing::set_request_size(trace_state, fbuf.bytes_left()); auto linearization_buffer = std::make_unique(); auto linearization_buffer_ptr = linearization_buffer.get(); - return futurize_apply([this, cqlop, stream, &fbuf, &client_state, linearization_buffer_ptr, permit = std::move(permit)] () mutable { + return futurize_apply([this, cqlop, stream, &fbuf, &client_state, linearization_buffer_ptr, permit = std::move(permit), trace_state] () mutable { // When using authentication, we need to ensure we are doing proper state transitions, // i.e. we cannot simply accept any query/exec ops unless auth is complete switch (client_state.get_auth_state()) { @@ -409,7 +410,7 @@ future>> break; } - tracing::set_username(client_state.get_trace_state(), client_state.user()); + tracing::set_username(trace_state, client_state.user()); auto wrap_in_foreign = [] (future> f) { return f.then([] (std::unique_ptr p) { @@ -418,19 +419,19 @@ future>> }; auto in = request_reader(std::move(fbuf), *linearization_buffer_ptr); switch (cqlop) { - case cql_binary_opcode::STARTUP: return wrap_in_foreign(process_startup(stream, std::move(in), client_state)); - case cql_binary_opcode::AUTH_RESPONSE: return wrap_in_foreign(process_auth_response(stream, std::move(in), client_state)); - case cql_binary_opcode::OPTIONS: return wrap_in_foreign(process_options(stream, std::move(in), client_state)); - case cql_binary_opcode::QUERY: return process_query(stream, std::move(in), client_state, std::move(permit)); - case cql_binary_opcode::PREPARE: return wrap_in_foreign(process_prepare(stream, std::move(in), client_state)); - case cql_binary_opcode::EXECUTE: return process_execute(stream, std::move(in), client_state, std::move(permit)); - case cql_binary_opcode::BATCH: return wrap_in_foreign(process_batch(stream, std::move(in), client_state, std::move(permit))); - case cql_binary_opcode::REGISTER: return wrap_in_foreign(process_register(stream, std::move(in), client_state)); + case cql_binary_opcode::STARTUP: return wrap_in_foreign(process_startup(stream, std::move(in), client_state, trace_state)); + case cql_binary_opcode::AUTH_RESPONSE: return wrap_in_foreign(process_auth_response(stream, std::move(in), client_state, trace_state)); + case cql_binary_opcode::OPTIONS: return wrap_in_foreign(process_options(stream, std::move(in), client_state, trace_state)); + case cql_binary_opcode::QUERY: return process_query(stream, std::move(in), client_state, std::move(permit), trace_state); + case cql_binary_opcode::PREPARE: return wrap_in_foreign(process_prepare(stream, std::move(in), client_state, trace_state)); + case cql_binary_opcode::EXECUTE: return process_execute(stream, std::move(in), client_state, std::move(permit), trace_state); + case cql_binary_opcode::BATCH: return wrap_in_foreign(process_batch(stream, std::move(in), client_state, std::move(permit), trace_state)); + case cql_binary_opcode::REGISTER: return wrap_in_foreign(process_register(stream, std::move(in), client_state, trace_state)); default: throw exceptions::protocol_exception(format("Unknown opcode {:d}", int(cqlop))); } - }).then_wrapped([this, cqlop, stream, &client_state, linearization_buffer = std::move(linearization_buffer)] (future>> f) -> foreign_ptr> { + }).then_wrapped([this, cqlop, stream, &client_state, linearization_buffer = std::move(linearization_buffer), trace_state] (future>> f) -> foreign_ptr> { auto stop_trace = defer([&] { - tracing::stop_foreground(client_state.get_trace_state()); + tracing::stop_foreground(trace_state); }); --_server._requests_serving; try { @@ -461,28 +462,28 @@ future>> break; } - tracing::set_response_size(client_state.get_trace_state(), response->size()); + tracing::set_response_size(trace_state, response->size()); return response; } catch (const exceptions::unavailable_exception& ex) { - return make_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive, client_state.get_trace_state()); + return make_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive, trace_state); } catch (const exceptions::read_timeout_exception& ex) { - return make_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present, client_state.get_trace_state()); + return make_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present, trace_state); } catch (const exceptions::read_failure_exception& ex) { - return make_read_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.data_present, client_state.get_trace_state()); + return make_read_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.data_present, trace_state); } catch (const exceptions::mutation_write_timeout_exception& ex) { - return make_mutation_write_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.type, client_state.get_trace_state()); + return make_mutation_write_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.type, trace_state); } catch (const exceptions::mutation_write_failure_exception& ex) { - return make_mutation_write_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.type, client_state.get_trace_state()); + return make_mutation_write_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.type, trace_state); } catch (const exceptions::already_exists_exception& ex) { - return make_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name, client_state.get_trace_state()); + return make_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name, trace_state); } catch (const exceptions::prepared_query_not_found_exception& ex) { - return make_unprepared_error(stream, ex.code(), ex.what(), ex.id, client_state.get_trace_state()); + return make_unprepared_error(stream, ex.code(), ex.what(), ex.id, trace_state); } catch (const exceptions::cassandra_exception& ex) { - return make_error(stream, ex.code(), ex.what(), client_state.get_trace_state()); + return make_error(stream, ex.code(), ex.what(), trace_state); } catch (std::exception& ex) { - return make_error(stream, exceptions::exception_code::SERVER_ERROR, ex.what(), client_state.get_trace_state()); + return make_error(stream, exceptions::exception_code::SERVER_ERROR, ex.what(), trace_state); } catch (...) { - return make_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error", client_state.get_trace_state()); + return make_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error", trace_state); } }); } @@ -695,8 +696,8 @@ future cql_server::connection::read_and_decompress_ return _buffer_reader.read_exactly(_read_buf, length); } -future> cql_server::connection::process_startup(uint16_t stream, request_reader in, service::client_state& client_state) -{ +future> cql_server::connection::process_startup(uint16_t stream, request_reader in, service::client_state& client_state, + tracing::trace_state_ptr trace_state) { auto options = in.read_string_map(); auto compression_opt = options.find("COMPRESSION"); if (compression_opt != options.end()) { @@ -712,33 +713,31 @@ future> cql_server::connection::process_st } auto& a = client_state.get_auth_service()->underlying_authenticator(); if (a.require_authentication()) { - return make_ready_future>(make_autheticate(stream, a.qualified_java_name(), client_state.get_trace_state())); + return make_ready_future>(make_autheticate(stream, a.qualified_java_name(), trace_state)); } - return make_ready_future>(make_ready(stream, client_state.get_trace_state())); + return make_ready_future>(make_ready(stream, trace_state)); } -future> cql_server::connection::process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state) -{ +future> cql_server::connection::process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state, + tracing::trace_state_ptr trace_state) { auto sasl_challenge = client_state.get_auth_service()->underlying_authenticator().new_sasl_challenge(); auto buf = in.read_raw_bytes_view(in.bytes_left()); auto challenge = sasl_challenge->evaluate_response(buf); if (sasl_challenge->is_complete()) { - return sasl_challenge->get_authenticated_user().then([this, sasl_challenge, stream, &client_state, challenge = std::move(challenge)](auth::authenticated_user user) mutable { + return sasl_challenge->get_authenticated_user().then([this, sasl_challenge, stream, &client_state, challenge = std::move(challenge), trace_state](auth::authenticated_user user) mutable { client_state.set_login(std::move(user)); auto f = client_state.check_user_can_login(); - return f.then([this, stream, &client_state, challenge = std::move(challenge)]() mutable { - auto tr_state = client_state.get_trace_state(); - return make_ready_future>(make_auth_success(stream, std::move(challenge), tr_state)); + return f.then([this, stream, &client_state, challenge = std::move(challenge), trace_state]() mutable { + return make_ready_future>(make_auth_success(stream, std::move(challenge), trace_state)); }); }); } - auto tr_state = client_state.get_trace_state(); - return make_ready_future>(make_auth_challenge(stream, std::move(challenge), tr_state)); + return make_ready_future>(make_auth_challenge(stream, std::move(challenge), trace_state)); } -future> cql_server::connection::process_options(uint16_t stream, request_reader in, service::client_state& client_state) -{ - return make_ready_future>(make_supported(stream, client_state.get_trace_state())); +future> cql_server::connection::process_options(uint16_t stream, request_reader in, service::client_state& client_state, + tracing::trace_state_ptr trace_state) { + return make_ready_future>(make_supported(stream, std::move(trace_state))); } void @@ -753,22 +752,23 @@ make_result(int16_t stream, messages::result_message& msg, const tracing::trace_ static future>, unsigned>> process_query_internal(service::client_state& client_state, distributed& qp, request_reader in, uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format, - const cql3::cql_config& cql_config, const ::timeout_config& timeout_config, service_permit permit, bool init_trace) { + const cql3::cql_config& cql_config, const ::timeout_config& timeout_config, service_permit permit, tracing::trace_state_ptr trace_state, + bool init_trace) { auto query = in.read_long_string_view(); - auto q_state = std::make_unique(client_state, std::move(permit)); + auto q_state = std::make_unique(client_state, trace_state, std::move(permit)); auto& query_state = q_state->query_state; q_state->options = in.read_options(version, serialization_format, timeout_config, cql_config); auto& options = *q_state->options; auto skip_metadata = options.skip_metadata(); if (init_trace) { - tracing::set_page_size(query_state.get_trace_state(), options.get_page_size()); - tracing::set_consistency_level(query_state.get_trace_state(), options.get_consistency()); - tracing::set_optional_serial_consistency_level(query_state.get_trace_state(), options.get_serial_consistency()); - tracing::add_query(query_state.get_trace_state(), query); - tracing::set_user_timestamp(query_state.get_trace_state(), options.get_specific_options().timestamp); + tracing::set_page_size(trace_state, options.get_page_size()); + tracing::set_consistency_level(trace_state, options.get_consistency()); + tracing::set_optional_serial_consistency_level(trace_state, options.get_serial_consistency()); + tracing::add_query(trace_state, query); + tracing::set_user_timestamp(trace_state, options.get_specific_options().timestamp); - tracing::begin(query_state.get_trace_state(), "Execute CQL3 query", client_state.get_client_address()); + tracing::begin(trace_state, "Execute CQL3 query", client_state.get_client_address()); } return qp.local().process(query, query_state, options).then([q_state = std::move(q_state), stream, skip_metadata, version] (auto msg) { @@ -783,16 +783,18 @@ process_query_internal(service::client_state& client_state, distributed>> cql_server::connection::process_query_on_shard(unsigned shard, uint16_t stream, fragmented_temporary_buffer::istream is, - service::client_state& cs, service_permit permit) { + service::client_state& cs, service_permit permit, tracing::trace_state_ptr trace_state) { return smp::submit_to(shard, _server._config.bounce_request_smp_service_group, - [this, s = std::ref(_server.container()), is = std::move(is), cs = cs.move_to_other_shard(), stream, permit = std::move(permit)] () { + [this, s = std::ref(_server.container()), is = std::move(is), cs = cs.move_to_other_shard(), stream, permit = std::move(permit), + gt = tracing::global_trace_state_ptr(std::move(trace_state))] () { service::client_state client_state = cs.get(); cql_server& server = s.get().local(); - return do_with(bytes_ostream(), std::move(client_state), [this, &server, is = std::move(is), stream] + tracing::trace_state_ptr trace_state = gt; + return do_with(bytes_ostream(), std::move(client_state), [this, &server, is = std::move(is), stream, trace_state] (bytes_ostream& linearization_buffer, service::client_state& client_state) { request_reader in(is, linearization_buffer); return process_query_internal(client_state, server._query_processor, in, stream, _version, _cql_serialization_format, - server._cql_config, server.timeout_config(), /* FIXME */empty_service_permit(), false).then([] (auto msg) { + server._cql_config, server.timeout_config(), /* FIXME */empty_service_permit(), trace_state, false).then([] (auto msg) { // result here has to be foreign ptr return std::get>>(std::move(msg)); }); @@ -801,27 +803,27 @@ cql_server::connection::process_query_on_shard(unsigned shard, uint16_t stream, } future>> -cql_server::connection::process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit) +cql_server::connection::process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state) { fragmented_temporary_buffer::istream is = in.get_stream(); return process_query_internal(client_state, _server._query_processor, in, stream, - _version, _cql_serialization_format, _server._cql_config, _server.timeout_config(), permit, true) - .then([stream, &client_state, this, is, permit] (std::variant>, unsigned> msg) mutable { + _version, _cql_serialization_format, _server._cql_config, _server.timeout_config(), permit, trace_state, true) + .then([stream, &client_state, this, is, permit, trace_state] (std::variant>, unsigned> msg) mutable { unsigned* shard = std::get_if(&msg); if (shard) { - return process_query_on_shard(*shard, stream, is, client_state, std::move(permit)); + return process_query_on_shard(*shard, stream, is, client_state, std::move(permit), std::move(trace_state)); } return make_ready_future>>(std::get>>(std::move(msg))); }); } -future> cql_server::connection::process_prepare(uint16_t stream, request_reader in, service::client_state& client_state) -{ +future> cql_server::connection::process_prepare(uint16_t stream, request_reader in, service::client_state& client_state, + tracing::trace_state_ptr trace_state) { auto query = sstring(in.read_long_string_view()); - tracing::add_query(client_state.get_trace_state(), query); - tracing::begin(client_state.get_trace_state(), "Preparing CQL3 query", client_state.get_client_address()); + tracing::add_query(trace_state, query); + tracing::begin(trace_state, "Preparing CQL3 query", client_state.get_client_address()); auto cpu_id = engine().cpu_id(); auto cpus = boost::irange(0u, smp::count); @@ -833,13 +835,13 @@ future> cql_server::connection::process_pr } else { return make_ready_future<>(); } - }).then([this, query, stream, &client_state] () mutable { - tracing::trace(client_state.get_trace_state(), "Done preparing on remote shards"); - return _server._query_processor.local().prepare(std::move(query), client_state, false).then([this, stream, &client_state] (auto msg) { - tracing::trace(client_state.get_trace_state(), "Done preparing on a local shard - preparing a result. ID is [{}]", seastar::value_of([&msg] { + }).then([this, query, stream, &client_state, trace_state] () mutable { + tracing::trace(trace_state, "Done preparing on remote shards"); + return _server._query_processor.local().prepare(std::move(query), client_state, false).then([this, stream, &client_state, trace_state] (auto msg) { + tracing::trace(trace_state, "Done preparing on a local shard - preparing a result. ID is [{}]", seastar::value_of([&msg] { return messages::result_message::prepared::cql::get_id(msg); })); - return make_result(stream, *msg, client_state.get_trace_state(), _version); + return make_result(stream, *msg, trace_state, _version); }); }); } @@ -847,7 +849,8 @@ future> cql_server::connection::process_pr future>, unsigned>> process_execute_internal(service::client_state& client_state, distributed& qp, request_reader in, uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format, - const cql3::cql_config& cql_config, const ::timeout_config& timeout_config, service_permit permit, bool init_trace) { + const cql3::cql_config& cql_config, const ::timeout_config& timeout_config, service_permit permit, + tracing::trace_state_ptr trace_state, bool init_trace) { cql3::prepared_cache_key_type cache_key(in.read_short_bytes()); auto& id = cql3::prepared_cache_key_type::cql_id(cache_key); bool needs_authorization = false; @@ -864,7 +867,7 @@ process_execute_internal(service::client_state& client_state, distributed(client_state, client_state.get_trace_state(), std::move(permit)); + auto q_state = std::make_unique(client_state, trace_state, std::move(permit)); auto& query_state = q_state->query_state; if (version == 1) { std::vector values; @@ -879,33 +882,33 @@ process_execute_internal(service::client_state& client_state, distributedraw_cql_statement); - tracing::add_prepared_statement(client_state.get_trace_state(), prepared); + tracing::set_page_size(trace_state, options.get_page_size()); + tracing::set_consistency_level(trace_state, options.get_consistency()); + tracing::set_optional_serial_consistency_level(trace_state, options.get_serial_consistency()); + tracing::add_query(trace_state, prepared->raw_cql_statement); + tracing::add_prepared_statement(trace_state, prepared); - tracing::begin(client_state.get_trace_state(), seastar::value_of([&id] { return seastar::format("Execute CQL3 prepared query [{}]", id); }), + tracing::begin(trace_state, seastar::value_of([&id] { return seastar::format("Execute CQL3 prepared query [{}]", id); }), client_state.get_client_address()); } auto stmt = prepared->statement; - tracing::trace(query_state.get_trace_state(), "Checking bounds"); + tracing::trace(trace_state, "Checking bounds"); if (stmt->get_bound_terms() != options.get_values_count()) { const auto msg = format("Invalid amount of bind variables: expected {:d} received {:d}", stmt->get_bound_terms(), options.get_values_count()); - tracing::trace(query_state.get_trace_state(), msg); + tracing::trace(trace_state, msg); throw exceptions::invalid_request_exception(msg); } options.prepare(prepared->bound_names); if (init_trace) { - tracing::add_prepared_query_options(client_state.get_trace_state(), options); + tracing::add_prepared_query_options(trace_state, options); } - tracing::trace(query_state.get_trace_state(), "Processing a statement"); + tracing::trace(trace_state, "Processing a statement"); return qp.local().process_statement_prepared(std::move(prepared), std::move(cache_key), query_state, options, needs_authorization) .then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version] (auto msg) { if (msg->move_to_shard()) { @@ -919,16 +922,18 @@ process_execute_internal(service::client_state& client_state, distributed>> cql_server::connection::process_execute_on_shard(unsigned shard, uint16_t stream, fragmented_temporary_buffer::istream is, - service::client_state& cs, service_permit permit) { + service::client_state& cs, service_permit permit, tracing::trace_state_ptr trace_state) { return smp::submit_to(shard, _server._config.bounce_request_smp_service_group, - [this, s = std::ref(_server.container()), is = std::move(is), cs = cs.move_to_other_shard(), stream, permit = std::move(permit)] () { + [this, s = std::ref(_server.container()), is = std::move(is), cs = cs.move_to_other_shard(), stream, permit = std::move(permit), + gt = tracing::global_trace_state_ptr(std::move(trace_state))] () { service::client_state client_state = cs.get(); + tracing::trace_state_ptr trace_state = gt; cql_server& server = s.get().local(); - return do_with(bytes_ostream(), std::move(client_state), [this, &server, is = std::move(is), stream] + return do_with(bytes_ostream(), std::move(client_state), [this, &server, is = std::move(is), stream, trace_state] (bytes_ostream& linearization_buffer, service::client_state& client_state) { request_reader in(is, linearization_buffer); return process_execute_internal(client_state, server._query_processor, in, stream, _version, _cql_serialization_format, - server._cql_config, server.timeout_config(), /* FIXME */empty_service_permit(), false).then([] (auto msg) { + server._cql_config, server.timeout_config(), /* FIXME */empty_service_permit(), trace_state, false).then([] (auto msg) { // result here has to be foreign ptr return std::get>>(std::move(msg)); }); @@ -937,22 +942,22 @@ cql_server::connection::process_execute_on_shard(unsigned shard, uint16_t stream } future>> cql_server::connection::process_execute(uint16_t stream, request_reader in, - service::client_state& client_state, service_permit permit) { + service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state) { fragmented_temporary_buffer::istream is = in.get_stream(); return process_execute_internal(client_state, _server._query_processor, in, stream, - _version, _cql_serialization_format, _server._cql_config, _server.timeout_config(), permit, true) - .then([stream, &client_state, this, is, permit] (std::variant>, unsigned> msg) mutable { + _version, _cql_serialization_format, _server._cql_config, _server.timeout_config(), permit, trace_state, true) + .then([stream, &client_state, this, is, permit, trace_state] (std::variant>, unsigned> msg) mutable { unsigned* shard = std::get_if(&msg); if (shard) { - return process_execute_on_shard(*shard, stream, is, client_state, std::move(permit)); + return process_execute_on_shard(*shard, stream, is, client_state, std::move(permit), trace_state); } return make_ready_future>>(std::get>>(std::move(msg))); }); } future> -cql_server::connection::process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit) +cql_server::connection::process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state) { if (_version == 1) { throw exceptions::protocol_exception("BATCH messages are not support in version 1 of the protocol"); @@ -968,7 +973,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic modifications.reserve(n); values.reserve(n); - tracing::begin(client_state.get_trace_state(), "Execute batch of CQL3 queries", client_state.get_client_address()); + tracing::begin(trace_state, "Execute batch of CQL3 queries", client_state.get_client_address()); for ([[gnu::unused]] auto i : boost::irange(0u, n)) { const auto kind = in.read_byte(); @@ -982,7 +987,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic auto query = in.read_long_string_view(); stmt_ptr = _server._query_processor.local().get_statement(query, client_state); ps = stmt_ptr->checked_weak_from_this(); - tracing::add_query(client_state.get_trace_state(), query); + tracing::add_query(trace_state, query); break; } case 1: { @@ -1001,7 +1006,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic needs_authorization = pending_authorization_entries.emplace(std::move(cache_key), ps->checked_weak_from_this()).second; } - tracing::add_query(client_state.get_trace_state(), ps->raw_cql_statement); + tracing::add_query(trace_state, ps->raw_cql_statement); break; } default: @@ -1015,8 +1020,8 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic } ::shared_ptr modif_statement_ptr = static_pointer_cast(ps->statement); - tracing::add_table_name(client_state.get_trace_state(), modif_statement_ptr->keyspace(), modif_statement_ptr->column_family()); - tracing::add_prepared_statement(client_state.get_trace_state(), ps); + tracing::add_table_name(trace_state, modif_statement_ptr->keyspace(), modif_statement_ptr->column_family()); + tracing::add_prepared_statement(trace_state, ps); modifications.emplace_back(std::move(modif_statement_ptr), needs_authorization); @@ -1031,16 +1036,16 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic values.emplace_back(std::move(tmp)); } - auto q_state = std::make_unique(client_state, std::move(permit)); + auto q_state = std::make_unique(client_state, trace_state, std::move(permit)); auto& query_state = q_state->query_state; // #563. CQL v2 encodes query_options in v1 format for batch requests. q_state->options = std::make_unique(cql3::query_options::make_batch_options(std::move(*in.read_options(_version < 3 ? 1 : _version, _cql_serialization_format, this->timeout_config(), _server._cql_config)), std::move(values))); auto& options = *q_state->options; - tracing::set_consistency_level(client_state.get_trace_state(), options.get_consistency()); - tracing::set_optional_serial_consistency_level(client_state.get_trace_state(), options.get_serial_consistency()); - tracing::add_prepared_query_options(client_state.get_trace_state(), options); - tracing::trace(client_state.get_trace_state(), "Creating a batch statement"); + tracing::set_consistency_level(trace_state, options.get_consistency()); + tracing::set_optional_serial_consistency_level(trace_state, options.get_serial_consistency()); + tracing::add_prepared_query_options(trace_state, options); + tracing::trace(trace_state, "Creating a batch statement"); auto batch = ::make_shared(cql3::statements::batch_statement::type(type), std::move(modifications), cql3::attributes::none(), _server._query_processor.local().get_cql_stats()); return _server._query_processor.local().process_batch(batch, query_state, options, std::move(pending_authorization_entries)) @@ -1050,15 +1055,15 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic } future> -cql_server::connection::process_register(uint16_t stream, request_reader in, service::client_state& client_state) -{ +cql_server::connection::process_register(uint16_t stream, request_reader in, service::client_state& client_state, + tracing::trace_state_ptr trace_state) { std::vector event_types; in.read_string_list(event_types); for (auto&& event_type : event_types) { auto et = parse_event_type(event_type); _server._notifier->register_event(et, this); } - return make_ready_future>(make_ready(stream, client_state.get_trace_state())); + return make_ready_future>(make_ready(stream, std::move(trace_state))); } std::unique_ptr cql_server::connection::make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state) const diff --git a/transport/server.hh b/transport/server.hh index e24c154098..cf73e3a010 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -96,9 +96,6 @@ struct cql_query_state { service::query_state query_state; std::unique_ptr options; - cql_query_state(service::client_state& client_state, service_permit permit) - : query_state(client_state, std::move(permit)) - { } cql_query_state(service::client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit) : query_state(client_state, std::move(trace_state_ptr), std::move(permit)) { } @@ -197,14 +194,14 @@ private: cql_binary_frame_v3 parse_frame(temporary_buffer buf) const; future read_and_decompress_frame(size_t length, uint8_t flags); future> read_frame(); - future> process_startup(uint16_t stream, request_reader in, service::client_state& client_state); - future> process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state); - future> process_options(uint16_t stream, request_reader in, service::client_state& client_state); - future>> process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit); - future> process_prepare(uint16_t stream, request_reader in, service::client_state& client_state); - future>> process_execute(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit); - future> process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit); - future> process_register(uint16_t stream, request_reader in, service::client_state& client_state); + future> process_startup(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state); + future> process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state); + future> process_options(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state); + future>> process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state); + future> process_prepare(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state); + future>> process_execute(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state); + future> process_batch(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit, tracing::trace_state_ptr trace_state); + future> process_register(uint16_t stream, request_reader in, service::client_state& client_state, tracing::trace_state_ptr trace_state); std::unique_ptr make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state) const; std::unique_ptr make_read_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state) const; @@ -224,11 +221,12 @@ private: std::unique_ptr make_auth_challenge(int16_t, bytes, const tracing::trace_state_ptr& tr_state) const; future>> + process_execute_on_shard(unsigned shard, uint16_t stream, fragmented_temporary_buffer::istream is, - service::client_state& cs, service_permit permit); + service::client_state& cs, service_permit permit, tracing::trace_state_ptr trace_state); future>> process_query_on_shard(unsigned shard, uint16_t stream, fragmented_temporary_buffer::istream is, - service::client_state& cs, service_permit permit); + service::client_state& cs, service_permit permit, tracing::trace_state_ptr trace_state); void write_response(foreign_ptr>&& response, service_permit permit = empty_service_permit(), cql_compression compression = cql_compression::none); From 35b6505517a61a939e88f956a83ce6f984ed6381 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Mon, 10 Feb 2020 14:25:55 +0200 Subject: [PATCH 3/3] client_state: drop the pointer to a tracing state from client_state client_state is shared between requests and tracing state is per request. It is not safe to use the former as a container for the later since a state can be overwritten prematurely by subsequent requests. (cherry picked from commit 31cf2434d617561322df64f684224f87a4999bea) --- service/client_state.hh | 25 +++++-------------------- service/query_state.hh | 2 +- 2 files changed, 6 insertions(+), 21 deletions(-) diff --git a/service/client_state.hh b/service/client_state.hh index d1704243a7..7a0f0236b1 100644 --- a/service/client_state.hh +++ b/service/client_state.hh @@ -75,25 +75,22 @@ public: class client_state_for_another_shard { private: const client_state* _cs; - tracing::global_trace_state_ptr _trace_state; seastar::sharded* _auth_service; - client_state_for_another_shard(const client_state* cs, tracing::global_trace_state_ptr gt, - seastar::sharded* auth_service) : _cs(cs), _trace_state(gt), _auth_service(auth_service) {} + client_state_for_another_shard(const client_state* cs, seastar::sharded* auth_service) : _cs(cs), _auth_service(auth_service) {} friend client_state; public: client_state get() const { - return client_state(_cs, _trace_state, _auth_service); + return client_state(_cs, _auth_service); } }; private: - client_state(const client_state* cs, tracing::global_trace_state_ptr gt, seastar::sharded* auth_service) - : _keyspace(cs->_keyspace), _trace_state_ptr(gt), _user(cs->_user), _auth_state(cs->_auth_state), + client_state(const client_state* cs, seastar::sharded* auth_service) + : _keyspace(cs->_keyspace), _user(cs->_user), _auth_state(cs->_auth_state), _is_internal(cs->_is_internal), _is_thrift(cs->_is_thrift), _remote_address(cs->_remote_address), _auth_service(auth_service ? &auth_service->local() : nullptr) {} friend client_state_for_another_shard; private: sstring _keyspace; - tracing::trace_state_ptr _trace_state_ptr; #if 0 private static final Logger logger = LoggerFactory.getLogger(ClientState.class); public static final SemanticVersion DEFAULT_CQL_VERSION = org.apache.cassandra.cql3.QueryProcessor.CQL_VERSION; @@ -138,18 +135,6 @@ public: struct internal_tag {}; struct external_tag {}; - void create_tracing_session(tracing::trace_type type, tracing::trace_state_props_set props) { - _trace_state_ptr = tracing::tracing::get_local_tracing_instance().create_session(type, props); - } - - tracing::trace_state_ptr& get_trace_state() { - return _trace_state_ptr; - } - - const tracing::trace_state_ptr& get_trace_state() const { - return _trace_state_ptr; - } - auth_state get_auth_state() const noexcept { return _auth_state; } @@ -344,7 +329,7 @@ public: } client_state_for_another_shard move_to_other_shard() { - return client_state_for_another_shard(this, _trace_state_ptr, _auth_service ? &_auth_service->container() : nullptr); + return client_state_for_another_shard(this, _auth_service ? &_auth_service->container() : nullptr); } #if 0 diff --git a/service/query_state.hh b/service/query_state.hh index 353e59cbd2..0e9e3a5452 100644 --- a/service/query_state.hh +++ b/service/query_state.hh @@ -38,7 +38,7 @@ private: public: query_state(client_state& client_state, service_permit permit) : _client_state(client_state) - , _trace_state_ptr(_client_state.get_trace_state()) + , _trace_state_ptr(tracing::trace_state_ptr()) , _permit(std::move(permit)) { }