diff --git a/alternator/streams.cc b/alternator/streams.cc index f7109d579d..6a1273eb14 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -563,6 +563,19 @@ shard_iterator::shard_iterator(const sstring& s, size_t i) } } +/** + * Increment a timeuuid. The lowest larger timeuud we + * can get (tm) + */ +static utils::UUID increment(const utils::UUID& uuid) { + auto msb = uuid.get_most_significant_bits(); + auto lsb = uuid.get_least_significant_bits() + 1; + if (lsb == 0) { + ++msb; + } + return utils::UUID(msb, lsb); +} + template struct rapidjson::internal::TypeHelper : public from_string_helper @@ -611,7 +624,7 @@ future executor::get_shard_iterator(client_state& threshold = *seq_num; break; case shard_iterator_type::AFTER_SEQUENCE_NUMBER: - threshold = utils::UUID_gen::max_time_UUID(utils::UUID_gen::unix_timestamp(*seq_num)); + threshold = increment(*seq_num); break; case shard_iterator_type::TRIM_HORIZON: threshold = utils::UUID{}; @@ -826,7 +839,9 @@ future executor::get_records(client_state& client { auto item = rjson::empty_object(); describe_single_item(*selection, row, attr_names, item, true); - rjson::set(dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item)); + if (!item.ObjectEmpty()) { + rjson::set(dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item)); + } break; } case cdc::operation::update: @@ -844,18 +859,20 @@ future executor::get_records(client_state& client maybe_add_record(); } - if (records.Size() != 0) { - auto ret = rjson::empty_object(); - rjson::set(ret, "Records", std::move(records)); - shard_iterator next_iter(iter.shard, *timestamp); + auto ret = rjson::empty_object(); + auto nrecords = records.Size(); + rjson::set(ret, "Records", std::move(records)); + + if (nrecords != 0) { + // #9642. Set next iterators threshold to > last + shard_iterator next_iter(iter.shard, increment(*timestamp)); rjson::set(ret, "NextShardIterator", next_iter); _stats.api_operations.get_records_latency.add(std::chrono::steady_clock::now() - start_time); return make_ready_future(make_jsonable(std::move(ret))); } // ugh. figure out if we are and end-of-shard - return cdc::get_local_streams_timestamp().then([this, iter, high_ts, start_time](db_clock::time_point ts) { - auto ret = rjson::empty_object(); + return cdc::get_local_streams_timestamp().then([this, iter, high_ts, start_time, ret = std::move(ret)](db_clock::time_point ts) mutable { auto& shard = iter.shard; if (shard.time < ts && ts < high_ts) {