Mark CDC as GA
Enable CDC by default. Rename CDC experimental feature to UNUSED_CDC to keep accepting cdc flag. Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
This commit is contained in:
@@ -876,7 +876,7 @@ db::fs::path db::config::get_conf_sub(db::fs::path sub) {
|
||||
}
|
||||
|
||||
bool db::config::check_experimental(experimental_features_t::feature f) const {
|
||||
if (experimental() && f != experimental_features_t::UNUSED) {
|
||||
if (experimental() && f != experimental_features_t::UNUSED && f != experimental_features_t::UNUSED_CDC) {
|
||||
return true;
|
||||
}
|
||||
const auto& optval = experimental_features();
|
||||
@@ -930,11 +930,13 @@ std::unordered_map<sstring, db::experimental_features_t::feature> db::experiment
|
||||
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
|
||||
// Lightweight transactions are no longer experimental. Map them
|
||||
// to UNUSED switch for a while, then remove altogether.
|
||||
return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", CDC}};
|
||||
// Change Data Capture is no longer experimental. Map it
|
||||
// to UNUSED_CDC switch for a while, then remove altogether.
|
||||
return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", UNUSED_CDC}};
|
||||
}
|
||||
|
||||
std::vector<enum_option<db::experimental_features_t>> db::experimental_features_t::all() {
|
||||
return {UDF, CDC};
|
||||
return {UDF};
|
||||
}
|
||||
|
||||
template struct utils::config_file::named_value<seastar::log_level>;
|
||||
|
||||
@@ -81,7 +81,7 @@ namespace db {
|
||||
|
||||
/// Enumeration of all valid values for the `experimental` config entry.
|
||||
struct experimental_features_t {
|
||||
enum feature { UNUSED, UDF, CDC };
|
||||
enum feature { UNUSED, UDF, UNUSED_CDC };
|
||||
static std::unordered_map<sstring, feature> map(); // See enum_option.
|
||||
static std::vector<enum_option<experimental_features_t>> all();
|
||||
};
|
||||
|
||||
@@ -118,10 +118,6 @@ feature_config feature_config_from_db_config(db::config& cfg, std::set<sstring>
|
||||
}
|
||||
}
|
||||
|
||||
if (!cfg.check_experimental(db::experimental_features_t::CDC)) {
|
||||
fcfg._disabled_features.insert(sstring(gms::features::CDC));
|
||||
}
|
||||
|
||||
return fcfg;
|
||||
}
|
||||
|
||||
|
||||
@@ -303,7 +303,7 @@ void storage_service::prepare_to_join(
|
||||
tmptr->update_normal_tokens(my_tokens, get_broadcast_address());
|
||||
|
||||
_cdc_streams_ts = db::system_keyspace::get_saved_cdc_streams_timestamp().get0();
|
||||
if (!_cdc_streams_ts && db().local().get_config().check_experimental(db::experimental_features_t::CDC)) {
|
||||
if (!_cdc_streams_ts) {
|
||||
// We could not have completed joining if we didn't generate and persist a CDC streams timestamp,
|
||||
// unless we are restarting after upgrading from non-CDC supported version.
|
||||
// In that case we won't begin a CDC generation: it should be done by one of the nodes
|
||||
@@ -553,7 +553,7 @@ void storage_service::join_token_ring(int delay) {
|
||||
assert(should_bootstrap() || db().local().is_replacing() || !_cdc_streams_ts);
|
||||
}
|
||||
|
||||
if (!_cdc_streams_ts && db().local().get_config().check_experimental(db::experimental_features_t::CDC)) {
|
||||
if (!_cdc_streams_ts) {
|
||||
// If we didn't choose a CDC streams timestamp at this point, then either
|
||||
// 1. we're replacing a node which didn't gossip a CDC streams timestamp for whatever reason,
|
||||
// 2. we've already bootstrapped, but are upgrading from a non-CDC version,
|
||||
@@ -901,29 +901,23 @@ void storage_service::bootstrap() {
|
||||
// It doesn't hurt: other nodes will (potentially) just do more generation switches.
|
||||
// We do this because with this new attempt at bootstrapping we picked a different set of tokens.
|
||||
|
||||
if (db().local().get_config().check_experimental(db::experimental_features_t::CDC)) {
|
||||
// Update pending ranges now, so we correctly count ourselves as a pending replica
|
||||
// when inserting the new CDC generation.
|
||||
with_token_metadata_lock([this] {
|
||||
return get_mutable_token_metadata_ptr().then([this] (mutable_token_metadata_ptr tmptr) {
|
||||
auto endpoint = get_broadcast_address();
|
||||
tmptr->add_bootstrap_tokens(_bootstrap_tokens, endpoint);
|
||||
return update_pending_ranges(std::move(tmptr), format("bootstrapping node {}", endpoint));
|
||||
});
|
||||
}).get();
|
||||
// Update pending ranges now, so we correctly count ourselves as a pending replica
|
||||
// when inserting the new CDC generation.
|
||||
with_token_metadata_lock([this] {
|
||||
return get_mutable_token_metadata_ptr().then([this] (mutable_token_metadata_ptr tmptr) {
|
||||
auto endpoint = get_broadcast_address();
|
||||
tmptr->add_bootstrap_tokens(_bootstrap_tokens, endpoint);
|
||||
return update_pending_ranges(std::move(tmptr), format("bootstrapping node {}", endpoint));
|
||||
});
|
||||
}).get();
|
||||
|
||||
// After we pick a generation timestamp, we start gossiping it, and we stick with it.
|
||||
// We don't do any other generation switches (unless we crash before complecting bootstrap).
|
||||
assert(!_cdc_streams_ts);
|
||||
// After we pick a generation timestamp, we start gossiping it, and we stick with it.
|
||||
// We don't do any other generation switches (unless we crash before complecting bootstrap).
|
||||
assert(!_cdc_streams_ts);
|
||||
|
||||
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
|
||||
_bootstrap_tokens, get_token_metadata_ptr(), _gossiper,
|
||||
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
|
||||
} else {
|
||||
// We should not be able to join the cluster if other nodes support CDC but we don't.
|
||||
// The check should have been made somewhere in prepare_to_join (`check_knows_remote_features`).
|
||||
assert(!_feature_service.cluster_supports_cdc());
|
||||
}
|
||||
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
|
||||
_bootstrap_tokens, get_token_metadata_ptr(), _gossiper,
|
||||
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
|
||||
|
||||
_gossiper.add_local_application_state({
|
||||
// Order is important: both the CDC streams timestamp and tokens must be known when a node handles our status.
|
||||
@@ -2062,9 +2056,8 @@ future<> storage_service::start_gossiping(bind_messaging_port do_bind) {
|
||||
return seastar::async([&ss, do_bind] {
|
||||
if (!ss._initialized) {
|
||||
slogger.warn("Starting gossip by operator request");
|
||||
bool cdc_enabled = ss.db().local().get_config().check_experimental(db::experimental_features_t::CDC);
|
||||
ss.set_gossip_tokens(db::system_keyspace::get_local_tokens().get0(),
|
||||
cdc_enabled ? std::make_optional(cdc::get_local_streams_timestamp().get0()) : std::nullopt);
|
||||
std::make_optional(cdc::get_local_streams_timestamp().get0()));
|
||||
ss._gossiper.force_newer_generation();
|
||||
ss._gossiper.start_gossiping(utils::get_generation_number(), gms::bind_messaging_port(bool(do_bind))).then([&ss] {
|
||||
ss._initialized = true;
|
||||
|
||||
@@ -46,9 +46,6 @@ static cql_test_config mk_cdc_test_config() {
|
||||
auto ext = std::make_shared<db::extensions>();
|
||||
ext->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
|
||||
auto cfg = ::make_shared<db::config>(std::move(ext));
|
||||
auto features = cfg->experimental_features();
|
||||
features.emplace_back(db::experimental_features_t::CDC);
|
||||
cfg->experimental_features(features);
|
||||
return cql_test_config(std::move(cfg));
|
||||
};
|
||||
|
||||
|
||||
@@ -931,8 +931,8 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) {
|
||||
auto cfg_ptr = std::make_unique<config>();
|
||||
config& cfg = *cfg_ptr;
|
||||
cfg.read_from_yaml("experimental_features:\n - cdc\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC});
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED_CDC});
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UNUSED_CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
@@ -943,7 +943,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_unused) {
|
||||
config& cfg = *cfg_ptr;
|
||||
cfg.read_from_yaml("experimental_features:\n - lwt\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
@@ -954,7 +954,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
|
||||
config& cfg = *cfg_ptr;
|
||||
cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF});
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
@@ -964,8 +964,8 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) {
|
||||
auto cfg_ptr = std::make_unique<config>();
|
||||
config& cfg = *cfg_ptr;
|
||||
cfg.read_from_yaml("experimental_features:\n - cdc\n - lwt\n - cdc\n", throw_on_error);
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::UNUSED, ef::CDC}));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::UNUSED_CDC, ef::UNUSED, ef::UNUSED_CDC}));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UNUSED_CDC));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
@@ -979,7 +979,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) {
|
||||
[&cfg] (const sstring& opt, const sstring& msg, std::optional<value_status> status) {
|
||||
BOOST_REQUIRE_EQUAL(opt, "experimental_features");
|
||||
BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos);
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
});
|
||||
@@ -990,7 +990,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_true) {
|
||||
auto cfg_ptr = std::make_unique<config>();
|
||||
config& cfg = *cfg_ptr;
|
||||
cfg.read_from_yaml("experimental: true", throw_on_error);
|
||||
BOOST_CHECK(cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
@@ -1000,7 +1000,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_false) {
|
||||
auto cfg_ptr = std::make_unique<config>();
|
||||
config& cfg = *cfg_ptr;
|
||||
cfg.read_from_yaml("experimental: false", throw_on_error);
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
|
||||
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
|
||||
return make_ready_future();
|
||||
|
||||
@@ -118,7 +118,6 @@ SEASTAR_TEST_CASE(cdc_schema_extension) {
|
||||
// Extensions have to be registered here - config needs to have them before construction of test env.
|
||||
ext->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
|
||||
auto cfg = ::make_shared<db::config>(ext);
|
||||
cfg->experimental_features({db::experimental_features_t::feature::CDC});
|
||||
|
||||
return do_with_cql_env([] (cql_test_env& e) {
|
||||
auto assert_ext_correctness = [] (cql_test_env& e, cdc::cdc_extension expected_ext) {
|
||||
|
||||
@@ -607,7 +607,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
|
||||
auto db_cfg_ptr = ::make_shared<db::config>(std::move(extensions));
|
||||
auto& db_cfg = *db_cfg_ptr;
|
||||
db_cfg.enable_user_defined_functions({true}, db::config::config_source::CommandLine);
|
||||
db_cfg.experimental_features({experimental_features_t::UDF, experimental_features_t::CDC}, db::config::config_source::CommandLine);
|
||||
db_cfg.experimental_features({experimental_features_t::UDF}, db::config::config_source::CommandLine);
|
||||
if (regenerate) {
|
||||
db_cfg.data_file_directories({data_dir}, db::config::config_source::CommandLine);
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user