merge: Alternator streams get_records - fix threshold/record

Merged pull request https://github.com/scylladb/scylla/pull/6969 by
Calle Wilund:

Fixes #6942
Fixes #6926
Fixes #6933

We use clustering [lo:hi) range for iterator query.
To avoid encoding inclusive/exclusive range (depending on
init/last get_records call), instead just increment
the timeuuid threshold.

Also, dynamo result always contains a "records" entry. Include one for us as well.

Also, if old (or new) image for a change set is empty, dynamo will not include
this key at all. Alternator did return an empty object. This changes it to be excluded
on empty.

  alternator::streams: Don't include empty new/old image
  alternator::streams: Always include "Records" array in get_records reponse
  alternator::streams: Incr shard iterator threshold in get_records
This commit is contained in:
Nadav Har'El
2020-08-04 11:11:07 +03:00

View File

@@ -563,6 +563,19 @@ shard_iterator::shard_iterator(const sstring& s, size_t i)
}
}
/**
* Increment a timeuuid. The lowest larger timeuud we
* can get (tm)
*/
static utils::UUID increment(const utils::UUID& uuid) {
auto msb = uuid.get_most_significant_bits();
auto lsb = uuid.get_least_significant_bits() + 1;
if (lsb == 0) {
++msb;
}
return utils::UUID(msb, lsb);
}
template<typename ValueType>
struct rapidjson::internal::TypeHelper<ValueType, alternator::shard_iterator>
: public from_string_helper<ValueType, alternator::shard_iterator>
@@ -611,7 +624,7 @@ future<executor::request_return_type> executor::get_shard_iterator(client_state&
threshold = *seq_num;
break;
case shard_iterator_type::AFTER_SEQUENCE_NUMBER:
threshold = utils::UUID_gen::max_time_UUID(utils::UUID_gen::unix_timestamp(*seq_num));
threshold = increment(*seq_num);
break;
case shard_iterator_type::TRIM_HORIZON:
threshold = utils::UUID{};
@@ -826,7 +839,9 @@ future<executor::request_return_type> executor::get_records(client_state& client
{
auto item = rjson::empty_object();
describe_single_item(*selection, row, attr_names, item, true);
rjson::set(dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item));
if (!item.ObjectEmpty()) {
rjson::set(dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item));
}
break;
}
case cdc::operation::update:
@@ -844,18 +859,20 @@ future<executor::request_return_type> executor::get_records(client_state& client
maybe_add_record();
}
if (records.Size() != 0) {
auto ret = rjson::empty_object();
rjson::set(ret, "Records", std::move(records));
shard_iterator next_iter(iter.shard, *timestamp);
auto ret = rjson::empty_object();
auto nrecords = records.Size();
rjson::set(ret, "Records", std::move(records));
if (nrecords != 0) {
// #9642. Set next iterators threshold to > last
shard_iterator next_iter(iter.shard, increment(*timestamp));
rjson::set(ret, "NextShardIterator", next_iter);
_stats.api_operations.get_records_latency.add(std::chrono::steady_clock::now() - start_time);
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(ret)));
}
// ugh. figure out if we are and end-of-shard
return cdc::get_local_streams_timestamp().then([this, iter, high_ts, start_time](db_clock::time_point ts) {
auto ret = rjson::empty_object();
return cdc::get_local_streams_timestamp().then([this, iter, high_ts, start_time, ret = std::move(ret)](db_clock::time_point ts) mutable {
auto& shard = iter.shard;
if (shard.time < ts && ts < high_ts) {