diff --git a/alternator/executor.hh b/alternator/executor.hh index 81a05e21af..b3602c52a0 100644 --- a/alternator/executor.hh +++ b/alternator/executor.hh @@ -146,7 +146,7 @@ public: - static void add_stream_options(const rjson::value& stream_spec, schema_builder&); + void add_stream_options(const rjson::value& stream_spec, schema_builder&); }; } diff --git a/alternator/streams.cc b/alternator/streams.cc index 0f70457629..f7109d579d 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -37,6 +37,8 @@ #include "cql3/type_json.hh" #include "schema_builder.hh" #include "service/storage_service.hh" +#include "gms/feature.hh" +#include "gms/feature_service.hh" #include "executor.hh" #include "tags_extension.hh" @@ -875,6 +877,12 @@ void executor::add_stream_options(const rjson::value& stream_specification, sche } if (stream_enabled->GetBool()) { + auto& db = _proxy.get_db().local(); + + if (!db.features().cluster_supports_cdc()) { + throw api_error::validation("StreamSpecification: streams (CDC) feature not enabled in cluster."); + } + cdc::options opts; opts.enabled(true); auto type = rjson::get_opt(stream_specification, "StreamViewType").value_or(stream_view_type::KEYS_ONLY);