diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 6fbed2ae56..4662107213 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -689,6 +689,8 @@ public: // but all previous write/flush pairs. return _pending_ops.run_with_ordered_post_op(rp, [this, size, off, buf = std::move(buf)]() mutable { /////////////////////////////////////////////////// auto view = fragmented_temporary_buffer::view(buf); + view.remove_suffix(buf.size_bytes() - size); + assert(size == view.size_bytes()); return do_with(off, view, [&] (uint64_t& off, fragmented_temporary_buffer::view& view) { if (view.empty()) { return make_ready_future<>(); diff --git a/tests/fragmented_temporary_buffer_test.cc b/tests/fragmented_temporary_buffer_test.cc index aa80e5bb42..4710abf338 100644 --- a/tests/fragmented_temporary_buffer_test.cc +++ b/tests/fragmented_temporary_buffer_test.cc @@ -133,6 +133,39 @@ SEASTAR_THREAD_TEST_CASE(test_view) { data_view.remove_prefix(data_view.size()); test(frag_view); + data_view = bytes_view(data); + frag_view = fragmented_temporary_buffer::view(frag_buffer); + + frag_view.remove_suffix(sizeof(value2) - 1); + data_view.remove_suffix(sizeof(value2) - 1); + test(frag_view); + + frag_view.remove_suffix(data_view.size()); + data_view.remove_suffix(data_view.size()); + test(frag_view); + + data_view = bytes_view(data); + frag_view = fragmented_temporary_buffer::view(frag_buffer); + + frag_view.remove_suffix(sizeof(value2) - 1); + data_view.remove_suffix(sizeof(value2) - 1); + test(frag_view); + + frag_view.remove_prefix(data_view.size()); + data_view.remove_prefix(data_view.size()); + test(frag_view); + + data_view = bytes_view(data); + frag_view = fragmented_temporary_buffer::view(frag_buffer); + + frag_view.remove_prefix(sizeof(value2) - 1); + data_view.remove_prefix(sizeof(value2) - 1); + test(frag_view); + + frag_view.remove_suffix(data_view.size()); + data_view.remove_suffix(data_view.size()); + test(frag_view); + data_view = bytes_view(data); } diff --git a/utils/fragmented_temporary_buffer.hh b/utils/fragmented_temporary_buffer.hh index 5a6283458b..7a36f5fa9c 100644 --- a/utils/fragmented_temporary_buffer.hh +++ b/utils/fragmented_temporary_buffer.hh @@ -166,21 +166,28 @@ public: if (!_total_size) { return; } - _total_size -= n; while (n > _current_size) { + _total_size -= _current_size; n -= _current_size; ++_current; - _current_size = _current->size(); + _current_size = std::min(_current->size(), _total_size); } + _total_size -= n; _current_size -= n; _current_position = _current->get() + n; if (!_current_size && _total_size) { ++_current; - _current_size = _current->size(); + _current_size = std::min(_current->size(), _total_size); _current_position = _current->get(); } } + // Invalidates iterators + void remove_suffix(size_t n) noexcept { + _total_size -= n; + _current_size = std::min(_current_size, _total_size); + } + bool operator==(const fragmented_temporary_buffer::view& other) const noexcept { auto this_it = begin(); auto other_it = other.begin();