From 29f610d8615712fdff04b43d3f5698fe211ef45e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 19 Jun 2024 09:16:04 -0400 Subject: [PATCH] db/batchlog_manager: replace open-coded paging with internal one query_processor has built-in paging support, no need to open-code paging in batchlog manager code. --- db/batchlog_manager.cc | 44 ++++++++++++------------------------------ 1 file changed, 12 insertions(+), 32 deletions(-) diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc index 1a7259e70b..a8503c4e4d 100644 --- a/db/batchlog_manager.cc +++ b/db/batchlog_manager.cc @@ -153,26 +153,26 @@ future<> db::batchlog_manager::replay_all_failed_batches() { auto throttle = _replay_rate / _qp.proxy().get_token_metadata_ptr()->count_normal_token_owners(); auto limiter = make_lw_shared(throttle); - auto batch = [this, limiter](const cql3::untyped_result_set::row& row) { + auto batch = [this, limiter](const cql3::untyped_result_set::row& row) -> future { auto written_at = row.get_as("written_at"); auto id = row.get_as("id"); // enough time for the actual write + batchlog entry mutation delivery (two separate requests). auto timeout = get_batch_log_timeout(); if (db_clock::now() < written_at + timeout) { blogger.debug("Skipping replay of {}, too fresh", id); - return make_ready_future<>(); + return make_ready_future(stop_iteration::no); } // check version of serialization format if (!row.has("version")) { blogger.warn("Skipping logged batch because of unknown version"); - return make_ready_future<>(); + return make_ready_future(stop_iteration::no); } auto version = row.get_as("version"); if (version != netw::messaging_service::current_version) { blogger.warn("Skipping logged batch because of incorrect version"); - return make_ready_future<>(); + return make_ready_future(stop_iteration::no); } auto data = row.get_blob("data"); @@ -254,37 +254,17 @@ future<> db::batchlog_manager::replay_all_failed_batches() { auto now = service::client_state(service::client_state::internal_tag()).get_timestamp(); m.partition().apply_delete(*schema, clustering_key_prefix::make_empty(), tombstone(now, gc_clock::now())); return _qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no); - }); + }).then([] { return make_ready_future(stop_iteration::no); }); }; - return seastar::with_gate(_gate, [this, batch = std::move(batch)] { + return seastar::with_gate(_gate, [this, batch = std::move(batch)] () mutable { blogger.debug("Started replayAllFailedBatches (cpu {})", this_shard_id()); - - typedef ::shared_ptr page_ptr; - sstring query = format("SELECT id, data, written_at, version FROM {}.{} LIMIT {:d}", system_keyspace::NAME, system_keyspace::BATCHLOG, page_size); - return _qp.execute_internal(query, cql3::query_processor::cache_internal::yes).then([this, batch = std::move(batch)](page_ptr page) { - return do_with(std::move(page), [this, batch = std::move(batch)](page_ptr & page) mutable { - return repeat([this, &page, batch = std::move(batch)]() mutable { - if (page->empty()) { - return make_ready_future(stop_iteration::yes); - } - auto id = page->back().get_as("id"); - return parallel_for_each(*page, batch).then([this, &page, id]() { - if (page->size() < page_size) { - return make_ready_future(stop_iteration::yes); // we've exhausted the batchlog, next query would be empty. - } - sstring query = format("SELECT id, data, written_at, version FROM {}.{} WHERE token(id) > token(?) LIMIT {:d}", - system_keyspace::NAME, - system_keyspace::BATCHLOG, - page_size); - return _qp.execute_internal(query, {id}, cql3::query_processor::cache_internal::yes).then([&page](auto res) { - page = std::move(res); - return make_ready_future(stop_iteration::no); - }); - }); - }); - }); - }).then([this] { + return _qp.query_internal( + format("SELECT id, data, written_at, version FROM {}.{})", system_keyspace::NAME, system_keyspace::BATCHLOG), + db::consistency_level::ONE, + {}, + page_size, + std::move(batch)).then([this] { // Replaying batches could have generated tombstones, flush to disk, // where they can be compacted away. return replica::database::flush_table_on_all_shards(_qp.proxy().get_db(), system_keyspace::NAME, system_keyspace::BATCHLOG);