repair: convert partition_checksum::compute_streamed() to flat streams

This commit is contained in:
Paweł Dziepak
2017-11-10 16:09:07 +00:00
parent d71a14b943
commit f690e2e80b
2 changed files with 31 additions and 48 deletions

View File

@@ -576,34 +576,40 @@ public:
}
};
future<partition_checksum> partition_checksum::compute_legacy(streamed_mutation m)
future<partition_checksum> partition_checksum::compute_legacy(flat_mutation_reader mr)
{
return mutation_from_streamed_mutation(std::move(m)).then([] (auto mopt) {
assert(mopt);
std::array<uint8_t, 32> digest;
sha256_hasher h;
feed_hash(h, *mopt);
h.finalize(digest);
return partition_checksum(digest);
});
}
future<partition_checksum> partition_checksum::compute_streamed(streamed_mutation m)
{
auto& s = *m.schema();
auto h = make_lw_shared<sha256_hasher>();
m.key().feed_hash(*h, s);
return do_with(std::move(m), [&s, h] (auto& sm) mutable {
mutation_hasher<sha256_hasher> mh(s, *h);
return consume(sm, std::move(mh)).then([ h ] {
std::array<uint8_t, 32> digest;
h->finalize(digest);
return partition_checksum(digest);
auto s = mr.schema();
return do_with(mutation_reader_from_flat_mutation_reader(std::move(mr)),
partition_checksum(), [] (auto& reader, auto& checksum) {
return repeat([&reader, &checksum] () {
return reader().then([&checksum] (auto smopt) {
if (!smopt) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return mutation_from_streamed_mutation(std::move(*smopt)).then([&checksum] (auto mopt) {
assert(mopt);
std::array<uint8_t, 32> digest;
sha256_hasher h;
feed_hash(h, *mopt);
h.finalize(digest);
checksum.add(partition_checksum(digest));
return stop_iteration::no;
});
});
}).then([&checksum] {
return checksum;
});
});
}
future<partition_checksum> partition_checksum::compute(streamed_mutation m, repair_checksum hash_version)
future<partition_checksum> partition_checksum::compute_streamed(flat_mutation_reader m)
{
return do_with(std::move(m), [] (auto& m) {
return m.consume(partition_hasher(*m.schema()));
});
}
future<partition_checksum> partition_checksum::compute(flat_mutation_reader m, repair_checksum hash_version)
{
switch (hash_version) {
case repair_checksum::legacy: return compute_legacy(std::move(m));
@@ -612,28 +618,6 @@ future<partition_checksum> partition_checksum::compute(streamed_mutation m, repa
}
}
future<partition_checksum> partition_checksum::compute(flat_mutation_reader mr, repair_checksum hash_version)
{
auto s = mr.schema();
return do_with(mutation_reader_from_flat_mutation_reader(std::move(mr)),
partition_checksum(), [hash_version] (auto& reader, auto& checksum) {
return repeat([&reader, &checksum, hash_version] () {
return reader().then([&checksum, hash_version] (auto mopt) {
if (mopt) {
return partition_checksum::compute(std::move(*mopt), hash_version).then([&checksum] (auto pc) {
checksum.add(pc);
return stop_iteration::no;
});
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
}).then([&checksum] {
return checksum;
});
});
}
static inline unaligned<uint64_t>& qword(std::array<uint8_t, 32>& b, int n) {
return *unaligned_cast<uint64_t>(b.data() + 8 * n);
}

View File

@@ -90,12 +90,11 @@ class partition_checksum {
private:
std::array<uint8_t, 32> _digest; // 256 bits
private:
static future<partition_checksum> compute_legacy(streamed_mutation m);
static future<partition_checksum> compute_streamed(streamed_mutation m);
static future<partition_checksum> compute_legacy(flat_mutation_reader m);
static future<partition_checksum> compute_streamed(flat_mutation_reader m);
public:
constexpr partition_checksum() : _digest{} { }
explicit partition_checksum(std::array<uint8_t, 32> digest) : _digest(std::move(digest)) { }
static future<partition_checksum> compute(streamed_mutation m, repair_checksum rt);
static future<partition_checksum> compute(flat_mutation_reader mr, repair_checksum rt);
void add(const partition_checksum& other);
bool operator==(const partition_checksum& other) const;