transport server: fix "request size too large" handling

Calling _read_buf.close() doesn't imply eof(); some data
may already have been read into kernel or client buffers
and will be returned the next time read() is called.
When the _server._max_request_size limit was exceeded
and the _read_buf was closed, the process_request method
finished and we started processing the next request in
connection::process. The unread data from _read_buf was
treated as the header of the next request frame, resulting
in an "Invalid or unsupported protocol version" error.

The existing test_shed_too_large_request was adjusted.
It was originally written with the assumption that the data
of a large query would simply be dropped from the socket
and the connection could be used to handle
subsequent requests. This behaviour was changed in scylladb#8800,
now the connection is closed on the Scylla side and
can no longer be used. To check there are no errors
in this case, we use Scylla metrics, getting them
from the Scylla Prometheus API.
This commit is contained in:
Petr Gusev
2022-12-12 20:33:58 +04:00
parent 0904f98ebf
commit 3263523b54
2 changed files with 47 additions and 10 deletions

View File

@@ -7,9 +7,10 @@
#############################################################################
import pytest
import re
from cassandra.cluster import NoHostAvailable
from cassandra.protocol import InvalidRequest
from util import unique_name
from util import unique_name, new_cql
import requests
@pytest.fixture(scope="module")
@@ -31,12 +32,47 @@ def table1(cql, test_keyspace):
# 5. Hence, a 30MiB request will be estimated to take around 60MiB RAM,
# which is enough to trigger shedding.
# See also #8193.
@pytest.mark.skip(reason="highly depends on configuration")
#
# Big request causes the entire connection to be closed (see #8800 for the reasons),
# this results in NoHostAvailable on the client.
# We check that there are no unexpected protocol_errors using Scylla Prometheus API.
# Such errors used to occur when the remaining bytes of the current request
# were not drained before the connection was closed, and were then treated
# as the beginning of the next request.
#
# For reasons not yet understood, on CI the actual memory limit for CQL requests is ~100MiB,
# so we use ~60MiB request (which is estimated to take ~120MiB according to the logic above)
# to guarantee shedding.
def test_shed_too_large_request(cql, table1, scylla_only):
    """Check shedding of oversized CQL requests.

    First, a ~30MiB request is expected to be shed gracefully with an
    InvalidRequest ("... too large") error on the same connection.
    Then a ~60MiB request is sent on a separate session: Scylla closes
    that connection (see #8800), so the client sees NoHostAvailable
    (or, in debug builds, the InvalidRequest error message if the driver
    manages to read it before the socket closes). In both cases the
    scylla_transport_cql_errors_total{type=protocol_error} metric must not
    grow — previously, undrained request bytes were misparsed as a new
    frame and counted as protocol errors.
    """
    prepared = cql.prepare(f"INSERT INTO {table1} (p,t1,t2,t3,t4,t5,t6) VALUES (42,?,?,?,?,?,?)")
    a_5mb_string = 'x'*5*1024*1024
    with pytest.raises(InvalidRequest, match=re.compile('large', re.IGNORECASE)):
        cql.execute(prepared, [a_5mb_string]*6)

    def get_protocol_errors():
        # Sum the protocol_error counters (one per shard) scraped from the
        # Scylla Prometheus endpoint on port 9180.
        result = 0
        for line in requests.get(f'http://{cql.cluster.contact_points[0]}:9180/metrics').text.split('\n'):
            if not line.startswith('scylla_transport_cql_errors_total') or 'protocol_error' not in line:
                continue
            result += int(line.split(' ')[1])
        return result

    # protocol_errors metric is always non-zero, since the
    # cassandra python driver uses these errors to negotiate the protocol version
    protocol_errors_before = get_protocol_errors()
    # separate session is needed due to the cql_test_connection fixture,
    # which checks that the session is not broken at the end of the test
    with new_cql(cql) as ncql:
        prepared = ncql.prepare(f"INSERT INTO {table1} (p,t1,t2,t3,t4,t5,t6) VALUES (42,?,?,?,?,?,?)")
        # With release builds of Scylla, information that the socket is closed reaches the client driver
        # before it has time to process the error message written by Scylla to the socket.
        # The driver ignores unread bytes from the socket (this looks like a bug),
        # tries to establish a connection with another node, and throws a NoHostAvailable exception if it fails.
        # In the debug builds, the driver can have time to grab the error from the socket,
        # and we get InvalidRequest exception.
        with pytest.raises((NoHostAvailable, InvalidRequest),
                           match="request size too large|Unable to complete the operation against any hosts"):
            a_5mb_string = 'x'*10*1024*1024
            ncql.execute(prepared, [a_5mb_string]*6)
    protocol_errors_after = get_protocol_errors()
    assert protocol_errors_after == protocol_errors_before
    # NOTE(review): `prepared` was rebound to the statement prepared on the
    # (now closed) ncql session; the driver is expected to re-prepare it on
    # cql's connections transparently — confirm against driver behaviour.
    cql.execute(prepared, ["small_string"]*6)
    res = [row for row in cql.execute(f"SELECT p, t3 FROM {table1}")]
    assert len(res) == 1 and res[0].p == 42 and res[0].t3 == "small_string"

View File

@@ -34,6 +34,7 @@
#include <seastar/core/metrics.hh>
#include <seastar/net/byteorder.hh>
#include <seastar/util/lazy.hh>
#include <seastar/util/short_streams.hh>
#include <seastar/core/execution_stage.hh>
#include "utils/result_try.hh"
#include "utils/result_combinators.hh"
@@ -627,9 +628,9 @@ future<> cql_server::connection::process_request() {
f.length, mem_estimate, _server._max_request_size);
clogger.debug("{}: {}, request dropped", _client_state.get_remote_address(), message);
write_response(make_error(stream, exceptions::exception_code::INVALID, message, tracing::trace_state_ptr()));
return std::exchange(_ready_to_respond, make_ready_future<>()).then([this, length = f.length] {
return _read_buf.close();
});
return std::exchange(_ready_to_respond, make_ready_future<>())
.then([this] { return _read_buf.close(); })
.then([this] { return util::skip_entire_stream(_read_buf); });
}
if (_server._stats.requests_serving > _server._max_concurrent_requests) {