storage_proxy: calculate mutation diffs during reconcile.

This commit is contained in:
Gleb Natapov
2015-10-26 13:19:23 +02:00
parent 8bbe4bdbd4
commit 122f03e689

View File

@@ -59,6 +59,7 @@
#include "exceptions/exceptions.hh"
#include <boost/range/algorithm_ext/push_back.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/adaptor/reversed.hpp>
#include <boost/iterator/counting_iterator.hpp>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/algorithm/count_if.hpp>
@@ -1375,6 +1376,7 @@ class data_read_resolver : public abstract_read_resolver {
uint32_t _max_live_count = 0;
std::vector<reply> _data_results;
std::unordered_map<gms::inet_address, std::vector<mutation>> _diffs;
private:
virtual void on_timeout() override {
// we will not need them any more
@@ -1444,22 +1446,38 @@ public:
}
} while(true);
std::vector<partition> reconciliated_partitions;
reconciliated_partitions.reserve(versions.size());
uint32_t row_count = 0;
std::vector<mutation> reconciled_partitions;
reconciled_partitions.reserve(versions.size());
// traverse backwards since large keys are at the start
boost::range::transform(boost::make_iterator_range(versions.rbegin(), versions.rend()), std::back_inserter(reconciliated_partitions), [this, &row_count, schema] (std::vector<version>& v) {
mutation m = std::accumulate(std::begin(v), std::end(v), mutation(v.front().par.mut().key(*schema), schema), [this, schema = std::move(schema)] (mutation& m, const version& ver) {
// reconcile all versions
boost::range::transform(boost::make_iterator_range(versions.begin(), versions.end()), std::back_inserter(reconciled_partitions), [this, schema] (std::vector<version>& v) {
return boost::accumulate(v, mutation(v.front().par.mut().key(*schema), schema), [this, schema = std::move(schema)] (mutation& m, const version& ver) {
m.partition().apply(*schema, ver.par.mut().partition());
return std::move(m);
});
auto count = m.live_row_count();
row_count += count;
return partition(count, freeze(m));
});
return reconcilable_result(row_count, std::move(reconciliated_partitions));
// calculate differences
for (auto z : boost::combine(versions, reconciled_partitions)) {
const mutation& m = z.get<1>();
for (const version& v : z.get<0>()) {
auto diff = m.partition().difference(schema, v.par.mut().unfreeze(schema).partition());
if (!diff.empty()) {
_diffs[v.from].emplace_back(mutation(schema, m.decorated_key(), std::move(diff)));
}
}
}
// build reconcilable_result from reconciled data
// traverse backwards since large keys are at the start
auto r = boost::accumulate(reconciled_partitions | boost::adaptors::reversed, std::make_pair(uint32_t(0), std::vector<partition>()), [] (auto&& a, const mutation& m) {
auto count = m.live_row_count();
a.first += count;
a.second.emplace_back(partition(count, freeze(m)));
return std::move(a);
});
return reconcilable_result(r.first, std::move(r.second));
}
};