Files
scylladb/test/boost/batchlog_manager_test.cc
Botond Dénes 51a25c8af3 test/boost/batchlog_manager_test: add tests for v1 batchlog
The v1 table is used while upgrading from a pre-v2 version. We need
tests to ensure it still works.
2026-02-20 07:03:46 +02:00

858 lines
38 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <boost/test/unit_test.hpp>
#include <stdint.h>
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_assertions.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/error_injection.hh"
#include "test/lib/log.hh"
#include <seastar/core/future-util.hh>
#include <seastar/core/shared_ptr.hh>
#include "cql3/statements/batch_statement.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "db/batchlog.hh"
#include "db/batchlog_manager.hh"
#include "db/commitlog/commitlog.hh"
#include "db/config.hh"
#include "idl/frozen_schema.dist.hh"
#include "idl/frozen_schema.dist.impl.hh"
#include "message/messaging_service.hh"
#include "service/storage_proxy.hh"
#include "utils/rjson.hh"
BOOST_AUTO_TEST_SUITE(batchlog_manager_test)
static atomic_cell make_atomic_cell(data_type dt, bytes value) {
return atomic_cell::make_live(*dt, 0, std::move(value));
};
SEASTAR_TEST_CASE(test_execute_batch) {
return do_with_cql_env([] (auto& e) {
auto& qp = e.local_qp();
auto& bp = e.batchlog_manager().local();
return e.execute_cql("create table cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1));").discard_result().then([&qp, &e, &bp] () mutable {
auto& db = e.local_db();
auto s = db.find_schema("ks", "cf");
const auto batchlog_schema = db.find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
const column_definition& r1_col = *s->get_column_definition("r1");
auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(1)});
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(100)));
using namespace std::chrono_literals;
auto version = netw::messaging_service::current_version;
auto bm = db::get_batchlog_mutation_for(batchlog_schema, { m }, version, db_clock::now() - db_clock::duration(3h), s->id().uuid());
return qp.proxy().mutate_locally(bm, tracing::trace_state_ptr(), db::commitlog::force_sync::no).then([&bp] () mutable {
return bp.count_all_batches().then([](auto n) {
BOOST_CHECK_EQUAL(n, 1);
}).then([&bp] () mutable {
return bp.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::yes).discard_result();
});
});
}).then([&qp] {
return qp.execute_internal("select * from ks.cf where p1 = ? and c1 = ?;", { sstring("key1"), 1 }, cql3::query_processor::cache_internal::yes).then([](auto rs) {
BOOST_REQUIRE(!rs->empty());
auto i = rs->one().template get_as<int32_t>("r1");
BOOST_CHECK_EQUAL(i, int32_t(100));
});
});
});
}
namespace {
struct fragment {
position_in_partition pos;
bool is_range_tombstone;
bool is_live;
fragment(position_in_partition p, bool is_rt, bool is_live)
: pos(std::move(p)), is_range_tombstone(is_rt), is_live(is_live)
{}
class less_comparator {
schema_ptr _s;
public:
explicit less_comparator(schema_ptr s) : _s(std::move(s)) {}
bool operator()(const fragment& a, const fragment& b) const {
position_in_partition::less_compare cmp{*_s};
return cmp(a.pos, b.pos);
}
};
};
struct batchlog_as_fragments {
using fragment_set = std::set<fragment, fragment::less_comparator>;
std::unordered_map<int32_t, fragment_set> initial_fragments_per_shard;
std::unordered_map<int32_t, fragment_set> failed_replay_fragments_per_shard;
};
position_in_partition parse_batchlog_position(const schema& s, const cql3::untyped_result_set::row& row) {
std::vector<managed_bytes> ck_components;
ck_components.reserve(2);
for (const auto ck_component : {"written_at", "id"}) {
if (!row.has(ck_component) || row.get_blob_fragmented(ck_component).empty()) {
// ck is prefix
break;
}
ck_components.push_back(row.get_blob_fragmented(ck_component));
}
return position_in_partition(
static_cast<partition_region>(row.get_as<int8_t>("partition_region")),
static_cast<bound_weight>(row.get_as<int8_t>("position_weight")),
clustering_key_prefix::from_exploded(s, ck_components));
}
batchlog_as_fragments extract_batchlog_fragments(const cql3::untyped_result_set& fragment_results, schema_ptr batchlog_v2_schema) {
auto is_live = [] (std::optional<sstring> value) {
// no value can be stored as null or empty json object ("{}")
return value && value->size() > 2;
};
batchlog_as_fragments result;
for (const auto& row : fragment_results) {
const auto stage = static_cast<db::batchlog_stage>(row.get_as<int8_t>("stage"));
const auto shard = row.get_as<int32_t>("shard");
auto& fragments_per_shard = stage == db::batchlog_stage::initial
? result.initial_fragments_per_shard
: result.failed_replay_fragments_per_shard;
const auto is_rtc = row.get_as<sstring>("mutation_fragment_kind") == "range tombstone change";
auto pos = parse_batchlog_position(*batchlog_v2_schema, row);
BOOST_REQUIRE_EQUAL(is_rtc, (pos.get_bound_weight() != bound_weight::equal));
testlog.info("[stage {}, bathlog shard {}] fragment: pos={}, kind={}, live={}", int8_t(stage), shard, pos, row.get_as<sstring>("mutation_fragment_kind"), is_live(row.get_opt<sstring>("value")));
auto& fragments = fragments_per_shard.try_emplace(shard, batchlog_as_fragments::fragment_set(fragment::less_comparator(batchlog_v2_schema))).first->second;
fragments.emplace(pos, is_rtc, !is_rtc && is_live(row.get_opt<sstring>("value")));
}
return result;
}
void check_range_tombstone_start(const fragment& f) {
BOOST_REQUIRE(f.is_range_tombstone);
BOOST_REQUIRE(f.pos.region() == partition_region::clustered);
BOOST_REQUIRE(f.pos.has_key());
BOOST_REQUIRE(f.pos.key().explode().empty());
BOOST_REQUIRE(f.pos.get_bound_weight() == bound_weight::before_all_prefixed);
}
void check_range_tombstone_end(const fragment& f, std::optional<bound_weight> bound_weight_opt = {}) {
BOOST_REQUIRE(f.is_range_tombstone);
BOOST_REQUIRE(f.pos.region() == partition_region::clustered);
BOOST_REQUIRE(f.pos.has_key());
BOOST_REQUIRE_EQUAL(f.pos.key().explode().size(), 1);
if (bound_weight_opt) {
BOOST_REQUIRE(f.pos.get_bound_weight() == *bound_weight_opt);
}
}
uint64_t prepare_batches(cql_test_env& env, std::string_view batchlog_table_name, uint64_t batch_count, bool replay_fails,
db::batchlog_manager::post_replay_cleanup cleanup) {
const bool is_v1 = batchlog_table_name == db::system_keyspace::BATCHLOG;
uint64_t failed_batches = 0;
auto& bm = env.batchlog_manager().local();
env.execute_cql("CREATE TABLE tbl (pk bigint PRIMARY KEY, v text)").get();
for (uint64_t i = 0; i != batch_count; ++i) {
std::vector<sstring> queries;
std::vector<std::string_view> query_views;
for (uint64_t j = 0; j != i+2; ++j) {
queries.emplace_back(format("INSERT INTO tbl (pk, v) VALUES ({}, 'value');", j));
query_views.emplace_back(queries.back());
}
const bool fail = i % 2;
bool injected_exception_thrown = false;
std::optional<scoped_error_injection> error_injection;
if (fail) {
++failed_batches;
error_injection.emplace("storage_proxy_fail_send_batch");
}
try {
env.execute_batch(
query_views,
cql3::statements::batch_statement::type::LOGGED,
std::make_unique<cql3::query_options>(db::consistency_level::ONE, std::vector<cql3::raw_value>())).get();
} catch (std::runtime_error& ex) {
if (fail) {
BOOST_REQUIRE_EQUAL(std::string(ex.what()), "Error injection: failing to send batch");
injected_exception_thrown = true;
} else {
throw;
}
}
BOOST_REQUIRE_EQUAL(injected_exception_thrown, fail);
}
// v1 (system.batchlog) is partition-oriented, while v2 (system.batchlog_v2) is row oriented. We need to switch the partition-region filter accordingly.
const auto fragments_query = format("SELECT * FROM MUTATION_FRAGMENTS({}.{}) WHERE partition_region = {} ALLOW FILTERING", db::system_keyspace::NAME, batchlog_table_name, is_v1 ? 0 : 2);
assert_that(env.execute_cql(format("SELECT id FROM {}.{}", db::system_keyspace::NAME, batchlog_table_name)).get())
.is_rows()
.with_size(failed_batches);
assert_that(env.execute_cql(fragments_query).get())
.is_rows(tests::dump_to_logs::yes)
.with_size(batch_count)
.assert_for_columns_of_each_row([&] (columns_assertions& columns) {
columns.with_typed_column<sstring>("mutation_source", "memtable:0");
});
std::optional<scoped_error_injection> error_injection;
if (replay_fails) {
error_injection.emplace("storage_proxy_fail_replay_batch");
}
bm.do_batch_log_replay(cleanup).get();
assert_that(env.execute_cql(format("SELECT id FROM {}.{}", db::system_keyspace::NAME, batchlog_table_name)).get())
.is_rows(tests::dump_to_logs::yes)
.with_size(replay_fails ? failed_batches : 0);
return failed_batches;
}
} // anonymous namespace
future<> run_batchlog_v1_cleanup_with_failed_batches_test(bool replay_fails) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
return make_ready_future<>();
#endif
cql_test_config cfg;
cfg.db_config->batchlog_replay_cleanup_after_replays.set_value("9999999", utils::config_file::config_source::Internal);
cfg.batchlog_replay_timeout = 0s;
cfg.batchlog_delay = 9999h;
cfg.disabled_features.insert("BATCHLOG_V2");
return do_with_cql_env_thread([=] (cql_test_env& env) -> void {
const uint64_t batch_count = 8;
const uint64_t failed_batches = prepare_batches(env, db::system_keyspace::BATCHLOG, batch_count, replay_fails, db::batchlog_manager::post_replay_cleanup::no);
const auto fragments_query = format("SELECT * FROM MUTATION_FRAGMENTS({}.{}) WHERE partition_region = 0 ALLOW FILTERING", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG);
const auto fragment_results = cql3::untyped_result_set(env.execute_cql(fragments_query).get());
const auto batchlog_v1_schema = env.local_db().find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG);
size_t live{0};
size_t dead{0};
for (const auto& row : fragment_results) {
const auto metadata = row.get_as<sstring>("metadata");
auto metadata_json = rjson::parse(metadata);
if (metadata_json.HasMember("tombstone") && metadata_json["tombstone"].IsObject() && metadata_json["tombstone"].HasMember("deletion_time")) {
++dead;
} else {
++live;
}
}
if (replay_fails) {
BOOST_REQUIRE_EQUAL(failed_batches, live);
} else {
BOOST_REQUIRE_EQUAL(0, live);
}
BOOST_REQUIRE_EQUAL(batch_count, dead + live);
}, cfg);
}
SEASTAR_TEST_CASE(test_batchlog_v1_replay_fails) {
return run_batchlog_v1_cleanup_with_failed_batches_test(true);
}
SEASTAR_TEST_CASE(test_batchlog_v1_replay) {
return run_batchlog_v1_cleanup_with_failed_batches_test(false);
}
future<> run_batchlog_cleanup_with_failed_batches_test(bool replay_fails, db::batchlog_manager::post_replay_cleanup cleanup) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
return make_ready_future<>();
#endif
cql_test_config cfg;
cfg.db_config->batchlog_replay_cleanup_after_replays.set_value("9999999", utils::config_file::config_source::Internal);
cfg.batchlog_replay_timeout = 0s;
cfg.batchlog_delay = 9999h;
return do_with_cql_env_thread([=] (cql_test_env& env) -> void {
const uint64_t batch_count = 8;
const uint64_t failed_batches = prepare_batches(env, db::system_keyspace::BATCHLOG_V2, batch_count, replay_fails, cleanup);
const auto fragments_query = format("SELECT * FROM MUTATION_FRAGMENTS({}.{}) WHERE partition_region = 2 ALLOW FILTERING", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
const auto fragment_results = cql3::untyped_result_set(env.execute_cql(fragments_query).get());
const auto batchlog_v2_schema = env.local_db().find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
const auto [initial_fragments_per_shard, failed_replay_fragments_per_shard] = extract_batchlog_fragments(
cql3::untyped_result_set(env.execute_cql(fragments_query).get()), batchlog_v2_schema);
if (cleanup) {
size_t initial_rows{};
for (const auto& [batchlog_shard, batchlog_shard_fragments] : initial_fragments_per_shard) {
// some batchlog shards can be empty, just ignore
if (batchlog_shard_fragments.empty()) {
continue;
}
testlog.info("Checking fragment in initial stage and batchlog shard {}", batchlog_shard);
size_t rts{}, rows{};
for (const auto& fragment : batchlog_shard_fragments) {
rts += fragment.is_range_tombstone;
rows += !fragment.is_range_tombstone;
BOOST_REQUIRE(!fragment.is_live);
}
// cleanup affects only batchlog shards which contributed to replay
if (rts) {
BOOST_REQUIRE_EQUAL(rts, 2);
check_range_tombstone_start(*batchlog_shard_fragments.begin());
check_range_tombstone_end(*batchlog_shard_fragments.rbegin(), bound_weight::after_all_prefixed);
}
initial_rows += rows;
}
// some of the initial fragments could have been garbage collected after cleanup (shadowed by range tombstone)
// this happens in the background so we can have up to total_batches rows
BOOST_REQUIRE_LE(initial_rows, batch_count);
if (replay_fails) {
size_t failed_replay_rows{};
for (const auto& [batchlog_shard, batchlog_shard_fragments] : failed_replay_fragments_per_shard) {
for (const auto& fragment : batchlog_shard_fragments) {
BOOST_REQUIRE(!fragment.is_range_tombstone);
BOOST_REQUIRE(fragment.is_live);
++failed_replay_rows;
}
}
BOOST_REQUIRE_EQUAL(failed_replay_rows, failed_batches);
} else {
BOOST_REQUIRE(failed_replay_fragments_per_shard.empty());
}
} else {
size_t live{}, dead{};
BOOST_REQUIRE(failed_replay_fragments_per_shard.empty());
for (const auto& [batchlog_shard, batchlog_shard_fragments] : initial_fragments_per_shard) {
for (const auto& fragment : batchlog_shard_fragments) {
BOOST_REQUIRE(!fragment.is_range_tombstone);
if (fragment.is_live) {
++live;
} else {
++dead;
}
}
}
const auto total = live + dead;
BOOST_REQUIRE_EQUAL(total, batch_count);
if (replay_fails) {
BOOST_REQUIRE_EQUAL(live, failed_batches);
} else {
BOOST_REQUIRE_EQUAL(dead, total);
}
}
}, cfg);
}
SEASTAR_TEST_CASE(test_batchlog_replay_fails_no_cleanup) {
return run_batchlog_cleanup_with_failed_batches_test(true, db::batchlog_manager::post_replay_cleanup::no);
}
SEASTAR_TEST_CASE(test_batchlog_replay_fails_with_cleanup) {
return run_batchlog_cleanup_with_failed_batches_test(true, db::batchlog_manager::post_replay_cleanup::yes);
}
SEASTAR_TEST_CASE(test_batchlog_replay_no_cleanup) {
return run_batchlog_cleanup_with_failed_batches_test(false, db::batchlog_manager::post_replay_cleanup::no);
}
SEASTAR_TEST_CASE(test_batchlog_replay_with_cleanup) {
return run_batchlog_cleanup_with_failed_batches_test(false, db::batchlog_manager::post_replay_cleanup::yes);
}
SEASTAR_TEST_CASE(test_batchlog_replay_stage) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
return make_ready_future<>();
#endif
cql_test_config cfg;
cfg.db_config->batchlog_replay_cleanup_after_replays.set_value("9999999", utils::config_file::config_source::Internal);
cfg.batchlog_replay_timeout = 0s;
cfg.batchlog_delay = 9999h;
return do_with_cql_env_thread([] (cql_test_env& env) -> void {
auto& bm = env.batchlog_manager().local();
env.execute_cql("CREATE TABLE tbl (pk bigint PRIMARY KEY, v text)").get();
const uint64_t batch_count = 8;
const auto shard_count = 256;
{
scoped_error_injection error_injection("storage_proxy_fail_send_batch");
for (uint64_t i = 0; i != batch_count; ++i) {
std::vector<sstring> queries;
std::vector<std::string_view> query_views;
for (uint64_t j = 0; j != i+2; ++j) {
queries.emplace_back(format("INSERT INTO tbl (pk, v) VALUES ({}, 'value');", j));
query_views.emplace_back(queries.back());
}
try {
env.execute_batch(
query_views,
cql3::statements::batch_statement::type::LOGGED,
std::make_unique<cql3::query_options>(db::consistency_level::ONE, std::vector<cql3::raw_value>())).get();
} catch (std::runtime_error& ex) {
BOOST_REQUIRE_EQUAL(std::string(ex.what()), "Error injection: failing to send batch");
}
}
}
// Ensure select ... where write_time < write_time_limit (=now) picks up all batches.
sleep(2ms).get();
const auto batchlog_query = format("SELECT * FROM {}.{}", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
const auto fragments_query = format("SELECT * FROM MUTATION_FRAGMENTS({}.{}) WHERE partition_region = 2 ALLOW FILTERING", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
std::set<utils::UUID> ids;
std::set<db_clock::time_point> written_ats;
assert_that(env.execute_cql(batchlog_query).get())
.is_rows()
.with_size(batch_count)
.assert_for_columns_of_each_row([&] (columns_assertions& columns) {
columns.with_typed_column<int32_t>("version", netw::messaging_service::current_version)
.with_typed_column<int8_t>("stage", int8_t(db::batchlog_stage::initial))
.with_typed_column<db_clock::time_point>("written_at", [&] (db_clock::time_point written_at) {
written_ats.insert(written_at);
return true;
})
.with_typed_column<utils::UUID>("id", [&] (utils::UUID id) {
ids.insert(id);
return true;
});
});
BOOST_REQUIRE_EQUAL(ids.size(), batch_count);
BOOST_REQUIRE_LE(written_ats.size(), batch_count);
auto do_replays = [&] (db::batchlog_manager::post_replay_cleanup cleanup) {
for (unsigned i = 0; i < 3; ++i) {
testlog.info("Replay attempt [cleanup={}] #{} - batches should be in failed_replay stage", cleanup, i);
bm.do_batch_log_replay(cleanup).get();
assert_that(env.execute_cql(batchlog_query).get())
.is_rows()
.with_size(batch_count)
.assert_for_columns_of_each_row([&] (columns_assertions& columns) {
columns.with_typed_column<int32_t>("version", netw::messaging_service::current_version)
.with_typed_column<int8_t>("stage", [&] (int8_t stage) {
// (0) cleanup::no == db::batchlog_stage::initial
// (1) cleanup::yes == db::batchlog_stage::failed_replay
return stage == int8_t(bool(cleanup));
})
.with_typed_column<db_clock::time_point>("written_at", [&] (db_clock::time_point written_at) {
return written_ats.contains(written_at);
})
.with_typed_column<utils::UUID>("id", [&] (utils::UUID id) {
return ids.contains(id);
});
});
auto fragment_results = cql3::untyped_result_set(env.execute_cql(fragments_query).get());
if (cleanup) {
// Each shard has a range tombstone (shard_count * 2 range tombstone changes)
// There should be batch_count clustering rows, with stage=1
// There may be [0, batch_count] clustering rows with stage=0
BOOST_REQUIRE_GT(fragment_results.size(), batch_count);
BOOST_REQUIRE_LE(fragment_results.size(), shard_count * 2 + batch_count * 2);
} else {
BOOST_REQUIRE_EQUAL(fragment_results.size(), batch_count); // only clustering rows
}
for (const auto& row : fragment_results) {
if (row.get_as<sstring>("mutation_fragment_kind") == "range tombstone change") {
continue;
}
const auto id = row.get_as<utils::UUID>("id");
const auto stage = row.get_as<int8_t>("stage");
testlog.trace("Processing row for batch id={}, stage={}: ", id, stage);
BOOST_REQUIRE_EQUAL(row.get_as<int32_t>("version"), netw::messaging_service::current_version);
BOOST_REQUIRE(ids.contains(id));
const auto metadata = row.get_as<sstring>("metadata");
auto metadata_json = rjson::parse(metadata);
BOOST_REQUIRE(metadata_json.IsObject());
if (!cleanup || stage == int8_t(db::batchlog_stage::failed_replay)) {
BOOST_REQUIRE_NE(row.get_as<sstring>("value"), "{}");
BOOST_REQUIRE(!metadata_json.HasMember("tombstone"));
const auto value_json = rjson::parse(row.get_as<sstring>("value"));
BOOST_REQUIRE(value_json.IsObject());
BOOST_REQUIRE(value_json.HasMember("data"));
} else if (stage == int8_t(db::batchlog_stage::initial)) {
BOOST_REQUIRE_EQUAL(row.get_as<sstring>("value"), "{}"); // row should be dead -- data column shadowed by tombstone
if (!cleanup) {
BOOST_REQUIRE(metadata_json.HasMember("tombstone"));
}
} else {
BOOST_FAIL(format("Unexpected stage: {}", stage));
}
}
}
};
{
scoped_error_injection error_injection("storage_proxy_fail_replay_batch");
do_replays(db::batchlog_manager::post_replay_cleanup::no);
do_replays(db::batchlog_manager::post_replay_cleanup::yes);
}
testlog.info("Successful replay - should remove all batches");
bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no).get();
assert_that(env.execute_cql(batchlog_query).get())
.is_rows()
.is_empty();
const auto fragment_results = cql3::untyped_result_set(env.execute_cql(fragments_query).get());
// Each shard can have a range tombstone (shard_count * 2 range tombstone changes)
// There should be batch_count clustering rows, with stage=1
// There may be [0, batch_count] clustering rows with stage=0
BOOST_REQUIRE_GT(fragment_results.size(), batch_count);
BOOST_REQUIRE_LE(fragment_results.size(), shard_count * 2 + batch_count * 2);
for (const auto& row : fragment_results) {
if (row.get_as<sstring>("mutation_fragment_kind") == "range tombstone change") {
BOOST_REQUIRE_EQUAL(row.get_as<int8_t>("stage"), int8_t(db::batchlog_stage::initial));
continue;
}
BOOST_REQUIRE_EQUAL(row.get_as<int32_t>("version"), netw::messaging_service::current_version);
BOOST_REQUIRE(written_ats.contains(row.get_as<db_clock::time_point>("written_at")));
BOOST_REQUIRE(ids.contains(row.get_as<utils::UUID>("id")));
BOOST_REQUIRE_EQUAL(row.get_as<sstring>("value"), "{}");
}
}, cfg);
}
SEASTAR_TEST_CASE(test_batchlog_migrate_v1_v2) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
return make_ready_future<>();
#endif
const auto batch_replay_timeout = 1s;
cql_test_config cfg;
cfg.db_config->batchlog_replay_cleanup_after_replays.set_value("9999999", utils::config_file::config_source::Internal);
cfg.batchlog_replay_timeout = batch_replay_timeout;
cfg.batchlog_delay = 9999h;
return do_with_cql_env_thread([batch_replay_timeout] (cql_test_env& env) -> void {
auto& bm = env.batchlog_manager().local();
env.execute_cql("CREATE TABLE tbl (pk bigint PRIMARY KEY, v text)").get();
auto& sp = env.get_storage_proxy().local();
auto& db = env.local_db();
auto& tbl = db.find_column_family("ks", "tbl");
auto tbl_schema = tbl.schema();
auto cdef_tbl_v = tbl_schema->get_column_definition(to_bytes("v"));
auto batchlog_v1_schema = db.find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG);
auto cdef_batchlog_v1_data = batchlog_v1_schema->get_column_definition(to_bytes("data"));
const uint64_t batch_count = 8;
struct batchlog {
utils::UUID id;
int32_t version;
db_clock::time_point written_at;
managed_bytes data;
};
std::map<utils::UUID, const batchlog> batchlogs;
for (int64_t i = 0; i != batch_count; ++i) {
bytes_ostream batchlog_data;
for (int64_t j = 0; j != i+2; ++j) {
auto key = partition_key::from_single_value(*tbl_schema, serialized(j));
mutation m(tbl_schema, key);
m.set_clustered_cell(clustering_key::make_empty(), *cdef_tbl_v, make_atomic_cell(utf8_type, serialized("value")));
ser::serialize(batchlog_data, canonical_mutation(m));
}
const auto id = utils::UUID_gen::get_time_UUID();
auto [it, _] = batchlogs.emplace(id, batchlog{
.id = id,
.version = netw::messaging_service::current_version,
.written_at = db_clock::now() - batch_replay_timeout * 10,
.data = std::move(batchlog_data).to_managed_bytes()});
auto& batch = it->second;
const auto timestamp = api::new_timestamp();
mutation m(batchlog_v1_schema, partition_key::from_single_value(*batchlog_v1_schema, serialized(batch.id)));
m.set_cell(clustering_key_prefix::make_empty(), to_bytes("version"), batch.version, timestamp);
m.set_cell(clustering_key_prefix::make_empty(), to_bytes("written_at"), batch.written_at, timestamp);
m.set_cell(clustering_key_prefix::make_empty(), *cdef_batchlog_v1_data, atomic_cell::make_live(*cdef_batchlog_v1_data->type, timestamp, std::move(batch.data)));
sp.mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no).get();
}
const auto batchlog_v1_query = format("SELECT * FROM {}.{}", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG);
const auto batchlog_v2_query = format("SELECT * FROM {}.{}", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
// Initial state, all entries are in the v1 table.
assert_that(env.execute_cql(batchlog_v1_query).get())
.is_rows()
.with_size(batch_count);
assert_that(env.execute_cql(batchlog_v2_query).get())
.is_rows()
.is_empty();
{
scoped_error_injection error_injection("batchlog_manager_fail_migration");
testlog.info("First replay - migration should fail, all entries stay in v1 table");
bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no).get();
}
assert_that(env.execute_cql(batchlog_v1_query).get())
.is_rows()
.with_size(batch_count);
assert_that(env.execute_cql(batchlog_v2_query).get())
.is_rows()
.is_empty();
{
scoped_error_injection error_injection("storage_proxy_fail_replay_batch");
testlog.info("Second replay - migration should run again and succeed, but replay of migrated entries should fail, so they should remain in the v2 table.");
bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no).get();
}
assert_that(env.execute_cql(batchlog_v1_query).get())
.is_rows()
.is_empty();
auto results = cql3::untyped_result_set(env.execute_cql(batchlog_v2_query).get());
BOOST_REQUIRE_EQUAL(results.size(), batch_count);
std::set<utils::UUID> migrated_batchlog_ids;
for (const auto& row : results) {
BOOST_REQUIRE_EQUAL(row.get_as<int8_t>("stage"), int8_t(db::batchlog_stage::failed_replay));
const auto id = row.get_as<utils::UUID>("id");
auto it = batchlogs.find(id);
BOOST_REQUIRE(it != batchlogs.end());
const auto& batch = it->second;
BOOST_REQUIRE_EQUAL(row.get_as<int32_t>("version"), batch.version);
BOOST_REQUIRE(row.get_as<db_clock::time_point>("written_at") == batch.written_at);
BOOST_REQUIRE_EQUAL(row.get_blob_fragmented("data"), batch.data);
migrated_batchlog_ids.emplace(id);
}
BOOST_REQUIRE_EQUAL(batchlogs.size(), migrated_batchlog_ids.size());
testlog.info("Third replay - migration is already done, replay of migrated entries should succeed, v2 table should be empty afterwards.");
bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no).get();
assert_that(env.execute_cql(batchlog_v2_query).get())
.is_rows()
.is_empty();
}, cfg);
}
SEASTAR_TEST_CASE(test_batchlog_replay_write_time) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
return make_ready_future<>();
#endif
cql_test_config cfg;
cfg.db_config->batchlog_replay_cleanup_after_replays.set_value("9999999", utils::config_file::config_source::Internal);
cfg.batchlog_replay_timeout = 1h;
cfg.batchlog_delay = 9999h;
return do_with_cql_env_thread([ks = get_name()] (cql_test_env& env) -> void {
auto& bm = env.batchlog_manager().local();
env.execute_cql(format("CREATE KEYSPACE {} WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 2}} AND tablets = {{'enabled': 'false'}}", ks)).get();
env.execute_cql(format("CREATE TABLE {}.tbl1 (pk bigint PRIMARY KEY, v text) WITH tombstone_gc = {{'mode': 'repair', 'propagation_delay_in_seconds': 1}}", ks)).get();
env.execute_cql(format("CREATE TABLE {}.tbl2 (pk bigint PRIMARY KEY, v text) WITH tombstone_gc = {{'mode': 'timeout'}}", ks)).get();
const uint64_t batch_count = 8;
const uint64_t mutations_per_batch = 2;
{
scoped_error_injection error_injection("storage_proxy_fail_send_batch");
for (uint64_t i = 0; i != batch_count; ++i) {
std::vector<sstring> queries;
std::vector<std::string_view> query_views;
if (i % 2) {
for (const auto& tbl_name : {"tbl1", "tbl2"}) {
queries.emplace_back(format("INSERT INTO {}.{} (pk, v) VALUES (0, 'value');", ks, tbl_name, i));
query_views.emplace_back(queries.back());
}
} else {
for (uint64_t j = 0; j != mutations_per_batch; ++j) {
queries.emplace_back(format("INSERT INTO {}.tbl2 (pk, v) VALUES ({}, 'value');", ks, i * 2 + j));
query_views.emplace_back(queries.back());
}
}
try {
env.execute_batch(
query_views,
cql3::statements::batch_statement::type::LOGGED,
std::make_unique<cql3::query_options>(db::consistency_level::ONE, std::vector<cql3::raw_value>())).get();
} catch (std::runtime_error& ex) {
BOOST_REQUIRE_EQUAL(std::string(ex.what()), "Error injection: failing to send batch");
}
}
}
const auto batchlog_query = format("SELECT * FROM {}.{}", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
assert_that(env.execute_cql(batchlog_query).get())
.is_rows()
.with_size(batch_count);
auto get_write_attempts = [&] () -> uint64_t {
return env.batchlog_manager().map_reduce0([] (const db::batchlog_manager& bm) {
return bm.get_stats().write_attempts;
}, uint64_t(0), std::plus<uint64_t>{}).get();
};
BOOST_REQUIRE_EQUAL(get_write_attempts(), 0);
// We need this sleep here to make sure all batches are older than propagation delay of 1s.
sleep(2s).get();
{
scoped_error_injection error_injection("storage_proxy_fail_replay_batch");
bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no).get();
// Half the batches are skipped due to being too fresh
BOOST_REQUIRE_EQUAL(get_write_attempts(), (batch_count / 2) * mutations_per_batch);
auto results = cql3::untyped_result_set(env.execute_cql(batchlog_query).get());
BOOST_REQUIRE_EQUAL(results.size(), batch_count);
for (const auto& row : results) {
BOOST_REQUIRE_EQUAL(row.get_as<int8_t>("stage"), int8_t(db::batchlog_stage::initial));
}
}
{
scoped_error_injection error_injection("storage_proxy_fail_replay_batch");
bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::yes).get();
// Half the batches are skipped (again) due to being too fresh
BOOST_REQUIRE_EQUAL(get_write_attempts(), batch_count * mutations_per_batch);
auto results = cql3::untyped_result_set(env.execute_cql(batchlog_query).get());
BOOST_REQUIRE_EQUAL(results.size(), batch_count);
size_t initial = 0;
size_t failed_replay = 0;
for (const auto& row : results) {
const auto stage = static_cast<db::batchlog_stage>(row.get_as<int8_t>("stage"));
switch (stage) {
case db::batchlog_stage::initial:
++initial;
break;
case db::batchlog_stage::failed_replay:
++failed_replay;
break;
default:
BOOST_FAIL(format("Unexpected stage: {}", int8_t(stage)));
}
}
BOOST_REQUIRE_EQUAL(initial, batch_count / 2);
BOOST_REQUIRE_EQUAL(failed_replay, batch_count / 2);
}
const auto fragments_query = format("SELECT * FROM MUTATION_FRAGMENTS({}.{}) WHERE partition_region = 2 ALLOW FILTERING", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
const auto batchlog_v2_schema = env.local_db().find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
const auto [initial_fragments_per_shard, failed_replay_fragments_per_shard] = extract_batchlog_fragments(
cql3::untyped_result_set(env.execute_cql(fragments_query).get()), batchlog_v2_schema);
for (const auto& [batchlog_shard, batchlog_shard_fragments] : initial_fragments_per_shard) {
if (batchlog_shard_fragments.empty()) {
continue;
}
testlog.info("Checking fragment in initial stage and batchlog shard {}", batchlog_shard);
position_in_partition::less_compare less_cmp{*batchlog_v2_schema};
size_t rts = 0;
auto min_live_pos = position_in_partition::after_all_clustered_rows();
for (const auto& f : batchlog_shard_fragments) {
if (f.is_range_tombstone) {
++rts;
continue;
}
if (!f.is_live) {
continue;
}
min_live_pos = std::min(min_live_pos, f.pos, less_cmp);
}
BOOST_REQUIRE(rts == 0 || rts == 2);
if (!rts) {
continue;
}
check_range_tombstone_start(*batchlog_shard_fragments.begin());
bool found_range_tombstone_end = false;
for (auto it = std::next(batchlog_shard_fragments.begin()); it != batchlog_shard_fragments.end(); ++it) {
if (it->is_range_tombstone) {
BOOST_REQUIRE(!std::exchange(found_range_tombstone_end, true));
check_range_tombstone_end(*it);
BOOST_REQUIRE(less_cmp(it->pos, min_live_pos));
}
}
}
}, cfg);
}
BOOST_AUTO_TEST_SUITE_END()