repair: Avoid calling get_repair_rows_size in get_sync_boundary

Instead of calling get_repair_rows_size() which might stall with large
number of rows, return the size of the rows from read_rows_from_disk.
This commit is contained in:
Asias He
2019-07-01 16:31:25 +08:00
parent 4d41f8e57e
commit 809c992b30

View File

@@ -956,7 +956,7 @@ private:
return repair_hash(h.finalize_uint64());
}
stop_iteration handle_mutation_fragment(mutation_fragment_opt mfopt, size_t& cur_size, std::list<repair_row>& cur_rows) {
stop_iteration handle_mutation_fragment(mutation_fragment_opt mfopt, size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
if (!mfopt) {
return stop_iteration::yes;
}
@@ -978,6 +978,7 @@ private:
_metrics.row_from_disk_nr++;
_metrics.row_from_disk_bytes += r.size();
cur_size += r.size();
new_rows_size += r.size();
cur_rows.push_back(std::move(r));
return stop_iteration::no;
}
@@ -985,19 +986,19 @@ private:
// Read rows from sstable until the size of rows exceeds _max_row_buf_size - current_size
// This reads rows from where the reader left last time into _row_buf
// _current_sync_boundary or _last_sync_boundary have no effect on the reader neither.
future<std::list<repair_row>>
future<std::list<repair_row>, size_t>
read_rows_from_disk(size_t cur_size) {
return do_with(cur_size, std::list<repair_row>(), [this] (size_t& cur_size, std::list<repair_row>& cur_rows) {
return repeat([this, &cur_size, &cur_rows] () mutable {
return do_with(cur_size, size_t(0), std::list<repair_row>(), [this] (size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
return repeat([this, &cur_size, &cur_rows, &new_rows_size] () mutable {
if (cur_size >= _max_row_buf_size) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
_gate.check();
return _repair_reader.read_mutation_fragment().then([this, &cur_size, &cur_rows] (mutation_fragment_opt mfopt) mutable {
return handle_mutation_fragment(std::move(mfopt), cur_size, cur_rows);
return _repair_reader.read_mutation_fragment().then([this, &cur_size, &new_rows_size, &cur_rows] (mutation_fragment_opt mfopt) mutable {
return handle_mutation_fragment(std::move(mfopt), cur_size, new_rows_size, cur_rows);
});
}).then([&cur_rows] () mutable {
return std::move(cur_rows);
}).then([&cur_rows, &new_rows_size] () mutable {
return make_ready_future<std::list<repair_row>, size_t>(std::move(cur_rows), new_rows_size);
});
});
}
@@ -1020,8 +1021,7 @@ private:
rlogger.trace("SET _last_sync_boundary from {} to {}", _last_sync_boundary, _current_sync_boundary);
_last_sync_boundary = _current_sync_boundary;
size_t current_size = row_buf_size();
return read_rows_from_disk(current_size).then([this, skipped_sync_boundary = std::move(skipped_sync_boundary)] (std::list<repair_row> new_rows) mutable {
size_t new_rows_size = get_repair_rows_size(new_rows);
return read_rows_from_disk(current_size).then([this, skipped_sync_boundary = std::move(skipped_sync_boundary)] (std::list<repair_row> new_rows, size_t new_rows_size) mutable {
size_t new_rows_nr = new_rows.size();
_row_buf.splice(_row_buf.end(), new_rows);
return row_buf_csum().then([this, new_rows_size, new_rows_nr, skipped_sync_boundary = std::move(skipped_sync_boundary)] (repair_hash row_buf_combined_hash) {