repair: Make to_repair_rows_list a free function.

This commit is contained in:
Mikołaj Sielużycki
2022-04-05 15:06:14 +02:00
parent 69fc74ffbe
commit c7a7680c7d
2 changed files with 58 additions and 50 deletions

View File

@@ -501,10 +501,56 @@ class row_level_repair;
static void add_to_repair_meta_for_masters(repair_meta& rm);
static void add_to_repair_meta_for_followers(repair_meta& rm);
future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows, schema_ptr _schema, uint64_t _seed, repair_master _repair_master, reader_permit _permit, repair_hasher _repair_hasher) {
return do_with(std::move(rows), std::list<repair_row>(), lw_shared_ptr<const decorated_key_with_hash>(), lw_shared_ptr<mutation_fragment>(), position_in_partition::tri_compare(*_schema),
[_schema, _seed, _repair_master, _permit, _repair_hasher] (repair_rows_on_wire& rows, std::list<repair_row>& row_list, lw_shared_ptr<const decorated_key_with_hash>& dk_ptr, lw_shared_ptr<mutation_fragment>& last_mf, position_in_partition::tri_compare& cmp) mutable {
return do_for_each(rows, [&dk_ptr, &row_list, &last_mf, &cmp, _schema, _seed, _repair_master, _permit, _repair_hasher] (partition_key_and_mutation_fragments& x) mutable {
dht::decorated_key dk = dht::decorate_key(*_schema, x.get_key());
if (!(dk_ptr && dk_ptr->dk.equal(*_schema, dk))) {
dk_ptr = make_lw_shared<const decorated_key_with_hash>(*_schema, dk, _seed);
}
if (_repair_master) {
return do_for_each(x.get_mutation_fragments(), [&dk_ptr, &row_list, _schema, _permit, _repair_hasher] (frozen_mutation_fragment& fmf) mutable {
_metrics.rx_row_nr += 1;
_metrics.rx_row_bytes += fmf.representation().size();
// Keep the mutation_fragment in repair_row as an
// optimization to avoid unfreeze again when
// mutation_fragment is needed by _repair_writer.do_write()
// to apply the repair_row to disk
auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*_schema, _permit));
auto hash = _repair_hasher.do_hash_for_mf(*dk_ptr, *mf);
position_in_partition pos(mf->position());
row_list.push_back(repair_row(std::move(fmf), std::move(pos), dk_ptr, std::move(hash), is_dirty_on_master::yes, std::move(mf)));
});
} else {
last_mf = {};
return do_for_each(x.get_mutation_fragments(), [&dk_ptr, &row_list, &last_mf, &cmp, _schema, _permit] (frozen_mutation_fragment& fmf) mutable {
_metrics.rx_row_nr += 1;
_metrics.rx_row_bytes += fmf.representation().size();
auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*_schema, _permit));
// If the mutation_fragment has the same position as
// the last mutation_fragment, it means they are the
// same row with different contents. We can not feed
// such rows into the sstable writer. Instead we apply
// the mutation_fragment into the previous one.
if (last_mf && cmp(last_mf->position(), mf->position()) == 0 && last_mf->mergeable_with(*mf)) {
last_mf->apply(*_schema, std::move(*mf));
} else {
last_mf = mf;
// On repair follower node, only decorated_key_with_hash and the mutation_fragment inside repair_row are used.
row_list.push_back(repair_row({}, {}, dk_ptr, {}, is_dirty_on_master::no, std::move(mf)));
}
});
}
}).then([&row_list] {
return std::move(row_list);
});
});
}
class repair_meta {
friend repair_meta_tracker;
public:
using repair_master = bool_class<class repair_master_tag>;
using update_working_row_buf = bool_class<class update_working_row_buf_tag>;
using update_peer_row_hash_sets = bool_class<class update_peer_row_hash_sets_tag>;
using needs_all_rows_t = bool_class<class needs_all_rows_tag>;
@@ -1194,53 +1240,6 @@ private:
});
};
static future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows, schema_ptr _schema, uint64_t _seed, repair_master _repair_master, reader_permit _permit, repair_hasher _repair_hasher) {
return do_with(std::move(rows), std::list<repair_row>(), lw_shared_ptr<const decorated_key_with_hash>(), lw_shared_ptr<mutation_fragment>(), position_in_partition::tri_compare(*_schema),
[_schema, _seed, _repair_master, _permit, _repair_hasher] (repair_rows_on_wire& rows, std::list<repair_row>& row_list, lw_shared_ptr<const decorated_key_with_hash>& dk_ptr, lw_shared_ptr<mutation_fragment>& last_mf, position_in_partition::tri_compare& cmp) mutable {
return do_for_each(rows, [&dk_ptr, &row_list, &last_mf, &cmp, _schema, _seed, _repair_master, _permit, _repair_hasher] (partition_key_and_mutation_fragments& x) mutable {
dht::decorated_key dk = dht::decorate_key(*_schema, x.get_key());
if (!(dk_ptr && dk_ptr->dk.equal(*_schema, dk))) {
dk_ptr = make_lw_shared<const decorated_key_with_hash>(*_schema, dk, _seed);
}
if (_repair_master) {
return do_for_each(x.get_mutation_fragments(), [&dk_ptr, &row_list, _schema, _permit, _repair_hasher] (frozen_mutation_fragment& fmf) mutable {
_metrics.rx_row_nr += 1;
_metrics.rx_row_bytes += fmf.representation().size();
// Keep the mutation_fragment in repair_row as an
// optimization to avoid unfreeze again when
// mutation_fragment is needed by _repair_writer.do_write()
// to apply the repair_row to disk
auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*_schema, _permit));
auto hash = _repair_hasher.do_hash_for_mf(*dk_ptr, *mf);
position_in_partition pos(mf->position());
row_list.push_back(repair_row(std::move(fmf), std::move(pos), dk_ptr, std::move(hash), is_dirty_on_master::yes, std::move(mf)));
});
} else {
last_mf = {};
return do_for_each(x.get_mutation_fragments(), [&dk_ptr, &row_list, &last_mf, &cmp, _schema, _permit] (frozen_mutation_fragment& fmf) mutable {
_metrics.rx_row_nr += 1;
_metrics.rx_row_bytes += fmf.representation().size();
auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*_schema, _permit));
// If the mutation_fragment has the same position as
// the last mutation_fragment, it means they are the
// same row with different contents. We can not feed
// such rows into the sstable writer. Instead we apply
// the mutation_fragment into the previous one.
if (last_mf && cmp(last_mf->position(), mf->position()) == 0 && last_mf->mergeable_with(*mf)) {
last_mf->apply(*_schema, std::move(*mf));
} else {
last_mf = mf;
// On repair follower node, only decorated_key_with_hash and the mutation_fragment inside repair_row are used.
row_list.push_back(repair_row({}, {}, dk_ptr, {}, is_dirty_on_master::no, std::move(mf)));
}
});
}
}).then([&row_list] {
return std::move(row_list);
});
});
}
public:
// RPC API
// Return the hashes of the rows in _working_row_buf
@@ -2653,7 +2652,7 @@ public:
algorithm,
max_row_buf_size,
_seed,
repair_meta::repair_master::yes,
repair_master::yes,
repair_meta_id,
_ri.reason,
std::move(master_node_shard_config),
@@ -2989,7 +2988,7 @@ repair_service::insert_repair_meta(
algo,
max_row_buf_size,
seed,
repair_meta::repair_master::no,
repair_master::no,
repair_meta_id,
reason,
std::move(master_node_shard_config),

View File

@@ -12,6 +12,9 @@
#include "gms/inet_address.hh"
#include "repair/repair.hh"
#include <seastar/core/distributed.hh>
#include <seastar/util/bool_class.hh>
using namespace seastar;
class row_level_repair_gossip_helper;
@@ -228,7 +231,13 @@ public:
};
class repair_info;
using repair_master = bool_class<class repair_master_tag>;
class partition_key_and_mutation_fragments;
using repair_rows_on_wire = std::list<partition_key_and_mutation_fragments>;
class repair_row;
class repair_hasher;
future<> repair_cf_range_row_level(repair_info& ri,
sstring cf_name, utils::UUID table_id, dht::token_range range,
const std::vector<gms::inet_address>& all_peer_nodes);
future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows, schema_ptr _schema, uint64_t _seed, repair_master _repair_master, reader_permit _permit, repair_hasher _repair_hasher);