repair: remove partition_checksum and related code

80ebedd242 made row-level repair mandatory, so there remain no
callers of partition_checksum. Remove it.

Closes #8537
This commit is contained in:
Avi Kivity
2021-04-22 17:55:18 +03:00
committed by Nadav Har'El
parent c36549b22e
commit 0af7a22c21
7 changed files with 4 additions and 380 deletions

View File

@@ -252,7 +252,7 @@
"UNUSED__STREAM_MUTATION",
"STREAM_MUTATION_DONE",
"COMPLETE_MESSAGE",
"REPAIR_CHECKSUM_RANGE",
"UNUSED__REPAIR_CHECKSUM_RANGE",
"GET_SCHEMA_VERSION"
]
}

View File

@@ -19,15 +19,6 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
enum class repair_checksum : uint8_t {
legacy = 0,
streamed = 1,
};
class partition_checksum {
std::array<uint8_t, 32> digest();
};
class repair_hash {
uint64_t hash;
};

View File

@@ -537,7 +537,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::STREAM_MUTATION_DONE:
case messaging_verb::COMPLETE_MESSAGE:
case messaging_verb::REPLICATION_FINISHED:
case messaging_verb::REPAIR_CHECKSUM_RANGE:
case messaging_verb::UNUSED__REPAIR_CHECKSUM_RANGE:
case messaging_verb::STREAM_MUTATION_FRAGMENTS:
case messaging_verb::REPAIR_ROW_LEVEL_START:
case messaging_verb::REPAIR_ROW_LEVEL_STOP:
@@ -1287,23 +1287,6 @@ future<> messaging_service::send_replication_finished(msg_addr id, inet_address
return send_message_timeout<void>(this, messaging_verb::REPLICATION_FINISHED, std::move(id), 10000ms, std::move(from));
}
// Wrapper for REPAIR_CHECKSUM_RANGE
void messaging_service::register_repair_checksum_range(
std::function<future<partition_checksum> (sstring keyspace,
sstring cf, dht::token_range range, rpc::optional<repair_checksum> hash_version)>&& f) {
register_handler(this, messaging_verb::REPAIR_CHECKSUM_RANGE, std::move(f));
}
future<> messaging_service::unregister_repair_checksum_range() {
return unregister_handler(messaging_verb::REPAIR_CHECKSUM_RANGE);
}
future<partition_checksum> messaging_service::send_repair_checksum_range(
msg_addr id, sstring keyspace, sstring cf, ::dht::token_range range, repair_checksum hash_version)
{
return send_message<partition_checksum>(this,
messaging_verb::REPAIR_CHECKSUM_RANGE, std::move(id),
std::move(keyspace), std::move(cf), std::move(range), hash_version);
}
// Wrapper for REPAIR_GET_FULL_ROW_HASHES
void messaging_service::register_repair_get_full_row_hashes(std::function<future<repair_hash_set> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func) {
register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(func));

View File

