repair: Move row_repair hashing logic to separate class and file.

This commit is contained in:
Mikołaj Sielużycki
2022-04-11 18:45:30 +02:00
parent 8427ec056c
commit 0fa703de3e
3 changed files with 65 additions and 38 deletions

51
repair/hash.hh Normal file
View File

@@ -0,0 +1,51 @@
#pragma once
#include <absl/container/btree_set.h>
#include <cstdint>
#include <ostream>
#include "schema.hh"
class decorated_key_with_hash;
class mutation_fragment;
// Hash of a repair row.
//
// Thin value wrapper around a 64-bit hash. Hashes are combined with XOR
// (see add()), so a combined hash does not depend on the order in which the
// individual hashes were added, and adding the same hash twice cancels out.
class repair_hash {
public:
    uint64_t hash = 0;
    repair_hash() = default;
    explicit repair_hash(uint64_t h) : hash(h) {
    }
    // Resets the hash to zero, the same value a default-constructed object has.
    void clear() {
        hash = 0;
    }
    // Folds `other` into this hash using XOR.
    void add(const repair_hash& other) {
        hash ^= other.hash;
    }
    bool operator==(const repair_hash& x) const {
        return hash == x.hash;
    }
    bool operator!=(const repair_hash& x) const {
        return !(*this == x);
    }
    // Ascending order by numeric value, so that `a < b` agrees with
    // `a.hash < b.hash` and ordered containers iterate from the smallest
    // hash upwards.
    bool operator<(const repair_hash& x) const {
        return hash < x.hash;
    }
    // Prints the raw 64-bit value.
    friend std::ostream& operator<<(std::ostream& os, const repair_hash& x) {
        return os << x.hash;
    }
};
// Set of repair hashes; elements are ordered by repair_hash::operator<.
using repair_hash_set = absl::btree_set<repair_hash>;
// Computes repair_hash values for mutation fragments.
//
// Bundles the seed and the schema used by do_hash_for_mf(), so that callers
// only have to pass the partition key and the fragment itself.
class repair_hasher {
    uint64_t _seed;
    schema_ptr _schema;
public:
    // Stores `seed` and takes over the schema pointer `s`.
    repair_hasher(uint64_t seed, schema_ptr s) : _seed(seed), _schema(std::move(s)) {}

    // Returns the hash of `mf` for the partition described by `dk_with_hash`.
    // Defined out of line.
    repair_hash do_hash_for_mf(const decorated_key_with_hash& dk_with_hash, const mutation_fragment& mf);
};

View File

