mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-25 09:11:10 +00:00
repair: Make repair_meta::to_repair_rows_list a static function
It allows pulling out the logic of convering on-the-wire representation of repair mutations to an internal representation used later for flushing repair mutations to disk. This in turn is needed to unit test the functionality without spinning up clusters, which significantly improves developer iteration time.
This commit is contained in:
@@ -1091,7 +1091,7 @@ private:
|
||||
if (rows.empty()) {
|
||||
return;
|
||||
}
|
||||
auto row_diff = to_repair_rows_list(std::move(rows)).get0();
|
||||
auto row_diff = to_repair_rows_list(std::move(rows), _schema, _seed, _repair_master, _permit, _repair_hasher).get0();
|
||||
auto sz = get_repair_rows_size(row_diff).get0();
|
||||
stats().rx_row_bytes += sz;
|
||||
stats().rx_row_nr += row_diff.size();
|
||||
@@ -1158,7 +1158,7 @@ private:
|
||||
if (rows.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return to_repair_rows_list(std::move(rows)).then([this] (std::list<repair_row> row_diff) {
|
||||
return to_repair_rows_list(std::move(rows), _schema, _seed, _repair_master, _permit, _repair_hasher).then([this] (std::list<repair_row> row_diff) {
|
||||
return do_apply_rows(std::move(row_diff), update_working_row_buf::no);
|
||||
});
|
||||
}
|
||||
@@ -1194,16 +1194,16 @@ private:
|
||||
});
|
||||
};
|
||||
|
||||
future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows) {
|
||||
static future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows, schema_ptr _schema, uint64_t _seed, repair_master _repair_master, reader_permit _permit, repair_hasher _repair_hasher) {
|
||||
return do_with(std::move(rows), std::list<repair_row>(), lw_shared_ptr<const decorated_key_with_hash>(), lw_shared_ptr<mutation_fragment>(), position_in_partition::tri_compare(*_schema),
|
||||
[this] (repair_rows_on_wire& rows, std::list<repair_row>& row_list, lw_shared_ptr<const decorated_key_with_hash>& dk_ptr, lw_shared_ptr<mutation_fragment>& last_mf, position_in_partition::tri_compare& cmp) mutable {
|
||||
return do_for_each(rows, [this, &dk_ptr, &row_list, &last_mf, &cmp] (partition_key_and_mutation_fragments& x) mutable {
|
||||
[_schema, _seed, _repair_master, _permit, _repair_hasher] (repair_rows_on_wire& rows, std::list<repair_row>& row_list, lw_shared_ptr<const decorated_key_with_hash>& dk_ptr, lw_shared_ptr<mutation_fragment>& last_mf, position_in_partition::tri_compare& cmp) mutable {
|
||||
return do_for_each(rows, [&dk_ptr, &row_list, &last_mf, &cmp, _schema, _seed, _repair_master, _permit, _repair_hasher] (partition_key_and_mutation_fragments& x) mutable {
|
||||
dht::decorated_key dk = dht::decorate_key(*_schema, x.get_key());
|
||||
if (!(dk_ptr && dk_ptr->dk.equal(*_schema, dk))) {
|
||||
dk_ptr = make_lw_shared<const decorated_key_with_hash>(*_schema, dk, _seed);
|
||||
}
|
||||
if (_repair_master) {
|
||||
return do_for_each(x.get_mutation_fragments(), [this, &dk_ptr, &row_list] (frozen_mutation_fragment& fmf) mutable {
|
||||
return do_for_each(x.get_mutation_fragments(), [&dk_ptr, &row_list, _schema, _permit, _repair_hasher] (frozen_mutation_fragment& fmf) mutable {
|
||||
_metrics.rx_row_nr += 1;
|
||||
_metrics.rx_row_bytes += fmf.representation().size();
|
||||
// Keep the mutation_fragment in repair_row as an
|
||||
@@ -1217,7 +1217,7 @@ private:
|
||||
});
|
||||
} else {
|
||||
last_mf = {};
|
||||
return do_for_each(x.get_mutation_fragments(), [this, &dk_ptr, &row_list, &last_mf, &cmp] (frozen_mutation_fragment& fmf) mutable {
|
||||
return do_for_each(x.get_mutation_fragments(), [&dk_ptr, &row_list, &last_mf, &cmp, _schema, _permit] (frozen_mutation_fragment& fmf) mutable {
|
||||
_metrics.rx_row_nr += 1;
|
||||
_metrics.rx_row_bytes += fmf.representation().size();
|
||||
auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*_schema, _permit));
|
||||
|
||||
Reference in New Issue
Block a user