From f690e2e80b671f1b23afb1f95592277f280fc3bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Fri, 10 Nov 2017 16:09:07 +0000 Subject: [PATCH] repair: convert partition_checksum::compute_streamed() to flat streams --- repair/repair.cc | 74 +++++++++++++++++++----------------------------- repair/repair.hh | 5 ++-- 2 files changed, 31 insertions(+), 48 deletions(-) diff --git a/repair/repair.cc b/repair/repair.cc index 23d6edd58a..c36dd6eb0d 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -576,34 +576,40 @@ public: } }; -future partition_checksum::compute_legacy(streamed_mutation m) +future partition_checksum::compute_legacy(flat_mutation_reader mr) { - return mutation_from_streamed_mutation(std::move(m)).then([] (auto mopt) { - assert(mopt); - std::array digest; - sha256_hasher h; - feed_hash(h, *mopt); - h.finalize(digest); - return partition_checksum(digest); - }); -} - -future partition_checksum::compute_streamed(streamed_mutation m) -{ - auto& s = *m.schema(); - auto h = make_lw_shared(); - m.key().feed_hash(*h, s); - return do_with(std::move(m), [&s, h] (auto& sm) mutable { - mutation_hasher mh(s, *h); - return consume(sm, std::move(mh)).then([ h ] { - std::array digest; - h->finalize(digest); - return partition_checksum(digest); + auto s = mr.schema(); + return do_with(mutation_reader_from_flat_mutation_reader(std::move(mr)), + partition_checksum(), [] (auto& reader, auto& checksum) { + return repeat([&reader, &checksum] () { + return reader().then([&checksum] (auto smopt) { + if (!smopt) { + return make_ready_future(stop_iteration::yes); + } + return mutation_from_streamed_mutation(std::move(*smopt)).then([&checksum] (auto mopt) { + assert(mopt); + std::array digest; + sha256_hasher h; + feed_hash(h, *mopt); + h.finalize(digest); + checksum.add(partition_checksum(digest)); + return stop_iteration::no; + }); + }); + }).then([&checksum] { + return checksum; }); }); } -future partition_checksum::compute(streamed_mutation m, repair_checksum hash_version) +future partition_checksum::compute_streamed(flat_mutation_reader m) +{ + return do_with(std::move(m), [] (auto& m) { + return m.consume(partition_hasher(*m.schema())); + }); +} + +future partition_checksum::compute(flat_mutation_reader m, repair_checksum hash_version) { switch (hash_version) { case repair_checksum::legacy: return compute_legacy(std::move(m)); @@ -612,28 +618,6 @@ future partition_checksum::compute(streamed_mutation m, repa } } -future partition_checksum::compute(flat_mutation_reader mr, repair_checksum hash_version) -{ - auto s = mr.schema(); - return do_with(mutation_reader_from_flat_mutation_reader(std::move(mr)), - partition_checksum(), [hash_version] (auto& reader, auto& checksum) { - return repeat([&reader, &checksum, hash_version] () { - return reader().then([&checksum, hash_version] (auto mopt) { - if (mopt) { - return partition_checksum::compute(std::move(*mopt), hash_version).then([&checksum] (auto pc) { - checksum.add(pc); - return stop_iteration::no; - }); - } else { - return make_ready_future(stop_iteration::yes); - } - }); - }).then([&checksum] { - return checksum; - }); - }); -} - static inline unaligned& qword(std::array& b, int n) { return *unaligned_cast(b.data() + 8 * n); } diff --git a/repair/repair.hh b/repair/repair.hh index d98229e6a4..3db4f3140e 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -90,12 +90,11 @@ class partition_checksum { private: std::array _digest; // 256 bits private: - static future compute_legacy(streamed_mutation m); - static future compute_streamed(streamed_mutation m); + static future compute_legacy(flat_mutation_reader m); + static future compute_streamed(flat_mutation_reader m); public: constexpr partition_checksum() : _digest{} { } explicit partition_checksum(std::array digest) : _digest(std::move(digest)) { } - static future compute(streamed_mutation m, repair_checksum rt); static future compute(flat_mutation_reader mr, repair_checksum rt); void add(const partition_checksum& other); bool operator==(const partition_checksum& other) const;