mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 19:21:01 +00:00
alternator: Fix sequence number range using wrong format
Fixes #7158 A streams shard descriptions has a sequence range describing start/end (if available) of the shard. This is specified as being "numeric only". Alternator incorrectly used UUID here, which breaks kinesis. v2: * Fix uint128_t parsing from string. bmp::number constructor accepted sstring, but did not interpret it as std::string/chars. Weird results.
This commit is contained in:
@@ -22,6 +22,7 @@
|
||||
#include <type_traits>
|
||||
#include <boost/lexical_cast.hpp>
|
||||
#include <boost/io/ios_state.hpp>
|
||||
#include <boost/multiprecision/cpp_int.hpp>
|
||||
|
||||
#include "base64.hh"
|
||||
#include "log.hh"
|
||||
@@ -240,6 +241,38 @@ shard_id::shard_id(const sstring& s) {
|
||||
id = cdc::stream_id(from_hex(s.substr(j + 1)));
|
||||
}
|
||||
|
||||
struct sequence_number {
|
||||
utils::UUID uuid;
|
||||
|
||||
sequence_number(utils::UUID uuid)
|
||||
: uuid(uuid)
|
||||
{}
|
||||
sequence_number(std::string_view);
|
||||
|
||||
operator const utils::UUID&() const {
|
||||
return uuid;
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const sequence_number& num) {
|
||||
boost::io::ios_flags_saver fs(os);
|
||||
|
||||
using namespace boost::multiprecision;
|
||||
|
||||
uint128_t hi = uint64_t(num.uuid.get_most_significant_bits());
|
||||
uint128_t lo = uint64_t(num.uuid.get_least_significant_bits());
|
||||
|
||||
return os << std::dec << ((hi << 64) | lo);
|
||||
}
|
||||
};
|
||||
|
||||
sequence_number::sequence_number(std::string_view v)
|
||||
: uuid([&] {
|
||||
using namespace boost::multiprecision;
|
||||
uint128_t tmp{v};
|
||||
return utils::UUID(uint64_t(tmp >> 64), uint64_t(tmp & std::numeric_limits<uint64_t>::max()));
|
||||
}())
|
||||
{}
|
||||
|
||||
}
|
||||
|
||||
template<typename ValueType>
|
||||
@@ -247,6 +280,11 @@ struct rapidjson::internal::TypeHelper<ValueType, alternator::shard_id>
|
||||
: public from_string_helper<ValueType, alternator::shard_id>
|
||||
{};
|
||||
|
||||
template<typename ValueType>
|
||||
struct rapidjson::internal::TypeHelper<ValueType, alternator::sequence_number>
|
||||
: public from_string_helper<ValueType, alternator::sequence_number>
|
||||
{};
|
||||
|
||||
namespace alternator {
|
||||
|
||||
enum class stream_view_type {
|
||||
@@ -438,6 +476,9 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
|
||||
|
||||
for (; limit > 0 && i != e; prev = i, ++i) {
|
||||
auto& [ts, sv] = *i;
|
||||
|
||||
last = std::nullopt;
|
||||
|
||||
for (auto& id : sv.streams()) {
|
||||
if (shard_start && shard_start->id != id) {
|
||||
continue;
|
||||
@@ -464,9 +505,9 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
|
||||
rjson::set(shard, "ShardId", *last);
|
||||
|
||||
auto range = rjson::empty_object();
|
||||
rjson::set(range, "StartingSequenceNumber", utils::UUID_gen::min_time_UUID(ts.time_since_epoch().count()));
|
||||
rjson::set(range, "StartingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(ts.time_since_epoch().count())));
|
||||
if (sv.expired() && *sv.expired() < threshold) {
|
||||
rjson::set(range, "EndingSequenceNumber", utils::UUID_gen::max_time_UUID((*sv.expired() + confidence_interval(db)).time_since_epoch().count()));
|
||||
rjson::set(range, "EndingSequenceNumber", sequence_number(utils::UUID_gen::max_time_UUID((*sv.expired() + confidence_interval(db)).time_since_epoch().count())));
|
||||
}
|
||||
|
||||
rjson::set(shard, "SequenceNumberRange", std::move(range));
|
||||
@@ -476,6 +517,7 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
|
||||
break;
|
||||
}
|
||||
|
||||
last = std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -587,7 +629,7 @@ future<executor::request_return_type> executor::get_shard_iterator(client_state&
|
||||
_stats.api_operations.get_shard_iterator++;
|
||||
|
||||
auto type = rjson::get<shard_iterator_type>(request, "ShardIteratorType");
|
||||
auto seq_num = rjson::get_opt<utils::UUID>(request, "SequenceNumber");
|
||||
auto seq_num = rjson::get_opt<sequence_number>(request, "SequenceNumber");
|
||||
|
||||
if (type < shard_iterator_type::TRIM_HORIZON && !seq_num) {
|
||||
throw api_error::validation("Missing required parameter \"SequenceNumber\"");
|
||||
@@ -807,7 +849,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
describe_single_item(*selection, row, key_names, keys);
|
||||
rjson::set(dynamodb, "Keys", std::move(keys));
|
||||
rjson::set(dynamodb, "ApproximateCreationDateTime", utils::UUID_gen::unix_timestamp_in_sec(ts).count());
|
||||
rjson::set(dynamodb, "SequenceNumber", ts);
|
||||
rjson::set(dynamodb, "SequenceNumber", sequence_number(ts));
|
||||
rjson::set(dynamodb, "StreamViewType", type);
|
||||
//TODO: SizeInBytes
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user