diff --git a/alternator/streams.cc b/alternator/streams.cc index e1a4c6534c..cb8d4ba9a3 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -1047,6 +1047,9 @@ void executor::add_stream_options(const rjson::value& stream_specification, sche if (!db.features().cluster_supports_cdc()) { throw api_error::validation("StreamSpecification: streams (CDC) feature not enabled in cluster."); } + if (!db.features().cluster_supports_alternator_streams()) { + throw api_error::validation("StreamSpecification: alternator streams feature not enabled in cluster."); + } cdc::options opts; opts.enabled(true); diff --git a/cdc/generation.hh b/cdc/generation.hh index 652d5be5aa..060b04e3d1 100644 --- a/cdc/generation.hh +++ b/cdc/generation.hh @@ -154,7 +154,7 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos future get_local_streams_timestamp(); /* Generate a new set of CDC streams and insert it into the distributed cdc_generation_descriptions table. - * Returns the timestamp of this new generation. + * Returns the timestamp of this new generation * * Should be called when starting the node for the first time (i.e., joining the ring). * diff --git a/db/config.cc b/db/config.cc index 683f420167..88bf71d80d 100644 --- a/db/config.cc +++ b/db/config.cc @@ -874,7 +874,7 @@ db::fs::path db::config::get_conf_sub(db::fs::path sub) { } bool db::config::check_experimental(experimental_features_t::feature f) const { - if (experimental() && f != experimental_features_t::UNUSED) { + if (experimental() && f != experimental_features_t::UNUSED && f != experimental_features_t::UNUSED_CDC) { return true; } const auto& optval = experimental_features(); @@ -928,11 +928,13 @@ std::unordered_map db::experiment // https://github.com/scylladb/scylla/pull/5369#discussion_r353614807 // Lightweight transactions are no longer experimental. Map them // to UNUSED switch for a while, then remove altogether. - return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", CDC}}; + // Change Data Capture is no longer experimental. Map it + // to UNUSED_CDC switch for a while, then remove altogether. + return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", UNUSED_CDC}, {"alternator-streams", ALTERNATOR_STREAMS}}; } std::vector> db::experimental_features_t::all() { - return {UDF, CDC}; + return {UDF, ALTERNATOR_STREAMS}; } template struct utils::config_file::named_value; diff --git a/db/config.hh b/db/config.hh index 9184e5bf0c..6c0235b5fb 100644 --- a/db/config.hh +++ b/db/config.hh @@ -81,7 +81,7 @@ namespace db { /// Enumeration of all valid values for the `experimental` config entry. struct experimental_features_t { - enum feature { UNUSED, UDF, CDC }; + enum feature { UNUSED, UDF, UNUSED_CDC, ALTERNATOR_STREAMS }; static std::unordered_map map(); // See enum_option. static std::vector> all(); }; diff --git a/gms/feature.hh b/gms/feature.hh index 0c8c379ccf..58fa095c10 100644 --- a/gms/feature.hh +++ b/gms/feature.hh @@ -143,6 +143,7 @@ extern const std::string_view LWT; extern const std::string_view PER_TABLE_PARTITIONERS; extern const std::string_view PER_TABLE_CACHING; extern const std::string_view DIGEST_FOR_NULL_VALUES; +extern const std::string_view ALTERNATOR_STREAMS; } diff --git a/gms/feature_service.cc b/gms/feature_service.cc index 771d16df90..7d50f01e57 100644 --- a/gms/feature_service.cc +++ b/gms/feature_service.cc @@ -62,6 +62,7 @@ constexpr std::string_view features::LWT = "LWT"; constexpr std::string_view features::PER_TABLE_PARTITIONERS = "PER_TABLE_PARTITIONERS"; constexpr std::string_view features::PER_TABLE_CACHING = "PER_TABLE_CACHING"; constexpr std::string_view features::DIGEST_FOR_NULL_VALUES = "DIGEST_FOR_NULL_VALUES"; +constexpr std::string_view features::ALTERNATOR_STREAMS = "ALTERNATOR_STREAMS"; static logging::logger logger("features"); @@ -86,6 +87,7 @@ feature_service::feature_service(feature_config cfg) : _config(cfg) , _per_table_partitioners_feature(*this, features::PER_TABLE_PARTITIONERS) , _per_table_caching_feature(*this, features::PER_TABLE_CACHING) , _digest_for_null_values_feature(*this, features::DIGEST_FOR_NULL_VALUES) + , _alternator_streams_feature(*this, features::ALTERNATOR_STREAMS) {} feature_config feature_config_from_db_config(db::config& cfg, std::set disabled) { @@ -116,8 +118,8 @@ feature_config feature_config_from_db_config(db::config& cfg, std::set } } - if (!cfg.check_experimental(db::experimental_features_t::CDC)) { - fcfg._disabled_features.insert(sstring(gms::features::CDC)); + if (!cfg.check_experimental(db::experimental_features_t::ALTERNATOR_STREAMS)) { + fcfg._disabled_features.insert(sstring(gms::features::ALTERNATOR_STREAMS)); } return fcfg; @@ -187,6 +189,7 @@ std::set feature_service::known_feature_set() { gms::features::UDF, gms::features::CDC, gms::features::DIGEST_FOR_NULL_VALUES, + gms::features::ALTERNATOR_STREAMS, }; for (const sstring& s : _config._disabled_features) { @@ -266,6 +269,7 @@ void feature_service::enable(const std::set& list) { std::ref(_per_table_partitioners_feature), std::ref(_per_table_caching_feature), std::ref(_digest_for_null_values_feature), + std::ref(_alternator_streams_feature), }) { if (list.contains(f.name())) { diff --git a/gms/feature_service.hh b/gms/feature_service.hh index ca0e6b6b68..209ea32cc9 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -92,6 +92,7 @@ private: gms::feature _per_table_partitioners_feature; gms::feature _per_table_caching_feature; gms::feature _digest_for_null_values_feature; + gms::feature _alternator_streams_feature; public: bool cluster_supports_user_defined_functions() const { @@ -160,6 +161,10 @@ public: bool cluster_supports_lwt() const { return bool(_lwt_feature); } + + bool cluster_supports_alternator_streams() const { + return bool(_alternator_streams_feature); + } }; } // namespace gms diff --git a/service/storage_service.cc b/service/storage_service.cc index 950c475783..3bb4858a14 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -298,7 +298,7 @@ void storage_service::prepare_to_join( _token_metadata.update_normal_tokens(my_tokens, get_broadcast_address()); _cdc_streams_ts = db::system_keyspace::get_saved_cdc_streams_timestamp().get0(); - if (!_cdc_streams_ts && db().local().get_config().check_experimental(db::experimental_features_t::CDC)) { + if (!_cdc_streams_ts) { // We could not have completed joining if we didn't generate and persist a CDC streams timestamp, // unless we are restarting after upgrading from non-CDC supported version. // In that case we won't begin a CDC generation: it should be done by one of the nodes @@ -550,7 +550,7 @@ void storage_service::join_token_ring(int delay) { assert(should_bootstrap() || db().local().is_replacing() || !_cdc_streams_ts); } - if (!_cdc_streams_ts && db().local().get_config().check_experimental(db::experimental_features_t::CDC)) { + if (!_cdc_streams_ts) { // If we didn't choose a CDC streams timestamp at this point, then either // 1. we're replacing a node which didn't gossip a CDC streams timestamp for whatever reason, // 2. we've already bootstrapped, but are upgrading from a non-CDC version, @@ -570,10 +570,15 @@ void storage_service::join_token_ring(int delay) { if (!db().local().is_replacing() && (!db::system_keyspace::bootstrap_complete() || cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) { - - _cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(), - _bootstrap_tokens, _token_metadata, _gossiper, - _sys_dist_ks.local(), get_ring_delay(), _for_testing); + try { + _cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(), + _bootstrap_tokens, _token_metadata, _gossiper, + _sys_dist_ks.local(), get_ring_delay(), _for_testing); + } catch (...) { + cdc_log.warn( + "Could not create a new CDC generation: {}. This may make it impossible to use CDC. Use nodetool checkAndRepairCdcStreams to fix CDC generation", + std::current_exception()); + } } } @@ -893,24 +898,18 @@ void storage_service::bootstrap() { // It doesn't hurt: other nodes will (potentially) just do more generation switches. // We do this because with this new attempt at bootstrapping we picked a different set of tokens. - if (db().local().get_config().check_experimental(db::experimental_features_t::CDC)) { - // Update pending ranges now, so we correctly count ourselves as a pending replica - // when inserting the new CDC generation. - _token_metadata.add_bootstrap_tokens(_bootstrap_tokens, get_broadcast_address()); - update_pending_ranges().get(); + // Update pending ranges now, so we correctly count ourselves as a pending replica + // when inserting the new CDC generation. + _token_metadata.add_bootstrap_tokens(_bootstrap_tokens, get_broadcast_address()); + update_pending_ranges().get(); - // After we pick a generation timestamp, we start gossiping it, and we stick with it. - // We don't do any other generation switches (unless we crash before complecting bootstrap). - assert(!_cdc_streams_ts); + // After we pick a generation timestamp, we start gossiping it, and we stick with it. + // We don't do any other generation switches (unless we crash before complecting bootstrap). + assert(!_cdc_streams_ts); - _cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(), - _bootstrap_tokens, _token_metadata, _gossiper, - _sys_dist_ks.local(), get_ring_delay(), _for_testing); - } else { - // We should not be able to join the cluster if other nodes support CDC but we don't. - // The check should have been made somewhere in prepare_to_join (`check_knows_remote_features`). - assert(!_feature_service.cluster_supports_cdc()); - } + _cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(), + _bootstrap_tokens, _token_metadata, _gossiper, + _sys_dist_ks.local(), get_ring_delay(), _for_testing); _gossiper.add_local_application_state({ // Order is important: both the CDC streams timestamp and tokens must be known when a node handles our status. @@ -2036,9 +2035,8 @@ future<> storage_service::start_gossiping(bind_messaging_port do_bind) { return seastar::async([&ss, do_bind] { if (!ss._initialized) { slogger.warn("Starting gossip by operator request"); - bool cdc_enabled = ss.db().local().get_config().check_experimental(db::experimental_features_t::CDC); ss.set_gossip_tokens(db::system_keyspace::get_local_tokens().get0(), - cdc_enabled ? std::make_optional(cdc::get_local_streams_timestamp().get0()) : std::nullopt); + std::make_optional(cdc::get_local_streams_timestamp().get0())); ss._gossiper.force_newer_generation(); ss._gossiper.start_gossiping(utils::get_generation_number(), gms::bind_messaging_port(bool(do_bind))).then([&ss] { ss._initialized = true; diff --git a/test/alternator/run b/test/alternator/run index d7530ab1e8..d72118479a 100755 --- a/test/alternator/run +++ b/test/alternator/run @@ -86,7 +86,7 @@ ln -s "$SCYLLA" "$SCYLLA_LINK" --alternator-write-isolation=always_use_lwt \ --alternator-streams-time-window-s=0 \ --developer-mode=1 \ - --experimental-features=cdc \ + --experimental-features=alternator-streams \ --ring-delay-ms 0 --collectd 0 \ --smp 2 -m 1G \ --overprovisioned --unsafe-bypass-fsync 1 \ diff --git a/test/boost/cdc_test.cc b/test/boost/cdc_test.cc index e5cedc89ce..ee4c0c70c9 100644 --- a/test/boost/cdc_test.cc +++ b/test/boost/cdc_test.cc @@ -46,9 +46,6 @@ static cql_test_config mk_cdc_test_config() { auto ext = std::make_shared(); ext->add_schema_extension(cdc::cdc_extension::NAME); auto cfg = ::make_shared(std::move(ext)); - auto features = cfg->experimental_features(); - features.emplace_back(db::experimental_features_t::CDC); - cfg->experimental_features(features); return cql_test_config(std::move(cfg)); }; diff --git a/test/boost/config_test.cc b/test/boost/config_test.cc index a68cd75421..909b6decab 100644 --- a/test/boost/config_test.cc +++ b/test/boost/config_test.cc @@ -931,10 +931,11 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) { auto cfg_ptr = std::make_unique(); config& cfg = *cfg_ptr; cfg.read_from_yaml("experimental_features:\n - cdc\n", throw_on_error); - BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC}); - BOOST_CHECK(cfg.check_experimental(ef::CDC)); + BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED_CDC}); + BOOST_CHECK(cfg.check_experimental(ef::UNUSED_CDC)); BOOST_CHECK(!cfg.check_experimental(ef::UNUSED)); BOOST_CHECK(!cfg.check_experimental(ef::UDF)); + BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS)); return make_ready_future(); } @@ -943,9 +944,10 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_unused) { config& cfg = *cfg_ptr; cfg.read_from_yaml("experimental_features:\n - lwt\n", throw_on_error); BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED}); - BOOST_CHECK(!cfg.check_experimental(ef::CDC)); + BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC)); BOOST_CHECK(cfg.check_experimental(ef::UNUSED)); BOOST_CHECK(!cfg.check_experimental(ef::UDF)); + BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS)); return make_ready_future(); } @@ -954,9 +956,22 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_udf) { config& cfg = *cfg_ptr; cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error); BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF}); - BOOST_CHECK(!cfg.check_experimental(ef::CDC)); + BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC)); BOOST_CHECK(!cfg.check_experimental(ef::UNUSED)); BOOST_CHECK(cfg.check_experimental(ef::UDF)); + BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS)); + return make_ready_future(); +} + +SEASTAR_TEST_CASE(test_parse_experimental_features_alternator_streams) { + auto cfg_ptr = std::make_unique(); + config& cfg = *cfg_ptr; + cfg.read_from_yaml("experimental_features:\n - alternator-streams\n", throw_on_error); + BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::ALTERNATOR_STREAMS}); + BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC)); + BOOST_CHECK(!cfg.check_experimental(ef::UNUSED)); + BOOST_CHECK(!cfg.check_experimental(ef::UDF)); + BOOST_CHECK(cfg.check_experimental(ef::ALTERNATOR_STREAMS)); return make_ready_future(); } @@ -964,10 +979,11 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) { auto cfg_ptr = std::make_unique(); config& cfg = *cfg_ptr; cfg.read_from_yaml("experimental_features:\n - cdc\n - lwt\n - cdc\n", throw_on_error); - BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::UNUSED, ef::CDC})); - BOOST_CHECK(cfg.check_experimental(ef::CDC)); + BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::UNUSED_CDC, ef::UNUSED, ef::UNUSED_CDC})); + BOOST_CHECK(cfg.check_experimental(ef::UNUSED_CDC)); BOOST_CHECK(cfg.check_experimental(ef::UNUSED)); BOOST_CHECK(!cfg.check_experimental(ef::UDF)); + BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS)); return make_ready_future(); } @@ -979,9 +995,10 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) { [&cfg] (const sstring& opt, const sstring& msg, std::optional status) { BOOST_REQUIRE_EQUAL(opt, "experimental_features"); BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos); - BOOST_CHECK(!cfg.check_experimental(ef::CDC)); + BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC)); BOOST_CHECK(!cfg.check_experimental(ef::UNUSED)); BOOST_CHECK(!cfg.check_experimental(ef::UDF)); + BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS)); }); return make_ready_future(); } @@ -990,9 +1007,10 @@ SEASTAR_TEST_CASE(test_parse_experimental_true) { auto cfg_ptr = std::make_unique(); config& cfg = *cfg_ptr; cfg.read_from_yaml("experimental: true", throw_on_error); - BOOST_CHECK(cfg.check_experimental(ef::CDC)); + BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC)); BOOST_CHECK(!cfg.check_experimental(ef::UNUSED)); BOOST_CHECK(cfg.check_experimental(ef::UDF)); + BOOST_CHECK(cfg.check_experimental(ef::ALTERNATOR_STREAMS)); return make_ready_future(); } @@ -1000,8 +1018,9 @@ SEASTAR_TEST_CASE(test_parse_experimental_false) { auto cfg_ptr = std::make_unique(); config& cfg = *cfg_ptr; cfg.read_from_yaml("experimental: false", throw_on_error); - BOOST_CHECK(!cfg.check_experimental(ef::CDC)); + BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC)); BOOST_CHECK(!cfg.check_experimental(ef::UNUSED)); BOOST_CHECK(!cfg.check_experimental(ef::UDF)); + BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS)); return make_ready_future(); } diff --git a/test/boost/extensions_test.cc b/test/boost/extensions_test.cc index a6c401e81e..bec4b6b1b0 100644 --- a/test/boost/extensions_test.cc +++ b/test/boost/extensions_test.cc @@ -118,7 +118,6 @@ SEASTAR_TEST_CASE(cdc_schema_extension) { // Extensions have to be registered here - config needs to have them before construction of test env. ext->add_schema_extension(cdc::cdc_extension::NAME); auto cfg = ::make_shared(ext); - cfg->experimental_features({db::experimental_features_t::feature::CDC}); return do_with_cql_env([] (cql_test_env& e) { auto assert_ext_correctness = [] (cql_test_env& e, cdc::cdc_extension expected_ext) { diff --git a/test/boost/schema_change_test.cc b/test/boost/schema_change_test.cc index 2655bba915..610172732b 100644 --- a/test/boost/schema_change_test.cc +++ b/test/boost/schema_change_test.cc @@ -607,7 +607,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_ auto db_cfg_ptr = ::make_shared(std::move(extensions)); auto& db_cfg = *db_cfg_ptr; db_cfg.enable_user_defined_functions({true}, db::config::config_source::CommandLine); - db_cfg.experimental_features({experimental_features_t::UDF, experimental_features_t::CDC}, db::config::config_source::CommandLine); + db_cfg.experimental_features({experimental_features_t::UDF}, db::config::config_source::CommandLine); if (regenerate) { db_cfg.data_file_directories({data_dir}, db::config::config_source::CommandLine); } else {