/* * Copyright (C) 2015-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include #include #undef SEASTAR_TESTING_MAIN #include #include "test/lib/cql_assertions.hh" #include "test/lib/cql_test_env.hh" #include "test/lib/error_injection.hh" #include "test/lib/log.hh" #include #include #include "cql3/statements/batch_statement.hh" #include "cql3/query_processor.hh" #include "cql3/untyped_result_set.hh" #include "db/batchlog.hh" #include "db/batchlog_manager.hh" #include "db/commitlog/commitlog.hh" #include "db/config.hh" #include "idl/frozen_schema.dist.hh" #include "idl/frozen_schema.dist.impl.hh" #include "message/messaging_service.hh" #include "service/storage_proxy.hh" #include "utils/rjson.hh" BOOST_AUTO_TEST_SUITE(batchlog_manager_test) static atomic_cell make_atomic_cell(data_type dt, bytes value) { return atomic_cell::make_live(*dt, 0, std::move(value)); }; SEASTAR_TEST_CASE(test_execute_batch) { return do_with_cql_env([] (auto& e) { auto& qp = e.local_qp(); auto& bp = e.batchlog_manager().local(); return e.execute_cql("create table cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1));").discard_result().then([&qp, &e, &bp] () mutable { auto& db = e.local_db(); auto s = db.find_schema("ks", "cf"); const auto batchlog_schema = db.find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2); const column_definition& r1_col = *s->get_column_definition("r1"); auto key = partition_key::from_exploded(*s, {to_bytes("key1")}); auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(1)}); mutation m(s, key); m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(100))); using namespace std::chrono_literals; auto version = netw::messaging_service::current_version; auto bm = db::get_batchlog_mutation_for(batchlog_schema, { m }, version, db_clock::now() - db_clock::duration(3h), s->id().uuid()); return qp.proxy().mutate_locally(bm, tracing::trace_state_ptr(), 
db::commitlog::force_sync::no).then([&bp] () mutable { return bp.count_all_batches().then([](auto n) { BOOST_CHECK_EQUAL(n, 1); }).then([&bp] () mutable { return bp.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::yes).discard_result(); }); }); }).then([&qp] { return qp.execute_internal("select * from ks.cf where p1 = ? and c1 = ?;", { sstring("key1"), 1 }, cql3::query_processor::cache_internal::yes).then([](auto rs) { BOOST_REQUIRE(!rs->empty()); auto i = rs->one().template get_as("r1"); BOOST_CHECK_EQUAL(i, int32_t(100)); }); }); }); } namespace { struct fragment { position_in_partition pos; bool is_range_tombstone; bool is_live; fragment(position_in_partition p, bool is_rt, bool is_live) : pos(std::move(p)), is_range_tombstone(is_rt), is_live(is_live) {} class less_comparator { schema_ptr _s; public: explicit less_comparator(schema_ptr s) : _s(std::move(s)) {} bool operator()(const fragment& a, const fragment& b) const { position_in_partition::less_compare cmp{*_s}; return cmp(a.pos, b.pos); } }; }; struct batchlog_as_fragments { using fragment_set = std::set; std::unordered_map initial_fragments_per_shard; std::unordered_map failed_replay_fragments_per_shard; }; position_in_partition parse_batchlog_position(const schema& s, const cql3::untyped_result_set::row& row) { std::vector ck_components; ck_components.reserve(2); for (const auto ck_component : {"written_at", "id"}) { if (!row.has(ck_component) || row.get_blob_fragmented(ck_component).empty()) { // ck is prefix break; } ck_components.push_back(row.get_blob_fragmented(ck_component)); } return position_in_partition( static_cast(row.get_as("partition_region")), static_cast(row.get_as("position_weight")), clustering_key_prefix::from_exploded(s, ck_components)); } batchlog_as_fragments extract_batchlog_fragments(const cql3::untyped_result_set& fragment_results, schema_ptr batchlog_v2_schema) { auto is_live = [] (std::optional value) { // no value can be stored as null or empty json object ("{}") 
return value && value->size() > 2; }; batchlog_as_fragments result; for (const auto& row : fragment_results) { const auto stage = static_cast(row.get_as("stage")); const auto shard = row.get_as("shard"); auto& fragments_per_shard = stage == db::batchlog_stage::initial ? result.initial_fragments_per_shard : result.failed_replay_fragments_per_shard; const auto is_rtc = row.get_as("mutation_fragment_kind") == "range tombstone change"; auto pos = parse_batchlog_position(*batchlog_v2_schema, row); BOOST_REQUIRE_EQUAL(is_rtc, (pos.get_bound_weight() != bound_weight::equal)); testlog.info("[stage {}, bathlog shard {}] fragment: pos={}, kind={}, live={}", int8_t(stage), shard, pos, row.get_as("mutation_fragment_kind"), is_live(row.get_opt("value"))); auto& fragments = fragments_per_shard.try_emplace(shard, batchlog_as_fragments::fragment_set(fragment::less_comparator(batchlog_v2_schema))).first->second; fragments.emplace(pos, is_rtc, !is_rtc && is_live(row.get_opt("value"))); } return result; } void check_range_tombstone_start(const fragment& f) { BOOST_REQUIRE(f.is_range_tombstone); BOOST_REQUIRE(f.pos.region() == partition_region::clustered); BOOST_REQUIRE(f.pos.has_key()); BOOST_REQUIRE(f.pos.key().explode().empty()); BOOST_REQUIRE(f.pos.get_bound_weight() == bound_weight::before_all_prefixed); } void check_range_tombstone_end(const fragment& f, std::optional bound_weight_opt = {}) { BOOST_REQUIRE(f.is_range_tombstone); BOOST_REQUIRE(f.pos.region() == partition_region::clustered); BOOST_REQUIRE(f.pos.has_key()); BOOST_REQUIRE_EQUAL(f.pos.key().explode().size(), 1); if (bound_weight_opt) { BOOST_REQUIRE(f.pos.get_bound_weight() == *bound_weight_opt); } } uint64_t prepare_batches(cql_test_env& env, std::string_view batchlog_table_name, uint64_t batch_count, bool replay_fails, db::batchlog_manager::post_replay_cleanup cleanup) { const bool is_v1 = batchlog_table_name == db::system_keyspace::BATCHLOG; uint64_t failed_batches = 0; auto& bm = env.batchlog_manager().local(); 
env.execute_cql("CREATE TABLE tbl (pk bigint PRIMARY KEY, v text)").get(); for (uint64_t i = 0; i != batch_count; ++i) { std::vector queries; std::vector query_views; for (uint64_t j = 0; j != i+2; ++j) { queries.emplace_back(format("INSERT INTO tbl (pk, v) VALUES ({}, 'value');", j)); query_views.emplace_back(queries.back()); } const bool fail = i % 2; bool injected_exception_thrown = false; std::optional error_injection; if (fail) { ++failed_batches; error_injection.emplace("storage_proxy_fail_send_batch"); } try { env.execute_batch( query_views, cql3::statements::batch_statement::type::LOGGED, std::make_unique(db::consistency_level::ONE, std::vector())).get(); } catch (std::runtime_error& ex) { if (fail) { BOOST_REQUIRE_EQUAL(std::string(ex.what()), "Error injection: failing to send batch"); injected_exception_thrown = true; } else { throw; } } BOOST_REQUIRE_EQUAL(injected_exception_thrown, fail); } // v1 (system.batchlog) is partition-oriented, while v2 (system.batchlog_v2) is row oriented. We need to switch the partition-region filter accordingly. const auto fragments_query = format("SELECT * FROM MUTATION_FRAGMENTS({}.{}) WHERE partition_region = {} ALLOW FILTERING", db::system_keyspace::NAME, batchlog_table_name, is_v1 ? 0 : 2); assert_that(env.execute_cql(format("SELECT id FROM {}.{}", db::system_keyspace::NAME, batchlog_table_name)).get()) .is_rows() .with_size(failed_batches); assert_that(env.execute_cql(fragments_query).get()) .is_rows(tests::dump_to_logs::yes) .with_size(batch_count) .assert_for_columns_of_each_row([&] (columns_assertions& columns) { columns.with_typed_column("mutation_source", "memtable:0"); }); std::optional error_injection; if (replay_fails) { error_injection.emplace("storage_proxy_fail_replay_batch"); } bm.do_batch_log_replay(cleanup).get(); assert_that(env.execute_cql(format("SELECT id FROM {}.{}", db::system_keyspace::NAME, batchlog_table_name)).get()) .is_rows(tests::dump_to_logs::yes) .with_size(replay_fails ? 
failed_batches : 0); return failed_batches; } } // anonymous namespace future<> run_batchlog_v1_cleanup_with_failed_batches_test(bool replay_fails) { #ifndef SCYLLA_ENABLE_ERROR_INJECTION return make_ready_future<>(); #endif cql_test_config cfg; cfg.db_config->batchlog_replay_cleanup_after_replays.set_value("9999999", utils::config_file::config_source::Internal); cfg.batchlog_replay_timeout = 0s; cfg.batchlog_delay = 9999h; cfg.disabled_features.insert("BATCHLOG_V2"); return do_with_cql_env_thread([=] (cql_test_env& env) -> void { const uint64_t batch_count = 8; const uint64_t failed_batches = prepare_batches(env, db::system_keyspace::BATCHLOG, batch_count, replay_fails, db::batchlog_manager::post_replay_cleanup::no); const auto fragments_query = format("SELECT * FROM MUTATION_FRAGMENTS({}.{}) WHERE partition_region = 0 ALLOW FILTERING", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG); const auto fragment_results = cql3::untyped_result_set(env.execute_cql(fragments_query).get()); const auto batchlog_v1_schema = env.local_db().find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG); size_t live{0}; size_t dead{0}; for (const auto& row : fragment_results) { const auto metadata = row.get_as("metadata"); auto metadata_json = rjson::parse(metadata); if (metadata_json.HasMember("tombstone") && metadata_json["tombstone"].IsObject() && metadata_json["tombstone"].HasMember("deletion_time")) { ++dead; } else { ++live; } } if (replay_fails) { BOOST_REQUIRE_EQUAL(failed_batches, live); } else { BOOST_REQUIRE_EQUAL(0, live); } BOOST_REQUIRE_EQUAL(batch_count, dead + live); }, cfg); } SEASTAR_TEST_CASE(test_batchlog_v1_replay_fails) { return run_batchlog_v1_cleanup_with_failed_batches_test(true); } SEASTAR_TEST_CASE(test_batchlog_v1_replay) { return run_batchlog_v1_cleanup_with_failed_batches_test(false); } future<> run_batchlog_cleanup_with_failed_batches_test(bool replay_fails, db::batchlog_manager::post_replay_cleanup cleanup) { #ifndef 
SCYLLA_ENABLE_ERROR_INJECTION return make_ready_future<>(); #endif cql_test_config cfg; cfg.db_config->batchlog_replay_cleanup_after_replays.set_value("9999999", utils::config_file::config_source::Internal); cfg.batchlog_replay_timeout = 0s; cfg.batchlog_delay = 9999h; return do_with_cql_env_thread([=] (cql_test_env& env) -> void { const uint64_t batch_count = 8; const uint64_t failed_batches = prepare_batches(env, db::system_keyspace::BATCHLOG_V2, batch_count, replay_fails, cleanup); const auto fragments_query = format("SELECT * FROM MUTATION_FRAGMENTS({}.{}) WHERE partition_region = 2 ALLOW FILTERING", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2); const auto fragment_results = cql3::untyped_result_set(env.execute_cql(fragments_query).get()); const auto batchlog_v2_schema = env.local_db().find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2); const auto [initial_fragments_per_shard, failed_replay_fragments_per_shard] = extract_batchlog_fragments( cql3::untyped_result_set(env.execute_cql(fragments_query).get()), batchlog_v2_schema); if (cleanup) { size_t initial_rows{}; for (const auto& [batchlog_shard, batchlog_shard_fragments] : initial_fragments_per_shard) { // some batchlog shards can be empty, just ignore if (batchlog_shard_fragments.empty()) { continue; } testlog.info("Checking fragment in initial stage and batchlog shard {}", batchlog_shard); size_t rts{}, rows{}; for (const auto& fragment : batchlog_shard_fragments) { rts += fragment.is_range_tombstone; rows += !fragment.is_range_tombstone; BOOST_REQUIRE(!fragment.is_live); } // cleanup affects only batchlog shards which contributed to replay if (rts) { BOOST_REQUIRE_EQUAL(rts, 2); check_range_tombstone_start(*batchlog_shard_fragments.begin()); check_range_tombstone_end(*batchlog_shard_fragments.rbegin(), bound_weight::after_all_prefixed); } initial_rows += rows; } // some of the initial fragments could have been garbage collected after cleanup (shadowed by range 
tombstone) // this happens in the background so we can have up to total_batches rows BOOST_REQUIRE_LE(initial_rows, batch_count); if (replay_fails) { size_t failed_replay_rows{}; for (const auto& [batchlog_shard, batchlog_shard_fragments] : failed_replay_fragments_per_shard) { for (const auto& fragment : batchlog_shard_fragments) { BOOST_REQUIRE(!fragment.is_range_tombstone); BOOST_REQUIRE(fragment.is_live); ++failed_replay_rows; } } BOOST_REQUIRE_EQUAL(failed_replay_rows, failed_batches); } else { BOOST_REQUIRE(failed_replay_fragments_per_shard.empty()); } } else { size_t live{}, dead{}; BOOST_REQUIRE(failed_replay_fragments_per_shard.empty()); for (const auto& [batchlog_shard, batchlog_shard_fragments] : initial_fragments_per_shard) { for (const auto& fragment : batchlog_shard_fragments) { BOOST_REQUIRE(!fragment.is_range_tombstone); if (fragment.is_live) { ++live; } else { ++dead; } } } const auto total = live + dead; BOOST_REQUIRE_EQUAL(total, batch_count); if (replay_fails) { BOOST_REQUIRE_EQUAL(live, failed_batches); } else { BOOST_REQUIRE_EQUAL(dead, total); } } }, cfg); } SEASTAR_TEST_CASE(test_batchlog_replay_fails_no_cleanup) { return run_batchlog_cleanup_with_failed_batches_test(true, db::batchlog_manager::post_replay_cleanup::no); } SEASTAR_TEST_CASE(test_batchlog_replay_fails_with_cleanup) { return run_batchlog_cleanup_with_failed_batches_test(true, db::batchlog_manager::post_replay_cleanup::yes); } SEASTAR_TEST_CASE(test_batchlog_replay_no_cleanup) { return run_batchlog_cleanup_with_failed_batches_test(false, db::batchlog_manager::post_replay_cleanup::no); } SEASTAR_TEST_CASE(test_batchlog_replay_with_cleanup) { return run_batchlog_cleanup_with_failed_batches_test(false, db::batchlog_manager::post_replay_cleanup::yes); } SEASTAR_TEST_CASE(test_batchlog_replay_stage) { #ifndef SCYLLA_ENABLE_ERROR_INJECTION return make_ready_future<>(); #endif cql_test_config cfg; cfg.db_config->batchlog_replay_cleanup_after_replays.set_value("9999999", 
utils::config_file::config_source::Internal); cfg.batchlog_replay_timeout = 0s; cfg.batchlog_delay = 9999h; return do_with_cql_env_thread([] (cql_test_env& env) -> void { auto& bm = env.batchlog_manager().local(); env.execute_cql("CREATE TABLE tbl (pk bigint PRIMARY KEY, v text)").get(); const uint64_t batch_count = 8; const auto shard_count = 256; { scoped_error_injection error_injection("storage_proxy_fail_send_batch"); for (uint64_t i = 0; i != batch_count; ++i) { std::vector queries; std::vector query_views; for (uint64_t j = 0; j != i+2; ++j) { queries.emplace_back(format("INSERT INTO tbl (pk, v) VALUES ({}, 'value');", j)); query_views.emplace_back(queries.back()); } try { env.execute_batch( query_views, cql3::statements::batch_statement::type::LOGGED, std::make_unique(db::consistency_level::ONE, std::vector())).get(); } catch (std::runtime_error& ex) { BOOST_REQUIRE_EQUAL(std::string(ex.what()), "Error injection: failing to send batch"); } } } // Ensure select ... where write_time < write_time_limit (=now) picks up all batches. 
sleep(2ms).get(); const auto batchlog_query = format("SELECT * FROM {}.{}", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2); const auto fragments_query = format("SELECT * FROM MUTATION_FRAGMENTS({}.{}) WHERE partition_region = 2 ALLOW FILTERING", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2); std::set ids; std::set written_ats; assert_that(env.execute_cql(batchlog_query).get()) .is_rows() .with_size(batch_count) .assert_for_columns_of_each_row([&] (columns_assertions& columns) { columns.with_typed_column("version", netw::messaging_service::current_version) .with_typed_column("stage", int8_t(db::batchlog_stage::initial)) .with_typed_column("written_at", [&] (db_clock::time_point written_at) { written_ats.insert(written_at); return true; }) .with_typed_column("id", [&] (utils::UUID id) { ids.insert(id); return true; }); }); BOOST_REQUIRE_EQUAL(ids.size(), batch_count); BOOST_REQUIRE_LE(written_ats.size(), batch_count); auto do_replays = [&] (db::batchlog_manager::post_replay_cleanup cleanup) { for (unsigned i = 0; i < 3; ++i) { testlog.info("Replay attempt [cleanup={}] #{} - batches should be in failed_replay stage", cleanup, i); bm.do_batch_log_replay(cleanup).get(); assert_that(env.execute_cql(batchlog_query).get()) .is_rows() .with_size(batch_count) .assert_for_columns_of_each_row([&] (columns_assertions& columns) { columns.with_typed_column("version", netw::messaging_service::current_version) .with_typed_column("stage", [&] (int8_t stage) { // (0) cleanup::no == db::batchlog_stage::initial // (1) cleanup::yes == db::batchlog_stage::failed_replay return stage == int8_t(bool(cleanup)); }) .with_typed_column("written_at", [&] (db_clock::time_point written_at) { return written_ats.contains(written_at); }) .with_typed_column("id", [&] (utils::UUID id) { return ids.contains(id); }); }); auto fragment_results = cql3::untyped_result_set(env.execute_cql(fragments_query).get()); if (cleanup) { // Each shard has a range tombstone (shard_count * 2 
range tombstone changes) // There should be batch_count clustering rows, with stage=1 // There may be [0, batch_count] clustering rows with stage=0 BOOST_REQUIRE_GT(fragment_results.size(), batch_count); BOOST_REQUIRE_LE(fragment_results.size(), shard_count * 2 + batch_count * 2); } else { BOOST_REQUIRE_EQUAL(fragment_results.size(), batch_count); // only clustering rows } for (const auto& row : fragment_results) { if (row.get_as("mutation_fragment_kind") == "range tombstone change") { continue; } const auto id = row.get_as("id"); const auto stage = row.get_as("stage"); testlog.trace("Processing row for batch id={}, stage={}: ", id, stage); BOOST_REQUIRE_EQUAL(row.get_as("version"), netw::messaging_service::current_version); BOOST_REQUIRE(ids.contains(id)); const auto metadata = row.get_as("metadata"); auto metadata_json = rjson::parse(metadata); BOOST_REQUIRE(metadata_json.IsObject()); if (!cleanup || stage == int8_t(db::batchlog_stage::failed_replay)) { BOOST_REQUIRE_NE(row.get_as("value"), "{}"); BOOST_REQUIRE(!metadata_json.HasMember("tombstone")); const auto value_json = rjson::parse(row.get_as("value")); BOOST_REQUIRE(value_json.IsObject()); BOOST_REQUIRE(value_json.HasMember("data")); } else if (stage == int8_t(db::batchlog_stage::initial)) { BOOST_REQUIRE_EQUAL(row.get_as("value"), "{}"); // row should be dead -- data column shadowed by tombstone if (!cleanup) { BOOST_REQUIRE(metadata_json.HasMember("tombstone")); } } else { BOOST_FAIL(format("Unexpected stage: {}", stage)); } } } }; { scoped_error_injection error_injection("storage_proxy_fail_replay_batch"); do_replays(db::batchlog_manager::post_replay_cleanup::no); do_replays(db::batchlog_manager::post_replay_cleanup::yes); } testlog.info("Successful replay - should remove all batches"); bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no).get(); assert_that(env.execute_cql(batchlog_query).get()) .is_rows() .is_empty(); const auto fragment_results = 
cql3::untyped_result_set(env.execute_cql(fragments_query).get()); // Each shard can have a range tombstone (shard_count * 2 range tombstone changes) // There should be batch_count clustering rows, with stage=1 // There may be [0, batch_count] clustering rows with stage=0 BOOST_REQUIRE_GT(fragment_results.size(), batch_count); BOOST_REQUIRE_LE(fragment_results.size(), shard_count * 2 + batch_count * 2); for (const auto& row : fragment_results) { if (row.get_as("mutation_fragment_kind") == "range tombstone change") { BOOST_REQUIRE_EQUAL(row.get_as("stage"), int8_t(db::batchlog_stage::initial)); continue; } BOOST_REQUIRE_EQUAL(row.get_as("version"), netw::messaging_service::current_version); BOOST_REQUIRE(written_ats.contains(row.get_as("written_at"))); BOOST_REQUIRE(ids.contains(row.get_as("id"))); BOOST_REQUIRE_EQUAL(row.get_as("value"), "{}"); } }, cfg); } SEASTAR_TEST_CASE(test_batchlog_migrate_v1_v2) { #ifndef SCYLLA_ENABLE_ERROR_INJECTION return make_ready_future<>(); #endif const auto batch_replay_timeout = 1s; cql_test_config cfg; cfg.db_config->batchlog_replay_cleanup_after_replays.set_value("9999999", utils::config_file::config_source::Internal); cfg.batchlog_replay_timeout = batch_replay_timeout; cfg.batchlog_delay = 9999h; return do_with_cql_env_thread([batch_replay_timeout] (cql_test_env& env) -> void { auto& bm = env.batchlog_manager().local(); env.execute_cql("CREATE TABLE tbl (pk bigint PRIMARY KEY, v text)").get(); auto& sp = env.get_storage_proxy().local(); auto& db = env.local_db(); auto& tbl = db.find_column_family("ks", "tbl"); auto tbl_schema = tbl.schema(); auto cdef_tbl_v = tbl_schema->get_column_definition(to_bytes("v")); auto batchlog_v1_schema = db.find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG); auto cdef_batchlog_v1_data = batchlog_v1_schema->get_column_definition(to_bytes("data")); const uint64_t batch_count = 8; struct batchlog { utils::UUID id; int32_t version; db_clock::time_point written_at; managed_bytes data; 
}; std::map batchlogs; for (int64_t i = 0; i != batch_count; ++i) { bytes_ostream batchlog_data; for (int64_t j = 0; j != i+2; ++j) { auto key = partition_key::from_single_value(*tbl_schema, serialized(j)); mutation m(tbl_schema, key); m.set_clustered_cell(clustering_key::make_empty(), *cdef_tbl_v, make_atomic_cell(utf8_type, serialized("value"))); ser::serialize(batchlog_data, canonical_mutation(m)); } const auto id = utils::UUID_gen::get_time_UUID(); auto [it, _] = batchlogs.emplace(id, batchlog{ .id = id, .version = netw::messaging_service::current_version, .written_at = db_clock::now() - batch_replay_timeout * 10, .data = std::move(batchlog_data).to_managed_bytes()}); auto& batch = it->second; const auto timestamp = api::new_timestamp(); mutation m(batchlog_v1_schema, partition_key::from_single_value(*batchlog_v1_schema, serialized(batch.id))); m.set_cell(clustering_key_prefix::make_empty(), to_bytes("version"), batch.version, timestamp); m.set_cell(clustering_key_prefix::make_empty(), to_bytes("written_at"), batch.written_at, timestamp); m.set_cell(clustering_key_prefix::make_empty(), *cdef_batchlog_v1_data, atomic_cell::make_live(*cdef_batchlog_v1_data->type, timestamp, std::move(batch.data))); sp.mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no).get(); } const auto batchlog_v1_query = format("SELECT * FROM {}.{}", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG); const auto batchlog_v2_query = format("SELECT * FROM {}.{}", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2); // Initial state, all entries are in the v1 table. 
assert_that(env.execute_cql(batchlog_v1_query).get()) .is_rows() .with_size(batch_count); assert_that(env.execute_cql(batchlog_v2_query).get()) .is_rows() .is_empty(); { scoped_error_injection error_injection("batchlog_manager_fail_migration"); testlog.info("First replay - migration should fail, all entries stay in v1 table"); bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no).get(); } assert_that(env.execute_cql(batchlog_v1_query).get()) .is_rows() .with_size(batch_count); assert_that(env.execute_cql(batchlog_v2_query).get()) .is_rows() .is_empty(); { scoped_error_injection error_injection("storage_proxy_fail_replay_batch"); testlog.info("Second replay - migration should run again and succeed, but replay of migrated entries should fail, so they should remain in the v2 table."); bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no).get(); } assert_that(env.execute_cql(batchlog_v1_query).get()) .is_rows() .is_empty(); auto results = cql3::untyped_result_set(env.execute_cql(batchlog_v2_query).get()); BOOST_REQUIRE_EQUAL(results.size(), batch_count); std::set migrated_batchlog_ids; for (const auto& row : results) { BOOST_REQUIRE_EQUAL(row.get_as("stage"), int8_t(db::batchlog_stage::failed_replay)); const auto id = row.get_as("id"); auto it = batchlogs.find(id); BOOST_REQUIRE(it != batchlogs.end()); const auto& batch = it->second; BOOST_REQUIRE_EQUAL(row.get_as("version"), batch.version); BOOST_REQUIRE(row.get_as("written_at") == batch.written_at); BOOST_REQUIRE_EQUAL(row.get_blob_fragmented("data"), batch.data); migrated_batchlog_ids.emplace(id); } BOOST_REQUIRE_EQUAL(batchlogs.size(), migrated_batchlog_ids.size()); testlog.info("Third replay - migration is already done, replay of migrated entries should succeed, v2 table should be empty afterwards."); bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no).get(); assert_that(env.execute_cql(batchlog_v2_query).get()) .is_rows() .is_empty(); }, cfg); } 
// Verifies that replay's write-time cut-off honors per-table tombstone_gc
// settings: batches touching the repair-mode table (short propagation delay)
// are replayed, while fresh tbl2-only batches are skipped (1h replay timeout);
// also checks the resulting fragment layout per batchlog shard.
SEASTAR_TEST_CASE(test_batchlog_replay_write_time) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
    return make_ready_future<>();
#endif
    cql_test_config cfg;
    cfg.db_config->batchlog_replay_cleanup_after_replays.set_value("9999999", utils::config_file::config_source::Internal);
    cfg.batchlog_replay_timeout = 1h;
    cfg.batchlog_delay = 9999h;
    return do_with_cql_env_thread([ks = get_name()] (cql_test_env& env) -> void {
        auto& bm = env.batchlog_manager().local();
        env.execute_cql(format("CREATE KEYSPACE {} WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 2}} AND tablets = {{'enabled': 'false'}}", ks)).get();
        env.execute_cql(format("CREATE TABLE {}.tbl1 (pk bigint PRIMARY KEY, v text) WITH tombstone_gc = {{'mode': 'repair', 'propagation_delay_in_seconds': 1}}", ks)).get();
        env.execute_cql(format("CREATE TABLE {}.tbl2 (pk bigint PRIMARY KEY, v text) WITH tombstone_gc = {{'mode': 'timeout'}}", ks)).get();
        const uint64_t batch_count = 8;
        const uint64_t mutations_per_batch = 2;
        {
            scoped_error_injection error_injection("storage_proxy_fail_send_batch");
            for (uint64_t i = 0; i != batch_count; ++i) {
                std::vector<sstring> queries;
                std::vector<sstring_view> query_views;
                if (i % 2) {
                    // Odd batches touch both tables (tbl1 drives the short repair-mode delay).
                    for (const auto& tbl_name : {"tbl1", "tbl2"}) {
                        queries.emplace_back(format("INSERT INTO {}.{} (pk, v) VALUES (0, 'value');", ks, tbl_name, i));
                        query_views.emplace_back(queries.back());
                    }
                } else {
                    // Even batches touch tbl2 only.
                    for (uint64_t j = 0; j != mutations_per_batch; ++j) {
                        queries.emplace_back(format("INSERT INTO {}.tbl2 (pk, v) VALUES ({}, 'value');", ks, i * 2 + j));
                        query_views.emplace_back(queries.back());
                    }
                }
                try {
                    env.execute_batch(
                            query_views,
                            cql3::statements::batch_statement::type::LOGGED,
                            std::make_unique<cql3::query_options>(db::consistency_level::ONE, std::vector<cql3::raw_value>())).get();
                } catch (std::runtime_error& ex) {
                    BOOST_REQUIRE_EQUAL(std::string(ex.what()), "Error injection: failing to send batch");
                }
            }
        }
        const auto batchlog_query = format("SELECT * FROM {}.{}", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
        assert_that(env.execute_cql(batchlog_query).get())
            .is_rows()
            .with_size(batch_count);
        // Sums write_attempts over all batchlog-manager shards.
        auto get_write_attempts = [&] () -> uint64_t {
            return env.batchlog_manager().map_reduce0([] (const db::batchlog_manager& bm) {
                return bm.get_stats().write_attempts;
            }, uint64_t(0), std::plus{}).get();
        };
        BOOST_REQUIRE_EQUAL(get_write_attempts(), 0);
        // We need this sleep here to make sure all batches are older than propagation delay of 1s.
        sleep(2s).get();
        {
            scoped_error_injection error_injection("storage_proxy_fail_replay_batch");
            bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no).get();
            // Half the batches are skipped due to being too fresh
            BOOST_REQUIRE_EQUAL(get_write_attempts(), (batch_count / 2) * mutations_per_batch);
            auto results = cql3::untyped_result_set(env.execute_cql(batchlog_query).get());
            BOOST_REQUIRE_EQUAL(results.size(), batch_count);
            for (const auto& row : results) {
                BOOST_REQUIRE_EQUAL(row.get_as<int8_t>("stage"), int8_t(db::batchlog_stage::initial));
            }
        }
        {
            scoped_error_injection error_injection("storage_proxy_fail_replay_batch");
            bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::yes).get();
            // Half the batches are skipped (again) due to being too fresh
            BOOST_REQUIRE_EQUAL(get_write_attempts(), batch_count * mutations_per_batch);
            auto results = cql3::untyped_result_set(env.execute_cql(batchlog_query).get());
            BOOST_REQUIRE_EQUAL(results.size(), batch_count);
            size_t initial = 0;
            size_t failed_replay = 0;
            for (const auto& row : results) {
                const auto stage = static_cast<db::batchlog_stage>(row.get_as<int8_t>("stage"));
                switch (stage) {
                case db::batchlog_stage::initial:
                    ++initial;
                    break;
                case db::batchlog_stage::failed_replay:
                    ++failed_replay;
                    break;
                default:
                    BOOST_FAIL(format("Unexpected stage: {}", int8_t(stage)));
                }
            }
            BOOST_REQUIRE_EQUAL(initial, batch_count / 2);
            BOOST_REQUIRE_EQUAL(failed_replay, batch_count / 2);
        }
        const auto fragments_query = format("SELECT * FROM MUTATION_FRAGMENTS({}.{}) WHERE partition_region = 2 ALLOW FILTERING",
                db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
        const auto batchlog_v2_schema = env.local_db().find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
        const auto [initial_fragments_per_shard, failed_replay_fragments_per_shard] = extract_batchlog_fragments(
                cql3::untyped_result_set(env.execute_cql(fragments_query).get()), batchlog_v2_schema);
        for (const auto& [batchlog_shard, batchlog_shard_fragments] : initial_fragments_per_shard) {
            // some batchlog shards can be empty, just ignore
            if (batchlog_shard_fragments.empty()) {
                continue;
            }
            testlog.info("Checking fragment in initial stage and batchlog shard {}", batchlog_shard);
            position_in_partition::less_compare less_cmp{*batchlog_v2_schema};
            size_t rts = 0;
            auto min_live_pos = position_in_partition::after_all_clustered_rows();
            for (const auto& f : batchlog_shard_fragments) {
                if (f.is_range_tombstone) {
                    ++rts;
                    continue;
                }
                if (!f.is_live) {
                    continue;
                }
                min_live_pos = std::min(min_live_pos, f.pos, less_cmp);
            }
            // A shard either was not cleaned up (no rt) or has exactly one range tombstone (start + end change).
            BOOST_REQUIRE(rts == 0 || rts == 2);
            if (!rts) {
                continue;
            }
            check_range_tombstone_start(*batchlog_shard_fragments.begin());
            bool found_range_tombstone_end = false;
            for (auto it = std::next(batchlog_shard_fragments.begin()); it != batchlog_shard_fragments.end(); ++it) {
                if (it->is_range_tombstone) {
                    BOOST_REQUIRE(!std::exchange(found_range_tombstone_end, true));
                    check_range_tombstone_end(*it);
                    // The cleanup tombstone must not cover any still-live row.
                    BOOST_REQUIRE(less_cmp(it->pos, min_live_pos));
                }
            }
        }
    }, cfg);
}

BOOST_AUTO_TEST_SUITE_END()