tree-wide: rename "cdc streams timestamp" to "cdc generation id"

Each CDC generation always has a timestamp, but the fact that the
timestamp identifies the generation is an implementation detail.
We abstract this detail away by using a more generic name:
a generation "identifier" (whatever it happens to be - a timestamp or
something else).

It's possible that a CDC generation will be identified by more than a
timestamp in the (near) future.
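
(For illustration only; nothing below is part of this commit.) If a
generation were later identified by, say, a timestamp plus a UUID, the
generic name would still fit. A hypothetical sketch, where `db_clock` and
`utils::UUID` stand for the existing Scylla types:

```cpp
#include <variant>

// Hypothetical future shape of a generation identifier; today it is
// just a db_clock::time_point. All names here are illustrative.
struct cdc_generation_id_v2 {
    db_clock::time_point ts;  // when the generation starts operating
    utils::UUID id;           // distinguishes generations with equal timestamps
};
using cdc_generation_id = std::variant<db_clock::time_point, cdc_generation_id_v2>;
```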

The actual string gossiped by nodes in their application state is left
as "CDC_STREAMS_TIMESTAMP" for backward compatibility.

Some stale comments have been updated.
Kamil Braun
2021-03-11 12:19:36 +01:00
parent 0cb2f58514
commit e486e0f759
13 changed files with 94 additions and 95 deletions

View File

@@ -367,30 +367,30 @@ future<db_clock::time_point> make_new_cdc_generation(
* but if the cluster already supports CDC, then every newly joining node will propose a new CDC generation,
* which means it will gossip the generation's timestamp.
*/
static std::optional<db_clock::time_point> get_streams_timestamp_for(const gms::inet_address& endpoint, const gms::gossiper& g) {
auto streams_ts_string = g.get_application_state_value(endpoint, gms::application_state::CDC_STREAMS_TIMESTAMP);
cdc_log.trace("endpoint={}, streams_ts_string={}", endpoint, streams_ts_string);
return gms::versioned_value::cdc_streams_timestamp_from_string(streams_ts_string);
static std::optional<db_clock::time_point> get_generation_id_for(const gms::inet_address& endpoint, const gms::gossiper& g) {
auto gen_id_string = g.get_application_state_value(endpoint, gms::application_state::CDC_GENERATION_ID);
cdc_log.trace("endpoint={}, gen_id_string={}", endpoint, gen_id_string);
return gms::versioned_value::cdc_generation_id_from_string(gen_id_string);
}
static future<> do_update_streams_description(
db_clock::time_point streams_ts,
db_clock::time_point gen_id,
db::system_distributed_keyspace& sys_dist_ks,
db::system_distributed_keyspace::context ctx) {
if (co_await sys_dist_ks.cdc_desc_exists(streams_ts, ctx)) {
cdc_log.info("Generation {}: streams description table already updated.", streams_ts);
if (co_await sys_dist_ks.cdc_desc_exists(gen_id, ctx)) {
cdc_log.info("Generation {}: streams description table already updated.", gen_id);
co_return;
}
// We might race with another node also inserting the description, but that's ok. It's an idempotent operation.
auto topo = co_await sys_dist_ks.read_cdc_topology_description(streams_ts, ctx);
auto topo = co_await sys_dist_ks.read_cdc_topology_description(gen_id, ctx);
if (!topo) {
throw no_generation_data_exception(streams_ts);
throw no_generation_data_exception(gen_id);
}
co_await sys_dist_ks.create_cdc_desc(streams_ts, *topo, ctx);
cdc_log.info("CDC description table successfully updated with generation {}.", streams_ts);
co_await sys_dist_ks.create_cdc_desc(gen_id, *topo, ctx);
cdc_log.info("CDC description table successfully updated with generation {}.", gen_id);
}
/* Inform CDC users about a generation of streams (identified by the given timestamp)
@@ -402,34 +402,34 @@ static future<> do_update_streams_description(
* might run an asynchronous task in the background.
*/
static future<> update_streams_description(
db_clock::time_point streams_ts,
db_clock::time_point gen_id,
shared_ptr<db::system_distributed_keyspace> sys_dist_ks,
noncopyable_function<unsigned()> get_num_token_owners,
abort_source& abort_src) {
try {
co_await do_update_streams_description(streams_ts, *sys_dist_ks, { get_num_token_owners() });
co_await do_update_streams_description(gen_id, *sys_dist_ks, { get_num_token_owners() });
} catch (...) {
cdc_log.warn(
"Could not update CDC description table with generation {}: {}. Will retry in the background.",
streams_ts, std::current_exception());
gen_id, std::current_exception());
// It is safe to discard this future: we keep system distributed keyspace alive.
(void)(([] (db_clock::time_point streams_ts,
(void)(([] (db_clock::time_point gen_id,
shared_ptr<db::system_distributed_keyspace> sys_dist_ks,
noncopyable_function<unsigned()> get_num_token_owners,
abort_source& abort_src) -> future<> {
while (true) {
co_await sleep_abortable(std::chrono::seconds(60), abort_src);
try {
co_await do_update_streams_description(streams_ts, *sys_dist_ks, { get_num_token_owners() });
co_await do_update_streams_description(gen_id, *sys_dist_ks, { get_num_token_owners() });
co_return;
} catch (...) {
cdc_log.warn(
"Could not update CDC description table with generation {}: {}. Will try again.",
streams_ts, std::current_exception());
gen_id, std::current_exception());
}
}
})(streams_ts, std::move(sys_dist_ks), std::move(get_num_token_owners), abort_src));
})(gen_id, std::move(sys_dist_ks), std::move(get_num_token_owners), abort_src));
}
}
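
The `(void)` cast in this hunk deliberately discards the retry future; the
detached task stays safe because the callable's dependencies are passed by
value into the lambda, while the `abort_source` must outlive the task. A
generic sketch of the idiom (illustrative names, not part of the patch):

```cpp
#include <chrono>
#include <functional>
#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/sleep.hh>

// Retry `attempt` once a minute until it succeeds or `as` is aborted.
// The caller may discard the returned future: the coroutine frame owns
// the callable (taken by value); `as` must outlive the task, as in the patch.
seastar::future<> retry_forever(std::function<seastar::future<>()> attempt,
                                seastar::abort_source& as) {
    while (true) {
        // Throws seastar::sleep_aborted on abort, ending the loop.
        co_await seastar::sleep_abortable(std::chrono::seconds(60), as);
        try {
            co_await attempt();
            co_return;
        } catch (...) {
            // Swallow the failure and retry after the next sleep.
        }
    }
}
```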
@@ -662,11 +662,11 @@ generation_service::~generation_service() {
assert(_stopped);
}
future<> generation_service::after_join(std::optional<db_clock::time_point>&& startup_gen_ts) {
future<> generation_service::after_join(std::optional<db_clock::time_point>&& startup_gen_id) {
assert_shard_zero(__PRETTY_FUNCTION__);
assert(db::system_keyspace::bootstrap_complete());
_gen_ts = std::move(startup_gen_ts);
_gen_id = std::move(startup_gen_id);
_gossiper.register_(shared_from_this());
_joined = true;
@@ -678,23 +678,23 @@ future<> generation_service::after_join(std::optional<db_clock::time_point>&& st
void generation_service::on_join(gms::inet_address ep, gms::endpoint_state ep_state) {
assert_shard_zero(__PRETTY_FUNCTION__);
auto val = ep_state.get_application_state_ptr(gms::application_state::CDC_STREAMS_TIMESTAMP);
auto val = ep_state.get_application_state_ptr(gms::application_state::CDC_GENERATION_ID);
if (!val) {
return;
}
on_change(ep, gms::application_state::CDC_STREAMS_TIMESTAMP, *val);
on_change(ep, gms::application_state::CDC_GENERATION_ID, *val);
}
void generation_service::on_change(gms::inet_address ep, gms::application_state app_state, const gms::versioned_value& v) {
assert_shard_zero(__PRETTY_FUNCTION__);
if (app_state != gms::application_state::CDC_STREAMS_TIMESTAMP) {
if (app_state != gms::application_state::CDC_GENERATION_ID) {
return;
}
auto ts = gms::versioned_value::cdc_streams_timestamp_from_string(v.value);
cdc_log.debug("Endpoint: {}, CDC generation timestamp change: {}", ep, ts);
auto ts = gms::versioned_value::cdc_generation_id_from_string(v.value);
cdc_log.debug("Endpoint: {}, CDC generation ID change: {}", ep, ts);
handle_cdc_generation(ts).get();
}
@@ -704,7 +704,7 @@ future<> generation_service::check_and_repair_cdc_streams() {
throw std::runtime_error("check_and_repair_cdc_streams: node not initialized yet");
}
auto latest = _gen_ts;
auto latest = _gen_id;
const auto& endpoint_states = _gossiper.get_endpoint_states();
for (const auto& [addr, state] : endpoint_states) {
if (!_gossiper.is_normal(addr)) {
@@ -712,7 +712,7 @@ future<> generation_service::check_and_repair_cdc_streams() {
" ({} is in state {})", addr, _gossiper.get_gossip_status(state)));
}
const auto ts = get_streams_timestamp_for(addr, _gossiper);
const auto ts = get_generation_id_for(addr, _gossiper);
if (!latest || (ts && *ts > *latest)) {
latest = ts;
}
@@ -773,16 +773,16 @@ future<> generation_service::check_and_repair_cdc_streams() {
}
if (!should_regenerate) {
if (latest != _gen_ts) {
if (latest != _gen_id) {
co_await do_handle_cdc_generation(*latest);
}
cdc_log.info("CDC generation {} does not need repair", latest);
co_return;
}
const auto new_gen_ts = co_await make_new_cdc_generation(_cfg,
const auto new_gen_id = co_await make_new_cdc_generation(_cfg,
{}, std::move(tmptr), _gossiper, *sys_dist_ks,
std::chrono::milliseconds(_cfg.ring_delay_ms()), true /* add delay */);
// Need to artificially update our STATUS so other nodes handle the timestamp change
// Need to artificially update our STATUS so other nodes handle the generation ID change
auto status = _gossiper.get_application_state_ptr(
utils::fb_utilities::get_broadcast_address(), gms::application_state::STATUS);
if (!status) {
@@ -790,14 +790,14 @@ future<> generation_service::check_and_repair_cdc_streams() {
cdc_log.error("Aborting CDC generation repair due to missing STATUS");
co_return;
}
// Update _gen_ts first, so that do_handle_cdc_generation (which will get called due to the status update)
// Update _gen_id first, so that do_handle_cdc_generation (which will get called due to the status update)
// won't try to update the gossiper, which would result in a deadlock inside add_local_application_state
_gen_ts = new_gen_ts;
_gen_id = new_gen_id;
co_await _gossiper.add_local_application_state({
{ gms::application_state::CDC_STREAMS_TIMESTAMP, gms::versioned_value::cdc_streams_timestamp(new_gen_ts) },
{ gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(new_gen_id) },
{ gms::application_state::STATUS, *status }
});
co_await db::system_keyspace::update_cdc_streams_timestamp(new_gen_ts);
co_await db::system_keyspace::update_cdc_generation_id(new_gen_id);
}
future<> generation_service::handle_cdc_generation(std::optional<db_clock::time_point> ts) {
@@ -881,7 +881,7 @@ future<> generation_service::scan_cdc_generations() {
std::optional<db_clock::time_point> latest;
for (const auto& ep: _gossiper.get_endpoint_states()) {
auto ts = get_streams_timestamp_for(ep.first, _gossiper);
auto ts = get_generation_id_for(ep.first, _gossiper);
if (!latest || (ts && *ts > *latest)) {
latest = ts;
}
@@ -928,17 +928,17 @@ future<bool> generation_service::do_handle_cdc_generation(db_clock::time_point t
ts, db_clock::now()));
}
// If we're not gossiping our own generation timestamp (because we've upgraded from a non-CDC/old version,
// or we somehow lost it due to a byzantine failure), start gossiping someone else's timestamp.
// This is to avoid the upgrade check on every restart (see `should_propose_first_cdc_generation`).
// And if we notice that `ts` is higher than our timestamp, we will start gossiping it instead,
// so if the node that initially gossiped `ts` leaves the cluster while `ts` is still the latest generation,
// the cluster will remember.
if (!_gen_ts || *_gen_ts < ts) {
_gen_ts = ts;
co_await db::system_keyspace::update_cdc_streams_timestamp(ts);
// We always gossip about the generation with the greatest timestamp. Specific nodes may remember older generations,
// but eventually they forget when their clocks move past the latest generation's timestamp.
// The cluster as a whole is only interested in the last generation, so restarting nodes may learn what it is.
// We assume that generation changes don't happen ``too often'', so every node can learn about a generation
// before it is superseded by a newer one, which causes nodes to start gossiping about the newer one.
// The assumption follows from the requirement of bootstrapping nodes sequentially.
if (!_gen_id || *_gen_id < ts) {
_gen_id = ts;
co_await db::system_keyspace::update_cdc_generation_id(ts);
co_await _gossiper.add_local_application_state(
gms::application_state::CDC_STREAMS_TIMESTAMP, gms::versioned_value::cdc_streams_timestamp(ts));
gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(ts));
}
// Return `true` iff the generation was inserted on any of our shards.

View File

@@ -22,8 +22,7 @@
/* This module contains classes and functions used to manage CDC generations:
* sets of CDC stream identifiers used by the cluster to choose partition keys for CDC log writes.
* Each CDC generation begins operating at a specific time point, called the generation's timestamp
* (`cdc_streams_timpestamp` or `streams_timestamp` in the code).
* Each CDC generation begins operating at a specific time point, called the generation's timestamp.
* The generation is used by all nodes in the cluster to pick CDC streams until superseded by a new generation.
*
* Functions from this module are used by the node joining procedure to introduce new CDC generations to the cluster
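
The comment above says each generation "begins operating at a specific time
point". A minimal sketch of the selection rule this implies, not the actual
`cdc::metadata` implementation (container and names are illustrative):

```cpp
#include <iterator>
#include <map>
#include <optional>

// Generations keyed by their timestamp; a write performed at time `w`
// uses the newest generation whose timestamp is <= w.
template <typename TimePoint, typename Generation>
std::optional<Generation> pick_generation(
        const std::map<TimePoint, Generation>& generations, TimePoint w) {
    auto it = generations.upper_bound(w);  // first generation starting strictly after w
    if (it == generations.begin()) {
        return std::nullopt;               // no generation operating yet at time w
    }
    return std::prev(it)->second;
}
```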

View File

@@ -56,7 +56,7 @@ class generation_service : public peering_sharded_service<generation_service>
cdc::metadata _cdc_metadata;
/* The latest known generation timestamp and the timestamp that we're currently gossiping
* (as CDC_STREAMS_TIMESTAMP application state).
* (as CDC_GENERATION_ID application state).
*
* Only shard 0 manages this, hence it will be std::nullopt on all shards other than 0.
* This timestamp is also persisted in the system.cdc_local table.
@@ -67,7 +67,7 @@ class generation_service : public peering_sharded_service<generation_service>
* different node. In any case, eventually - after one of the nodes gossips the first timestamp
* - we'll catch on and this variable will be updated with that generation.
*/
std::optional<db_clock::time_point> _gen_ts;
std::optional<db_clock::time_point> _gen_id;
public:
generation_service(const db::config&, gms::gossiper&,
sharded<db::system_distributed_keyspace>&, abort_source&, const locator::shared_token_metadata&);
@@ -77,7 +77,7 @@ public:
/* After the node bootstraps and creates a new CDC generation, or restarts and loads the last
* known generation timestamp from persistent storage, this function should be called with
* that generation timestamp moved in as the `startup_gen_ts` parameter.
* that generation timestamp moved in as the `startup_gen_id` parameter.
* This passes the responsibility of managing generations from the node startup code to this service;
* until then, the service remains dormant.
* At the time of writing this comment, the startup code is in `storage_service::join_token_ring`, hence
@@ -85,7 +85,7 @@ public:
* Precondition: the node has completed bootstrapping and system_distributed_keyspace is initialized.
* Must be called on shard 0 - that's where the generation management happens.
*/
future<> after_join(std::optional<db_clock::time_point>&& startup_gen_ts);
future<> after_join(std::optional<db_clock::time_point>&& startup_gen_id);
cdc::metadata& get_cdc_metadata() {
return _cdc_metadata;

View File

@@ -96,11 +96,11 @@ public:
future<> finish_view_build(sstring ks_name, sstring view_name) const;
future<> remove_view(sstring ks_name, sstring view_name) const;
future<> insert_cdc_topology_description(db_clock::time_point streams_ts, const cdc::topology_description&, context);
future<std::optional<cdc::topology_description>> read_cdc_topology_description(db_clock::time_point streams_ts, context);
future<> insert_cdc_topology_description(db_clock::time_point, const cdc::topology_description&, context);
future<std::optional<cdc::topology_description>> read_cdc_topology_description(db_clock::time_point, context);
future<> create_cdc_desc(db_clock::time_point streams_ts, const cdc::topology_description&, context);
future<bool> cdc_desc_exists(db_clock::time_point streams_ts, context);
future<> create_cdc_desc(db_clock::time_point, const cdc::topology_description&, context);
future<bool> cdc_desc_exists(db_clock::time_point, context);
/* Get all generation timestamps appearing in the "cdc_streams_descriptions" table
* (the old CDC stream description table). */

View File

@@ -1675,13 +1675,13 @@ future<std::unordered_set<dht::token>> get_local_tokens() {
});
}
future<> update_cdc_streams_timestamp(db_clock::time_point tp) {
future<> update_cdc_generation_id(db_clock::time_point gen_id) {
return qctx->execute_cql(format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)",
v3::CDC_LOCAL), sstring(v3::CDC_LOCAL), tp)
v3::CDC_LOCAL), sstring(v3::CDC_LOCAL), gen_id)
.discard_result().then([] { return force_blocking_flush(v3::CDC_LOCAL); });
}
future<std::optional<db_clock::time_point>> get_saved_cdc_streams_timestamp() {
future<std::optional<db_clock::time_point>> get_cdc_generation_id() {
return qctx->execute_cql(format("SELECT streams_timestamp FROM system.{} WHERE key = ?", v3::CDC_LOCAL), sstring(v3::CDC_LOCAL))
.then([] (::shared_ptr<cql3::untyped_result_set> msg)-> std::optional<db_clock::time_point> {
if (msg->empty() || !msg->one().has("streams_timestamp")) {
@@ -1694,11 +1694,11 @@ future<std::optional<db_clock::time_point>> get_saved_cdc_streams_timestamp() {
static const sstring CDC_REWRITTEN_KEY = "rewritten";
future<> cdc_set_rewritten(std::optional<db_clock::time_point> tp) {
if (tp) {
future<> cdc_set_rewritten(std::optional<db_clock::time_point> gen_id) {
if (gen_id) {
return qctx->execute_cql(
format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)", v3::CDC_LOCAL),
CDC_REWRITTEN_KEY, *tp).discard_result();
CDC_REWRITTEN_KEY, *gen_id).discard_result();
} else {
// Insert just the row marker.
return qctx->execute_cql(

View File

@@ -630,15 +630,15 @@ future<> delete_paxos_decision(const schema& s, const partition_key& key, const
// CDC related functions
/*
* Save the CDC streams generation timestamp announced by this node in persistent storage.
* Save the CDC generation ID announced by this node in persistent storage.
*/
future<> update_cdc_streams_timestamp(db_clock::time_point);
future<> update_cdc_generation_id(db_clock::time_point);
/*
* Read the CDC streams generation timestamp announced by this node from persistent storage.
* Read the CDC generation ID announced by this node from persistent storage.
* Used to initialize a restarting node.
*/
future<std::optional<db_clock::time_point>> get_saved_cdc_streams_timestamp();
future<std::optional<db_clock::time_point>> get_cdc_generation_id();
future<bool> cdc_is_rewritten();
future<> cdc_set_rewritten(std::optional<db_clock::time_point>);

View File

@@ -141,7 +141,7 @@ Next, the node starts gossiping the timestamp of the new generation together wit
```
_gossiper.add_local_application_state({
{ gms::application_state::TOKENS, versioned_value::tokens(_bootstrap_tokens) },
{ gms::application_state::CDC_STREAMS_TIMESTAMP, versioned_value::cdc_streams_timestamp(_cdc_streams_ts) },
{ gms::application_state::CDC_GENERATION_ID, versioned_value::cdc_generation_id(_cdc_gen_id) },
{ gms::application_state::STATUS, versioned_value::bootstrapping(_bootstrap_tokens) },
}).get();
```
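
Note the ordering requirement: a node handling our STATUS must already see
both our tokens and our CDC generation id, which is why all three states go
into a single `add_local_application_state` call. The "Order is important"
comments in `storage_service` below make the same point.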

View File

@@ -67,7 +67,7 @@ static const std::map<application_state, sstring> application_state_names = {
{application_state::VIEW_BACKLOG, "VIEW_BACKLOG"},
{application_state::SHARD_COUNT, "SHARD_COUNT"},
{application_state::IGNORE_MSB_BITS, "IGNOR_MSB_BITS"},
{application_state::CDC_STREAMS_TIMESTAMP, "CDC_STREAMS_TIMESTAMP"},
{application_state::CDC_GENERATION_ID, "CDC_STREAMS_TIMESTAMP"}, /* not named "CDC_GENERATION_ID" for backward compatibility */
{application_state::SNITCH_NAME, "SNITCH_NAME"},
};
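
This mapping is the one place where the old name survives: the enum constant
is renamed everywhere in the code, but peers, including nodes still running
older versions, keep seeing "CDC_STREAMS_TIMESTAMP" on the wire, exactly as
the commit message states.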

View File

@@ -64,7 +64,7 @@ enum class application_state {
VIEW_BACKLOG,
SHARD_COUNT,
IGNORE_MSB_BITS,
CDC_STREAMS_TIMESTAMP,
CDC_GENERATION_ID,
SNITCH_NAME,
// pad to allow adding new states to existing cluster
X10,

View File

@@ -74,7 +74,7 @@ sstring versioned_value::make_token_string(const std::unordered_set<dht::token>&
return tokens.begin()->to_sstring();
}
sstring versioned_value::make_cdc_streams_timestamp_string(std::optional<db_clock::time_point> t) {
sstring versioned_value::make_cdc_generation_id_string(std::optional<db_clock::time_point> t) {
// We assume that the db_clock epoch is the same on all receiving nodes.
if (!t) {
return "";
@@ -95,7 +95,7 @@ std::unordered_set<dht::token> versioned_value::tokens_from_string(const sstring
return ret;
}
std::optional<db_clock::time_point> versioned_value::cdc_streams_timestamp_from_string(const sstring& s) {
std::optional<db_clock::time_point> versioned_value::cdc_generation_id_from_string(const sstring& s) {
if (s.empty()) {
return {};
}
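
Only the empty-string guard of the parser is visible in this hunk. A minimal
sketch of what a matching round-trip could look like, assuming the identifier
is encoded as its raw tick count since the epoch (the "same epoch on all
receiving nodes" comment above suggests such an encoding); `encode`, `decode`,
and the `Clock` parameter are illustrative stand-ins, not the actual bodies:

```cpp
#include <chrono>
#include <optional>
#include <string>

// Sketch: serialize a time point as its epoch-offset tick count;
// an empty string stands for "no generation id".
template <typename Clock>
std::string encode(std::optional<typename Clock::time_point> t) {
    return t ? std::to_string(t->time_since_epoch().count()) : std::string{};
}

template <typename Clock>
std::optional<typename Clock::time_point> decode(const std::string& s) {
    if (s.empty()) {
        return std::nullopt;
    }
    return typename Clock::time_point(typename Clock::duration(std::stoll(s)));
}
```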

View File

@@ -129,13 +129,13 @@ public:
static sstring make_full_token_string(const std::unordered_set<dht::token>& tokens);
static sstring make_token_string(const std::unordered_set<dht::token>& tokens);
static sstring make_cdc_streams_timestamp_string(std::optional<db_clock::time_point> t);
static sstring make_cdc_generation_id_string(std::optional<db_clock::time_point> t);
// Reverse of `make_full_token_string`.
static std::unordered_set<dht::token> tokens_from_string(const sstring&);
// Reverse of `make_cdc_streams_timestamp_string`.
static std::optional<db_clock::time_point> cdc_streams_timestamp_from_string(const sstring&);
// Reverse of `make_cdc_generation_id_string`.
static std::optional<db_clock::time_point> cdc_generation_id_from_string(const sstring&);
static versioned_value clone_with_higher_version(const versioned_value& value) noexcept {
return versioned_value(value.value);
@@ -184,8 +184,8 @@ public:
return versioned_value(make_full_token_string(tokens));
}
static versioned_value cdc_streams_timestamp(std::optional<db_clock::time_point> t) {
return versioned_value(make_cdc_streams_timestamp_string(t));
static versioned_value cdc_generation_id(std::optional<db_clock::time_point> t) {
return versioned_value(make_cdc_generation_id_string(t));
}
static versioned_value removing_nonlocal(const utils::UUID& host_id) {

View File

@@ -316,8 +316,8 @@ void storage_service::prepare_to_join(
// Therefore we update _token_metadata now, before gossip starts.
tmptr->update_normal_tokens(my_tokens, get_broadcast_address()).get();
_cdc_streams_ts = db::system_keyspace::get_saved_cdc_streams_timestamp().get0();
if (!_cdc_streams_ts) {
_cdc_gen_id = db::system_keyspace::get_cdc_generation_id().get0();
if (!_cdc_gen_id) {
// We could not have completed joining if we didn't generate and persist a CDC streams timestamp,
// unless we are restarting after upgrading from non-CDC supported version.
// In that case we won't begin a CDC generation: it should be done by one of the nodes
@@ -368,7 +368,7 @@ void storage_service::prepare_to_join(
// Order is important: both the CDC streams timestamp and tokens must be known when a node handles our status.
// Exception: there might be no CDC streams timestamp proposed by us if we're upgrading from a non-CDC version.
app_states.emplace(gms::application_state::TOKENS, versioned_value::tokens(my_tokens));
app_states.emplace(gms::application_state::CDC_STREAMS_TIMESTAMP, versioned_value::cdc_streams_timestamp(_cdc_streams_ts));
app_states.emplace(gms::application_state::CDC_GENERATION_ID, versioned_value::cdc_generation_id(_cdc_gen_id));
app_states.emplace(gms::application_state::STATUS, versioned_value::normal(my_tokens));
}
if (replacing_a_node_with_same_ip || replacing_a_node_with_diff_ip) {
@@ -404,18 +404,18 @@ void storage_service::maybe_start_sys_dist_ks() {
/* Broadcasts the chosen tokens through gossip,
* together with a CDC generation timestamp and STATUS=NORMAL.
*
* Assumes that no other functions modify CDC_STREAMS_TIMESTAMP, TOKENS or STATUS
* Assumes that no other functions modify CDC_GENERATION_ID, TOKENS or STATUS
* in the gossiper's local application state while this function runs.
*/
// Runs inside seastar::async context
static void set_gossip_tokens(gms::gossiper& g,
const std::unordered_set<dht::token>& tokens, std::optional<db_clock::time_point> cdc_streams_ts) {
const std::unordered_set<dht::token>& tokens, std::optional<db_clock::time_point> cdc_gen_id) {
assert(!tokens.empty());
// Order is important: both the CDC streams timestamp and tokens must be known when a node handles our status.
g.add_local_application_state({
{ gms::application_state::TOKENS, gms::versioned_value::tokens(tokens) },
{ gms::application_state::CDC_STREAMS_TIMESTAMP, gms::versioned_value::cdc_streams_timestamp(cdc_streams_ts) },
{ gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(cdc_gen_id) },
{ gms::application_state::STATUS, gms::versioned_value::normal(tokens) }
}).get();
}
@@ -577,14 +577,14 @@ void storage_service::join_token_ring(int delay) {
if (!db::system_keyspace::bootstrap_complete()) {
// If we're not bootstrapping then we shouldn't have chosen a CDC streams timestamp yet.
assert(should_bootstrap() || !_cdc_streams_ts);
assert(should_bootstrap() || !_cdc_gen_id);
// Don't try rewriting CDC stream description tables.
// See cdc.md design notes, `Streams description table V1 and rewriting` section, for explanation.
db::system_keyspace::cdc_set_rewritten(std::nullopt).get();
}
if (!_cdc_streams_ts) {
if (!_cdc_gen_id) {
// If we didn't choose a CDC streams timestamp at this point, then either
// 1. we're replacing a node,
// 2. we've already bootstrapped, but are upgrading from a non-CDC version,
@@ -605,7 +605,7 @@ void storage_service::join_token_ring(int delay) {
&& (!db::system_keyspace::bootstrap_complete()
|| cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) {
try {
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
_cdc_gen_id = cdc::make_new_cdc_generation(db().local().get_config(),
_bootstrap_tokens, get_token_metadata_ptr(), _gossiper,
_sys_dist_ks.local(), get_ring_delay(), !_for_testing && !is_first_node()).get0();
} catch (...) {
@@ -617,17 +617,17 @@ void storage_service::join_token_ring(int delay) {
}
// Persist the CDC streams timestamp before we persist bootstrap_state = COMPLETED.
if (_cdc_streams_ts) {
db::system_keyspace::update_cdc_streams_timestamp(*_cdc_streams_ts).get();
if (_cdc_gen_id) {
db::system_keyspace::update_cdc_generation_id(*_cdc_gen_id).get();
}
// If we crash now, we will choose a new CDC streams timestamp anyway (because we will also choose a new set of tokens).
// But if we crash after setting bootstrap_state = COMPLETED, we will keep using the persisted CDC streams timestamp after restarting.
db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED).get();
// At this point our local tokens and CDC streams timestamp are chosen (_bootstrap_tokens, _cdc_streams_ts) and will not be changed.
// At this point our local tokens and CDC streams timestamp are chosen (_bootstrap_tokens, _cdc_gen_id) and will not be changed.
// start participating in the ring.
set_gossip_tokens(_gossiper, _bootstrap_tokens, _cdc_streams_ts);
set_gossip_tokens(_gossiper, _bootstrap_tokens, _cdc_gen_id);
set_mode(mode::NORMAL, "node is now in normal status", true);
if (get_token_metadata().sorted_tokens().empty()) {
@@ -636,7 +636,7 @@ void storage_service::join_token_ring(int delay) {
throw std::runtime_error(err);
}
_cdc_gen_service.local().after_join(std::move(_cdc_streams_ts)).get();
_cdc_gen_service.local().after_join(std::move(_cdc_gen_id)).get();
// Ensure that the new CDC stream description table has all required streams.
// See the function's comment for details.
@@ -681,16 +681,16 @@ void storage_service::bootstrap() {
// After we pick a generation timestamp, we start gossiping it, and we stick with it.
// We don't do any other generation switches (unless we crash before completing bootstrap).
assert(!_cdc_streams_ts);
assert(!_cdc_gen_id);
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
_cdc_gen_id = cdc::make_new_cdc_generation(db().local().get_config(),
_bootstrap_tokens, get_token_metadata_ptr(), _gossiper,
_sys_dist_ks.local(), get_ring_delay(), !_for_testing && !is_first_node()).get0();
_gossiper.add_local_application_state({
// Order is important: both the CDC streams timestamp and tokens must be known when a node handles our status.
{ gms::application_state::TOKENS, versioned_value::tokens(_bootstrap_tokens) },
{ gms::application_state::CDC_STREAMS_TIMESTAMP, versioned_value::cdc_streams_timestamp(_cdc_streams_ts) },
{ gms::application_state::CDC_GENERATION_ID, versioned_value::cdc_generation_id(_cdc_gen_id) },
{ gms::application_state::STATUS, versioned_value::bootstrapping(_bootstrap_tokens) },
}).get();
@@ -705,7 +705,7 @@ void storage_service::bootstrap() {
set_mode(mode::JOINING, sprint("Announce tokens and status of the replacing node"), true);
_gossiper.add_local_application_state({
{ gms::application_state::TOKENS, versioned_value::tokens(_bootstrap_tokens) },
{ gms::application_state::CDC_STREAMS_TIMESTAMP, versioned_value::cdc_streams_timestamp(_cdc_streams_ts) },
{ gms::application_state::CDC_GENERATION_ID, versioned_value::cdc_generation_id(_cdc_gen_id) },
{ gms::application_state::STATUS, versioned_value::hibernate(true) },
}).get();
_gossiper.advertise_myself().get();
@@ -1760,7 +1760,7 @@ future<> storage_service::start_gossiping(bind_messaging_port do_bind) {
return seastar::async([&ss, do_bind] {
if (!ss._initialized) {
slogger.warn("Starting gossip by operator request");
auto cdc_gen_ts = db::system_keyspace::get_saved_cdc_streams_timestamp().get0();
auto cdc_gen_ts = db::system_keyspace::get_cdc_generation_id().get0();
if (!cdc_gen_ts) {
cdc_log.warn("CDC generation timestamp missing when starting gossip");
}

View File

@@ -336,7 +336,7 @@ private:
* DO NOT use this variable after `join_token_ring` (i.e. after we call `generation_service::after_join`
* and pass it the ownership of the timestamp).
*/
std::optional<db_clock::time_point> _cdc_streams_ts;
std::optional<db_clock::time_point> _cdc_gen_id;
public:
void enable_all_features();