diff --git a/sstables/consumer.hh b/sstables/consumer.hh
index 1fc6073d20..98bb54c263 100644
--- a/sstables/consumer.hh
+++ b/sstables/consumer.hh
@@ -607,10 +607,11 @@ public:
                 assert(data.size() == 0);
                 _remain -= orig_data_size;
                 if (skip.get_value() >= _remain) {
+                    skip_bytes skip_remaining(_remain); // capture the remaining byte count before _remain is zeroed below
                     _stream_position.position += _remain;
                     _remain = 0;
                     verify_end_state();
-                    return make_ready_future(stop_consuming{std::move(data)});
+                    return make_ready_future(std::move(skip_remaining)); // report a skip of the remaining bytes instead of stop_consuming
                 }
                 _stream_position.position += skip.get_value();
                 _remain -= skip.get_value();
diff --git a/test/boost/continuous_data_consumer_test.cc b/test/boost/continuous_data_consumer_test.cc
index 671c280e4c..fcb7420700 100644
--- a/test/boost/continuous_data_consumer_test.cc
+++ b/test/boost/continuous_data_consumer_test.cc
@@ -26,6 +26,7 @@
 #include "utils/buffer_input_stream.hh"
 #include "test/lib/reader_concurrency_semaphore.hh"
 #include "test/lib/random_utils.hh"
+#include "sstables/processing_result_generator.hh"
 
 #include
 #include
@@ -118,3 +119,82 @@ SEASTAR_THREAD_TEST_CASE(test_read_unsigned_vint) {
     }
 }
 
+class skipping_consumer final : public data_consumer::continuous_data_consumer { // test consumer: checks leading 'a's, skips via skip_bytes, checks trailing 'a's
+    int _initial_data_size; // leading 'a' bytes left to read; counted down in do_process_state()
+    int _to_skip; // length of the 'b' run that is skipped rather than read
+    int _next_data_size; // trailing 'a' bytes left to read; counted down in do_process_state()
+    processing_result_generator _gen; // generator built from do_process_state(); advanced by process_state()
+    temporary_buffer* _processing_data; // buffer of the current process_state() call. NOTE(review): template argument looks stripped (presumably temporary_buffer<char>) - confirm
+
+    // Builds a stream that starts with initial_data_size 'a's, followed by to_skip 'b's,
+    // and ends with next_data_size 'a's; the stream is returned in chunks of size 1
+    static input_stream prepare_stream(int initial_data_size, int to_skip, int next_data_size) {
+        temporary_buffer buf(initial_data_size + to_skip + next_data_size);
+        std::memset(buf.get_write(), 'a', initial_data_size);
+        std::memset(buf.get_write() + initial_data_size, 'b', to_skip);
+        std::memset(buf.get_write() + initial_data_size + to_skip, 'a', next_data_size);
+        return make_buffer_input_stream(std::move(buf), [] {return 1;});
+    }
+    static size_t prepare_initial_consumer_length(int initial_data_size, int to_skip) {
+        // some of the bytes that we want to skip may lie beyond the end of the initial consumer range
+        return initial_data_size + tests::random::get_int(0, to_skip);
+    }
+
+public:
+    skipping_consumer(reader_permit permit, int initial_data_size, int to_skip, int next_data_size)
+        : continuous_data_consumer(std::move(permit), prepare_stream(initial_data_size, to_skip, next_data_size),
+                                   0, prepare_initial_consumer_length(initial_data_size, to_skip)) // initial range starts at 0 and has a randomized length
+        , _initial_data_size(initial_data_size)
+        , _to_skip(to_skip)
+        , _next_data_size(next_data_size)
+        , _gen(do_process_state())
+    { }
+
+    bool non_consuming() { return false; } // always reports false in this test
+
+    void verify_end_state() {} // intentionally empty: no end-state checks in this test
+
+    data_consumer::processing_result process_state(temporary_buffer& data) {
+        _processing_data = &data; // make the current buffer visible to do_process_state()
+        return _gen.generate();
+    }
+
+    processing_result_generator do_process_state() {
+        while (_initial_data_size--) { // phase 1: every leading byte must be 'a'
+            co_yield read_8(*_processing_data);
+            if (_u8 != 'a') {
+                BOOST_FAIL("wrong data read");
+            }
+        }
+        auto skipped_by_trimming = _processing_data->size(); // bytes still buffered here are dropped by trim(0) below
+        _processing_data->trim(0);
+        co_yield skip_bytes{_to_skip - skipped_by_trimming}; // phase 2: skip the part of the 'b' run not already dropped by trimming
+        while (_next_data_size--) { // phase 3: every trailing byte must be 'a'
+            co_yield read_8(*_processing_data);
+            if (_u8 != 'a') {
+                BOOST_FAIL("wrong data read");
+            }
+        }
+        co_yield data_consumer::proceed::no; // yielded once all expected bytes have been checked
+    }
+
+    void run() {
+        consume_input().get(); // waits for consume_input() to finish
+    }
+};
+
+// Make sure that we can correctly fast forward to the next position with useful data
+// in the case when the previous consumer range ends with bytes that we want to
+// skip using skip_bytes (instead of simply trimming the received data buffer)
+SEASTAR_THREAD_TEST_CASE(test_skip_at_end) {
+    tests::reader_concurrency_semaphore_wrapper semaphore;
+    for (int i = 0; i < 1000; i++) { // 1000 rounds with freshly randomized sizes
+        int initial_data_size = tests::random::get_int(1, 50);
+        int to_skip = tests::random::get_int(1, 50);
+        int next_data_size = tests::random::get_int(1, 50);
+        skipping_consumer consumer(semaphore.make_permit(), initial_data_size, to_skip, next_data_size);
+        consumer.run(); // first pass over the initial consumer range
+        consumer.fast_forward_to(initial_data_size + to_skip, initial_data_size + to_skip + next_data_size).get(); // new range covers exactly the trailing 'a' bytes
+        consumer.run(); // second pass over the fast-forwarded range
+    }
+}