repair: coroutinize move_row_buf_to_working_row_buf()

for better readability

Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
This commit is contained in:
Kefu Chai
2023-06-15 17:12:18 +08:00
parent a973b43b9f
commit e16b5ceb48

View File

@@ -25,6 +25,7 @@
#include <seastar/util/bool_class.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <list>
#include <vector>
@@ -1120,34 +1121,29 @@ private:
if (_cmp(_row_buf.back().boundary(), *_current_sync_boundary) <= 0) {
// Fast path
_working_row_buf.swap(_row_buf);
return make_ready_future<>();
co_return;
}
return do_with(_row_buf.rbegin(), [this, sz = _row_buf.size()] (auto& it) {
size_t sz = _row_buf.size();
for (auto it = _row_buf.rbegin(); it != _row_buf.rend(); ++it) {
// Move the rows > _current_sync_boundary to _working_row_buf
// Delete the rows > _current_sync_boundary from _row_buf
// Swap _working_row_buf and _row_buf so that _working_row_buf
// contains rows within (_last_sync_boundary,
// _current_sync_boundary], _row_buf contains rows wthin
// (_current_sync_boundary, ...]
return repeat([this, &it] () {
if (it == _row_buf.rend()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
repair_row& r = *(it++);
if (_cmp(r.boundary(), *_current_sync_boundary) > 0) {
_working_row_buf.push_front(std::move(r));
return make_ready_future<stop_iteration>(stop_iteration::no);
}
return make_ready_future<stop_iteration>(stop_iteration::yes);
}).then([this, sz] {
_row_buf.resize(_row_buf.size() - _working_row_buf.size());
_row_buf.swap(_working_row_buf);
if (sz != _working_row_buf.size() + _row_buf.size()) {
throw std::runtime_error(format("incorrect row_buf and working_row_buf size, before={}, after={} + {}",
sz, _working_row_buf.size(), _row_buf.size()));
}
});
});
repair_row& r = *it;
if (_cmp(r.boundary(), *_current_sync_boundary) <= 0) {
break;
}
_working_row_buf.push_front(std::move(r));
co_await coroutine::maybe_yield();
}
_row_buf.resize(_row_buf.size() - _working_row_buf.size());
_row_buf.swap(_working_row_buf);
if (sz != _working_row_buf.size() + _row_buf.size()) {
throw std::runtime_error(format("incorrect row_buf and working_row_buf size, before={}, after={} + {}",
sz, _working_row_buf.size(), _row_buf.size()));
}
}
// Move rows from <_row_buf> to <_working_row_buf> according to