executor: Allow streams specification in CreateTable schema

This commit is contained in:
Calle Wilund
2020-05-18 10:24:57 +00:00
parent 3376209718
commit 45ee73969d
3 changed files with 38 additions and 13 deletions

View File

@@ -698,7 +698,7 @@ static void update_tags_map(const rjson::value& tags, std::map<sstring, sstring>
// are fixed, this issue will automatically get fixed as well.
static future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map<sstring, sstring>&& tags_map) {
schema_builder builder(schema);
builder.set_extensions(schema::extensions_map{{sstring(tags_extension::NAME), ::make_shared<tags_extension>(std::move(tags_map))}});
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(std::move(tags_map)));
return mm.announce_column_family_update(builder.build(), false, std::vector<view_ptr>(), false);
}
@@ -932,18 +932,10 @@ future<executor::request_return_type> executor::create_table(client_state& clien
if (rjson::find(request, "SSESpecification")) {
return make_ready_future<request_return_type>(api_error("ValidationException", "SSESpecification: configuring encryption-at-rest is not yet supported."));
}
// We don't yet support streams (CDC), but a StreamSpecification asking
// *not* to use streams should be accepted:
rjson::value* stream_specification = rjson::find(request, "StreamSpecification");
if (stream_specification && stream_specification->IsObject()) {
rjson::value* stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
if (!stream_enabled || !stream_enabled->IsBool()) {
return make_ready_future<request_return_type>(api_error("ValidationException", "StreamSpecification needs boolean StreamEnabled"));
}
if (stream_enabled->GetBool()) {
// TODO: support streams
return make_ready_future<request_return_type>(api_error("ValidationException", "StreamSpecification: streams (CDC) is not yet supported."));
}
add_stream_options(*stream_specification, builder);
}
// Parse the "Tags" parameter early, so we can avoid creating the table
@@ -954,7 +946,7 @@ future<executor::request_return_type> executor::create_table(client_state& clien
update_tags_map(*tags, tags_map, update_tags_action::add_tags);
}
builder.set_extensions(schema::extensions_map{{sstring(tags_extension::NAME), ::make_shared<tags_extension>()}});
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>());
schema_ptr schema = builder.build();
auto where_clause_it = where_clauses.begin();
for (auto& view_builder : view_builders) {
@@ -968,7 +960,7 @@ future<executor::request_return_type> executor::create_table(client_state& clien
}
const bool include_all_columns = true;
view_builder.with_view_info(*schema, include_all_columns, *where_clause_it);
view_builder.set_extensions(schema::extensions_map{{sstring(tags_extension::NAME), ::make_shared<tags_extension>()}});
view_builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>());
++where_clause_it;
}

View File

@@ -140,6 +140,7 @@ public:
static void add_stream_options(const rjson::value& stream_spec, schema_builder&);
};
}

View File

@@ -34,6 +34,7 @@
#include "cql3/selection/selection.hh"
#include "cql3/result_set.hh"
#include "cql3/type_json.hh"
#include "schema_builder.hh"
#include "executor.hh"
#include "tags_extension.hh"
@@ -853,5 +854,36 @@ future<executor::request_return_type> executor::get_records(client_state& client
});
}
void executor::add_stream_options(const rjson::value& stream_specification, schema_builder& builder) {
auto stream_enabled = rjson::find(stream_specification, "StreamEnabled");
if (!stream_enabled || !stream_enabled->IsBool()) {
throw validation_exception("StreamSpecification needs boolean StreamEnabled");
}
if (stream_enabled->GetBool()) {
cdc::options opts;
opts.enabled(true);
auto type = rjson::get_opt<stream_view_type>(stream_specification, "StreamViewType").value_or(stream_view_type::KEYS_ONLY);
switch (type) {
default:
break;
case stream_view_type::NEW_AND_OLD_IMAGES:
opts.postimage(true);
opts.preimage(true);
break;
case stream_view_type::OLD_IMAGE:
opts.preimage(true);
break;
case stream_view_type::NEW_IMAGE:
opts.postimage(true);
break;
}
builder.with_cdc_options(opts);
} else {
cdc::options opts;
opts.enabled(false);
builder.with_cdc_options(opts);
}
}
}