From a7d021ee57ffc19a11dd17b0630e45750719ca90 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 2 Sep 2020 15:37:44 +0000 Subject: [PATCH] alternator: Fix sequence number range using wrong format Fixes #7158 A streams shard descriptions has a sequence range describing start/end (if available) of the shard. This is specified as being "numeric only". Alternator incorrectly used UUID here, which breaks kinesis. v2: * Fix uint128_t parsing from string. bmp::number constructor accepted sstring, but did not interpret it as std::string/chars. Weird results. --- alternator/streams.cc | 50 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 4 deletions(-) diff --git a/alternator/streams.cc b/alternator/streams.cc index 14fb1f2619..dc9be5f2b7 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include "base64.hh" #include "log.hh" @@ -240,6 +241,38 @@ shard_id::shard_id(const sstring& s) { id = cdc::stream_id(from_hex(s.substr(j + 1))); } +struct sequence_number { + utils::UUID uuid; + + sequence_number(utils::UUID uuid) + : uuid(uuid) + {} + sequence_number(std::string_view); + + operator const utils::UUID&() const { + return uuid; + } + + friend std::ostream& operator<<(std::ostream& os, const sequence_number& num) { + boost::io::ios_flags_saver fs(os); + + using namespace boost::multiprecision; + + uint128_t hi = uint64_t(num.uuid.get_most_significant_bits()); + uint128_t lo = uint64_t(num.uuid.get_least_significant_bits()); + + return os << std::dec << ((hi << 64) | lo); + } +}; + +sequence_number::sequence_number(std::string_view v) + : uuid([&] { + using namespace boost::multiprecision; + uint128_t tmp{v}; + return utils::UUID(uint64_t(tmp >> 64), uint64_t(tmp & std::numeric_limits::max())); + }()) +{} + } template @@ -247,6 +280,11 @@ struct rapidjson::internal::TypeHelper : public from_string_helper {}; +template +struct rapidjson::internal::TypeHelper + : public from_string_helper +{}; + namespace alternator { enum class stream_view_type { @@ -438,6 +476,9 @@ future executor::describe_stream(client_state& cl for (; limit > 0 && i != e; prev = i, ++i) { auto& [ts, sv] = *i; + + last = std::nullopt; + for (auto& id : sv.streams()) { if (shard_start && shard_start->id != id) { continue; @@ -464,9 +505,9 @@ future executor::describe_stream(client_state& cl rjson::set(shard, "ShardId", *last); auto range = rjson::empty_object(); - rjson::set(range, "StartingSequenceNumber", utils::UUID_gen::min_time_UUID(ts.time_since_epoch().count())); + rjson::set(range, "StartingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(ts.time_since_epoch().count()))); if (sv.expired() && *sv.expired() < threshold) { - rjson::set(range, "EndingSequenceNumber", utils::UUID_gen::max_time_UUID((*sv.expired() + confidence_interval(db)).time_since_epoch().count())); + rjson::set(range, "EndingSequenceNumber", sequence_number(utils::UUID_gen::max_time_UUID((*sv.expired() + confidence_interval(db)).time_since_epoch().count()))); } rjson::set(shard, "SequenceNumberRange", std::move(range)); @@ -476,6 +517,7 @@ future executor::describe_stream(client_state& cl break; } + last = std::nullopt; } } @@ -587,7 +629,7 @@ future executor::get_shard_iterator(client_state& _stats.api_operations.get_shard_iterator++; auto type = rjson::get(request, "ShardIteratorType"); - auto seq_num = rjson::get_opt(request, "SequenceNumber"); + auto seq_num = rjson::get_opt(request, "SequenceNumber"); if (type < shard_iterator_type::TRIM_HORIZON && !seq_num) { throw api_error::validation("Missing required parameter \"SequenceNumber\""); @@ -807,7 +849,7 @@ future executor::get_records(client_state& client describe_single_item(*selection, row, key_names, keys); rjson::set(dynamodb, "Keys", std::move(keys)); rjson::set(dynamodb, "ApproximateCreationDateTime", utils::UUID_gen::unix_timestamp_in_sec(ts).count()); - rjson::set(dynamodb, "SequenceNumber", ts); + rjson::set(dynamodb, "SequenceNumber", sequence_number(ts)); rjson::set(dynamodb, "StreamViewType", type); //TODO: SizeInBytes }