diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc index db5568a68a..9f5e1d60df 100644 --- a/cql3/query_processor.cc +++ b/cql3/query_processor.cc @@ -22,6 +22,7 @@ #include "db/config.hh" #include "data_dictionary/data_dictionary.hh" #include "hashers.hh" +#include "utils/error_injection.hh" namespace cql3 { @@ -600,6 +601,14 @@ query_processor::get_statement(const sstring_view& query, const service::client_ std::unique_ptr query_processor::parse_statement(const sstring_view& query) { try { + { + const char* error_injection_key = "query_processor-parse_statement-test_failure"; + utils::get_local_injector().inject(error_injection_key, [&]() { + if (query.find(error_injection_key) != sstring_view::npos) { + throw std::runtime_error(error_injection_key); + } + }); + } auto statement = util::do_with_parser(query, std::mem_fn(&cql3_parser::CqlParser::query)); if (!statement) { throw exceptions::syntax_exception("Parsing failed"); diff --git a/test/cql-pytest/test_batch.py b/test/cql-pytest/test_batch.py index 02a9fb46c4..f28cab50d3 100644 --- a/test/cql-pytest/test_batch.py +++ b/test/cql-pytest/test_batch.py @@ -7,8 +7,10 @@ # Tests for batch operations ############################################################################# from cassandra import InvalidRequest - +from cassandra.cluster import NoHostAvailable from util import new_test_table +from rest_api import scylla_inject_error + import pytest @@ -50,3 +52,27 @@ def test_error_is_raised_for_batch_size_above_threshold(cql, table1): from scylla.yaml.""" with pytest.raises(InvalidRequest, match="Batch too large"): cql.execute(generate_big_batch(table1, 1025)) + +# Test checks unexpected errors handling in CQL server. +# +# The original problem was that std::bad_alloc exception occurred while parsing a large batch request. +# This exception was caught by try/catch in cql_server::connection::process_request_one and +# an attempt was made to construct the error response message via make_error function. +# This attempt failed since the error message contained entire query and exceeded the limit of 64K +# in cql_server::response::write_string, causing "Value too large" exception to be thrown. +# This new exception reached the general handler in cql_server::connection::process_request, where +# it was just logged and no information about the problem was sent to the client. +# As a result, the client received a timeout exception after a while and +# no other information about the cause of the error. +# +# It is quite difficult to reproduce OOM in a test, so we use error injection instead. +# Passing injection_key in the body of the request ensures that the exception will be +# thrown only for this test request and will not affect other requests that +# the driver may send in the background. +@pytest.mark.asyncio +async def test_batch_with_error(cql, table1): + injection_key = 'query_processor-parse_statement-test_failure' + with scylla_inject_error(cql, injection_key, one_shot=False): + # exceptions::exception_code::SERVER_ERROR, it gets converted to NoHostAvailable by the driver + with pytest.raises(NoHostAvailable, match="Value too large"): + cql.execute(generate_big_batch(table1, 100) + injection_key) diff --git a/transport/server.cc b/transport/server.cc index e1a9102ad4..dee16d9c21 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -694,14 +694,23 @@ future<> cql_server::connection::process_request() { _process_request_stage(this, istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit) : process_request_one(istream, op, stream, seastar::ref(_client_state), tracing_requested, mem_permit); - future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave)] (future>> response_f) mutable { - try { + future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future>> response_f) mutable { + try { + if (response_f.failed()) { + const auto message = format("request processing failed, error [{}]", response_f.get_exception()); + clogger.error("{}: {}", _client_state.get_remote_address(), message); + write_response(make_error(stream, exceptions::exception_code::SERVER_ERROR, + message, + tracing::trace_state_ptr())); + } else { write_response(response_f.get0(), std::move(mem_permit), _compression); - _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {}); - } catch (...) { - clogger.error("request processing failed: {}", std::current_exception()); } - }); + _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {}); + } catch (...) { + clogger.error("{}: request processing failed: {}", + _client_state.get_remote_address(), std::current_exception()); + } + }); if (should_paralelize) { return make_ready_future<>();