In this series we add support for forwarding strongly consistent CQL requests to suitable replicas, so that clients can issue reads/writes to any node and have the request executed on an appropriate tablet replica (and, for writes, on the Raft leader). We return the same CQL response that the user would get when sending the request to the correct replica, and we perform the same logging/stats updates on the request coordinator as if the coordinator were the appropriate replica. The core mechanism of forwarding a strongly consistent request is sending an RPC containing the user's CQL request frame to the appropriate replica and returning a ready, serialized `cql_transport::response`. We do this in the CQL server - it is best prepared for handling these types, and forwarding a request containing a CQL frame allows us to reuse near-top-level methods for CQL request handling in the new RPC handler (such as the general `process`). For sending the RPC, the CQL server needs to know which node it should forward the request to. This requires knowledge about the tablet Raft group members and leader. We obtain this information during the execution of a `cql3/strong_consistency` statement, and we return it to the CQL server using the generalized `bounce_to_shard` `result_message`, where we now store the information about either a shard or a specific replica to which we should forward the request. Similarly to `bounce_to_shard`, we need to handle this `result_message` in a loop - a replica may move during statement execution, or the Raft leader can change. We also use it for forwarding strongly consistent writes when we're not a member of the affected tablet Raft group - in that case we need to forward the statement twice: first to any replica of the affected tablet, which can then find the leader and return this information to the coordinator, allowing the second request to be directed to the leader.
This feature also allows passing through exception messages that occurred on the target replica while executing the statement. For that, many methods of `cql_transport::cql_server::connection` for creating error responses needed to be moved to `cql_transport::cql_server`. And for final exception handling on the coordinator, we added additional error info to the RPC response, so that the handling can be performed without having the `result_message::exception` or `exception_ptr` itself. Fixes [SCYLLADB-71](https://scylladb.atlassian.net/browse/SCYLLADB-71) [SCYLLADB-71]: https://scylladb.atlassian.net/browse/SCYLLADB-71?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ Closes scylladb/scylladb#27517 * github.com:scylladb/scylladb: test: add tests for CQL forwarding transport: enable CQL forwarding for strong consistency statements transport: add remote statement preparation for CQL forwarding transport: handle redirect responses in CQL forwarding transport: add exception handling for forwarded CQL requests transport: add basic CQL request forwarding idl: add a representation of client_state for forwarding cql_server: handle query, execute, batch in one case transport: inline process_on_shard in cql_server::process transport: extract process() to cql_server transport: add messaging_service to cql_server transport: add response reconstruction helpers for forwarding transport: generalize the bounce result message for bouncing to other nodes strong consistency: redirect requests to live replicas from the same rack transport: pass foreign_ptr into sleep_until_timeout_passes and move it to cql_server transport: extract the error handling from process_request_one transport: move error response helpers from connection to cql_server
2653 lines
128 KiB
C++
2653 lines
128 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include "server.hh"
|
|
|
|
#include "cql3/statements/batch_statement.hh"
|
|
#include "cql3/statements/modification_statement.hh"
|
|
#include <seastar/core/scheduling.hh>
|
|
#include <seastar/core/semaphore.hh>
|
|
#include <seastar/coroutine/switch_to.hh>
|
|
#include <seastar/coroutine/try_future.hh>
|
|
#include "types/collection.hh"
|
|
#include "types/list.hh"
|
|
#include "types/set.hh"
|
|
#include "types/map.hh"
|
|
#include "types/vector.hh"
|
|
#include "dht/token-sharding.hh"
|
|
#include "service/migration_manager.hh"
|
|
#include "service/storage_service.hh"
|
|
#include "service/memory_limiter.hh"
|
|
#include "service/storage_proxy.hh"
|
|
#include "service/qos/service_level_controller.hh"
|
|
#include "db/consistency_level_type.hh"
|
|
#include "db/write_type.hh"
|
|
#include <seastar/core/coroutine.hh>
|
|
#include <seastar/core/future-util.hh>
|
|
#include <seastar/core/seastar.hh>
|
|
#include <seastar/coroutine/as_future.hh>
|
|
#include <seastar/coroutine/try_future.hh>
|
|
#include <seastar/net/byteorder.hh>
|
|
#include <seastar/core/metrics.hh>
|
|
#include <seastar/net/byteorder.hh>
|
|
#include <seastar/net/tls.hh>
|
|
#include <seastar/util/lazy.hh>
|
|
#include <seastar/util/short_streams.hh>
|
|
#include <seastar/core/execution_stage.hh>
|
|
#include "utils/assert.hh"
|
|
#include "utils/exception_container.hh"
|
|
#include "utils/log.hh"
|
|
#include "utils/result_try.hh"
|
|
#include "utils/result_combinators.hh"
|
|
#include "db/operation_type.hh"
|
|
|
|
#include "enum_set.hh"
|
|
#include "service/query_state.hh"
|
|
#include "service/client_state.hh"
|
|
#include "exceptions/exceptions.hh"
|
|
#include "client_data.hh"
|
|
#include "cql3/query_processor.hh"
|
|
#include "auth/authenticator.hh"
|
|
|
|
#include <cassert>
|
|
#include <string>
|
|
|
|
#include <snappy-c.h>
|
|
#include <lz4.h>
|
|
|
|
#include "response.hh"
|
|
#include "request.hh"
|
|
|
|
#include "types/user.hh"
|
|
|
|
#include "transport/cql_protocol_extension.hh"
|
|
#include "message/messaging_service.hh"
|
|
#include "idl/forward_cql.dist.hh"
|
|
#include "utils/bit_cast.hh"
|
|
#include "utils/labels.hh"
|
|
#include "utils/result.hh"
|
|
#include "utils/reusable_buffer.hh"
|
|
#include "utils/histogram_metrics_helper.hh"
|
|
|
|
// File-level shorthand for exceptions::coordinator_result<T>; T defaults to void.
template<typename T = void>
|
|
using coordinator_result = exceptions::coordinator_result<T>;
|
|
|
|
namespace cql_transport {
|
|
|
|
// Logger used by the code in this file; it is created under the name "cql_server".
static logging::logger clogger("cql_server");
|
|
|
|
/**
|
|
* Skip registering CQL metrics for these SGs - these are internal scheduling groups that are not supposed to handle CQL
|
|
* requests.
|
|
*/
|
|
static const std::vector<sstring> non_cql_scheduling_classes_names = {
|
|
"atexit",
|
|
"background_reclaim",
|
|
"compaction",
|
|
"gossip",
|
|
"main",
|
|
"mem_compaction",
|
|
"memtable",
|
|
"memtable_to_cache",
|
|
"streaming"
|
|
};
|
|
|
|
/// Exception type describing a malformed CQL binary frame.
struct cql_frame_error : std::exception {
    /// Returns a fixed, statically allocated description of the error.
    ///
    /// Declared `noexcept` instead of with the dynamic exception specification
    /// `throw ()`, which is deprecated since C++17 and removed in C++20.
    const char* what() const noexcept override {
        return "bad cql binary frame";
    }
};
|
|
|
|
// Maps a consistency level to the 16-bit code returned for it by this
// function's table below. Throws std::runtime_error for a value that is not
// listed.
inline int16_t consistency_to_wire(db::consistency_level c)
{
    using cl = db::consistency_level;

    switch (c) {
    case cl::ANY:
        return 0x0000;
    case cl::ONE:
        return 0x0001;
    case cl::TWO:
        return 0x0002;
    case cl::THREE:
        return 0x0003;
    case cl::QUORUM:
        return 0x0004;
    case cl::ALL:
        return 0x0005;
    case cl::LOCAL_QUORUM:
        return 0x0006;
    case cl::EACH_QUORUM:
        return 0x0007;
    case cl::SERIAL:
        return 0x0008;
    case cl::LOCAL_SERIAL:
        return 0x0009;
    case cl::LOCAL_ONE:
        return 0x000A;
    default:
        throw std::runtime_error("Invalid consistency level");
    }
}
|
|
|
|
// Returns the name of a topology change kind.
// Throws std::invalid_argument if `t` matches none of the enumerators.
sstring to_string(const event::topology_change::change_type t) {
    switch (t) {
    case event::topology_change::change_type::NEW_NODE:
        return "NEW_NODE";
    case event::topology_change::change_type::REMOVED_NODE:
        return "REMOVED_NODE";
    case event::topology_change::change_type::MOVED_NODE:
        return "MOVED_NODE";
    }
    // Only reachable when `t` holds a value outside the enumeration.
    throw std::invalid_argument("unknown change type");
}
|
|
|
|
// Returns the name of a client-routes change kind.
// Throws std::invalid_argument if `t` matches none of the enumerators.
sstring to_string(const event::client_routes_change::change_type t) {
    switch (t) {
    case event::client_routes_change::change_type::UPDATE_NODES:
        return "UPDATE_NODES";
    }
    // Only reachable when `t` holds a value outside the enumeration.
    throw std::invalid_argument("unknown change type");
}
|
|
|
|
// Returns the name of a CQL binary protocol opcode. A value that matches no
// enumerator is rendered as "Unknown CQL binary opcode <number>".
sstring to_string(cql_binary_opcode op) {
    using opc = cql_binary_opcode;

    switch (op) {
    case opc::ERROR:
        return "ERROR";
    case opc::STARTUP:
        return "STARTUP";
    case opc::READY:
        return "READY";
    case opc::AUTHENTICATE:
        return "AUTHENTICATE";
    case opc::CREDENTIALS:
        return "CREDENTIALS";
    case opc::OPTIONS:
        return "OPTIONS";
    case opc::SUPPORTED:
        return "SUPPORTED";
    case opc::QUERY:
        return "QUERY";
    case opc::RESULT:
        return "RESULT";
    case opc::PREPARE:
        return "PREPARE";
    case opc::EXECUTE:
        return "EXECUTE";
    case opc::REGISTER:
        return "REGISTER";
    case opc::EVENT:
        return "EVENT";
    case opc::BATCH:
        return "BATCH";
    case opc::AUTH_CHALLENGE:
        return "AUTH_CHALLENGE";
    case opc::AUTH_RESPONSE:
        return "AUTH_RESPONSE";
    case opc::AUTH_SUCCESS:
        return "AUTH_SUCCESS";
    case opc::OPCODES_COUNT:
        return "OPCODES_COUNT";
    }

    const auto numeric = static_cast<unsigned>(op);
    return format("Unknown CQL binary opcode {}", numeric);
}
|
|
|
|
// Returns the name of a node status ("UP" or "DOWN").
// Throws std::invalid_argument if `t` matches none of the enumerators.
sstring to_string(const event::status_change::status_type t) {
    using type = event::status_change::status_type;
    switch (t) {
    case type::UP:   return "UP";
    case type::DOWN: return "DOWN";
    }
    // The argument is a status type, not a change type; the message previously
    // said "unknown change type", copied from the sibling overloads.
    throw std::invalid_argument("unknown status type");
}
|
|
|
|
// Returns the name of a schema change kind. A value outside the enumeration
// trips the SCYLLA_ASSERT after the switch.
sstring to_string(const event::schema_change::change_type t) {
    using type = event::schema_change::change_type;

    switch (t) {
    case type::CREATED:
        return "CREATED";
    case type::UPDATED:
        return "UPDATED";
    case type::DROPPED:
        return "DROPPED";
    }
    SCYLLA_ASSERT(false && "unreachable");
}
|
|
|
|
// Returns the name of the kind of schema object a schema change applies to.
// A value outside the enumeration trips the SCYLLA_ASSERT after the switch.
sstring to_string(const event::schema_change::target_type t) {
    using type = event::schema_change::target_type;

    switch (t) {
    case type::KEYSPACE:
        return "KEYSPACE";
    case type::TABLE:
        return "TABLE";
    case type::TYPE:
        return "TYPE";
    case type::FUNCTION:
        return "FUNCTION";
    case type::AGGREGATE:
        return "AGGREGATE";
    }
    SCYLLA_ASSERT(false && "unreachable");
}
|
|
|
|
// Reports whether the client has the USE_METADATA_ID protocol extension set.
bool is_metadata_id_supported(const service::client_state& client_state) {
    // TODO: metadata_id is mandatory in CQLv5, so extend the check below
    // when CQLv5 support is implemented
    const auto extension = cql_transport::cql_protocol_extension::USE_METADATA_ID;
    return client_state.is_protocol_extension_set(extension);
}
|
|
|
|
// Parses an event type name into event::event_type.
// Returns a protocol_exception (as the error alternative of the result, not
// thrown) when the name is not one of the recognized values.
utils::result_with_exception<event::event_type, exceptions::protocol_exception>
parse_event_type(const sstring& value)
{
    if (value == "TOPOLOGY_CHANGE") {
        return event::event_type::TOPOLOGY_CHANGE;
    }
    if (value == "STATUS_CHANGE") {
        return event::event_type::STATUS_CHANGE;
    }
    if (value == "SCHEMA_CHANGE") {
        return event::event_type::SCHEMA_CHANGE;
    }
    if (value == "CLIENT_ROUTES_CHANGE") {
        return event::event_type::CLIENT_ROUTES_CHANGE;
    }
    return exceptions::protocol_exception(format("Invalid value '{}' for Event.Type", value));
}
|
|
|
|
// Allocates one stats slot per CQL opcode and, unless this instance belongs to
// the maintenance socket or is created under one of the scheduling groups
// listed in non_cql_scheduling_classes_names, registers the per-opcode metrics.
cql_sg_stats::cql_sg_stats(maintenance_socket_enabled used_by_maintenance_socket)
    : _cql_requests_stats(static_cast<uint8_t>(cql_binary_opcode::OPCODES_COUNT))
{
    if (used_by_maintenance_socket) {
        return;
    }

    const auto& excluded = non_cql_scheduling_classes_names;
    const bool in_excluded_group =
            std::find(excluded.begin(), excluded.end(), current_scheduling_group().name()) != excluded.end();
    if (in_excluded_group) {
        return;
    }

    _use_metrics = true;
    register_metrics();
}
|
|
|
|
void cql_sg_stats::register_metrics()
|
|
{
|
|
namespace sm = seastar::metrics;
|
|
auto new_metrics = sm::metric_groups();
|
|
std::vector<sm::metric_definition> transport_metrics;
|
|
auto cur_sg_name = current_scheduling_group().name();
|
|
|
|
for (uint8_t i = 0; i < static_cast<uint8_t>(cql_binary_opcode::OPCODES_COUNT); ++i) {
|
|
cql_binary_opcode opcode = cql_binary_opcode{i};
|
|
|
|
transport_metrics.emplace_back(
|
|
sm::make_counter("cql_requests_count", [this, opcode] { return get_cql_opcode_stats(opcode).request_size.count(); },
|
|
sm::description("Counts the total number of CQL messages of a specific kind."),
|
|
{{"kind", to_string(opcode)}, {"scheduling_group_name", cur_sg_name}}).set_skip_when_empty()
|
|
);
|
|
|
|
transport_metrics.emplace_back(
|
|
sm::make_counter("cql_request_bytes", [this, opcode] { return get_cql_opcode_stats(opcode).request_size.sum(); },
|
|
sm::description("Counts the total number of received bytes in CQL messages of a specific kind."),
|
|
{{"kind", to_string(opcode)}, {"scheduling_group_name", cur_sg_name}}).set_skip_when_empty()
|
|
);
|
|
|
|
transport_metrics.emplace_back(
|
|
sm::make_counter("cql_response_bytes", [this, opcode] { return get_cql_opcode_stats(opcode).response_size.sum(); },
|
|
sm::description("Counts the total number of sent response bytes for CQL requests of a specific kind."),
|
|
{{"kind", to_string(opcode)}, {"scheduling_group_name", cur_sg_name}}).set_skip_when_empty()
|
|
);
|
|
if (opcode == cql_binary_opcode::EXECUTE || opcode == cql_binary_opcode::BATCH || opcode == cql_binary_opcode::QUERY) {
|
|
transport_metrics.emplace_back(
|
|
sm::make_histogram("cql_request_histogram_bytes", [this, opcode] { return to_metrics_histogram(get_cql_opcode_stats(opcode).request_size); },
|
|
sm::description("A histogram of received bytes in CQL messages of a specific kind and specific scheduling group."),
|
|
{{"kind", to_string(opcode)}, {"scheduling_group_name", cur_sg_name}}).aggregate({seastar::metrics::shard_label}).set_skip_when_empty()
|
|
);
|
|
|
|
transport_metrics.emplace_back(
|
|
sm::make_histogram("cql_response_histogram_bytes", [this, opcode] { return to_metrics_histogram(get_cql_opcode_stats(opcode).response_size); },
|
|
sm::description("A histogram of sent response bytes in CQL messages of a specific kind and specific scheduling group."),
|
|
{{"kind", to_string(opcode)}, {"scheduling_group_name", cur_sg_name}}).aggregate({seastar::metrics::shard_label}).set_skip_when_empty()
|
|
);
|
|
}
|
|
}
|
|
|
|
new_metrics.add_group("transport", std::move(transport_metrics));
|
|
_metrics = std::exchange(new_metrics, {});
|
|
}
|
|
|
|
void cql_sg_stats::rename_metrics() {
|
|
if (_use_metrics) {
|
|
register_metrics();
|
|
}
|
|
}
|
|
|
|
// Constructs the CQL server.
//
// Besides storing its collaborators, a regular (non-maintenance-socket) server
// also registers the CQL forwarding RPC handlers and the "transport" metric
// group: connection/request counters and gauges, the request forwarding
// counters, and one "cql_errors_total" counter per entry of
// exceptions::exception_map(). A server used by the maintenance socket skips
// both steps.
cql_server::cql_server(sharded<cql3::query_processor>& qp, auth::service& auth_service,
        service::memory_limiter& ml, cql_server_config config,
        qos::service_level_controller& sl_controller, gms::gossiper& g, scheduling_group_key stats_key,
        maintenance_socket_enabled used_by_maintenance_socket,
        netw::messaging_service& ms)
    : server("CQLServer", clogger, generic_server::config{std::move(config.uninitialized_connections_semaphore_cpu_concurrency), config.request_timeout_on_shutdown_in_seconds})
    , _query_processor(qp)
    , _ms(ms)
    , _config(std::move(config))
    , _memory_available(ml.get_semaphore())
    , _total_memory(ml.total_memory())
    , _notifier(std::make_unique<event_notifier>(*this))
    , _auth_service(auth_service)
    , _sl_controller(sl_controller)
    , _gossiper(g)
    , _stats_key(stats_key)
    , _used_by_maintenance_socket(used_by_maintenance_socket)
{
    namespace sm = seastar::metrics;

    // The maintenance socket server neither handles forwarded requests nor
    // exports transport metrics.
    if (used_by_maintenance_socket) {
        return;
    }

    init_messaging_service();

    // Note: descriptions assembled from adjacent string literals need an
    // explicit separating space at the end of each sentence.
    auto ls = {
        sm::make_counter("cql-connections", _stats.connects,
                        sm::description("Counts a number of client connections.")),

        sm::make_gauge("current_connections", _stats.connections,
                        sm::description("Holds a current number of client connections."))(basic_level),

        sm::make_counter("requests_served", _stats.requests_served,
                        sm::description("Counts a number of served requests."))(basic_level),

        sm::make_gauge("requests_serving", _stats.requests_serving,
                        sm::description("Holds a number of requests that are being processed right now.")),

        sm::make_gauge("requests_blocked_memory_current", [this] { return _memory_available.waiters(); },
                        sm::description(
                            seastar::format("Holds the number of requests that are currently blocked due to reaching the memory quota limit ({}B). "
                                            "Non-zero value indicates that our bottleneck is memory and more specifically - the memory quota allocated for the \"CQL transport\" component.", _config.max_request_size))),
        sm::make_counter("requests_blocked_memory", _stats.requests_blocked_memory,
                        sm::description(
                            seastar::format("Holds an incrementing counter with the requests that ever blocked due to reaching the memory quota limit ({}B). "
                                            "The first derivative of this value shows how often we block due to memory exhaustion in the \"CQL transport\" component.", _config.max_request_size))),
        sm::make_counter("requests_shed", _stats.requests_shed,
                        sm::description("Holds an incrementing counter with the requests that were shed due to overload (threshold configured via max_concurrent_requests_per_shard). "
                                            "The first derivative of this value shows how often we shed requests due to overload in the \"CQL transport\" component."))(basic_level),
        sm::make_counter("connections_shed", _shed_connections,
                        sm::description("Holds an incrementing counter with the CQL connections that were shed due to concurrency semaphore timeout (threshold configured via uninitialized_connections_semaphore_cpu_concurrency). "
                                            "This typically can happen during connection storm. ")),
        sm::make_counter("connections_blocked", _blocked_connections,
                        sm::description("Holds an incrementing counter with the CQL connections that were blocked before being processed due to threshold configured via uninitialized_connections_semaphore_cpu_concurrency. "
                                            "Blocks are normal when we have multiple connections initialized at once. If connections are timing out and this value is high it indicates either connections storm or unusually slow processing.")),
        sm::make_gauge("requests_memory_available", [this] { return _memory_available.current(); },
                        sm::description(
                            seastar::format("Holds the amount of available memory for admitting new requests (max is {}B). "
                                            "Zero value indicates that our bottleneck is memory and more specifically - the memory quota allocated for the \"CQL transport\" component.", _config.max_request_size))),
        sm::make_counter("requests_forwarded_successfully", _stats.requests_forwarded_successfully,
                        sm::description("Counts the number of requests that were forwarded to another replica and executed successfully there.")).set_skip_when_empty(),

        sm::make_counter("requests_forwarded_failed", _stats.requests_forwarded_failed,
                        sm::description("Counts the number of requests that were forwarded to another replica but failed to execute there.")).set_skip_when_empty(),

        sm::make_counter("requests_forwarded_redirected", _stats.requests_forwarded_redirected,
                        sm::description("Counts the number of requests that were forwarded to another replica but that replica responded with a redirect to another node. "
                                        "This can happen when replica has stale information about the cluster topology or when the request is handled by a node that is not a replica for the data being accessed by the request.")).set_skip_when_empty(),
        sm::make_counter("requests_forwarded_prepared_not_found", _stats.requests_forwarded_prepared_not_found,
                        sm::description("Counts the number of requests that were forwarded to another replica but failed there because the statement was not prepared on the target. "
                                        "When this happens, the coordinator performs an additional remote call to prepare the statement on the replica and retries the EXECUTE request afterwards.")).set_skip_when_empty(),
    };

    std::vector<sm::metric_definition> transport_metrics;
    for (auto& m : ls) {
        transport_metrics.emplace_back(std::move(m));
    }

    // One error counter per known CQL exception code. The counter slot is
    // created up front so that the metric callback always finds it.
    sm::label cql_error_label("type");
    for (const auto& e : exceptions::exception_map()) {
        _stats.errors.insert({e.first, 0});
        auto label_instance = cql_error_label(e.second);

        transport_metrics.emplace_back(
            sm::make_counter("cql_errors_total", sm::description("Counts the total number of returned CQL errors."),
                        {label_instance, basic_level},
                        [this, code = e.first] { auto it = _stats.errors.find(code); return it != _stats.errors.end() ? it->second : 0; }).set_skip_when_empty()
        );
    }

    _metrics.add_group("transport", std::move(transport_metrics));
}
|
|
|
|
// Defaulted out of line. NOTE(review): presumably so that members such as the
// std::unique_ptr<event_notifier> are destroyed where their types are complete — confirm.
cql_server::~cql_server() = default;
|
|
|
|
// Stops the server: starts both uninit_messaging_service() and the base class
// server::stop(), and completes once both have finished.
future<> cql_server::stop() {
|
|
co_await when_all_succeed(uninit_messaging_service(), server::stop()).discard_result();
|
|
}
|
|
|
|
void cql_server::init_messaging_service() {
|
|
ser::forward_cql_rpc_verbs::register_forward_cql_execute(&_ms,
|
|
[this](const rpc::client_info& cinfo, rpc::opt_time_point timeout, unsigned shard, forward_cql_execute_request req) -> future<forward_cql_execute_response> {
|
|
auto src_host = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
|
|
clogger.trace("Handling forwarded CQL request from {} on shard {}", src_host, shard);
|
|
|
|
co_await utils::get_local_injector().inject("wait_before_handling_forwarded_request", utils::wait_for_message(60s));
|
|
|
|
co_return co_await container().invoke_on(shard, [src_host, req = std::move(req)] (cql_server& shard_svc) mutable -> future<forward_cql_execute_response> {
|
|
service::client_state cs(shard_svc._auth_service,
|
|
&shard_svc._sl_controller,
|
|
std::move(req.client_state));
|
|
tracing::trace_state_ptr trace_state_ptr;
|
|
if (req.trace_info) {
|
|
trace_state_ptr = tracing::tracing::get_local_tracing_instance().create_session(*req.trace_info);
|
|
tracing::begin(trace_state_ptr);
|
|
}
|
|
service::query_state qs(cs, trace_state_ptr, empty_service_permit());
|
|
tracing::trace(qs.get_trace_state(), "Handling forwarded CQL request from {} on shard {}", src_host, this_shard_id());
|
|
|
|
auto f = co_await coroutine::as_future(shard_svc.handle_forward_execute(qs, req));
|
|
if (f.failed()) {
|
|
auto eptr = f.get_exception();
|
|
if (auto* prepared_not_found_eptr = try_catch<exceptions::prepared_query_not_found_exception>(eptr)) {
|
|
clogger.trace("Prepared statement not found, returning prepared_not_found status");
|
|
tracing::trace(qs.get_trace_state(), "Prepared statement not found, returning prepared_not_found status");
|
|
co_return forward_cql_execute_response{
|
|
.status = forward_cql_status::prepared_not_found,
|
|
.prepared_id = prepared_not_found_eptr->id,
|
|
};
|
|
}
|
|
|
|
auto response = shard_svc.handle_exception(0, eptr, qs.get_trace_state(), req.cql_version, cs);
|
|
clogger.trace("Execution of forwarded request failed with an error");
|
|
tracing::trace(qs.get_trace_state(), "Execution of forwarded request failed with an error");
|
|
|
|
auto flags = response->flags();
|
|
co_return forward_cql_execute_response{
|
|
.status = forward_cql_status::error,
|
|
.response_body = std::move(*response.get()).extract_body(),
|
|
.response_flags = flags,
|
|
.timeout = shard_svc.timeout_for_sleep(eptr),
|
|
};
|
|
}
|
|
co_return f.get();
|
|
});
|
|
});
|
|
|
|
ser::forward_cql_rpc_verbs::register_forward_cql_prepare(&_ms,
|
|
[this](const rpc::client_info& cinfo, rpc::opt_time_point timeout, forward_cql_prepare_request req) -> future<bytes> {
|
|
auto src_host = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
|
|
clogger.trace("Handling forward prepare request from {}", src_host);
|
|
|
|
service::client_state cs(_auth_service, &_sl_controller, std::move(req.client_state));
|
|
tracing::trace_state_ptr trace_state_ptr;
|
|
if (req.trace_info) {
|
|
trace_state_ptr = tracing::tracing::get_local_tracing_instance().create_session(*req.trace_info);
|
|
tracing::begin(trace_state_ptr);
|
|
}
|
|
service::query_state qs(cs, trace_state_ptr, empty_service_permit());
|
|
|
|
co_await _query_processor.invoke_on_all(
|
|
[&] (cql3::query_processor& qp) {
|
|
return qp.prepare(req.query_string, qs.get_client_state(), req.dialect).discard_result();
|
|
});
|
|
auto prepare_id = _query_processor.local().compute_id(req.query_string, qs.get_client_state().get_raw_keyspace(), req.dialect).key();
|
|
clogger.trace("Successfully prepared statement {} from forward request", prepare_id);
|
|
co_return prepare_id;
|
|
});
|
|
}
|
|
|
|
// Unregisters the CQL forwarding RPC verbs from the messaging service.
//
// The constructor returns before init_messaging_service() for a server used by
// the maintenance socket, so such a server registered nothing. It must not
// unregister the verbs either: the messaging service is passed in by reference
// and the handlers on it belong to whichever server did register them.
future<> cql_server::uninit_messaging_service() {
    if (_used_by_maintenance_socket) {
        co_return;
    }
    co_await ser::forward_cql_rpc_verbs::unregister(&_ms);
}
|
|
|
|
// Executes a request that was forwarded to this node and packs the outcome
// into a forward_cql_execute_response:
//  - `redirect` with the target host/shard when processing asks for a bounce;
//  - `success` with the response body and flags otherwise.
// A failed result is propagated to the caller as an exceptional future.
future<forward_cql_execute_response> cql_server::handle_forward_execute(
        service::query_state& qs, forward_cql_execute_request& req)
{
    const auto opcode = static_cast<cql_binary_opcode>(req.opcode);

    // Wrap each chunk of the request buffer in a non-owning temporary_buffer.
    // `req` is taken by reference and outlives the stream built on top of
    // these views, so an empty deleter is safe.
    std::vector<temporary_buffer<char>> chunk_views;
    for (bytes_view chunk : req.request_buffer.fragments()) {
        auto* chunk_data = const_cast<char*>(reinterpret_cast<const char*>(chunk.data()));
        chunk_views.emplace_back(chunk_data, chunk.size(), deleter{});
    }
    fragmented_temporary_buffer::istream frame_stream(chunk_views, req.request_buffer.size());

    bytes_ostream linearization_scratch;
    request_reader reader(std::move(frame_stream), linearization_scratch);

    // The stream id (first argument) is only needed when the response is
    // written to the client, which happens on the forwarding node; here only
    // the response body is prepared, so 0 is passed.
    auto outcome = co_await coroutine::try_future(process(
            0,
            reader,
            qs.get_client_state(),
            empty_service_permit(),
            qs.get_trace_state(),
            opcode,
            req.cql_version,
            req.dialect,
            std::move(req.cached_fn_calls),
            handling_node_bounce::yes));

    if (auto* bounce = std::get_if<cql_server::result_with_bounce>(&outcome)) {
        auto redirect_host = (*bounce)->move_to_host();
        auto redirect_shard = (*bounce)->move_to_shard().value();
        co_return forward_cql_execute_response{
            .status = forward_cql_status::redirect,
            .target_host = redirect_host,
            .target_shard = redirect_shard,
        };
    }

    auto& reply = std::get<cql_server::result_with_foreign_response_ptr>(outcome);
    if (!reply) {
        co_return co_await coroutine::try_future(reply.assume_error().as_exception_future<forward_cql_execute_response>());
    }

    auto cql_response = std::move(reply).assume_value();
    clogger.trace("Execution of forwarded request succeeded");
    tracing::trace(qs.get_trace_state(), "Execution of forwarded request succeeded");

    // Read the flags before the body is moved out of the response.
    auto reply_flags = cql_response->flags();
    co_return forward_cql_execute_response{
        .status = forward_cql_status::success,
        .response_body = std::move(*cql_response.get()).extract_body(),
        .response_flags = reply_flags,
    };
}
|
|
|
|
// Forwards a CQL request to another node and turns the reply into a response
// for the client.
//
// Parameters:
//   target_host / target_shard - where the request is sent first;
//   timeout     - passed to every RPC sent by this function;
//   is_write    - selects the write or read flavour of the blanket
//                 timeout/failure exception produced for RPC-level errors;
//   stream      - stream id put into the reconstructed response;
//   trace_state - tracing session to report progress to;
//   req         - the request to forward.
//
// The request is re-sent in a loop: a `redirect` reply switches the
// destination to the host/shard named by the replica, and a
// `prepared_not_found` reply makes us prepare the statement on the destination
// and retry. `success` and `error` replies end the loop with a RESULT or ERROR
// response respectively.
//
// Failures of the execute RPC are returned as an exceptional future:
// rpc::timeout_error and rpc::closed_error are replaced by blanket read/write
// timeout/failure exceptions, anything else is passed through unchanged.
//
// All diagnostics name the node that was actually contacted
// (current_host/current_shard), which differs from target_host/target_shard
// once a redirect has been followed.
future<foreign_ptr<std::unique_ptr<cql_transport::response>>>
cql_server::forward_cql(
        locator::host_id target_host,
        unsigned target_shard,
        seastar::lowres_clock::time_point timeout,
        bool is_write,
        uint16_t stream,
        tracing::trace_state_ptr trace_state,
        forward_cql_execute_request req)
{
    clogger.trace("Forwarding CQL request to {} shard {}", target_host, target_shard);
    tracing::trace(trace_state, "Forwarding CQL request to {} shard {}", target_host, target_shard);

    locator::host_id current_host = target_host;
    unsigned current_shard = target_shard;

    while (true) {
        auto response_fut = co_await coroutine::as_future(ser::forward_cql_rpc_verbs::send_forward_cql_execute(&_ms, current_host, timeout, current_shard, req));

        if (response_fut.failed()) {
            std::exception_ptr eptr = response_fut.get_exception();
            if (try_catch<seastar::rpc::timeout_error>(eptr)) {
                clogger.debug("Forwarding CQL request to {} shard {} failed due to RPC timeout. Will return blanket timeout exception", current_host, current_shard);
                eptr = is_write
                    ? std::make_exception_ptr(exceptions::mutation_write_timeout_exception(
                        format("Write request timed out while forwarding to {}", current_host), db::consistency_level::ONE, 0, 0, db::write_type::SIMPLE))
                    : std::make_exception_ptr(exceptions::read_timeout_exception(
                        format("Read request timed out while forwarding to {}", current_host), db::consistency_level::ONE, 0, 0, false));
            } else if (try_catch<seastar::rpc::closed_error>(eptr)) {
                clogger.debug("Forwarding CQL request to {} shard {} failed due to RPC closed connection. Will return blanket failure exception", current_host, current_shard);
                eptr = is_write
                    ? std::make_exception_ptr(exceptions::mutation_write_failure_exception(
                        format("Write request failed while forwarding to {}", current_host), db::consistency_level::ONE, 0, 0, 0, db::write_type::SIMPLE))
                    : std::make_exception_ptr(exceptions::read_failure_exception(
                        format("Read request failed while forwarding to {}", current_host), db::consistency_level::ONE, 0, 0, 0, false));
            } else {
                clogger.debug("Forwarding CQL request to {} shard {} failed with an unexpected error: {}. Will return this error to the client", current_host, current_shard, eptr);
            }
            co_return coroutine::exception(std::move(eptr));
        }

        auto response = response_fut.get();
        switch (response.status) {
        case forward_cql_status::success:
            _stats.requests_forwarded_successfully++;
            clogger.trace("Forwarded CQL request executed successfully on replica: {}", current_host);
            tracing::trace(trace_state, "Forwarded CQL request executed successfully on replica: {}", current_host);
            co_return std::make_unique<cql_transport::response>(stream, cql_binary_opcode::RESULT, response.response_flags, std::move(response.response_body));
        case forward_cql_status::error: {
            _stats.requests_forwarded_failed++;
            clogger.trace("Forwarded CQL request failed on replica: {}", current_host);
            tracing::trace(trace_state, "Forwarded CQL request failed on replica: {}", current_host);
            auto result = std::make_unique<cql_transport::response>(stream, cql_binary_opcode::ERROR,
                    response.response_flags, std::move(response.response_body));
            // The replica may ask us to hold the error back until a timeout
            // passes before handing it to the client.
            if (response.timeout) {
                co_return co_await sleep_until_timeout_passes(*response.timeout, std::move(result));
            }
            co_return std::move(result);
        }
        case forward_cql_status::prepared_not_found: {
            _stats.requests_forwarded_prepared_not_found++;
            cql3::prepared_cache_key_type cache_key(std::move(response.prepared_id), req.dialect);

            // The statement text to prepare remotely comes from the local
            // prepared statement cache; without a local entry nothing can be
            // sent to the destination.
            auto prepared = _query_processor.local().get_prepared(cache_key);
            if (!prepared) {
                clogger.info("Prepared statement with cache key {} not found locally for prepare request, cannot prepare on target {}", cache_key.key(), current_host);
                co_return coroutine::exception(std::make_exception_ptr(exceptions::prepared_query_not_found_exception(cache_key.key())));
            }

            tracing::trace(trace_state, "Prepared statement not found on target, preparing query on target node");
            clogger.trace("Prepared statement {} not found on {}, preparing query on target node", cache_key.key(), current_host);

            forward_cql_prepare_request prepare_req{
                .query_string = prepared->statement->raw_cql_statement,
                .client_state = req.client_state,
                .trace_info = req.trace_info,
                .dialect = req.dialect,
            };

            auto prepared_id = co_await ser::forward_cql_rpc_verbs::send_forward_cql_prepare(&_ms, current_host, timeout, prepare_req);

            if (prepared_id != cache_key.key()) {
                on_internal_error(clogger, format("Prepared ID returned from target node does not match local prepared ID for the same query. Local ID: {}, Target ID: {}", cache_key.key(), prepared_id));
            }
            // Retry the execute against the same destination.
            continue;
        }
        case forward_cql_status::redirect:
            _stats.requests_forwarded_redirected++;
            tracing::trace(trace_state, "Target node redirecting to {} shard {}", response.target_host, response.target_shard);
            clogger.trace("Redirect from {} to {} shard {}", current_host, response.target_host, response.target_shard);
            current_host = response.target_host;
            current_shard = response.target_shard;
            continue;
        }
    }
}
|
|
|
|
// Connection factory: wraps the given socket in a cql_server::connection bound
// to this server and hands it back through the generic_server::connection
// base type.
shared_ptr<generic_server::connection>
cql_server::make_connection(socket_address server_addr, connected_socket&& fd, socket_address addr, named_semaphore& sem, semaphore_units<named_semaphore_exception_factory> initial_sem_units) {
    shared_ptr<generic_server::connection> conn = make_shared<connection>(
            *this, server_addr, std::move(fd), std::move(addr), sem, std::move(initial_sem_units));
    return conn;
}
|
|
|
|
unsigned
|
|
cql_server::connection::frame_size() const {
|
|
return 9;
|
|
}
|
|
|
|
// Decodes a raw frame prefix into a cql_binary_frame_v3 (converted with net::ntoh).
//
// Returns, as a result value rather than by throwing:
//  - cql_frame_error when the buffer is not exactly frame_size() bytes long;
//  - protocol_exception when the connection's protocol version is not 3 or 4,
//    or when the version stored in the frame differs from the connection's one;
//  - the decoded frame otherwise.
utils::result_with_exception<cql_binary_frame_v3, exceptions::protocol_exception, cql_frame_error>
cql_server::connection::parse_frame(temporary_buffer<char> buf) const {
    if (buf.size() != frame_size()) {
        return cql_frame_error();
    }

    // Only protocol versions 3 and 4 are decoded here.
    const bool known_version = _version == 3 || _version == 4;
    if (!known_version) {
        return exceptions::protocol_exception(format("Invalid or unsupported protocol version: {:d}", _version));
    }

    cql_binary_frame_v3 raw = read_unaligned<cql_binary_frame_v3>(buf.get());
    cql_binary_frame_v3 decoded = net::ntoh(raw);

    // Every frame on a connection has to carry the version the connection started with.
    if (decoded.version != _version) {
        return exceptions::protocol_exception(format("Invalid message version. Got {:d} but previous messages on this connection had version {:d}", decoded.version, _version));
    }
    return decoded;
}
|
|
|
|
// Reads the next fixed-size frame prefix from the connection's input stream.
//
// Resolves to:
//  - std::nullopt when the stream yields no bytes at all (end of stream);
//  - the decoded frame on success;
//  - an exceptional future when parse_frame() rejects the bytes (wrong size,
//    unsupported or inconsistent version), when the first byte announces an
//    unsupported protocol version, or when the first frame declares a body
//    larger than 100'000 bytes.
//
// While _version is still unset the frame is the first one on the connection:
// its leading byte is read on its own and stored as the connection's protocol
// version, and the remaining frame_size() - 1 bytes are read afterwards.
future<std::optional<cql_binary_frame_v3>>
cql_server::connection::read_frame() {
    using ret_type = std::optional<cql_binary_frame_v3>;
    if (!_version) {
        // We don't know the frame size before reading the first frame,
        // so read just one byte, and then read the rest of the frame.
        return _read_buf.read_exactly(1).then([this] (temporary_buffer<char> buf) {
            if (buf.empty()) {
                return make_ready_future<ret_type>();
            }
            _version = buf[0];
            if (_version < 3 || _version > current_version) {
                // Keep the client's value for the message and switch the connection
                // to current_version (presumably so that the error reply can still
                // be framed with a supported version - confirm against write_response).
                auto client_version = _version;
                _version = current_version;
                return make_exception_future<ret_type>(exceptions::protocol_exception(format("Invalid or unsupported protocol version: {:d}", client_version)));
            }

            return _read_buf.read_exactly(frame_size() - 1).then([this] (temporary_buffer<char> tail) {
                // read_exactly() can return fewer bytes than requested when the
                // stream ends mid-frame. Size the reassembled buffer from what
                // actually arrived, so that parse_frame() rejects a truncated
                // prefix through its size check (as it does for later frames)
                // instead of decoding uninitialized bytes.
                temporary_buffer<char> full(1 + tail.size());
                full.get_write()[0] = _version;
                std::copy(tail.get(), tail.get() + tail.size(), full.get_write() + 1);
                auto frame = parse_frame(std::move(full));
                if (!frame) {
                    return std::move(frame).assume_error().as_exception_future<ret_type>();
                }
                // This is the very first frame, so reject obviously incorrect frames, to
                // avoid allocating large amounts of memory for the message body
                if (frame.value().length > 100'000) {
                    // The STARTUP message body is a [string map] containing just a few options,
                    // so it should be smaller than 100kB. See #4366.
                    return make_exception_future<ret_type>(exceptions::protocol_exception(format("Initial message size too large ({:d}), rejecting as invalid", uint32_t(frame.value().length))));
                }
                return make_ready_future<ret_type>(std::move(frame).value());
            });
        });
    } else {
        // Not the first frame, so we know the size.
        return _read_buf.read_exactly(frame_size()).then([this] (temporary_buffer<char> buf) {
            if (buf.empty()) {
                return make_ready_future<ret_type>();
            }
            auto frame = parse_frame(std::move(buf));
            if (!frame) {
                return std::move(frame).assume_error().as_exception_future<ret_type>();
            }
            return make_ready_future<ret_type>(std::move(frame).value());
        });
    }
}
|
|
|
|
// This function intentionally sleeps to the end of the query timeout in CQL server.
|
|
// It was introduced to remove similar waiting in storage_proxy (ref. scylladb#3699),
|
|
// because storage proxy was blocking ERM (thus topology changes).
|
|
future<foreign_ptr<std::unique_ptr<cql_server::response>>> cql_server::sleep_until_timeout_passes(const seastar::lowres_clock::time_point& timeout, foreign_ptr<std::unique_ptr<cql_server::response>>&& resp) {
|
|
auto time_left = timeout - seastar::lowres_clock::now();
|
|
return seastar::sleep_abortable(time_left, _abort_source).then_wrapped([resp = std::move(resp)](auto&& f) mutable {
|
|
if (f.failed()) {
|
|
clogger.debug("Got exception {} while waiting for a request timeout.", f.get_exception());
|
|
}
|
|
// Return timeout error no matter if sleep was aborted or not
|
|
return utils::result_into_future<result_with_foreign_response_ptr>(std::move(resp));
|
|
});
|
|
}
|
|
|
|
std::unique_ptr<cql_server::response> cql_server::make_unavailable_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t required, int32_t alive, const tracing::trace_state_ptr& tr_state)
|
|
{
|
|
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
|
response->write_int(static_cast<int32_t>(err));
|
|
response->write_string(msg);
|
|
response->write_consistency(cl);
|
|
response->write_int(required);
|
|
response->write_int(alive);
|
|
return response;
|
|
}
|
|
|
|
std::unique_ptr<cql_server::response> cql_server::make_read_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state)
|
|
{
|
|
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
|
response->write_int(static_cast<int32_t>(err));
|
|
response->write_string(msg);
|
|
response->write_consistency(cl);
|
|
response->write_int(received);
|
|
response->write_int(blockfor);
|
|
response->write_byte(data_present);
|
|
return response;
|
|
}
|
|
|
|
std::unique_ptr<cql_server::response> cql_server::make_read_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state, cql_protocol_version_type version)
|
|
{
|
|
if (version < 4) {
|
|
return make_read_timeout_error(stream, exceptions::exception_code::READ_TIMEOUT, std::move(msg), cl, received, blockfor, data_present, tr_state);
|
|
}
|
|
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
|
response->write_int(static_cast<int32_t>(err));
|
|
response->write_string(msg);
|
|
response->write_consistency(cl);
|
|
response->write_int(received);
|
|
response->write_int(blockfor);
|
|
response->write_int(numfailures);
|
|
response->write_byte(data_present);
|
|
return response;
|
|
}
|
|
|
|
std::unique_ptr<cql_server::response> cql_server::make_mutation_write_timeout_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state)
|
|
{
|
|
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
|
response->write_int(static_cast<int32_t>(err));
|
|
response->write_string(msg);
|
|
response->write_consistency(cl);
|
|
response->write_int(received);
|
|
response->write_int(blockfor);
|
|
response->write_string(format("{}", type));
|
|
return response;
|
|
}
|
|
|
|
std::unique_ptr<cql_server::response> cql_server::make_mutation_write_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state, cql_protocol_version_type version)
|
|
{
|
|
if (version < 4) {
|
|
return make_mutation_write_timeout_error(stream, exceptions::exception_code::WRITE_TIMEOUT, std::move(msg), cl, received, blockfor, type, tr_state);
|
|
}
|
|
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
|
response->write_int(static_cast<int32_t>(err));
|
|
response->write_string(msg);
|
|
response->write_consistency(cl);
|
|
response->write_int(received);
|
|
response->write_int(blockfor);
|
|
response->write_int(numfailures);
|
|
response->write_string(format("{}", type));
|
|
return response;
|
|
}
|
|
|
|
std::unique_ptr<cql_server::response> cql_server::make_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name, const tracing::trace_state_ptr& tr_state)
|
|
{
|
|
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
|
response->write_int(static_cast<int32_t>(err));
|
|
response->write_string(msg);
|
|
response->write_string(ks_name);
|
|
response->write_string(cf_name);
|
|
return response;
|
|
}
|
|
|
|
std::unique_ptr<cql_server::response> cql_server::make_unprepared_error(int16_t stream, exceptions::exception_code err, sstring msg, bytes id, const tracing::trace_state_ptr& tr_state)
|
|
{
|
|
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
|
response->write_int(static_cast<int32_t>(err));
|
|
response->write_string(msg);
|
|
response->write_short_bytes(id);
|
|
return response;
|
|
}
|
|
|
|
std::unique_ptr<cql_server::response> cql_server::make_function_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring func_name, std::vector<sstring> args, const tracing::trace_state_ptr& tr_state)
|
|
{
|
|
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
|
response->write_int(static_cast<int32_t>(err));
|
|
response->write_string(msg);
|
|
response->write_string(ks_name);
|
|
response->write_string(func_name);
|
|
response->write_string_list(args);
|
|
return response;
|
|
}
|
|
|
|
std::unique_ptr<cql_server::response> cql_server::make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state)
|
|
{
|
|
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
|
response->write_int(static_cast<int32_t>(err));
|
|
response->write_string(msg);
|
|
return response;
|
|
}
|
|
|
|
std::unique_ptr<cql_server::response> cql_server::make_rate_limit_error(int16_t stream, exceptions::exception_code err, sstring msg, db::operation_type op_type, bool rejected_by_coordinator, const tracing::trace_state_ptr& tr_state, bool has_rate_limit_extension)
|
|
{
|
|
if (!has_rate_limit_extension) {
|
|
return make_error(stream, exceptions::exception_code::CONFIG_ERROR, std::move(msg), tr_state);
|
|
}
|
|
|
|
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
|
response->write_int(static_cast<int32_t>(err));
|
|
response->write_string(msg);
|
|
response->write_byte(static_cast<uint8_t>(op_type));
|
|
response->write_byte(static_cast<uint8_t>(rejected_by_coordinator));
|
|
return response;
|
|
}
|
|
|
|
std::optional<seastar::lowres_clock::time_point> cql_server::timeout_for_sleep(std::exception_ptr eptr) const {
|
|
if (auto* exp = try_catch<exceptions::read_failure_exception_with_timeout>(eptr)) {
|
|
return exp->_timeout;
|
|
}
|
|
return std::nullopt;
|
|
}
|
|
|
|
// Translates a request failure into the matching ERROR response.
//
// `eptr` is matched against the known exception types from the most specific
// to the most generic one; the first match is logged at debug level, counted
// in _stats.errors and turned into a response by the corresponding make_*
// helper. Anything that is not a cassandra_exception is reported to the
// client as SERVER_ERROR.
//
// The order of the checks is significant: a type must be tested before any
// more general type that would also match it.
std::unique_ptr<cql_server::response> cql_server::handle_exception(int16_t stream, std::exception_ptr eptr, const tracing::trace_state_ptr& trace_state, cql_protocol_version_type version, const service::client_state& client_state) {
    // Bumps the per-code error counter; any failure while doing so is swallowed.
    auto count_error = [this] (auto code) {
        try { ++_stats.errors[code]; } catch(...) {}
    };

    if (auto* ex = try_catch<exceptions::unavailable_exception>(eptr)) {
        clogger.debug("{}: request resulted in unavailable_error, stream {}, code {}, message [{}]",
                client_state.get_remote_address(), stream, ex->code(), ex->what());
        count_error(ex->code());
        return make_unavailable_error(stream, ex->code(), ex->what(), ex->consistency, ex->required, ex->alive, trace_state);
    }

    if (auto* ex = try_catch<exceptions::read_failure_exception_with_timeout>(eptr)) {
        clogger.debug("{}: request resulted in read_failure_exception_with_timeout, stream {}, code {}, message [{}]",
                client_state.get_remote_address(), stream, ex->code(), ex->what());
        count_error(ex->code());
        // The client is told about the embedded read timeout, because the
        // caller waits until that timeout passes before replying.
        const auto& timed_out = ex->_timeout_exception;
        return make_read_timeout_error(stream, timed_out.code(), timed_out.what(), timed_out.consistency, timed_out.received, timed_out.block_for, timed_out.data_present, trace_state);
    }

    if (auto* ex = try_catch<exceptions::read_timeout_exception>(eptr)) {
        clogger.debug("{}: request resulted in read_timeout_error, stream {}, code {}, message [{}]",
                client_state.get_remote_address(), stream, ex->code(), ex->what());
        count_error(ex->code());
        return make_read_timeout_error(stream, ex->code(), ex->what(), ex->consistency, ex->received, ex->block_for, ex->data_present, trace_state);
    }

    if (auto* ex = try_catch<exceptions::read_failure_exception>(eptr)) {
        clogger.debug("{}: request resulted in read_failure_error, stream {}, code {}, message [{}]",
                client_state.get_remote_address(), stream, ex->code(), ex->what());
        count_error(ex->code());
        return make_read_failure_error(stream, ex->code(), ex->what(), ex->consistency, ex->received, ex->failures, ex->block_for, ex->data_present, trace_state, version);
    }

    if (auto* ex = try_catch<exceptions::mutation_write_timeout_exception>(eptr)) {
        clogger.debug("{}: request resulted in mutation_write_timeout_error, stream {}, code {}, message [{}]",
                client_state.get_remote_address(), stream, ex->code(), ex->what());
        count_error(ex->code());
        return make_mutation_write_timeout_error(stream, ex->code(), ex->what(), ex->consistency, ex->received, ex->block_for, ex->type, trace_state);
    }

    if (auto* ex = try_catch<exceptions::mutation_write_failure_exception>(eptr)) {
        clogger.debug("{}: request resulted in mutation_write_failure_error, stream {}, code {}, message [{}]",
                client_state.get_remote_address(), stream, ex->code(), ex->what());
        count_error(ex->code());
        return make_mutation_write_failure_error(stream, ex->code(), ex->what(), ex->consistency, ex->received, ex->failures, ex->block_for, ex->type, trace_state, version);
    }

    if (auto* ex = try_catch<exceptions::already_exists_exception>(eptr)) {
        clogger.debug("{}: request resulted in already_exists_error, stream {}, code {}, message [{}]",
                client_state.get_remote_address(), stream, ex->code(), ex->what());
        count_error(ex->code());
        return make_already_exists_error(stream, ex->code(), ex->what(), ex->ks_name, ex->cf_name, trace_state);
    }

    if (auto* ex = try_catch<exceptions::prepared_query_not_found_exception>(eptr)) {
        clogger.debug("{}: request resulted in unprepared_error, stream {}, code {}, message [{}]",
                client_state.get_remote_address(), stream, ex->code(), ex->what());
        count_error(ex->code());
        return make_unprepared_error(stream, ex->code(), ex->what(), ex->id, trace_state);
    }

    if (auto* ex = try_catch<exceptions::function_execution_exception>(eptr)) {
        clogger.debug("{}: request resulted in function_failure_error, stream {}, code {}, message [{}]",
                client_state.get_remote_address(), stream, ex->code(), ex->what());
        count_error(ex->code());
        return make_function_failure_error(stream, ex->code(), ex->what(), ex->ks_name, ex->func_name, ex->args, trace_state);
    }

    if (auto* ex = try_catch<exceptions::rate_limit_exception>(eptr)) {
        clogger.debug("{}: request resulted in rate_limit_error, stream {}, code {}, message [{}]",
                client_state.get_remote_address(), stream, ex->code(), ex->what());
        count_error(ex->code());
        return make_rate_limit_error(stream, ex->code(), ex->what(), ex->op_type, ex->rejected_by_coordinator, trace_state, client_state.is_protocol_extension_set(cql_protocol_extension::RATE_LIMIT_ERROR));
    }

    if (auto* ex = try_catch<exceptions::cassandra_exception>(eptr)) {
        clogger.debug("{}: request resulted in cassandra_error, stream {}, code {}, message [{}]",
                client_state.get_remote_address(), stream, ex->code(), ex->what());
        // Note: the CQL protocol gives many error types mandatory parameters.
        // Those cassandra_exception subclasses MUST be handled by the checks
        // above. This generic case is only appropriate for error types that
        // carry no additional information, such as invalid_request_exception.
        // TODO: consider listing those types explicitly, instead of relying on
        // the catch-all type cassandra_exception.
        count_error(ex->code());
        return make_error(stream, ex->code(), ex->what(), trace_state);
    }

    if (auto* ex = try_catch<std::exception>(eptr)) {
        clogger.debug("{}: request resulted in error, stream {}, message [{}]",
                client_state.get_remote_address(), stream, ex->what());
        count_error(exceptions::exception_code::SERVER_ERROR);
        sstring msg = ex->what();
        // If the exception wraps a nested one, append the nested exception's
        // description to the message sent to the client.
        try {
            std::rethrow_if_nested(*ex);
        } catch (...) {
            std::ostringstream ss;
            ss << msg << ": " << std::current_exception();
            msg = ss.str();
        }
        return make_error(stream, exceptions::exception_code::SERVER_ERROR, msg, trace_state);
    }

    // Not derived from std::exception: nothing can be said about it.
    clogger.debug("{}: request resulted in unknown error, stream {}",
            client_state.get_remote_address(), stream);
    count_error(exceptions::exception_code::SERVER_ERROR);
    return make_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error", trace_state);
}
|
|
// Processes a single request whose body is `fbuf` and resolves to the
// response that should be sent back.
//
// Steps, in order:
//  1. Set up tracing: a tracing session is created only when at least one
//     trace property is set and the opcode is QUERY, PREPARE, EXECUTE or BATCH.
//  2. Record the request size in the trace state and the per-opcode stats.
//  3. Check that the opcode is allowed in the client's current auth state and
//     dispatch it to the matching handler.
//  4. Once the handler finishes: decrement requests_serving, advance the auth
//     state according to the response opcode and record the response size.
//  5. Any failure from steps 3-4 is turned into an ERROR response through
//     cql_server::handle_exception; when timeout_for_sleep() yields a time
//     point, the response is delayed until that time point passes.
future<foreign_ptr<std::unique_ptr<cql_server::response>>>
cql_server::connection::process_request_one(fragmented_temporary_buffer::istream fbuf, uint8_t op, uint16_t stream, service::client_state& client_state, tracing_request_type tracing_request, service_permit permit) {
    using auth_state = service::client_state::auth_state;
    using foreign_response = foreign_ptr<std::unique_ptr<cql_server::response>>;

    auto request_op = static_cast<cql_binary_opcode>(op);

    tracing::trace_state_props_set trace_props;
    trace_props.set_if<tracing::trace_state_props::log_slow_query>(tracing::tracing::get_local_tracing_instance().slow_query_tracing_enabled());
    trace_props.set_if<tracing::trace_state_props::full_tracing>(tracing_request != tracing_request_type::not_requested);

    tracing::trace_state_ptr trace_state;
    const bool traceable_op = request_op == cql_binary_opcode::QUERY
            || request_op == cql_binary_opcode::PREPARE
            || request_op == cql_binary_opcode::EXECUTE
            || request_op == cql_binary_opcode::BATCH;
    if (trace_props && traceable_op) {
        trace_props.set_if<tracing::trace_state_props::write_on_close>(tracing_request == tracing_request_type::write_on_close);
        trace_state = tracing::tracing::get_local_tracing_instance().create_session(tracing::trace_type::QUERY, trace_props);
    }

    cql_sg_stats::request_kind_stats& cql_stats = _server.get_cql_opcode_stats(request_op);
    tracing::set_request_size(trace_state, fbuf.bytes_left());
    cql_stats.request_size.add(fbuf.bytes_left());

    // The buffer is owned by the completion continuation below, while the
    // dispatching lambda only borrows it through a raw pointer.
    auto lin_buf = std::make_unique<bytes_ostream>();
    auto lin_buf_raw = lin_buf.get();

    return futurize_invoke([this, request_op, stream, &fbuf, &client_state, lin_buf_raw, permit = std::move(permit), trace_state] () mutable {
        // When using authentication, we need to ensure we are doing proper state transitions,
        // i.e. we cannot simply accept any query/exec ops unless auth is complete
        const auto state_on_entry = client_state.get_auth_state();
        if (state_on_entry == auth_state::UNINITIALIZED) {
            if (request_op != cql_binary_opcode::STARTUP && request_op != cql_binary_opcode::OPTIONS) {
                return make_exception_future<process_fn_return_type>(exceptions::protocol_exception(format("Unexpected message {:d}, expecting STARTUP or OPTIONS", int(request_op))));
            }
        } else if (state_on_entry == auth_state::AUTHENTICATION) {
            // Support both SASL auth from protocol v2 and the older style Credentials auth from v1
            if (request_op != cql_binary_opcode::AUTH_RESPONSE && request_op != cql_binary_opcode::CREDENTIALS) {
                return make_exception_future<process_fn_return_type>(exceptions::protocol_exception(format("Unexpected message {:d}, expecting {}", int(request_op), "SASL_RESPONSE")));
            }
        } else {
            // READY, and any other state.
            if (request_op == cql_binary_opcode::STARTUP) {
                return make_exception_future<process_fn_return_type>(exceptions::protocol_exception("Unexpected message STARTUP, the connection is already initialized"));
            }
        }

        tracing::set_username(trace_state, client_state.user());

        // Adapts handlers returning a plain response to the common return type.
        auto to_foreign = [] (future<std::unique_ptr<cql_server::response>> pending) -> future<process_fn_return_type> {
            return pending.then([] (std::unique_ptr<cql_server::response> rsp) {
                return make_ready_future<process_fn_return_type>(make_foreign(std::move(rsp)));
            });
        };

        auto reader = request_reader(std::move(fbuf), *lin_buf_raw);
        switch (request_op) {
        case cql_binary_opcode::STARTUP:
            return to_foreign(process_startup(stream, std::move(reader), client_state, trace_state));
        case cql_binary_opcode::AUTH_RESPONSE:
            return to_foreign(process_auth_response(stream, std::move(reader), client_state, trace_state));
        case cql_binary_opcode::OPTIONS:
            return to_foreign(process_options(stream, std::move(reader), client_state, trace_state));
        case cql_binary_opcode::PREPARE:
            return to_foreign(process_prepare(stream, std::move(reader), client_state, trace_state));
        case cql_binary_opcode::REGISTER:
            return to_foreign(process_register(stream, std::move(reader), client_state, trace_state));
        case cql_binary_opcode::QUERY:
        case cql_binary_opcode::EXECUTE:
        case cql_binary_opcode::BATCH:
            // Statement-executing opcodes share one entry point on the server.
            return _server.process(stream, std::move(reader), client_state, std::move(permit), trace_state, request_op,
                    _version, get_dialect());
        default:
            return make_exception_future<process_fn_return_type>(exceptions::protocol_exception(format("Unknown opcode {:d}", int(request_op))));
        }
    }).then_wrapped([this, request_op, &cql_stats, stream, &client_state, lin_buf = std::move(lin_buf), trace_state] (future<process_fn_return_type> processed) {
        auto stop_trace = defer([&] {
            tracing::stop_foreground(trace_state);
        });
        --_server._stats.requests_serving;

        return seastar::futurize_invoke([&] () {
            if (processed.failed()) {
                return make_exception_future<foreign_response>(std::move(processed).get_exception());
            }

            result_with_foreign_response_ptr outcome = std::get<result_with_foreign_response_ptr>(processed.get());
            if (!outcome) {
                return std::move(outcome).assume_error().as_exception_future<foreign_response>();
            }

            auto rsp = std::move(outcome).assume_value();
            auto rsp_opcode = rsp->opcode();

            // and modify state now that we've generated a response.
            switch (client_state.get_auth_state()) {
            case auth_state::UNINITIALIZED:
                if (request_op == cql_binary_opcode::STARTUP) {
                    if (rsp_opcode == cql_binary_opcode::AUTHENTICATE) {
                        client_state.set_auth_state(auth_state::AUTHENTICATION);
                    } else if (rsp_opcode == cql_binary_opcode::READY) {
                        client_state.set_auth_state(auth_state::READY);
                    }
                }
                break;
            case auth_state::AUTHENTICATION:
                // Support both SASL auth from protocol v2 and the older style Credentials auth from v1
                if (request_op != cql_binary_opcode::AUTH_RESPONSE && request_op != cql_binary_opcode::CREDENTIALS) {
                    return make_exception_future<foreign_response>(exceptions::protocol_exception(format("Unexpected message {:d}, expecting AUTH_RESPONSE or CREDENTIALS", int(request_op))));
                }
                if (rsp_opcode == cql_binary_opcode::READY || rsp_opcode == cql_binary_opcode::AUTH_SUCCESS) {
                    client_state.set_auth_state(auth_state::READY);
                }
                break;
            default:
            case auth_state::READY:
                break;
            }

            tracing::set_response_size(trace_state, rsp->size());
            cql_stats.response_size.add(rsp->size());
            return make_ready_future<foreign_response>(std::move(rsp));
        }).handle_exception([this, stream, &client_state, trace_state] (std::exception_ptr eptr) {
            auto error_rsp = _server.handle_exception(stream, eptr, trace_state, _version, client_state);
            if (auto timeout = _server.timeout_for_sleep(eptr)) {
                // Return read timeout exception, as we wait here until the timeout passes
                return _server.sleep_until_timeout_passes(*timeout, std::move(error_rsp));
            }
            return utils::result_into_future<result_with_foreign_response_ptr>(std::move(error_rsp));
        });
    });
}
|
|
|
|
cql_server::connection::connection(cql_server& server, socket_address server_addr, connected_socket&& fd, socket_address addr, named_semaphore& sem, semaphore_units<named_semaphore_exception_factory> initial_sem_units)
|
|
: generic_server::connection{server, std::move(fd), sem, std::move(initial_sem_units)}
|
|
, _server(server)
|
|
, _server_addr(server_addr)
|
|
, _client_state(service::client_state::external_tag{}, server._auth_service, &server._sl_controller, server.timeout_config(), addr, bool(server._used_by_maintenance_socket))
|
|
, _current_scheduling_group(server.get_scheduling_group_for_new_connection())
|
|
{
|
|
_shedding_timer.set_callback([this] {
|
|
clogger.debug("Shedding all incoming requests due to overload");
|
|
_shed_incoming_requests = true;
|
|
});
|
|
++_server._stats.connects;
|
|
++_server._stats.connections;
|
|
if (clogger.is_enabled(logging::log_level::trace)) {
|
|
const auto ip = get_client_state().get_client_address().addr();
|
|
const auto port = get_client_state().get_client_port();
|
|
clogger.trace("Advertising new connection from CQL client {}:{}", ip, port);
|
|
}
|
|
}
|
|
|
|
// Unregisters the connection from the server's notifier and decrements the
// count of open connections.
cql_server::connection::~connection() {
    _server._notifier->unregister_connection(this);
    --_server._stats.connections;

    // The client address is only looked up when trace logging is enabled.
    if (!clogger.is_enabled(logging::log_level::trace)) {
        return;
    }
    const auto peer_ip = get_client_state().get_client_address().addr();
    const auto peer_port = get_client_state().get_client_port();
    clogger.trace("Advertising disconnection of CQL client {}:{}", peer_ip, peer_port);
}
|
|
|
|
// Takes a snapshot of this connection as a client_data record: peer address
// and port, shard, protocol version, driver name and version, user name (when
// a user is set), connection stage, scheduling group, client options and the
// SSL parameters.
client_data cql_server::connection::make_client_data() const {
    client_data info;

    // Peer identity and placement.
    info.ip = _client_state.get_client_address().addr();
    info.port = _client_state.get_client_port();
    info.shard_id = this_shard_id();
    info.protocol_version = _version;

    // What the driver reported about itself.
    info.driver_name = _client_state.get_driver_name();
    info.driver_version = _client_state.get_driver_version();

    if (const auto user = _client_state.user()) {
        info.username = user->name;
    }

    // The stage is left at client_data's default when the connection is
    // neither ready nor authenticating.
    if (_ready) {
        info.connection_stage = client_connection_stage::ready;
    } else if (_authenticating) {
        info.connection_stage = client_connection_stage::authenticating;
    }

    info.scheduling_group_name = _current_scheduling_group.name();
    info.client_options = _client_state.get_client_options();

    info.ssl_enabled = _ssl_enabled;
    info.ssl_protocol = _ssl_protocol;
    info.ssl_cipher_suite = _ssl_cipher_suite;

    return info;
}
|
|
|
|
// Per-thread execution stage named "transport" that wraps connection::process_request_one.
thread_local cql_server::connection::execution_stage_type cql_server::connection::_process_request_stage{"transport", &connection::process_request_one};
|
|
|
|
// Reports a connection-level failure to the client.
// Does nothing when `f` did not fail. Otherwise the exception is logged at
// debug level, counted in the server's error statistics and answered with an
// ERROR response on stream 0: a cassandra_exception keeps its own code and
// message, everything else is sent as SERVER_ERROR.
void cql_server::connection::handle_error(future<>&& f) {
    if (!f.failed()) {
        return;
    }

    std::exception_ptr eptr = f.get_exception();

    if (auto* cql_err = try_catch<exceptions::cassandra_exception>(eptr)) {
        clogger.debug("{}: connection error, code {}, message [{}]", _client_state.get_remote_address(), cql_err->code(), cql_err->what());
        try { ++_server._stats.errors[cql_err->code()]; } catch(...) {}
        write_response(_server.make_error(0, cql_err->code(), cql_err->what(), tracing::trace_state_ptr()));
        return;
    }

    if (auto* std_err = try_catch<std::exception>(eptr)) {
        clogger.debug("{}: connection error, message [{}]", _client_state.get_remote_address(), std_err->what());
        try { ++_server._stats.errors[exceptions::exception_code::SERVER_ERROR]; } catch(...) {}
        write_response(_server.make_error(0, exceptions::exception_code::SERVER_ERROR, std_err->what(), tracing::trace_state_ptr()));
        return;
    }

    // Not derived from std::exception: there is no message to pass on.
    clogger.debug("{}: connection error, unknown error", _client_state.get_remote_address());
    try { ++_server._stats.errors[exceptions::exception_code::SERVER_ERROR]; } catch(...) {}
    write_response(_server.make_error(0, exceptions::exception_code::SERVER_ERROR, "unknown error", tracing::trace_state_ptr()));
}
|
|
|
|
future<> cql_server::connection::process_request() {
|
|
return read_frame().then_wrapped([this] (future<std::optional<cql_binary_frame_v3>>&& v) {
|
|
if (v.failed()) {
|
|
return std::move(v).discard_result();
|
|
}
|
|
|
|
auto maybe_frame = v.get();
|
|
if (!maybe_frame) {
|
|
// eof
|
|
return make_ready_future<>();
|
|
}
|
|
|
|
auto& f = *maybe_frame;
|
|
|
|
const bool allow_shedding = _client_state.get_workload_type() == service::client_state::workload_type::interactive;
|
|
if (allow_shedding && _shed_incoming_requests) {
|
|
++_server._stats.requests_shed;
|
|
return _read_buf.skip(f.length).then([this, stream = f.stream] {
|
|
const char* message = "request shed due to coordinator overload";
|
|
clogger.debug("{}: {}, stream {}", _client_state.get_remote_address(), message, uint16_t(stream));
|
|
write_response(_server.make_error(stream, exceptions::exception_code::OVERLOADED,
|
|
message, tracing::trace_state_ptr()));
|
|
return make_ready_future<>();
|
|
});
|
|
}
|
|
|
|
tracing_request_type tracing_requested = tracing_request_type::not_requested;
|
|
if (f.flags & cql_frame_flags::tracing) {
|
|
// If tracing is requested for a specific CQL command - flush
|
|
// tracing info right after the command is over.
|
|
tracing_requested = tracing_request_type::write_on_close;
|
|
} else if (tracing::tracing::get_local_tracing_instance().trace_next_query()) {
|
|
tracing_requested = tracing_request_type::no_write_on_close;
|
|
}
|
|
|
|
auto op = f.opcode;
|
|
auto stream = f.stream;
|
|
auto mem_estimate = f.length * 2 + 8000; // Allow for extra copies and bookkeeping
|
|
if (mem_estimate > _server._config.max_request_size) {
|
|
const auto message = format("request size too large (frame size {:d}; estimate {:d}; allowed {:d})",
|
|
uint32_t(f.length), mem_estimate, _server._config.max_request_size);
|
|
clogger.debug("{}: {}, request dropped", _client_state.get_remote_address(), message);
|
|
write_response(_server.make_error(stream, exceptions::exception_code::INVALID, message, tracing::trace_state_ptr()));
|
|
return std::exchange(_ready_to_respond, make_ready_future<>())
|
|
.then([this] { return _read_buf.close(); })
|
|
.then([this] { return util::skip_entire_stream(_read_buf); });
|
|
}
|
|
// Add some upper bound estimation from previous parsing
|
|
if (op == uint8_t(cql_binary_opcode::QUERY) || op == uint8_t(cql_binary_opcode::PREPARE)) {
|
|
mem_estimate += _server._query_processor.local().parsing_cost_estimate();
|
|
}
|
|
// Cap the estimate, otherwise get_units call later would deadlock
|
|
mem_estimate = std::min(mem_estimate, _server._total_memory);
|
|
|
|
if (_server._stats.requests_serving > _server._config.max_concurrent_requests) {
|
|
++_server._stats.requests_shed;
|
|
return _read_buf.skip(f.length).then([this, stream = f.stream] {
|
|
const auto message = format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}",
|
|
_server._stats.requests_serving);
|
|
clogger.debug("{}: {}, request dropped", _client_state.get_remote_address(), message);
|
|
write_response(_server.make_error(stream, exceptions::exception_code::OVERLOADED,
|
|
message,
|
|
tracing::trace_state_ptr()));
|
|
return make_ready_future<>();
|
|
});
|
|
}
|
|
|
|
const auto shedding_timeout = std::chrono::milliseconds(50);
|
|
auto fut = allow_shedding
|
|
? get_units(_server._memory_available, mem_estimate, shedding_timeout).then_wrapped([this, length = f.length] (auto f) {
|
|
try {
|
|
return make_ready_future<semaphore_units<>>(f.get());
|
|
} catch (semaphore_timed_out& sto) {
|
|
// Cancel shedding in case no more requests are going to do that on completion
|
|
if (_pending_requests_gate.get_count() == 0) {
|
|
_shed_incoming_requests = false;
|
|
}
|
|
return _read_buf.skip(length).then([sto = std::move(sto)] () mutable {
|
|
return make_exception_future<semaphore_units<>>(std::move(sto));
|
|
});
|
|
}
|
|
})
|
|
: get_units(_server._memory_available, mem_estimate);
|
|
if (_server._memory_available.waiters()) {
|
|
if (allow_shedding && !_shedding_timer.armed()) {
|
|
_shedding_timer.arm(shedding_timeout);
|
|
}
|
|
++_server._stats.requests_blocked_memory;
|
|
}
|
|
|
|
return fut.then_wrapped([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (auto mem_permit_fut) {
|
|
if (mem_permit_fut.failed()) {
|
|
// Ignore semaphore errors - they are expected if load shedding took place
|
|
mem_permit_fut.ignore_ready_future();
|
|
return make_ready_future<>();
|
|
}
|
|
semaphore_units<> mem_permit = mem_permit_fut.get();
|
|
return this->read_and_decompress_frame(length, flags).then([this, op, stream, tracing_requested, mem_permit = make_service_permit(std::move(mem_permit))] (fragmented_temporary_buffer buf) mutable {
|
|
|
|
++_server._stats.requests_served;
|
|
++_server._stats.requests_serving;
|
|
|
|
_pending_requests_gate.enter();
|
|
auto leave = defer([this] {
|
|
_shedding_timer.cancel();
|
|
_shed_incoming_requests = false;
|
|
_pending_requests_gate.leave();
|
|
});
|
|
auto istream = buf.get_istream();
|
|
|
|
|
|
// Parallelize only the performance sensitive requests:
|
|
// QUERY, PREPARE, EXECUTE, BATCH
|
|
bool should_paralelize = (op == uint8_t(cql_binary_opcode::QUERY) ||
|
|
op == uint8_t(cql_binary_opcode::PREPARE) ||
|
|
op == uint8_t (cql_binary_opcode::EXECUTE) ||
|
|
op == uint8_t(cql_binary_opcode::BATCH));
|
|
|
|
future<foreign_ptr<std::unique_ptr<cql_server::response>>> request_process_future = should_paralelize ?
|
|
_process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit) :
|
|
process_request_one(istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit);
|
|
|
|
future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
|
|
try {
|
|
if (response_f.failed()) {
|
|
const auto message = format("request processing failed, error [{}]", response_f.get_exception());
|
|
clogger.error("{}: {}", _client_state.get_remote_address(), message);
|
|
write_response(_server.make_error(stream, exceptions::exception_code::SERVER_ERROR,
|
|
message,
|
|
tracing::trace_state_ptr()));
|
|
} else {
|
|
write_response(response_f.get(), std::move(mem_permit), _compression);
|
|
}
|
|
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {});
|
|
} catch (...) {
|
|
clogger.error("{}: request processing failed: {}",
|
|
_client_state.get_remote_address(), std::current_exception());
|
|
}
|
|
});
|
|
|
|
if (should_paralelize) {
|
|
return make_ready_future<>();
|
|
} else {
|
|
return request_response_future;
|
|
}
|
|
});
|
|
});
|
|
});
|
|
}
|
|
|
|
// Contiguous buffers for use with compression primitives.
|
|
// Be careful when dealing with them, because they are shared and
|
|
// can be modified on preemption points.
|
|
// See the comments on reusable_buffer for a discussion.
|
|
// Returns a guard over this thread's reusable buffer used for the compressed
// (input) side of frame decompression. The buffer is a function-local
// thread_local, so it is created on first use and there is one per thread.
// It is constructed with a 600-second argument (presumably the period that
// governs when reusable_buffer gives memory back -- see reusable_buffer).
static utils::reusable_buffer_guard input_buffer_guard() {
    static thread_local utils::reusable_buffer<lowres_clock> shared_input(std::chrono::seconds(600));
    return shared_input;
}
|
|
// Returns a guard over this thread's reusable buffer used for the
// decompressed (output) side of frame decompression. It is a separate
// thread_local instance from the one behind input_buffer_guard(), so both
// guards can be held at the same time. It is constructed with a 600-second
// argument (presumably the period that governs when reusable_buffer gives
// memory back -- see reusable_buffer).
static utils::reusable_buffer_guard output_buffer_guard() {
    static thread_local utils::reusable_buffer<lowres_clock> shared_output(std::chrono::seconds(600));
    return shared_output;
}
|
|
|
|
// Reads the `length`-byte body of a frame from the connection's input stream
// and returns it uncompressed.
//
// When the compression bit is not set in `flags`, the bytes are returned as
// read. Otherwise the body is decompressed with the algorithm stored in
// _compression:
//   - lz4:    the body begins with a 4-byte uncompressed length (read with
//             read_simple<int32_t>), followed by the LZ4 block;
//   - snappy: the uncompressed length is obtained from the Snappy data itself.
// Any other value of _compression results in a protocol_exception.
//
// Malformed input detected here (too-short LZ4 body, negative length,
// decompression failure, size mismatch) is reported through the returned
// future as std::runtime_error.
future<fragmented_temporary_buffer> cql_server::connection::read_and_decompress_frame(size_t length, uint8_t flags)
{
    using decompress_result = utils::result_with_exception<size_t, std::runtime_error>;

    // Uncompressed frame: hand the bytes over untouched.
    if (!(flags & cql_frame_flags::compression)) {
        return _buffer_reader.read_exactly(_read_buf, length);
    }

    switch (_compression) {
    case cql_compression::lz4: {
        // The LZ4 body must at least contain the 4-byte length prefix.
        if (length < 4) {
            return make_exception_future<fragmented_temporary_buffer>(std::runtime_error(fmt::format("CQL frame truncated: expected to have at least 4 bytes, got {}", length)));
        }
        return _buffer_reader.read_exactly(_read_buf, length).then([] (fragmented_temporary_buffer compressed) {
            auto src_guard = input_buffer_guard();
            auto dst_guard = output_buffer_guard();
            auto body = fragmented_temporary_buffer::view(compressed);
            // The declared uncompressed size comes first; a negative value is invalid.
            int32_t declared_len = read_simple<int32_t>(body);
            if (declared_len < 0) {
                return make_exception_future<fragmented_temporary_buffer>(std::runtime_error("CQL frame uncompressed length is negative: " + std::to_string(declared_len)));
            }
            // Contiguous view of what is left of `body`, backed by the shared input buffer.
            auto in = src_guard.get_linearized_view(body);
            return utils::result_into_future(dst_guard.make_fragmented_temporary_buffer(declared_len, [&in] (bytes_mutable_view out) -> decompress_result {
                const auto ret = LZ4_decompress_safe(reinterpret_cast<const char*>(in.data()), reinterpret_cast<char*>(out.data()), in.size(), out.size());
                if (ret < 0) {
                    return bo::failure(std::runtime_error("CQL frame LZ4 uncompression failure"));
                }
                // ret is non-negative here, so the conversion is safe.
                const auto produced = static_cast<size_t>(ret);
                if (produced != out.size()) {
                    return bo::failure(std::runtime_error("Malformed CQL frame - provided uncompressed size different than real uncompressed size"));
                }
                return bo::success(produced);
            }));
        });
    }
    case cql_compression::snappy:
        return _buffer_reader.read_exactly(_read_buf, length).then([] (fragmented_temporary_buffer compressed) {
            auto src_guard = input_buffer_guard();
            auto dst_guard = output_buffer_guard();
            auto in = src_guard.get_linearized_view(fragmented_temporary_buffer::view(compressed));
            // Snappy encodes the uncompressed size in its own stream.
            size_t declared_len;
            if (snappy_uncompressed_length(reinterpret_cast<const char*>(in.data()), in.size(), &declared_len) != SNAPPY_OK) {
                return make_exception_future<fragmented_temporary_buffer>(std::runtime_error("CQL frame Snappy uncompressed size is unknown"));
            }
            return utils::result_into_future(dst_guard.make_fragmented_temporary_buffer(declared_len, [&in] (bytes_mutable_view out) -> decompress_result {
                size_t produced = out.size();
                if (snappy_uncompress(reinterpret_cast<const char*>(in.data()), in.size(), reinterpret_cast<char*>(out.data()), &produced) != SNAPPY_OK) {
                    return bo::failure(std::runtime_error("CQL frame Snappy uncompression failure"));
                }
                if (produced != out.size()) {
                    return bo::failure(std::runtime_error("Malformed CQL frame - provided uncompressed size different than real uncompressed size"));
                }
                return bo::success(produced);
            }));
        });
    default:
        // Compression flag set, but no supported algorithm is configured for this connection.
        return make_exception_future<fragmented_temporary_buffer>(exceptions::protocol_exception("Unknown compression algorithm"));
    }
}
|
|
|
|
// Handles a STARTUP request.
//
// Reads the option map sent by the client and:
//   - applies the "COMPRESSION" option (case-insensitively; "lz4" or "snappy",
//     anything else fails with a protocol_exception);
//   - records "DRIVER_VERSION" / "DRIVER_NAME" and the complete option map in
//     _client_state;
//   - enables every supported CQL protocol extension whose name appears in the
//     options.
// Then, depending on whether the configured authenticator requires
// authentication, replies with READY or AUTHENTICATE (see the branches below).
//
// A failure to parse the option map is returned as an exceptional future.
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_startup(uint16_t stream, request_reader in, service::client_state& client_state,
        tracing::trace_state_ptr trace_state) {
    utils::result_with_exception_ptr<std::unordered_map<sstring, sstring>> parsed = in.read_string_map();
    if (!parsed) {
        co_return coroutine::exception(std::move(parsed).assume_error());
    }
    std::unordered_map<sstring, sstring> options = std::move(parsed).assume_value();

    if (auto it = options.find("COMPRESSION"); it != options.end()) {
        // Compare on a lowercased copy; the error message reports the lowercased name too.
        auto algorithm = it->second;
        std::transform(algorithm.begin(), algorithm.end(), algorithm.begin(), ::tolower);
        if (algorithm == "lz4") {
            _compression = cql_compression::lz4;
        } else if (algorithm == "snappy") {
            _compression = cql_compression::snappy;
        } else {
            co_return coroutine::exception(std::make_exception_ptr(exceptions::protocol_exception(format("Unknown compression algorithm: {}", algorithm))));
        }
    }

    if (auto it = options.find("DRIVER_VERSION"); it != options.end()) {
        co_await _client_state.set_driver_version(_server._connection_options_keys_and_values, it->second);
    }
    if (auto it = options.find("DRIVER_NAME"); it != options.end()) {
        co_await _client_state.set_driver_name(_server._connection_options_keys_and_values, it->second);
    }

    // Keep every option the client sent so it can be exposed in the
    // system.clients 'client_options' column (a frozen map<text, text>).
    // The options go through a cache that deduplicates identical key/value
    // sets across connections (e.g. same driver name/version) to save memory.
    co_await _client_state.set_client_options(_server._connection_options_keys_and_values, options);

    cql_protocol_extension_enum_set negotiated_exts;
    for (cql_protocol_extension ext : supported_cql_protocol_extensions()) {
        if (options.contains(protocol_extension_name(ext))) {
            negotiated_exts.set(ext);
        }
    }
    _client_state.set_protocol_extensions(std::move(negotiated_exts));

    std::unique_ptr<cql_server::response> reply;
    auto& authenticator = client_state.get_auth_service()->underlying_authenticator();
    if (!authenticator.require_authentication()) {
        // No authentication needed: the connection becomes ready right away.
        update_scheduling_group();
        _ready = true;
        on_connection_ready();
        reply = make_ready(stream, trace_state);
    } else {
        _authenticating = true;
        // Give the authenticator a chance to authenticate from the TLS
        // certificate: it is handed the subject DN and a lazy getter for the
        // comma-joined alternative names, or nullopt when no DN is available.
        auto cert_user = co_await authenticator.authenticate([this]() -> future<std::optional<auth::certificate_info>> {
            auto dn = co_await tls::get_dn_information(this->_fd);
            if (dn) {
                co_return auth::certificate_info{ dn->subject, [this]() -> future<std::string> {
                    auto alt_names = co_await tls::get_alt_name_information(this->_fd);
                    co_return fmt::format("{}", fmt::join(alt_names, ","));
                } };
            }
            co_return std::nullopt;
        });
        if (cert_user) {
            // NOTE(review): unlike the no-authentication branch, this path does not
            // set _ready, call update_scheduling_group() / on_connection_ready(), or
            // reset _authenticating -- confirm that this is intended.
            client_state.set_login(std::move(*cert_user));
            co_await client_state.check_user_can_login();
            client_state.maybe_update_per_service_level_params();
            reply = make_ready(stream, trace_state);
        } else {
            // No user from the certificate: ask the client to authenticate.
            reply = make_autheticate(stream, authenticator.qualified_java_name(), trace_state);
        }
    }

    co_return reply;
}
|
|
|
|
// Switches this connection to the scheduling group cached for user `usr` by
// the service level controller, and re-homes the processing loop so that it
// runs under that user's service level.
void cql_server::connection::update_user_scheduling_group(const std::optional<auth::authenticated_user>& usr) {
    _current_scheduling_group = _server._sl_controller.get_cached_user_scheduling_group(usr);

    // `usr` is captured by value and the closure is taken by value (`this auto self`),
    // so the coroutine frame owns its own copy of the captures.
    auto run_under_user_level = [this, usr] (this auto self, noncopyable_function<future<> ()> process_loop) -> future<> {
        co_return co_await _server._sl_controller.with_user_service_level(usr, std::move(process_loop));
    };
    switch_tenant(std::move(run_under_user_level));
}
|
|
|
|
void cql_server::connection::update_control_connection_scheduling_group() {
|
|
auto shg_grp = _server._sl_controller.get_scheduling_group(qos::service_level_controller::driver_service_level_name);
|
|
_current_scheduling_group = shg_grp;
|
|
switch_tenant([this] (this auto self, noncopyable_function<future<> ()> process_loop) -> future<> {
|
|
co_return co_await _server._sl_controller.with_service_level(qos::service_level_controller::driver_service_level_name, std::move(process_loop));
|
|
});
|
|
}
|
|
|
|
void cql_server::connection::update_scheduling_group() {
|
|
if (_client_state.is_control_connection()) {
|
|
update_control_connection_scheduling_group();
|
|
} else {
|
|
update_user_scheduling_group(_client_state.user());
|
|
}
|
|
}
|
|
|
|
// Handles an AUTH_RESPONSE request.
//
// The remaining bytes of the request are fed to a fresh SASL challenge obtained
// from the authenticator. If the challenge is not complete yet, an
// AUTH_CHALLENGE response carrying the challenge's output is returned.
// Otherwise the authenticated user is resolved, the login attempt (successful
// or not) is passed to audit::inspect_login, and on success the user is logged
// in, the scheduling group is updated, the connection is marked ready and an
// AUTH_SUCCESS response is returned.
//
// A failure to read the request bytes, or any failure while resolving or
// checking the user, is delivered through the returned future.
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_auth_response(uint16_t stream, request_reader in, service::client_state& client_state,
        tracing::trace_state_ptr trace_state) {
    using response_ptr = std::unique_ptr<cql_server::response>;

    auto sasl = client_state.get_auth_service()->underlying_authenticator().new_sasl_challenge();
    utils::result_with_exception_ptr<bytes_view> token = in.read_raw_bytes_view(in.bytes_left());
    if (!token) {
        return make_exception_future<response_ptr>(std::move(token).assume_error());
    }

    auto server_reply = sasl->evaluate_response(token.assume_value());
    if (!sasl->is_complete()) {
        // More round trips are needed: send the next challenge to the client.
        return make_ready_future<response_ptr>(make_auth_challenge(stream, std::move(server_reply), trace_state));
    }

    return sasl->get_authenticated_user().then_wrapped([this, sasl, stream, &client_state, server_reply = std::move(server_reply), trace_state](future<auth::authenticated_user> user_fut) mutable {
        // Record the outcome before the future is moved into the next continuation.
        const bool login_failed = user_fut.failed();
        return audit::inspect_login(sasl->get_username(), client_state.get_client_address().addr(), login_failed).then(
                [this, stream, server_reply = std::move(server_reply), &client_state, sasl, user_fut = std::move(user_fut), trace_state = std::move(trace_state)] () mutable {
            // get() rethrows if authentication failed, which fails the returned future.
            client_state.set_login(user_fut.get());
            update_scheduling_group();
            return client_state.check_user_can_login().then([this, &client_state, stream, server_reply = std::move(server_reply), trace_state]() mutable {
                client_state.maybe_update_per_service_level_params();
                _authenticating = false;
                _ready = true;
                on_connection_ready();
                return make_ready_future<response_ptr>(make_auth_success(stream, std::move(server_reply), trace_state));
            });
        });
    });
}
|
|
|
|
// Handles an OPTIONS request by replying with a SUPPORTED response built by
// make_supported(). The request body (`in`) and `client_state` are not used.
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_options(uint16_t stream, request_reader in, service::client_state& client_state,
        tracing::trace_state_ptr trace_state) {
    using response_ptr = std::unique_ptr<cql_server::response>;
    return make_ready_future<response_ptr>(make_supported(stream, std::move(trace_state)));
}
|
|
|
|
std::unique_ptr<cql_server::response>
|
|
make_result(int16_t stream, messages::result_message& msg, const tracing::trace_state_ptr& tr_state,
|
|
cql_protocol_version_type version, cql_metadata_id_wrapper&& metadata_id, bool skip_metadata = false);
|
|
|
|
// Extracts the error carried by an exception result message and returns it as
// a coordinator result.
//
// `msg` must point to a messages::result_message::exception; the callers in
// this file establish that by checking msg->is_exception() first. The cast is
// done on a reference so that a message of any other dynamic type raises
// std::bad_cast instead of dereferencing a null pointer.
static inline cql_server::result_with_foreign_response_ptr convert_error_message_to_coordinator_result(messages::result_message* msg) {
    auto& error_msg = dynamic_cast<messages::result_message::exception&>(*msg);
    // get_exception() is invoked on an rvalue, as before, so the error is moved out of the message.
    return std::move(error_msg).get_exception();
}
|
|
|
|
// Processes a QUERY request body: reads the query string and its options from
// `in`, executes the query through the local query processor and converts the
// outcome into a process_fn_return_type:
//   - a bounce message (msg->move_to_shard() is set) is returned as is, for
//     the caller to act upon;
//   - an exception message is converted with
//     convert_error_message_to_coordinator_result();
//   - anything else is serialized into a response with make_result().
//
// `cached_pk_fn_calls`, when not empty, is installed into the query options
// before execution. Tracing for the request is initialized here only when
// `init_trace` is true. Parse failures are returned as exceptional futures.
static future<cql_server::process_fn_return_type>
process_query_internal(service::client_state& client_state, sharded<cql3::query_processor>& qp, request_reader in,
        uint16_t stream, cql_protocol_version_type version,
        service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace, cql3::computed_function_values cached_pk_fn_calls,
        cql3::dialect dialect) {
    using result_type = cql_server::process_fn_return_type;

    utils::result_with_exception_ptr<std::string_view> query = in.read_long_string_view();
    if (!query) {
        return make_exception_future<result_type>(std::move(query).assume_error());
    }

    // The query state lives on the heap so that references into it stay valid
    // after the owning pointer is moved into the continuation below.
    auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
    auto& query_state = q_state->query_state;

    auto parsed_options = in.read_options(version, qp.local().get_cql_config());
    if (!parsed_options) {
        return make_exception_future<result_type>(std::move(parsed_options).assume_error());
    }
    q_state->options = std::move(parsed_options).assume_value();
    auto& options = *q_state->options;
    if (!cached_pk_fn_calls.empty()) {
        options.set_cached_pk_function_calls(std::move(cached_pk_fn_calls));
    }
    auto skip_metadata = options.skip_metadata();

    if (init_trace) {
        tracing::set_page_size(trace_state, options.get_page_size());
        tracing::add_query(trace_state, query.assume_value());
        tracing::set_common_query_parameters(trace_state, options.get_consistency(),
                options.get_serial_consistency(), options.get_specific_options().timestamp);

        tracing::begin(trace_state, "Execute CQL3 query", client_state.get_client_address());
    }

    auto to_result = [q_state = std::move(q_state), stream, skip_metadata, version] (auto msg) {
        if (msg->move_to_shard()) {
            return result_type(make_foreign(dynamic_pointer_cast<messages::result_message::bounce>(msg)));
        }
        if (msg->is_exception()) {
            return result_type(convert_error_message_to_coordinator_result(msg.get()));
        }
        tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");

        return result_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, cql_metadata_id_wrapper{}, skip_metadata)));
    };
    return qp.local().execute_direct_without_checking_exception_message(query.assume_value(), query_state, dialect, options).then(std::move(to_result));
}
|
|
|
|
// Handles a PREPARE request.
//
// Reads the query string, prepares it on every other shard first (discarding
// those results) and then on the local shard; the local result is serialized
// into the response. The metadata id of the prepared statement is included in
// the response only if is_metadata_id_supported(client_state) holds.
//
// A failure to read the query string is returned as an exceptional future.
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_prepare(uint16_t stream, request_reader in, service::client_state& client_state,
        tracing::trace_state_ptr trace_state) {

    utils::result_with_exception_ptr<std::string_view> raw_query = in.read_long_string_view();
    if (!raw_query) {
        return make_exception_future<std::unique_ptr<cql_server::response>>(std::move(raw_query).assume_error());
    }
    // Own a copy of the text: it is copied into the continuations below.
    auto query = sstring(raw_query.assume_value());
    auto dialect = get_dialect();

    tracing::add_query(trace_state, query);
    tracing::begin(trace_state, "Preparing CQL3 query", client_state.get_client_address());

    auto prepare_on_shard = [query, &client_state, dialect] (auto& qp) mutable {
        return qp.prepare(std::move(query), client_state, dialect).discard_result();
    };
    auto prepare_locally = [this, query, stream, &client_state, trace_state, dialect] () mutable {
        tracing::trace(trace_state, "Done preparing on remote shards");
        return _server._query_processor.local().prepare(std::move(query), client_state, dialect).then([this, stream, &client_state, trace_state] (auto msg) {
            // value_of defers computing the id until the trace message is actually formatted.
            tracing::trace(trace_state, "Done preparing on a local shard - preparing a result. ID is [{}]", seastar::value_of([&msg] {
                return messages::result_message::prepared::cql::get_id(msg);
            }));
            cql_metadata_id_wrapper metadata_id;
            if (is_metadata_id_supported(client_state)) {
                metadata_id = cql_metadata_id_wrapper(msg->get_metadata_id());
            }
            return make_result(stream, *msg, trace_state, _version, std::move(metadata_id));
        });
    };
    return _server._query_processor.invoke_on_others(std::move(prepare_on_shard)).then(std::move(prepare_locally));
}
|
|
|
|
// Processes an EXECUTE request body: looks up the prepared statement named by
// the request, reads the execution options, validates the number of bound
// values and executes the statement through the local query processor. The
// outcome is converted into a process_fn_return_type:
//   - a bounce message (msg->move_to_shard() is set) is returned as is, for
//     the caller to act upon;
//   - an exception message is converted with
//     convert_error_message_to_coordinator_result();
//   - anything else is serialized into a response with make_result().
//
// `cached_pk_fn_calls`, when not empty, is installed into the query options
// before execution. Tracing for the request is initialized here only when
// `init_trace` is true.
//
// Errors detected in this function (malformed request, unknown prepared
// statement id, wrong number of bound values) are returned as exceptional
// futures rather than thrown, consistently with process_batch_internal().
static future<cql_server::process_fn_return_type>
process_execute_internal(service::client_state& client_state, sharded<cql3::query_processor>& qp, request_reader in,
        uint16_t stream, cql_protocol_version_type version,
        service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace, cql3::computed_function_values cached_pk_fn_calls,
        cql3::dialect dialect) {
    utils::result_with_exception_ptr<bytes> cache_key_bytes = in.read_short_bytes();
    if (!cache_key_bytes) {
        return make_exception_future<cql_server::process_fn_return_type>(std::move(cache_key_bytes).assume_error());
    }
    cql3::prepared_cache_key_type cache_key(cache_key_bytes.assume_value(), dialect);
    auto& id = cql3::prepared_cache_key_type::cql_id(cache_key);
    bool needs_authorization = false;

    // First, try to lookup in the cache of already authorized statements. If the corresponding entry is not found there
    // look for the prepared statement and then authorize it.
    auto prepared = qp.local().get_prepared(client_state.user(), cache_key);
    if (!prepared) {
        needs_authorization = true;
        prepared = qp.local().get_prepared(cache_key);
    }

    if (!prepared) {
        // Report through the future instead of throwing from a future-returning function.
        return make_exception_future<cql_server::process_fn_return_type>(exceptions::prepared_query_not_found_exception(id));
    }

    // When the client supports metadata ids, the request carries the id the
    // client holds; it is paired with the statement's current id.
    cql_metadata_id_wrapper metadata_id = cql_metadata_id_wrapper();
    if (is_metadata_id_supported(client_state)) {
        utils::result_with_exception_ptr<bytes> metadata_id_bytes = in.read_short_bytes();
        if (!metadata_id_bytes) {
            return make_exception_future<cql_server::process_fn_return_type>(std::move(metadata_id_bytes).assume_error());
        }
        metadata_id = cql_metadata_id_wrapper(cql3::cql_metadata_id_type(std::move(metadata_id_bytes).assume_value()), prepared->get_metadata_id());
    }

    // The query state lives on the heap so that references into it stay valid
    // after the owning pointer is moved into the continuation below.
    auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
    auto& query_state = q_state->query_state;
    auto o = in.read_options(version, qp.local().get_cql_config());
    if (!o) {
        return make_exception_future<cql_server::process_fn_return_type>(std::move(o).assume_error());
    }
    q_state->options = std::move(o).assume_value();
    auto& options = *q_state->options;
    if (!cached_pk_fn_calls.empty()) {
        options.set_cached_pk_function_calls(std::move(cached_pk_fn_calls));
    }
    auto skip_metadata = options.skip_metadata();

    if (init_trace) {
        tracing::set_page_size(trace_state, options.get_page_size());
        tracing::add_query(trace_state, prepared->statement->raw_cql_statement);
        tracing::add_prepared_statement(trace_state, prepared);
        tracing::set_common_query_parameters(trace_state, options.get_consistency(),
                options.get_serial_consistency(), options.get_specific_options().timestamp);

        tracing::begin(trace_state, seastar::value_of([&id] { return seastar::format("Execute CQL3 prepared query [{}]", id); }),
                client_state.get_client_address());
    }

    auto stmt = prepared->statement;
    tracing::trace(query_state.get_trace_state(), "Checking bounds");
    if (stmt->get_bound_terms() != options.get_values_count()) {
        const auto msg = format("Invalid amount of bind variables: expected {:d} received {:d}",
                stmt->get_bound_terms(),
                options.get_values_count());
        tracing::trace(query_state.get_trace_state(), "{}", msg);
        // Report through the future instead of throwing from a future-returning function.
        return make_exception_future<cql_server::process_fn_return_type>(exceptions::invalid_request_exception(msg));
    }

    options.prepare(prepared->bound_names);

    if (init_trace) {
        tracing::add_prepared_query_options(trace_state, options);
    }

    tracing::trace(trace_state, "Processing a statement");
    return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, std::move(prepared), std::move(cache_key), needs_authorization)
            .then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id)] (auto msg) mutable {
        if (msg->move_to_shard()) {
            return cql_server::process_fn_return_type(make_foreign(dynamic_pointer_cast<messages::result_message::bounce>(msg)));
        } else if (msg->is_exception()) {
            return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
        } else {
            tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
            return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, std::move(metadata_id), skip_metadata)));
        }
    });
}
|
|
|
|
// Processes a BATCH request body.
//
// The body consists of a batch type byte, a 16-bit statement count and, for
// each statement, a kind byte (0 = query string, 1 = prepared statement id)
// followed by the statement and its bound values; the batch options come last.
// Every statement must be a modification statement and must receive exactly as
// many values as it has bind markers. The assembled batch is executed through
// the local query processor and the outcome is converted into a
// process_fn_return_type:
//   - a bounce message (msg->move_to_shard() is set) is returned as is, for
//     the caller to act upon;
//   - an exception message is converted with
//     convert_error_message_to_coordinator_result();
//   - anything else is serialized into a response with make_result().
//
// `cached_pk_fn_calls`, when not empty, is installed into the batch options
// before execution. Tracing for the request is initialized here only when
// `init_trace` is true. Errors detected while parsing or validating the
// request are returned as exceptional futures.
static future<cql_server::process_fn_return_type>
process_batch_internal(service::client_state& client_state, sharded<cql3::query_processor>& qp, request_reader in,
        uint16_t stream, cql_protocol_version_type version,
        service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace, cql3::computed_function_values cached_pk_fn_calls, cql3::dialect dialect) {
    using result_type = cql_server::process_fn_return_type;

    const utils::result_with_exception_ptr<int8_t> batch_type = in.read_byte();
    if (!batch_type) {
        return make_exception_future<result_type>(std::move(batch_type).assume_error());
    }
    const utils::result_with_exception_ptr<uint16_t> stmt_count = in.read_short();
    if (!stmt_count) {
        return make_exception_future<result_type>(std::move(stmt_count).assume_error());
    }
    const uint16_t count = stmt_count.assume_value();

    std::vector<cql3::statements::batch_statement::single_statement> modifications;
    std::vector<cql3::raw_value_view_vector_with_unset> values;
    std::unordered_map<cql3::prepared_cache_key_type, cql3::authorized_prepared_statements_cache::value_type> pending_authorization_entries;

    modifications.reserve(count);
    values.reserve(count);

    if (init_trace) {
        tracing::begin(trace_state, "Execute batch of CQL3 queries", client_state.get_client_address());
    }

    for (unsigned idx = 0; idx != count; ++idx) {
        const utils::result_with_exception_ptr<int8_t> kind = in.read_byte();
        if (!kind) {
            return make_exception_future<result_type>(std::move(kind).assume_error());
        }
        const int8_t kind_value = kind.assume_value();

        // stmt_ptr owns a statement built from a query string (kind 0) for the
        // duration of this iteration; ps refers to whichever statement is used.
        std::unique_ptr<cql3::statements::prepared_statement> stmt_ptr;
        cql3::statements::prepared_statement::checked_weak_ptr ps;
        // Statements given as query strings always need authorization.
        bool needs_authorization = (kind_value == 0);

        if (kind_value == 0) {
            utils::result_with_exception_ptr<std::string_view> query = in.read_long_string_view();
            if (!query) {
                return make_exception_future<result_type>(std::move(query).assume_error());
            }
            stmt_ptr = qp.local().get_statement(query.assume_value(), client_state, dialect);
            ps = stmt_ptr->checked_weak_from_this();
            if (init_trace) {
                tracing::add_query(trace_state, query.assume_value());
            }
        } else if (kind_value == 1) {
            utils::result_with_exception_ptr<bytes> cache_key_bytes = in.read_short_bytes();
            if (!cache_key_bytes) {
                return make_exception_future<result_type>(std::move(cache_key_bytes).assume_error());
            }
            cql3::prepared_cache_key_type cache_key(cache_key_bytes.assume_value(), dialect);
            auto& id = cql3::prepared_cache_key_type::cql_id(cache_key);

            // Look in the cache of already authorized statements first. On a miss,
            // fall back to the plain prepared statement and schedule it for authorization.
            ps = qp.local().get_prepared(client_state.user(), cache_key);
            if (!ps) {
                ps = qp.local().get_prepared(cache_key);
                if (!ps) {
                    return make_exception_future<result_type>(exceptions::prepared_query_not_found_exception(id));
                }
                // A given prepared statement is authorized only once per batch:
                // emplace() reports whether this is its first occurrence.
                needs_authorization = pending_authorization_entries.emplace(std::move(cache_key), ps->checked_weak_from_this()).second;
            }
            if (init_trace) {
                tracing::add_query(trace_state, ps->statement->raw_cql_statement);
            }
        } else {
            return make_exception_future<result_type>(exceptions::protocol_exception(
                    "Invalid query kind in BATCH messages. Must be 0 or 1 but got "
                    + std::to_string(static_cast<int>(kind_value))));
        }

        if (dynamic_cast<cql3::statements::modification_statement*>(ps->statement.get()) == nullptr) {
            return make_exception_future<result_type>(exceptions::invalid_request_exception("Invalid statement in batch: only UPDATE, INSERT and DELETE statements are allowed."));
        }
        ::shared_ptr<cql3::statements::modification_statement> modification = static_pointer_cast<cql3::statements::modification_statement>(ps->statement);
        if (init_trace) {
            tracing::add_table_name(trace_state, modification->keyspace(), modification->column_family());
            tracing::add_prepared_statement(trace_state, ps);
        }

        modifications.emplace_back(std::move(modification), needs_authorization);

        std::vector<cql3::raw_value_view> bound_values;
        cql3::unset_bind_variable_vector unset;
        auto read_values = in.read_value_view_list(version, bound_values, unset);
        if (!read_values) {
            return make_exception_future<result_type>(std::move(read_values).assume_error());
        }

        auto stmt = ps->statement;
        if (stmt->get_bound_terms() != bound_values.size()) {
            return make_exception_future<result_type>(
                    exceptions::invalid_request_exception(format("There were {:d} markers(?) in CQL but {:d} bound variables",
                            stmt->get_bound_terms(), bound_values.size())));
        }
        values.emplace_back(cql3::raw_value_view_vector_with_unset(std::move(bound_values), std::move(unset)));
    }

    // The query state lives on the heap so that references into it stay valid
    // after the owning pointer is moved into the continuation below.
    auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
    auto& query_state = q_state->query_state;
    // #563. CQL v2 encodes query_options in v1 format for batch requests.
    auto parsed_options = in.read_options(version, qp.local().get_cql_config());
    if (!parsed_options) {
        return make_exception_future<result_type>(std::move(parsed_options).assume_error());
    }
    q_state->options = std::make_unique<cql3::query_options>(cql3::query_options::make_batch_options(std::move(*parsed_options.assume_value()), std::move(values)));
    auto& options = *q_state->options;
    if (!cached_pk_fn_calls.empty()) {
        options.set_cached_pk_function_calls(std::move(cached_pk_fn_calls));
    }

    if (init_trace) {
        tracing::add_prepared_query_options(trace_state, options);
        tracing::set_common_query_parameters(trace_state, options.get_consistency(),
                options.get_serial_consistency(), options.get_specific_options().timestamp);

        tracing::trace(trace_state, "Creating a batch statement");
    }

    auto batch = ::make_shared<cql3::statements::batch_statement>(cql3::statements::batch_statement::type(batch_type.assume_value()), std::move(modifications), cql3::attributes::none(), qp.local().get_cql_stats());
    // `batch` is captured by the continuation although it is not referenced in
    // its body; this keeps the batch statement alive until the continuation runs.
    auto to_result = [stream, batch, q_state = std::move(q_state), trace_state = query_state.get_trace_state(), version] (auto msg) {
        if (msg->move_to_shard()) {
            return result_type(make_foreign(dynamic_pointer_cast<messages::result_message::bounce>(msg)));
        }
        if (msg->is_exception()) {
            return result_type(convert_error_message_to_coordinator_result(msg.get()));
        }
        tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");

        return result_type(make_foreign(make_result(stream, *msg, trace_state, version, cql_metadata_id_wrapper{})));
    };
    return qp.local().execute_batch_without_checking_exception_message(batch, query_state, options, std::move(pending_authorization_entries))
            .then(std::move(to_result));
}
|
|
|
|
// Copy the bytes from an istream into a bytes_ostream.
|
|
// This preserves fragmentation and avoids a single large allocation
|
|
// and the bytes can be used in an RPC verb
|
|
static bytes_ostream copy_istream(fragmented_temporary_buffer::istream is) {
|
|
bytes_ostream result;
|
|
auto view = is.read_view(is.bytes_left());
|
|
if (!view) {
|
|
std::rethrow_exception(view.assume_error());
|
|
}
|
|
result.write(view.assume_value());
|
|
return result;
|
|
}
|
|
|
|
// Runs a QUERY, EXECUTE or BATCH request and keeps following "bounce" results
// until a final outcome is available.
//
// `opcode` selects the internal handler; any other opcode throws std::runtime_error.
// While the handler's result is a cql_server::result_with_bounce:
//  - if the bounce targets this host, the request is re-run on the requested shard
//    through invoke_on() (that run may bounce again, hence the loop);
//  - otherwise, if this call is itself serving a node bounce (`bounced`), the bounce
//    message is returned unchanged so the caller can answer with a redirect;
//  - otherwise the raw request bytes are wrapped in a forward_cql_execute_request and
//    sent with forward_cql(); the response obtained that way is returned.
//
// `cached_fn_calls` carries function values computed by a previous execution attempt
// of the same request (it arrives with a forwarded request); it is handed to the
// first handler invocation so that this attempt reuses them.
// `bounced` also suppresses trace initialization, because the forwarding handler has
// already started the trace.
future<cql_server::process_fn_return_type>
cql_server::process(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit,
        tracing::trace_state_ptr trace_state, cql_binary_opcode opcode, cql_protocol_version_type version, cql3::dialect dialect,
        cql3::computed_function_values cached_fn_calls, handling_node_bounce bounced)
{
    // Snapshot of the request stream taken before the first handler call, so the
    // request can be parsed again on another shard or copied for another node.
    fragmented_temporary_buffer::istream is = in.get_stream();

    auto process_fn = [] (cql_binary_opcode op) {
        switch (op) {
        case cql_binary_opcode::EXECUTE:
            return process_execute_internal;
        case cql_binary_opcode::QUERY:
            return process_query_internal;
        case cql_binary_opcode::BATCH:
            return process_batch_internal;
        default:
            throw std::runtime_error(format("Unsupported opcode for processing: {}", static_cast<int>(op)));
        }
    } (opcode);

    bool init_trace = (bool)!bounced; // If the request was bounced, we already started the trace in the handler
    // Pass the values received from the caller instead of an empty set; previously
    // the parameter was ignored here, so values shipped with a forwarded request
    // never reached the statement execution.
    auto msg = co_await coroutine::try_future(process_fn(client_state, _query_processor, in, stream,
            version, permit, trace_state, init_trace, std::move(cached_fn_calls), dialect));
    while (auto* bounce_msg = std::get_if<cql_server::result_with_bounce>(&msg)) {
        auto shard = (*bounce_msg)->move_to_shard().value();
        // Function values computed by the attempt that produced this bounce.
        auto&& cached_vals = (*bounce_msg)->take_cached_pk_function_calls();
        auto target_host = (*bounce_msg)->move_to_host();
        auto my_host_id = _query_processor.local().proxy().get_token_metadata_ptr()->get_topology().my_host_id();
        if (target_host == my_host_id) {
            // Shard bounce
            auto sg = _config.bounce_request_smp_service_group;
            auto gcs = client_state.move_to_other_shard();
            auto gt = tracing::global_trace_state_ptr(trace_state);
            // The lambda captures locals of this coroutine frame by reference; they
            // stay alive because the invoke_on() future is awaited right here.
            msg = co_await container().invoke_on(shard, sg, [&, stream, dialect, version] (cql_server& server) -> future<process_fn_return_type> {
                bytes_ostream linearization_buffer;
                request_reader in(is, linearization_buffer);
                auto local_client_state = gcs.get();
                auto local_trace_state = gt.get();
                co_return co_await process_fn(local_client_state, server._query_processor, in, stream, version,
                        /* FIXME */empty_service_permit(), std::move(local_trace_state), false, cached_vals, dialect);
            });
        } else {
            // Node bounce
            if (bounced) {
                // If we already bounced between nodes, return the bounce message to the caller
                // to return a 'redirect' response to the bouncing node instead of bouncing again.
                co_return std::move(msg);
            }

            tracing::trace(trace_state, "Forwarding to node {} shard {}", target_host, shard);

            auto request_bytes = copy_istream(is);

            forward_cql_execute_request req{
                .request_buffer = std::move(request_bytes),
                .opcode = static_cast<uint8_t>(opcode),
                .cql_version = static_cast<uint8_t>(version),
                .dialect = dialect,
                .client_state = client_state,
                .trace_info = tracing::make_trace_info(trace_state),
                // Forward the values taken from the bounce message (the same ones a
                // shard bounce reuses), not the already-consumed input parameter.
                .cached_fn_calls = std::move(cached_vals),
            };

            auto response = co_await forward_cql(
                    target_host, shard, (*bounce_msg)->timeout().value(), (*bounce_msg)->is_write().value(),
                    stream, trace_state, std::move(req));

            co_return cql_server::result_with_foreign_response_ptr(std::move(response));
        }
    }
    co_return std::move(msg);
}
|
|
|
|
// Builds the CQL dialect used by this connection from the server configuration.
cql3::dialect
cql_server::connection::get_dialect() const {
    auto& cfg = _server._config;
    cql3::dialect dialect{
        .duplicate_bind_variable_names_refer_to_same_variable = cfg.cql_duplicate_bind_variable_names_refer_to_same_variable,
    };
    return dialect;
}
|
|
|
|
// Handles a REGISTER request: subscribes this connection to the listed event types
// and answers with READY.
//
// Before parsing, a connection that is not yet marked as a control connection is
// marked as one (and its scheduling group refreshed) if the driver service level
// exists. A malformed string list or an unknown event type produces an exceptional
// future; event types registered before the failing one stay registered.
future<std::unique_ptr<cql_server::response>>
cql_server::connection::process_register(uint16_t stream, request_reader in, service::client_state& client_state,
        tracing::trace_state_ptr trace_state) {
    using response_ptr = std::unique_ptr<cql_server::response>;

    const bool promote_to_control = !_client_state.is_control_connection()
            && _server._sl_controller.has_service_level(qos::service_level_controller::driver_service_level_name);
    if (promote_to_control) {
        _client_state.set_control_connection();
        update_scheduling_group();
    }

    std::vector<sstring> requested_events;
    auto read_result = in.read_string_list(requested_events);
    if (!read_result) {
        return make_exception_future<response_ptr>(std::move(read_result).assume_error());
    }
    for (auto&& event_name : requested_events) {
        utils::result_with_exception<event::event_type, exceptions::protocol_exception> parsed = parse_event_type(event_name);
        if (!parsed) {
            return std::move(parsed).assume_error().into_exception_future<response_ptr>();
        }
        _server._notifier->register_event(std::move(parsed).value(), this);
    }

    _ready = true;
    on_connection_ready();
    return make_ready_future<response_ptr>(make_ready(stream, std::move(trace_state)));
}
|
|
|
|
// Creates an empty READY response for the given stream.
std::unique_ptr<cql_server::response> cql_server::connection::make_ready(int16_t stream, const tracing::trace_state_ptr& tr_state) const
{
    auto ready = std::make_unique<cql_server::response>(stream, cql_binary_opcode::READY, tr_state);
    return ready;
}
|
|
|
|
// Creates an AUTHENTICATE response whose body is the string `clz`.
std::unique_ptr<cql_server::response> cql_server::connection::make_autheticate(int16_t stream, std::string_view clz, const tracing::trace_state_ptr& tr_state) const
{
    auto reply = std::make_unique<cql_server::response>(stream, cql_binary_opcode::AUTHENTICATE, tr_state);
    reply->write_string(clz);
    return reply;
}
|
|
|
|
// Creates an AUTH_SUCCESS response carrying the byte string `b`.
std::unique_ptr<cql_server::response> cql_server::connection::make_auth_success(int16_t stream, bytes b, const tracing::trace_state_ptr& tr_state) const {
    auto reply = std::make_unique<cql_server::response>(stream, cql_binary_opcode::AUTH_SUCCESS, tr_state);
    reply->write_bytes(std::move(b));
    return reply;
}
|
|
|
|
// Creates an AUTH_CHALLENGE response carrying the byte string `b`.
std::unique_ptr<cql_server::response> cql_server::connection::make_auth_challenge(int16_t stream, bytes b, const tracing::trace_state_ptr& tr_state) const {
    auto reply = std::make_unique<cql_server::response>(stream, cql_binary_opcode::AUTH_CHALLENGE, tr_state);
    reply->write_bytes(std::move(b));
    return reply;
}
|
|
|
|
// Creates the SUPPORTED response: a string multimap listing the CQL version, the
// available compression algorithms, the CLIENT_OPTIONS key, the shard-awareness
// parameters (only when shard-aware drivers are allowed) and every supported
// protocol extension with its additional options.
std::unique_ptr<cql_server::response> cql_server::connection::make_supported(int16_t stream, const tracing::trace_state_ptr& tr_state) const
{
    std::multimap<sstring, sstring> advertised;
    advertised.emplace("CQL_VERSION", cql3::query_processor::CQL_VERSION);
    advertised.emplace("COMPRESSION", "lz4");
    advertised.emplace("COMPRESSION", "snappy");
    // CLIENT_OPTIONS value is a JSON string that can be used to pass client-specific configuration,
    // e.g. CQL driver configuration.
    advertised.emplace("CLIENT_OPTIONS", "");

    auto& cfg = _server._config;
    if (cfg.allow_shard_aware_drivers) {
        advertised.emplace("SCYLLA_SHARD", format("{:d}", this_shard_id()));
        advertised.emplace("SCYLLA_NR_SHARDS", format("{:d}", smp::count));
        advertised.emplace("SCYLLA_SHARDING_ALGORITHM", dht::cpu_sharding_algorithm_name());
        if (cfg.shard_aware_transport_port) {
            advertised.emplace("SCYLLA_SHARD_AWARE_PORT", format("{:d}", *cfg.shard_aware_transport_port));
        }
        if (cfg.shard_aware_transport_port_ssl) {
            advertised.emplace("SCYLLA_SHARD_AWARE_PORT_SSL", format("{:d}", *cfg.shard_aware_transport_port_ssl));
        }
        advertised.emplace("SCYLLA_SHARDING_IGNORE_MSB", format("{:d}", cfg.sharding_ignore_msb));
        advertised.emplace("SCYLLA_PARTITIONER", cfg.partitioner_name);
    }

    for (cql_protocol_extension extension : supported_cql_protocol_extensions()) {
        const sstring extension_key = protocol_extension_name(extension);
        std::vector<sstring> extra_options = additional_options_for_proto_ext(extension);
        if (extra_options.empty()) {
            // An extension without options is still advertised, with an empty value.
            advertised.emplace(extension_key, "");
            continue;
        }
        for (sstring option : extra_options) {
            advertised.emplace(extension_key, std::move(option));
        }
    }

    auto reply = std::make_unique<cql_server::response>(stream, cql_binary_opcode::SUPPORTED, tr_state);
    reply->write_string_multimap(std::move(advertised));
    return reply;
}
|
|
|
|
// Visitor that serializes a result_message into the body of a CQL response.
// Each visit() overload first writes the integer result kind and then the
// kind-specific payload.
class cql_server::fmt_visitor : public messages::result_message::visitor_base {
private:
    // Result kind codes written at the start of the body.
    static constexpr int32_t kind_void = 0x0001;
    static constexpr int32_t kind_rows = 0x0002;
    static constexpr int32_t kind_set_keyspace = 0x0003;
    static constexpr int32_t kind_prepared = 0x0004;
    static constexpr int32_t kind_schema_change = 0x0005;

