From a763bb223f3631e4116463271df6b1d9e955cc6f Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 29 Jul 2020 13:38:59 +0000 Subject: [PATCH 1/3] alternator::streams: Incr shard iterator threshold in get_records Fixes #6942 We use clustering [lo:hi) range for iterator query. To avoid encoding inclusive/exclusive range (depending on init/last get_records call), instead just increment the timeuuid threshold. --- alternator/streams.cc | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/alternator/streams.cc b/alternator/streams.cc index f7109d579d..08291f10a3 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -563,6 +563,19 @@ shard_iterator::shard_iterator(const sstring& s, size_t i) } } +/** + * Increment a timeuuid. The lowest larger timeuud we + * can get (tm) + */ +static utils::UUID increment(const utils::UUID& uuid) { + auto msb = uuid.get_most_significant_bits(); + auto lsb = uuid.get_least_significant_bits() + 1; + if (lsb == 0) { + ++msb; + } + return utils::UUID(msb, lsb); +} + template struct rapidjson::internal::TypeHelper : public from_string_helper @@ -611,7 +624,7 @@ future executor::get_shard_iterator(client_state& threshold = *seq_num; break; case shard_iterator_type::AFTER_SEQUENCE_NUMBER: - threshold = utils::UUID_gen::max_time_UUID(utils::UUID_gen::unix_timestamp(*seq_num)); + threshold = increment(*seq_num); break; case shard_iterator_type::TRIM_HORIZON: threshold = utils::UUID{}; @@ -847,7 +860,8 @@ future executor::get_records(client_state& client if (records.Size() != 0) { auto ret = rjson::empty_object(); rjson::set(ret, "Records", std::move(records)); - shard_iterator next_iter(iter.shard, *timestamp); + // #9642. Set next iterators threshold to > last + shard_iterator next_iter(iter.shard, increment(*timestamp)); rjson::set(ret, "NextShardIterator", next_iter); _stats.api_operations.get_records_latency.add(std::chrono::steady_clock::now() - start_time); return make_ready_future(make_jsonable(std::move(ret))); From f80b465350980f3951397bd17b978f7090bd3d3b Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 29 Jul 2020 13:49:46 +0000 Subject: [PATCH 2/3] alternator::streams: Always include "Records" array in get_records reponse Fixes #6926 Even it empty... --- alternator/streams.cc | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/alternator/streams.cc b/alternator/streams.cc index 08291f10a3..4d8de0d0ab 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -857,9 +857,11 @@ future executor::get_records(client_state& client maybe_add_record(); } - if (records.Size() != 0) { - auto ret = rjson::empty_object(); - rjson::set(ret, "Records", std::move(records)); + auto ret = rjson::empty_object(); + auto nrecords = records.Size(); + rjson::set(ret, "Records", std::move(records)); + + if (nrecords != 0) { // #9642. Set next iterators threshold to > last shard_iterator next_iter(iter.shard, increment(*timestamp)); rjson::set(ret, "NextShardIterator", next_iter); @@ -868,8 +870,7 @@ future executor::get_records(client_state& client } // ugh. figure out if we are and end-of-shard - return cdc::get_local_streams_timestamp().then([this, iter, high_ts, start_time](db_clock::time_point ts) { - auto ret = rjson::empty_object(); + return cdc::get_local_streams_timestamp().then([this, iter, high_ts, start_time, ret = std::move(ret)](db_clock::time_point ts) mutable { auto& shard = iter.shard; if (shard.time < ts && ts < high_ts) { From bf63b8f9f4bfb758a74c17365dd5cfbd0c04b5d2 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Mon, 3 Aug 2020 07:43:21 +0000 Subject: [PATCH 3/3] alternator::streams: Don't include empty new/old image Fixes #6933 If old (or new) image for a change set is empty, dynamo will not include this key at all. Alternator did return an empty object. This changes it to be excluded on empty. --- alternator/streams.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/alternator/streams.cc b/alternator/streams.cc index 4d8de0d0ab..6a1273eb14 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -839,7 +839,9 @@ future executor::get_records(client_state& client { auto item = rjson::empty_object(); describe_single_item(*selection, row, attr_names, item, true); - rjson::set(dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item)); + if (!item.ObjectEmpty()) { + rjson::set(dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item)); + } break; } case cdc::operation::update: