From 5a5514d052da074becd0bfa56031081fb28ee8a3 Mon Sep 17 00:00:00 2001 From: Eliran Sinvani Date: Mon, 5 Dec 2022 16:44:02 +0200 Subject: [PATCH] cql server: Only parallelize relevant cql requests The cql server uses an execution stage to process and execute queries, however, processing stage is best utilized when having a recurrent flow that needs to be called repeatedly since it better utilizes the instruction cache. Up until now, every request was sent through the processing stage, but most requests are not meant to be executed repeatedly with high volume. This change processes and executes the data queries asynchronously, through an execution stage, and all of the rest are processed one by one, only continuing once the request has been done end to end. Tests: Unit tests in dev and debug. Signed-off-by: Eliran Sinvani Closes #12202 --- transport/server.cc | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/transport/server.cc b/transport/server.cc index 71de9244d5..4b81b19b36 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -670,17 +670,33 @@ future<> cql_server::connection::process_request() { _pending_requests_gate.leave(); }); auto istream = buf.get_istream(); - (void)_process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit) - .then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave)] (future>> response_f) mutable { - try { - write_response(response_f.get0(), std::move(mem_permit), _compression); - _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {}); - } catch (...) { - clogger.error("request processing failed: {}", std::current_exception()); - } - }); - return make_ready_future<>(); + + // Parallelize only the performance sensitive requests: + // QUERY, PREPARE, EXECUTE, BATCH + bool should_paralelize = (op == uint8_t(cql_binary_opcode::QUERY) || + op == uint8_t(cql_binary_opcode::PREPARE) || + op == uint8_t (cql_binary_opcode::EXECUTE) || + op == uint8_t(cql_binary_opcode::BATCH)); + + future>> request_process_future = should_paralelize ? + _process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit) : + process_request_one(istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit); + + future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave)] (future>> response_f) mutable { + try { + write_response(response_f.get0(), std::move(mem_permit), _compression); + _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {}); + } catch (...) { + clogger.error("request processing failed: {}", std::current_exception()); + } + }); + + if (should_paralelize) { + return make_ready_future<>(); + } else { + return request_response_future; + } }); }); });