From 122f03e689f76453cec3ab1ba4efc8589bb7d740 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Mon, 26 Oct 2015 13:19:23 +0200 Subject: [PATCH] storage_proxy: calculate mutation diffs during reconcile. --- service/storage_proxy.cc | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index d71203aaab..247612eaa4 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -59,6 +59,7 @@ #include "exceptions/exceptions.hh" #include #include +#include #include #include #include @@ -1375,6 +1376,7 @@ class data_read_resolver : public abstract_read_resolver { uint32_t _max_live_count = 0; std::vector _data_results; + std::unordered_map> _diffs; private: virtual void on_timeout() override { // we will not need them any more @@ -1444,22 +1446,38 @@ public: } } while(true); - std::vector reconciliated_partitions; - reconciliated_partitions.reserve(versions.size()); - uint32_t row_count = 0; + std::vector reconciled_partitions; + reconciled_partitions.reserve(versions.size()); - // traverse backwards since large keys are at the start - boost::range::transform(boost::make_iterator_range(versions.rbegin(), versions.rend()), std::back_inserter(reconciliated_partitions), [this, &row_count, schema] (std::vector& v) { - mutation m = std::accumulate(std::begin(v), std::end(v), mutation(v.front().par.mut().key(*schema), schema), [this, schema = std::move(schema)] (mutation& m, const version& ver) { + // reconcile all versions + boost::range::transform(boost::make_iterator_range(versions.begin(), versions.end()), std::back_inserter(reconciled_partitions), [this, schema] (std::vector& v) { + return boost::accumulate(v, mutation(v.front().par.mut().key(*schema), schema), [this, schema = std::move(schema)] (mutation& m, const version& ver) { m.partition().apply(*schema, ver.par.mut().partition()); return std::move(m); }); - auto count = m.live_row_count(); - row_count += count; - return partition(count, freeze(m)); }); - return reconcilable_result(row_count, std::move(reconciliated_partitions)); + // calculate differences + for (auto z : boost::combine(versions, reconciled_partitions)) { + const mutation& m = z.get<1>(); + for (const version& v : z.get<0>()) { + auto diff = m.partition().difference(schema, v.par.mut().unfreeze(schema).partition()); + if (!diff.empty()) { + _diffs[v.from].emplace_back(mutation(schema, m.decorated_key(), std::move(diff))); + } + } + } + + // build reconcilable_result from reconciled data + // traverse backwards since large keys are at the start + auto r = boost::accumulate(reconciled_partitions | boost::adaptors::reversed, std::make_pair(uint32_t(0), std::vector()), [] (auto&& a, const mutation& m) { + auto count = m.live_row_count(); + a.first += count; + a.second.emplace_back(partition(count, freeze(m))); + return std::move(a); + }); + + return reconcilable_result(r.first, std::move(r.second)); } };