db/batchlog_manager: replace open-coded paging with internal one

query_processor has built-in paging support, no need to open-code paging
in batchlog manager code.
This commit is contained in:
Botond Dénes
2024-06-19 09:16:04 -04:00
parent 2dd057c96d
commit 29f610d861

View File

@@ -153,26 +153,26 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
auto throttle = _replay_rate / _qp.proxy().get_token_metadata_ptr()->count_normal_token_owners();
auto limiter = make_lw_shared<utils::rate_limiter>(throttle);
auto batch = [this, limiter](const cql3::untyped_result_set::row& row) {
auto batch = [this, limiter](const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
auto written_at = row.get_as<db_clock::time_point>("written_at");
auto id = row.get_as<utils::UUID>("id");
// enough time for the actual write + batchlog entry mutation delivery (two separate requests).
auto timeout = get_batch_log_timeout();
if (db_clock::now() < written_at + timeout) {
blogger.debug("Skipping replay of {}, too fresh", id);
return make_ready_future<>();
return make_ready_future<stop_iteration>(stop_iteration::no);
}
// check version of serialization format
if (!row.has("version")) {
blogger.warn("Skipping logged batch because of unknown version");
return make_ready_future<>();
return make_ready_future<stop_iteration>(stop_iteration::no);
}
auto version = row.get_as<int32_t>("version");
if (version != netw::messaging_service::current_version) {
blogger.warn("Skipping logged batch because of incorrect version");
return make_ready_future<>();
return make_ready_future<stop_iteration>(stop_iteration::no);
}
auto data = row.get_blob("data");
@@ -254,37 +254,17 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
auto now = service::client_state(service::client_state::internal_tag()).get_timestamp();
m.partition().apply_delete(*schema, clustering_key_prefix::make_empty(), tombstone(now, gc_clock::now()));
return _qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
});
}).then([] { return make_ready_future<stop_iteration>(stop_iteration::no); });
};
return seastar::with_gate(_gate, [this, batch = std::move(batch)] {
return seastar::with_gate(_gate, [this, batch = std::move(batch)] () mutable {
blogger.debug("Started replayAllFailedBatches (cpu {})", this_shard_id());
typedef ::shared_ptr<cql3::untyped_result_set> page_ptr;
sstring query = format("SELECT id, data, written_at, version FROM {}.{} LIMIT {:d}", system_keyspace::NAME, system_keyspace::BATCHLOG, page_size);
return _qp.execute_internal(query, cql3::query_processor::cache_internal::yes).then([this, batch = std::move(batch)](page_ptr page) {
return do_with(std::move(page), [this, batch = std::move(batch)](page_ptr & page) mutable {
return repeat([this, &page, batch = std::move(batch)]() mutable {
if (page->empty()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
auto id = page->back().get_as<utils::UUID>("id");
return parallel_for_each(*page, batch).then([this, &page, id]() {
if (page->size() < page_size) {
return make_ready_future<stop_iteration>(stop_iteration::yes); // we've exhausted the batchlog, next query would be empty.
}
sstring query = format("SELECT id, data, written_at, version FROM {}.{} WHERE token(id) > token(?) LIMIT {:d}",
system_keyspace::NAME,
system_keyspace::BATCHLOG,
page_size);
return _qp.execute_internal(query, {id}, cql3::query_processor::cache_internal::yes).then([&page](auto res) {
page = std::move(res);
return make_ready_future<stop_iteration>(stop_iteration::no);
});
});
});
});
}).then([this] {
return _qp.query_internal(
format("SELECT id, data, written_at, version FROM {}.{})", system_keyspace::NAME, system_keyspace::BATCHLOG),
db::consistency_level::ONE,
{},
page_size,
std::move(batch)).then([this] {
// Replaying batches could have generated tombstones, flush to disk,
// where they can be compacted away.
return replica::database::flush_table_on_all_shards(_qp.proxy().get_db(), system_keyspace::NAME, system_keyspace::BATCHLOG);