Merge 'Mark CDC as GA' from Piotr Jastrzębski

CDC is ready to be a non-experimental feature so remove the experimental flag for it.
Also, guard Alternator Streams with their own experimental flag. Previously, they were using CDC experimental flag as they depend on CDC.

Tests: unit(dev)

Closes #7539

* github.com:scylladb/scylla:
  alternator: guard streams with an experimental flag
  Mark CDC as GA
  cdc: Make it possible for CDC generation creation to fail

(cherry picked from commit 78649c2322)
This commit is contained in:
Nadav Har'El
2020-11-12 13:49:27 +02:00
committed by Avi Kivity
parent 8c3e8350d6
commit da29b65e04
13 changed files with 74 additions and 46 deletions

View File

@@ -1047,6 +1047,9 @@ void executor::add_stream_options(const rjson::value& stream_specification, sche
if (!db.features().cluster_supports_cdc()) {
throw api_error::validation("StreamSpecification: streams (CDC) feature not enabled in cluster.");
}
if (!db.features().cluster_supports_alternator_streams()) {
throw api_error::validation("StreamSpecification: alternator streams feature not enabled in cluster.");
}
cdc::options opts;
opts.enabled(true);

View File

@@ -154,7 +154,7 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos
future<db_clock::time_point> get_local_streams_timestamp();
/* Generate a new set of CDC streams and insert it into the distributed cdc_generation_descriptions table.
* Returns the timestamp of this new generation.
* Returns the timestamp of this new generation
*
* Should be called when starting the node for the first time (i.e., joining the ring).
*

View File

@@ -874,7 +874,7 @@ db::fs::path db::config::get_conf_sub(db::fs::path sub) {
}
bool db::config::check_experimental(experimental_features_t::feature f) const {
if (experimental() && f != experimental_features_t::UNUSED) {
if (experimental() && f != experimental_features_t::UNUSED && f != experimental_features_t::UNUSED_CDC) {
return true;
}
const auto& optval = experimental_features();
@@ -928,11 +928,13 @@ std::unordered_map<sstring, db::experimental_features_t::feature> db::experiment
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
// Lightweight transactions are no longer experimental. Map them
// to UNUSED switch for a while, then remove altogether.
return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", CDC}};
// Change Data Capture is no longer experimental. Map it
// to UNUSED_CDC switch for a while, then remove altogether.
return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", UNUSED_CDC}, {"alternator-streams", ALTERNATOR_STREAMS}};
}
std::vector<enum_option<db::experimental_features_t>> db::experimental_features_t::all() {
return {UDF, CDC};
return {UDF, ALTERNATOR_STREAMS};
}
template struct utils::config_file::named_value<seastar::log_level>;

View File

@@ -81,7 +81,7 @@ namespace db {
/// Enumeration of all valid values for the `experimental` config entry.
struct experimental_features_t {
enum feature { UNUSED, UDF, CDC };
enum feature { UNUSED, UDF, UNUSED_CDC, ALTERNATOR_STREAMS };
static std::unordered_map<sstring, feature> map(); // See enum_option.
static std::vector<enum_option<experimental_features_t>> all();
};

View File

@@ -143,6 +143,7 @@ extern const std::string_view LWT;
extern const std::string_view PER_TABLE_PARTITIONERS;
extern const std::string_view PER_TABLE_CACHING;
extern const std::string_view DIGEST_FOR_NULL_VALUES;
extern const std::string_view ALTERNATOR_STREAMS;
}

View File

@@ -62,6 +62,7 @@ constexpr std::string_view features::LWT = "LWT";
constexpr std::string_view features::PER_TABLE_PARTITIONERS = "PER_TABLE_PARTITIONERS";
constexpr std::string_view features::PER_TABLE_CACHING = "PER_TABLE_CACHING";
constexpr std::string_view features::DIGEST_FOR_NULL_VALUES = "DIGEST_FOR_NULL_VALUES";
constexpr std::string_view features::ALTERNATOR_STREAMS = "ALTERNATOR_STREAMS";
static logging::logger logger("features");
@@ -86,6 +87,7 @@ feature_service::feature_service(feature_config cfg) : _config(cfg)
, _per_table_partitioners_feature(*this, features::PER_TABLE_PARTITIONERS)
, _per_table_caching_feature(*this, features::PER_TABLE_CACHING)
, _digest_for_null_values_feature(*this, features::DIGEST_FOR_NULL_VALUES)
, _alternator_streams_feature(*this, features::ALTERNATOR_STREAMS)
{}
feature_config feature_config_from_db_config(db::config& cfg, std::set<sstring> disabled) {
@@ -116,8 +118,8 @@ feature_config feature_config_from_db_config(db::config& cfg, std::set<sstring>
}
}
if (!cfg.check_experimental(db::experimental_features_t::CDC)) {
fcfg._disabled_features.insert(sstring(gms::features::CDC));
if (!cfg.check_experimental(db::experimental_features_t::ALTERNATOR_STREAMS)) {
fcfg._disabled_features.insert(sstring(gms::features::ALTERNATOR_STREAMS));
}
return fcfg;
@@ -187,6 +189,7 @@ std::set<std::string_view> feature_service::known_feature_set() {
gms::features::UDF,
gms::features::CDC,
gms::features::DIGEST_FOR_NULL_VALUES,
gms::features::ALTERNATOR_STREAMS,
};
for (const sstring& s : _config._disabled_features) {
@@ -266,6 +269,7 @@ void feature_service::enable(const std::set<std::string_view>& list) {
std::ref(_per_table_partitioners_feature),
std::ref(_per_table_caching_feature),
std::ref(_digest_for_null_values_feature),
std::ref(_alternator_streams_feature),
})
{
if (list.contains(f.name())) {

View File

@@ -92,6 +92,7 @@ private:
gms::feature _per_table_partitioners_feature;
gms::feature _per_table_caching_feature;
gms::feature _digest_for_null_values_feature;
gms::feature _alternator_streams_feature;
public:
bool cluster_supports_user_defined_functions() const {
@@ -160,6 +161,10 @@ public:
bool cluster_supports_lwt() const {
return bool(_lwt_feature);
}
bool cluster_supports_alternator_streams() const {
return bool(_alternator_streams_feature);
}
};
} // namespace gms

View File

@@ -298,7 +298,7 @@ void storage_service::prepare_to_join(
_token_metadata.update_normal_tokens(my_tokens, get_broadcast_address());
_cdc_streams_ts = db::system_keyspace::get_saved_cdc_streams_timestamp().get0();
if (!_cdc_streams_ts && db().local().get_config().check_experimental(db::experimental_features_t::CDC)) {
if (!_cdc_streams_ts) {
// We could not have completed joining if we didn't generate and persist a CDC streams timestamp,
// unless we are restarting after upgrading from non-CDC supported version.
// In that case we won't begin a CDC generation: it should be done by one of the nodes
@@ -550,7 +550,7 @@ void storage_service::join_token_ring(int delay) {
assert(should_bootstrap() || db().local().is_replacing() || !_cdc_streams_ts);
}
if (!_cdc_streams_ts && db().local().get_config().check_experimental(db::experimental_features_t::CDC)) {
if (!_cdc_streams_ts) {
// If we didn't choose a CDC streams timestamp at this point, then either
// 1. we're replacing a node which didn't gossip a CDC streams timestamp for whatever reason,
// 2. we've already bootstrapped, but are upgrading from a non-CDC version,
@@ -570,10 +570,15 @@ void storage_service::join_token_ring(int delay) {
if (!db().local().is_replacing()
&& (!db::system_keyspace::bootstrap_complete()
|| cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) {
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
_bootstrap_tokens, _token_metadata, _gossiper,
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
try {
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
_bootstrap_tokens, _token_metadata, _gossiper,
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
} catch (...) {
cdc_log.warn(
"Could not create a new CDC generation: {}. This may make it impossible to use CDC. Use nodetool checkAndRepairCdcStreams to fix CDC generation",
std::current_exception());
}
}
}
@@ -893,24 +898,18 @@ void storage_service::bootstrap() {
// It doesn't hurt: other nodes will (potentially) just do more generation switches.
// We do this because with this new attempt at bootstrapping we picked a different set of tokens.
if (db().local().get_config().check_experimental(db::experimental_features_t::CDC)) {
// Update pending ranges now, so we correctly count ourselves as a pending replica
// when inserting the new CDC generation.
_token_metadata.add_bootstrap_tokens(_bootstrap_tokens, get_broadcast_address());
update_pending_ranges().get();
// Update pending ranges now, so we correctly count ourselves as a pending replica
// when inserting the new CDC generation.
_token_metadata.add_bootstrap_tokens(_bootstrap_tokens, get_broadcast_address());
update_pending_ranges().get();
// After we pick a generation timestamp, we start gossiping it, and we stick with it.
// We don't do any other generation switches (unless we crash before complecting bootstrap).
assert(!_cdc_streams_ts);
// After we pick a generation timestamp, we start gossiping it, and we stick with it.
// We don't do any other generation switches (unless we crash before complecting bootstrap).
assert(!_cdc_streams_ts);
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
_bootstrap_tokens, _token_metadata, _gossiper,
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
} else {
// We should not be able to join the cluster if other nodes support CDC but we don't.
// The check should have been made somewhere in prepare_to_join (`check_knows_remote_features`).
assert(!_feature_service.cluster_supports_cdc());
}
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
_bootstrap_tokens, _token_metadata, _gossiper,
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
_gossiper.add_local_application_state({
// Order is important: both the CDC streams timestamp and tokens must be known when a node handles our status.
@@ -2036,9 +2035,8 @@ future<> storage_service::start_gossiping(bind_messaging_port do_bind) {
return seastar::async([&ss, do_bind] {
if (!ss._initialized) {
slogger.warn("Starting gossip by operator request");
bool cdc_enabled = ss.db().local().get_config().check_experimental(db::experimental_features_t::CDC);
ss.set_gossip_tokens(db::system_keyspace::get_local_tokens().get0(),
cdc_enabled ? std::make_optional(cdc::get_local_streams_timestamp().get0()) : std::nullopt);
std::make_optional(cdc::get_local_streams_timestamp().get0()));
ss._gossiper.force_newer_generation();
ss._gossiper.start_gossiping(utils::get_generation_number(), gms::bind_messaging_port(bool(do_bind))).then([&ss] {
ss._initialized = true;

View File

@@ -86,7 +86,7 @@ ln -s "$SCYLLA" "$SCYLLA_LINK"
--alternator-write-isolation=always_use_lwt \
--alternator-streams-time-window-s=0 \
--developer-mode=1 \
--experimental-features=cdc \
--experimental-features=alternator-streams \
--ring-delay-ms 0 --collectd 0 \
--smp 2 -m 1G \
--overprovisioned --unsafe-bypass-fsync 1 \

View File

@@ -46,9 +46,6 @@ static cql_test_config mk_cdc_test_config() {
auto ext = std::make_shared<db::extensions>();
ext->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
auto cfg = ::make_shared<db::config>(std::move(ext));
auto features = cfg->experimental_features();
features.emplace_back(db::experimental_features_t::CDC);
cfg->experimental_features(features);
return cql_test_config(std::move(cfg));
};

View File

@@ -931,10 +931,11 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) {
auto cfg_ptr = std::make_unique<config>();
config& cfg = *cfg_ptr;
cfg.read_from_yaml("experimental_features:\n - cdc\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC});
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED_CDC});
BOOST_CHECK(cfg.check_experimental(ef::UNUSED_CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
return make_ready_future();
}
@@ -943,9 +944,10 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_unused) {
config& cfg = *cfg_ptr;
cfg.read_from_yaml("experimental_features:\n - lwt\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED});
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
return make_ready_future();
}
@@ -954,9 +956,22 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
config& cfg = *cfg_ptr;
cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF});
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(cfg.check_experimental(ef::UDF));
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
return make_ready_future();
}
SEASTAR_TEST_CASE(test_parse_experimental_features_alternator_streams) {
auto cfg_ptr = std::make_unique<config>();
config& cfg = *cfg_ptr;
cfg.read_from_yaml("experimental_features:\n - alternator-streams\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::ALTERNATOR_STREAMS});
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
BOOST_CHECK(cfg.check_experimental(ef::ALTERNATOR_STREAMS));
return make_ready_future();
}
@@ -964,10 +979,11 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) {
auto cfg_ptr = std::make_unique<config>();
config& cfg = *cfg_ptr;
cfg.read_from_yaml("experimental_features:\n - cdc\n - lwt\n - cdc\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::UNUSED, ef::CDC}));
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::UNUSED_CDC, ef::UNUSED, ef::UNUSED_CDC}));
BOOST_CHECK(cfg.check_experimental(ef::UNUSED_CDC));
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
return make_ready_future();
}
@@ -979,9 +995,10 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) {
[&cfg] (const sstring& opt, const sstring& msg, std::optional<value_status> status) {
BOOST_REQUIRE_EQUAL(opt, "experimental_features");
BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos);
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
});
return make_ready_future();
}
@@ -990,9 +1007,10 @@ SEASTAR_TEST_CASE(test_parse_experimental_true) {
auto cfg_ptr = std::make_unique<config>();
config& cfg = *cfg_ptr;
cfg.read_from_yaml("experimental: true", throw_on_error);
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(cfg.check_experimental(ef::UDF));
BOOST_CHECK(cfg.check_experimental(ef::ALTERNATOR_STREAMS));
return make_ready_future();
}
@@ -1000,8 +1018,9 @@ SEASTAR_TEST_CASE(test_parse_experimental_false) {
auto cfg_ptr = std::make_unique<config>();
config& cfg = *cfg_ptr;
cfg.read_from_yaml("experimental: false", throw_on_error);
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
return make_ready_future();
}

View File

@@ -118,7 +118,6 @@ SEASTAR_TEST_CASE(cdc_schema_extension) {
// Extensions have to be registered here - config needs to have them before construction of test env.
ext->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
auto cfg = ::make_shared<db::config>(ext);
cfg->experimental_features({db::experimental_features_t::feature::CDC});
return do_with_cql_env([] (cql_test_env& e) {
auto assert_ext_correctness = [] (cql_test_env& e, cdc::cdc_extension expected_ext) {

View File

@@ -607,7 +607,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
auto db_cfg_ptr = ::make_shared<db::config>(std::move(extensions));
auto& db_cfg = *db_cfg_ptr;
db_cfg.enable_user_defined_functions({true}, db::config::config_source::CommandLine);
db_cfg.experimental_features({experimental_features_t::UDF, experimental_features_t::CDC}, db::config::config_source::CommandLine);
db_cfg.experimental_features({experimental_features_t::UDF}, db::config::config_source::CommandLine);
if (regenerate) {
db_cfg.data_file_directories({data_dir}, db::config::config_source::CommandLine);
} else {