Files
scylladb/test/lib/cql_assertions.cc
Marcin Maliszkiewicz 81685b0d06 Merge 'db/batchlog_manager: re-add v1 support for mixed clusters' from Botond Dénes
3f7ee3ce5d introduced system.batchlog_v2, with a schema designed to speed up batchlog replays and make post-replay cleanups much more effective.
It did not introduce a cluster feature for the new table, because it is a node-local table, so the cluster can switch to the new table gradually, one node at a time.
However, https://github.com/scylladb/scylladb/issues/27886 showed that the switch causes timeouts during upgrades, in mixed clusters. Furthermore, switching to the new table unconditionally on upgraded nodes means that, on rollback, the batches saved into the v2 table are lost.
This PR re-introduces v1 (`system.batchlog`) support and guards the use of the v2 table with a cluster feature, so mixed clusters keep using v1 and thus remain rollback-compatible.
The re-introduced v1 support doesn't support post-replay cleanups for simplicity. The cleanup in v1 was never particularly effective anyway and we ended up disabling it for heavy batchlog users, so I don't think the lack of support for cleanup is a problem.

Fixes: https://github.com/scylladb/scylladb/issues/27886

Needs backport to 2026.1, to fix upgrades for clusters using batches

Closes scylladb/scylladb#28736

* github.com:scylladb/scylladb:
  test/boost/batchlog_manager_test: add tests for v1 batchlog
  test/boost/batchlog_manager_test: make prepare_batches() work with both v1 and v2
  test/boost/batchlog_manager_test: fix indentation
  test/boost/batchlog_manager_test: extract prepare_batches() method
  test/lib/cql_assertions: is_rows(): add dump parameter
  tools/scylla-sstable: extract query result printers
  tools/scylla-sstable: add std::ostream& arg to query result printers
  repair/row_level: repair_flush_hints_batchlog_handler(): add all_replayed to finish log
  db/batchlog_manager: re-add v1 support
  db/batchlog_manager: return all_replayed from process_batch()
  db/batchlog_manager: process_bath() fix indentation
  db/batchlog_manager: make batch() a standalone function
  db/batchlog_manager: make structs stats public
  db/batchlog_manager: allocate limiter on the stack
  db/batchlog_manager: add feature_service dependency
  gms/feature_service: add batchlog_v2 feature

(cherry picked from commit a83ee6cf66)

Closes scylladb/scylladb#28853
2026-03-04 08:28:39 +02:00

