diff --git a/api/api-doc/messaging_service.json b/api/api-doc/messaging_service.json index 7fd7b47c2c..442ebeef2c 100644 --- a/api/api-doc/messaging_service.json +++ b/api/api-doc/messaging_service.json @@ -252,7 +252,7 @@ "UNUSED__STREAM_MUTATION", "STREAM_MUTATION_DONE", "COMPLETE_MESSAGE", - "REPAIR_CHECKSUM_RANGE", + "UNUSED__REPAIR_CHECKSUM_RANGE", "GET_SCHEMA_VERSION" ] } diff --git a/idl/partition_checksum.idl.hh b/idl/partition_checksum.idl.hh index 322f65a037..e6d8828fa5 100644 --- a/idl/partition_checksum.idl.hh +++ b/idl/partition_checksum.idl.hh @@ -19,15 +19,6 @@ * along with Scylla. If not, see . */ -enum class repair_checksum : uint8_t { - legacy = 0, - streamed = 1, -}; - -class partition_checksum { - std::array digest(); -}; - class repair_hash { uint64_t hash; }; diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 76cd584e4c..ef023cf2ce 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -537,7 +537,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::STREAM_MUTATION_DONE: case messaging_verb::COMPLETE_MESSAGE: case messaging_verb::REPLICATION_FINISHED: - case messaging_verb::REPAIR_CHECKSUM_RANGE: + case messaging_verb::UNUSED__REPAIR_CHECKSUM_RANGE: case messaging_verb::STREAM_MUTATION_FRAGMENTS: case messaging_verb::REPAIR_ROW_LEVEL_START: case messaging_verb::REPAIR_ROW_LEVEL_STOP: @@ -1287,23 +1287,6 @@ future<> messaging_service::send_replication_finished(msg_addr id, inet_address return send_message_timeout(this, messaging_verb::REPLICATION_FINISHED, std::move(id), 10000ms, std::move(from)); } -// Wrapper for REPAIR_CHECKSUM_RANGE -void messaging_service::register_repair_checksum_range( - std::function (sstring keyspace, - sstring cf, dht::token_range range, rpc::optional hash_version)>&& f) { - register_handler(this, messaging_verb::REPAIR_CHECKSUM_RANGE, std::move(f)); -} -future<> messaging_service::unregister_repair_checksum_range() { - return unregister_handler(messaging_verb::REPAIR_CHECKSUM_RANGE); -} -future messaging_service::send_repair_checksum_range( - msg_addr id, sstring keyspace, sstring cf, ::dht::token_range range, repair_checksum hash_version) -{ - return send_message(this, - messaging_verb::REPAIR_CHECKSUM_RANGE, std::move(id), - std::move(keyspace), std::move(cf), std::move(range), hash_version); -} - // Wrapper for REPAIR_GET_FULL_ROW_HASHES void messaging_service::register_repair_get_full_row_hashes(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func) { register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(func)); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 5e8391027c..efc0140057 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -73,7 +73,6 @@ class update_backlog; class frozen_mutation; class frozen_schema; -class partition_checksum; class canonical_mutation; namespace dht { @@ -120,7 +119,7 @@ enum class messaging_verb : int32_t { STREAM_MUTATION_DONE = 18, COMPLETE_MESSAGE = 19, // end of streaming verbs - REPAIR_CHECKSUM_RANGE = 20, + UNUSED__REPAIR_CHECKSUM_RANGE = 20, GET_SCHEMA_VERSION = 21, SCHEMA_CHECK = 22, COUNTER_MUTATION = 23, @@ -348,11 +347,6 @@ public: future<> send_complete_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id, bool failed = false); future<> unregister_complete_message(); - // Wrapper for REPAIR_CHECKSUM_RANGE verb - void register_repair_checksum_range(std::function (sstring keyspace, sstring cf, dht::token_range range, rpc::optional hash_version)>&& func); - future<> unregister_repair_checksum_range(); - future send_repair_checksum_range(msg_addr id, sstring keyspace, sstring cf, dht::token_range range, repair_checksum hash_version); - // Wrapper for REPAIR_GET_FULL_ROW_HASHES void register_repair_get_full_row_hashes(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func); future<> unregister_repair_get_full_row_hashes(); diff --git a/repair/repair.cc b/repair/repair.cc index 6b6eef74dd..e345db74f5 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -496,250 +496,6 @@ void check_in_shutdown() { repair_tracker().check_in_shutdown(); } -class partition_hasher { - const schema& _schema; - sha256_hasher _hasher; - partition_checksum _checksum; - - bound_view::compare _cmp; - range_tombstone_list _rt_list; - bool _inside_range_tombstone = false; -private: - void consume_cell(const column_definition& col, const atomic_cell_or_collection& cell) { - feed_hash(_hasher, col.name()); - feed_hash(_hasher, col.type->name()); - feed_hash(_hasher, cell, col); - } - - void consume_range_tombstone_start(const range_tombstone& rt) noexcept { - feed_hash(_hasher, rt.start, _schema); - feed_hash(_hasher, rt.start_kind); - feed_hash(_hasher, rt.tomb); - } - - void consume_range_tombstone_end(const range_tombstone& rt) noexcept { - feed_hash(_hasher, rt.end, _schema); - feed_hash(_hasher, rt.end_kind); - } - - void consume_range_tombstones_until(const clustering_row& cr) { - while (!_rt_list.empty()) { - auto it = _rt_list.begin(); - if (_inside_range_tombstone) { - if (_cmp(it->end_bound(), cr.key())) { - consume_range_tombstone_end(_rt_list.pop_as(it)); - _inside_range_tombstone = false; - } else { - break; - } - } else { - if (_cmp(it->start_bound(), cr.key())) { - consume_range_tombstone_start(*it); - _inside_range_tombstone = true; - } else { - break; - } - } - } - } - - void consume_range_tombstones_until_end() { - if (_inside_range_tombstone) { - consume_range_tombstone_end(_rt_list.pop_as(_rt_list.begin())); - } - for (auto&& rt : _rt_list) { - consume_range_tombstone_start(rt); - consume_range_tombstone_end(rt); - } - _rt_list.clear(); - _inside_range_tombstone = false; - } -public: - explicit partition_hasher(const schema& s) - : _schema(s), _cmp(s), _rt_list(s) { } - - void consume_new_partition(const dht::decorated_key& dk) { - feed_hash(_hasher, dk.key(), _schema); - } - - stop_iteration consume(tombstone t) { - feed_hash(_hasher, t); - return stop_iteration::no; - } - - stop_iteration consume(const static_row& sr) { - sr.cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell) { - auto&& col = _schema.static_column_at(id); - consume_cell(col, cell); - }); - return stop_iteration::no; - } - - stop_iteration consume(const clustering_row& cr) { - consume_range_tombstones_until(cr); - - feed_hash(_hasher, cr.key(), _schema); - feed_hash(_hasher, cr.tomb()); - feed_hash(_hasher, cr.marker()); - cr.cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell) { - auto&& col = _schema.regular_column_at(id); - consume_cell(col, cell); - }); - return stop_iteration::no; - } - - stop_iteration consume(range_tombstone&& rt) { - _rt_list.apply(_schema, std::move(rt)); - return stop_iteration::no; - } - - stop_iteration consume_end_of_partition() { - consume_range_tombstones_until_end(); - - std::array digest = _hasher.finalize_array(); - _hasher = { }; - - _checksum.add(partition_checksum(digest)); - return stop_iteration::no; - } - - partition_checksum consume_end_of_stream() { - return std::move(_checksum); - } -}; - -future partition_checksum::compute_legacy(flat_mutation_reader mr) -{ - auto s = mr.schema(); - return do_with(std::move(mr), - partition_checksum(), [] (auto& reader, auto& checksum) { - return repeat([&reader, &checksum] () { - return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([&checksum] (auto mopt) { - if (!mopt) { - return stop_iteration::yes; - } - sha256_hasher h; - feed_hash(h, *mopt); - std::array digest = h.finalize_array(); - checksum.add(partition_checksum(digest)); - return stop_iteration::no; - }); - }).then([&checksum] { - return checksum; - }); - }); -} - -future partition_checksum::compute_streamed(flat_mutation_reader m) -{ - return do_with(std::move(m), [] (auto& m) { - return m.consume(partition_hasher(*m.schema()), db::no_timeout); - }); -} - -future partition_checksum::compute(flat_mutation_reader m, repair_checksum hash_version) -{ - switch (hash_version) { - case repair_checksum::legacy: return compute_legacy(std::move(m)); - case repair_checksum::streamed: return compute_streamed(std::move(m)); - default: throw std::runtime_error(format("Unknown hash version: {}", static_cast(hash_version))); - } -} - -void partition_checksum::add(const partition_checksum& other) { - static_assert(std::tuple_size::value == 32, "digest size"); - // Hopefully the following trickery is faster than XOR'ing 32 separate bytes - for (int i = 0; i < 4; ++i) { - uint64_t a = read_unaligned(_digest.data() + 8 * i); - uint64_t b = read_unaligned(other._digest.data() + 8 * i); - write_unaligned(_digest.data() + 8 * i, a ^ b); - } -} - -bool partition_checksum::operator==(const partition_checksum& other) const { - static_assert(std::tuple_size::value == 32, "digest size"); - return _digest == other._digest; -} - -const std::array& partition_checksum::digest() const { - return _digest; -} - -std::ostream& operator<<(std::ostream& out, const partition_checksum& c) { - auto save_flags = out.flags(); - out << std::hex << std::setfill('0'); - for (auto b : c._digest) { - out << std::setw(2) << (unsigned int)b; - } - out.flags(save_flags); - return out; -} - -// Calculate the checksum of the data held *on this shard* of a column family, -// in the given token range. -// All parameters to this function are constant references, and the caller -// must ensure they live as long as the future returned by this function is -// not resolved. -// FIXME: Both master and slave will typically call this on consecutive ranges -// so it would be useful to have this code cache its stopping point or have -// some object live throughout the operation. Moreover, it makes sense to to -// vary the collection of sstables used throught a long repair. -static future checksum_range_shard(database &db, - const sstring& keyspace_name, const sstring& cf_name, - const dht::partition_range_vector& prs, repair_checksum hash_version) { - auto& cf = db.find_column_family(keyspace_name, cf_name); - auto reader = cf.make_streaming_reader(cf.schema(), prs); - return partition_checksum::compute(std::move(reader), hash_version); -} - -// It is counter-productive to allow a large number of range checksum -// operations to proceed in parallel (on the same shard), because the read -// operation can already parallelize itself as much as needed, and doing -// multiple reads in parallel just adds a lot of memory overheads. -// So checksum_parallelism_semaphore is used to limit this parallelism, -// and should be set to 1, or another small number. -// -// Note that checksumming_parallelism_semaphore applies not just in the -// repair master, but also in the slave: The repair slave may receive many -// checksum requests in parallel, but will only work on one or a few -// (checksum_parallelism_semaphore) at once. -static thread_local named_semaphore checksum_parallelism_semaphore(2, named_semaphore_exception_factory{"repair checksum parallelism"}); - -// Calculate the checksum of the data held on all shards of a column family, -// in the given token range. -// In practice, we only need to consider one or two shards which intersect the -// given "range". This is because the token ring has nodes*vnodes tokens, -// dividing the token space into nodes*vnodes ranges, with "range" being one -// of those. This number is big (vnodes = 256 by default). At the same time, -// sharding divides the token space into relatively few large ranges, one per -// thread. -// Watch out: All parameters to this function are constant references, and the -// caller must ensure they live as line as the future returned by this -// function is not resolved. -future checksum_range(seastar::sharded &db, - const sstring& keyspace, const sstring& cf, - const ::dht::token_range& range, repair_checksum hash_version) { - auto& schema = db.local().find_column_family(keyspace, cf).schema(); - auto shard_ranges = dht::split_range_to_shards(dht::to_partition_range(range), *schema); - return do_with(partition_checksum(), std::move(shard_ranges), [&db, &keyspace, &cf, hash_version] (auto& result, auto& shard_ranges) { - return parallel_for_each(shard_ranges, [&db, &keyspace, &cf, &result, hash_version] (auto& shard_range) { - auto& shard = shard_range.first; - auto& prs = shard_range.second; - return db.invoke_on(shard, [keyspace, cf, prs = std::move(prs), hash_version] (database& db) mutable { - return do_with(std::move(keyspace), std::move(cf), std::move(prs), [&db, hash_version] (auto& keyspace, auto& cf, auto& prs) { - return seastar::with_semaphore(checksum_parallelism_semaphore, 1, [&db, hash_version, &keyspace, &cf, &prs] { - return checksum_range_shard(db, keyspace, cf, prs, hash_version); - }); - }); - }).then([&result] (partition_checksum sum) { - result.add(sum); - }); - }).then([&result] { - return make_ready_future(result); - }); - }); -} - // parallelism_semaphore limits the number of parallel ongoing checksum // comparisons. This could mean, for example, that this number of checksum // requests have been sent to other nodes and we are waiting for them to @@ -2029,13 +1785,6 @@ future<> replace_with_repair(seastar::sharded& db, seastar::sharded init_messaging_service_handler(sharded& db, sharded& messaging) { _messaging = &messaging; return messaging.invoke_on_all([&db] (auto& ms) { - ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, dht::token_range range, rpc::optional hash_version) { - auto hv = hash_version ? *hash_version : repair_checksum::legacy; - return do_with(std::move(keyspace), std::move(cf), std::move(range), - [&db, hv] (auto& keyspace, auto& cf, auto& range) { - return checksum_range(db, keyspace, cf, range, hv); - }); - }); ms.register_node_ops_cmd([] (const rpc::client_info& cinfo, node_ops_cmd_request req) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto coordinator = cinfo.retrieve_auxiliary("baddr"); @@ -2048,7 +1797,7 @@ static future<> init_messaging_service_handler(sharded& db, sharded uninit_messaging_service_handler() { return _messaging->invoke_on_all([] (auto& ms) { - return when_all_succeed(ms.unregister_repair_checksum_range(), ms.unregister_node_ops_cmd()).discard_result(); + return when_all_succeed(ms.unregister_node_ops_cmd()).discard_result(); }); } diff --git a/repair/repair.hh b/repair/repair.hh index c2256378ad..e2788aa866 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -137,40 +137,6 @@ enum class repair_checksum { streamed = 1, }; -// The class partition_checksum calculates a 256-bit cryptographically-secure -// checksum of a set of partitions fed to it. The checksum of a partition set -// is calculated by calculating a strong hash function (SHA-256) of each -// individual partition, and then XORing the individual hashes together. -// XOR is good enough for merging strong checksums, and allows us to -// independently calculate the checksums of different subsets of the original -// set, and then combine the results into one checksum with the add() method. -// The hash of an individual partition uses both its key and value. -class partition_checksum { -private: - std::array _digest; // 256 bits -private: - static future compute_legacy(flat_mutation_reader m); - static future compute_streamed(flat_mutation_reader m); -public: - constexpr partition_checksum() : _digest{} { } - explicit partition_checksum(std::array digest) : _digest(std::move(digest)) { } - static future compute(flat_mutation_reader mr, repair_checksum rt); - void add(const partition_checksum& other); - bool operator==(const partition_checksum& other) const; - bool operator!=(const partition_checksum& other) const { return !operator==(other); } - friend std::ostream& operator<<(std::ostream&, const partition_checksum&); - const std::array& digest() const; -}; - -// Calculate the checksum of the data held on all shards of a column family, -// in the given token range. -// All parameters to this function are constant references, and the caller -// must ensure they live as long as the future returned by this function is -// not resolved. -future checksum_range(seastar::sharded &db, - const sstring& keyspace, const sstring& cf, - const ::dht::token_range& range, repair_checksum rt); - class repair_stats { public: uint64_t round_nr = 0; @@ -510,14 +476,6 @@ struct node_ops_cmd_response { }; namespace std { -template<> -struct hash { - size_t operator()(partition_checksum sum) const { - size_t h = 0; - std::copy_n(sum.digest().begin(), std::min(sizeof(size_t), sizeof(sum.digest())), reinterpret_cast(&h)); - return h; - } -}; template<> struct hash { diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index 3b93c4b293..d17eea2897 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -279,57 +279,6 @@ SEASTAR_TEST_CASE(test_fragmenting_and_freezing) { }); } - -SEASTAR_TEST_CASE(test_partition_checksum) { - return seastar::async([] { - for_each_mutation_pair([] (auto&& m1, auto&& m2, are_equal eq) { - auto get_hash = [] (mutation m) { - return partition_checksum::compute(flat_mutation_reader_from_mutations(tests::make_permit(), { m }), - repair_checksum::streamed).get0(); - }; - auto h1 = get_hash(m1); - auto h2 = get_hash(m2); - if (eq) { - if (h1 != h2) { - BOOST_FAIL(format("Hash should be equal for {} and {}", m1, m2)); - } - } else { - // We're using a strong hasher, collision should be unlikely - if (h1 == h2) { - BOOST_FAIL(format("Hash should be different for {} and {}", m1, m2)); - } - } - }); - - auto test_random_streams = [] (random_mutation_generator&& gen) { - for (auto i = 0; i < 4; i++) { - auto muts = gen(4); - auto muts2 = muts; - std::vector checksum; - while (!muts2.empty()) { - auto chk = partition_checksum::compute(flat_mutation_reader_from_mutations(tests::make_permit(), muts2), - repair_checksum::streamed).get0(); - BOOST_REQUIRE(boost::count(checksum, chk) == 0); - checksum.emplace_back(chk); - muts2.pop_back(); - } - std::vector individually_computed_checksums(muts.size()); - for (auto k = 0u; k < muts.size(); k++) { - auto chk = partition_checksum::compute(flat_mutation_reader_from_mutations(tests::make_permit(), { muts[k] }), - repair_checksum::streamed).get0(); - for (auto j = 0u; j < (muts.size() - k); j++) { - individually_computed_checksums[j].add(chk); - } - } - BOOST_REQUIRE_EQUAL(checksum, individually_computed_checksums); - } - }; - - test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no)); - test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes)); - }); -} - SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) { struct dummy_reader_impl : public flat_mutation_reader::impl { using flat_mutation_reader::impl::impl;