From e486e0f7593e6071d4bf0bb6915fffbd5fc57602 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 11 Mar 2021 12:19:36 +0100 Subject: [PATCH] tree-wide: rename "cdc streams timestamp" to "cdc generation id" Each CDC generation always has a timestamp, but the fact that the timestamp identifies the generation is an implementation detail. We abstract away from this detail by using a more generic naming scheme: a generation "identifier" (whatever that is - a timestamp or something else). It's possible that a CDC generation will be identified by more than a timestamp in the (near) future. The actual string gossiped by nodes in their application state is left as "CDC_STREAMS_TIMESTAMP" for backward compatibility. Some stale comments have been updated. --- cdc/generation.cc | 90 +++++++++++++++---------------- cdc/generation.hh | 3 +- cdc/generation_service.hh | 8 +-- db/system_distributed_keyspace.hh | 8 +-- db/system_keyspace.cc | 12 ++--- db/system_keyspace.hh | 8 +-- docs/design-notes/cdc.md | 2 +- gms/application_state.cc | 2 +- gms/application_state.hh | 2 +- gms/versioned_value.cc | 4 +- gms/versioned_value.hh | 10 ++-- service/storage_service.cc | 38 ++++++------- service/storage_service.hh | 2 +- 13 files changed, 94 insertions(+), 95 deletions(-) diff --git a/cdc/generation.cc b/cdc/generation.cc index a20f0f1378..f72f50ad39 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -367,30 +367,30 @@ future make_new_cdc_generation( * but if the cluster already supports CDC, then every newly joining node will propose a new CDC generation, * which means it will gossip the generation's timestamp. */ -static std::optional get_streams_timestamp_for(const gms::inet_address& endpoint, const gms::gossiper& g) { - auto streams_ts_string = g.get_application_state_value(endpoint, gms::application_state::CDC_STREAMS_TIMESTAMP); - cdc_log.trace("endpoint={}, streams_ts_string={}", endpoint, streams_ts_string); - return gms::versioned_value::cdc_streams_timestamp_from_string(streams_ts_string); +static std::optional get_generation_id_for(const gms::inet_address& endpoint, const gms::gossiper& g) { + auto gen_id_string = g.get_application_state_value(endpoint, gms::application_state::CDC_GENERATION_ID); + cdc_log.trace("endpoint={}, gen_id_string={}", endpoint, gen_id_string); + return gms::versioned_value::cdc_generation_id_from_string(gen_id_string); } static future<> do_update_streams_description( - db_clock::time_point streams_ts, + db_clock::time_point gen_id, db::system_distributed_keyspace& sys_dist_ks, db::system_distributed_keyspace::context ctx) { - if (co_await sys_dist_ks.cdc_desc_exists(streams_ts, ctx)) { - cdc_log.info("Generation {}: streams description table already updated.", streams_ts); + if (co_await sys_dist_ks.cdc_desc_exists(gen_id, ctx)) { + cdc_log.info("Generation {}: streams description table already updated.", gen_id); co_return; } // We might race with another node also inserting the description, but that's ok. It's an idempotent operation. - auto topo = co_await sys_dist_ks.read_cdc_topology_description(streams_ts, ctx); + auto topo = co_await sys_dist_ks.read_cdc_topology_description(gen_id, ctx); if (!topo) { - throw no_generation_data_exception(streams_ts); + throw no_generation_data_exception(gen_id); } - co_await sys_dist_ks.create_cdc_desc(streams_ts, *topo, ctx); - cdc_log.info("CDC description table successfully updated with generation {}.", streams_ts); + co_await sys_dist_ks.create_cdc_desc(gen_id, *topo, ctx); + cdc_log.info("CDC description table successfully updated with generation {}.", gen_id); } /* Inform CDC users about a generation of streams (identified by the given timestamp) @@ -402,34 +402,34 @@ static future<> do_update_streams_description( * might run an asynchronous task in the background. */ static future<> update_streams_description( - db_clock::time_point streams_ts, + db_clock::time_point gen_id, shared_ptr sys_dist_ks, noncopyable_function get_num_token_owners, abort_source& abort_src) { try { - co_await do_update_streams_description(streams_ts, *sys_dist_ks, { get_num_token_owners() }); + co_await do_update_streams_description(gen_id, *sys_dist_ks, { get_num_token_owners() }); } catch (...) { cdc_log.warn( "Could not update CDC description table with generation {}: {}. Will retry in the background.", - streams_ts, std::current_exception()); + gen_id, std::current_exception()); // It is safe to discard this future: we keep system distributed keyspace alive. - (void)(([] (db_clock::time_point streams_ts, + (void)(([] (db_clock::time_point gen_id, shared_ptr sys_dist_ks, noncopyable_function get_num_token_owners, abort_source& abort_src) -> future<> { while (true) { co_await sleep_abortable(std::chrono::seconds(60), abort_src); try { - co_await do_update_streams_description(streams_ts, *sys_dist_ks, { get_num_token_owners() }); + co_await do_update_streams_description(gen_id, *sys_dist_ks, { get_num_token_owners() }); co_return; } catch (...) { cdc_log.warn( "Could not update CDC description table with generation {}: {}. Will try again.", - streams_ts, std::current_exception()); + gen_id, std::current_exception()); } } - })(streams_ts, std::move(sys_dist_ks), std::move(get_num_token_owners), abort_src)); + })(gen_id, std::move(sys_dist_ks), std::move(get_num_token_owners), abort_src)); } } @@ -662,11 +662,11 @@ generation_service::~generation_service() { assert(_stopped); } -future<> generation_service::after_join(std::optional&& startup_gen_ts) { +future<> generation_service::after_join(std::optional&& startup_gen_id) { assert_shard_zero(__PRETTY_FUNCTION__); assert(db::system_keyspace::bootstrap_complete()); - _gen_ts = std::move(startup_gen_ts); + _gen_id = std::move(startup_gen_id); _gossiper.register_(shared_from_this()); _joined = true; @@ -678,23 +678,23 @@ future<> generation_service::after_join(std::optional&& st void generation_service::on_join(gms::inet_address ep, gms::endpoint_state ep_state) { assert_shard_zero(__PRETTY_FUNCTION__); - auto val = ep_state.get_application_state_ptr(gms::application_state::CDC_STREAMS_TIMESTAMP); + auto val = ep_state.get_application_state_ptr(gms::application_state::CDC_GENERATION_ID); if (!val) { return; } - on_change(ep, gms::application_state::CDC_STREAMS_TIMESTAMP, *val); + on_change(ep, gms::application_state::CDC_GENERATION_ID, *val); } void generation_service::on_change(gms::inet_address ep, gms::application_state app_state, const gms::versioned_value& v) { assert_shard_zero(__PRETTY_FUNCTION__); - if (app_state != gms::application_state::CDC_STREAMS_TIMESTAMP) { + if (app_state != gms::application_state::CDC_GENERATION_ID) { return; } - auto ts = gms::versioned_value::cdc_streams_timestamp_from_string(v.value); - cdc_log.debug("Endpoint: {}, CDC generation timestamp change: {}", ep, ts); + auto ts = gms::versioned_value::cdc_generation_id_from_string(v.value); + cdc_log.debug("Endpoint: {}, CDC generation ID change: {}", ep, ts); handle_cdc_generation(ts).get(); } @@ -704,7 +704,7 @@ future<> generation_service::check_and_repair_cdc_streams() { throw std::runtime_error("check_and_repair_cdc_streams: node not initialized yet"); } - auto latest = _gen_ts; + auto latest = _gen_id; const auto& endpoint_states = _gossiper.get_endpoint_states(); for (const auto& [addr, state] : endpoint_states) { if (!_gossiper.is_normal(addr)) { @@ -712,7 +712,7 @@ future<> generation_service::check_and_repair_cdc_streams() { " ({} is in state {})", addr, _gossiper.get_gossip_status(state))); } - const auto ts = get_streams_timestamp_for(addr, _gossiper); + const auto ts = get_generation_id_for(addr, _gossiper); if (!latest || (ts && *ts > *latest)) { latest = ts; } @@ -773,16 +773,16 @@ future<> generation_service::check_and_repair_cdc_streams() { } if (!should_regenerate) { - if (latest != _gen_ts) { + if (latest != _gen_id) { co_await do_handle_cdc_generation(*latest); } cdc_log.info("CDC generation {} does not need repair", latest); co_return; } - const auto new_gen_ts = co_await make_new_cdc_generation(_cfg, + const auto new_gen_id = co_await make_new_cdc_generation(_cfg, {}, std::move(tmptr), _gossiper, *sys_dist_ks, std::chrono::milliseconds(_cfg.ring_delay_ms()), true /* add delay */); - // Need to artificially update our STATUS so other nodes handle the timestamp change + // Need to artificially update our STATUS so other nodes handle the generation ID change auto status = _gossiper.get_application_state_ptr( utils::fb_utilities::get_broadcast_address(), gms::application_state::STATUS); if (!status) { @@ -790,14 +790,14 @@ future<> generation_service::check_and_repair_cdc_streams() { cdc_log.error("Aborting CDC generation repair due to missing STATUS"); co_return; } - // Update _gen_ts first, so that do_handle_cdc_generation (which will get called due to the status update) + // Update _gen_id first, so that do_handle_cdc_generation (which will get called due to the status update) // won't try to update the gossiper, which would result in a deadlock inside add_local_application_state - _gen_ts = new_gen_ts; + _gen_id = new_gen_id; co_await _gossiper.add_local_application_state({ - { gms::application_state::CDC_STREAMS_TIMESTAMP, gms::versioned_value::cdc_streams_timestamp(new_gen_ts) }, + { gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(new_gen_id) }, { gms::application_state::STATUS, *status } }); - co_await db::system_keyspace::update_cdc_streams_timestamp(new_gen_ts); + co_await db::system_keyspace::update_cdc_generation_id(new_gen_id); } future<> generation_service::handle_cdc_generation(std::optional ts) { @@ -881,7 +881,7 @@ future<> generation_service::scan_cdc_generations() { std::optional latest; for (const auto& ep: _gossiper.get_endpoint_states()) { - auto ts = get_streams_timestamp_for(ep.first, _gossiper); + auto ts = get_generation_id_for(ep.first, _gossiper); if (!latest || (ts && *ts > *latest)) { latest = ts; } @@ -928,17 +928,17 @@ future generation_service::do_handle_cdc_generation(db_clock::time_point t ts, db_clock::now())); } - // If we're not gossiping our own generation timestamp (because we've upgraded from a non-CDC/old version, - // or we somehow lost it due to a byzantine failure), start gossiping someone else's timestamp. - // This is to avoid the upgrade check on every restart (see `should_propose_first_cdc_generation`). - // And if we notice that `ts` is higher than our timestamp, we will start gossiping it instead, - // so if the node that initially gossiped `ts` leaves the cluster while `ts` is still the latest generation, - // the cluster will remember. - if (!_gen_ts || *_gen_ts < ts) { - _gen_ts = ts; - co_await db::system_keyspace::update_cdc_streams_timestamp(ts); + // We always gossip about the generation with the greatest timestamp. Specific nodes may remember older generations, + // but eventually they forget when their clocks move past the latest generation's timestamp. + // The cluster as a whole is only interested in the last generation so restarting nodes may learn what it is. + // We assume that generation changes don't happen ``too often'' so every node can learn about a generation + // before it is superseded by a newer one which causes nodes to start gossiping the about the newer one. + // The assumption follows from the requirement of bootstrapping nodes sequentially. + if (!_gen_id || *_gen_id < ts) { + _gen_id = ts; + co_await db::system_keyspace::update_cdc_generation_id(ts); co_await _gossiper.add_local_application_state( - gms::application_state::CDC_STREAMS_TIMESTAMP, gms::versioned_value::cdc_streams_timestamp(ts)); + gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(ts)); } // Return `true` iff the generation was inserted on any of our shards. diff --git a/cdc/generation.hh b/cdc/generation.hh index 9da89b3148..cc92289b76 100644 --- a/cdc/generation.hh +++ b/cdc/generation.hh @@ -22,8 +22,7 @@ /* This module contains classes and functions used to manage CDC generations: * sets of CDC stream identifiers used by the cluster to choose partition keys for CDC log writes. - * Each CDC generation begins operating at a specific time point, called the generation's timestamp - * (`cdc_streams_timpestamp` or `streams_timestamp` in the code). + * Each CDC generation begins operating at a specific time point, called the generation's timestamp. * The generation is used by all nodes in the cluster to pick CDC streams until superseded by a new generation. * * Functions from this module are used by the node joining procedure to introduce new CDC generations to the cluster diff --git a/cdc/generation_service.hh b/cdc/generation_service.hh index acbbad8ca6..6d3533fa32 100644 --- a/cdc/generation_service.hh +++ b/cdc/generation_service.hh @@ -56,7 +56,7 @@ class generation_service : public peering_sharded_service cdc::metadata _cdc_metadata; /* The latest known generation timestamp and the timestamp that we're currently gossiping - * (as CDC_STREAMS_TIMESTAMP application state). + * (as CDC_GENERATION_ID application state). * * Only shard 0 manages this, hence it will be std::nullopt on all shards other than 0. * This timestamp is also persisted in the system.cdc_local table. @@ -67,7 +67,7 @@ class generation_service : public peering_sharded_service * different node. In any case, eventually - after one of the nodes gossips the first timestamp * - we'll catch on and this variable will be updated with that generation. */ - std::optional _gen_ts; + std::optional _gen_id; public: generation_service(const db::config&, gms::gossiper&, sharded&, abort_source&, const locator::shared_token_metadata&); @@ -77,7 +77,7 @@ public: /* After the node bootstraps and creates a new CDC generation, or restarts and loads the last * known generation timestamp from persistent storage, this function should be called with - * that generation timestamp moved in as the `startup_gen_ts` parameter. + * that generation timestamp moved in as the `startup_gen_id` parameter. * This passes the responsibility of managing generations from the node startup code to this service; * until then, the service remains dormant. * At the time of writing this comment, the startup code is in `storage_service::join_token_ring`, hence @@ -85,7 +85,7 @@ public: * Precondition: the node has completed bootstrapping and system_distributed_keyspace is initialized. * Must be called on shard 0 - that's where the generation management happens. */ - future<> after_join(std::optional&& startup_gen_ts); + future<> after_join(std::optional&& startup_gen_id); cdc::metadata& get_cdc_metadata() { return _cdc_metadata; diff --git a/db/system_distributed_keyspace.hh b/db/system_distributed_keyspace.hh index 79cf193eba..567bf746ff 100644 --- a/db/system_distributed_keyspace.hh +++ b/db/system_distributed_keyspace.hh @@ -96,11 +96,11 @@ public: future<> finish_view_build(sstring ks_name, sstring view_name) const; future<> remove_view(sstring ks_name, sstring view_name) const; - future<> insert_cdc_topology_description(db_clock::time_point streams_ts, const cdc::topology_description&, context); - future> read_cdc_topology_description(db_clock::time_point streams_ts, context); + future<> insert_cdc_topology_description(db_clock::time_point, const cdc::topology_description&, context); + future> read_cdc_topology_description(db_clock::time_point, context); - future<> create_cdc_desc(db_clock::time_point streams_ts, const cdc::topology_description&, context); - future cdc_desc_exists(db_clock::time_point streams_ts, context); + future<> create_cdc_desc(db_clock::time_point, const cdc::topology_description&, context); + future cdc_desc_exists(db_clock::time_point, context); /* Get all generation timestamps appearing in the "cdc_streams_descriptions" table * (the old CDC stream description table). */ diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 15ef5fb022..0278968be3 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1675,13 +1675,13 @@ future> get_local_tokens() { }); } -future<> update_cdc_streams_timestamp(db_clock::time_point tp) { +future<> update_cdc_generation_id(db_clock::time_point gen_id) { return qctx->execute_cql(format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)", - v3::CDC_LOCAL), sstring(v3::CDC_LOCAL), tp) + v3::CDC_LOCAL), sstring(v3::CDC_LOCAL), gen_id) .discard_result().then([] { return force_blocking_flush(v3::CDC_LOCAL); }); } -future> get_saved_cdc_streams_timestamp() { +future> get_cdc_generation_id() { return qctx->execute_cql(format("SELECT streams_timestamp FROM system.{} WHERE key = ?", v3::CDC_LOCAL), sstring(v3::CDC_LOCAL)) .then([] (::shared_ptr msg)-> std::optional { if (msg->empty() || !msg->one().has("streams_timestamp")) { @@ -1694,11 +1694,11 @@ future> get_saved_cdc_streams_timestamp() { static const sstring CDC_REWRITTEN_KEY = "rewritten"; -future<> cdc_set_rewritten(std::optional tp) { - if (tp) { +future<> cdc_set_rewritten(std::optional gen_id) { + if (gen_id) { return qctx->execute_cql( format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)", v3::CDC_LOCAL), - CDC_REWRITTEN_KEY, *tp).discard_result(); + CDC_REWRITTEN_KEY, *gen_id).discard_result(); } else { // Insert just the row marker. return qctx->execute_cql( diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 1ca68d692c..f1edd6fc13 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -630,15 +630,15 @@ future<> delete_paxos_decision(const schema& s, const partition_key& key, const // CDC related functions /* - * Save the CDC streams generation timestamp announced by this node in persistent storage. + * Save the CDC generation ID announced by this node in persistent storage. */ -future<> update_cdc_streams_timestamp(db_clock::time_point); +future<> update_cdc_generation_id(db_clock::time_point); /* - * Read the CDC streams generation timestamp announced by this node from persistent storage. + * Read the CDC generation ID announced by this node from persistent storage. * Used to initialize a restarting node. */ -future> get_saved_cdc_streams_timestamp(); +future> get_cdc_generation_id(); future cdc_is_rewritten(); future<> cdc_set_rewritten(std::optional); diff --git a/docs/design-notes/cdc.md b/docs/design-notes/cdc.md index cfc524e6b6..e33eebbf78 100644 --- a/docs/design-notes/cdc.md +++ b/docs/design-notes/cdc.md @@ -141,7 +141,7 @@ Next, the node starts gossiping the timestamp of the new generation together wit ``` _gossiper.add_local_application_state({ { gms::application_state::TOKENS, versioned_value::tokens(_bootstrap_tokens) }, - { gms::application_state::CDC_STREAMS_TIMESTAMP, versioned_value::cdc_streams_timestamp(_cdc_streams_ts) }, + { gms::application_state::CDC_GENERATION_ID, versioned_value::cdc_generation_id(_cdc_gen_id) }, { gms::application_state::STATUS, versioned_value::bootstrapping(_bootstrap_tokens) }, }).get(); ``` diff --git a/gms/application_state.cc b/gms/application_state.cc index 7b5210bdcb..92a43effbc 100644 --- a/gms/application_state.cc +++ b/gms/application_state.cc @@ -67,7 +67,7 @@ static const std::map application_state_names = { {application_state::VIEW_BACKLOG, "VIEW_BACKLOG"}, {application_state::SHARD_COUNT, "SHARD_COUNT"}, {application_state::IGNORE_MSB_BITS, "IGNOR_MSB_BITS"}, - {application_state::CDC_STREAMS_TIMESTAMP, "CDC_STREAMS_TIMESTAMP"}, + {application_state::CDC_GENERATION_ID, "CDC_STREAMS_TIMESTAMP"}, /* not named "CDC_GENERATION_ID" for backward compatibility */ {application_state::SNITCH_NAME, "SNITCH_NAME"}, }; diff --git a/gms/application_state.hh b/gms/application_state.hh index 68613491c2..a57e3a6a07 100644 --- a/gms/application_state.hh +++ b/gms/application_state.hh @@ -64,7 +64,7 @@ enum class application_state { VIEW_BACKLOG, SHARD_COUNT, IGNORE_MSB_BITS, - CDC_STREAMS_TIMESTAMP, + CDC_GENERATION_ID, SNITCH_NAME, // pad to allow adding new states to existing cluster X10, diff --git a/gms/versioned_value.cc b/gms/versioned_value.cc index 1fe5d7b749..8cc892e847 100644 --- a/gms/versioned_value.cc +++ b/gms/versioned_value.cc @@ -74,7 +74,7 @@ sstring versioned_value::make_token_string(const std::unordered_set& return tokens.begin()->to_sstring(); } -sstring versioned_value::make_cdc_streams_timestamp_string(std::optional t) { +sstring versioned_value::make_cdc_generation_id_string(std::optional t) { // We assume that the db_clock epoch is the same on all receiving nodes. if (!t) { return ""; @@ -95,7 +95,7 @@ std::unordered_set versioned_value::tokens_from_string(const sstring return ret; } -std::optional versioned_value::cdc_streams_timestamp_from_string(const sstring& s) { +std::optional versioned_value::cdc_generation_id_from_string(const sstring& s) { if (s.empty()) { return {}; } diff --git a/gms/versioned_value.hh b/gms/versioned_value.hh index e03c254841..36a702ffc4 100644 --- a/gms/versioned_value.hh +++ b/gms/versioned_value.hh @@ -129,13 +129,13 @@ public: static sstring make_full_token_string(const std::unordered_set& tokens); static sstring make_token_string(const std::unordered_set& tokens); - static sstring make_cdc_streams_timestamp_string(std::optional t); + static sstring make_cdc_generation_id_string(std::optional t); // Reverse of `make_full_token_string`. static std::unordered_set tokens_from_string(const sstring&); - // Reverse of `make_cdc_streams_timestamp_string`. - static std::optional cdc_streams_timestamp_from_string(const sstring&); + // Reverse of `make_cdc_generation_id_string`. + static std::optional cdc_generation_id_from_string(const sstring&); static versioned_value clone_with_higher_version(const versioned_value& value) noexcept { return versioned_value(value.value); @@ -184,8 +184,8 @@ public: return versioned_value(make_full_token_string(tokens)); } - static versioned_value cdc_streams_timestamp(std::optional t) { - return versioned_value(make_cdc_streams_timestamp_string(t)); + static versioned_value cdc_generation_id(std::optional t) { + return versioned_value(make_cdc_generation_id_string(t)); } static versioned_value removing_nonlocal(const utils::UUID& host_id) { diff --git a/service/storage_service.cc b/service/storage_service.cc index aefa431091..64862b5879 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -316,8 +316,8 @@ void storage_service::prepare_to_join( // Therefore we update _token_metadata now, before gossip starts. tmptr->update_normal_tokens(my_tokens, get_broadcast_address()).get(); - _cdc_streams_ts = db::system_keyspace::get_saved_cdc_streams_timestamp().get0(); - if (!_cdc_streams_ts) { + _cdc_gen_id = db::system_keyspace::get_cdc_generation_id().get0(); + if (!_cdc_gen_id) { // We could not have completed joining if we didn't generate and persist a CDC streams timestamp, // unless we are restarting after upgrading from non-CDC supported version. // In that case we won't begin a CDC generation: it should be done by one of the nodes @@ -368,7 +368,7 @@ void storage_service::prepare_to_join( // Order is important: both the CDC streams timestamp and tokens must be known when a node handles our status. // Exception: there might be no CDC streams timestamp proposed by us if we're upgrading from a non-CDC version. app_states.emplace(gms::application_state::TOKENS, versioned_value::tokens(my_tokens)); - app_states.emplace(gms::application_state::CDC_STREAMS_TIMESTAMP, versioned_value::cdc_streams_timestamp(_cdc_streams_ts)); + app_states.emplace(gms::application_state::CDC_GENERATION_ID, versioned_value::cdc_generation_id(_cdc_gen_id)); app_states.emplace(gms::application_state::STATUS, versioned_value::normal(my_tokens)); } if (replacing_a_node_with_same_ip || replacing_a_node_with_diff_ip) { @@ -404,18 +404,18 @@ void storage_service::maybe_start_sys_dist_ks() { /* Broadcasts the chosen tokens through gossip, * together with a CDC generation timestamp and STATUS=NORMAL. * - * Assumes that no other functions modify CDC_STREAMS_TIMESTAMP, TOKENS or STATUS + * Assumes that no other functions modify CDC_GENERATION_ID, TOKENS or STATUS * in the gossiper's local application state while this function runs. */ // Runs inside seastar::async context static void set_gossip_tokens(gms::gossiper& g, - const std::unordered_set& tokens, std::optional cdc_streams_ts) { + const std::unordered_set& tokens, std::optional cdc_gen_id) { assert(!tokens.empty()); // Order is important: both the CDC streams timestamp and tokens must be known when a node handles our status. g.add_local_application_state({ { gms::application_state::TOKENS, gms::versioned_value::tokens(tokens) }, - { gms::application_state::CDC_STREAMS_TIMESTAMP, gms::versioned_value::cdc_streams_timestamp(cdc_streams_ts) }, + { gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(cdc_gen_id) }, { gms::application_state::STATUS, gms::versioned_value::normal(tokens) } }).get(); } @@ -577,14 +577,14 @@ void storage_service::join_token_ring(int delay) { if (!db::system_keyspace::bootstrap_complete()) { // If we're not bootstrapping then we shouldn't have chosen a CDC streams timestamp yet. - assert(should_bootstrap() || !_cdc_streams_ts); + assert(should_bootstrap() || !_cdc_gen_id); // Don't try rewriting CDC stream description tables. // See cdc.md design notes, `Streams description table V1 and rewriting` section, for explanation. db::system_keyspace::cdc_set_rewritten(std::nullopt).get(); } - if (!_cdc_streams_ts) { + if (!_cdc_gen_id) { // If we didn't choose a CDC streams timestamp at this point, then either // 1. we're replacing a node, // 2. we've already bootstrapped, but are upgrading from a non-CDC version, @@ -605,7 +605,7 @@ void storage_service::join_token_ring(int delay) { && (!db::system_keyspace::bootstrap_complete() || cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) { try { - _cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(), + _cdc_gen_id = cdc::make_new_cdc_generation(db().local().get_config(), _bootstrap_tokens, get_token_metadata_ptr(), _gossiper, _sys_dist_ks.local(), get_ring_delay(), !_for_testing && !is_first_node()).get0(); } catch (...) { @@ -617,17 +617,17 @@ void storage_service::join_token_ring(int delay) { } // Persist the CDC streams timestamp before we persist bootstrap_state = COMPLETED. - if (_cdc_streams_ts) { - db::system_keyspace::update_cdc_streams_timestamp(*_cdc_streams_ts).get(); + if (_cdc_gen_id) { + db::system_keyspace::update_cdc_generation_id(*_cdc_gen_id).get(); } // If we crash now, we will choose a new CDC streams timestamp anyway (because we will also choose a new set of tokens). // But if we crash after setting bootstrap_state = COMPLETED, we will keep using the persisted CDC streams timestamp after restarting. db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED).get(); - // At this point our local tokens and CDC streams timestamp are chosen (_bootstrap_tokens, _cdc_streams_ts) and will not be changed. + // At this point our local tokens and CDC streams timestamp are chosen (_bootstrap_tokens, _cdc_gen_id) and will not be changed. // start participating in the ring. - set_gossip_tokens(_gossiper, _bootstrap_tokens, _cdc_streams_ts); + set_gossip_tokens(_gossiper, _bootstrap_tokens, _cdc_gen_id); set_mode(mode::NORMAL, "node is now in normal status", true); if (get_token_metadata().sorted_tokens().empty()) { @@ -636,7 +636,7 @@ void storage_service::join_token_ring(int delay) { throw std::runtime_error(err); } - _cdc_gen_service.local().after_join(std::move(_cdc_streams_ts)).get(); + _cdc_gen_service.local().after_join(std::move(_cdc_gen_id)).get(); // Ensure that the new CDC stream description table has all required streams. // See the function's comment for details. @@ -681,16 +681,16 @@ void storage_service::bootstrap() { // After we pick a generation timestamp, we start gossiping it, and we stick with it. // We don't do any other generation switches (unless we crash before complecting bootstrap). - assert(!_cdc_streams_ts); + assert(!_cdc_gen_id); - _cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(), + _cdc_gen_id = cdc::make_new_cdc_generation(db().local().get_config(), _bootstrap_tokens, get_token_metadata_ptr(), _gossiper, _sys_dist_ks.local(), get_ring_delay(), !_for_testing && !is_first_node()).get0(); _gossiper.add_local_application_state({ // Order is important: both the CDC streams timestamp and tokens must be known when a node handles our status. { gms::application_state::TOKENS, versioned_value::tokens(_bootstrap_tokens) }, - { gms::application_state::CDC_STREAMS_TIMESTAMP, versioned_value::cdc_streams_timestamp(_cdc_streams_ts) }, + { gms::application_state::CDC_GENERATION_ID, versioned_value::cdc_generation_id(_cdc_gen_id) }, { gms::application_state::STATUS, versioned_value::bootstrapping(_bootstrap_tokens) }, }).get(); @@ -705,7 +705,7 @@ void storage_service::bootstrap() { set_mode(mode::JOINING, sprint("Announce tokens and status of the replacing node"), true); _gossiper.add_local_application_state({ { gms::application_state::TOKENS, versioned_value::tokens(_bootstrap_tokens) }, - { gms::application_state::CDC_STREAMS_TIMESTAMP, versioned_value::cdc_streams_timestamp(_cdc_streams_ts) }, + { gms::application_state::CDC_GENERATION_ID, versioned_value::cdc_generation_id(_cdc_gen_id) }, { gms::application_state::STATUS, versioned_value::hibernate(true) }, }).get(); _gossiper.advertise_myself().get(); @@ -1760,7 +1760,7 @@ future<> storage_service::start_gossiping(bind_messaging_port do_bind) { return seastar::async([&ss, do_bind] { if (!ss._initialized) { slogger.warn("Starting gossip by operator request"); - auto cdc_gen_ts = db::system_keyspace::get_saved_cdc_streams_timestamp().get0(); + auto cdc_gen_ts = db::system_keyspace::get_cdc_generation_id().get0(); if (!cdc_gen_ts) { cdc_log.warn("CDC generation timestamp missing when starting gossip"); } diff --git a/service/storage_service.hh b/service/storage_service.hh index 0ae160ac79..47d1acefa7 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -336,7 +336,7 @@ private: * DO NOT use this variable after `join_token_ring` (i.e. after we call `generation_service::after_join` * and pass it the ownership of the timestamp. */ - std::optional _cdc_streams_ts; + std::optional _cdc_gen_id; public: void enable_all_features();