/* * Copyright (C) 2015-present ScyllaDB * * Modified by ScyllaDB */ /* * SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0) */ #include #include #include #include #include #include #include #include #include #include #include #include "batchlog_manager.hh" #include "batchlog.hh" #include "data_dictionary/data_dictionary.hh" #include "mutation/canonical_mutation.hh" #include "service/storage_proxy.hh" #include "system_keyspace.hh" #include "utils/rate_limiter.hh" #include "utils/log.hh" #include "utils/murmur_hash.hh" #include "db_clock.hh" #include "unimplemented.hh" #include "idl/frozen_schema.dist.hh" #include "idl/frozen_schema.dist.impl.hh" #include "db/schema_tables.hh" #include "message/messaging_service.hh" #include "cql3/untyped_result_set.hh" #include "service_permit.hh" #include "cql3/query_processor.hh" static logging::logger blogger("batchlog_manager"); namespace db { // Yields 256 batchlog shards. Even on the largest nodes we currently run on, // this should be enough to give every core a batchlog partition. static constexpr unsigned batchlog_shard_bits = 8; int32_t batchlog_shard_of(db_clock::time_point written_at) { const int64_t count = written_at.time_since_epoch().count(); std::array result; utils::murmur_hash::hash3_x64_128(bytes_view(reinterpret_cast(&count), sizeof(count)), 0, result); uint64_t hash = result[0] ^ result[1]; return hash & ((1ULL << batchlog_shard_bits) - 1); } bool is_batchlog_v1(const schema& schema) { return schema.cf_name() == system_keyspace::BATCHLOG; } std::pair get_batchlog_key(const schema& schema, int32_t version, db::batchlog_stage stage, int32_t batchlog_shard, db_clock::time_point written_at, std::optional id) { if (is_batchlog_v1(schema)) { if (!id) { on_internal_error(blogger, "get_batchlog_key(): key for batchlog v1 requires batchlog id"); } auto pkey = partition_key::from_single_value(schema, {serialized(*id)}); auto ckey = clustering_key::make_empty(); return std::pair(std::move(pkey), std::move(ckey)); } auto pkey = partition_key::from_exploded(schema, {serialized(version), serialized(int8_t(stage)), serialized(batchlog_shard)}); std::vector ckey_components; ckey_components.reserve(2); ckey_components.push_back(serialized(written_at)); if (id) { ckey_components.push_back(serialized(*id)); } auto ckey = clustering_key::from_exploded(schema, ckey_components); return {std::move(pkey), std::move(ckey)}; } std::pair get_batchlog_key(const schema& schema, int32_t version, db::batchlog_stage stage, db_clock::time_point written_at, std::optional id) { return get_batchlog_key(schema, version, stage, batchlog_shard_of(written_at), written_at, id); } mutation get_batchlog_mutation_for(schema_ptr schema, managed_bytes data, int32_t version, db::batchlog_stage stage, db_clock::time_point now, const utils::UUID& id) { auto [key, ckey] = get_batchlog_key(*schema, version, stage, now, id); auto timestamp = api::new_timestamp(); mutation m(schema, key); // Avoid going through data_value and therefore `bytes`, as it can be large (#24809). auto cdef_data = schema->get_column_definition(to_bytes("data")); m.set_cell(ckey, *cdef_data, atomic_cell::make_live(*cdef_data->type, timestamp, std::move(data))); if (is_batchlog_v1(*schema)) { auto cdef_version = schema->get_column_definition(to_bytes("version")); m.set_cell(ckey, *cdef_version, atomic_cell::make_live(*cdef_version->type, timestamp, serialized(version))); auto cdef_written_at = schema->get_column_definition(to_bytes("written_at")); m.set_cell(ckey, *cdef_written_at, atomic_cell::make_live(*cdef_written_at->type, timestamp, serialized(now))); } return m; } mutation get_batchlog_mutation_for(schema_ptr schema, const utils::chunked_vector& mutations, int32_t version, db::batchlog_stage stage, db_clock::time_point now, const utils::UUID& id) { auto data = [&mutations] { utils::chunked_vector fm(mutations.begin(), mutations.end()); bytes_ostream out; for (auto& m : fm) { ser::serialize(out, m); } return std::move(out).to_managed_bytes(); }(); return get_batchlog_mutation_for(std::move(schema), std::move(data), version, stage, now, id); } mutation get_batchlog_mutation_for(schema_ptr schema, const utils::chunked_vector& mutations, int32_t version, db_clock::time_point now, const utils::UUID& id) { return get_batchlog_mutation_for(std::move(schema), mutations, version, batchlog_stage::initial, now, id); } mutation get_batchlog_delete_mutation(schema_ptr schema, int32_t version, db::batchlog_stage stage, db_clock::time_point now, const utils::UUID& id) { auto [key, ckey] = get_batchlog_key(*schema, version, stage, now, id); mutation m(schema, key); auto timestamp = api::new_timestamp(); m.partition().apply_delete(*schema, ckey, tombstone(timestamp, gc_clock::now())); return m; } mutation get_batchlog_delete_mutation(schema_ptr schema, int32_t version, db_clock::time_point now, const utils::UUID& id) { return get_batchlog_delete_mutation(std::move(schema), version, batchlog_stage::initial, now, id); } } // namespace db const std::chrono::seconds db::batchlog_manager::replay_interval; const uint32_t db::batchlog_manager::page_size; db::batchlog_manager::batchlog_manager(cql3::query_processor& qp, db::system_keyspace& sys_ks, gms::feature_service& fs, batchlog_manager_config config) : _qp(qp) , _sys_ks(sys_ks) , _fs(fs) , _replay_timeout(config.replay_timeout) , _replay_rate(config.replay_rate) , _delay(config.delay) , _replay_cleanup_after_replays(config.replay_cleanup_after_replays) , _gate("batchlog_manager") , _loop_done(batchlog_replay_loop()) { namespace sm = seastar::metrics; _metrics.add_group("batchlog_manager", { sm::make_counter("total_write_replay_attempts", _stats.write_attempts, sm::description("Counts write operations issued in a batchlog replay flow. " "The high value of this metric indicates that we have a long batch replay list.")), }); } future db::batchlog_manager::do_batch_log_replay(post_replay_cleanup cleanup) { return container().invoke_on(0, [cleanup] (auto& bm) -> future { auto gate_holder = bm._gate.hold(); auto sem_units = co_await get_units(bm._sem, 1); auto dest = bm._cpu++ % smp::count; blogger.debug("Batchlog replay on shard {}: starts", dest); auto last_replay = gc_clock::now(); all_batches_replayed all_replayed = all_batches_replayed::yes; if (dest == 0) { all_replayed = co_await bm.replay_all_failed_batches(cleanup); } else { all_replayed = co_await bm.container().invoke_on(dest, [cleanup] (auto& bm) { return with_gate(bm._gate, [&bm, cleanup] { return bm.replay_all_failed_batches(cleanup); }); }); } if (all_replayed == all_batches_replayed::yes) { co_await bm.container().invoke_on_all([last_replay] (auto& bm) { bm._last_replay = last_replay; }); } blogger.debug("Batchlog replay on shard {}: done", dest); co_return all_replayed; }); } future<> db::batchlog_manager::batchlog_replay_loop() { if (this_shard_id() != 0) { // Since replay is a "node global" operation, we should not attempt to do // it in parallel on each shard. It will just overlap/interfere. To // simplify syncing between batchlog_replay_loop and user initiated replay operations, // we use the _sem on shard zero only. Replaying batchlog can // generate a lot of work, so we distrute the real work on all cpus with // round-robin scheduling. co_return; } unsigned replay_counter = 0; auto delay = _delay; while (!_stop.abort_requested()) { try { co_await sleep_abortable(delay, _stop); } catch (sleep_aborted&) { co_return; } try { auto cleanup = post_replay_cleanup::no; if (++replay_counter >= _replay_cleanup_after_replays) { replay_counter = 0; cleanup = post_replay_cleanup::yes; } co_await do_batch_log_replay(cleanup); } catch (seastar::broken_semaphore&) { if (_stop.abort_requested()) { co_return; } on_internal_error_noexcept(blogger, fmt::format("Unexcepted exception in batchlog reply: {}", std::current_exception())); } catch (...) { blogger.error("Exception in batch replay: {}", std::current_exception()); } delay = utils::get_local_injector().is_enabled("short_batchlog_manager_replay_interval") ? std::chrono::seconds(1) : replay_interval; } } future<> db::batchlog_manager::drain() { if (_stop.abort_requested()) { co_return; } blogger.info("Asked to drain"); _stop.request_abort(); if (this_shard_id() == 0) { // Abort do_batch_log_replay if waiting on the semaphore. _sem.broken(); } co_await _qp.proxy().abort_batch_writes(); co_await std::move(_loop_done); blogger.info("Drained"); } future<> db::batchlog_manager::stop() { blogger.info("Asked to stop"); co_await drain(); co_await _gate.close(); blogger.info("Stopped"); } future db::batchlog_manager::count_all_batches() const { sstring query = format("SELECT count(*) FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG_V2); return _qp.execute_internal(query, cql3::query_processor::cache_internal::yes).then([](::shared_ptr rs) { return size_t(rs->one().get_as("count")); }); } future<> db::batchlog_manager::maybe_migrate_v1_to_v2() { if (_migration_done) { return make_ready_future<>(); } return with_gate(_gate, [this] () mutable -> future<> { blogger.info("Migrating batchlog entries from v1 -> v2"); auto schema_v1 = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG); auto schema_v2 = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2); auto batch = [this, schema_v1, schema_v2] (const cql3::untyped_result_set::row& row) -> future { // check version of serialization format if (!row.has("version")) { blogger.warn("Not migrating logged batch because of unknown version"); co_return stop_iteration::no; } auto version = row.get_as("version"); if (version != netw::messaging_service::current_version) { blogger.warn("Not migrating logged batch because of incorrect version"); co_return stop_iteration::no; } auto id = row.get_as("id"); auto written_at = row.get_as("written_at"); auto data = row.get_blob_fragmented("data"); auto& sp = _qp.proxy(); utils::get_local_injector().inject("batchlog_manager_fail_migration", [] { throw std::runtime_error("Error injection: failing batchlog migration"); }); auto migrate_mut = get_batchlog_mutation_for(schema_v2, std::move(data), version, batchlog_stage::failed_replay, written_at, id); co_await sp.mutate_locally(migrate_mut, tracing::trace_state_ptr(), db::commitlog::force_sync::no); mutation delete_mut(schema_v1, partition_key::from_single_value(*schema_v1, serialized(id))); delete_mut.partition().apply_delete(*schema_v1, clustering_key_prefix::make_empty(), tombstone(api::new_timestamp(), gc_clock::now())); co_await sp.mutate_locally(delete_mut, tracing::trace_state_ptr(), db::commitlog::force_sync::no); co_return stop_iteration::no; }; try { co_await _qp.query_internal( format("SELECT * FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG), db::consistency_level::ONE, {}, page_size, std::move(batch)); } catch (...) { blogger.warn("Batchlog v1 to v2 migration failed: {}; will retry", std::current_exception()); co_return; } co_await container().invoke_on_all([] (auto& bm) { bm._migration_done = true; }); blogger.info("Done migrating batchlog entries from v1 -> v2"); }); } namespace { using clock_type = db_clock::rep; struct replay_stats { std::optional min_too_fresh; bool need_cleanup = false; }; } // anonymous namespace static future process_batch( cql3::query_processor& qp, db::batchlog_manager::stats& stats, db::batchlog_manager::post_replay_cleanup cleanup, utils::rate_limiter& limiter, schema_ptr schema, std::unordered_map& replay_stats_per_shard, const db_clock::time_point now, db_clock::duration replay_timeout, std::chrono::seconds write_timeout, const cql3::untyped_result_set::row& row) { const bool is_v1 = db::is_batchlog_v1(*schema); const auto stage = is_v1 ? db::batchlog_stage::initial : static_cast(row.get_as("stage")); const auto batch_shard = is_v1 ? 0 : row.get_as("shard"); auto written_at = row.get_as("written_at"); auto id = row.get_as("id"); // enough time for the actual write + batchlog entry mutation delivery (two separate requests). auto timeout = replay_timeout; if (utils::get_local_injector().is_enabled("skip_batch_replay")) { blogger.debug("Skipping batch replay due to skip_batch_replay injection"); co_return db::all_batches_replayed::no; } auto data = row.get_blob_unfragmented("data"); blogger.debug("Replaying batch {} from stage {} and batch shard {}", id, int32_t(stage), batch_shard); utils::chunked_vector mutations; bool send_failed = false; auto& shard_written_at = replay_stats_per_shard.try_emplace(batch_shard, replay_stats{}).first->second; try { utils::chunked_vector> fms; auto in = ser::as_input_stream(data); while (in.size()) { auto fm = ser::deserialize(in, std::type_identity()); const auto tbl = qp.db().try_find_table(fm.column_family_id()); if (!tbl) { continue; } if (written_at <= tbl->get_truncation_time()) { continue; } schema_ptr s = tbl->schema(); if (s->tombstone_gc_options().mode() == tombstone_gc_mode::repair) { timeout = std::min(timeout, std::chrono::duration_cast(s->tombstone_gc_options().propagation_delay_in_seconds())); } fms.emplace_back(std::move(fm), std::move(s)); } if (now < written_at + timeout) { blogger.debug("Skipping replay of {}, too fresh", id); shard_written_at.min_too_fresh = std::min(shard_written_at.min_too_fresh.value_or(written_at), written_at); co_return db::all_batches_replayed::no; } auto size = data.size(); for (const auto& [fm, s] : fms) { mutations.emplace_back(fm.to_mutation(s)); co_await coroutine::maybe_yield(); } if (!mutations.empty()) { const auto ttl = [written_at]() -> clock_type { /* * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog). * This ensures that deletes aren't "undone" by an old batch replay. */ auto unadjusted_ttl = std::numeric_limits::max(); warn(unimplemented::cause::HINT); #if 0 for (auto& m : *mutations) { unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation)); } #endif return unadjusted_ttl - std::chrono::duration_cast(db_clock::now() - written_at).count(); }(); if (ttl > 0) { // Origin does the send manually, however I can't see a super great reason to do so. // Our normal write path does not add much redundancy to the dispatch, and rate is handled after send // in both cases. // FIXME: verify that the above is reasonably true. co_await limiter.reserve(size); stats.write_attempts += mutations.size(); auto timeout = db::timeout_clock::now() + write_timeout; if (cleanup) { co_await qp.proxy().send_batchlog_replay_to_all_replicas(mutations, timeout); } else { co_await qp.proxy().send_batchlog_replay_to_all_replicas(std::move(mutations), timeout); } } } } catch (data_dictionary::no_such_keyspace& ex) { // should probably ignore and drop the batch } catch (const data_dictionary::no_such_column_family&) { // As above -- we should drop the batch if the table doesn't exist anymore. } catch (...) { blogger.warn("Replay failed (will retry): {}", std::current_exception()); // timeout, overload etc. // Do _not_ remove the batch, assuning we got a node write error. // Since we don't have hints (which origin is satisfied with), // we have to resort to keeping this batch to next lap. if (is_v1 || !cleanup || stage == db::batchlog_stage::failed_replay) { co_return db::all_batches_replayed::no; } send_failed = true; } auto& sp = qp.proxy(); if (send_failed) { blogger.debug("Moving batch {} to stage failed_replay", id); auto m = get_batchlog_mutation_for(schema, mutations, netw::messaging_service::current_version, db::batchlog_stage::failed_replay, written_at, id); co_await sp.mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no); } // delete batch auto m = get_batchlog_delete_mutation(schema, netw::messaging_service::current_version, stage, written_at, id); co_await qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no); shard_written_at.need_cleanup = true; co_return db::all_batches_replayed(!send_failed); } future db::batchlog_manager::replay_all_failed_batches_v1(post_replay_cleanup) { db::all_batches_replayed all_replayed = all_batches_replayed::yes; // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272). auto throttle = _replay_rate / _qp.proxy().get_token_metadata_ptr()->count_normal_token_owners(); utils::rate_limiter limiter(throttle); auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG); std::unordered_map replay_stats_per_shard; // Use a stable `now` across all batches, so skip/replay decisions are the // same across a while prefix of written_at (across all ids). const auto now = db_clock::now(); auto batch = [this, &limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) mutable -> future { all_replayed = all_replayed && co_await process_batch(_qp, _stats, post_replay_cleanup::no, limiter, schema, replay_stats_per_shard, now, _replay_timeout, write_timeout, row); co_return stop_iteration::no; }; co_await with_gate(_gate, [this, &all_replayed, batch = std::move(batch)] () mutable -> future<> { blogger.debug("Started replayAllFailedBatches"); co_await utils::get_local_injector().inject("add_delay_to_batch_replay", std::chrono::milliseconds(1000)); auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG); co_await _qp.query_internal( format("SELECT * FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG), db::consistency_level::ONE, {}, page_size, batch); blogger.debug("Finished replayAllFailedBatches with all_replayed: {}", all_replayed); }); co_return all_replayed; } future db::batchlog_manager::replay_all_failed_batches_v2(post_replay_cleanup cleanup) { co_await maybe_migrate_v1_to_v2(); db::all_batches_replayed all_replayed = all_batches_replayed::yes; // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml). // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272). auto throttle = _replay_rate / _qp.proxy().get_token_metadata_ptr()->count_normal_token_owners(); utils::rate_limiter limiter(throttle); auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2); std::unordered_map replay_stats_per_shard; // Use a stable `now` across all batches, so skip/replay decisions are the // same across a while prefix of written_at (across all ids). const auto now = db_clock::now(); auto batch = [this, cleanup, &limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) mutable -> future { all_replayed = all_replayed && co_await process_batch(_qp, _stats, cleanup, limiter, schema, replay_stats_per_shard, now, _replay_timeout, write_timeout, row); co_return stop_iteration::no; }; co_await with_gate(_gate, [this, cleanup, &all_replayed, batch = std::move(batch), now, &replay_stats_per_shard] () mutable -> future<> { blogger.debug("Started replayAllFailedBatches with cleanup: {}", cleanup); co_await utils::get_local_injector().inject("add_delay_to_batch_replay", std::chrono::milliseconds(1000)); auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2); co_await coroutine::parallel_for_each(std::views::iota(0, 16), [&] (int32_t chunk) -> future<> { const int32_t batchlog_chunk_base = chunk * 16; for (int32_t i = 0; i < 16; ++i) { int32_t batchlog_shard = batchlog_chunk_base + i; co_await _qp.query_internal( format("SELECT * FROM {}.{} WHERE version = ? AND stage = ? AND shard = ? BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG_V2), db::consistency_level::ONE, {data_value(netw::messaging_service::current_version), data_value(int8_t(batchlog_stage::failed_replay)), data_value(batchlog_shard)}, page_size, batch); co_await _qp.query_internal( format("SELECT * FROM {}.{} WHERE version = ? AND stage = ? AND shard = ? BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG_V2), db::consistency_level::ONE, {data_value(netw::messaging_service::current_version), data_value(int8_t(batchlog_stage::initial)), data_value(batchlog_shard)}, page_size, batch); if (cleanup != post_replay_cleanup::yes) { continue; } auto it = replay_stats_per_shard.find(batchlog_shard); if (it == replay_stats_per_shard.end() || !it->second.need_cleanup) { // Nothing was replayed on this batchlog shard, nothing to cleanup. continue; } const auto write_time = it->second.min_too_fresh.value_or(now - _replay_timeout); const auto end_weight = it->second.min_too_fresh ? bound_weight::before_all_prefixed : bound_weight::after_all_prefixed; auto [key, ckey] = get_batchlog_key(*schema, netw::messaging_service::current_version, batchlog_stage::initial, batchlog_shard, write_time, {}); auto end_pos = position_in_partition(partition_region::clustered, end_weight, std::move(ckey)); range_tombstone rt(position_in_partition::before_all_clustered_rows(), std::move(end_pos), tombstone(api::new_timestamp(), gc_clock::now())); blogger.trace("Clean up batchlog shard {} with range tombstone {}", batchlog_shard, rt); mutation m(schema, key); m.partition().apply_row_tombstone(*schema, std::move(rt)); co_await _qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no); } }); blogger.debug("Finished replayAllFailedBatches with all_replayed: {}", all_replayed); }); co_return all_replayed; } future db::batchlog_manager::replay_all_failed_batches(post_replay_cleanup cleanup) { if (_fs.batchlog_v2) { return replay_all_failed_batches_v2(cleanup); } return replay_all_failed_batches_v1(cleanup); }