cql server: Only parallelize relevant cql requests

The cql server uses an execution stage to process and execute queries,
however, processing stage is best utilized when having a recurrent flow
that needs to be called repeatedly since it better utilizes the
instruction cache.
Up until now, every request was sent through the processing stage, but
most requests are not meant to be executed repeatedly with high volume.
This change processes and executes the data queries asynchronously,
through an execution stage, and all of the rest are processed one by
one, only continuing once the request has been done end to end.

Tests:
Unit tests in dev and debug.

Signed-off-by: Eliran Sinvani <eliransin@scylladb.com>

Closes #12202
This commit is contained in:
Eliran Sinvani
2022-12-05 16:44:02 +02:00
committed by Avi Kivity
parent b7851ab1ec
commit 5a5514d052

View File

@@ -670,17 +670,33 @@ future<> cql_server::connection::process_request() {
_pending_requests_gate.leave();
});
auto istream = buf.get_istream();
(void)_process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit)
.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave)] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
try {
write_response(response_f.get0(), std::move(mem_permit), _compression);
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {});
} catch (...) {
clogger.error("request processing failed: {}", std::current_exception());
}
});
return make_ready_future<>();
// Parallelize only the performance sensitive requests:
// QUERY, PREPARE, EXECUTE, BATCH
bool should_paralelize = (op == uint8_t(cql_binary_opcode::QUERY) ||
op == uint8_t(cql_binary_opcode::PREPARE) ||
op == uint8_t (cql_binary_opcode::EXECUTE) ||
op == uint8_t(cql_binary_opcode::BATCH));
future<foreign_ptr<std::unique_ptr<cql_server::response>>> request_process_future = should_paralelize ?
_process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit) :
process_request_one(istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit);
future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave)] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
try {
write_response(response_f.get0(), std::move(mem_permit), _compression);
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {});
} catch (...) {
clogger.error("request processing failed: {}", std::current_exception());
}
});
if (should_paralelize) {
return make_ready_future<>();
} else {
return request_response_future;
}
});
});
});