diff --git a/repair/row_level.cc b/repair/row_level.cc index c79865cd3f..0db0a80221 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -956,7 +956,7 @@ private: return repair_hash(h.finalize_uint64()); } - stop_iteration handle_mutation_fragment(mutation_fragment_opt mfopt, size_t& cur_size, std::list& cur_rows) { + stop_iteration handle_mutation_fragment(mutation_fragment_opt mfopt, size_t& cur_size, size_t& new_rows_size, std::list& cur_rows) { if (!mfopt) { return stop_iteration::yes; } @@ -978,6 +978,7 @@ private: _metrics.row_from_disk_nr++; _metrics.row_from_disk_bytes += r.size(); cur_size += r.size(); + new_rows_size += r.size(); cur_rows.push_back(std::move(r)); return stop_iteration::no; } @@ -985,19 +986,19 @@ private: // Read rows from sstable until the size of rows exceeds _max_row_buf_size - current_size // This reads rows from where the reader left last time into _row_buf // _current_sync_boundary or _last_sync_boundary have no effect on the reader neither. - future> + future, size_t> read_rows_from_disk(size_t cur_size) { - return do_with(cur_size, std::list(), [this] (size_t& cur_size, std::list& cur_rows) { - return repeat([this, &cur_size, &cur_rows] () mutable { + return do_with(cur_size, size_t(0), std::list(), [this] (size_t& cur_size, size_t& new_rows_size, std::list& cur_rows) { + return repeat([this, &cur_size, &cur_rows, &new_rows_size] () mutable { if (cur_size >= _max_row_buf_size) { return make_ready_future(stop_iteration::yes); } _gate.check(); - return _repair_reader.read_mutation_fragment().then([this, &cur_size, &cur_rows] (mutation_fragment_opt mfopt) mutable { - return handle_mutation_fragment(std::move(mfopt), cur_size, cur_rows); + return _repair_reader.read_mutation_fragment().then([this, &cur_size, &new_rows_size, &cur_rows] (mutation_fragment_opt mfopt) mutable { + return handle_mutation_fragment(std::move(mfopt), cur_size, new_rows_size, cur_rows); }); - }).then([&cur_rows] () mutable { - return std::move(cur_rows); + }).then([&cur_rows, &new_rows_size] () mutable { + return make_ready_future, size_t>(std::move(cur_rows), new_rows_size); }); }); } @@ -1020,8 +1021,7 @@ private: rlogger.trace("SET _last_sync_boundary from {} to {}", _last_sync_boundary, _current_sync_boundary); _last_sync_boundary = _current_sync_boundary; size_t current_size = row_buf_size(); - return read_rows_from_disk(current_size).then([this, skipped_sync_boundary = std::move(skipped_sync_boundary)] (std::list new_rows) mutable { - size_t new_rows_size = get_repair_rows_size(new_rows); + return read_rows_from_disk(current_size).then([this, skipped_sync_boundary = std::move(skipped_sync_boundary)] (std::list new_rows, size_t new_rows_size) mutable { size_t new_rows_nr = new_rows.size(); _row_buf.splice(_row_buf.end(), new_rows); return row_buf_csum().then([this, new_rows_size, new_rows_nr, skipped_sync_boundary = std::move(skipped_sync_boundary)] (repair_hash row_buf_combined_hash) {