Fix an invalid condition, when searching for a parent shard, when table
is based on vnodes. Shards have associated with them `last token` -
token, than marks the end of the range of tokens they consume (inclusive).
An additional assumptions are whole token space is used and
(for vnodes) token space wraps around.
Previously code looked like this:
auto pid = std::upper_bound(..., [](const dht::token& t, const cdc::stream_id& id) {
return t < id.token();
});
if (pid != pids.begin()) {
pid = std::prev(pid);
}
An `upper_bound` call with `t < id.token()` means it is looking for
an iterator, for which value `t < id.token()` changed to true,
which effectively means a position, where iterator is bigger
then searched value. Then we move iterator backward once if possible.
Assuming token space <-2, 2> and parents [0, 2], when we search for:
- -1 -> we will get 0, it's first, so we can't move backward, so 0 (ok)
- 0 -> we will get 2, it's not first, so we go back and we return 0 (ok)
- 1 -> we will get 2, it's not first, so we go back and we return 0
(not ok - should be 2)
The fix is to replace it with `std::lower_bound` and remove conditional
backward motion. Since we've a guarantees that whole token space is used
if `std::lower_bound` ends with `end()` value, then we have a wrap
around case and we need to pick `begin()` as result.
Fixes #28354
Fixes: SCYLLADB-537
Closes scylladb/scylladb#28382
1144 lines
42 KiB
C++
1144 lines
42 KiB
C++
/*
|
|
* Copyright 2020-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include <type_traits>
|
|
#include <boost/lexical_cast.hpp>
|
|
#include <boost/io/ios_state.hpp>
|
|
#include <boost/multiprecision/cpp_int.hpp>
|
|
|
|
#include <seastar/json/formatter.hh>
|
|
|
|
#include "db/config.hh"
|
|
|
|
#include "cdc/log.hh"
|
|
#include "cdc/generation.hh"
|
|
#include "cdc/cdc_options.hh"
|
|
#include "cdc/metadata.hh"
|
|
#include "db/system_distributed_keyspace.hh"
|
|
#include "utils/UUID_gen.hh"
|
|
#include "cql3/selection/selection.hh"
|
|
#include "cql3/result_set.hh"
|
|
#include "cql3/column_identifier.hh"
|
|
#include "schema/schema_builder.hh"
|
|
#include "service/storage_proxy.hh"
|
|
#include "gms/feature.hh"
|
|
#include "gms/feature_service.hh"
|
|
|
|
#include "executor.hh"
|
|
#include "data_dictionary/data_dictionary.hh"
|
|
#include "utils/rjson.hh"
|
|
|
|
static logging::logger elogger("alternator-streams");
|
|
|
|
/**
|
|
* Base template type to implement rapidjson::internal::TypeHelper<...>:s
|
|
* for types that are ostreamable/string constructible/castable.
|
|
* Eventually maybe graduate to header, for other modules.
|
|
*/
|
|
template<typename ValueType, typename T>
|
|
struct from_string_helper {
|
|
static bool Is(const ValueType& v) {
|
|
try {
|
|
Get(v);
|
|
return true;
|
|
} catch (...) {
|
|
return false;
|
|
}
|
|
}
|
|
static T Get(const ValueType& v) {
|
|
auto s = rjson::to_string_view(v);
|
|
if constexpr (std::is_constructible_v<T, std::string_view>) {
|
|
return T(s);
|
|
} else if constexpr (std::is_constructible_v<T, sstring>) {
|
|
return T(sstring(s));
|
|
} else {
|
|
return boost::lexical_cast<T>(s.data(), s.size());
|
|
}
|
|
}
|
|
static ValueType& Set(ValueType&, T) = delete;
|
|
static ValueType& Set(ValueType& v, T data, typename ValueType::AllocatorType& a) {
|
|
std::ostringstream ss;
|
|
ss << data;
|
|
auto s = ss.str();
|
|
return v.SetString(s.data(), s.size(), a);
|
|
}
|
|
};
|
|
|
|
template<typename ValueType>
|
|
struct rapidjson::internal::TypeHelper<ValueType, utils::UUID>
|
|
: public from_string_helper<ValueType, utils::UUID>
|
|
{};
|
|
|
|
static db_clock::time_point as_timepoint(const table_id& tid) {
|
|
return db_clock::time_point{utils::UUID_gen::unix_timestamp(tid.uuid())};
|
|
}
|
|
|
|
/**
|
|
* Note: scylla tables do not have a timestamp as such,
|
|
* but the UUID (for newly created tables at least) is
|
|
* a timeuuid, so we can use this to fake creation timestamp
|
|
*/
|
|
static sstring stream_label(const schema& log_schema) {
|
|
auto ts = as_timepoint(log_schema.id());
|
|
auto tt = db_clock::to_time_t(ts);
|
|
struct tm tm;
|
|
::localtime_r(&tt, &tm);
|
|
return seastar::json::formatter::to_json(tm);
|
|
}
|
|
|
|
namespace alternator {
|
|
|
|
// stream arn _has_ to be 37 or more characters long. ugh...
|
|
// see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_DescribeStream.html#API_streams_DescribeStream_RequestSyntax
|
|
// UUID is 36 bytes as string (including dashes).
|
|
// Prepend a version/type marker -> 37
|
|
class stream_arn : public utils::UUID {
|
|
public:
|
|
using UUID = utils::UUID;
|
|
static constexpr char marker = 'S';
|
|
|
|
stream_arn() = default;
|
|
stream_arn(const UUID& uuid)
|
|
: UUID(uuid)
|
|
{}
|
|
stream_arn(const table_id& tid)
|
|
: UUID(tid.uuid())
|
|
{}
|
|
stream_arn(std::string_view v)
|
|
: UUID(v.substr(1))
|
|
{
|
|
if (v[0] != marker) {
|
|
throw std::invalid_argument(std::string(v));
|
|
}
|
|
}
|
|
friend std::ostream& operator<<(std::ostream& os, const stream_arn& arn) {
|
|
const UUID& uuid = arn;
|
|
return os << marker << uuid;
|
|
}
|
|
friend std::istream& operator>>(std::istream& is, stream_arn& arn) {
|
|
std::string s;
|
|
is >> s;
|
|
arn = stream_arn(s);
|
|
return is;
|
|
}
|
|
};
|
|
|
|
} // namespace alternator
|
|
|
|
template<typename ValueType>
|
|
struct rapidjson::internal::TypeHelper<ValueType, alternator::stream_arn>
|
|
: public from_string_helper<ValueType, alternator::stream_arn>
|
|
{};
|
|
|
|
namespace alternator {
|
|
|
|
future<alternator::executor::request_return_type> alternator::executor::list_streams(client_state& client_state, service_permit permit, rjson::value request) {
|
|
_stats.api_operations.list_streams++;
|
|
|
|
auto limit = rjson::get_opt<int>(request, "Limit").value_or(100);
|
|
auto streams_start = rjson::get_opt<stream_arn>(request, "ExclusiveStartStreamArn");
|
|
auto table = find_table(_proxy, request);
|
|
auto db = _proxy.data_dictionary();
|
|
|
|
if (limit < 1) {
|
|
throw api_error::validation("Limit must be 1 or more");
|
|
}
|
|
|
|
std::vector<data_dictionary::table> cfs;
|
|
|
|
if (table) {
|
|
auto log_name = cdc::log_name(table->cf_name());
|
|
try {
|
|
cfs.emplace_back(db.find_table(table->ks_name(), log_name));
|
|
} catch (data_dictionary::no_such_column_family&) {
|
|
cfs.clear();
|
|
}
|
|
} else {
|
|
cfs = db.get_tables();
|
|
}
|
|
|
|
// # 12601 (maybe?) - sort the set of tables on ID. This should ensure we never
|
|
// generate duplicates in a paged listing here. Can obviously miss things if they
|
|
// are added between paged calls and end up with a "smaller" UUID/ARN, but that
|
|
// is to be expected.
|
|
if (std::cmp_less(limit, cfs.size()) || streams_start) {
|
|
std::sort(cfs.begin(), cfs.end(), [](const data_dictionary::table& t1, const data_dictionary::table& t2) {
|
|
return t1.schema()->id().uuid() < t2.schema()->id().uuid();
|
|
});
|
|
}
|
|
|
|
auto i = cfs.begin();
|
|
auto e = cfs.end();
|
|
|
|
if (streams_start) {
|
|
i = std::find_if(i, e, [&](const data_dictionary::table& t) {
|
|
return t.schema()->id().uuid() == streams_start
|
|
&& cdc::get_base_table(db.real_database(), *t.schema())
|
|
&& is_alternator_keyspace(t.schema()->ks_name())
|
|
;
|
|
});
|
|
if (i != e) {
|
|
++i;
|
|
}
|
|
}
|
|
|
|
auto ret = rjson::empty_object();
|
|
auto streams = rjson::empty_array();
|
|
|
|
std::optional<stream_arn> last;
|
|
|
|
for (;limit > 0 && i != e; ++i) {
|
|
auto s = i->schema();
|
|
auto& ks_name = s->ks_name();
|
|
auto& cf_name = s->cf_name();
|
|
|
|
if (!is_alternator_keyspace(ks_name)) {
|
|
continue;
|
|
}
|
|
if (cdc::is_log_for_some_table(db.real_database(), ks_name, cf_name)) {
|
|
rjson::value new_entry = rjson::empty_object();
|
|
|
|
last = i->schema()->id();
|
|
rjson::add(new_entry, "StreamArn", *last);
|
|
rjson::add(new_entry, "StreamLabel", rjson::from_string(stream_label(*s)));
|
|
rjson::add(new_entry, "TableName", rjson::from_string(cdc::base_name(table_name(*s))));
|
|
rjson::push_back(streams, std::move(new_entry));
|
|
|
|
--limit;
|
|
}
|
|
}
|
|
|
|
rjson::add(ret, "Streams", std::move(streams));
|
|
|
|
if (last) {
|
|
rjson::add(ret, "LastEvaluatedStreamArn", *last);
|
|
}
|
|
|
|
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
|
|
}
|
|
|
|
struct shard_id {
|
|
static constexpr auto marker = 'H';
|
|
|
|
db_clock::time_point time;
|
|
cdc::stream_id id;
|
|
|
|
shard_id(db_clock::time_point t, cdc::stream_id i)
|
|
: time(t)
|
|
, id(i)
|
|
{}
|
|
shard_id(const sstring&);
|
|
|
|
// dynamo specifies shardid as max 65 chars.
|
|
friend std::ostream& operator<<(std::ostream& os, const shard_id& id) {
|
|
fmt::print(os, "{} {:x}:{}", marker, id.time.time_since_epoch().count(), id.id.to_bytes());
|
|
return os;
|
|
}
|
|
};
|
|
|
|
shard_id::shard_id(const sstring& s) {
|
|
auto i = s.find(':');
|
|
|
|
if (s.at(0) != marker || i == sstring::npos) {
|
|
throw std::invalid_argument(s);
|
|
}
|
|
|
|
time = db_clock::time_point(db_clock::duration(std::stoull(s.substr(1, i - 1), nullptr, 16)));
|
|
id = cdc::stream_id(from_hex(s.substr(i + 1)));
|
|
}
|
|
|
|
struct sequence_number {
|
|
utils::UUID uuid;
|
|
|
|
sequence_number(utils::UUID uuid)
|
|
: uuid(uuid)
|
|
{}
|
|
sequence_number(std::string_view);
|
|
|
|
operator const utils::UUID&() const {
|
|
return uuid;
|
|
}
|
|
|
|
friend std::ostream& operator<<(std::ostream& os, const sequence_number& num) {
|
|
boost::io::ios_flags_saver fs(os);
|
|
|
|
using namespace boost::multiprecision;
|
|
|
|
/**
|
|
* #7424 - aws sdk assumes sequence numbers are
|
|
* monotonically growing bigints.
|
|
*
|
|
* Timeuuids viewed as msb<<64|lsb are _not_,
|
|
* but they are still sorted as
|
|
* timestamp() << 64|lsb
|
|
* so we can simply unpack the mangled msb
|
|
* and use as hi 64 in our "bignum".
|
|
*/
|
|
uint128_t hi = uint64_t(num.uuid.timestamp());
|
|
uint128_t lo = uint64_t(num.uuid.get_least_significant_bits());
|
|
|
|
return os << std::dec << ((hi << 64) | lo);
|
|
}
|
|
};
|
|
|
|
sequence_number::sequence_number(std::string_view v)
|
|
: uuid([&] {
|
|
using namespace boost::multiprecision;
|
|
// workaround for weird clang 10 bug when calling constructor with
|
|
// view directly.
|
|
uint128_t tmp{std::string(v)};
|
|
// see above
|
|
return utils::UUID_gen::get_time_UUID_raw(utils::UUID_gen::decimicroseconds{uint64_t(tmp >> 64)},
|
|
uint64_t(tmp & std::numeric_limits<uint64_t>::max()));
|
|
}())
|
|
{}
|
|
|
|
} // namespace alternator
|
|
|
|
template<typename ValueType>
|
|
struct rapidjson::internal::TypeHelper<ValueType, alternator::shard_id>
|
|
: public from_string_helper<ValueType, alternator::shard_id>
|
|
{};
|
|
|
|
template<typename ValueType>
|
|
struct rapidjson::internal::TypeHelper<ValueType, alternator::sequence_number>
|
|
: public from_string_helper<ValueType, alternator::sequence_number>
|
|
{};
|
|
|
|
namespace alternator {
|
|
|
|
enum class stream_view_type {
|
|
KEYS_ONLY,
|
|
NEW_IMAGE,
|
|
OLD_IMAGE,
|
|
NEW_AND_OLD_IMAGES,
|
|
};
|
|
|
|
std::ostream& operator<<(std::ostream& os, stream_view_type type) {
|
|
switch (type) {
|
|
case stream_view_type::KEYS_ONLY: os << "KEYS_ONLY"; break;
|
|
case stream_view_type::NEW_IMAGE: os << "NEW_IMAGE"; break;
|
|
case stream_view_type::OLD_IMAGE: os << "OLD_IMAGE"; break;
|
|
case stream_view_type::NEW_AND_OLD_IMAGES: os << "NEW_AND_OLD_IMAGES"; break;
|
|
default: throw std::logic_error(std::to_string(int(type)));
|
|
}
|
|
return os;
|
|
}
|
|
std::istream& operator>>(std::istream& is, stream_view_type& type) {
|
|
sstring s;
|
|
is >> s;
|
|
if (s == "KEYS_ONLY") {
|
|
type = stream_view_type::KEYS_ONLY;
|
|
} else if (s == "NEW_IMAGE") {
|
|
type = stream_view_type::NEW_IMAGE;
|
|
} else if (s == "OLD_IMAGE") {
|
|
type = stream_view_type::OLD_IMAGE;
|
|
} else if (s == "NEW_AND_OLD_IMAGES") {
|
|
type = stream_view_type::NEW_AND_OLD_IMAGES;
|
|
} else {
|
|
throw std::invalid_argument(s);
|
|
}
|
|
return is;
|
|
}
|
|
|
|
static stream_view_type cdc_options_to_steam_view_type(const cdc::options& opts) {
|
|
stream_view_type type = stream_view_type::KEYS_ONLY;
|
|
if (opts.preimage() && opts.postimage()) {
|
|
type = stream_view_type::NEW_AND_OLD_IMAGES;
|
|
} else if (opts.preimage()) {
|
|
type = stream_view_type::OLD_IMAGE;
|
|
} else if (opts.postimage()) {
|
|
type = stream_view_type::NEW_IMAGE;
|
|
}
|
|
return type;
|
|
}
|
|
|
|
} // namespace alternator
|
|
|
|
template<typename ValueType>
|
|
struct rapidjson::internal::TypeHelper<ValueType, alternator::stream_view_type>
|
|
: public from_string_helper<ValueType, alternator::stream_view_type>
|
|
{};
|
|
|
|
namespace alternator {
|
|
|
|
using namespace std::string_literals;
|
|
|
|
/**
|
|
* This implementation of CDC stream to dynamo shard mapping
|
|
* is simply a 1:1 mapping, id=shard. Simple, and fairly easy
|
|
* to do ranged queries later.
|
|
*
|
|
* Downside is that we get a _lot_ of shards. And with actual
|
|
* generations, we'll get hubba-hubba loads of them.
|
|
*
|
|
* We would like to group cdc streams into N per shards
|
|
* (by token range/ ID set).
|
|
*
|
|
* This however makes it impossible to do a simple
|
|
* range query with limit (we need to do staggered querying, because
|
|
* of how GetRecords work).
|
|
*
|
|
* We can do "paged" queries (i.e. with the get_records result limit and
|
|
* continuing on "next" iterator) across cdc log PK:s (id:s), if we
|
|
* somehow track paging state.
|
|
*
|
|
* This is however problematic because while this can be encoded
|
|
* in shard iterators (long), they _cannot_ be handled in sequence
|
|
* numbers (short), and we need to be able to start a new iterator
|
|
* using just a sequence number as watershed.
|
|
*
|
|
* It also breaks per-shard sort order, where it is assumed that
|
|
* records in a shard are presented "in order".
|
|
*
|
|
* To deal with both problems we would need to do merging of
|
|
* cdc streams. But this becomes difficult with actual cql paging etc,
|
|
* we would potentially have way to many/way to few cql rows for
|
|
* whatever get_records query limit we have. And waste a lot of
|
|
* memory and cycles sorting "junk".
|
|
*
|
|
* For now, go simple, but maybe consider if this latter approach would work
|
|
* and perform.
|
|
*
|
|
* Parents:
|
|
* DynamoDB has the concept of optional "parent shard", where parent
|
|
* is the data set where data for some key range was recorded before
|
|
* sharding was changed.
|
|
*
|
|
* While probably not critical, it is meant as an indicator as to
|
|
* which shards should be read before other shards.
|
|
*
|
|
* In scylla, this is sort of akin to an ID having corresponding ID/ID:s
|
|
* that cover the token range it represents. Because ID:s are per
|
|
* vnode shard however, this relation can be somewhat ambiguous.
|
|
* We still provide some semblance of this by finding the ID in
|
|
* older generation that has token start < current ID token start.
|
|
* This will be a partial overlap, but it is the best we can do.
|
|
*/
|
|
|
|
static std::chrono::seconds confidence_interval(data_dictionary::database db) {
|
|
return std::chrono::seconds(db.get_config().alternator_streams_time_window_s());
|
|
}
|
|
|
|
using namespace std::chrono_literals;
|
|
|
|
// Dynamo docs says no data shall live longer than 24h.
|
|
static constexpr auto dynamodb_streams_max_window = 24h;
|
|
|
|
// find the parent shard in previous generation for the given child shard
|
|
// takes care of wrap-around case in vnodes
|
|
// prev_streams must be sorted by token
|
|
const cdc::stream_id& find_parent_shard_in_previous_generation(db_clock::time_point prev_timestamp, const utils::chunked_vector<cdc::stream_id> &prev_streams, const cdc::stream_id &child) {
|
|
if (prev_streams.empty()) {
|
|
// something is really wrong - streams are empty
|
|
// let's try internal_error in hope it will be notified and fixed
|
|
on_internal_error(elogger, fmt::format("streams are empty for cdc generation at {} ({})", prev_timestamp, prev_timestamp.time_since_epoch().count()));
|
|
}
|
|
auto it = std::lower_bound(prev_streams.begin(), prev_streams.end(), child.token(), [](const cdc::stream_id& id, const dht::token& t) {
|
|
return id.token() < t;
|
|
});
|
|
if (it == prev_streams.end()) {
|
|
// wrap around case - take first
|
|
it = prev_streams.begin();
|
|
}
|
|
return *it;
|
|
}
|
|
|
|
future<executor::request_return_type> executor::describe_stream(client_state& client_state, service_permit permit, rjson::value request) {
|
|
_stats.api_operations.describe_stream++;
|
|
|
|
auto limit = rjson::get_opt<int>(request, "Limit").value_or(100); // according to spec
|
|
auto ret = rjson::empty_object();
|
|
auto stream_desc = rjson::empty_object();
|
|
// local java dynamodb server does _not_ validate the arn syntax. But "real" one does.
|
|
// I.e. unparsable arn -> error.
|
|
auto stream_arn = rjson::get<alternator::stream_arn>(request, "StreamArn");
|
|
|
|
schema_ptr schema, bs;
|
|
auto db = _proxy.data_dictionary();
|
|
|
|
try {
|
|
auto cf = db.find_column_family(table_id(stream_arn));
|
|
schema = cf.schema();
|
|
bs = cdc::get_base_table(db.real_database(), *schema);
|
|
} catch (...) {
|
|
}
|
|
|
|
if (!schema || !bs || !is_alternator_keyspace(schema->ks_name())) {
|
|
throw api_error::resource_not_found("Invalid StreamArn");
|
|
}
|
|
|
|
if (limit < 1) {
|
|
throw api_error::validation("Limit must be 1 or more");
|
|
}
|
|
|
|
std::optional<shard_id> shard_start;
|
|
try {
|
|
shard_start = rjson::get_opt<shard_id>(request, "ExclusiveStartShardId");
|
|
} catch (...) {
|
|
// If we cannot parse the start shard, it is treated as non-matching
|
|
// in dynamo. We can just shortcut this and say "no records", i.e limit=0
|
|
limit = 0;
|
|
}
|
|
|
|
auto& opts = bs->cdc_options();
|
|
|
|
auto status = "DISABLED";
|
|
|
|
if (opts.enabled()) {
|
|
if (!_cdc_metadata.streams_available()) {
|
|
status = "ENABLING";
|
|
} else {
|
|
status = "ENABLED";
|
|
}
|
|
}
|
|
|
|
auto ttl = std::chrono::seconds(opts.ttl());
|
|
|
|
rjson::add(stream_desc, "StreamStatus", rjson::from_string(status));
|
|
|
|
stream_view_type type = cdc_options_to_steam_view_type(opts);
|
|
|
|
rjson::add(stream_desc, "StreamArn", alternator::stream_arn(schema->id()));
|
|
rjson::add(stream_desc, "StreamViewType", type);
|
|
rjson::add(stream_desc, "TableName", rjson::from_string(table_name(*bs)));
|
|
|
|
describe_key_schema(stream_desc, *bs);
|
|
|
|
if (!opts.enabled()) {
|
|
rjson::add(ret, "StreamDescription", std::move(stream_desc));
|
|
co_return rjson::print(std::move(ret));
|
|
}
|
|
|
|
// TODO: label
|
|
// TODO: creation time
|
|
|
|
auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
|
|
|
|
// filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h)
|
|
auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl);
|
|
|
|
std::map<db_clock::time_point, cdc::streams_version> topologies = co_await _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners });
|
|
auto e = topologies.end();
|
|
auto prev = e;
|
|
auto shards = rjson::empty_array();
|
|
|
|
std::optional<shard_id> last;
|
|
|
|
auto i = topologies.begin();
|
|
// if we're a paged query, skip to the generation where we left of.
|
|
if (shard_start) {
|
|
i = topologies.find(shard_start->time);
|
|
}
|
|
|
|
// for parent-child stuff we need id:s to be sorted by token
|
|
// (see explanation above) since we want to find closest
|
|
// token boundary when determining parent.
|
|
// #7346 - we processed and searched children/parents in
|
|
// stored order, which is not necessarily token order,
|
|
// so the finding of "closest" token boundary (using upper bound)
|
|
// could give somewhat weird results.
|
|
static auto token_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
|
|
return id1.token() < id2.token();
|
|
};
|
|
|
|
// #7409 - shards must be returned in lexicographical order,
|
|
// normal bytes compare is string_traits<int8_t>::compare.
|
|
// thus bytes 0x8000 is less than 0x0000. By doing unsigned
|
|
// compare instead we inadvertently will sort in string lexical.
|
|
static auto id_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
|
|
return compare_unsigned(id1.to_bytes(), id2.to_bytes()) < 0;
|
|
};
|
|
|
|
// need a prev even if we are skipping stuff
|
|
if (i != topologies.begin()) {
|
|
prev = std::prev(i);
|
|
}
|
|
|
|
for (; limit > 0 && i != e; prev = i, ++i) {
|
|
auto& [ts, sv] = *i;
|
|
|
|
last = std::nullopt;
|
|
|
|
auto lo = sv.streams.begin();
|
|
auto end = sv.streams.end();
|
|
|
|
// #7409 - shards must be returned in lexicographical order,
|
|
std::sort(lo, end, id_cmp);
|
|
|
|
if (shard_start) {
|
|
// find next shard position
|
|
lo = std::upper_bound(lo, end, shard_start->id, id_cmp);
|
|
shard_start = std::nullopt;
|
|
}
|
|
|
|
if (lo != end && prev != e) {
|
|
// We want older stuff sorted in token order so we can find matching
|
|
// token range when determining parent shard.
|
|
std::stable_sort(prev->second.streams.begin(), prev->second.streams.end(), token_cmp);
|
|
}
|
|
|
|
auto expired = [&]() -> std::optional<db_clock::time_point> {
|
|
auto j = std::next(i);
|
|
if (j == e) {
|
|
return std::nullopt;
|
|
}
|
|
// add this so we sort of match potential
|
|
// sequence numbers in get_records result.
|
|
return j->first + confidence_interval(db);
|
|
}();
|
|
|
|
while (lo != end) {
|
|
auto& id = *lo++;
|
|
|
|
auto shard = rjson::empty_object();
|
|
|
|
if (prev != e) {
|
|
auto &pid = find_parent_shard_in_previous_generation(prev->first, prev->second.streams, id);
|
|
rjson::add(shard, "ParentShardId", shard_id(prev->first, pid));
|
|
}
|
|
|
|
last.emplace(ts, id);
|
|
rjson::add(shard, "ShardId", *last);
|
|
auto range = rjson::empty_object();
|
|
rjson::add(range, "StartingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(ts.time_since_epoch())));
|
|
if (expired) {
|
|
rjson::add(range, "EndingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(expired->time_since_epoch())));
|
|
}
|
|
|
|
rjson::add(shard, "SequenceNumberRange", std::move(range));
|
|
rjson::push_back(shards, std::move(shard));
|
|
|
|
if (--limit == 0) {
|
|
break;
|
|
}
|
|
|
|
last = std::nullopt;
|
|
}
|
|
}
|
|
|
|
if (last) {
|
|
rjson::add(stream_desc, "LastEvaluatedShardId", *last);
|
|
}
|
|
|
|
rjson::add(stream_desc, "Shards", std::move(shards));
|
|
rjson::add(ret, "StreamDescription", std::move(stream_desc));
|
|
|
|
co_return rjson::print(std::move(ret));
|
|
}
|
|
|
|
enum class shard_iterator_type {
|
|
AT_SEQUENCE_NUMBER,
|
|
AFTER_SEQUENCE_NUMBER,
|
|
TRIM_HORIZON,
|
|
LATEST,
|
|
};
|
|
|
|
std::ostream& operator<<(std::ostream& os, shard_iterator_type type) {
|
|
switch (type) {
|
|
case shard_iterator_type::AT_SEQUENCE_NUMBER: os << "AT_SEQUENCE_NUMBER"; break;
|
|
case shard_iterator_type::AFTER_SEQUENCE_NUMBER: os << "AFTER_SEQUENCE_NUMBER"; break;
|
|
case shard_iterator_type::TRIM_HORIZON: os << "TRIM_HORIZON"; break;
|
|
case shard_iterator_type::LATEST: os << "LATEST"; break;
|
|
default: throw std::logic_error(std::to_string(int(type)));
|
|
}
|
|
return os;
|
|
}
|
|
std::istream& operator>>(std::istream& is, shard_iterator_type& type) {
|
|
sstring s;
|
|
is >> s;
|
|
if (s == "AT_SEQUENCE_NUMBER") {
|
|
type = shard_iterator_type::AT_SEQUENCE_NUMBER;
|
|
} else if (s == "AFTER_SEQUENCE_NUMBER") {
|
|
type = shard_iterator_type::AFTER_SEQUENCE_NUMBER;
|
|
} else if (s == "TRIM_HORIZON") {
|
|
type = shard_iterator_type::TRIM_HORIZON;
|
|
} else if (s == "LATEST") {
|
|
type = shard_iterator_type::LATEST;
|
|
} else {
|
|
throw std::invalid_argument(s);
|
|
}
|
|
return is;
|
|
}
|
|
|
|
struct shard_iterator {
|
|
// A stream shard iterator can request either >=threshold
|
|
// (inclusive of threshold) or >threshold (exclusive).
|
|
static auto constexpr inclusive_marker = 'I';
|
|
static auto constexpr exclusive_marker = 'i';
|
|
|
|
static auto constexpr uuid_str_length = 36;
|
|
|
|
utils::UUID table;
|
|
utils::UUID threshold;
|
|
shard_id shard;
|
|
bool inclusive;
|
|
|
|
shard_iterator(const sstring&);
|
|
|
|
shard_iterator(utils::UUID t, shard_id i, utils::UUID th, bool inclusive)
|
|
: table(t)
|
|
, threshold(th)
|
|
, shard(i)
|
|
, inclusive(inclusive)
|
|
{}
|
|
friend std::ostream& operator<<(std::ostream& os, const shard_iterator& id) {
|
|
return os << (id.inclusive ? inclusive_marker : exclusive_marker) << std::hex
|
|
<< id.table
|
|
<< ':' << id.threshold
|
|
<< ':' << id.shard
|
|
;
|
|
}
|
|
};
|
|
|
|
shard_iterator::shard_iterator(const sstring& s)
|
|
: table(s.substr(1, uuid_str_length))
|
|
, threshold(s.substr(2 + uuid_str_length, uuid_str_length))
|
|
, shard(s.substr(3 + 2 * uuid_str_length))
|
|
, inclusive(s.at(0) == inclusive_marker)
|
|
{
|
|
if (s.at(0) != inclusive_marker && s.at(0) != exclusive_marker) {
|
|
throw std::invalid_argument(s);
|
|
}
|
|
}
|
|
}
|
|
|
|
template<typename ValueType>
|
|
struct rapidjson::internal::TypeHelper<ValueType, alternator::shard_iterator>
|
|
: public from_string_helper<ValueType, alternator::shard_iterator>
|
|
{};
|
|
|
|
template<typename ValueType>
|
|
struct rapidjson::internal::TypeHelper<ValueType, alternator::shard_iterator_type>
|
|
: public from_string_helper<ValueType, alternator::shard_iterator_type>
|
|
{};
|
|
|
|
namespace alternator {
|
|
|
|
future<executor::request_return_type> executor::get_shard_iterator(client_state& client_state, service_permit permit, rjson::value request) {
|
|
_stats.api_operations.get_shard_iterator++;
|
|
|
|
auto type = rjson::get<shard_iterator_type>(request, "ShardIteratorType");
|
|
auto seq_num = rjson::get_opt<sequence_number>(request, "SequenceNumber");
|
|
|
|
if (type < shard_iterator_type::TRIM_HORIZON && !seq_num) {
|
|
throw api_error::validation("Missing required parameter \"SequenceNumber\"");
|
|
}
|
|
if (type >= shard_iterator_type::TRIM_HORIZON && seq_num) {
|
|
throw api_error::validation("Iterator of this type should not have sequence number");
|
|
}
|
|
|
|
auto stream_arn = rjson::get<alternator::stream_arn>(request, "StreamArn");
|
|
auto db = _proxy.data_dictionary();
|
|
|
|
schema_ptr schema = nullptr;
|
|
std::optional<shard_id> sid;
|
|
|
|
try {
|
|
auto cf = db.find_column_family(table_id(stream_arn));
|
|
schema = cf.schema();
|
|
sid = rjson::get<shard_id>(request, "ShardId");
|
|
} catch (...) {
|
|
}
|
|
if (!schema || !cdc::get_base_table(db.real_database(), *schema) || !is_alternator_keyspace(schema->ks_name())) {
|
|
throw api_error::resource_not_found("Invalid StreamArn");
|
|
}
|
|
if (!sid) {
|
|
throw api_error::resource_not_found("Invalid ShardId");
|
|
}
|
|
|
|
utils::UUID threshold;
|
|
bool inclusive_of_threshold;
|
|
|
|
switch (type) {
|
|
case shard_iterator_type::AT_SEQUENCE_NUMBER:
|
|
threshold = *seq_num;
|
|
inclusive_of_threshold = true;
|
|
break;
|
|
case shard_iterator_type::AFTER_SEQUENCE_NUMBER:
|
|
threshold = *seq_num;
|
|
inclusive_of_threshold = false;
|
|
break;
|
|
case shard_iterator_type::TRIM_HORIZON:
|
|
// zero UUID - lowest possible timestamp. Include all
|
|
// data not ttl:ed away.
|
|
threshold = utils::UUID{};
|
|
inclusive_of_threshold = true;
|
|
break;
|
|
case shard_iterator_type::LATEST:
|
|
threshold = utils::UUID_gen::min_time_UUID((db_clock::now() - confidence_interval(db)).time_since_epoch());
|
|
inclusive_of_threshold = true;
|
|
break;
|
|
}
|
|
|
|
shard_iterator iter(stream_arn, *sid, threshold, inclusive_of_threshold);
|
|
|
|
auto ret = rjson::empty_object();
|
|
rjson::add(ret, "ShardIterator", iter);
|
|
|
|
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
|
|
}
|
|
|
|
struct event_id {
|
|
cdc::stream_id stream;
|
|
utils::UUID timestamp;
|
|
|
|
static constexpr auto marker = 'E';
|
|
|
|
event_id(cdc::stream_id s, utils::UUID ts)
|
|
: stream(s)
|
|
, timestamp(ts)
|
|
{}
|
|
|
|
friend std::ostream& operator<<(std::ostream& os, const event_id& id) {
|
|
fmt::print(os, "{}{}:{}", marker, id.stream.to_bytes(), id.timestamp);
|
|
return os;
|
|
}
|
|
};
|
|
} // namespace alternator
|
|
|
|
template<typename ValueType>
|
|
struct rapidjson::internal::TypeHelper<ValueType, alternator::event_id>
|
|
: public from_string_helper<ValueType, alternator::event_id>
|
|
{};
|
|
|
|
namespace alternator {
|
|
|
|
future<executor::request_return_type> executor::get_records(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) {
|
|
_stats.api_operations.get_records++;
|
|
auto start_time = std::chrono::steady_clock::now();
|
|
|
|
auto iter = rjson::get<shard_iterator>(request, "ShardIterator");
|
|
auto limit = rjson::get_opt<size_t>(request, "Limit").value_or(1000);
|
|
|
|
if (limit < 1) {
|
|
throw api_error::validation("Limit must be 1 or more");
|
|
}
|
|
if (limit > 1000) {
|
|
throw api_error::validation("Limit must be less than or equal to 1000");
|
|
}
|
|
|
|
auto db = _proxy.data_dictionary();
|
|
schema_ptr schema, base;
|
|
try {
|
|
auto log_table = db.find_column_family(table_id(iter.table));
|
|
schema = log_table.schema();
|
|
base = cdc::get_base_table(db.real_database(), *schema);
|
|
} catch (...) {
|
|
}
|
|
|
|
if (!schema || !base || !is_alternator_keyspace(schema->ks_name())) {
|
|
co_return api_error::resource_not_found(fmt::to_string(iter.table));
|
|
}
|
|
|
|
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
|
|
|
|
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats);
|
|
|
|
db::consistency_level cl = db::consistency_level::LOCAL_QUORUM;
|
|
partition_key pk = iter.shard.id.to_partition_key(*schema);
|
|
|
|
dht::partition_range_vector partition_ranges{ dht::partition_range::make_singular(dht::decorate_key(*schema, pk)) };
|
|
|
|
auto high_ts = db_clock::now() - confidence_interval(db);
|
|
auto high_uuid = utils::UUID_gen::min_time_UUID(high_ts.time_since_epoch());
|
|
auto lo = clustering_key_prefix::from_exploded(*schema, { iter.threshold.serialize() });
|
|
auto hi = clustering_key_prefix::from_exploded(*schema, { high_uuid.serialize() });
|
|
|
|
std::vector<query::clustering_range> bounds;
|
|
using bound = typename query::clustering_range::bound;
|
|
bounds.push_back(query::clustering_range::make(bound(lo, iter.inclusive), bound(hi, false)));
|
|
|
|
static const bytes timestamp_column_name = cdc::log_meta_column_name_bytes("time");
|
|
static const bytes op_column_name = cdc::log_meta_column_name_bytes("operation");
|
|
static const bytes eor_column_name = cdc::log_meta_column_name_bytes("end_of_batch");
|
|
|
|
std::optional<attrs_to_get> key_names =
|
|
base->primary_key_columns()
|
|
| std::views::transform([&] (const column_definition& cdef) {
|
|
return std::make_pair<std::string, attrs_to_get_node>(cdef.name_as_text(), {}); })
|
|
| std::ranges::to<attrs_to_get>()
|
|
;
|
|
// Include all base table columns as values (in case pre or post is enabled).
|
|
// This will include attributes not stored in the frozen map column
|
|
std::optional<attrs_to_get> attr_names = base->regular_columns()
|
|
// this will include the :attrs column, which we will also force evaluating.
|
|
// But not having this set empty forces out any cdc columns from actual result
|
|
| std::views::transform([] (const column_definition& cdef) {
|
|
return std::make_pair<std::string, attrs_to_get_node>(cdef.name_as_text(), {}); })
|
|
| std::ranges::to<attrs_to_get>()
|
|
;
|
|
|
|
std::vector<const column_definition*> columns;
|
|
columns.reserve(schema->all_columns().size());
|
|
|
|
auto pks = schema->partition_key_columns();
|
|
auto cks = schema->clustering_key_columns();
|
|
|
|
std::transform(pks.begin(), pks.end(), std::back_inserter(columns), [](auto& c) { return &c; });
|
|
std::transform(cks.begin(), cks.end(), std::back_inserter(columns), [](auto& c) { return &c; });
|
|
auto regular_column_start_idx = columns.size();
|
|
auto regular_column_filter = std::views::filter([](const column_definition& cdef) { return cdef.name() == op_column_name || cdef.name() == eor_column_name || !cdc::is_cdc_metacolumn_name(cdef.name_as_text()); });
|
|
std::ranges::transform(schema->regular_columns() | regular_column_filter, std::back_inserter(columns), [](auto& c) { return &c; });
|
|
|
|
auto regular_columns = std::ranges::subrange(columns.begin() + regular_column_start_idx, columns.end())
|
|
| std::views::transform(&column_definition::id)
|
|
| std::ranges::to<query::column_id_vector>()
|
|
;
|
|
|
|
stream_view_type type = cdc_options_to_steam_view_type(base->cdc_options());
|
|
|
|
auto selection = cql3::selection::selection::for_columns(schema, std::move(columns));
|
|
auto partition_slice = query::partition_slice(
|
|
std::move(bounds)
|
|
, {}, std::move(regular_columns), selection->get_query_options());
|
|
|
|
auto& opts = base->cdc_options();
|
|
auto mul = 2; // key-only, allow for delete + insert
|
|
if (opts.preimage()) {
|
|
++mul;
|
|
}
|
|
if (opts.postimage()) {
|
|
++mul;
|
|
}
|
|
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice),
|
|
query::tombstone_limit(_proxy.get_tombstone_limit()), query::row_limit(limit * mul));
|
|
|
|
service::storage_proxy::coordinator_query_result qr = co_await _proxy.query(schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(default_timeout(), std::move(permit), client_state));
|
|
cql3::selection::result_set_builder builder(*selection, gc_clock::now());
|
|
query::result_view::consume(*qr.query_result, partition_slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection));
|
|
|
|
auto result_set = builder.build();
|
|
auto records = rjson::empty_array();
|
|
|
|
auto& metadata = result_set->get_metadata();
|
|
|
|
auto op_index = std::distance(metadata.get_names().begin(),
|
|
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
|
|
return cdef->name->name() == op_column_name;
|
|
})
|
|
);
|
|
auto ts_index = std::distance(metadata.get_names().begin(),
|
|
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
|
|
return cdef->name->name() == timestamp_column_name;
|
|
})
|
|
);
|
|
auto eor_index = std::distance(metadata.get_names().begin(),
|
|
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
|
|
return cdef->name->name() == eor_column_name;
|
|
})
|
|
);
|
|
|
|
std::optional<utils::UUID> timestamp;
|
|
auto dynamodb = rjson::empty_object();
|
|
auto record = rjson::empty_object();
|
|
const auto dc_name = _proxy.get_token_metadata_ptr()->get_topology().get_datacenter();
|
|
|
|
using op_utype = std::underlying_type_t<cdc::operation>;
|
|
|
|
auto maybe_add_record = [&] {
|
|
if (!dynamodb.ObjectEmpty()) {
|
|
rjson::add(record, "dynamodb", std::move(dynamodb));
|
|
dynamodb = rjson::empty_object();
|
|
}
|
|
if (!record.ObjectEmpty()) {
|
|
rjson::add(record, "awsRegion", rjson::from_string(dc_name));
|
|
rjson::add(record, "eventID", event_id(iter.shard.id, *timestamp));
|
|
rjson::add(record, "eventSource", "scylladb:alternator");
|
|
rjson::add(record, "eventVersion", "1.1");
|
|
rjson::push_back(records, std::move(record));
|
|
record = rjson::empty_object();
|
|
--limit;
|
|
}
|
|
};
|
|
|
|
for (auto& row : result_set->rows()) {
|
|
auto op = static_cast<cdc::operation>(value_cast<op_utype>(data_type_for<op_utype>()->deserialize(*row[op_index])));
|
|
auto ts = value_cast<utils::UUID>(data_type_for<utils::UUID>()->deserialize(*row[ts_index]));
|
|
auto eor = row[eor_index].has_value() ? value_cast<bool>(boolean_type->deserialize(*row[eor_index])) : false;
|
|
|
|
if (!dynamodb.HasMember("Keys")) {
|
|
auto keys = rjson::empty_object();
|
|
describe_single_item(*selection, row, key_names, keys);
|
|
rjson::add(dynamodb, "Keys", std::move(keys));
|
|
rjson::add(dynamodb, "ApproximateCreationDateTime", utils::UUID_gen::unix_timestamp_in_sec(ts).count());
|
|
rjson::add(dynamodb, "SequenceNumber", sequence_number(ts));
|
|
rjson::add(dynamodb, "StreamViewType", type);
|
|
// TODO: SizeBytes
|
|
}
|
|
|
|
/**
|
|
* We merge rows with same timestamp into a single event.
|
|
* This is pretty much needed, because a CDC row typically
|
|
* encodes ~half the info of an alternator write.
|
|
*
|
|
* A big, big downside to how alternator records are written
|
|
* (i.e. CQL), is that the distinction between INSERT and UPDATE
|
|
* is somewhat lost/unmappable to actual eventName.
|
|
* A write (currently) always looks like an insert+modify
|
|
* regardless whether we wrote existing record or not.
|
|
*
|
|
* Maybe RMW ops could be done slightly differently so
|
|
* we can distinguish them here...
|
|
*
|
|
* For now, all writes will become MODIFY.
|
|
*
|
|
* Note: we do not check the current pre/post
|
|
* flags on CDC log, instead we use data to
|
|
* drive what is returned. This is (afaict)
|
|
* consistent with dynamo streams
|
|
*/
|
|
switch (op) {
|
|
case cdc::operation::pre_image:
|
|
case cdc::operation::post_image:
|
|
{
|
|
auto item = rjson::empty_object();
|
|
describe_single_item(*selection, row, attr_names, item, nullptr, true);
|
|
describe_single_item(*selection, row, key_names, item);
|
|
rjson::add(dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item));
|
|
break;
|
|
}
|
|
case cdc::operation::update:
|
|
rjson::add(record, "eventName", "MODIFY");
|
|
break;
|
|
case cdc::operation::insert:
|
|
rjson::add(record, "eventName", "INSERT");
|
|
break;
|
|
case cdc::operation::service_row_delete:
|
|
case cdc::operation::service_partition_delete:
|
|
{
|
|
auto user_identity = rjson::empty_object();
|
|
rjson::add(user_identity, "Type", "Service");
|
|
rjson::add(user_identity, "PrincipalId", "dynamodb.amazonaws.com");
|
|
rjson::add(record, "userIdentity", std::move(user_identity));
|
|
rjson::add(record, "eventName", "REMOVE");
|
|
break;
|
|
}
|
|
default:
|
|
rjson::add(record, "eventName", "REMOVE");
|
|
break;
|
|
}
|
|
if (eor) {
|
|
maybe_add_record();
|
|
timestamp = ts;
|
|
if (limit == 0) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
auto ret = rjson::empty_object();
|
|
auto nrecords = records.Size();
|
|
rjson::add(ret, "Records", std::move(records));
|
|
|
|
if (nrecords != 0) {
|
|
// #9642. Set next iterators threshold to > last
|
|
shard_iterator next_iter(iter.table, iter.shard, *timestamp, false);
|
|
// Note that here we unconditionally return NextShardIterator,
|
|
// without checking if maybe we reached the end-of-shard. If the
|
|
// shard did end, then the next read will have nrecords == 0 and
|
|
// will notice end end of shard and not return NextShardIterator.
|
|
rjson::add(ret, "NextShardIterator", next_iter);
|
|
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
|
|
co_return rjson::print(std::move(ret));
|
|
}
|
|
|
|
// ugh. figure out if we are and end-of-shard
|
|
auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
|
|
|
|
db_clock::time_point ts = co_await _sdks.cdc_current_generation_timestamp({ normal_token_owners });
|
|
auto& shard = iter.shard;
|
|
|
|
if (shard.time < ts && ts < high_ts) {
|
|
// The DynamoDB documentation states that when a shard is
|
|
// closed, reading it until the end has NextShardIterator
|
|
// "set to null". Our test test_streams_closed_read
|
|
// confirms that by "null" they meant not set at all.
|
|
} else {
|
|
// We could have return the same iterator again, but we did
|
|
// a search from it until high_ts and found nothing, so we
|
|
// can also start the next search from high_ts.
|
|
// TODO: but why? It's simpler just to leave the iterator be.
|
|
shard_iterator next_iter(iter.table, iter.shard, utils::UUID_gen::min_time_UUID(high_ts.time_since_epoch()), true);
|
|
rjson::add(ret, "NextShardIterator", iter);
|
|
}
|
|
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
|
|
if (is_big(ret)) {
|
|
co_return make_streamed(std::move(ret));
|
|
}
|
|
co_return rjson::print(std::move(ret));
|
|
}
|
|
|
|
bool executor::add_stream_options(const rjson::value& stream_specification, schema_builder& builder, service::storage_proxy& sp) {
|
|
auto stream_enabled = rjson::find(stream_specification, "StreamEnabled");
|
|
if (!stream_enabled || !stream_enabled->IsBool()) {
|
|
throw api_error::validation("StreamSpecification needs boolean StreamEnabled");
|
|
}
|
|
|
|
if (stream_enabled->GetBool()) {
|
|
if (!sp.features().alternator_streams) {
|
|
throw api_error::validation("StreamSpecification: alternator streams feature not enabled in cluster.");
|
|
}
|
|
|
|
cdc::options opts;
|
|
opts.enabled(true);
|
|
opts.set_delta_mode(cdc::delta_mode::keys);
|
|
opts.ttl(std::chrono::duration_cast<std::chrono::seconds>(dynamodb_streams_max_window).count());
|
|
|
|
auto type = rjson::get_opt<stream_view_type>(stream_specification, "StreamViewType").value_or(stream_view_type::KEYS_ONLY);
|
|
switch (type) {
|
|
default:
|
|
break;
|
|
case stream_view_type::NEW_AND_OLD_IMAGES:
|
|
opts.postimage(true);
|
|
opts.preimage(cdc::image_mode::full);
|
|
break;
|
|
case stream_view_type::OLD_IMAGE:
|
|
opts.preimage(cdc::image_mode::full);
|
|
break;
|
|
case stream_view_type::NEW_IMAGE:
|
|
opts.postimage(true);
|
|
break;
|
|
}
|
|
builder.with_cdc_options(opts);
|
|
return true;
|
|
} else {
|
|
cdc::options opts;
|
|
opts.enabled(false);
|
|
builder.with_cdc_options(opts);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
void executor::supplement_table_stream_info(rjson::value& descr, const schema& schema, const service::storage_proxy& sp) {
|
|
auto& opts = schema.cdc_options();
|
|
if (opts.enabled()) {
|
|
auto db = sp.data_dictionary();
|
|
auto cf = db.find_table(schema.ks_name(), cdc::log_name(schema.cf_name()));
|
|
stream_arn arn(cf.schema()->id());
|
|
rjson::add(descr, "LatestStreamArn", arn);
|
|
rjson::add(descr, "LatestStreamLabel", rjson::from_string(stream_label(*cf.schema())));
|
|
|
|
auto stream_desc = rjson::empty_object();
|
|
rjson::add(stream_desc, "StreamEnabled", true);
|
|
|
|
auto mode = stream_view_type::KEYS_ONLY;
|
|
if (opts.preimage() && opts.postimage()) {
|
|
mode = stream_view_type::NEW_AND_OLD_IMAGES;
|
|
} else if (opts.preimage()) {
|
|
mode = stream_view_type::OLD_IMAGE;
|
|
} else if (opts.postimage()) {
|
|
mode = stream_view_type::NEW_IMAGE;
|
|
}
|
|
rjson::add(stream_desc, "StreamViewType", mode);
|
|
rjson::add(descr, "StreamSpecification", std::move(stream_desc));
|
|
}
|
|
}
|
|
|
|
} // namespace alternator
|