diff --git a/alternator/executor.cc b/alternator/executor.cc index b13fbac2d0..0fe778676b 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -698,7 +698,7 @@ static void update_tags_map(const rjson::value& tags, std::map // are fixed, this issue will automatically get fixed as well. static future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map&& tags_map) { schema_builder builder(schema); - builder.set_extensions(schema::extensions_map{{sstring(tags_extension::NAME), ::make_shared(std::move(tags_map))}}); + builder.add_extension(tags_extension::NAME, ::make_shared(std::move(tags_map))); return mm.announce_column_family_update(builder.build(), false, std::vector(), false); } @@ -932,18 +932,10 @@ future executor::create_table(client_state& clien if (rjson::find(request, "SSESpecification")) { return make_ready_future(api_error("ValidationException", "SSESpecification: configuring encryption-at-rest is not yet supported.")); } - // We don't yet support streams (CDC), but a StreamSpecification asking - // *not* to use streams should be accepted: + rjson::value* stream_specification = rjson::find(request, "StreamSpecification"); if (stream_specification && stream_specification->IsObject()) { - rjson::value* stream_enabled = rjson::find(*stream_specification, "StreamEnabled"); - if (!stream_enabled || !stream_enabled->IsBool()) { - return make_ready_future(api_error("ValidationException", "StreamSpecification needs boolean StreamEnabled")); - } - if (stream_enabled->GetBool()) { - // TODO: support streams - return make_ready_future(api_error("ValidationException", "StreamSpecification: streams (CDC) is not yet supported.")); - } + add_stream_options(*stream_specification, builder); } // Parse the "Tags" parameter early, so we can avoid creating the table @@ -954,7 +946,7 @@ future executor::create_table(client_state& clien update_tags_map(*tags, tags_map, update_tags_action::add_tags); } - builder.set_extensions(schema::extensions_map{{sstring(tags_extension::NAME), ::make_shared()}}); + builder.add_extension(tags_extension::NAME, ::make_shared()); schema_ptr schema = builder.build(); auto where_clause_it = where_clauses.begin(); for (auto& view_builder : view_builders) { @@ -968,7 +960,7 @@ future executor::create_table(client_state& clien } const bool include_all_columns = true; view_builder.with_view_info(*schema, include_all_columns, *where_clause_it); - view_builder.set_extensions(schema::extensions_map{{sstring(tags_extension::NAME), ::make_shared()}}); + view_builder.add_extension(tags_extension::NAME, ::make_shared()); ++where_clause_it; } diff --git a/alternator/executor.hh b/alternator/executor.hh index 9acd8b3f66..4981669458 100644 --- a/alternator/executor.hh +++ b/alternator/executor.hh @@ -140,6 +140,7 @@ public: + static void add_stream_options(const rjson::value& stream_spec, schema_builder&); }; } diff --git a/alternator/streams.cc b/alternator/streams.cc index 6c86c7cf83..10a0755633 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -34,6 +34,7 @@ #include "cql3/selection/selection.hh" #include "cql3/result_set.hh" #include "cql3/type_json.hh" +#include "schema_builder.hh" #include "executor.hh" #include "tags_extension.hh" @@ -853,5 +854,36 @@ future executor::get_records(client_state& client }); } +void executor::add_stream_options(const rjson::value& stream_specification, schema_builder& builder) { + auto stream_enabled = rjson::find(stream_specification, "StreamEnabled"); + if (!stream_enabled || !stream_enabled->IsBool()) { + throw validation_exception("StreamSpecification needs boolean StreamEnabled"); + } + + if (stream_enabled->GetBool()) { + cdc::options opts; + opts.enabled(true); + auto type = rjson::get_opt(stream_specification, "StreamViewType").value_or(stream_view_type::KEYS_ONLY); + switch (type) { + default: + break; + case stream_view_type::NEW_AND_OLD_IMAGES: + opts.postimage(true); + opts.preimage(true); + break; + case stream_view_type::OLD_IMAGE: + opts.preimage(true); + break; + case stream_view_type::NEW_IMAGE: + opts.postimage(true); + break; + } + builder.with_cdc_options(opts); + } else { + cdc::options opts; + opts.enabled(false); + builder.with_cdc_options(opts); + } +} }