mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
system_keyspace: De-static update_cdc_generation_id()
Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -875,7 +875,7 @@ future<> generation_service::check_and_repair_cdc_streams() {
|
||||
{ gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(new_gen_id) },
|
||||
{ gms::application_state::STATUS, *status }
|
||||
});
|
||||
co_await db::system_keyspace::update_cdc_generation_id(new_gen_id);
|
||||
co_await _sys_ks.local().update_cdc_generation_id(new_gen_id);
|
||||
}
|
||||
|
||||
future<> generation_service::handle_cdc_generation(std::optional<cdc::generation_id> gen_id) {
|
||||
@@ -1018,7 +1018,7 @@ future<bool> generation_service::do_handle_cdc_generation(cdc::generation_id gen
|
||||
// The assumption follows from the requirement of bootstrapping nodes sequentially.
|
||||
if (!_gen_id || get_ts(*_gen_id) < get_ts(gen_id)) {
|
||||
_gen_id = gen_id;
|
||||
co_await db::system_keyspace::update_cdc_generation_id(gen_id);
|
||||
co_await _sys_ks.local().update_cdc_generation_id(gen_id);
|
||||
co_await _gossiper.add_local_application_state(
|
||||
gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(gen_id));
|
||||
}
|
||||
|
||||
@@ -1750,13 +1750,13 @@ future<std::unordered_set<dht::token>> system_keyspace::get_local_tokens() {
|
||||
|
||||
future<> system_keyspace::update_cdc_generation_id(cdc::generation_id gen_id) {
|
||||
co_await std::visit(make_visitor(
|
||||
[] (cdc::generation_id_v1 id) -> future<> {
|
||||
co_await qctx->execute_cql(
|
||||
[this] (cdc::generation_id_v1 id) -> future<> {
|
||||
co_await execute_cql(
|
||||
format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)", v3::CDC_LOCAL),
|
||||
sstring(v3::CDC_LOCAL), id.ts);
|
||||
},
|
||||
[] (cdc::generation_id_v2 id) -> future<> {
|
||||
co_await qctx->execute_cql(
|
||||
[this] (cdc::generation_id_v2 id) -> future<> {
|
||||
co_await execute_cql(
|
||||
format("INSERT INTO system.{} (key, streams_timestamp, uuid) VALUES (?, ?, ?)", v3::CDC_LOCAL),
|
||||
sstring(v3::CDC_LOCAL), id.ts, id.id);
|
||||
}
|
||||
|
||||
@@ -410,7 +410,7 @@ public:
|
||||
/*
|
||||
* Save the CDC generation ID announced by this node in persistent storage.
|
||||
*/
|
||||
static future<> update_cdc_generation_id(cdc::generation_id);
|
||||
future<> update_cdc_generation_id(cdc::generation_id);
|
||||
|
||||
/*
|
||||
* Read the CDC generation ID announced by this node from persistent storage.
|
||||
|
||||
@@ -582,7 +582,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
|
||||
|
||||
// Persist the CDC streams timestamp before we persist bootstrap_state = COMPLETED.
|
||||
if (cdc_gen_id) {
|
||||
co_await db::system_keyspace::update_cdc_generation_id(*cdc_gen_id);
|
||||
co_await _sys_ks.local().update_cdc_generation_id(*cdc_gen_id);
|
||||
}
|
||||
// If we crash now, we will choose a new CDC streams timestamp anyway (because we will also choose a new set of tokens).
|
||||
// But if we crash after setting bootstrap_state = COMPLETED, we will keep using the persisted CDC streams timestamp after restarting.
|
||||
|
||||
Reference in New Issue
Block a user