Merge 'redis: support large redis message' from Takuya ASADA

If the message is larger than current buffer size, we need to consume
more data until we reach to tail of the message.
To do so, we need to return nullptr when it's not on the tail.

Fixes #7273

Closes #7903

* github.com:scylladb/scylla:
  redis: rename _args_size/_size_left There are two types of numerical parameter in redis protocol:  - *[0-9]+ defined array size  - $[0-9]+ defined string size
  redis: fix large message handling
This commit is contained in:
Nadav Har'El
2021-01-25 10:11:17 +02:00
2 changed files with 28 additions and 13 deletions

View File

@@ -41,19 +41,19 @@ action mark {
action start_blob {
g.mark_start(p);
_size_left = _arg_size;
_bytes_left = _bytes_count;
}
action start_command {
g.mark_start(p);
_size_left = _arg_size;
_bytes_left = _bytes_count;
}
action advance_blob {
auto len = std::min(static_cast<uint32_t>(pe - p), _size_left);
_size_left -= len;
auto len = std::min(static_cast<uint32_t>(pe - p), _bytes_left);
_bytes_left -= len;
p += len;
if (_size_left == 0) {
if (_bytes_left == 0) {
_req._args.push_back(str());
p--;
fret;
@@ -62,10 +62,10 @@ action advance_blob {
}
action advance_command {
auto len = std::min(static_cast<uint32_t>(pe - p), _size_left);
_size_left -= len;
auto len = std::min(static_cast<uint32_t>(pe - p), _bytes_left);
_bytes_left -= len;
p += len;
if (_size_left == 0) {
if (_bytes_left == 0) {
_req._command = str();
p--;
fret;
@@ -79,7 +79,7 @@ u32 = digit+ >{ _u32 = 0;} ${ _u32 *= 10; _u32 += fc - '0';};
args_count = '*' u32 crlf ${_req._args_count = _u32 - 1;};
blob := any+ >start_blob $advance_blob;
command := any+ >start_command $advance_command;
arg = '$' u32 crlf ${ _arg_size = _u32;};
arg = '$' u32 crlf ${ _bytes_count = _u32;};
main := (args_count (arg @{fcall command; } crlf) (arg @{fcall blob; } crlf)+) ${_req._state = request_state::ok;} >eof{_req._state = request_state::eof;};
@@ -98,16 +98,16 @@ class redis_protocol_parser : public ragel_parser_base<redis_protocol_parser> {
public:
redis::request _req;
uint32_t _u32;
uint32_t _arg_size;
uint32_t _size_left;
uint32_t _bytes_count;
uint32_t _bytes_left;
public:
virtual void init() {
init_base();
_req._state = request_state::error;
_req._args.clear();
_req._args_count = 0;
_size_left = 0;
_arg_size = 0;
_bytes_left = 0;
_bytes_count = 0;
%% write init;
}
@@ -120,6 +120,11 @@ public:
};
%% write exec;
// does not reach to the tail of the message, continue reading
if (_bytes_left || _req._args_count - _req._args.size()) {
return nullptr;
}
if (_req._state != request_state::error) {
return p;
}

View File

@@ -150,6 +150,16 @@ def test_exists_multiple_existent_key(redis_host, redis_port):
assert r.get(key4) == None
assert r.exists(key1, key2, key3, key4) == 3
def test_exists_lots_of_keys(redis_host, redis_port):
r = connect(redis_host, redis_port)
keys = []
for i in range(0, 30):
k = random_string(11)
v = random_string(10)
r.set(k, v)
keys.append(k)
assert r.exists(*keys) == len(keys)
def test_setex_ttl(redis_host, redis_port):
r = connect(redis_host, redis_port)
key = random_string(10)