transport server: fix unexpected server errors handling
If request processing ended with an error, it is worth sending the error to the client through make_error/write_response. Previously in this case we just wrote a message to the log and didn't handle the client connection in any way. As a result, the only thing the client got in this case was timeout error. A new test_batch_with_error is added. It is quite difficult to reproduce error condition in a test, so we use error injection instead. Passing injection_key in the body of the request ensures that the exception will be thrown only for this test request and will not affect other requests that the driver may send in the background. Closes: scylladb#12104
This commit is contained in:
@@ -22,6 +22,7 @@
|
||||
#include "db/config.hh"
|
||||
#include "data_dictionary/data_dictionary.hh"
|
||||
#include "hashers.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
@@ -600,6 +601,14 @@ query_processor::get_statement(const sstring_view& query, const service::client_
|
||||
std::unique_ptr<raw::parsed_statement>
|
||||
query_processor::parse_statement(const sstring_view& query) {
|
||||
try {
|
||||
{
|
||||
const char* error_injection_key = "query_processor-parse_statement-test_failure";
|
||||
utils::get_local_injector().inject(error_injection_key, [&]() {
|
||||
if (query.find(error_injection_key) != sstring_view::npos) {
|
||||
throw std::runtime_error(error_injection_key);
|
||||
}
|
||||
});
|
||||
}
|
||||
auto statement = util::do_with_parser(query, std::mem_fn(&cql3_parser::CqlParser::query));
|
||||
if (!statement) {
|
||||
throw exceptions::syntax_exception("Parsing failed");
|
||||
|
||||
@@ -7,8 +7,10 @@
|
||||
# Tests for batch operations
|
||||
#############################################################################
|
||||
from cassandra import InvalidRequest
|
||||
|
||||
from cassandra.cluster import NoHostAvailable
|
||||
from util import new_test_table
|
||||
from rest_api import scylla_inject_error
|
||||
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -50,3 +52,27 @@ def test_error_is_raised_for_batch_size_above_threshold(cql, table1):
|
||||
from scylla.yaml."""
|
||||
with pytest.raises(InvalidRequest, match="Batch too large"):
|
||||
cql.execute(generate_big_batch(table1, 1025))
|
||||
|
||||
# Test checks unexpected errors handling in CQL server.
|
||||
#
|
||||
# The original problem was that std::bad_alloc exception occurred while parsing a large batch request.
|
||||
# This exception was caught by try/catch in cql_server::connection::process_request_one and
|
||||
# an attempt was made to construct the error response message via make_error function.
|
||||
# This attempt failed since the error message contained entire query and exceeded the limit of 64K
|
||||
# in cql_server::response::write_string, causing "Value too large" exception to be thrown.
|
||||
# This new exception reached the general handler in cql_server::connection::process_request, where
|
||||
# it was just logged and no information about the problem was sent to the client.
|
||||
# As a result, the client received a timeout exception after a while and
|
||||
# no other information about the cause of the error.
|
||||
#
|
||||
# It is quite difficult to reproduce OOM in a test, so we use error injection instead.
|
||||
# Passing injection_key in the body of the request ensures that the exception will be
|
||||
# thrown only for this test request and will not affect other requests that
|
||||
# the driver may send in the background.
|
||||
@pytest.mark.asyncio
|
||||
async def test_batch_with_error(cql, table1):
|
||||
injection_key = 'query_processor-parse_statement-test_failure'
|
||||
with scylla_inject_error(cql, injection_key, one_shot=False):
|
||||
# exceptions::exception_code::SERVER_ERROR, it gets converted to NoHostAvailable by the driver
|
||||
with pytest.raises(NoHostAvailable, match="Value too large"):
|
||||
cql.execute(generate_big_batch(table1, 100) + injection_key)
|
||||
|
||||
@@ -694,14 +694,23 @@ future<> cql_server::connection::process_request() {
|
||||
_process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit) :
|
||||
process_request_one(istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit);
|
||||
|
||||
future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave)] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
|
||||
try {
|
||||
future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
|
||||
try {
|
||||
if (response_f.failed()) {
|
||||
const auto message = format("request processing failed, error [{}]", response_f.get_exception());
|
||||
clogger.error("{}: {}", _client_state.get_remote_address(), message);
|
||||
write_response(make_error(stream, exceptions::exception_code::SERVER_ERROR,
|
||||
message,
|
||||
tracing::trace_state_ptr()));
|
||||
} else {
|
||||
write_response(response_f.get0(), std::move(mem_permit), _compression);
|
||||
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {});
|
||||
} catch (...) {
|
||||
clogger.error("request processing failed: {}", std::current_exception());
|
||||
}
|
||||
});
|
||||
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {});
|
||||
} catch (...) {
|
||||
clogger.error("{}: request processing failed: {}",
|
||||
_client_state.get_remote_address(), std::current_exception());
|
||||
}
|
||||
});
|
||||
|
||||
if (should_paralelize) {
|
||||
return make_ready_future<>();
|
||||
|
||||
Reference in New Issue
Block a user