    uint8_t _version;
    cql_server::response& _response;
    bool _skip_metadata;
    cql_metadata_id_wrapper _metadata_id;
public:
    fmt_visitor(uint8_t version, cql_server::response& response, bool skip_metadata, cql_metadata_id_wrapper&& metadata_id)
        : _version{version}
        , _response{response}
        , _skip_metadata{skip_metadata}
        , _metadata_id(std::move(metadata_id))
    { }

    virtual void visit(const messages::result_message::void_message&) override {
        _response.write_int(kind_void);
    }

    virtual void visit(const messages::result_message::set_keyspace& m) override {
        _response.write_int(kind_set_keyspace);
        _response.write_string(m.get_keyspace());
    }

    // Prepared statement: id, optional result metadata id, bind metadata, result metadata.
    virtual void visit(const messages::result_message::prepared::cql& m) override {
        _response.write_int(kind_prepared);
        _response.write_short_bytes(m.get_id());
        if (_metadata_id.has_response_metadata_id()) {
            _response.write_short_bytes(_metadata_id.get_response_metadata_id()._metadata_id);
        }
        _response.write(m.metadata(), _version);
        _response.write(*m.result_metadata(), _metadata_id);
    }

    // Only SCHEMA_CHANGE events are expected here; any other type trips the assertion.
    virtual void visit(const messages::result_message::schema_change& m) override {
        auto change = m.get_change();
        if (change->type == event::event_type::SCHEMA_CHANGE) {
            auto schema_event = static_pointer_cast<event::schema_change>(change);
            _response.write_int(kind_schema_change);
            _response.serialize(*schema_event, _version);
        } else {
            SCYLLA_ASSERT(0);
        }
    }

