From b3d139bbdb9cc1ba861d2118117a02122a0cb1ef Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Mon, 4 Apr 2022 12:20:36 +0300 Subject: [PATCH] system_keyspace: De-static update_cdc_generation_id() Signed-off-by: Pavel Emelyanov --- cdc/generation.cc | 4 ++-- db/system_keyspace.cc | 8 ++++---- db/system_keyspace.hh | 2 +- service/storage_service.cc | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cdc/generation.cc b/cdc/generation.cc index a145a8e7c0..7ecccd0a6f 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -875,7 +875,7 @@ future<> generation_service::check_and_repair_cdc_streams() { { gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(new_gen_id) }, { gms::application_state::STATUS, *status } }); - co_await db::system_keyspace::update_cdc_generation_id(new_gen_id); + co_await _sys_ks.local().update_cdc_generation_id(new_gen_id); } future<> generation_service::handle_cdc_generation(std::optional gen_id) { @@ -1018,7 +1018,7 @@ future generation_service::do_handle_cdc_generation(cdc::generation_id gen // The assumption follows from the requirement of bootstrapping nodes sequentially. if (!_gen_id || get_ts(*_gen_id) < get_ts(gen_id)) { _gen_id = gen_id; - co_await db::system_keyspace::update_cdc_generation_id(gen_id); + co_await _sys_ks.local().update_cdc_generation_id(gen_id); co_await _gossiper.add_local_application_state( gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(gen_id)); } diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index c2f638f127..92b07649fc 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1750,13 +1750,13 @@ future> system_keyspace::get_local_tokens() { future<> system_keyspace::update_cdc_generation_id(cdc::generation_id gen_id) { co_await std::visit(make_visitor( - [] (cdc::generation_id_v1 id) -> future<> { - co_await qctx->execute_cql( + [this] (cdc::generation_id_v1 id) -> future<> { + co_await execute_cql( format("INSERT INTO system.{} (key, streams_timestamp) VALUES (?, ?)", v3::CDC_LOCAL), sstring(v3::CDC_LOCAL), id.ts); }, - [] (cdc::generation_id_v2 id) -> future<> { - co_await qctx->execute_cql( + [this] (cdc::generation_id_v2 id) -> future<> { + co_await execute_cql( format("INSERT INTO system.{} (key, streams_timestamp, uuid) VALUES (?, ?, ?)", v3::CDC_LOCAL), sstring(v3::CDC_LOCAL), id.ts, id.id); } diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 4cbc980b76..e23af385e7 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -410,7 +410,7 @@ public: /* * Save the CDC generation ID announced by this node in persistent storage. */ - static future<> update_cdc_generation_id(cdc::generation_id); + future<> update_cdc_generation_id(cdc::generation_id); /* * Read the CDC generation ID announced by this node from persistent storage. diff --git a/service/storage_service.cc b/service/storage_service.cc index 099604672a..a3ebcf8c92 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -582,7 +582,7 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi // Persist the CDC streams timestamp before we persist bootstrap_state = COMPLETED. if (cdc_gen_id) { - co_await db::system_keyspace::update_cdc_generation_id(*cdc_gen_id); + co_await _sys_ks.local().update_cdc_generation_id(*cdc_gen_id); } // If we crash now, we will choose a new CDC streams timestamp anyway (because we will also choose a new set of tokens). // But if we crash after setting bootstrap_state = COMPLETED, we will keep using the persisted CDC streams timestamp after restarting.