scylladb/test/boost/batchlog_manager_test.cc
Marcin Maliszkiewicz 81685b0d06 Merge 'db/batchlog_manager: re-add v1 support for mixed clusters' from Botond Dénes
3f7ee3ce5d introduced system.batchlog_v2, with a schema designed to speed up batchlog replays and make post-replay cleanups much more effective.
It did not introduce a cluster feature for the new table, because it is a node-local table, so the cluster can switch to the new table gradually, one node at a time.
However, https://github.com/scylladb/scylladb/issues/27886 showed that this switching causes timeouts during upgrades in mixed clusters. Furthermore, switching to the new table unconditionally on upgraded nodes means that on rollback, the batches saved into the v2 table are lost.
This PR re-introduces v1 (`system.batchlog`) support and guards the use of the v2 table behind a cluster feature, so mixed clusters keep using v1 and thus remain rollback-compatible.
For simplicity, the re-introduced v1 support does not include post-replay cleanup. The cleanup in v1 was never particularly effective anyway and we ended up disabling it for heavy batchlog users, so I don't think the lack of cleanup support is a problem.
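A minimal sketch of the gating (the names below are illustrative assumptions, not the actual feature_service API): the batchlog manager picks the table based on a cluster-wide feature check, e.g.:

    // Hypothetical sketch, not the real ScyllaDB code: choose the batchlog
    // table based on the cluster feature added by this series.
    const char* batchlog_table_name(const gms::feature_service& features) {
        // Until every node advertises BATCHLOG_V2, keep using v1 so that a
        // rollback does not lose batches saved into the v2 table.
        return bool(features.batchlog_v2) ? "batchlog_v2" : "batchlog";
    }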

Fixes: https://github.com/scylladb/scylladb/issues/27886

Needs backport to 2026.1 to fix upgrades for clusters using batches.

Closes scylladb/scylladb#28736

* github.com:scylladb/scylladb:
  test/boost/batchlog_manager_test: add tests for v1 batchlog
  test/boost/batchlog_manager_test: make prepare_batches() work with both v1 and v2
  test/boost/batchlog_manager_test: fix indentation
  test/boost/batchlog_manager_test: extract prepare_batches() method
  test/lib/cql_assertions: is_rows(): add dump parameter
  tools/scylla-sstable: extract query result printers
  tools/scylla-sstable: add std::ostream& arg to query result printers
  repair/row_level: repair_flush_hints_batchlog_handler(): add all_replayed to finish log
  db/batchlog_manager: re-add v1 support
  db/batchlog_manager: return all_replayed from process_batch()
  db/batchlog_manager: process_batch(): fix indentation
  db/batchlog_manager: make batch() a standalone function
  db/batchlog_manager: make structs stats public
  db/batchlog_manager: allocate limiter on the stack
  db/batchlog_manager: add feature_service dependency
  gms/feature_service: add batchlog_v2 feature

(cherry picked from commit a83ee6cf66)

Closes scylladb/scylladb#28853

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <boost/test/unit_test.hpp>
#include <stdint.h>
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_assertions.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/error_injection.hh"
#include "test/lib/log.hh"
#include <seastar/core/future-util.hh>
#include <seastar/core/shared_ptr.hh>
#include "cql3/statements/batch_statement.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "db/batchlog.hh"
#include "db/batchlog_manager.hh"
#include "db/commitlog/commitlog.hh"
#include "db/config.hh"
#include "idl/frozen_schema.dist.hh"
#include "idl/frozen_schema.dist.impl.hh"
#include "message/messaging_service.hh"
#include "service/storage_proxy.hh"
#include "utils/rjson.hh"
BOOST_AUTO_TEST_SUITE(batchlog_manager_test)
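// Build a live atomic cell of the given type with a fixed timestamp of 0.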
static atomic_cell make_atomic_cell(data_type dt, bytes value) {
return atomic_cell::make_live(*dt, 0, std::move(value));
}
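// End-to-end smoke test: wrap a single mutation in a batchlog entry
// (backdated by 3h so replay considers it due), apply it locally, verify
// that count_all_batches() sees it, replay, and check that the batched
// write shows up in ks.cf.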
SEASTAR_TEST_CASE(test_execute_batch) {
return do_with_cql_env([] (auto& e) {
auto& qp = e.local_qp();
auto& bp = e.batchlog_manager().local();
return e.execute_cql("create table cf (p1 varchar, c1 int, r1 int, PRIMARY KEY (p1, c1));").discard_result().then([&qp, &e, &bp] () mutable {
auto& db = e.local_db();
auto s = db.find_schema("ks", "cf");
const auto batchlog_schema = db.find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
const column_definition& r1_col = *s->get_column_definition("r1");
auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(1)});
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(100)));
using namespace std::chrono_literals;
auto version = netw::messaging_service::current_version;
auto bm = db::get_batchlog_mutation_for(batchlog_schema, { m }, version, db_clock::now() - db_clock::duration(3h), s->id().uuid());
return qp.proxy().mutate_locally(bm, tracing::trace_state_ptr(), db::commitlog::force_sync::no).then([&bp] () mutable {
return bp.count_all_batches().then([](auto n) {
BOOST_CHECK_EQUAL(n, 1);
}).then([&bp] () mutable {
return bp.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::yes).discard_result();
});
});
}).then([&qp] {
return qp.execute_internal("select * from ks.cf where p1 = ? and c1 = ?;", { sstring("key1"), 1 }, cql3::query_processor::cache_internal::yes).then([](auto rs) {
BOOST_REQUIRE(!rs->empty());
auto i = rs->one().template get_as<int32_t>("r1");
BOOST_CHECK_EQUAL(i, int32_t(100));
});
});
});
}
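// Helpers for inspecting the raw on-disk state of the batchlog tables via
// the MUTATION_FRAGMENTS() virtual table: each batch is a clustering row in
// v2, and post-replay cleanup manifests as per-shard range tombstones.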
namespace {
struct fragment {
position_in_partition pos;
bool is_range_tombstone;
bool is_live;
fragment(position_in_partition p, bool is_rt, bool is_live)
: pos(std::move(p)), is_range_tombstone(is_rt), is_live(is_live)
{}
class less_comparator {
schema_ptr _s;
public:
explicit less_comparator(schema_ptr s) : _s(std::move(s)) {}
bool operator()(const fragment& a, const fragment& b) const {
position_in_partition::less_compare cmp{*_s};
return cmp(a.pos, b.pos);
}
};
};
struct batchlog_as_fragments {
using fragment_set = std::set<fragment, fragment::less_comparator>;
std::unordered_map<int32_t, fragment_set> initial_fragments_per_shard;
std::unordered_map<int32_t, fragment_set> failed_replay_fragments_per_shard;
};
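// Reconstruct the position_in_partition of a MUTATION_FRAGMENTS() row from
// its partition_region, position_weight and clustering-key columns. Range
// tombstone changes can carry a strict prefix of the (written_at, id)
// clustering key.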
position_in_partition parse_batchlog_position(const schema& s, const cql3::untyped_result_set::row& row) {
std::vector<managed_bytes> ck_components;
ck_components.reserve(2);
for (const auto ck_component : {"written_at", "id"}) {
if (!row.has(ck_component) || row.get_blob_fragmented(ck_component).empty()) {
// clustering key is a prefix; stop at the first missing component
break;
}
ck_components.push_back(row.get_blob_fragmented(ck_component));
}
return position_in_partition(
static_cast<partition_region>(row.get_as<int8_t>("partition_region")),
static_cast<bound_weight>(row.get_as<int8_t>("position_weight")),
clustering_key_prefix::from_exploded(s, ck_components));
}
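// Bucket the MUTATION_FRAGMENTS() rows of the v2 batchlog by stage
// (initial vs. failed_replay) and batchlog shard, noting for each fragment
// whether it is a range tombstone change and whether its value column is
// still live (i.e. not shadowed by a tombstone).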
batchlog_as_fragments extract_batchlog_fragments(const cql3::untyped_result_set& fragment_results, schema_ptr batchlog_v2_schema) {
auto is_live = [] (std::optional<sstring> value) {
// a missing value can be stored either as null or as an empty json object ("{}")
return value && value->size() > 2;
};
batchlog_as_fragments result;
for (const auto& row : fragment_results) {
const auto stage = static_cast<db::batchlog_stage>(row.get_as<int8_t>("stage"));
const auto shard = row.get_as<int32_t>("shard");
auto& fragments_per_shard = stage == db::batchlog_stage::initial
? result.initial_fragments_per_shard
: result.failed_replay_fragments_per_shard;
const auto is_rtc = row.get_as<sstring>("mutation_fragment_kind") == "range tombstone change";
auto pos = parse_batchlog_position(*batchlog_v2_schema, row);
BOOST_REQUIRE_EQUAL(is_rtc, (pos.get_bound_weight() != bound_weight::equal));
testlog.info("[stage {}, bathlog shard {}] fragment: pos={}, kind={}, live={}", int8_t(stage), shard, pos, row.get_as<sstring>("mutation_fragment_kind"), is_live(row.get_opt<sstring>("value")));
auto& fragments = fragments_per_shard.try_emplace(shard, batchlog_as_fragments::fragment_set(fragment::less_comparator(batchlog_v2_schema))).first->second;
fragments.emplace(pos, is_rtc, !is_rtc && is_live(row.get_opt<sstring>("value")));
}
return result;
}
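// Post-replay cleanup is expected to delete each replayed shard's rows with
// a single range tombstone: its start bound carries an empty clustering-key
// prefix and its end bound a one-component prefix.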
void check_range_tombstone_start(const fragment& f) {
BOOST_REQUIRE(f.is_range_tombstone);
BOOST_REQUIRE(f.pos.region() == partition_region::clustered);
BOOST_REQUIRE(f.pos.has_key());
BOOST_REQUIRE(f.pos.key().explode().empty());
BOOST_REQUIRE(f.pos.get_bound_weight() == bound_weight::before_all_prefixed);
}
void check_range_tombstone_end(const fragment& f, std::optional<bound_weight> bound_weight_opt = {}) {
BOOST_REQUIRE(f.is_range_tombstone);
BOOST_REQUIRE(f.pos.region() == partition_region::clustered);
BOOST_REQUIRE(f.pos.has_key());
BOOST_REQUIRE_EQUAL(f.pos.key().explode().size(), 1);
if (bound_weight_opt) {
BOOST_REQUIRE(f.pos.get_bound_weight() == *bound_weight_opt);
}
}
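// Run `batch_count` LOGGED batches against a freshly created table, making
// every second batch fail (and thus stay in the batchlog) via the
// storage_proxy_fail_send_batch error injection. Verify the batchlog
// contents through SELECT and MUTATION_FRAGMENTS(), then replay (optionally
// failing the replay via error injection too) and return the number of
// batches that were made to fail. Works against both the v1 and v2 table.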
uint64_t prepare_batches(cql_test_env& env, std::string_view batchlog_table_name, uint64_t batch_count, bool replay_fails,
db::batchlog_manager::post_replay_cleanup cleanup) {
const bool is_v1 = batchlog_table_name == db::system_keyspace::BATCHLOG;
uint64_t failed_batches = 0;
auto& bm = env.batchlog_manager().local();
env.execute_cql("CREATE TABLE tbl (pk bigint PRIMARY KEY, v text)").get();
for (uint64_t i = 0; i != batch_count; ++i) {
std::vector<sstring> queries;
std::vector<std::string_view> query_views;
for (uint64_t j = 0; j != i+2; ++j) {
queries.emplace_back(format("INSERT INTO tbl (pk, v) VALUES ({}, 'value');", j));
query_views.emplace_back(queries.back());
}
const bool fail = i % 2;
bool injected_exception_thrown = false;
std::optional<scoped_error_injection> error_injection;
if (fail) {
++failed_batches;
error_injection.emplace("storage_proxy_fail_send_batch");
}
try {
env.execute_batch(
query_views,
cql3::statements::batch_statement::type::LOGGED,
std::make_unique<cql3::query_options>(db::consistency_level::ONE, std::vector<cql3::raw_value>())).get();
} catch (std::runtime_error& ex) {
if (fail) {
BOOST_REQUIRE_EQUAL(std::string(ex.what()), "Error injection: failing to send batch");
injected_exception_thrown = true;
} else {
throw;
}
}
BOOST_REQUIRE_EQUAL(injected_exception_thrown, fail);
}
// v1 (system.batchlog) is partition-oriented, while v2 (system.batchlog_v2) is row-oriented, so the partition-region filter has to be switched accordingly.
const auto fragments_query = format("SELECT * FROM MUTATION_FRAGMENTS({}.{}) WHERE partition_region = {} ALLOW FILTERING", db::system_keyspace::NAME, batchlog_table_name, is_v1 ? 0 : 2);
assert_that(env.execute_cql(format("SELECT id FROM {}.{}", db::system_keyspace::NAME, batchlog_table_name)).get())
.is_rows()
.with_size(failed_batches);
assert_that(env.execute_cql(fragments_query).get())
.is_rows(tests::dump_to_logs::yes)
.with_size(batch_count)
.assert_for_columns_of_each_row([&] (columns_assertions& columns) {
columns.with_typed_column<sstring>("mutation_source", "memtable:0");
});
std::optional<scoped_error_injection> error_injection;
if (replay_fails) {
error_injection.emplace("storage_proxy_fail_replay_batch");
}
bm.do_batch_log_replay(cleanup).get();
assert_that(env.execute_cql(format("SELECT id FROM {}.{}", db::system_keyspace::NAME, batchlog_table_name)).get())
.is_rows(tests::dump_to_logs::yes)
.with_size(replay_fails ? failed_batches : 0);
return failed_batches;
}
} // anonymous namespace
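// v1 flavor of the replay test: disabling the BATCHLOG_V2 cluster feature
// makes the batchlog manager fall back to system.batchlog. Replayed v1
// entries are deleted with partition tombstones, which we detect by looking
// for a "tombstone" member in the fragment's metadata JSON.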
future<> run_batchlog_v1_cleanup_with_failed_batches_test(bool replay_fails) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
return make_ready_future<>();
#endif
cql_test_config cfg;
cfg.db_config->batchlog_replay_cleanup_after_replays.set_value("9999999", utils::config_file::config_source::Internal);
cfg.batchlog_replay_timeout = 0s;
cfg.batchlog_delay = 9999h;
cfg.disabled_features.insert("BATCHLOG_V2");
return do_with_cql_env_thread([=] (cql_test_env& env) -> void {
const uint64_t batch_count = 8;
const uint64_t failed_batches = prepare_batches(env, db::system_keyspace::BATCHLOG, batch_count, replay_fails, db::batchlog_manager::post_replay_cleanup::no);
const auto fragments_query = format("SELECT * FROM MUTATION_FRAGMENTS({}.{}) WHERE partition_region = 0 ALLOW FILTERING", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG);
const auto fragment_results = cql3::untyped_result_set(env.execute_cql(fragments_query).get());
const auto batchlog_v1_schema = env.local_db().find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG);
size_t live{0};
size_t dead{0};
for (const auto& row : fragment_results) {
const auto metadata = row.get_as<sstring>("metadata");
auto metadata_json = rjson::parse(metadata);
if (metadata_json.HasMember("tombstone") && metadata_json["tombstone"].IsObject() && metadata_json["tombstone"].HasMember("deletion_time")) {
++dead;
} else {
++live;
}
}
if (replay_fails) {
BOOST_REQUIRE_EQUAL(failed_batches, live);
} else {
BOOST_REQUIRE_EQUAL(0, live);
}
BOOST_REQUIRE_EQUAL(batch_count, dead + live);
}, cfg);
}
SEASTAR_TEST_CASE(test_batchlog_v1_replay_fails) {
return run_batchlog_v1_cleanup_with_failed_batches_test(true);
}
SEASTAR_TEST_CASE(test_batchlog_v1_replay) {
return run_batchlog_v1_cleanup_with_failed_batches_test(false);
}
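// v2 flavor of the replay test, parameterized on whether replay fails and
// whether post-replay cleanup runs. With cleanup, replayed entries are
// shadowed by per-shard range tombstones and failed entries are rewritten
// into the failed_replay stage; without cleanup, entries stay in the
// initial stage and are deleted (or kept, on failure) individually.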
future<> run_batchlog_cleanup_with_failed_batches_test(bool replay_fails, db::batchlog_manager::post_replay_cleanup cleanup) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
return make_ready_future<>();
#endif
cql_test_config cfg;
cfg.db_config->batchlog_replay_cleanup_after_replays.set_value("9999999", utils::config_file::config_source::Internal);
cfg.batchlog_replay_timeout = 0s;
cfg.batchlog_delay = 9999h;
return do_with_cql_env_thread([=] (cql_test_env& env) -> void {
const uint64_t batch_count = 8;
const uint64_t failed_batches = prepare_batches(env, db::system_keyspace::BATCHLOG_V2, batch_count, replay_fails, cleanup);
const auto fragments_query = format("SELECT * FROM MUTATION_FRAGMENTS({}.{}) WHERE partition_region = 2 ALLOW FILTERING", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
const auto fragment_results = cql3::untyped_result_set(env.execute_cql(fragments_query).get());
const auto batchlog_v2_schema = env.local_db().find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
const auto [initial_fragments_per_shard, failed_replay_fragments_per_shard] = extract_batchlog_fragments(
cql3::untyped_result_set(env.execute_cql(fragments_query).get()), batchlog_v2_schema);
if (cleanup) {
size_t initial_rows{};
for (const auto& [batchlog_shard, batchlog_shard_fragments] : initial_fragments_per_shard) {
// some batchlog shards can be empty, just ignore
if (batchlog_shard_fragments.empty()) {
continue;
}
testlog.info("Checking fragment in initial stage and batchlog shard {}", batchlog_shard);
size_t rts{}, rows{};
for (const auto& fragment : batchlog_shard_fragments) {
rts += fragment.is_range_tombstone;
rows += !fragment.is_range_tombstone;
BOOST_REQUIRE(!fragment.is_live);
}
// cleanup affects only batchlog shards which contributed to replay
if (rts) {
BOOST_REQUIRE_EQUAL(rts, 2);
check_range_tombstone_start(*batchlog_shard_fragments.begin());
check_range_tombstone_end(*batchlog_shard_fragments.rbegin(), bound_weight::after_all_prefixed);
}
initial_rows += rows;
}
// Some of the initial fragments could have been garbage-collected after cleanup (shadowed by the range tombstone).
// This happens in the background, so up to batch_count rows can remain.
BOOST_REQUIRE_LE(initial_rows, batch_count);
if (replay_fails) {
size_t failed_replay_rows{};
for (const auto& [batchlog_shard, batchlog_shard_fragments] : failed_replay_fragments_per_shard) {
for (const auto& fragment : batchlog_shard_fragments) {
BOOST_REQUIRE(!fragment.is_range_tombstone);
BOOST_REQUIRE(fragment.is_live);
++failed_replay_rows;
}
}
BOOST_REQUIRE_EQUAL(failed_replay_rows, failed_batches);
} else {
BOOST_REQUIRE(failed_replay_fragments_per_shard.empty());
}
} else {
size_t live{}, dead{};
BOOST_REQUIRE(failed_replay_fragments_per_shard.empty());
for (const auto& [batchlog_shard, batchlog_shard_fragments] : initial_fragments_per_shard) {
for (const auto& fragment : batchlog_shard_fragments) {
BOOST_REQUIRE(!fragment.is_range_tombstone);
if (fragment.is_live) {
++live;
} else {
++dead;
}
}
}
const auto total = live + dead;
BOOST_REQUIRE_EQUAL(total, batch_count);
if (replay_fails) {
BOOST_REQUIRE_EQUAL(live, failed_batches);
} else {
BOOST_REQUIRE_EQUAL(dead, total);
}
}
}, cfg);
}
SEASTAR_TEST_CASE(test_batchlog_replay_fails_no_cleanup) {
return run_batchlog_cleanup_with_failed_batches_test(true, db::batchlog_manager::post_replay_cleanup::no);
}
SEASTAR_TEST_CASE(test_batchlog_replay_fails_with_cleanup) {
return run_batchlog_cleanup_with_failed_batches_test(true, db::batchlog_manager::post_replay_cleanup::yes);
}
SEASTAR_TEST_CASE(test_batchlog_replay_no_cleanup) {
return run_batchlog_cleanup_with_failed_batches_test(false, db::batchlog_manager::post_replay_cleanup::no);
}
SEASTAR_TEST_CASE(test_batchlog_replay_with_cleanup) {
return run_batchlog_cleanup_with_failed_batches_test(false, db::batchlog_manager::post_replay_cleanup::yes);
}
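// Track the stage column across replays: batches start out in the initial
// stage; a failed replay with cleanup::yes rewrites them into the
// failed_replay stage, while cleanup::no leaves them in initial; a
// successful replay removes them altogether.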
SEASTAR_TEST_CASE(test_batchlog_replay_stage) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
return make_ready_future<>();
#endif
cql_test_config cfg;
cfg.db_config->batchlog_replay_cleanup_after_replays.set_value("9999999", utils::config_file::config_source::Internal);
cfg.batchlog_replay_timeout = 0s;
cfg.batchlog_delay = 9999h;
return do_with_cql_env_thread([] (cql_test_env& env) -> void {
auto& bm = env.batchlog_manager().local();
env.execute_cql("CREATE TABLE tbl (pk bigint PRIMARY KEY, v text)").get();
const uint64_t batch_count = 8;
const auto shard_count = 256;
{
scoped_error_injection error_injection("storage_proxy_fail_send_batch");
for (uint64_t i = 0; i != batch_count; ++i) {
std::vector<sstring> queries;
std::vector<std::string_view> query_views;
for (uint64_t j = 0; j != i+2; ++j) {
queries.emplace_back(format("INSERT INTO tbl (pk, v) VALUES ({}, 'value');", j));
query_views.emplace_back(queries.back());
}
try {
env.execute_batch(
query_views,
cql3::statements::batch_statement::type::LOGGED,
std::make_unique<cql3::query_options>(db::consistency_level::ONE, std::vector<cql3::raw_value>())).get();
} catch (std::runtime_error& ex) {
BOOST_REQUIRE_EQUAL(std::string(ex.what()), "Error injection: failing to send batch");
}
}
}
// Ensure select ... where write_time < write_time_limit (=now) picks up all batches.
sleep(2ms).get();
const auto batchlog_query = format("SELECT * FROM {}.{}", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
const auto fragments_query = format("SELECT * FROM MUTATION_FRAGMENTS({}.{}) WHERE partition_region = 2 ALLOW FILTERING", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
std::set<utils::UUID> ids;
std::set<db_clock::time_point> written_ats;
assert_that(env.execute_cql(batchlog_query).get())
.is_rows()
.with_size(batch_count)
.assert_for_columns_of_each_row([&] (columns_assertions& columns) {
columns.with_typed_column<int32_t>("version", netw::messaging_service::current_version)
.with_typed_column<int8_t>("stage", int8_t(db::batchlog_stage::initial))
.with_typed_column<db_clock::time_point>("written_at", [&] (db_clock::time_point written_at) {
written_ats.insert(written_at);
return true;
})
.with_typed_column<utils::UUID>("id", [&] (utils::UUID id) {
ids.insert(id);
return true;
});
});
BOOST_REQUIRE_EQUAL(ids.size(), batch_count);
BOOST_REQUIRE_LE(written_ats.size(), batch_count);
auto do_replays = [&] (db::batchlog_manager::post_replay_cleanup cleanup) {
for (unsigned i = 0; i < 3; ++i) {
testlog.info("Replay attempt [cleanup={}] #{} - batches should be in failed_replay stage", cleanup, i);
bm.do_batch_log_replay(cleanup).get();
assert_that(env.execute_cql(batchlog_query).get())
.is_rows()
.with_size(batch_count)
.assert_for_columns_of_each_row([&] (columns_assertions& columns) {
columns.with_typed_column<int32_t>("version", netw::messaging_service::current_version)
.with_typed_column<int8_t>("stage", [&] (int8_t stage) {
// (0) cleanup::no == db::batchlog_stage::initial
// (1) cleanup::yes == db::batchlog_stage::failed_replay
return stage == int8_t(bool(cleanup));
})
.with_typed_column<db_clock::time_point>("written_at", [&] (db_clock::time_point written_at) {
return written_ats.contains(written_at);
})
.with_typed_column<utils::UUID>("id", [&] (utils::UUID id) {
return ids.contains(id);
});
});
auto fragment_results = cql3::untyped_result_set(env.execute_cql(fragments_query).get());
if (cleanup) {
// Each shard has a range tombstone (shard_count * 2 range tombstone changes)
// There should be batch_count clustering rows, with stage=1
// There may be [0, batch_count] clustering rows with stage=0
BOOST_REQUIRE_GT(fragment_results.size(), batch_count);
BOOST_REQUIRE_LE(fragment_results.size(), shard_count * 2 + batch_count * 2);
} else {
BOOST_REQUIRE_EQUAL(fragment_results.size(), batch_count); // only clustering rows
}
for (const auto& row : fragment_results) {
if (row.get_as<sstring>("mutation_fragment_kind") == "range tombstone change") {
continue;
}
const auto id = row.get_as<utils::UUID>("id");
const auto stage = row.get_as<int8_t>("stage");
testlog.trace("Processing row for batch id={}, stage={}: ", id, stage);
BOOST_REQUIRE_EQUAL(row.get_as<int32_t>("version"), netw::messaging_service::current_version);
BOOST_REQUIRE(ids.contains(id));
const auto metadata = row.get_as<sstring>("metadata");
auto metadata_json = rjson::parse(metadata);
BOOST_REQUIRE(metadata_json.IsObject());
if (!cleanup || stage == int8_t(db::batchlog_stage::failed_replay)) {
BOOST_REQUIRE_NE(row.get_as<sstring>("value"), "{}");
BOOST_REQUIRE(!metadata_json.HasMember("tombstone"));
const auto value_json = rjson::parse(row.get_as<sstring>("value"));
BOOST_REQUIRE(value_json.IsObject());
BOOST_REQUIRE(value_json.HasMember("data"));
} else if (stage == int8_t(db::batchlog_stage::initial)) {
BOOST_REQUIRE_EQUAL(row.get_as<sstring>("value"), "{}"); // row should be dead -- data column shadowed by tombstone
if (!cleanup) {
BOOST_REQUIRE(metadata_json.HasMember("tombstone"));
}
} else {
BOOST_FAIL(format("Unexpected stage: {}", stage));
}
}
}
};
{
scoped_error_injection error_injection("storage_proxy_fail_replay_batch");
do_replays(db::batchlog_manager::post_replay_cleanup::no);
do_replays(db::batchlog_manager::post_replay_cleanup::yes);
}
testlog.info("Successful replay - should remove all batches");
bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no).get();
assert_that(env.execute_cql(batchlog_query).get())
.is_rows()
.is_empty();
const auto fragment_results = cql3::untyped_result_set(env.execute_cql(fragments_query).get());
// Each shard can have a range tombstone (shard_count * 2 range tombstone changes)
// There should be batch_count clustering rows, with stage=1
// There may be [0, batch_count] clustering rows with stage=0
BOOST_REQUIRE_GT(fragment_results.size(), batch_count);
BOOST_REQUIRE_LE(fragment_results.size(), shard_count * 2 + batch_count * 2);
for (const auto& row : fragment_results) {
if (row.get_as<sstring>("mutation_fragment_kind") == "range tombstone change") {
BOOST_REQUIRE_EQUAL(row.get_as<int8_t>("stage"), int8_t(db::batchlog_stage::initial));
continue;
}
BOOST_REQUIRE_EQUAL(row.get_as<int32_t>("version"), netw::messaging_service::current_version);
BOOST_REQUIRE(written_ats.contains(row.get_as<db_clock::time_point>("written_at")));
BOOST_REQUIRE(ids.contains(row.get_as<utils::UUID>("id")));
BOOST_REQUIRE_EQUAL(row.get_as<sstring>("value"), "{}");
}
}, cfg);
}
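// Exercise the v1 -> v2 migration path: seed system.batchlog directly with
// v1-format entries (serialized canonical_mutations), check that a failed
// migration leaves them in v1, that a successful migration carries them
// verbatim into v2 (in the failed_replay stage, since their replay is made
// to fail), and that a subsequent successful replay empties the v2 table.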
SEASTAR_TEST_CASE(test_batchlog_migrate_v1_v2) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
return make_ready_future<>();
#endif
const auto batch_replay_timeout = 1s;
cql_test_config cfg;
cfg.db_config->batchlog_replay_cleanup_after_replays.set_value("9999999", utils::config_file::config_source::Internal);
cfg.batchlog_replay_timeout = batch_replay_timeout;
cfg.batchlog_delay = 9999h;
return do_with_cql_env_thread([batch_replay_timeout] (cql_test_env& env) -> void {
auto& bm = env.batchlog_manager().local();
env.execute_cql("CREATE TABLE tbl (pk bigint PRIMARY KEY, v text)").get();
auto& sp = env.get_storage_proxy().local();
auto& db = env.local_db();
auto& tbl = db.find_column_family("ks", "tbl");
auto tbl_schema = tbl.schema();
auto cdef_tbl_v = tbl_schema->get_column_definition(to_bytes("v"));
auto batchlog_v1_schema = db.find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG);
auto cdef_batchlog_v1_data = batchlog_v1_schema->get_column_definition(to_bytes("data"));
const uint64_t batch_count = 8;
struct batchlog {
utils::UUID id;
int32_t version;
db_clock::time_point written_at;
managed_bytes data;
};
std::map<utils::UUID, const batchlog> batchlogs;
for (int64_t i = 0; i != batch_count; ++i) {
bytes_ostream batchlog_data;
for (int64_t j = 0; j != i+2; ++j) {
auto key = partition_key::from_single_value(*tbl_schema, serialized(j));
mutation m(tbl_schema, key);
m.set_clustered_cell(clustering_key::make_empty(), *cdef_tbl_v, make_atomic_cell(utf8_type, serialized("value")));
ser::serialize(batchlog_data, canonical_mutation(m));
}
const auto id = utils::UUID_gen::get_time_UUID();
auto [it, _] = batchlogs.emplace(id, batchlog{
.id = id,
.version = netw::messaging_service::current_version,
.written_at = db_clock::now() - batch_replay_timeout * 10,
.data = std::move(batchlog_data).to_managed_bytes()});
auto& batch = it->second;
const auto timestamp = api::new_timestamp();
mutation m(batchlog_v1_schema, partition_key::from_single_value(*batchlog_v1_schema, serialized(batch.id)));
m.set_cell(clustering_key_prefix::make_empty(), to_bytes("version"), batch.version, timestamp);
m.set_cell(clustering_key_prefix::make_empty(), to_bytes("written_at"), batch.written_at, timestamp);
m.set_cell(clustering_key_prefix::make_empty(), *cdef_batchlog_v1_data, atomic_cell::make_live(*cdef_batchlog_v1_data->type, timestamp, std::move(batch.data)));
sp.mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no).get();
}
const auto batchlog_v1_query = format("SELECT * FROM {}.{}", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG);
const auto batchlog_v2_query = format("SELECT * FROM {}.{}", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
// Initial state, all entries are in the v1 table.
assert_that(env.execute_cql(batchlog_v1_query).get())
.is_rows()
.with_size(batch_count);
assert_that(env.execute_cql(batchlog_v2_query).get())
.is_rows()
.is_empty();
{
scoped_error_injection error_injection("batchlog_manager_fail_migration");
testlog.info("First replay - migration should fail, all entries stay in v1 table");
bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no).get();
}
assert_that(env.execute_cql(batchlog_v1_query).get())
.is_rows()
.with_size(batch_count);
assert_that(env.execute_cql(batchlog_v2_query).get())
.is_rows()
.is_empty();
{
scoped_error_injection error_injection("storage_proxy_fail_replay_batch");
testlog.info("Second replay - migration should run again and succeed, but replay of migrated entries should fail, so they should remain in the v2 table.");
bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no).get();
}
assert_that(env.execute_cql(batchlog_v1_query).get())
.is_rows()
.is_empty();
auto results = cql3::untyped_result_set(env.execute_cql(batchlog_v2_query).get());
BOOST_REQUIRE_EQUAL(results.size(), batch_count);
std::set<utils::UUID> migrated_batchlog_ids;
for (const auto& row : results) {
BOOST_REQUIRE_EQUAL(row.get_as<int8_t>("stage"), int8_t(db::batchlog_stage::failed_replay));
const auto id = row.get_as<utils::UUID>("id");
auto it = batchlogs.find(id);
BOOST_REQUIRE(it != batchlogs.end());
const auto& batch = it->second;
BOOST_REQUIRE_EQUAL(row.get_as<int32_t>("version"), batch.version);
BOOST_REQUIRE(row.get_as<db_clock::time_point>("written_at") == batch.written_at);
BOOST_REQUIRE_EQUAL(row.get_blob_fragmented("data"), batch.data);
migrated_batchlog_ids.emplace(id);
}
BOOST_REQUIRE_EQUAL(batchlogs.size(), migrated_batchlog_ids.size());
testlog.info("Third replay - migration is already done, replay of migrated entries should succeed, v2 table should be empty afterwards.");
bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no).get();
assert_that(env.execute_cql(batchlog_v2_query).get())
.is_rows()
.is_empty();
}, cfg);
}
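// Verify that replay honors per-batch write times: the repair-mode
// tombstone_gc table's 1s propagation delay makes batches touching it
// eligible for replay right after the 2s sleep, while batches touching only
// the timeout-mode table fall under the 1h batchlog_replay_timeout and are
// skipped as too fresh.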
SEASTAR_TEST_CASE(test_batchlog_replay_write_time) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
return make_ready_future<>();
#endif
cql_test_config cfg;
cfg.db_config->batchlog_replay_cleanup_after_replays.set_value("9999999", utils::config_file::config_source::Internal);
cfg.batchlog_replay_timeout = 1h;
cfg.batchlog_delay = 9999h;
return do_with_cql_env_thread([ks = get_name()] (cql_test_env& env) -> void {
auto& bm = env.batchlog_manager().local();
env.execute_cql(format("CREATE KEYSPACE {} WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 2}} AND tablets = {{'enabled': 'false'}}", ks)).get();
env.execute_cql(format("CREATE TABLE {}.tbl1 (pk bigint PRIMARY KEY, v text) WITH tombstone_gc = {{'mode': 'repair', 'propagation_delay_in_seconds': 1}}", ks)).get();
env.execute_cql(format("CREATE TABLE {}.tbl2 (pk bigint PRIMARY KEY, v text) WITH tombstone_gc = {{'mode': 'timeout'}}", ks)).get();
const uint64_t batch_count = 8;
const uint64_t mutations_per_batch = 2;
{
scoped_error_injection error_injection("storage_proxy_fail_send_batch");
for (uint64_t i = 0; i != batch_count; ++i) {
std::vector<sstring> queries;
std::vector<std::string_view> query_views;
if (i % 2) {
for (const auto& tbl_name : {"tbl1", "tbl2"}) {
queries.emplace_back(format("INSERT INTO {}.{} (pk, v) VALUES (0, 'value');", ks, tbl_name, i));
query_views.emplace_back(queries.back());
}
} else {
for (uint64_t j = 0; j != mutations_per_batch; ++j) {
queries.emplace_back(format("INSERT INTO {}.tbl2 (pk, v) VALUES ({}, 'value');", ks, i * 2 + j));
query_views.emplace_back(queries.back());
}
}
try {
env.execute_batch(
query_views,
cql3::statements::batch_statement::type::LOGGED,
std::make_unique<cql3::query_options>(db::consistency_level::ONE, std::vector<cql3::raw_value>())).get();
} catch (std::runtime_error& ex) {
BOOST_REQUIRE_EQUAL(std::string(ex.what()), "Error injection: failing to send batch");
}
}
}
const auto batchlog_query = format("SELECT * FROM {}.{}", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
assert_that(env.execute_cql(batchlog_query).get())
.is_rows()
.with_size(batch_count);
auto get_write_attempts = [&] () -> uint64_t {
return env.batchlog_manager().map_reduce0([] (const db::batchlog_manager& bm) {
return bm.get_stats().write_attempts;
}, uint64_t(0), std::plus<uint64_t>{}).get();
};
BOOST_REQUIRE_EQUAL(get_write_attempts(), 0);
// We need this sleep here to make sure all batches are older than propagation delay of 1s.
sleep(2s).get();
{
scoped_error_injection error_injection("storage_proxy_fail_replay_batch");
bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no).get();
// Half the batches are skipped due to being too fresh
BOOST_REQUIRE_EQUAL(get_write_attempts(), (batch_count / 2) * mutations_per_batch);
auto results = cql3::untyped_result_set(env.execute_cql(batchlog_query).get());
BOOST_REQUIRE_EQUAL(results.size(), batch_count);
for (const auto& row : results) {
BOOST_REQUIRE_EQUAL(row.get_as<int8_t>("stage"), int8_t(db::batchlog_stage::initial));
}
}
{
scoped_error_injection error_injection("storage_proxy_fail_replay_batch");
bm.do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::yes).get();
// Half the batches are skipped (again) due to being too fresh
BOOST_REQUIRE_EQUAL(get_write_attempts(), batch_count * mutations_per_batch);
auto results = cql3::untyped_result_set(env.execute_cql(batchlog_query).get());
BOOST_REQUIRE_EQUAL(results.size(), batch_count);
size_t initial = 0;
size_t failed_replay = 0;
for (const auto& row : results) {
const auto stage = static_cast<db::batchlog_stage>(row.get_as<int8_t>("stage"));
switch (stage) {
case db::batchlog_stage::initial:
++initial;
break;
case db::batchlog_stage::failed_replay:
++failed_replay;
break;
default:
BOOST_FAIL(format("Unexpected stage: {}", int8_t(stage)));
}
}
BOOST_REQUIRE_EQUAL(initial, batch_count / 2);
BOOST_REQUIRE_EQUAL(failed_replay, batch_count / 2);
}
const auto fragments_query = format("SELECT * FROM MUTATION_FRAGMENTS({}.{}) WHERE partition_region = 2 ALLOW FILTERING", db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
const auto batchlog_v2_schema = env.local_db().find_schema(db::system_keyspace::NAME, db::system_keyspace::BATCHLOG_V2);
const auto [initial_fragments_per_shard, failed_replay_fragments_per_shard] = extract_batchlog_fragments(
cql3::untyped_result_set(env.execute_cql(fragments_query).get()), batchlog_v2_schema);
for (const auto& [batchlog_shard, batchlog_shard_fragments] : initial_fragments_per_shard) {
if (batchlog_shard_fragments.empty()) {
continue;
}
testlog.info("Checking fragment in initial stage and batchlog shard {}", batchlog_shard);
position_in_partition::less_compare less_cmp{*batchlog_v2_schema};
size_t rts = 0;
auto min_live_pos = position_in_partition::after_all_clustered_rows();
for (const auto& f : batchlog_shard_fragments) {
if (f.is_range_tombstone) {
++rts;
continue;
}
if (!f.is_live) {
continue;
}
min_live_pos = std::min(min_live_pos, f.pos, less_cmp);
}
BOOST_REQUIRE(rts == 0 || rts == 2);
if (!rts) {
continue;
}
check_range_tombstone_start(*batchlog_shard_fragments.begin());
bool found_range_tombstone_end = false;
for (auto it = std::next(batchlog_shard_fragments.begin()); it != batchlog_shard_fragments.end(); ++it) {
if (it->is_range_tombstone) {
BOOST_REQUIRE(!std::exchange(found_range_tombstone_end, true));
check_range_tombstone_end(*it);
BOOST_REQUIRE(less_cmp(it->pos, min_live_pos));
}
}
}
}, cfg);
}
BOOST_AUTO_TEST_SUITE_END()