    // Rows: metadata, a row count that is patched in afterwards, then every cell value.
    virtual void visit(const messages::result_message::rows& m) override {
        _response.write_int(kind_rows);
        auto& result_set = m.rs();
        _response.write(result_set.get_metadata(), _metadata_id, _skip_metadata);
        auto row_count_slot = _response.write_int_placeholder();

        // Streams cell values into the response while counting rows.
        class row_emitter {
            cql_server::response& _out;
            int64_t _rows = 0;
        public:
            row_emitter(cql_server::response& out) : _out(out) { }

            void start_row() {
                ++_rows;
            }
            void accept_value(std::optional<managed_bytes_view> cell) {
                _out.write_value(cell);
            }
            void end_row() { }

            int64_t row_count() const { return _rows; }
        };

        row_emitter emitter(_response);
        result_set.visit(emitter);
        row_count_slot.write(emitter.row_count()); // even though the placeholder is for int32_t we won't overflow because of memory limits
    }
};
|
|
|
|
std::unique_ptr<cql_server::response>
|
|
make_result(int16_t stream, messages::result_message& msg, const tracing::trace_state_ptr& tr_state,
|
|
cql_protocol_version_type version, cql_metadata_id_wrapper&& metadata_id, bool skip_metadata) {
|
|
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::RESULT, tr_state);
|
|
if (!msg.warnings().empty() && version > 3) [[unlikely]] {
|
|
response->set_frame_flag(cql_frame_flags::warning);
|
|
response->write_string_list(msg.warnings());
|
|
}
|
|
if (msg.custom_payload()) {
|
|
response->set_frame_flag(cql_frame_flags::custom_payload);
|
|
response->write_string_bytes_map(msg.custom_payload().value());
|
|
}
|
|
cql_server::fmt_visitor fmt{version, *response, skip_metadata, std::move(metadata_id)};
|
|
msg.accept(fmt);
|
|
return response;
|
|
}
|
|
|
|
std::unique_ptr<cql_server::response>
|
|
cql_server::connection::make_topology_change_event(const event::topology_change& event) const
|
|
{
|
|
auto response = std::make_unique<cql_server::response>(-1, cql_binary_opcode::EVENT, tracing::trace_state_ptr());
|
|
response->write_string("TOPOLOGY_CHANGE");
|
|
response->write_string(to_string(event.change));
|
|
response->write_inet(event.node);
|
|
return response;
|
|
}
|
|
|
|
std::unique_ptr<cql_server::response>
|
|
cql_server::connection::make_status_change_event(const event::status_change& event) const
|
|
{
|
|
auto response = std::make_unique<cql_server::response>(-1, cql_binary_opcode::EVENT, tracing::trace_state_ptr());
|
|
response->write_string("STATUS_CHANGE");
|
|
response->write_string(to_string(event.status));
|
|
response->write_inet(event.node);
|
|
return response;
|
|
}
|
|
|
|
std::unique_ptr<cql_server::response>
|
|
cql_server::connection::make_schema_change_event(const event::schema_change& event) const
|
|
{
|
|
auto response = std::make_unique<cql_server::response>(-1, cql_binary_opcode::EVENT, tracing::trace_state_ptr());
|
|
response->write_string("SCHEMA_CHANGE");
|
|
response->serialize(event, _version);
|
|
return response;
|
|
}
|
|
|
|
std::unique_ptr<cql_server::response>
|
|
cql_server::connection::make_client_routes_change_event(const event::client_routes_change& event) const
|
|
{
|
|
auto response = std::make_unique<cql_server::response>(-1, cql_binary_opcode::EVENT, tracing::trace_state_ptr());
|
|
response->write_string("CLIENT_ROUTES_CHANGE");
|
|
response->serialize(event, _version);
|
|
return response;
|
|
}
|
|
|
|
// Queues `response` for sending: the write is chained onto _ready_to_respond, so it
// starts only after the previously queued continuation has finished.
void cql_server::connection::write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, service_permit permit, cql_compression compression)
{
    auto send = [this, compression, response = std::move(response), permit = std::move(permit)] () mutable {
        // Grab the reference first: ownership of the response moves into the deleter
        // below, which write_message() attaches to the buffers it emits.
        cql_server::response& outgoing = *response;
        auto keep_alive = make_deleter([response = std::move(response)] {});
        return outgoing.write_message(_write_buf, _version, compression, std::move(keep_alive));
    };
    _ready_to_respond = _ready_to_respond.then(std::move(send));
}
|
|
|
|
// Writes this response to `out`: optional body compression, then the frame header,
// then each body fragment, then a flush.
// Body fragments are handed to the stream as temporary_buffers that point at the
// fragment data and carry a share of `del`. A failure to build the frame header is
// returned as an exceptional future.
future<> cql_server::response::write_message(output_stream<char>& out, uint8_t version, cql_compression compression, seastar::deleter del) {
    if (compression != cql_compression::none) {
        compress(compression);
    }
    utils::result_with_exception_ptr<temporary_buffer<char>> header = make_frame(version, _body.size());
    if (!header) [[unlikely]] {
        return make_exception_future<>(std::move(header).assume_error());
    }

    auto write_body = [this, &out, del = std::move(del)] () mutable {
        auto write_fragment = [&out, del = std::move(del)] (bytes_view fragment) mutable {
            auto* data = reinterpret_cast<char*>(const_cast<signed char*>(fragment.data()));
            temporary_buffer<char> chunk(data, fragment.size(), del.share());
            return out.write(std::move(chunk));
        };
        return do_for_each(_body.begin(), _body.end(), std::move(write_fragment)).then([&out] {
            return out.flush();
        });
    };
    return out.write(std::move(header).assume_value()).then(std::move(write_body));
}
|
|
|
|
// Compresses the body with the requested algorithm and sets the compression frame flag.
// Throws std::invalid_argument for anything other than lz4 or snappy.
void cql_server::response::compress(cql_compression compression)
{
    if (compression == cql_compression::lz4) {
        compress_lz4();
    } else if (compression == cql_compression::snappy) {
        compress_snappy();
    } else {
        throw std::invalid_argument("Invalid CQL compression algorithm");
    }
    set_frame_flag(cql_frame_flags::compression);
}
|
|
|
|
void cql_server::response::compress_lz4()
|
|
{
|
|
auto input_buffer = input_buffer_guard();
|
|
auto output_buffer = output_buffer_guard();
|
|
|
|
auto in = input_buffer.get_linearized_view(_body);
|
|
size_t output_len = LZ4_COMPRESSBOUND(in.size()) + 4;
|
|
auto bytes_ostream = output_buffer.make_bytes_ostream(output_len, [&in] (bytes_mutable_view out) -> utils::result_with_exception<size_t, std::runtime_error> {
|
|
out.data()[0] = (in.size() >> 24) & 0xFF;
|
|
out.data()[1] = (in.size() >> 16) & 0xFF;
|
|
out.data()[2] = (in.size() >> 8) & 0xFF;
|
|
out.data()[3] = in.size() & 0xFF;
|
|
auto ret = LZ4_compress_default(reinterpret_cast<const char*>(in.data()), reinterpret_cast<char*>(out.data() + 4), in.size(), out.size() - 4);
|
|
if (ret == 0) {
|
|
return bo::failure(std::runtime_error("CQL frame LZ4 compression failure"));
|
|
}
|
|
return bo::success(static_cast<size_t>(ret) + 4);
|
|
});
|
|
if (!bytes_ostream) {
|
|
throw std::move(bytes_ostream).as_failure();
|
|
}
|
|
_body = std::move(bytes_ostream).value();
|
|
}
|
|
|
|
void cql_server::response::compress_snappy()
|
|
{
|
|
auto input_buffer = input_buffer_guard();
|
|
auto output_buffer = output_buffer_guard();
|
|
|
|
auto in = input_buffer.get_linearized_view(_body);
|
|
size_t output_len = snappy_max_compressed_length(in.size());
|
|
auto bytes_ostream = output_buffer.make_bytes_ostream(output_len, [&in] (bytes_mutable_view out) -> utils::result_with_exception<size_t, std::runtime_error> {
|
|
// FIXME: snappy internally performs allocations greater than 128 kiB.
|
|
const memory::scoped_large_allocation_warning_threshold slawt{256*1024};
|
|
size_t actual_len = out.size();
|
|
if (snappy_compress(reinterpret_cast<const char*>(in.data()), in.size(), reinterpret_cast<char*>(out.data()), &actual_len) != SNAPPY_OK) {
|
|
return bo::failure(std::runtime_error("CQL frame Snappy compression failure"));
|
|
}
|
|
return bo::success(actual_len);
|
|
});
|
|
if (!bytes_ostream) {
|
|
throw std::move(bytes_ostream).as_failure();
|
|
}
|
|
_body = std::move(bytes_ostream).value();
|
|
}
|
|
|
|
// Writes a schema change event: change kind, target kind, keyspace, then the
// target-specific arguments.
//  - KEYSPACE: nothing more;
//  - TYPE / TABLE: the first argument;
//  - FUNCTION / AGGREGATE: the first argument followed by the remaining arguments
//    as a string list.
// `version` is not used by this overload.
void cql_server::response::serialize(const event::schema_change& event, uint8_t version)
{
    using target_type = event::schema_change::target_type;

    write_string(to_string(event.change));
    write_string(to_string(event.target));
    write_string(event.keyspace);

    switch (event.target) {
    case target_type::KEYSPACE:
        break;
    case target_type::TYPE:
    case target_type::TABLE:
        write_string(event.arguments[0]);
        break;
    case target_type::FUNCTION:
    case target_type::AGGREGATE: {
        write_string(event.arguments[0]);
        std::vector<sstring> remaining(event.arguments.begin() + 1, event.arguments.end());
        write_string_list(std::move(remaining));
        break;
    }
    }
}
|
|
|
|
// Writes a client routes change event: change kind, then the connection ids and
// the host ids as string lists. `version` is not used by this overload.
void cql_server::response::serialize(const event::client_routes_change& event, uint8_t version)
{
    const auto change_name = to_string(event.change);
    write_string(change_name);
    write_string_list(event.get_connection_ids());
    write_string_list(event.get_host_ids());
}
|
|
|
|
void cql_server::response::write_byte(uint8_t b)
|
|
{
|
|
auto s = reinterpret_cast<const int8_t*>(&b);
|
|
_body.write(bytes_view(s, sizeof(b)));
|
|
}
|
|
|
|
void cql_server::response::write_int(int32_t n)
|
|
{
|
|
auto u = htonl(n);
|
|
auto *s = reinterpret_cast<const int8_t*>(&u);
|
|
_body.write(bytes_view(s, sizeof(u)));
|
|
}
|
|
|
|
// Reserves room for a 32-bit integer in the body and returns a handle through
// which the value can be filled in later.
cql_server::response::placeholder<int32_t> cql_server::response::write_int_placeholder() {
    auto reserved = _body.write_place_holder(sizeof(int32_t));
    return placeholder<int32_t>(reserved);
}
|
|
|
|
void cql_server::response::write_long(int64_t n)
|
|
{
|
|
auto u = htonq(n);
|
|
auto *s = reinterpret_cast<const int8_t*>(&u);
|
|
_body.write(bytes_view(s, sizeof(u)));
|
|
}
|
|
|
|
void cql_server::response::write_short(uint16_t n)
|
|
{
|
|
auto u = htons(n);
|
|
auto *s = reinterpret_cast<const int8_t*>(&u);
|
|
_body.write(bytes_view(s, sizeof(u)));
|
|
}
|
|
|
|
template<typename T>
|
|
inline
|
|
T cast_if_fits(size_t v) {
|
|
size_t max = std::numeric_limits<T>::max();
|
|
if (v > max) {
|
|
throw std::runtime_error(format("Value too large, {:d} > {:d}", v, max));
|
|
}
|
|
return static_cast<T>(v);
|
|
}
|
|
|
|
void cql_server::response::write_string(std::string_view s)
|
|
{
|
|
write_short(cast_if_fits<uint16_t>(s.size()));
|
|
_body.write(bytes_view(reinterpret_cast<const int8_t*>(s.data()), s.size()));
|
|
}
|
|
|
|
// Appends raw bytes using the string encoding: 16-bit length, then the bytes.
void cql_server::response::write_bytes_as_string(bytes_view s)
{
    const auto length = cast_if_fits<uint16_t>(s.size());
    write_short(length);
    _body.write(s);
}
|
|
|
|
void cql_server::response::write_long_string(const sstring& s)
|
|
{
|
|
write_int(cast_if_fits<int32_t>(s.size()));
|
|
_body.write(bytes_view(reinterpret_cast<const int8_t*>(s.data()), s.size()));
|
|
}
|
|
|
|
void cql_server::response::write_string_list(std::vector<sstring> string_list)
|
|
{
|
|
write_short(cast_if_fits<uint16_t>(string_list.size()));
|
|
for (auto&& s : string_list) {
|
|
write_string(s);
|
|
}
|
|
}
|
|
|
|
// Appends a byte string as a 32-bit length followed by the bytes.
void cql_server::response::write_bytes(bytes b)
{
    const auto length = cast_if_fits<int32_t>(b.size());
    write_int(length);
    _body.write(b);
}
|
|
|
|
// Appends a byte string as a 16-bit length followed by the bytes.
void cql_server::response::write_short_bytes(bytes b)
{
    const auto length = cast_if_fits<uint16_t>(b.size());
    write_short(length);
    _body.write(b);
}
|
|
|
|
// Appends a socket address: one byte with the address size, the raw address
// bytes, then the port as a 32-bit integer.
void cql_server::response::write_inet(socket_address inet)
{
    auto address = inet.addr();
    const auto address_size = address.size();
    write_byte(uint8_t(address_size));
    auto* raw = static_cast<const int8_t*>(address.data());
    _body.write(bytes_view(raw, address.size()));
    write_int(inet.port());
}
|
|
|
|
// Appends a consistency level using its wire representation (16 bits).
void cql_server::response::write_consistency(db::consistency_level c)
{
    const auto wire_value = consistency_to_wire(c);
    write_short(wire_value);
}
|
|
|
|
void cql_server::response::write_string_map(std::map<sstring, sstring> string_map)
|
|
{
|
|
write_short(cast_if_fits<uint16_t>(string_map.size()));
|
|
for (auto&& s : string_map) {
|
|
write_string(s.first);
|
|
write_string(s.second);
|
|
}
|
|
}
|
|
|
|
void cql_server::response::write_string_multimap(std::multimap<sstring, sstring> string_map)
|
|
{
|
|
std::vector<sstring> keys;
|
|
for (auto it = string_map.begin(), end = string_map.end(); it != end; it = string_map.upper_bound(it->first)) {
|
|
keys.push_back(it->first);
|
|
}
|
|
write_short(cast_if_fits<uint16_t>(keys.size()));
|
|
for (auto&& key : keys) {
|
|
std::vector<sstring> values;
|
|
auto range = string_map.equal_range(key);
|
|
for (auto it = range.first; it != range.second; ++it) {
|
|
values.push_back(it->second);
|
|
}
|
|
write_string(key);
|
|
write_string_list(values);
|
|
}
|
|
}
|
|
|
|
void cql_server::response::write_string_bytes_map(const std::unordered_map<sstring, bytes>& map)
|
|
{
|
|
write_short(cast_if_fits<uint16_t>(map.size()));
|
|
for (auto&& s : map) {
|
|
write_string(s.first);
|
|
write_bytes(s.second);
|
|
}
|
|
}
|
|
|
|
// Appends an optional value: length -1 for an absent value, otherwise the 32-bit
// length followed by the bytes.
void cql_server::response::write_value(bytes_opt value)
{
    if (value) {
        write_int(value->size());
        _body.write(*value);
    } else {
        write_int(-1);
    }
}
|
|
|
|
void cql_server::response::write_value(std::optional<managed_bytes_view> value)
|
|
{
|
|
if (!value) {
|
|
write_int(-1);
|
|
return;
|
|
}
|
|
|
|
write_int(value->size_bytes());
|
|
while (!value->empty()) {
|
|
_body.write(value->current_fragment());
|
|
value->remove_current();
|
|
}
|
|
}
|
|
|
|
class type_codec {
|
|
private:
|
|
enum class type_id : int16_t {
|
|
CUSTOM = 0x0000,
|
|
ASCII = 0x0001,
|
|
BIGINT = 0x0002,
|
|
BLOB = 0x0003,
|
|
BOOLEAN = 0x0004,
|
|
COUNTER = 0x0005,
|
|
DECIMAL = 0x0006,
|
|
DOUBLE = 0x0007,
|
|
FLOAT = 0x0008,
|
|
INT = 0x0009,
|
|
TIMESTAMP = 0x000B,
|
|
UUID = 0x000C,
|
|
VARCHAR = 0x000D,
|
|
VARINT = 0x000E,
|
|
TIMEUUID = 0x000F,
|
|
INET = 0x0010,
|
|
DATE = 0x0011,
|
|
TIME = 0x0012,
|
|
SMALLINT = 0x0013,
|
|
TINYINT = 0x0014,
|
|
DURATION = 0x0015,
|
|
LIST = 0x0020,
|
|
MAP = 0x0021,
|
|
SET = 0x0022,
|
|
UDT = 0x0030,
|
|
TUPLE = 0x0031,
|
|
};
|
|
|
|
using type_id_to_type_type = std::unordered_map<data_type, type_id>;
|
|
|
|
static thread_local const type_id_to_type_type type_id_to_type;
|
|
public:
|
|
static void encode(cql_server::response& r, data_type type) {
|
|
type = type->underlying_type();
|
|
|
|
// For compatibility sake, we still return DateType as the timestamp type in resultSet metadata (#5723)
|
|
if (type == date_type) {
|
|
type = timestamp_type;
|
|
}
|
|
|
|
auto i = type_id_to_type.find(type);
|
|
if (i != type_id_to_type.end()) {
|
|
r.write_short(static_cast<std::underlying_type<type_id>::type>(i->second));
|
|
return;
|
|
}
|
|
|
|
if (type->is_reversed()) {
|
|
fail(unimplemented::cause::REVERSED);
|
|
}
|
|
if (type->is_user_type()) {
|
|
r.write_short(uint16_t(type_id::UDT));
|
|
auto udt = static_pointer_cast<const user_type_impl>(type);
|
|
r.write_string(udt->_keyspace);
|
|
r.write_bytes_as_string(udt->_name);
|
|
r.write_short(udt->size());
|
|
for (auto&& i : std::views::iota(0u, udt->size())) {
|
|
r.write_bytes_as_string(udt->field_name(i));
|
|
encode(r, udt->field_type(i));
|
|
}
|
|
return;
|
|
}
|
|
if (type->is_tuple()) {
|
|
r.write_short(uint16_t(type_id::TUPLE));
|
|
auto ttype = static_pointer_cast<const tuple_type_impl>(type);
|
|
r.write_short(ttype->size());
|
|
for (auto&& t : ttype->all_types()) {
|
|
encode(r, t);
|
|
}
|
|
return;
|
|
}
|
|
if (type->is_vector()) {
|
|
r.write_short(uint16_t(type_id::CUSTOM));
|
|
r.write_string(type->name());
|
|
return;
|
|
}
|
|
if (type->is_collection()) {
|
|
auto&& ctype = static_cast<const collection_type_impl*>(type.get());
|
|
if (ctype->get_kind() == abstract_type::kind::map) {
|
|
r.write_short(uint16_t(type_id::MAP));
|
|
auto&& mtype = static_cast<const map_type_impl*>(ctype);
|
|
encode(r, mtype->get_keys_type());
|
|
encode(r, mtype->get_values_type());
|
|
} else if (ctype->get_kind() == abstract_type::kind::set) {
|
|
r.write_short(uint16_t(type_id::SET));
|
|
auto&& stype = static_cast<const set_type_impl*>(ctype);
|
|
encode(r, stype->get_elements_type());
|
|
} else if (ctype->get_kind() == abstract_type::kind::list) {
|
|
r.write_short(uint16_t(type_id::LIST));
|
|
auto&& ltype = static_cast<const list_type_impl*>(ctype);
|
|
encode(r, ltype->get_elements_type());
|
|
} else {
|
|
abort();
|
|
}
|
|
return;
|
|
}
|
|
abort();
|
|
}
|
|
};
|
|
|
|
// Native types that map directly to a wire type id. date_type has no entry of its
// own: type_codec::encode() substitutes timestamp_type for it before the lookup.
thread_local const type_codec::type_id_to_type_type type_codec::type_id_to_type {
    // Text and binary
    { ascii_type, type_id::ASCII },
    { utf8_type, type_id::VARCHAR },
    { bytes_type, type_id::BLOB },
    // Integers
    { byte_type, type_id::TINYINT },
    { short_type, type_id::SMALLINT },
    { int32_type, type_id::INT },
    { long_type, type_id::BIGINT },
    { varint_type, type_id::VARINT },
    { counter_type, type_id::COUNTER },
    // Floating point and decimal
    { float_type, type_id::FLOAT },
    { double_type, type_id::DOUBLE },
    { decimal_type, type_id::DECIMAL },
    // Boolean
    { boolean_type, type_id::BOOLEAN },
    // Time related
    { timestamp_type, type_id::TIMESTAMP },
    { simple_date_type, type_id::DATE },
    { time_type, type_id::TIME },
    { duration_type, type_id::DURATION },
    // Identifiers and addresses
    { uuid_type, type_id::UUID },
    { timeuuid_type, type_id::TIMEUUID },
    { inet_addr_type, type_id::INET },
};
|
|
|
|
// Writes result-set metadata: flags, column count, optional paging state and,
// unless suppressed, the column specifications.
//
// `no_metadata` asks for the column specifications to be skipped (NO_METADATA flag).
// That request is overridden when both a request and a response metadata id are
// known and they differ: METADATA_CHANGED is set instead, the new metadata id is
// written and the full column specifications follow.
// With GLOBAL_TABLES_SPEC the keyspace and table names are written once, taken from
// the first column; otherwise they are repeated for every column.
void cql_server::response::write(const cql3::metadata& m, const cql_metadata_id_wrapper& metadata_id, bool no_metadata) {
    auto flags = m.flags();
    const bool global_tables_spec = m.flags().contains<cql3::metadata::flag::GLOBAL_TABLES_SPEC>();
    const bool has_more_pages = m.flags().contains<cql3::metadata::flag::HAS_MORE_PAGES>();

    if (no_metadata) {
        flags.set<cql3::metadata::flag::NO_METADATA>();
    }

    // (An unused local metadata id that used to be constructed here was removed.)
    if (metadata_id.has_request_metadata_id() && metadata_id.has_response_metadata_id()) {
        if (metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id()) {
            // The client's metadata is stale: send the full, current metadata.
            flags.remove<cql3::metadata::flag::NO_METADATA>();
            flags.set<cql3::metadata::flag::METADATA_CHANGED>();
            no_metadata = false;
        }
    }

    write_int(flags.mask());
    write_int(m.column_count());

    if (has_more_pages) {
        write_value(m.paging_state()->serialize());
    }

    if (no_metadata) {
        return;
    }

    if (flags.contains<cql3::metadata::flag::METADATA_CHANGED>()) {
        write_short_bytes(metadata_id.get_response_metadata_id()._metadata_id);
    }

    auto names_i = m.get_names().begin();

    if (global_tables_spec) {
        auto first_spec = *names_i;
        write_string(first_spec->ks_name);
        write_string(first_spec->cf_name);
    }

    for (uint32_t i = 0; i < m.column_count(); ++i, ++names_i) {
        lw_shared_ptr<cql3::column_specification> name = *names_i;
        if (!global_tables_spec) {
            write_string(name->ks_name);
            write_string(name->cf_name);
        }
        write_string(name->name->text());
        type_codec::encode(*this, name->type);
    }
}
|
|
|
|
// Writes the bind-variable metadata of a prepared statement: flags, variable
// count, (protocol version 4 and above) the partition key bind indices, then the
// variable specifications.
// Without GLOBAL_TABLES_SPEC the partition key index count is written as 0 and the
// keyspace/table names are repeated for every variable; with it they are written
// once, taken from the first variable.
void cql_server::response::write(const cql3::prepared_metadata& m, uint8_t version)
{
    const bool shared_table_spec = m.flags().contains<cql3::prepared_metadata::flag::GLOBAL_TABLES_SPEC>();

    write_int(m.flags().mask());
    write_int(m.names().size());

    if (version >= 4) {
        if (shared_table_spec) {
            write_int(m.partition_key_bind_indices().size());
            for (uint16_t pk_index : m.partition_key_bind_indices()) {
                write_short(pk_index);
            }
        } else {
            write_int(0);
        }
    }

    if (shared_table_spec) {
        write_string(m.names()[0]->ks_name);
        write_string(m.names()[0]->cf_name);
    }

    for (auto const& variable : m.names()) {
        if (!shared_table_spec) {
            write_string(variable->ks_name);
            write_string(variable->cf_name);
        }
        write_string(variable->name->text());
        type_codec::encode(*this, variable->type);
    }
}
|
|
|
|
bool cql_metadata_id_wrapper::has_request_metadata_id() const {
|
|
return _request_metadata_id.has_value();
|
|
}
|
|
|
|
bool cql_metadata_id_wrapper::has_response_metadata_id() const {
|
|
return _response_metadata_id.has_value();
|
|
}
|
|
|
|
// Returns the stored request metadata id; reports an internal error when none is stored.
const cql3::cql_metadata_id_type& cql_metadata_id_wrapper::get_request_metadata_id() const {
    const bool present = has_request_metadata_id();
    if (!present) {
        on_internal_error(clogger, "request metadata_id is empty");
    }
    return _request_metadata_id.value();
}
|
|
|
|
// Returns the stored response metadata id; reports an internal error when none is stored.
const cql3::cql_metadata_id_type& cql_metadata_id_wrapper::get_response_metadata_id() const {
    const bool present = has_response_metadata_id();
    if (!present) {
        on_internal_error(clogger, "response metadata_id is empty");
    }
    return _response_metadata_id.value();
}
|
|
|
|
// Collects a client_data snapshot from every connection visited by for_each_gently().
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> cql_server::get_client_data() {
    utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> collected;
    co_await for_each_gently([&collected] (const generic_server::connection& c) {
        const auto& cql_conn = dynamic_cast<const connection&>(c);
        auto snapshot = std::make_unique<client_data>(cql_conn.make_client_data());
        collected.emplace_back(make_foreign(std::move(snapshot)));
    });
    co_return collected;
}
|
|
|
|
// Asks every connection visited by for_each_gently() to refresh its scheduling group.
future<> cql_server::update_connections_scheduling_group() {
    auto refresh = [] (generic_server::connection& conn) {
        dynamic_cast<connection&>(conn).update_scheduling_group();
    };
    return for_each_gently(std::move(refresh));
}
|
|
|
|
// For every connection: if it has a named user with a cached effective service
// level, applies that level's parameters to the client state; the scheduling group
// is refreshed in all cases.
future<> cql_server::update_connections_service_level_params() {
    return for_each_gently([this] (generic_server::connection& conn) {
        connection& cql_conn = dynamic_cast<connection&>(conn);
        auto& state = cql_conn.get_client_state();
        auto& user = state.user();

        if (user && user->name) {
            if (auto slo = _sl_controller.find_cached_effective_service_level(user->name.value()); slo) {
                state.update_per_service_level_params(*slo);
            }
        }
        cql_conn.update_scheduling_group();
    });
}
|
|
|
|
// Collects, for every connection, the role name, timeout configuration, workload
// type and scheduling group name.
// The role name is "UNAUTHENTICATED" when the connection has no user and
// "ANONYMOUS" when the user has no name.
future<std::vector<connection_service_level_params>> cql_server::get_connections_service_level_params() {
    std::vector<connection_service_level_params> collected;
    co_await for_each_gently([&collected] (const generic_server::connection& conn) {
        auto& cql_conn = dynamic_cast<const connection&>(conn);
        auto& state = cql_conn.get_client_state();
        auto& user = state.user();

        sstring role_name = "UNAUTHENTICATED";
        if (user) {
            role_name = user->name ? *(user->name) : "ANONYMOUS";
        }

        collected.emplace_back(std::move(role_name), state.get_timeout_config(), state.get_workload_type(), cql_conn.get_scheduling_group().name());
    });
    co_return collected;
}
|
|
|
|
}
|