transport server: fix "request size too large" handling
Calling _read_buf.close() doesn't imply eof(), some data may have already been read into kernel or client buffers and will be returned next time read() is called. When the _server._max_request_size limit was exceeded and the _read_buf was closed, the process_request method finished and we started processing the next request in connection::process. The unread data from _read_buf was treated as the header of the next request frame, resulting in "Invalid or unsupported protocol version" error. The existing test_shed_too_large_request was adjusted. It was originally written with the assumption that the data of a large query would simply be dropped from the socket and the connection could be used to handle the next requests. This behaviour was changed in scylladb#8800, now the connection is closed on the Scylla side and can no longer be used. To check there are no errors in this case, we use Scylla metrics, getting them from the Scylla Prometheus API.
This commit is contained in:
@@ -7,9 +7,10 @@
|
||||
#############################################################################
|
||||
|
||||
import pytest
|
||||
import re
|
||||
from cassandra.cluster import NoHostAvailable
|
||||
from cassandra.protocol import InvalidRequest
|
||||
from util import unique_name
|
||||
from util import unique_name, new_cql
|
||||
import requests
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
@@ -31,12 +32,47 @@ def table1(cql, test_keyspace):
|
||||
# 5. Hence, a 30MiB request will be estimated to take around 60MiB RAM,
|
||||
# which is enough to trigger shedding.
|
||||
# See also #8193.
|
||||
@pytest.mark.skip(reason="highly depends on configuration")
|
||||
#
|
||||
# Big request causes the entire connection to be closed (see #8800 for the reasons),
|
||||
# this results in NoHostAvailable on the client.
|
||||
# We check that there are no unexpected protocol_errors using Scylla Prometheus API.
|
||||
# Such errors occurred before, when before closing the connection, the remaining
|
||||
# bytes of the current request were not read to the end and were treated as
|
||||
# the beginning of the next request.
|
||||
#
|
||||
# Have no idea why, but on CI the actual memory limit for CQL requests is ~100MiB,
|
||||
# so we use ~60MiB request (which is estimated to take ~120MiB according to the logic above)
|
||||
# to guarantee shedding.
|
||||
def test_shed_too_large_request(cql, table1, scylla_only):
|
||||
prepared = cql.prepare(f"INSERT INTO {table1} (p,t1,t2,t3,t4,t5,t6) VALUES (42,?,?,?,?,?,?)")
|
||||
a_5mb_string = 'x'*5*1024*1024
|
||||
with pytest.raises(InvalidRequest, match=re.compile('large', re.IGNORECASE)):
|
||||
cql.execute(prepared, [a_5mb_string]*6)
|
||||
def get_protocol_errors():
|
||||
result = 0
|
||||
for line in requests.get(f'http://{cql.cluster.contact_points[0]}:9180/metrics').text.split('\n'):
|
||||
if not line.startswith('scylla_transport_cql_errors_total') or 'protocol_error' not in line:
|
||||
continue
|
||||
result += int(line.split(' ')[1])
|
||||
return result
|
||||
|
||||
# protocol_errors metric is always non-zero, since the
|
||||
# cassandra python driver use these errors to negotiate the protocol version
|
||||
protocol_errors_before = get_protocol_errors()
|
||||
|
||||
# separate session is needed due to the cql_test_connection fixture,
|
||||
# which checks that the session is not broken at the end of the test
|
||||
with new_cql(cql) as ncql:
|
||||
prepared = ncql.prepare(f"INSERT INTO {table1} (p,t1,t2,t3,t4,t5,t6) VALUES (42,?,?,?,?,?,?)")
|
||||
|
||||
# With release builds of Scylla, information that the socket is closed reaches the client driver
|
||||
# before it has time to process the error message written by Scylla to the socket.
|
||||
# The driver ignores unread bytes from the socket (this looks like a bug),
|
||||
# tries to establish a connection with another node, and throws a NoHostAvailable exception if it fails.
|
||||
# In the debug builds, the driver can have time to grab the error from the socket,
|
||||
# and we get InvalidRequest exception.
|
||||
with pytest.raises((NoHostAvailable, InvalidRequest),
|
||||
match="request size too large|Unable to complete the operation against any hosts"):
|
||||
a_5mb_string = 'x'*10*1024*1024
|
||||
ncql.execute(prepared, [a_5mb_string]*6)
|
||||
protocol_errors_after = get_protocol_errors()
|
||||
assert protocol_errors_after == protocol_errors_before
|
||||
cql.execute(prepared, ["small_string"]*6)
|
||||
res = [row for row in cql.execute(f"SELECT p, t3 FROM {table1}")]
|
||||
assert len(res) == 1 and res[0].p == 42 and res[0].t3 == "small_string"
|
||||
|
||||
@@ -34,6 +34,7 @@
|
||||
#include <seastar/core/metrics.hh>
|
||||
#include <seastar/net/byteorder.hh>
|
||||
#include <seastar/util/lazy.hh>
|
||||
#include <seastar/util/short_streams.hh>
|
||||
#include <seastar/core/execution_stage.hh>
|
||||
#include "utils/result_try.hh"
|
||||
#include "utils/result_combinators.hh"
|
||||
@@ -627,9 +628,9 @@ future<> cql_server::connection::process_request() {
|
||||
f.length, mem_estimate, _server._max_request_size);
|
||||
clogger.debug("{}: {}, request dropped", _client_state.get_remote_address(), message);
|
||||
write_response(make_error(stream, exceptions::exception_code::INVALID, message, tracing::trace_state_ptr()));
|
||||
return std::exchange(_ready_to_respond, make_ready_future<>()).then([this, length = f.length] {
|
||||
return _read_buf.close();
|
||||
});
|
||||
return std::exchange(_ready_to_respond, make_ready_future<>())
|
||||
.then([this] { return _read_buf.close(); })
|
||||
.then([this] { return util::skip_entire_stream(_read_buf); });
|
||||
}
|
||||
|
||||
if (_server._stats.requests_serving > _server._max_concurrent_requests) {
|
||||
|
||||
Reference in New Issue
Block a user