From da29b65e040c11aedf6592c680f36e367d7863f5 Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Thu, 12 Nov 2020 13:49:27 +0200 Subject: [PATCH] =?UTF-8?q?Merge=20'Mark=20CDC=20as=20GA'=20from=20Piotr?= =?UTF-8?q?=20Jastrz=C4=99bski?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CDC is ready to be a non-experimental feature so remove the experimental flag for it. Also, guard Alternator Streams with their own experimental flag. Previously, they were using CDC experimental flag as they depend on CDC. Tests: unit(dev) Closes #7539 * github.com:scylladb/scylla: alternator: guard streams with an experimental flag Mark CDC as GA cdc: Make it possible for CDC generation creation to fail (cherry picked from commit 78649c2322c63f3156440d86bdecb1c16ddb2efe) --- alternator/streams.cc | 3 +++ cdc/generation.hh | 2 +- db/config.cc | 8 +++--- db/config.hh | 2 +- gms/feature.hh | 1 + gms/feature_service.cc | 8 ++++-- gms/feature_service.hh | 5 ++++ service/storage_service.cc | 46 +++++++++++++++----------------- test/alternator/run | 2 +- test/boost/cdc_test.cc | 3 --- test/boost/config_test.cc | 37 ++++++++++++++++++------- test/boost/extensions_test.cc | 1 - test/boost/schema_change_test.cc | 2 +- 13 files changed, 74 insertions(+), 46 deletions(-) diff --git a/alternator/streams.cc b/alternator/streams.cc index e1a4c6534c..cb8d4ba9a3 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -1047,6 +1047,9 @@ void executor::add_stream_options(const rjson::value& stream_specification, sche if (!db.features().cluster_supports_cdc()) { throw api_error::validation("StreamSpecification: streams (CDC) feature not enabled in cluster."); } + if (!db.features().cluster_supports_alternator_streams()) { + throw api_error::validation("StreamSpecification: alternator streams feature not enabled in cluster."); + } cdc::options opts; opts.enabled(true); diff --git a/cdc/generation.hh b/cdc/generation.hh index 652d5be5aa..060b04e3d1 100644 --- a/cdc/generation.hh +++ b/cdc/generation.hh @@ -154,7 +154,7 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos future get_local_streams_timestamp(); /* Generate a new set of CDC streams and insert it into the distributed cdc_generation_descriptions table. - * Returns the timestamp of this new generation. + * Returns the timestamp of this new generation * * Should be called when starting the node for the first time (i.e., joining the ring). * diff --git a/db/config.cc b/db/config.cc index 683f420167..88bf71d80d 100644 --- a/db/config.cc +++ b/db/config.cc @@ -874,7 +874,7 @@ db::fs::path db::config::get_conf_sub(db::fs::path sub) { } bool db::config::check_experimental(experimental_features_t::feature f) const { - if (experimental() && f != experimental_features_t::UNUSED) { + if (experimental() && f != experimental_features_t::UNUSED && f != experimental_features_t::UNUSED_CDC) { return true; } const auto& optval = experimental_features(); @@ -928,11 +928,13 @@ std::unordered_map db::experiment // https://github.com/scylladb/scylla/pull/5369#discussion_r353614807 // Lightweight transactions are no longer experimental. Map them // to UNUSED switch for a while, then remove altogether. - return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", CDC}}; + // Change Data Capture is no longer experimental. Map it + // to UNUSED_CDC switch for a while, then remove altogether. + return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", UNUSED_CDC}, {"alternator-streams", ALTERNATOR_STREAMS}}; } std::vector> db::experimental_features_t::all() { - return {UDF, CDC}; + return {UDF, ALTERNATOR_STREAMS}; } template struct utils::config_file::named_value; diff --git a/db/config.hh b/db/config.hh index 9184e5bf0c..6c0235b5fb 100644 --- a/db/config.hh +++ b/db/config.hh @@ -81,7 +81,7 @@ namespace db { /// Enumeration of all valid values for the `experimental` config entry. struct experimental_features_t { - enum feature { UNUSED, UDF, CDC }; + enum feature { UNUSED, UDF, UNUSED_CDC, ALTERNATOR_STREAMS }; static std::unordered_map map(); // See enum_option. static std::vector> all(); }; diff --git a/gms/feature.hh b/gms/feature.hh index 0c8c379ccf..58fa095c10 100644 --- a/gms/feature.hh +++ b/gms/feature.hh @@ -143,6 +143,7 @@ extern const std::string_view LWT; extern const std::string_view PER_TABLE_PARTITIONERS; extern const std::string_view PER_TABLE_CACHING; extern const std::string_view DIGEST_FOR_NULL_VALUES; +extern const std::string_view ALTERNATOR_STREAMS; } diff --git a/gms/feature_service.cc b/gms/feature_service.cc index 771d16df90..7d50f01e57 100644 --- a/gms/feature_service.cc +++ b/gms/feature_service.cc @@ -62,6 +62,7 @@ constexpr std::string_view features::LWT = "LWT"; constexpr std::string_view features::PER_TABLE_PARTITIONERS = "PER_TABLE_PARTITIONERS"; constexpr std::string_view features::PER_TABLE_CACHING = "PER_TABLE_CACHING"; constexpr std::string_view features::DIGEST_FOR_NULL_VALUES = "DIGEST_FOR_NULL_VALUES"; +constexpr std::string_view features::ALTERNATOR_STREAMS = "ALTERNATOR_STREAMS"; static logging::logger logger("features"); @@ -86,6 +87,7 @@ feature_service::feature_service(feature_config cfg) : _config(cfg) , _per_table_partitioners_feature(*this, features::PER_TABLE_PARTITIONERS) , _per_table_caching_feature(*this, features::PER_TABLE_CACHING) , _digest_for_null_values_feature(*this, features::DIGEST_FOR_NULL_VALUES) + , _alternator_streams_feature(*this, features::ALTERNATOR_STREAMS) {} feature_config feature_config_from_db_config(db::config& cfg, std::set disabled) { @@ -116,8 +118,8 @@ feature_config feature_config_from_db_config(db::config& cfg, std::set } } - if (!cfg.check_experimental(db::experimental_features_t::CDC)) { - fcfg._disabled_features.insert(sstring(gms::features::CDC)); + if (!cfg.check_experimental(db::experimental_features_t::ALTERNATOR_STREAMS)) { + fcfg._disabled_features.insert(sstring(gms::features::ALTERNATOR_STREAMS)); } return fcfg; @@ -187,6 +189,7 @@ std::set feature_service::known_feature_set() { gms::features::UDF, gms::features::CDC, gms::features::DIGEST_FOR_NULL_VALUES, + gms::features::ALTERNATOR_STREAMS, }; for (const sstring& s : _config._disabled_features) { @@ -266,6 +269,7 @@ void feature_service::enable(const std::set& list) { std::ref(_per_table_partitioners_feature), std::ref(_per_table_caching_feature), std::ref(_digest_for_null_values_feature), + std::ref(_alternator_streams_feature), }) { if (list.contains(f.name())) { diff --git a/gms/feature_service.hh b/gms/feature_service.hh index ca0e6b6b68..209ea32cc9 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -92,6 +92,7 @@ private: gms::feature _per_table_partitioners_feature; gms::feature _per_table_caching_feature; gms::feature _digest_for_null_values_feature; + gms::feature _alternator_streams_feature; public: bool cluster_supports_user_defined_functions() const { @@ -160,6 +161,10 @@ public: bool cluster_supports_lwt() const { return bool(_lwt_feature); } + + bool cluster_supports_alternator_streams() const { + return bool(_alternator_streams_feature); + } }; } // namespace gms diff --git a/service/storage_service.cc b/service/storage_service.cc index 950c475783..3bb4858a14 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -298,7 +298,7 @@ void storage_service::prepare_to_join( _token_metadata.update_normal_tokens(my_tokens, get_broadcast_address()); _cdc_streams_ts = db::system_keyspace::get_saved_cdc_streams_timestamp().get0(); - if (!_cdc_streams_ts && db().local().get_config().check_experimental(db::experimental_features_t::CDC)) { + if (!_cdc_streams_ts) { // We could not have completed joining if we didn't generate and persist a CDC streams timestamp, // unless we are restarting after upgrading from non-CDC supported version. // In that case we won't begin a CDC generation: it should be done by one of the nodes @@ -550,7 +550,7 @@ void storage_service::join_token_ring(int delay) { assert(should_bootstrap() || db().local().is_replacing() || !_cdc_streams_ts); } - if (!_cdc_streams_ts && db().local().get_config().check_experimental(db::experimental_features_t::CDC)) { + if (!_cdc_streams_ts) { // If we didn't choose a CDC streams timestamp at this point, then either // 1. we're replacing a node which didn't gossip a CDC streams timestamp for whatever reason, // 2. we've already bootstrapped, but are upgrading from a non-CDC version, @@ -570,10 +570,15 @@ void storage_service::join_token_ring(int delay) { if (!db().local().is_replacing() && (!db::system_keyspace::bootstrap_complete() || cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) { - - _cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(), - _bootstrap_tokens, _token_metadata, _gossiper, - _sys_dist_ks.local(), get_ring_delay(), _for_testing); + try { + _cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(), + _bootstrap_tokens, _token_metadata, _gossiper, + _sys_dist_ks.local(), get_ring_delay(), _for_testing); + } catch (...) { + cdc_log.warn( + "Could not create a new CDC generation: {}. This may make it impossible to use CDC. Use nodetool checkAndRepairCdcStreams to fix CDC generation", + std::current_exception()); + } } } @@ -893,24 +898,18 @@ void storage_service::bootstrap() { // It doesn't hurt: other nodes will (potentially) just do more generation switches. // We do this because with this new attempt at bootstrapping we picked a different set of tokens. - if (db().local().get_config().check_experimental(db::experimental_features_t::CDC)) { - // Update pending ranges now, so we correctly count ourselves as a pending replica - // when inserting the new CDC generation. - _token_metadata.add_bootstrap_tokens(_bootstrap_tokens, get_broadcast_address()); - update_pending_ranges().get(); + // Update pending ranges now, so we correctly count ourselves as a pending replica + // when inserting the new CDC generation. + _token_metadata.add_bootstrap_tokens(_bootstrap_tokens, get_broadcast_address()); + update_pending_ranges().get(); - // After we pick a generation timestamp, we start gossiping it, and we stick with it. - // We don't do any other generation switches (unless we crash before complecting bootstrap). - assert(!_cdc_streams_ts); + // After we pick a generation timestamp, we start gossiping it, and we stick with it. + // We don't do any other generation switches (unless we crash before complecting bootstrap). + assert(!_cdc_streams_ts); - _cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(), - _bootstrap_tokens, _token_metadata, _gossiper, - _sys_dist_ks.local(), get_ring_delay(), _for_testing); - } else { - // We should not be able to join the cluster if other nodes support CDC but we don't. - // The check should have been made somewhere in prepare_to_join (`check_knows_remote_features`). - assert(!_feature_service.cluster_supports_cdc()); - } + _cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(), + _bootstrap_tokens, _token_metadata, _gossiper, + _sys_dist_ks.local(), get_ring_delay(), _for_testing); _gossiper.add_local_application_state({ // Order is important: both the CDC streams timestamp and tokens must be known when a node handles our status. @@ -2036,9 +2035,8 @@ future<> storage_service::start_gossiping(bind_messaging_port do_bind) { return seastar::async([&ss, do_bind] { if (!ss._initialized) { slogger.warn("Starting gossip by operator request"); - bool cdc_enabled = ss.db().local().get_config().check_experimental(db::experimental_features_t::CDC); ss.set_gossip_tokens(db::system_keyspace::get_local_tokens().get0(), - cdc_enabled ? std::make_optional(cdc::get_local_streams_timestamp().get0()) : std::nullopt); + std::make_optional(cdc::get_local_streams_timestamp().get0())); ss._gossiper.force_newer_generation(); ss._gossiper.start_gossiping(utils::get_generation_number(), gms::bind_messaging_port(bool(do_bind))).then([&ss] { ss._initialized = true; diff --git a/test/alternator/run b/test/alternator/run index d7530ab1e8..d72118479a 100755 --- a/test/alternator/run +++ b/test/alternator/run @@ -86,7 +86,7 @@ ln -s "$SCYLLA" "$SCYLLA_LINK" --alternator-write-isolation=always_use_lwt \ --alternator-streams-time-window-s=0 \ --developer-mode=1 \ - --experimental-features=cdc \ + --experimental-features=alternator-streams \ --ring-delay-ms 0 --collectd 0 \ --smp 2 -m 1G \ --overprovisioned --unsafe-bypass-fsync 1 \ diff --git a/test/boost/cdc_test.cc b/test/boost/cdc_test.cc index e5cedc89ce..ee4c0c70c9 100644 --- a/test/boost/cdc_test.cc +++ b/test/boost/cdc_test.cc @@ -46,9 +46,6 @@ static cql_test_config mk_cdc_test_config() { auto ext = std::make_shared(); ext->add_schema_extension(cdc::cdc_extension::NAME); auto cfg = ::make_shared(std::move(ext)); - auto features = cfg->experimental_features(); - features.emplace_back(db::experimental_features_t::CDC); - cfg->experimental_features(features); return cql_test_config(std::move(cfg)); }; diff --git a/test/boost/config_test.cc b/test/boost/config_test.cc index a68cd75421..909b6decab 100644 --- a/test/boost/config_test.cc +++ b/test/boost/config_test.cc @@ -931,10 +931,11 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) { auto cfg_ptr = std::make_unique(); config& cfg = *cfg_ptr; cfg.read_from_yaml("experimental_features:\n - cdc\n", throw_on_error); - BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC}); - BOOST_CHECK(cfg.check_experimental(ef::CDC)); + BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED_CDC}); + BOOST_CHECK(cfg.check_experimental(ef::UNUSED_CDC)); BOOST_CHECK(!cfg.check_experimental(ef::UNUSED)); BOOST_CHECK(!cfg.check_experimental(ef::UDF)); + BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS)); return make_ready_future(); } @@ -943,9 +944,10 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_unused) { config& cfg = *cfg_ptr; cfg.read_from_yaml("experimental_features:\n - lwt\n", throw_on_error); BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED}); - BOOST_CHECK(!cfg.check_experimental(ef::CDC)); + BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC)); BOOST_CHECK(cfg.check_experimental(ef::UNUSED)); BOOST_CHECK(!cfg.check_experimental(ef::UDF)); + BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS)); return make_ready_future(); } @@ -954,9 +956,22 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_udf) { config& cfg = *cfg_ptr; cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error); BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF}); - BOOST_CHECK(!cfg.check_experimental(ef::CDC)); + BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC)); BOOST_CHECK(!cfg.check_experimental(ef::UNUSED)); BOOST_CHECK(cfg.check_experimental(ef::UDF)); + BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS)); + return make_ready_future(); +} + +SEASTAR_TEST_CASE(test_parse_experimental_features_alternator_streams) { + auto cfg_ptr = std::make_unique(); + config& cfg = *cfg_ptr; + cfg.read_from_yaml("experimental_features:\n - alternator-streams\n", throw_on_error); + BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::ALTERNATOR_STREAMS}); + BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC)); + BOOST_CHECK(!cfg.check_experimental(ef::UNUSED)); + BOOST_CHECK(!cfg.check_experimental(ef::UDF)); + BOOST_CHECK(cfg.check_experimental(ef::ALTERNATOR_STREAMS)); return make_ready_future(); } @@ -964,10 +979,11 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) { auto cfg_ptr = std::make_unique(); config& cfg = *cfg_ptr; cfg.read_from_yaml("experimental_features:\n - cdc\n - lwt\n - cdc\n", throw_on_error); - BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::UNUSED, ef::CDC})); - BOOST_CHECK(cfg.check_experimental(ef::CDC)); + BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::UNUSED_CDC, ef::UNUSED, ef::UNUSED_CDC})); + BOOST_CHECK(cfg.check_experimental(ef::UNUSED_CDC)); BOOST_CHECK(cfg.check_experimental(ef::UNUSED)); BOOST_CHECK(!cfg.check_experimental(ef::UDF)); + BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS)); return make_ready_future(); } @@ -979,9 +995,10 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) { [&cfg] (const sstring& opt, const sstring& msg, std::optional status) { BOOST_REQUIRE_EQUAL(opt, "experimental_features"); BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos); - BOOST_CHECK(!cfg.check_experimental(ef::CDC)); + BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC)); BOOST_CHECK(!cfg.check_experimental(ef::UNUSED)); BOOST_CHECK(!cfg.check_experimental(ef::UDF)); + BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS)); }); return make_ready_future(); } @@ -990,9 +1007,10 @@ SEASTAR_TEST_CASE(test_parse_experimental_true) { auto cfg_ptr = std::make_unique(); config& cfg = *cfg_ptr; cfg.read_from_yaml("experimental: true", throw_on_error); - BOOST_CHECK(cfg.check_experimental(ef::CDC)); + BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC)); BOOST_CHECK(!cfg.check_experimental(ef::UNUSED)); BOOST_CHECK(cfg.check_experimental(ef::UDF)); + BOOST_CHECK(cfg.check_experimental(ef::ALTERNATOR_STREAMS)); return make_ready_future(); } @@ -1000,8 +1018,9 @@ SEASTAR_TEST_CASE(test_parse_experimental_false) { auto cfg_ptr = std::make_unique(); config& cfg = *cfg_ptr; cfg.read_from_yaml("experimental: false", throw_on_error); - BOOST_CHECK(!cfg.check_experimental(ef::CDC)); + BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC)); BOOST_CHECK(!cfg.check_experimental(ef::UNUSED)); BOOST_CHECK(!cfg.check_experimental(ef::UDF)); + BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS)); return make_ready_future(); } diff --git a/test/boost/extensions_test.cc b/test/boost/extensions_test.cc index a6c401e81e..bec4b6b1b0 100644 --- a/test/boost/extensions_test.cc +++ b/test/boost/extensions_test.cc @@ -118,7 +118,6 @@ SEASTAR_TEST_CASE(cdc_schema_extension) { // Extensions have to be registered here - config needs to have them before construction of test env. ext->add_schema_extension(cdc::cdc_extension::NAME); auto cfg = ::make_shared(ext); - cfg->experimental_features({db::experimental_features_t::feature::CDC}); return do_with_cql_env([] (cql_test_env& e) { auto assert_ext_correctness = [] (cql_test_env& e, cdc::cdc_extension expected_ext) { diff --git a/test/boost/schema_change_test.cc b/test/boost/schema_change_test.cc index 2655bba915..610172732b 100644 --- a/test/boost/schema_change_test.cc +++ b/test/boost/schema_change_test.cc @@ -607,7 +607,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_ auto db_cfg_ptr = ::make_shared(std::move(extensions)); auto& db_cfg = *db_cfg_ptr; db_cfg.enable_user_defined_functions({true}, db::config::config_source::CommandLine); - db_cfg.experimental_features({experimental_features_t::UDF, experimental_features_t::CDC}, db::config::config_source::CommandLine); + db_cfg.experimental_features({experimental_features_t::UDF}, db::config::config_source::CommandLine); if (regenerate) { db_cfg.data_file_directories({data_dir}, db::config::config_source::CommandLine); } else {