@@ -26,6 +26,7 @@
#include "utils/hash.hh"
#include "streaming/stream_reason.hh"
#include "locator/token_metadata.hh"
#include "repair/hash.hh"
class flat_mutation_reader;
@@ -280,35 +281,6 @@ struct repair_sync_boundary {
}
};
// Hash of a repair row.
//
// Thin value wrapper around a 64-bit hash. Hashes are combined with XOR
// (see add()), so a combined hash does not depend on the order in which the
// individual hashes were added, and adding the same hash twice cancels out.
class repair_hash {
public:
    uint64_t hash = 0;
    repair_hash() = default;
    explicit repair_hash(uint64_t h) : hash(h) {
    }
    // Resets the hash to zero, the same value a default-constructed object has.
    void clear() {
        hash = 0;
    }
    // Folds `other` into this hash using XOR.
    void add(const repair_hash& other) {
        hash ^= other.hash;
    }
    bool operator==(const repair_hash& x) const {
        return hash == x.hash;
    }
    bool operator!=(const repair_hash& x) const {
        return !(*this == x);
    }
    // Ascending order by numeric value, so that `a < b` agrees with
    // `a.hash < b.hash` and ordered containers iterate from the smallest
    // hash upwards.
    bool operator<(const repair_hash& x) const {
        return hash < x.hash;
    }
    // Prints the raw 64-bit value.
    friend std::ostream& operator<<(std::ostream& os, const repair_hash& x) {
        return os << x.hash;
    }
};
// Set of repair hashes; elements are ordered by repair_hash::operator<.
using repair_hash_set = absl::btree_set<repair_hash>;
enum class repair_row_level_start_status: uint8_t {
ok,
no_such_column_family,

View File

@@ -49,6 +49,8 @@
#include "readers/empty.hh"
#include "readers/evictable.hh"
#include "readers/queue.hh"
#include "repair/hash.hh"
#include "xx_hasher.hh"
extern logging::logger rlogger;
@@ -253,6 +255,13 @@ public:
}
};
// Hashes one mutation fragment together with the hash of its partition key.
//
// A fresh xx_hasher is seeded with _seed for every call. The fragment is fed
// first (interpreted through *_schema), followed by the key hash carried in
// `dk_with_hash`; the 64-bit digest is wrapped in a repair_hash.
repair_hash repair_hasher::do_hash_for_mf(const decorated_key_with_hash& dk_with_hash, const mutation_fragment& mf) {
    xx_hasher hasher(_seed);
    // Feed order is part of the hash definition: fragment first, key hash second.
    feed_hash(hasher, mf, *_schema);
    feed_hash(hasher, dk_with_hash.hash.hash);
    const auto digest = hasher.finalize_uint64();
    return repair_hash(digest);
}
using is_dirty_on_master = bool_class<class is_dirty_on_master_tag>;
class repair_row {
@@ -687,6 +696,7 @@ private:
std::vector<repair_node_state> _all_node_states;
is_dirty_on_master _dirty_on_master = is_dirty_on_master::no;
std::optional<shared_promise<>> _stop_promise;
repair_hasher _repair_hasher;
public:
std::vector<repair_node_state>& all_nodes() {
return _all_node_states;
@@ -787,6 +797,7 @@ public:
return rs.get_messaging().make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(repair_meta_id, addr);
})
, _row_level_repair_ptr(row_level_repair_ptr)
, _repair_hasher(_seed, _schema)
{
if (master) {
add_to_repair_meta_for_masters(*this);
@@ -977,13 +988,6 @@ private:
});
}
// Computes the repair hash of `mf` for the partition identified by
// `dk_with_hash`: an xx_hasher seeded with _seed is fed the fragment
// (using *_schema) and then the partition key's hash.
repair_hash do_hash_for_mf(const decorated_key_with_hash& dk_with_hash, const mutation_fragment& mf) {
    xx_hasher row_hasher(_seed);
    // Keep this order: fragment first, then the key hash.
    feed_hash(row_hasher, mf, *_schema);
    feed_hash(row_hasher, dk_with_hash.hash.hash);
    const auto digest = row_hasher.finalize_uint64();
    return repair_hash(digest);
}
stop_iteration handle_mutation_fragment(mutation_fragment& mf, size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
if (mf.is_partition_start()) {
auto& start = mf.as_partition_start();
@@ -996,7 +1000,7 @@ private:
_repair_reader.clear_current_dk();
return stop_iteration::no;
}
auto hash = do_hash_for_mf(*_repair_reader.get_current_dk(), mf);
auto hash = _repair_hasher.do_hash_for_mf(*_repair_reader.get_current_dk(), mf);
repair_row r(freeze(*_schema, mf), position_in_partition(mf.position()), _repair_reader.get_current_dk(), hash, is_dirty_on_master::no);
rlogger.trace("Reading: r.boundary={}, r.hash={}", r.boundary(), r.hash());
_metrics.row_from_disk_nr++;
@@ -1339,7 +1343,7 @@ private:
// mutation_fragment is needed by _repair_writer.do_write()
// to apply the repair_row to disk
auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*_schema, _permit));
auto hash = do_hash_for_mf(*dk_ptr, *mf);
auto hash = _repair_hasher.do_hash_for_mf(*dk_ptr, *mf);
position_in_partition pos(mf->position());
row_list.push_back(repair_row(std::move(fmf), std::move(pos), dk_ptr, std::move(hash), is_dirty_on_master::yes, std::move(mf)));
});