362 lines
14 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <boost/test/unit_test.hpp>
#include <fmt/ranges.h>
#include <fmt/std.h>
#include "test/lib/cql_assertions.hh"
#include "test/lib/eventually.hh"
#include "test/lib/log.hh"
#include "transport/messages/result_message.hh"
#include "utils/assert.hh"
#include "utils/to_string.hh"
#include "bytes.hh"
#include "cql3/query_result_printer.hh"
// Throws std::runtime_error describing a failed assertion, prefixed with the
// file and line taken from `loc`.
//
// This function never returns normally (it always throws), so it is marked
// [[noreturn]]; that lets the compiler and static analysers treat code after a
// call to it as unreachable.
[[noreturn]] static inline void fail(std::string_view msg, std::source_location loc) {
    throw std::runtime_error(std::format("assertion at {}:{} failed: {}", loc.file_name(), loc.line(), msg));
}
// Reports a failed column assertion. The failure is attributed to the source
// location this columns_assertions object was created with.
void columns_assertions::fail(const sstring& msg) {
    const auto& where = _loc;
    ::fail(msg, where);
}
// Locates the column called `name` in the result metadata and hands its type
// and raw value to `func`.
// Fails the assertion if no such column exists or if the cell is null.
// Returns *this to allow chaining.
columns_assertions& columns_assertions::do_with_raw_column(const char* name, std::function<void(data_type, managed_bytes_view)> func) {
    const auto& specs = _metadata.get_names();
    const auto spec_it = std::ranges::find_if(specs, [name] (const auto& spec) {
        return spec->name->text() == name;
    });
    if (spec_it == specs.end()) {
        ::fail(seastar::format("Column {} not found in metadata", name), _loc);
    }
    // Cells in the row are positioned exactly like their specs in the metadata.
    const size_t position = std::distance(specs.begin(), spec_it);
    const auto& cell = _columns.at(position);
    if (!cell) {
        ::fail(seastar::format("Column {} is null", name), _loc);
    }
    const auto& spec = *spec_it;
    func(spec->type, *cell);
    return *this;
}
// Asserts that column `name` is present and non-null, and that `predicate`
// accepts its raw value. Returns *this to allow chaining.
columns_assertions& columns_assertions::with_raw_column(const char* name, std::function<bool(managed_bytes_view)> predicate) {
    auto check = [this, name, &predicate] (data_type, managed_bytes_view value) {
        if (predicate(value)) {
            return;
        }
        ::fail(seastar::format("Column {} failed predicate check: value = {}", name, value), _loc);
    };
    return do_with_raw_column(name, check);
}
// Asserts that column `name` is present, non-null and byte-for-byte equal to
// `value`. Returns *this to allow chaining.
columns_assertions& columns_assertions::with_raw_column(const char* name, managed_bytes_view value) {
    auto compare = [this, name, &value] (data_type, managed_bytes_view cell_value) {
        if (cell_value == value) {
            return;
        }
        ::fail(seastar::format("Expected column {} to have value {}, but got {}", name, value, cell_value), _loc);
    };
    return do_with_raw_column(name, compare);
}
// Wraps a rows result message for assertions.
// When `dump` is set, the result is rendered as text and written to the test
// log at debug level, tagged with the caller's location.
rows_assertions::rows_assertions(shared_ptr<cql_transport::messages::result_message::rows> rows, tests::dump_to_logs dump, std::source_location loc)
    : _rows(rows)
    , _loc(loc)
{
    if (!dump) {
        return;
    }
    std::stringstream rendered;
    cql3::print_query_results_text(rendered, _rows->rs());
    testlog.debug("Query results for assert_that().is_rows(dump_to_logs::yes) at {}:{}:\n{}", _loc.file_name(), _loc.line(), rendered.str());
}
// Asserts that the result set holds exactly `size` rows.
rows_assertions
rows_assertions::with_size(size_t size) {
    const auto actual = _rows->rs().result_set().size();
    if (actual == size) {
        return {*this};
    }
    fail(format("Expected {:d} row(s) but got {:d}", size, actual), _loc);
    return {*this};
}
// Asserts that `predicate` accepts the number of rows in the result set.
rows_assertions
rows_assertions::with_size(std::function<bool(size_t)> predicate) {
    const auto actual = _rows->rs().result_set().size();
    if (predicate(actual)) {
        return {*this};
    }
    fail(format("Predicate failed for row count {}", actual), _loc);
    return {*this};
}
// Asserts that the result set contains no rows. On failure, the message
// includes the first offending row.
rows_assertions
rows_assertions::is_empty() {
    const auto& result = _rows->rs().result_set();
    if (const auto count = result.size(); count != 0) {
        const auto& head = *result.rows().begin();
        fail(seastar::format("Expected no rows, but got {:d}. First row: {}", count, head), _loc);
    }
    return {*this};
}
// Asserts that the result set contains at least one row.
rows_assertions
rows_assertions::is_not_empty() {
    const auto& rs = _rows->rs().result_set();
    if (rs.size() == 0) {
        // Message previously read "but was result was empty" (garbled).
        fail("Expected some rows, but result was empty", _loc);
    }
    return {*this};
}
// Asserts that every cell of every row in the result set is null.
// An empty result set passes, because there is no cell to check.
//
// The definition is qualified as plain `rows_assertions::` to match every
// other member definition in this file. Previously it was
// `rows_assertions::rows_assertions::`, which is valid through the
// injected-class-name but confusing.
rows_assertions
rows_assertions::is_null() {
    const auto& rs = _rows->rs().result_set();
    for (auto&& row : rs.rows()) {
        for (const managed_bytes_opt& v : row) {
            if (v) {
                fail(seastar::format("Expected null values. Found: {}\n", v), _loc);
            }
        }
    }
    return {*this};
}
// Asserts that no cell in the result set is null. It then delegates to
// is_not_empty(), so an empty result set fails as well.
//
// The definition is qualified as plain `rows_assertions::` to match every
// other member definition in this file. The old redundant
// `rows_assertions::rows_assertions::` form has been removed.
rows_assertions
rows_assertions::is_not_null() {
    const auto& rs = _rows->rs().result_set();
    for (auto&& row : rs.rows()) {
        for (const managed_bytes_opt& v : row) {
            if (!v) {
                fail(seastar::format("Expected non-null values. {}\n", row), _loc);
            }
        }
    }
    return is_not_empty();
}
// Asserts that the result metadata lists exactly `column_types.size()` columns
// and that the i-th column's type is the i-th expected type.
rows_assertions
rows_assertions::with_column_types(std::initializer_list<data_type> column_types) {
    // Bind by reference; there is no need to copy the metadata.
    const auto& meta = _rows->rs().result_set().get_metadata();
    const auto& columns = meta.get_names();
    if (column_types.size() != columns.size()) {
        // Report the same count that was compared. meta.column_count() is the
        // serialized column count (see with_serialized_columns_count()) and can
        // differ from get_names().size(), which made the old message misleading.
        fail(format("Expected {:d} columns, got {:d}", column_types.size(), columns.size()), _loc);
    }
    // The sizes are equal here (fail() throws otherwise), so advancing
    // actual_it in lockstep with column_types stays in bounds.
    auto actual_it = columns.begin();
    size_t i = 0;
    for (const auto& expected_type : column_types) {
        const auto& actual_spec = *actual_it++;
        if (expected_type != actual_spec->type) {
            fail(format("Column {:d}: expected type {}, got {}", i, expected_type->name(), actual_spec->type->name()), _loc);
        }
        ++i;
    }
    return {*this};
}
// Asserts that at least one row of the result set equals `values`,
// cell by cell.
rows_assertions
rows_assertions::with_row(std::initializer_list<bytes_opt> values) {
    const auto& result = _rows->rs().result_set();
    std::vector<managed_bytes_opt> wanted = values | std::views::transform(to_managed_bytes_opt) | std::ranges::to<std::vector<managed_bytes_opt>>();
    const auto& candidates = result.rows();
    const bool present = std::any_of(std::begin(candidates), std::end(candidates), [&wanted] (const auto& row) {
        return row == wanted;
    });
    if (!present) {
        fail(seastar::format("Expected row not found: {} not in {}\n", wanted, _rows), _loc);
    }
    return {*this};
}
// Verifies that the result contains exactly `rows`, in the same order.
// Reports the first differing row, or a mismatch in the row count.
rows_assertions
rows_assertions::with_rows(std::vector<std::vector<bytes_opt>> rows) {
    const auto& result = _rows->rs().result_set();
    auto next = result.rows().begin();
    const auto last = result.rows().end();
    int index = 0;
    for (auto&& expected : rows) {
        if (next == last) {
            fail(format("Expected more rows ({:d}), got {:d}", rows.size(), result.size()), _loc);
        }
        const auto& actual = *next;
        const bool same = std::ranges::equal(expected | std::views::transform(to_managed_bytes_opt), actual);
        if (!same) {
            fail(seastar::format("row {} differs, expected {} got {}", index, expected, actual), _loc);
        }
        ++next;
        ++index;
    }
    if (next != last) {
        fail(seastar::format("Expected less rows ({:d}), got {:d}. Next row is: {}", rows.size(), result.size(),
                *next), _loc);
    }
    return {*this};
}
// Verifies that each row in `rows` appears somewhere in the result and that
// the result has exactly rows.size() rows. Order is ignored.
rows_assertions
rows_assertions::with_rows_ignore_order(std::vector<std::vector<bytes_opt>> rows) {
    const auto& result = _rows->rs().result_set();
    const auto& actual = result.rows();
    for (auto&& expected : rows) {
        auto wanted = expected | std::views::transform(to_managed_bytes_opt);
        const bool present = std::any_of(std::begin(actual), std::end(actual), [&] (const auto& row) {
            return std::equal(std::begin(row), std::end(row), std::begin(wanted), std::end(wanted));
        });
        if (!present) {
            fail(seastar::format("row {} not found in result set ({})", expected,
                    fmt::join(actual | std::views::transform([] (auto& r) { return fmt::to_string(r); }), ", ")), _loc);
        }
    }
    if (result.size() != rows.size()) {
        fail(format("Expected different number of rows ({:d}), got {:d}", rows.size(), result.size()), _loc);
    }
    return {*this};
}
// Returns a columns_assertions for the row at `row_index`. Fails the assertion
// if the index is past the end of the result set.
columns_assertions rows_assertions::with_columns_of_row(size_t row_index) {
    const auto& result = _rows->rs().result_set();
    const auto& all_rows = result.rows();
    const auto available = all_rows.size();
    if (!(row_index < available)) {
        fail(format("Requested row index {} is out of range, result has {} rows", row_index, available), _loc);
    }
    return columns_assertions(result.get_metadata(), all_rows.at(row_index), _loc);
}
// Calls `func` once per row, in result order, passing a columns_assertions
// for that row. Returns *this to allow chaining.
rows_assertions& rows_assertions::assert_for_columns_of_each_row(std::function<void(columns_assertions&)> func) {
    const size_t row_count = _rows->rs().result_set().size();
    for (size_t row = 0; row != row_count; ++row) {
        auto cols = with_columns_of_row(row);
        func(cols);
    }
    return *this;
}
result_msg_assertions::result_msg_assertions(shared_ptr<cql_transport::messages::result_message> msg, std::source_location loc)
: _msg(msg)
, _loc(loc)
{ }
// Asserts that the message is a rows result and returns rows_assertions for it.
// `dump` is forwarded so the rows can optionally be written to the test log.
rows_assertions result_msg_assertions::is_rows(tests::dump_to_logs dump) {
    using rows_message = cql_transport::messages::result_message::rows;
    auto as_rows = dynamic_pointer_cast<rows_message>(_msg);
    if (as_rows == nullptr) {
        fail("Expected rows in result set", _loc);
    }
    return rows_assertions(as_rows, dump, _loc);
}
// Entry point of the fluent assertion API: wraps `msg` so that checks can be
// chained on it, with failures attributed to `loc`.
result_msg_assertions assert_that(shared_ptr<cql_transport::messages::result_message> msg, std::source_location loc) {
    return result_msg_assertions{msg, loc};
}
// Asserts that the result metadata reports exactly `columns_count` serialized
// columns (metadata column_count()).
rows_assertions rows_assertions::with_serialized_columns_count(size_t columns_count) {
    size_t serialized_column_count = _rows->rs().get_metadata().column_count();
    if (serialized_column_count != columns_count) {
        // Fixed typo in the message: "columns(s)" -> "column(s)".
        fail(format("Expected {:d} serialized column(s) but got {:d}", columns_count, serialized_column_count), _loc);
    }
    return {*this};
}
// Executes `query` (with options `qo` when they are provided) and returns the
// result. Any exception fails the Boost test; the failure message names the
// query, the error and the caller's location.
shared_ptr<cql_transport::messages::result_message> cquery_nofail(
        cql_test_env& env, std::string_view query, std::unique_ptr<cql3::query_options>&& qo, const std::source_location& loc) {
    try {
        if (!qo) {
            return env.execute_cql(query).get();
        }
        return env.execute_cql(query, std::move(qo)).get();
    } catch (...) {
        BOOST_FAIL(seastar::format("query '{}' failed: {}\n{}:{}: originally from here",
                query, std::current_exception(), loc.file_name(), loc.line()));
    }
    // Unreachable because BOOST_FAIL aborts the test. Kept so every path returns.
    return shared_ptr<cql_transport::messages::result_message>(nullptr);
}
// Runs `qstr` and requires the result to contain exactly the `expected` rows,
// in any order. Failures are reported through BOOST_FAIL with the caller's
// location.
void require_rows(cql_test_env& e,
        std::string_view qstr,
        const std::vector<std::vector<bytes_opt>>& expected,
        const std::source_location& loc) {
    try {
        assert_that(cquery_nofail(e, qstr, nullptr, loc), loc).is_rows().with_rows_ignore_order(expected);
    }
    // Named `ex` so that it does not shadow the `cql_test_env& e` parameter.
    catch (const std::exception& ex) {
        BOOST_FAIL(seastar::format("query '{}' failed: {}\n{}:{}: originally from here",
                qstr, ex.what(), loc.file_name(), loc.line()));
    }
}
// Repeatedly runs `qstr` via eventually() until the result contains exactly the
// `expected` rows, in any order. If it never does, the last error is reported
// through BOOST_FAIL with the caller's location.
void eventually_require_rows(cql_test_env& e, std::string_view qstr, const std::vector<std::vector<bytes_opt>>& expected,
        const std::source_location& loc) {
    try {
        eventually([&] {
            // Forward `loc` (as require_rows does) so assertion failures point to
            // the caller rather than to this helper.
            assert_that(cquery_nofail(e, qstr, nullptr, loc), loc).is_rows().with_rows_ignore_order(expected);
        });
    } catch (const std::exception& ex) { // `ex`, so it does not shadow `e`
        BOOST_FAIL(seastar::format("query '{}' failed: {}\n{}:{}: originally from here",
                qstr, ex.what(), loc.file_name(), loc.line()));
    }
}
// Executes the prepared statement `id` with bound `values` and requires the
// result to contain exactly the `expected` rows, in any order. Failures are
// reported through BOOST_FAIL with the caller's location.
void require_rows(cql_test_env& e,
        cql3::prepared_cache_key_type id,
        const std::vector<cql3::raw_value>& values,
        const std::vector<std::vector<bytes_opt>>& expected,
        const std::source_location& loc) {
    try {
        // Forward `loc` so assertion failures point to the caller, consistent with
        // the query-string overload.
        assert_that(e.execute_prepared(id, values).get(), loc).is_rows().with_rows_ignore_order(expected);
    } catch (const std::exception& ex) { // `ex`, so it does not shadow `e`
        BOOST_FAIL(seastar::format("execute_prepared failed: {}\n{}:{}: originally from here",
                ex.what(), loc.file_name(), loc.line()));
    }
}
// Checks that, in table ks.<table_name>, the row addressed by partition key `pk`
// and clustering key `ck` has column `column_name` equal to `expected`.
// The lookup runs on the shard that shard_for_reads() returns for the
// partition's token.
// Failures are raised through SCYLLA_ASSERT, not via a test exception.
// This covers a missing row, an unknown column, an unset or dead cell, and a
// value mismatch.
future<> require_column_has_value(cql_test_env& e, const sstring& table_name,
        std::vector<data_value> pk, std::vector<data_value> ck,
        const sstring& column_name, data_value expected) {
    // Build the keys, the expected bytes and the target shard on this shard.
    // These locals use distinct names so they are not shadowed by the
    // per-shard variables inside the lambda below.
    auto& local_cf = e.local_db().find_column_family("ks", table_name);
    auto local_schema = local_cf.schema();
    auto pkey = partition_key::from_deeply_exploded(*local_schema, pk);
    auto ckey = clustering_key::from_deeply_exploded(*local_schema, ck);
    auto exp = expected.type()->decompose(expected);
    auto dk = dht::decorate_key(*local_schema, pkey);
    auto shard = local_cf.get_effective_replication_map()->shard_for_reads(*local_schema, dk._token);
    return e.db().invoke_on(shard, [&e, dk = std::move(dk),
            ckey = std::move(ckey),
            // column_name and table_name are const references. std::move() on
            // them would silently copy, so make the copy explicit.
            column_name = sstring(column_name),
            exp = std::move(exp),
            table_name = sstring(table_name)] (replica::database& db) mutable {
        auto& cf = db.find_column_family("ks", table_name);
        auto schema = cf.schema();
        return cf.find_row(schema, make_reader_permit(e), dk, ckey).then([schema, column_name, exp] (auto row) {
            SCYLLA_ASSERT(row != nullptr);
            auto col_def = schema->get_column_definition(utf8_type->decompose(column_name));
            SCYLLA_ASSERT(col_def != nullptr);
            const atomic_cell_or_collection* cell = row->find_cell(col_def->id);
            if (!cell) {
                SCYLLA_ASSERT(((void)"column not set", 0));
            }
            bytes actual;
            if (!col_def->type->is_multi_cell()) {
                // Single-cell column: it must be live; compare its linearized value.
                auto c = cell->as_atomic_cell(*col_def);
                SCYLLA_ASSERT(c.is_live());
                actual = c.value().linearize();
            } else {
                // Multi-cell (collection) column: compare its CQL serialization.
                actual = linearized(serialize_for_cql(*col_def->type,
                        cell->as_collection_mutation()));
            }
            SCYLLA_ASSERT(col_def->type->equal(actual, exp));
        });
    });
}