/* * Copyright (C) 2021-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include #include #include #include "db/hints/sync_point.hh" #include "sync_point.hh" #include "idl/hinted_handoff.dist.hh" #include "idl/hinted_handoff.dist.impl.hh" #include "utils/base64.hh" #include "utils/xx_hasher.hh" namespace db { namespace hints { // Sync points can be encoded in three formats: V1, V2 and V3. V2 extends V1 by adding // a checksum, and V3 extends V2 by identifying endpoints by host ID instead of // address. Currently, we use the V3 format, but sync points encoded in the V1 // and V2 formats can still be safely decoded. // // Format V1 (encoded in base64): // uint8_t 0x01 - version of format // sync_point_v1_or_v2 - encoded using IDL // // Format V2 (encoded in base64): // uint8_t 0x02 - version of format // sync_point_v1_or_v2 - encoded using IDL // uint64_t - checksum computed using the xxHash algorithm // // Format V3 (encoded in base64): // uint8_t 0x03 - version of format // sync_point_v3 - encoded using IDL // uint64_t - checksum computed using the xxHash algorithm // // sync_point_v1_or_v2: // UUID host_id - ID of the host which created the sync point // uint16_t shard_count - the number of shards in this sync point // per_manager_sync_point_v1_or_v2 regular_sp - replay positions for regular mutation hint queues // per_manager_sync_point_v1_or_v2 mv_sp - replay positions for materialized view hint queues // // per_manager_sync_point_v1_or_v2: // std::vector<gms::inet_address> endpoints - addresses for which this sync point defines replay positions // std::vector<db::replay_position> flattened_rps: // A flattened collection of replay positions for all endpoints and shards. // Replay positions are grouped by address, in the same order as in // the `endpoints` field, and there is one replay position for each of // the shards (shard count is defined by the `shard_count` field). // Flattened representation was chosen in order to save space on // vector lengths etc. 
// // sync_point_v3: // similar to sync_point_v1_or_v2 except it uses per_manager_sync_point_v3 instead // of per_manager_sync_point_v1_or_v2, which has locator::host_id instead of // gms::inet_address. static constexpr size_t version_size = sizeof(uint8_t); static constexpr size_t checksum_size = sizeof(uint64_t); template static std::vector decode_one_type(uint16_t shard_count, const PerManagerType& v) { std::vector ret; if (size_t(shard_count) * v.endpoints.size() != v.flattened_rps.size()) { throw std::runtime_error(format("Could not decode the sync point - there should be {} rps in flattened_rps, but there are only {}", size_t(shard_count) * v.endpoints.size(), v.flattened_rps.size())); } ret.resize(std::max(unsigned(shard_count), smp::count)); auto rps_it = v.flattened_rps.begin(); for (const auto ep : v.endpoints) { uint16_t shard; for (shard = 0; shard < shard_count; shard++) { ret[shard].emplace(ep, *rps_it++); } // Fill missing shards with zero replay positions so that segments // which were moved across shards will be correctly waited on for (; shard < smp::count; shard++) { ret[shard].emplace(ep, db::replay_position()); } } return ret; } static uint64_t calculate_checksum(const std::string_view s) { xx_hasher h; h.update(s.data(), s.size()); return h.finalize_uint64(); } sync_point sync_point::decode(std::string_view s) { bytes raw = base64_decode(s); if (raw.empty()) { throw std::runtime_error("Could not decode the sync point - not a valid hex string"); } std::string_view raw_s(reinterpret_cast(raw.data()), raw.size()); seastar::simple_memory_input_stream in{raw_s.data(), raw_s.size()}; uint8_t version = ser::serializer::read(in); if (version == 2 || version == 3) { if (raw_s.size() < version_size + checksum_size) { throw std::runtime_error("Could not decode the sync point encoded in the V2/V3 format - serialized blob is too short"); } seastar::simple_memory_input_stream in_checksum{raw_s.end() - checksum_size, checksum_size}; uint64_t checksum = 
ser::serializer::read(in_checksum); if (checksum != calculate_checksum(raw_s.substr(0, raw_s.size() - checksum_size))) { throw std::runtime_error("Could not decode the sync point encoded in the V2/V3 format - wrong checksum"); } } else if (version != 1) { throw std::runtime_error(format("Unsupported sync point format version: {}", int(version))); } if (version == 1 || version == 2) { sync_point_v1_or_v2 v = ser::serializer::read(in); return sync_point{ v.host_id, decode_one_type(v.shard_count, v.regular_sp), decode_one_type(v.shard_count, v.mv_sp), }; } // version == 3 sync_point_v3 v3 = ser::serializer::read(in); return sync_point{ v3.host_id, decode_one_type(v3.shard_count, v3.regular_sp), decode_one_type(v3.shard_count, v3.mv_sp), }; } static per_manager_sync_point_v3 encode_one_type_v3(unsigned shards, const std::vector& rps) { per_manager_sync_point_v3 ret; // Gather all endpoints, from all shards std::unordered_set all_eps; for (const auto& shard_rps : rps) { for (const auto& p : shard_rps) { // New sync points are created with host_id only all_eps.insert(std::get(p.first)); } } ret.flattened_rps.reserve(size_t(shards) * all_eps.size()); // Encode into v3 struct // For each endpoint, we encode a replay position for all shards. // If there is no replay position for a shard, we use a zero replay position. 
for (const auto ep : all_eps) { ret.endpoints.push_back(ep); for (const auto& shard_rps : rps) { auto it = shard_rps.find(ep); if (it != shard_rps.end()) { ret.flattened_rps.push_back(it->second); } else { ret.flattened_rps.push_back(db::replay_position()); } } // Fill with zeros for remaining shards for (unsigned i = rps.size(); i < shards; i++) { ret.flattened_rps.push_back(db::replay_position()); } } return ret; } sstring sync_point::encode() const { // Encode as v3 structure sync_point_v3 v3; v3.host_id = this->host_id; v3.shard_count = std::max(this->regular_per_shard_rps.size(), this->mv_per_shard_rps.size()); v3.regular_sp = encode_one_type_v3(v3.shard_count, this->regular_per_shard_rps); v3.mv_sp = encode_one_type_v3(v3.shard_count, this->mv_per_shard_rps); // Measure how much space we need seastar::measuring_output_stream measure; ser::serializer::write(measure, v3); // Reserve version_size bytes for the version and checksum_size bytes for the checksum bytes serialized{bytes::initialized_later{}, version_size + measure.size() + checksum_size}; // Encode using V3 format seastar::simple_memory_output_stream out{reinterpret_cast(serialized.data()), serialized.size()}; ser::serializer::write(out, 3); ser::serializer::write(out, v3); std::string_view serialized_s(reinterpret_cast(serialized.data()), version_size + measure.size()); uint64_t checksum = calculate_checksum(serialized_s); ser::serializer::write(out, checksum); return base64_encode(serialized); } } }