diff --git a/repair/hash.hh b/repair/hash.hh new file mode 100644 index 0000000000..8569f99126 --- /dev/null +++ b/repair/hash.hh @@ -0,0 +1,51 @@ +#pragma once +#include +#include +#include +#include "schema.hh" + +class decorated_key_with_hash; +class mutation_fragment; + +// Hash of a repair row +class repair_hash { +public: + uint64_t hash = 0; + repair_hash() = default; + explicit repair_hash(uint64_t h) : hash(h) { + } + void clear() { + hash = 0; + } + void add(const repair_hash& other) { + hash ^= other.hash; + } + bool operator==(const repair_hash& x) const { + return x.hash == hash; + } + bool operator!=(const repair_hash& x) const { + return x.hash != hash; + } + bool operator<(const repair_hash& x) const { + return x.hash < hash; + } + friend std::ostream& operator<<(std::ostream& os, const repair_hash& x) { + return os << x.hash; + } +}; + +using repair_hash_set = absl::btree_set; + +class repair_hasher { + uint64_t _seed; + schema_ptr _schema; +public: + repair_hasher(uint64_t seed, schema_ptr s) + : _seed(seed) + , _schema(std::move(s)) + {} + + repair_hash do_hash_for_mf(const decorated_key_with_hash& dk_with_hash, const mutation_fragment& mf); +}; + + diff --git a/repair/repair.hh b/repair/repair.hh index ce518f9514..3e81d59b05 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -26,6 +26,7 @@ #include "utils/hash.hh" #include "streaming/stream_reason.hh" #include "locator/token_metadata.hh" +#include "repair/hash.hh" class flat_mutation_reader; @@ -280,35 +281,6 @@ struct repair_sync_boundary { } }; -// Hash of a repair row -class repair_hash { -public: - uint64_t hash = 0; - repair_hash() = default; - explicit repair_hash(uint64_t h) : hash(h) { - } - void clear() { - hash = 0; - } - void add(const repair_hash& other) { - hash ^= other.hash; - } - bool operator==(const repair_hash& x) const { - return x.hash == hash; - } - bool operator!=(const repair_hash& x) const { - return x.hash != hash; - } - bool operator<(const repair_hash& x) const { - return x.hash < hash; - } - friend std::ostream& operator<<(std::ostream& os, const repair_hash& x) { - return os << x.hash; - } -}; - -using repair_hash_set = absl::btree_set; - enum class repair_row_level_start_status: uint8_t { ok, no_such_column_family, diff --git a/repair/row_level.cc b/repair/row_level.cc index 4fc3717576..bf9d60a5b0 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -49,6 +49,8 @@ #include "readers/empty.hh" #include "readers/evictable.hh" #include "readers/queue.hh" +#include "repair/hash.hh" +#include "xx_hasher.hh" extern logging::logger rlogger; @@ -253,6 +255,13 @@ public: } }; +repair_hash repair_hasher::do_hash_for_mf(const decorated_key_with_hash& dk_with_hash, const mutation_fragment& mf) { + xx_hasher h(_seed); + feed_hash(h, mf, *_schema); + feed_hash(h, dk_with_hash.hash.hash); + return repair_hash(h.finalize_uint64()); +} + using is_dirty_on_master = bool_class; class repair_row { @@ -687,6 +696,7 @@ private: std::vector _all_node_states; is_dirty_on_master _dirty_on_master = is_dirty_on_master::no; std::optional> _stop_promise; + repair_hasher _repair_hasher; public: std::vector& all_nodes() { return _all_node_states; @@ -787,6 +797,7 @@ public: return rs.get_messaging().make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(repair_meta_id, addr); }) , _row_level_repair_ptr(row_level_repair_ptr) + , _repair_hasher(_seed, _schema) { if (master) { add_to_repair_meta_for_masters(*this); @@ -977,13 +988,6 @@ private: }); } - repair_hash do_hash_for_mf(const decorated_key_with_hash& dk_with_hash, const mutation_fragment& mf) { - xx_hasher h(_seed); - feed_hash(h, mf, *_schema); - feed_hash(h, dk_with_hash.hash.hash); - return repair_hash(h.finalize_uint64()); - } - stop_iteration handle_mutation_fragment(mutation_fragment& mf, size_t& cur_size, size_t& new_rows_size, std::list& cur_rows) { if (mf.is_partition_start()) { auto& start = mf.as_partition_start(); @@ -996,7 +1000,7 @@ private: _repair_reader.clear_current_dk(); return stop_iteration::no; } - auto hash = do_hash_for_mf(*_repair_reader.get_current_dk(), mf); + auto hash = _repair_hasher.do_hash_for_mf(*_repair_reader.get_current_dk(), mf); repair_row r(freeze(*_schema, mf), position_in_partition(mf.position()), _repair_reader.get_current_dk(), hash, is_dirty_on_master::no); rlogger.trace("Reading: r.boundary={}, r.hash={}", r.boundary(), r.hash()); _metrics.row_from_disk_nr++; @@ -1339,7 +1343,7 @@ private: // mutation_fragment is needed by _repair_writer.do_write() // to apply the repair_row to disk auto mf = make_lw_shared(fmf.unfreeze(*_schema, _permit)); - auto hash = do_hash_for_mf(*dk_ptr, *mf); + auto hash = _repair_hasher.do_hash_for_mf(*dk_ptr, *mf); position_in_partition pos(mf->position()); row_list.push_back(repair_row(std::move(fmf), std::move(pos), dk_ptr, std::move(hash), is_dirty_on_master::yes, std::move(mf))); });