From f7bb0baba7fc917f139a85a2a32997e61eb0fccd Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Mon, 7 Sep 2020 14:26:21 +0000 Subject: [PATCH] alternator: Include stream spec in desc for create/update/describe Fixes #7163 If enabled, the resulting table description should include a StreamDescription object with the appropriate members describing current stream settings. --- alternator/streams.cc | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/alternator/streams.cc b/alternator/streams.cc index 75429bc152..2d8eab4b06 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -988,13 +988,27 @@ void executor::add_stream_options(const rjson::value& stream_specification, sche } void executor::supplement_table_stream_info(rjson::value& descr, const schema& schema) const { - if (schema.cdc_options().enabled()) { + auto& opts = schema.cdc_options(); + if (opts.enabled()) { auto& db = _proxy.get_db().local(); auto& cf = db.find_column_family(schema.ks_name(), cdc::log_name(schema.cf_name())); stream_arn arn(cf.schema()->id()); rjson::set(descr, "LatestStreamArn", arn); rjson::set(descr, "LatestStreamLabel", rjson::from_string(stream_label(*cf.schema()))); + auto stream_desc = rjson::empty_object(); + rjson::set(stream_desc, "StreamEnabled", true); + + auto mode = stream_view_type::KEYS_ONLY; + if (opts.preimage() && opts.postimage()) { + mode = stream_view_type::NEW_AND_OLD_IMAGES; + } else if (opts.preimage()) { + mode = stream_view_type::OLD_IMAGE; + } else if (opts.postimage()) { + mode = stream_view_type::NEW_IMAGE; + } + rjson::set(stream_desc, "StreamViewType", mode); + rjson::set(descr, "StreamSpecification", std::move(stream_desc)); } }