@@ -73,7 +73,6 @@ class update_backlog;
class frozen_mutation;
class frozen_schema;
class partition_checksum;
class canonical_mutation;
namespace dht {
@@ -120,7 +119,7 @@ enum class messaging_verb : int32_t {
STREAM_MUTATION_DONE = 18,
COMPLETE_MESSAGE = 19,
// end of streaming verbs
REPAIR_CHECKSUM_RANGE = 20,
UNUSED__REPAIR_CHECKSUM_RANGE = 20,
GET_SCHEMA_VERSION = 21,
SCHEMA_CHECK = 22,
COUNTER_MUTATION = 23,
@@ -348,11 +347,6 @@ public:
future<> send_complete_message(msg_addr id, UUID plan_id, unsigned dst_cpu_id, bool failed = false);
future<> unregister_complete_message();
// Wrapper for REPAIR_CHECKSUM_RANGE verb
void register_repair_checksum_range(std::function<future<partition_checksum> (sstring keyspace, sstring cf, dht::token_range range, rpc::optional<repair_checksum> hash_version)>&& func);
future<> unregister_repair_checksum_range();
future<partition_checksum> send_repair_checksum_range(msg_addr id, sstring keyspace, sstring cf, dht::token_range range, repair_checksum hash_version);
// Wrapper for REPAIR_GET_FULL_ROW_HASHES
void register_repair_get_full_row_hashes(std::function<future<repair_hash_set> (const rpc::client_info& cinfo, uint32_t repair_meta_id)>&& func);
future<> unregister_repair_get_full_row_hashes();

View File

@@ -496,250 +496,6 @@ void check_in_shutdown() {
repair_tracker().check_in_shutdown();
}
class partition_hasher {
const schema& _schema;
sha256_hasher _hasher;
partition_checksum _checksum;
bound_view::compare _cmp;
range_tombstone_list _rt_list;
bool _inside_range_tombstone = false;
private:
void consume_cell(const column_definition& col, const atomic_cell_or_collection& cell) {
feed_hash(_hasher, col.name());
feed_hash(_hasher, col.type->name());
feed_hash(_hasher, cell, col);
}
void consume_range_tombstone_start(const range_tombstone& rt) noexcept {
feed_hash(_hasher, rt.start, _schema);
feed_hash(_hasher, rt.start_kind);
feed_hash(_hasher, rt.tomb);
}
void consume_range_tombstone_end(const range_tombstone& rt) noexcept {
feed_hash(_hasher, rt.end, _schema);
feed_hash(_hasher, rt.end_kind);
}
void consume_range_tombstones_until(const clustering_row& cr) {
while (!_rt_list.empty()) {
auto it = _rt_list.begin();
if (_inside_range_tombstone) {
if (_cmp(it->end_bound(), cr.key())) {
consume_range_tombstone_end(_rt_list.pop_as<range_tombstone>(it));
_inside_range_tombstone = false;
} else {
break;
}
} else {
if (_cmp(it->start_bound(), cr.key())) {
consume_range_tombstone_start(*it);
_inside_range_tombstone = true;
} else {
break;
}
}
}
}
void consume_range_tombstones_until_end() {
if (_inside_range_tombstone) {
consume_range_tombstone_end(_rt_list.pop_as<range_tombstone>(_rt_list.begin()));
}
for (auto&& rt : _rt_list) {
consume_range_tombstone_start(rt);
consume_range_tombstone_end(rt);
}
_rt_list.clear();
_inside_range_tombstone = false;
}
public:
explicit partition_hasher(const schema& s)
: _schema(s), _cmp(s), _rt_list(s) { }
void consume_new_partition(const dht::decorated_key& dk) {
feed_hash(_hasher, dk.key(), _schema);
}
stop_iteration consume(tombstone t) {
feed_hash(_hasher, t);
return stop_iteration::no;
}
stop_iteration consume(const static_row& sr) {
sr.cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell) {
auto&& col = _schema.static_column_at(id);
consume_cell(col, cell);
});
return stop_iteration::no;
}
stop_iteration consume(const clustering_row& cr) {
consume_range_tombstones_until(cr);
feed_hash(_hasher, cr.key(), _schema);
feed_hash(_hasher, cr.tomb());
feed_hash(_hasher, cr.marker());
cr.cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell) {
auto&& col = _schema.regular_column_at(id);
consume_cell(col, cell);
});
return stop_iteration::no;
}
stop_iteration consume(range_tombstone&& rt) {
_rt_list.apply(_schema, std::move(rt));
return stop_iteration::no;
}
stop_iteration consume_end_of_partition() {
consume_range_tombstones_until_end();
std::array<uint8_t, 32> digest = _hasher.finalize_array();
_hasher = { };
_checksum.add(partition_checksum(digest));
return stop_iteration::no;
}
partition_checksum consume_end_of_stream() {
return std::move(_checksum);
}
};
future<partition_checksum> partition_checksum::compute_legacy(flat_mutation_reader mr)
{
auto s = mr.schema();
return do_with(std::move(mr),
partition_checksum(), [] (auto& reader, auto& checksum) {
return repeat([&reader, &checksum] () {
return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([&checksum] (auto mopt) {
if (!mopt) {
return stop_iteration::yes;
}
sha256_hasher h;
feed_hash(h, *mopt);
std::array<uint8_t, 32> digest = h.finalize_array();
checksum.add(partition_checksum(digest));
return stop_iteration::no;
});
}).then([&checksum] {
return checksum;
});
});
}
future<partition_checksum> partition_checksum::compute_streamed(flat_mutation_reader m)
{
return do_with(std::move(m), [] (auto& m) {
return m.consume(partition_hasher(*m.schema()), db::no_timeout);
});
}
future<partition_checksum> partition_checksum::compute(flat_mutation_reader m, repair_checksum hash_version)
{
switch (hash_version) {
case repair_checksum::legacy: return compute_legacy(std::move(m));
case repair_checksum::streamed: return compute_streamed(std::move(m));
default: throw std::runtime_error(format("Unknown hash version: {}", static_cast<int>(hash_version)));
}
}
void partition_checksum::add(const partition_checksum& other) {
static_assert(std::tuple_size<decltype(_digest)>::value == 32, "digest size");
// Hopefully the following trickery is faster than XOR'ing 32 separate bytes
for (int i = 0; i < 4; ++i) {
uint64_t a = read_unaligned<uint64_t>(_digest.data() + 8 * i);
uint64_t b = read_unaligned<uint64_t>(other._digest.data() + 8 * i);
write_unaligned(_digest.data() + 8 * i, a ^ b);
}
}
bool partition_checksum::operator==(const partition_checksum& other) const {
static_assert(std::tuple_size<decltype(_digest)>::value == 32, "digest size");
return _digest == other._digest;
}
const std::array<uint8_t, 32>& partition_checksum::digest() const {
return _digest;
}
std::ostream& operator<<(std::ostream& out, const partition_checksum& c) {
auto save_flags = out.flags();
out << std::hex << std::setfill('0');
for (auto b : c._digest) {
out << std::setw(2) << (unsigned int)b;
}
out.flags(save_flags);
return out;
}
// Calculate the checksum of the data held *on this shard* of a column family,
// in the given token range.
// All parameters to this function are constant references, and the caller
// must ensure they live as long as the future returned by this function is
// not resolved.
// FIXME: Both master and slave will typically call this on consecutive ranges
// so it would be useful to have this code cache its stopping point or have
// some object live throughout the operation. Moreover, it makes sense to
// vary the collection of sstables used throughout a long repair.
static future<partition_checksum> checksum_range_shard(database &db,
const sstring& keyspace_name, const sstring& cf_name,
const dht::partition_range_vector& prs, repair_checksum hash_version) {
auto& cf = db.find_column_family(keyspace_name, cf_name);
auto reader = cf.make_streaming_reader(cf.schema(), prs);
return partition_checksum::compute(std::move(reader), hash_version);
}
// It is counter-productive to allow a large number of range checksum
// operations to proceed in parallel (on the same shard), because the read
// operation can already parallelize itself as much as needed, and doing
// multiple reads in parallel just adds a lot of memory overheads.
// So checksum_parallelism_semaphore is used to limit this parallelism,
// and should be set to 1, or another small number.
//
// Note that checksumming_parallelism_semaphore applies not just in the
// repair master, but also in the slave: The repair slave may receive many
// checksum requests in parallel, but will only work on one or a few
// (checksum_parallelism_semaphore) at once.
static thread_local named_semaphore checksum_parallelism_semaphore(2, named_semaphore_exception_factory{"repair checksum parallelism"});
// Calculate the checksum of the data held on all shards of a column family,
// in the given token range.
// In practice, we only need to consider one or two shards which intersect the
// given "range". This is because the token ring has nodes*vnodes tokens,
// dividing the token space into nodes*vnodes ranges, with "range" being one
// of those. This number is big (vnodes = 256 by default). At the same time,
// sharding divides the token space into relatively few large ranges, one per
// thread.
// Watch out: All parameters to this function are constant references, and the
// caller must ensure they live as long as the future returned by this
// function is not resolved.
future<partition_checksum> checksum_range(seastar::sharded<database> &db,
const sstring& keyspace, const sstring& cf,
const ::dht::token_range& range, repair_checksum hash_version) {
auto& schema = db.local().find_column_family(keyspace, cf).schema();
auto shard_ranges = dht::split_range_to_shards(dht::to_partition_range(range), *schema);
return do_with(partition_checksum(), std::move(shard_ranges), [&db, &keyspace, &cf, hash_version] (auto& result, auto& shard_ranges) {
return parallel_for_each(shard_ranges, [&db, &keyspace, &cf, &result, hash_version] (auto& shard_range) {
auto& shard = shard_range.first;
auto& prs = shard_range.second;
return db.invoke_on(shard, [keyspace, cf, prs = std::move(prs), hash_version] (database& db) mutable {
return do_with(std::move(keyspace), std::move(cf), std::move(prs), [&db, hash_version] (auto& keyspace, auto& cf, auto& prs) {
return seastar::with_semaphore(checksum_parallelism_semaphore, 1, [&db, hash_version, &keyspace, &cf, &prs] {
return checksum_range_shard(db, keyspace, cf, prs, hash_version);
});
});
}).then([&result] (partition_checksum sum) {
result.add(sum);
});
}).then([&result] {
return make_ready_future<partition_checksum>(result);
});
});
}
// parallelism_semaphore limits the number of parallel ongoing checksum
// comparisons. This could mean, for example, that this number of checksum
// requests have been sent to other nodes and we are waiting for them to
@@ -2029,13 +1785,6 @@ future<> replace_with_repair(seastar::sharded<database>& db, seastar::sharded<ne
static future<> init_messaging_service_handler(sharded<database>& db, sharded<netw::messaging_service>& messaging) {
_messaging = &messaging;
return messaging.invoke_on_all([&db] (auto& ms) {
ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, dht::token_range range, rpc::optional<repair_checksum> hash_version) {
auto hv = hash_version ? *hash_version : repair_checksum::legacy;
return do_with(std::move(keyspace), std::move(cf), std::move(range),
[&db, hv] (auto& keyspace, auto& cf, auto& range) {
return checksum_range(db, keyspace, cf, range, hv);
});
});
ms.register_node_ops_cmd([] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto coordinator = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
@@ -2048,7 +1797,7 @@ static future<> init_messaging_service_handler(sharded<database>& db, sharded<ne
static future<> uninit_messaging_service_handler() {
return _messaging->invoke_on_all([] (auto& ms) {
return when_all_succeed(ms.unregister_repair_checksum_range(), ms.unregister_node_ops_cmd()).discard_result();
return when_all_succeed(ms.unregister_node_ops_cmd()).discard_result();
});
}

View File

@@ -137,40 +137,6 @@ enum class repair_checksum {
streamed = 1,
};
// The class partition_checksum calculates a 256-bit cryptographically-secure
// checksum of a set of partitions fed to it. The checksum of a partition set
// is calculated by calculating a strong hash function (SHA-256) of each
// individual partition, and then XORing the individual hashes together.
// XOR is good enough for merging strong checksums, and allows us to
// independently calculate the checksums of different subsets of the original
// set, and then combine the results into one checksum with the add() method.
// The hash of an individual partition uses both its key and value.
class partition_checksum {
private:
std::array<uint8_t, 32> _digest; // 256 bits
private:
static future<partition_checksum> compute_legacy(flat_mutation_reader m);
static future<partition_checksum> compute_streamed(flat_mutation_reader m);
public:
constexpr partition_checksum() : _digest{} { }
explicit partition_checksum(std::array<uint8_t, 32> digest) : _digest(std::move(digest)) { }
static future<partition_checksum> compute(flat_mutation_reader mr, repair_checksum rt);
void add(const partition_checksum& other);
bool operator==(const partition_checksum& other) const;
bool operator!=(const partition_checksum& other) const { return !operator==(other); }
friend std::ostream& operator<<(std::ostream&, const partition_checksum&);
const std::array<uint8_t, 32>& digest() const;
};
// Calculate the checksum of the data held on all shards of a column family,
// in the given token range.
// All parameters to this function are constant references, and the caller
// must ensure they live as long as the future returned by this function is
// not resolved.
future<partition_checksum> checksum_range(seastar::sharded<database> &db,
const sstring& keyspace, const sstring& cf,
const ::dht::token_range& range, repair_checksum rt);
class repair_stats {
public:
uint64_t round_nr = 0;
@@ -510,14 +476,6 @@ struct node_ops_cmd_response {
};
namespace std {
template<>
struct hash<partition_checksum> {
size_t operator()(partition_checksum sum) const {
size_t h = 0;
std::copy_n(sum.digest().begin(), std::min(sizeof(size_t), sizeof(sum.digest())), reinterpret_cast<uint8_t*>(&h));
return h;
}
};
template<>
struct hash<repair_hash> {

View File

@@ -279,57 +279,6 @@ SEASTAR_TEST_CASE(test_fragmenting_and_freezing) {
});
}
SEASTAR_TEST_CASE(test_partition_checksum) {
return seastar::async([] {
for_each_mutation_pair([] (auto&& m1, auto&& m2, are_equal eq) {
auto get_hash = [] (mutation m) {
return partition_checksum::compute(flat_mutation_reader_from_mutations(tests::make_permit(), { m }),
repair_checksum::streamed).get0();
};
auto h1 = get_hash(m1);
auto h2 = get_hash(m2);
if (eq) {
if (h1 != h2) {
BOOST_FAIL(format("Hash should be equal for {} and {}", m1, m2));
}
} else {
// We're using a strong hasher, collision should be unlikely
if (h1 == h2) {
BOOST_FAIL(format("Hash should be different for {} and {}", m1, m2));
}
}
});
auto test_random_streams = [] (random_mutation_generator&& gen) {
for (auto i = 0; i < 4; i++) {
auto muts = gen(4);
auto muts2 = muts;
std::vector<partition_checksum> checksum;
while (!muts2.empty()) {
auto chk = partition_checksum::compute(flat_mutation_reader_from_mutations(tests::make_permit(), muts2),
repair_checksum::streamed).get0();
BOOST_REQUIRE(boost::count(checksum, chk) == 0);
checksum.emplace_back(chk);
muts2.pop_back();
}
std::vector<partition_checksum> individually_computed_checksums(muts.size());
for (auto k = 0u; k < muts.size(); k++) {
auto chk = partition_checksum::compute(flat_mutation_reader_from_mutations(tests::make_permit(), { muts[k] }),
repair_checksum::streamed).get0();
for (auto j = 0u; j < (muts.size() - k); j++) {
individually_computed_checksums[j].add(chk);
}
}
BOOST_REQUIRE_EQUAL(checksum, individually_computed_checksums);
}
};
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
});
}
SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) {
struct dummy_reader_impl : public flat_mutation_reader::impl {
using flat_mutation_reader::impl::impl;