Merge 'continuous_data_consumer: properly skip bytes at the end of a range' from Wojciech Mitros

When skipping bytes at the end of a continuous_data_consumer range,
the position of the consumer is moved after the skipped bytes, but
the position of the underlying input_stream is not.

This patch adds skipping of the underlying input_stream, to make
its position consistent with the position of the consumer.

Fixes #9024

Signed-off-by: Wojciech Mitros <wojciech.mitros@scylladb.com>

Closes #9039

* github.com:scylladb/scylla:
  tests: add test for skipping bytes at end of consumer
  continuous_data_consumer: properly skip bytes at the end of a range
This commit is contained in:
Nadav Har'El
2021-07-19 15:57:26 +03:00
2 changed files with 82 additions and 1 deletions

View File

@@ -607,10 +607,11 @@ public:
assert(data.size() == 0);
_remain -= orig_data_size;
if (skip.get_value() >= _remain) {
skip_bytes skip_remaining(_remain);
_stream_position.position += _remain;
_remain = 0;
verify_end_state();
return make_ready_future<consumption_result_type>(stop_consuming<char>{std::move(data)});
return make_ready_future<consumption_result_type>(std::move(skip_remaining));
}
_stream_position.position += skip.get_value();
_remain -= skip.get_value();

View File

@@ -26,6 +26,7 @@
#include "utils/buffer_input_stream.hh"
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/random_utils.hh"
#include "sstables/processing_result_generator.hh"
#include <boost/test/unit_test.hpp>
#include <seastar/core/iostream.hh>
@@ -118,3 +119,82 @@ SEASTAR_THREAD_TEST_CASE(test_read_unsigned_vint) {
}
}
class skipping_consumer final : public data_consumer::continuous_data_consumer<skipping_consumer> {
int _initial_data_size;
int _to_skip;
int _next_data_size;
processing_result_generator _gen;
temporary_buffer<char>* _processing_data;
// stream starting with initial_data_size 'a's, followed by to_skip 'b's,
// ending with next_data_size 'a's, returning chunks of size 1
static input_stream<char> prepare_stream(int initial_data_size, int to_skip, int next_data_size) {
temporary_buffer<char> buf(initial_data_size + to_skip + next_data_size);
std::memset(buf.get_write(), 'a', initial_data_size);
std::memset(buf.get_write() + initial_data_size, 'b', to_skip);
std::memset(buf.get_write() + initial_data_size + to_skip, 'a', next_data_size);
return make_buffer_input_stream(std::move(buf), [] {return 1;});
}
static size_t prepare_initial_consumer_length(int initial_data_size, int to_skip) {
// some bytes that we want to skip may end up even after the initial consumer range
return initial_data_size + tests::random::get_int<int>(0, to_skip);
}
public:
skipping_consumer(reader_permit permit, int initial_data_size, int to_skip, int next_data_size)
: continuous_data_consumer(std::move(permit), prepare_stream(initial_data_size, to_skip, next_data_size),
0, prepare_initial_consumer_length(initial_data_size, to_skip))
, _initial_data_size(initial_data_size)
, _to_skip(to_skip)
, _next_data_size(next_data_size)
, _gen(do_process_state())
{ }
bool non_consuming() { return false; }
void verify_end_state() {}
data_consumer::processing_result process_state(temporary_buffer<char>& data) {
_processing_data = &data;
return _gen.generate();
}
processing_result_generator do_process_state() {
while (_initial_data_size--) {
co_yield read_8(*_processing_data);
if (_u8 != 'a') {
BOOST_FAIL("wrong data read");
}
}
auto skipped_by_trimming = _processing_data->size();
_processing_data->trim(0);
co_yield skip_bytes{_to_skip - skipped_by_trimming};
while (_next_data_size--) {
co_yield read_8(*_processing_data);
if (_u8 != 'a') {
BOOST_FAIL("wrong data read");
}
}
co_yield data_consumer::proceed::no;
}
void run() {
consume_input().get();
}
};
// Make sure that we can correctly fast forward to the next position with useful data,
// in a case when the previous consumer range ends with bytes that we want to
// skip using skip_bytes (instead of simply trimming the received data buffer)
SEASTAR_THREAD_TEST_CASE(test_skip_at_end) {
tests::reader_concurrency_semaphore_wrapper semaphore;
for (int i = 0; i < 1000; i++) {
int initial_data_size = tests::random::get_int<int>(1, 50);
int to_skip = tests::random::get_int<int>(1, 50);
int next_data_size = tests::random::get_int<int>(1, 50);
skipping_consumer consumer(semaphore.make_permit(), initial_data_size, to_skip, next_data_size);
consumer.run();
consumer.fast_forward_to(initial_data_size + to_skip, initial_data_size + to_skip + next_data_size).get();
consumer.run();
}
}