diff --git a/idl/partition_checksum.idl.hh b/idl/partition_checksum.idl.hh
index e6e8025565..b89e57a96b 100644
--- a/idl/partition_checksum.idl.hh
+++ b/idl/partition_checksum.idl.hh
@@ -19,6 +19,11 @@
* along with Scylla. If not, see .
*/
+enum class repair_checksum : uint8_t {
+ legacy = 0,
+ streamed = 1,
+};
+
class partition_checksum {
std::array digest();
};
diff --git a/main.cc b/main.cc
index 3f17f1e146..cf2de59df7 100644
--- a/main.cc
+++ b/main.cc
@@ -578,10 +578,11 @@ int main(int ac, char** av) {
api::set_server_stream_manager(ctx).get();
// Start handling REPAIR_CHECKSUM_RANGE messages
net::get_messaging_service().invoke_on_all([&db] (auto& ms) {
- ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, query::range range) {
+ ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, query::range range, rpc::optional hash_version) {
+ auto hv = hash_version ? *hash_version : repair_checksum::legacy;
return do_with(std::move(keyspace), std::move(cf), std::move(range),
- [&db] (auto& keyspace, auto& cf, auto& range) {
- return checksum_range(db, keyspace, cf, range);
+ [&db, hv] (auto& keyspace, auto& cf, auto& range) {
+ return checksum_range(db, keyspace, cf, range, hv);
});
});
}).get();
diff --git a/message/messaging_service.cc b/message/messaging_service.cc
index 66eef4ccf4..e57f412a73 100644
--- a/message/messaging_service.cc
+++ b/message/messaging_service.cc
@@ -875,18 +875,18 @@ future<> messaging_service::send_replication_finished(msg_addr id, inet_address
// Wrapper for REPAIR_CHECKSUM_RANGE
void messaging_service::register_repair_checksum_range(
std::function (sstring keyspace,
- sstring cf, query::range range)>&& f) {
+ sstring cf, query::range range, rpc::optional hash_version)>&& f) {
register_handler(this, messaging_verb::REPAIR_CHECKSUM_RANGE, std::move(f));
}
void messaging_service::unregister_repair_checksum_range() {
_rpc->unregister_handler(messaging_verb::REPAIR_CHECKSUM_RANGE);
}
future messaging_service::send_repair_checksum_range(
- msg_addr id, sstring keyspace, sstring cf, ::range range)
+ msg_addr id, sstring keyspace, sstring cf, ::range range, repair_checksum hash_version)
{
return send_message(this,
messaging_verb::REPAIR_CHECKSUM_RANGE, std::move(id),
- std::move(keyspace), std::move(cf), std::move(range));
+ std::move(keyspace), std::move(cf), std::move(range), hash_version);
}
} // namespace net
diff --git a/message/messaging_service.hh b/message/messaging_service.hh
index 1b77f2ce78..8f7e18e46f 100644
--- a/message/messaging_service.hh
+++ b/message/messaging_service.hh
@@ -30,6 +30,7 @@
#include "query-request.hh"
#include "mutation_query.hh"
#include "range.hh"
+#include "repair/repair.hh"
#include
@@ -233,9 +234,9 @@ public:
future<> send_complete_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id);
// Wrapper for REPAIR_CHECKSUM_RANGE verb
- void register_repair_checksum_range(std::function (sstring keyspace, sstring cf, range range)>&& func);
+ void register_repair_checksum_range(std::function (sstring keyspace, sstring cf, range range, rpc::optional hash_version)>&& func);
void unregister_repair_checksum_range();
- future send_repair_checksum_range(msg_addr id, sstring keyspace, sstring cf, range range);
+ future send_repair_checksum_range(msg_addr id, sstring keyspace, sstring cf, range range, repair_checksum hash_version);
// Wrapper for GOSSIP_ECHO verb
void register_gossip_echo(std::function ()>&& func);
diff --git a/repair/repair.cc b/repair/repair.cc
index 8c2332052d..500ef00ffe 100644
--- a/repair/repair.cc
+++ b/repair/repair.cc
@@ -263,11 +263,40 @@ public:
}
};
+future partition_checksum::compute_legacy(streamed_mutation m)
+{
+ return mutation_from_streamed_mutation(std::move(m)).then([] (auto mopt) {
+ assert(mopt);
+ std::array digest;
+ sha256_hasher h;
+ feed_hash(h, *mopt);
+ h.finalize(digest);
+ return partition_checksum(digest);
+ });
+}
-partition_checksum::partition_checksum(const mutation& m) {
- sha256_hasher h;
- feed_hash(h, m);
- h.finalize(_digest);
+future partition_checksum::compute_streamed(streamed_mutation m)
+{
+ auto& s = *m.schema();
+ auto h = make_lw_shared();
+ m.key().feed_hash(*h, s);
+ return do_with(std::move(m), [&s, h] (auto& sm) mutable {
+ mutation_hasher mh(s, *h);
+ return consume(sm, std::move(mh)).then([ h ] {
+ std::array digest;
+ h->finalize(digest);
+ return partition_checksum(digest);
+ });
+ });
+}
+
+future partition_checksum::compute(streamed_mutation m, repair_checksum hash_version)
+{
+ switch (hash_version) {
+ case repair_checksum::legacy: return compute_legacy(std::move(m));
+ case repair_checksum::streamed: return compute_streamed(std::move(m));
+ default: throw std::runtime_error(sprint("Unknown hash version: %d", static_cast(hash_version)));
+ }
}
static inline unaligned& qword(std::array& b, int n) {
@@ -324,24 +353,24 @@ std::ostream& operator<<(std::ostream& out, const partition_checksum& c) {
// data is coming in).
static future checksum_range_shard(database &db,
const sstring& keyspace_name, const sstring& cf_name,
- const ::range& range) {
+ const ::range& range, repair_checksum hash_version) {
auto& cf = db.find_column_family(keyspace_name, cf_name);
- return do_with(query::to_partition_range(range), [&cf] (const auto& partition_range) {
+ return do_with(query::to_partition_range(range), [&cf, hash_version] (const auto& partition_range) {
auto reader = cf.make_reader(cf.schema(),
partition_range,
query::no_clustering_key_filtering,
service::get_local_streaming_read_priority());
return do_with(std::move(reader), partition_checksum(),
- [] (auto& reader, auto& checksum) {
- return repeat([&reader, &checksum] () {
- return reader().then([] (auto sm) {
- return mutation_from_streamed_mutation(std::move(sm));
- }).then([&checksum] (auto mopt) {
+ [hash_version] (auto& reader, auto& checksum) {
+ return repeat([&reader, &checksum, hash_version] () {
+ return reader().then([&checksum, hash_version] (auto mopt) {
if (mopt) {
- checksum.add(partition_checksum(*mopt));
- return stop_iteration::no;
+ return partition_checksum::compute(std::move(*mopt), hash_version).then([&checksum] (auto pc) {
+ checksum.add(pc);
+ return stop_iteration::no;
+ });
} else {
- return stop_iteration::yes;
+ return make_ready_future(stop_iteration::yes);
}
});
}).then([&checksum] {
@@ -364,17 +393,17 @@ static future checksum_range_shard(database &db,
// function is not resolved.
future checksum_range(seastar::sharded &db,
const sstring& keyspace, const sstring& cf,
- const ::range& range) {
+ const ::range& range, repair_checksum hash_version) {
unsigned shard_begin = range.start() ?
dht::shard_of(range.start()->value()) : 0;
unsigned shard_end = range.end() ?
dht::shard_of(range.end()->value())+1 : smp::count;
- return do_with(partition_checksum(), [shard_begin, shard_end, &db, &keyspace, &cf, &range] (auto& result) {
+ return do_with(partition_checksum(), [shard_begin, shard_end, &db, &keyspace, &cf, &range, hash_version] (auto& result) {
return parallel_for_each(boost::counting_iterator(shard_begin),
boost::counting_iterator(shard_end),
- [&db, &keyspace, &cf, &range, &result] (unsigned shard) {
- return db.invoke_on(shard, [&keyspace, &cf, &range] (database& db) {
- return checksum_range_shard(db, keyspace, cf, range);
+ [&db, &keyspace, &cf, &range, &result, hash_version] (unsigned shard) {
+ return db.invoke_on(shard, [&keyspace, &cf, &range, hash_version] (database& db) {
+ return checksum_range_shard(db, keyspace, cf, range, hash_version);
}).then([&result] (partition_checksum sum) {
result.add(sum);
});
@@ -496,11 +525,11 @@ static future<> repair_cf_range(seastar::sharded& db,
// there are any differences, sync the content of this range.
std::vector> checksums;
checksums.reserve(1 + neighbors.size());
- checksums.push_back(checksum_range(db, keyspace, cf, range));
+ checksums.push_back(checksum_range(db, keyspace, cf, range, repair_checksum::legacy));
for (auto&& neighbor : neighbors) {
checksums.push_back(
net::get_local_messaging_service().send_repair_checksum_range(
- net::msg_addr{neighbor},keyspace, cf, range));
+ net::msg_addr{neighbor},keyspace, cf, range, repair_checksum::legacy));
}
completion.enter();
diff --git a/repair/repair.hh b/repair/repair.hh
index ebcf0678ef..992835b699 100644
--- a/repair/repair.hh
+++ b/repair/repair.hh
@@ -70,6 +70,11 @@ future repair_get_status(seastar::sharded& db, int id);
// stop them abruptly).
future<> repair_shutdown(seastar::sharded& db);
+enum class repair_checksum {
+ legacy = 0,
+ streamed = 1,
+};
+
// The class partition_checksum calculates a 256-bit cryptographically-secure
// checksum of a set of partitions fed to it. The checksum of a partition set
// is calculated by calculating a strong hash function (SHA-256) of each
@@ -81,10 +86,13 @@ future<> repair_shutdown(seastar::sharded& db);
class partition_checksum {
private:
std::array _digest; // 256 bits
+private:
+ static future compute_legacy(streamed_mutation m);
+ static future compute_streamed(streamed_mutation m);
public:
constexpr partition_checksum() : _digest{} { }
explicit partition_checksum(std::array digest) : _digest(std::move(digest)) { }
- partition_checksum(const mutation& m);
+ static future compute(streamed_mutation m, repair_checksum rt);
void add(const partition_checksum& other);
bool operator==(const partition_checksum& other) const;
bool operator!=(const partition_checksum& other) const { return !operator==(other); }
@@ -99,4 +107,4 @@ public:
// not resolved.
future checksum_range(seastar::sharded &db,
const sstring& keyspace, const sstring& cf,
- const ::range& range);
+ const ::range& range, repair_checksum rt);