From c7a7680c7dfd159853dc30482ffa084ae21f0091 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Sielu=C5=BCycki?= Date: Tue, 5 Apr 2022 15:06:14 +0200 Subject: [PATCH] repair: Make to_repair_rows_list a free function. --- repair/row_level.cc | 99 ++++++++++++++++++++++----------------------- repair/row_level.hh | 9 +++++ 2 files changed, 58 insertions(+), 50 deletions(-) diff --git a/repair/row_level.cc b/repair/row_level.cc index ef4624ae11..a5a399cdcb 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -501,10 +501,56 @@ class row_level_repair; static void add_to_repair_meta_for_masters(repair_meta& rm); static void add_to_repair_meta_for_followers(repair_meta& rm); +future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows, schema_ptr _schema, uint64_t _seed, repair_master _repair_master, reader_permit _permit, repair_hasher _repair_hasher) { + return do_with(std::move(rows), std::list<repair_row>(), lw_shared_ptr<const decorated_key_with_hash>(), lw_shared_ptr<mutation_fragment>(), position_in_partition::tri_compare(*_schema), + [_schema, _seed, _repair_master, _permit, _repair_hasher] (repair_rows_on_wire& rows, std::list<repair_row>& row_list, lw_shared_ptr<const decorated_key_with_hash>& dk_ptr, lw_shared_ptr<mutation_fragment>& last_mf, position_in_partition::tri_compare& cmp) mutable { + return do_for_each(rows, [&dk_ptr, &row_list, &last_mf, &cmp, _schema, _seed, _repair_master, _permit, _repair_hasher] (partition_key_and_mutation_fragments& x) mutable { + dht::decorated_key dk = dht::decorate_key(*_schema, x.get_key()); + if (!(dk_ptr && dk_ptr->dk.equal(*_schema, dk))) { + dk_ptr = make_lw_shared<const decorated_key_with_hash>(*_schema, dk, _seed); + } + if (_repair_master) { + return do_for_each(x.get_mutation_fragments(), [&dk_ptr, &row_list, _schema, _permit, _repair_hasher] (frozen_mutation_fragment& fmf) mutable { + _metrics.rx_row_nr += 1; + _metrics.rx_row_bytes += fmf.representation().size(); + // Keep the mutation_fragment in repair_row as an + // optimization to avoid unfreeze again when + // mutation_fragment is needed by _repair_writer.do_write() + // to apply the repair_row to disk 
+ auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*_schema, _permit)); + auto hash = _repair_hasher.do_hash_for_mf(*dk_ptr, *mf); + position_in_partition pos(mf->position()); + row_list.push_back(repair_row(std::move(fmf), std::move(pos), dk_ptr, std::move(hash), is_dirty_on_master::yes, std::move(mf))); + }); + } else { + last_mf = {}; + return do_for_each(x.get_mutation_fragments(), [&dk_ptr, &row_list, &last_mf, &cmp, _schema, _permit] (frozen_mutation_fragment& fmf) mutable { + _metrics.rx_row_nr += 1; + _metrics.rx_row_bytes += fmf.representation().size(); + auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*_schema, _permit)); + // If the mutation_fragment has the same position as + // the last mutation_fragment, it means they are the + // same row with different contents. We can not feed + // such rows into the sstable writer. Instead we apply + // the mutation_fragment into the previous one. + if (last_mf && cmp(last_mf->position(), mf->position()) == 0 && last_mf->mergeable_with(*mf)) { + last_mf->apply(*_schema, std::move(*mf)); + } else { + last_mf = mf; + // On repair follower node, only decorated_key_with_hash and the mutation_fragment inside repair_row are used. 
+ row_list.push_back(repair_row({}, {}, dk_ptr, {}, is_dirty_on_master::no, std::move(mf))); + } + }); + } + }).then([&row_list] { + return std::move(row_list); + }); + }); +} + class repair_meta { friend repair_meta_tracker; public: - using repair_master = bool_class; using update_working_row_buf = bool_class; using update_peer_row_hash_sets = bool_class; using needs_all_rows_t = bool_class; @@ -1194,53 +1240,6 @@ private: }); }; - static future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows, schema_ptr _schema, uint64_t _seed, repair_master _repair_master, reader_permit _permit, repair_hasher _repair_hasher) { - return do_with(std::move(rows), std::list<repair_row>(), lw_shared_ptr<const decorated_key_with_hash>(), lw_shared_ptr<mutation_fragment>(), position_in_partition::tri_compare(*_schema), - [_schema, _seed, _repair_master, _permit, _repair_hasher] (repair_rows_on_wire& rows, std::list<repair_row>& row_list, lw_shared_ptr<const decorated_key_with_hash>& dk_ptr, lw_shared_ptr<mutation_fragment>& last_mf, position_in_partition::tri_compare& cmp) mutable { - return do_for_each(rows, [&dk_ptr, &row_list, &last_mf, &cmp, _schema, _seed, _repair_master, _permit, _repair_hasher] (partition_key_and_mutation_fragments& x) mutable { - dht::decorated_key dk = dht::decorate_key(*_schema, x.get_key()); - if (!(dk_ptr && dk_ptr->dk.equal(*_schema, dk))) { - dk_ptr = make_lw_shared<const decorated_key_with_hash>(*_schema, dk, _seed); - } - if (_repair_master) { - return do_for_each(x.get_mutation_fragments(), [&dk_ptr, &row_list, _schema, _permit, _repair_hasher] (frozen_mutation_fragment& fmf) mutable { - _metrics.rx_row_nr += 1; - _metrics.rx_row_bytes += fmf.representation().size(); - // Keep the mutation_fragment in repair_row as an - // optimization to avoid unfreeze again when - // mutation_fragment is needed by _repair_writer.do_write() - // to apply the repair_row to disk - auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*_schema, _permit)); - auto hash = _repair_hasher.do_hash_for_mf(*dk_ptr, *mf); - position_in_partition pos(mf->position()); - row_list.push_back(repair_row(std::move(fmf), std::move(pos), dk_ptr, 
std::move(hash), is_dirty_on_master::yes, std::move(mf))); - }); - } else { - last_mf = {}; - return do_for_each(x.get_mutation_fragments(), [&dk_ptr, &row_list, &last_mf, &cmp, _schema, _permit] (frozen_mutation_fragment& fmf) mutable { - _metrics.rx_row_nr += 1; - _metrics.rx_row_bytes += fmf.representation().size(); - auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*_schema, _permit)); - // If the mutation_fragment has the same position as - // the last mutation_fragment, it means they are the - // same row with different contents. We can not feed - // such rows into the sstable writer. Instead we apply - // the mutation_fragment into the previous one. - if (last_mf && cmp(last_mf->position(), mf->position()) == 0 && last_mf->mergeable_with(*mf)) { - last_mf->apply(*_schema, std::move(*mf)); - } else { - last_mf = mf; - // On repair follower node, only decorated_key_with_hash and the mutation_fragment inside repair_row are used. - row_list.push_back(repair_row({}, {}, dk_ptr, {}, is_dirty_on_master::no, std::move(mf))); - } - }); - } - }).then([&row_list] { - return std::move(row_list); - }); - }); - } - public: // RPC API // Return the hashes of the rows in _working_row_buf @@ -2653,7 +2652,7 @@ public: algorithm, max_row_buf_size, _seed, - repair_meta::repair_master::yes, + repair_master::yes, repair_meta_id, _ri.reason, std::move(master_node_shard_config), @@ -2989,7 +2988,7 @@ repair_service::insert_repair_meta( algo, max_row_buf_size, seed, - repair_meta::repair_master::no, + repair_master::no, repair_meta_id, reason, std::move(master_node_shard_config), diff --git a/repair/row_level.hh b/repair/row_level.hh index 8d9da597e2..9cf4a53d14 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -12,6 +12,9 @@ #include "gms/inet_address.hh" #include "repair/repair.hh" #include +#include + +using namespace seastar; class row_level_repair_gossip_helper; @@ -228,7 +231,13 @@ public: }; class repair_info; +using repair_master = bool_class; +class 
partition_key_and_mutation_fragments; +using repair_rows_on_wire = std::list<partition_key_and_mutation_fragments>; +class repair_row; +class repair_hasher; future<> repair_cf_range_row_level(repair_info& ri, sstring cf_name, utils::UUID table_id, dht::token_range range, const std::vector<gms::inet_address>& all_peer_nodes); +future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows, schema_ptr _schema, uint64_t _seed, repair_master _repair_master, reader_permit _permit, repair_hasher _repair_hasher);