system.batchlog will still have to be used while the cluster is upgrading from an older version, which doesn't know v2 yet. Re-add support for replaying v1 batchlogs. The switch to v2 will happen after the BATCHLOG_V2 cluster feature is enabled. The only external user -- storage_proxy -- only needs a minor adjustment: switch between the table names. The rest is handled transparently by the db/batchlog.hh interface and the batchlog_manager.
590 lines
26 KiB
C++
590 lines
26 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*
|
|
* Modified by ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
|
*/
|
|
|
|
#include <chrono>
|
|
#include <exception>
|
|
#include <ranges>
|
|
#include <seastar/core/future-util.hh>
|
|
#include <seastar/core/do_with.hh>
|
|
#include <seastar/core/semaphore.hh>
|
|
#include <seastar/core/metrics.hh>
|
|
#include <seastar/core/coroutine.hh>
|
|
#include <seastar/coroutine/maybe_yield.hh>
|
|
#include <seastar/core/sleep.hh>
|
|
#include <seastar/coroutine/parallel_for_each.hh>
|
|
|
|
#include "batchlog_manager.hh"
|
|
#include "batchlog.hh"
|
|
#include "data_dictionary/data_dictionary.hh"
|
|
#include "mutation/canonical_mutation.hh"
|
|
#include "service/storage_proxy.hh"
|
|
#include "system_keyspace.hh"
|
|
#include "utils/rate_limiter.hh"
|
|
#include "utils/log.hh"
|
|
#include "utils/murmur_hash.hh"
|
|
#include "db_clock.hh"
|
|
#include "unimplemented.hh"
|
|
#include "idl/frozen_schema.dist.hh"
|
|
#include "idl/frozen_schema.dist.impl.hh"
|
|
#include "db/schema_tables.hh"
|
|
#include "message/messaging_service.hh"
|
|
#include "cql3/untyped_result_set.hh"
|
|
#include "service_permit.hh"
|
|
#include "cql3/query_processor.hh"
|
|
|
|
static logging::logger blogger("batchlog_manager");
|
|
|
|
namespace db {
|
|
|
|
// Yields 256 batchlog shards. Even on the largest nodes we currently run on,
|
|
// this should be enough to give every core a batchlog partition.
|
|
static constexpr unsigned batchlog_shard_bits = 8;
|
|
|
|
// Maps a batchlog entry's write timestamp to one of the
// 2^batchlog_shard_bits logical batchlog shards, by murmur3-hashing the raw
// clock tick count and folding the 128-bit digest down with xor.
int32_t batchlog_shard_of(db_clock::time_point written_at) {
    const int64_t ticks = written_at.time_since_epoch().count();
    std::array<uint64_t, 2> digest;
    utils::murmur_hash::hash3_x64_128(bytes_view(reinterpret_cast<const signed char*>(&ticks), sizeof(ticks)), 0, digest);
    const uint64_t folded = digest[0] ^ digest[1];
    constexpr uint64_t shard_mask = (1ULL << batchlog_shard_bits) - 1;
    return static_cast<int32_t>(folded & shard_mask);
}
|
|
|
|
bool is_batchlog_v1(const schema& schema) {
|
|
return schema.cf_name() == system_keyspace::BATCHLOG;
|
|
}
|
|
|
|
std::pair<partition_key, clustering_key>
|
|
get_batchlog_key(const schema& schema, int32_t version, db::batchlog_stage stage, int32_t batchlog_shard, db_clock::time_point written_at, std::optional<utils::UUID> id) {
|
|
if (is_batchlog_v1(schema)) {
|
|
if (!id) {
|
|
on_internal_error(blogger, "get_batchlog_key(): key for batchlog v1 requires batchlog id");
|
|
}
|
|
auto pkey = partition_key::from_single_value(schema, {serialized(*id)});
|
|
auto ckey = clustering_key::make_empty();
|
|
return std::pair(std::move(pkey), std::move(ckey));
|
|
}
|
|
|
|
auto pkey = partition_key::from_exploded(schema, {serialized(version), serialized(int8_t(stage)), serialized(batchlog_shard)});
|
|
|
|
std::vector<bytes> ckey_components;
|
|
ckey_components.reserve(2);
|
|
ckey_components.push_back(serialized(written_at));
|
|
if (id) {
|
|
ckey_components.push_back(serialized(*id));
|
|
}
|
|
auto ckey = clustering_key::from_exploded(schema, ckey_components);
|
|
|
|
return {std::move(pkey), std::move(ckey)};
|
|
}
|
|
|
|
std::pair<partition_key, clustering_key>
|
|
get_batchlog_key(const schema& schema, int32_t version, db::batchlog_stage stage, db_clock::time_point written_at, std::optional<utils::UUID> id) {
|
|
return get_batchlog_key(schema, version, stage, batchlog_shard_of(written_at), written_at, id);
|
|
}
|
|
|
|
// Builds the mutation that stores an already-serialized batch (`data`) in the
// batchlog table described by `schema`. Works for both v1 and v2 layouts:
// the key is computed via get_batchlog_key(), and for v1 the `version` and
// `written_at` values are additionally written as regular columns, since the
// legacy schema does not encode them in the key.
mutation get_batchlog_mutation_for(schema_ptr schema, managed_bytes data, int32_t version, db::batchlog_stage stage, db_clock::time_point now, const utils::UUID& id) {
    auto [key, ckey] = get_batchlog_key(*schema, version, stage, now, id);

    // One timestamp for all cells of this entry.
    auto timestamp = api::new_timestamp();

    mutation m(schema, key);
    // Avoid going through data_value and therefore `bytes`, as it can be large (#24809).
    auto cdef_data = schema->get_column_definition(to_bytes("data"));
    m.set_cell(ckey, *cdef_data, atomic_cell::make_live(*cdef_data->type, timestamp, std::move(data)));

    if (is_batchlog_v1(*schema)) {
        // Legacy table: version and written_at are plain columns.
        auto cdef_version = schema->get_column_definition(to_bytes("version"));
        m.set_cell(ckey, *cdef_version, atomic_cell::make_live(*cdef_version->type, timestamp, serialized(version)));

        auto cdef_written_at = schema->get_column_definition(to_bytes("written_at"));
        m.set_cell(ckey, *cdef_written_at, atomic_cell::make_live(*cdef_written_at->type, timestamp, serialized(now)));
    }

    return m;
}
|
|
|
|
// Builds the batchlog entry mutation for a batch given as live mutations:
// converts them to canonical form, serializes them back-to-back into one
// blob, and delegates to the managed_bytes overload.
mutation get_batchlog_mutation_for(schema_ptr schema, const utils::chunked_vector<mutation>& mutations, int32_t version, db::batchlog_stage stage, db_clock::time_point now, const utils::UUID& id) {
    utils::chunked_vector<canonical_mutation> canonical(mutations.begin(), mutations.end());
    bytes_ostream serialized_batch;
    for (auto& cm : canonical) {
        ser::serialize(serialized_batch, cm);
    }
    auto blob = std::move(serialized_batch).to_managed_bytes();
    return get_batchlog_mutation_for(std::move(schema), std::move(blob), version, stage, now, id);
}
|
|
|
|
// Convenience overload: new entries always start in stage `initial`.
mutation get_batchlog_mutation_for(schema_ptr schema, const utils::chunked_vector<mutation>& mutations, int32_t version, db_clock::time_point now, const utils::UUID& id) {
    constexpr auto stage = batchlog_stage::initial;
    return get_batchlog_mutation_for(std::move(schema), mutations, version, stage, now, id);
}
|
|
|
|
// Builds a mutation carrying a row tombstone that removes the batchlog entry
// addressed by (version, stage, now, id) from the given batchlog table.
mutation get_batchlog_delete_mutation(schema_ptr schema, int32_t version, db::batchlog_stage stage, db_clock::time_point now, const utils::UUID& id) {
    auto [pk, ck] = get_batchlog_key(*schema, version, stage, now, id);
    mutation m(schema, pk);
    auto ts = api::new_timestamp();
    auto ts_deletion = tombstone(ts, gc_clock::now());
    m.partition().apply_delete(*schema, ck, ts_deletion);
    return m;
}
|
|
|
|
// Convenience overload: deletes an entry in stage `initial`.
mutation get_batchlog_delete_mutation(schema_ptr schema, int32_t version, db_clock::time_point now, const utils::UUID& id) {
    constexpr auto stage = batchlog_stage::initial;
    return get_batchlog_delete_mutation(std::move(schema), version, stage, now, id);
}
|
|
|
|
} // namespace db
|
|
|
|
// Out-of-line definitions for static constexpr members declared in
// batchlog_manager.hh. NOTE(review): since C++17 static constexpr data
// members are implicitly inline, so these definitions are presumably
// redundant — confirm against the header before removing.
const std::chrono::seconds db::batchlog_manager::replay_interval;
const uint32_t db::batchlog_manager::page_size;
|
|
|
|
// Constructs the batchlog manager: copies the configuration knobs, registers
// the replay metrics, and immediately kicks off the background replay loop
// (the loop itself returns right away on every shard except shard 0).
db::batchlog_manager::batchlog_manager(cql3::query_processor& qp, db::system_keyspace& sys_ks, gms::feature_service& fs, batchlog_manager_config config)
    : _qp(qp)
    , _sys_ks(sys_ks)
    , _fs(fs)
    , _replay_timeout(config.replay_timeout)
    , _replay_rate(config.replay_rate)
    , _delay(config.delay)
    , _replay_cleanup_after_replays(config.replay_cleanup_after_replays)
    , _gate("batchlog_manager")
    // Starting the loop here means _loop_done is always a valid future that
    // drain() can wait on.
    , _loop_done(batchlog_replay_loop())
{
    namespace sm = seastar::metrics;

    _metrics.add_group("batchlog_manager", {
        sm::make_counter("total_write_replay_attempts", _stats.write_attempts,
            sm::description("Counts write operations issued in a batchlog replay flow. "
                "The high value of this metric indicates that we have a long batch replay list.")),
    });
}
|
|
|
|
// Runs one node-wide batchlog replay pass and reports whether every batch was
// replayed. All replays are serialized through shard 0: its semaphore (_sem)
// ensures only one replay runs at a time, while the actual replay work is
// spread over all shards in round-robin order via the _cpu counter.
future<db::all_batches_replayed> db::batchlog_manager::do_batch_log_replay(post_replay_cleanup cleanup) {
    return container().invoke_on(0, [cleanup] (auto& bm) -> future<db::all_batches_replayed> {
        auto gate_holder = bm._gate.hold();
        // Serializes replays node-wide; may throw broken_semaphore on drain.
        auto sem_units = co_await get_units(bm._sem, 1);

        // Round-robin choice of the shard that does the actual replay work.
        auto dest = bm._cpu++ % smp::count;
        blogger.debug("Batchlog replay on shard {}: starts", dest);
        // Taken before the replay starts, so the published _last_replay is a
        // conservative (lower-bound) timestamp.
        auto last_replay = gc_clock::now();
        all_batches_replayed all_replayed = all_batches_replayed::yes;
        if (dest == 0) {
            all_replayed = co_await bm.replay_all_failed_batches(cleanup);
        } else {
            // Remote shard: run under that shard's gate so its stop() waits
            // for this replay to finish.
            all_replayed = co_await bm.container().invoke_on(dest, [cleanup] (auto& bm) {
                return with_gate(bm._gate, [&bm, cleanup] {
                    return bm.replay_all_failed_batches(cleanup);
                });
            });
        }
        if (all_replayed == all_batches_replayed::yes) {
            // Publish the successful-replay timestamp to every shard.
            co_await bm.container().invoke_on_all([last_replay] (auto& bm) {
                bm._last_replay = last_replay;
            });
        }
        blogger.debug("Batchlog replay on shard {}: done", dest);
        co_return all_replayed;
    });
}
|
|
|
|
// Background loop that periodically triggers a batchlog replay pass.
// Runs only on shard 0 (replay is a node-global operation); every
// _replay_cleanup_after_replays iterations it also requests a post-replay
// cleanup. The loop exits when _stop is aborted (via drain()).
future<> db::batchlog_manager::batchlog_replay_loop() {
    if (this_shard_id() != 0) {
        // Since replay is a "node global" operation, we should not attempt to do
        // it in parallel on each shard. It will just overlap/interfere. To
        // simplify syncing between batchlog_replay_loop and user initiated replay operations,
        // we use the _sem on shard zero only. Replaying batchlog can
        // generate a lot of work, so we distribute the real work on all cpus with
        // round-robin scheduling.
        co_return;
    }

    unsigned replay_counter = 0;
    // First iteration waits for the configured startup delay; subsequent
    // iterations use replay_interval (or 1s under error injection).
    auto delay = _delay;
    while (!_stop.abort_requested()) {
        try {
            co_await sleep_abortable(delay, _stop);
        } catch (sleep_aborted&) {
            co_return;
        }
        try {
            auto cleanup = post_replay_cleanup::no;
            if (++replay_counter >= _replay_cleanup_after_replays) {
                replay_counter = 0;
                cleanup = post_replay_cleanup::yes;
            }
            co_await do_batch_log_replay(cleanup);
        } catch (seastar::broken_semaphore&) {
            // _sem is broken by drain(); anything else is a genuine bug.
            if (_stop.abort_requested()) {
                co_return;
            }
            // Fixed message: was garbled as "Unexcepted ... batchlog reply".
            on_internal_error_noexcept(blogger, fmt::format("Unexpected exception in batchlog replay: {}", std::current_exception()));
        } catch (...) {
            blogger.error("Exception in batch replay: {}", std::current_exception());
        }
        delay = utils::get_local_injector().is_enabled("short_batchlog_manager_replay_interval") ?
                std::chrono::seconds(1) : replay_interval;
    }
}
|
|
|
|
// Stops all background batchlog activity: aborts the replay loop, breaks the
// replay semaphore (so a queued do_batch_log_replay wakes up with
// broken_semaphore), aborts in-flight batch writes in the storage proxy, and
// waits for the replay loop to exit. Idempotent — a second call is a no-op.
future<> db::batchlog_manager::drain() {
    if (_stop.abort_requested()) {
        co_return;
    }

    blogger.info("Asked to drain");
    _stop.request_abort();
    if (this_shard_id() == 0) {
        // Abort do_batch_log_replay if waiting on the semaphore.
        _sem.broken();
    }

    // Cancel batch writes that are currently going through the proxy.
    co_await _qp.proxy().abort_batch_writes();

    // Wait for batchlog_replay_loop() to observe the abort and finish.
    co_await std::move(_loop_done);
    blogger.info("Drained");
}
|
|
|
|
// Full shutdown: drain (stops the loop and background writes), then close the
// gate to wait out any replay still in flight on this shard.
future<> db::batchlog_manager::stop() {
    blogger.info("Asked to stop");
    co_await drain();
    co_await _gate.close();
    blogger.info("Stopped");
}
|
|
|
|
// Counts the rows currently stored in the v2 batchlog table (bypassing the
// cache). Note this queries only the v2 table, not the legacy v1 one.
future<size_t> db::batchlog_manager::count_all_batches() const {
    const sstring query = format("SELECT count(*) FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG_V2);
    auto rs = co_await _qp.execute_internal(query, cql3::query_processor::cache_internal::yes);
    co_return size_t(rs->one().get_as<int64_t>("count"));
}
|
|
|
|
// One-shot migration of legacy system.batchlog (v1) entries into the v2
// table. Each migratable row is copied into the v2 table under stage
// `failed_replay` and its v1 partition is then deleted. On any failure the
// whole pass is retried later (_migration_done stays false); on success
// _migration_done is set on all shards so subsequent calls return
// immediately.
future<> db::batchlog_manager::maybe_migrate_v1_to_v2() {
    if (_migration_done) {
        return make_ready_future<>();
    }
    return with_gate(_gate, [this] () mutable -> future<> {
        blogger.info("Migrating batchlog entries from v1 -> v2");

        auto schema_v1 = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
        auto schema_v2 = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2);

        // Per-row migration callback; always continues the paged scan
        // (stop_iteration::no), skipping rows it cannot migrate.
        auto batch = [this, schema_v1, schema_v2] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
            // check version of serialization format
            if (!row.has("version")) {
                blogger.warn("Not migrating logged batch because of unknown version");
                co_return stop_iteration::no;
            }

            auto version = row.get_as<int32_t>("version");
            if (version != netw::messaging_service::current_version) {
                blogger.warn("Not migrating logged batch because of incorrect version");
                co_return stop_iteration::no;
            }

            auto id = row.get_as<utils::UUID>("id");
            auto written_at = row.get_as<db_clock::time_point>("written_at");
            auto data = row.get_blob_fragmented("data");

            auto& sp = _qp.proxy();

            utils::get_local_injector().inject("batchlog_manager_fail_migration", [] { throw std::runtime_error("Error injection: failing batchlog migration"); });

            // Copy into the v2 table first (stage failed_replay), then delete
            // the v1 partition; a crash between the two leaves a duplicate,
            // not a loss.
            auto migrate_mut = get_batchlog_mutation_for(schema_v2, std::move(data), version, batchlog_stage::failed_replay, written_at, id);
            co_await sp.mutate_locally(migrate_mut, tracing::trace_state_ptr(), db::commitlog::force_sync::no);

            mutation delete_mut(schema_v1, partition_key::from_single_value(*schema_v1, serialized(id)));
            delete_mut.partition().apply_delete(*schema_v1, clustering_key_prefix::make_empty(), tombstone(api::new_timestamp(), gc_clock::now()));
            co_await sp.mutate_locally(delete_mut, tracing::trace_state_ptr(), db::commitlog::force_sync::no);

            co_return stop_iteration::no;
        };
        try {
            co_await _qp.query_internal(
                format("SELECT * FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG),
                db::consistency_level::ONE,
                {},
                page_size,
                std::move(batch));
        } catch (...) {
            // _migration_done stays false, so the next replay retries.
            blogger.warn("Batchlog v1 to v2 migration failed: {}; will retry", std::current_exception());
            co_return;
        }

        co_await container().invoke_on_all([] (auto& bm) {
            bm._migration_done = true;
        });

        blogger.info("Done migrating batchlog entries from v1 -> v2");
    });
}
|
|
|
|
namespace {

using clock_type = db_clock::rep;

// Per-batchlog-shard bookkeeping accumulated while replaying a page of
// batches; consumed by the post-replay cleanup in
// replay_all_failed_batches_v2().
struct replay_stats {
    // Smallest written_at among entries skipped as "too fresh"; used as the
    // upper bound of the cleanup range tombstone so fresh entries survive.
    std::optional<db_clock::time_point> min_too_fresh;
    // Set once at least one batch on this shard was processed (its entry
    // deleted), i.e. a range cleanup on the shard may be worthwhile.
    bool need_cleanup = false;
};

} // anonymous namespace
|
|
|
|
// Replays a single batchlog row (v1 or v2, selected by `schema`):
// deserializes the stored canonical mutations, skips entries that are still
// "too fresh" or whose tables were dropped/truncated, rate-limits and sends
// the mutations to all replicas, and finally deletes the batchlog entry.
// On a send failure during a cleanup pass (v2 only), the batch is moved to
// stage `failed_replay` instead of being retried in place.
// Returns yes only if the batch was fully replayed.
static future<db::all_batches_replayed> process_batch(
        cql3::query_processor& qp,
        db::batchlog_manager::stats& stats,
        db::batchlog_manager::post_replay_cleanup cleanup,
        utils::rate_limiter& limiter,
        schema_ptr schema,
        std::unordered_map<int32_t, replay_stats>& replay_stats_per_shard,
        const db_clock::time_point now,
        db_clock::duration replay_timeout,
        std::chrono::seconds write_timeout,
        const cql3::untyped_result_set::row& row) {
    const bool is_v1 = db::is_batchlog_v1(*schema);
    // v1 rows have no stage/shard columns; they behave as stage initial on
    // shard 0.
    const auto stage = is_v1 ? db::batchlog_stage::initial : static_cast<db::batchlog_stage>(row.get_as<int8_t>("stage"));
    const auto batch_shard = is_v1 ? 0 : row.get_as<int32_t>("shard");
    auto written_at = row.get_as<db_clock::time_point>("written_at");
    auto id = row.get_as<utils::UUID>("id");
    // enough time for the actual write + batchlog entry mutation delivery (two separate requests).
    auto timeout = replay_timeout;

    if (utils::get_local_injector().is_enabled("skip_batch_replay")) {
        blogger.debug("Skipping batch replay due to skip_batch_replay injection");
        co_return db::all_batches_replayed::no;
    }

    auto data = row.get_blob_unfragmented("data");

    blogger.debug("Replaying batch {} from stage {} and batch shard {}", id, int32_t(stage), batch_shard);

    utils::chunked_vector<mutation> mutations;
    bool send_failed = false;

    // Per-shard stats entry, created on first use for this batch's shard.
    auto& shard_written_at = replay_stats_per_shard.try_emplace(batch_shard, replay_stats{}).first->second;

    try {
        utils::chunked_vector<std::pair<canonical_mutation, schema_ptr>> fms;
        auto in = ser::as_input_stream(data);
        // The blob is a back-to-back sequence of canonical mutations.
        while (in.size()) {
            auto fm = ser::deserialize(in, std::type_identity<canonical_mutation>());
            const auto tbl = qp.db().try_find_table(fm.column_family_id());
            if (!tbl) {
                // Table was dropped since the batch was logged; drop the
                // mutation.
                continue;
            }
            if (written_at <= tbl->get_truncation_time()) {
                // Mutation predates a TRUNCATE of its table; must not be
                // resurrected.
                continue;
            }
            schema_ptr s = tbl->schema();
            if (s->tombstone_gc_options().mode() == tombstone_gc_mode::repair) {
                // With repair-based tombstone GC, cap the freshness window by
                // the table's propagation delay.
                timeout = std::min(timeout, std::chrono::duration_cast<db_clock::duration>(s->tombstone_gc_options().propagation_delay_in_seconds()));
            }
            fms.emplace_back(std::move(fm), std::move(s));
        }

        if (now < written_at + timeout) {
            // The original write may still be in flight; leave the entry for
            // a later pass and remember the bound for cleanup.
            blogger.debug("Skipping replay of {}, too fresh", id);

            shard_written_at.min_too_fresh = std::min(shard_written_at.min_too_fresh.value_or(written_at), written_at);

            co_return db::all_batches_replayed::no;
        }

        auto size = data.size();

        for (const auto& [fm, s] : fms) {
            mutations.emplace_back(fm.to_mutation(s));
            co_await coroutine::maybe_yield();
        }

        if (!mutations.empty()) {
            const auto ttl = [written_at]() -> clock_type {
                /*
                 * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
                 * This ensures that deletes aren't "undone" by an old batch replay.
                 */
                auto unadjusted_ttl = std::numeric_limits<gc_clock::rep>::max();
                warn(unimplemented::cause::HINT);
#if 0
                for (auto& m : *mutations) {
                    unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
                }
#endif
                return unadjusted_ttl - std::chrono::duration_cast<gc_clock::duration>(db_clock::now() - written_at).count();
            }();

            if (ttl > 0) {
                // Origin does the send manually, however I can't see a super great reason to do so.
                // Our normal write path does not add much redundancy to the dispatch, and rate is handled after send
                // in both cases.
                // FIXME: verify that the above is reasonably true.
                co_await limiter.reserve(size);
                stats.write_attempts += mutations.size();
                // NOTE: deliberately shadows the outer (freshness) timeout;
                // this one is the wall-clock deadline for the write itself.
                auto timeout = db::timeout_clock::now() + write_timeout;
                if (cleanup) {
                    // Keep `mutations` intact: on failure during a cleanup
                    // pass they are re-serialized into stage failed_replay
                    // below.
                    co_await qp.proxy().send_batchlog_replay_to_all_replicas(mutations, timeout);
                } else {
                    co_await qp.proxy().send_batchlog_replay_to_all_replicas(std::move(mutations), timeout);
                }
            }
        }
    } catch (data_dictionary::no_such_keyspace& ex) {
        // should probably ignore and drop the batch
    } catch (const data_dictionary::no_such_column_family&) {
        // As above -- we should drop the batch if the table doesn't exist anymore.
    } catch (...) {
        blogger.warn("Replay failed (will retry): {}", std::current_exception());
        // timeout, overload etc.
        // Do _not_ remove the batch, assuming we got a node write error.
        // Since we don't have hints (which origin is satisfied with),
        // we have to resort to keeping this batch to next lap.
        if (is_v1 || !cleanup || stage == db::batchlog_stage::failed_replay) {
            // Leave the entry in place for a later retry.
            co_return db::all_batches_replayed::no;
        }
        // v2 + cleanup pass + stage initial: demote to failed_replay below.
        send_failed = true;
    }

    auto& sp = qp.proxy();

    if (send_failed) {
        blogger.debug("Moving batch {} to stage failed_replay", id);
        auto m = get_batchlog_mutation_for(schema, mutations, netw::messaging_service::current_version, db::batchlog_stage::failed_replay, written_at, id);
        co_await sp.mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
    }

    // delete batch
    auto m = get_batchlog_delete_mutation(schema, netw::messaging_service::current_version, stage, written_at, id);
    co_await qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);

    shard_written_at.need_cleanup = true;

    co_return db::all_batches_replayed(!send_failed);
}
|
|
|
|
// Replays batches from the legacy system.batchlog (v1) table with a single
// full-table paged scan. The post_replay_cleanup argument is ignored: v1 has
// no stage/shard layout, so process_batch is always invoked with cleanup=no.
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches_v1(post_replay_cleanup) {
    db::all_batches_replayed all_replayed = all_batches_replayed::yes;
    // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
    // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
    auto throttle = _replay_rate / _qp.proxy().get_token_metadata_ptr()->count_normal_token_owners();
    utils::rate_limiter limiter(throttle);

    auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);

    std::unordered_map<int32_t, replay_stats> replay_stats_per_shard;

    // Use a stable `now` across all batches, so skip/replay decisions are the
    // same across a whole prefix of written_at (across all ids).
    const auto now = db_clock::now();

    // Per-row callback; note bool_class's overloaded && does not
    // short-circuit, so process_batch runs for every row even after a
    // failure — presumably intentional; confirm if changing.
    auto batch = [this, &limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
        all_replayed = all_replayed && co_await process_batch(_qp, _stats, post_replay_cleanup::no, limiter, schema, replay_stats_per_shard, now, _replay_timeout, write_timeout, row);
        co_return stop_iteration::no;
    };

    co_await with_gate(_gate, [this, &all_replayed, batch = std::move(batch)] () mutable -> future<> {
        blogger.debug("Started replayAllFailedBatches");
        co_await utils::get_local_injector().inject("add_delay_to_batch_replay", std::chrono::milliseconds(1000));

        auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);

        co_await _qp.query_internal(
            format("SELECT * FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG),
            db::consistency_level::ONE,
            {},
            page_size,
            batch);

        blogger.debug("Finished replayAllFailedBatches with all_replayed: {}", all_replayed);
    });

    co_return all_replayed;
}
|
|
|
|
// Replays batches from the v2 batchlog table. First migrates any remaining
// v1 entries, then walks all 256 batchlog shards as 16 parallel chunks of 16
// sequential shards, replaying stage failed_replay before stage initial on
// each shard. During a cleanup pass, each shard that had batches processed is
// finished with a single range tombstone covering everything replayed.
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches_v2(post_replay_cleanup cleanup) {
    co_await maybe_migrate_v1_to_v2();

    db::all_batches_replayed all_replayed = all_batches_replayed::yes;
    // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
    // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
    auto throttle = _replay_rate / _qp.proxy().get_token_metadata_ptr()->count_normal_token_owners();
    utils::rate_limiter limiter(throttle);

    auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2);

    std::unordered_map<int32_t, replay_stats> replay_stats_per_shard;

    // Use a stable `now` across all batches, so skip/replay decisions are the
    // same across a whole prefix of written_at (across all ids).
    const auto now = db_clock::now();

    // Per-row callback; note bool_class's overloaded && does not
    // short-circuit, so process_batch runs for every row even after a
    // failure — presumably intentional; confirm if changing.
    auto batch = [this, cleanup, &limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
        all_replayed = all_replayed && co_await process_batch(_qp, _stats, cleanup, limiter, schema, replay_stats_per_shard, now, _replay_timeout, write_timeout, row);
        co_return stop_iteration::no;
    };

    co_await with_gate(_gate, [this, cleanup, &all_replayed, batch = std::move(batch), now, &replay_stats_per_shard] () mutable -> future<> {
        blogger.debug("Started replayAllFailedBatches with cleanup: {}", cleanup);
        co_await utils::get_local_injector().inject("add_delay_to_batch_replay", std::chrono::milliseconds(1000));

        auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2);

        // 16 chunks x 16 shards = 256 batchlog shards total.
        co_await coroutine::parallel_for_each(std::views::iota(0, 16), [&] (int32_t chunk) -> future<> {
            const int32_t batchlog_chunk_base = chunk * 16;
            for (int32_t i = 0; i < 16; ++i) {
                int32_t batchlog_shard = batchlog_chunk_base + i;

                // Previously-failed batches first, then fresh (initial) ones.
                co_await _qp.query_internal(
                    format("SELECT * FROM {}.{} WHERE version = ? AND stage = ? AND shard = ? BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG_V2),
                    db::consistency_level::ONE,
                    {data_value(netw::messaging_service::current_version), data_value(int8_t(batchlog_stage::failed_replay)), data_value(batchlog_shard)},
                    page_size,
                    batch);

                co_await _qp.query_internal(
                    format("SELECT * FROM {}.{} WHERE version = ? AND stage = ? AND shard = ? BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG_V2),
                    db::consistency_level::ONE,
                    {data_value(netw::messaging_service::current_version), data_value(int8_t(batchlog_stage::initial)), data_value(batchlog_shard)},
                    page_size,
                    batch);

                if (cleanup != post_replay_cleanup::yes) {
                    continue;
                }

                auto it = replay_stats_per_shard.find(batchlog_shard);
                if (it == replay_stats_per_shard.end() || !it->second.need_cleanup) {
                    // Nothing was replayed on this batchlog shard, nothing to cleanup.
                    continue;
                }

                // Range-tombstone everything replayed on this shard: from the
                // start of the partition up to (exclusive) the oldest entry
                // skipped as too fresh, or up to (inclusive) now - timeout
                // when nothing was skipped.
                const auto write_time = it->second.min_too_fresh.value_or(now - _replay_timeout);
                const auto end_weight = it->second.min_too_fresh ? bound_weight::before_all_prefixed : bound_weight::after_all_prefixed;
                auto [key, ckey] = get_batchlog_key(*schema, netw::messaging_service::current_version, batchlog_stage::initial, batchlog_shard, write_time, {});
                auto end_pos = position_in_partition(partition_region::clustered, end_weight, std::move(ckey));

                range_tombstone rt(position_in_partition::before_all_clustered_rows(), std::move(end_pos), tombstone(api::new_timestamp(), gc_clock::now()));

                blogger.trace("Clean up batchlog shard {} with range tombstone {}", batchlog_shard, rt);

                mutation m(schema, key);
                m.partition().apply_row_tombstone(*schema, std::move(rt));
                co_await _qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
            }
        });

        blogger.debug("Finished replayAllFailedBatches with all_replayed: {}", all_replayed);
    });

    co_return all_replayed;
}
|
|
|
|
// Dispatches a replay pass to the v2 implementation once the BATCHLOG_V2
// cluster feature is enabled, and to the legacy v1 path otherwise.
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches(post_replay_cleanup cleanup) {
    const bool use_v2 = bool(_fs.batchlog_v2);
    return use_v2 ? replay_all_failed_batches_v2(cleanup)
                  : replay_all_failed_batches_v1(cleanup);
}
|