Files
scylladb/test/boost/group0_test.cc
Andrzej Jackowski e366030a92 treewide: seastar module update
This seastar update brings in the fixed handling of the `integer`
type in `seastar-json2code`, which is needed for further development
of the ScyllaDB REST API.

The following changes were introduced to ScyllaDB code to ensure it
compiles with the updated seastar:
 - Remove `seastar/util/modules.hh` includes as the file was removed
   from seastar
 - Modify `metrics::impl::labels_type` construction in
   `test/boost/group0_test.cc` because it now requires `escaped_string`

* seastar 340e14a7...8c3fba7a (32):
  > Merge 'Remove net::packet usage from dns.cc' from Pavel Emelyanov
    dns: Optimize packet sending for newer c-ares versions
    dns: Replace net::packet with vector<temporary_buffer>
    dns: Remove unused local variable
    dns: Remove pointless for () loop wrapping
    dns: Introduce do_sendv_tcp() method
    dns: Introduce do_send_udp() method
  > test: Add http rules test of matching order
  > Merge 'Generalize packet_data_source into memory_data_source' from Pavel Emelyanov
    memcached: Patch test to use memory_data_source
    memcached: Use memory_data_source in server
    rpc: Use memory_data_sink without constructing net::packet
    util: Generalize packet_data_source into memory_data_source
  > tests: coroutines: restore "explicit this" tests
  > reactor: remove blocking of SIGILL
  > Merge 'Update compilers in GH actions scripts' from Pavel Emelyanov
    github: Use gcc-14
    github: Use clang-20
  > Merge 'Reinforce DNS reverse resolution test ' from Pavel Emelyanov
    test: Make test_resolve() try several addresses
    test: Coroutinize test_resolve() helper
  > modules: make module support standards-compliant
  > Merge 'Fix incorrect union access in dns resolver' from Pavel Emelyanov
    dns: Squash two if blocks together
    dns: Do not check tcp entry for udp type
  > coroutine: Fix compilation of execute_involving_handle_destruction_in_await_suspend
  > promise: Document that promise is resolved at most once
  > coroutine: exception: workaround broken destroy coroutine handle in await_suspend
  > socket: Return unspecified socket_address for unconnected socket
  > smp: Fix exception safety of invoke_on_... internal copying
  > Merge 'Improve loads evaluation by reactor' from Pavel Emelyanov
    reactor: Keep loads timer on reactor
    reactor: Update loads evaluation loop
  > Merge 'scripts: add 'integer' type to seastar-json2code' from Andrzej Jackowski
    test: extend tests/unit/api.json to use 'integer' type
    scripts: add 'integer' type to seastar-json2code
  > Merge 'Sanitize tls::session::do_put(_one)? overloads' from Pavel Emelyanov
    tls: Rename do_put_one(temporary_buffer) into do_put()
    tls: Fix indentation after previous patch
    tls: Move semaphore grab into iterating do_put()
  > net: tcp: change unsent queue from packets to temporary_buffer:s
  > timer: Enable highres timer based on next timeout value
  > rpc: Add a new constructor in closed_error to accept string argument
  > memcache: Implement own data sink for responses
  > Merge 'file: recursive_remove_directory: general cleanup' from Avi Kivity
    file: do_recursive_remove_directory(): move object when popping from queue
    file: do_recursive_remove_directory(): adjust indentation
    file: do_recursive_remove_directory(): coroutinize
    file: do_recursive_remove_directory(): simplify conditional
    file: do_recursive_remove_directory(): remove wrong const
    file: do_recursive_remove_directory(): clean up work_entry
  > tests: Move thread_context_switch_test into perf/
  > test: Add unit test for append_challenged_posix_file
  > Merge 'Prometheus metrics handler optimization' from Travis Downs
    prometheus: optimize metrics aggregation
    prometheus: move and test aggregate_by helper
    prometheus: various optimizations
    metrics: introduce escaped_string for label values
    metric:value: implement + in terms of +=
    tests: add prometheus text format acceptance tests
    extract memory_data_sink.hh
    metrics_perf: enhance metrics bench
  > demos: Simplify udp_zero_copy_demo's way of preparing the packet
  > metrics: Remove deprecated make_...-ers
  > Merge 'Make slab_test be BOOST kind' from Pavel Emelyanov
    test: Use BOOST_REQUIRE checkers
    test: Replace some SEASTAR_ASSERT-s with static_assert-s
    test: Convert slab test into boost kind
  > Merge 'Coroutinize lister_test' from Pavel Emelyanov
    test: Fix indentation after previuous patch
    test: Coroutinize lister_test lister::report() method
    test: Coroutinize lister_test main code
  > file: recursive_remove_directory(): use a list instead of a deque
  > Merge 'Stop using packets in tls data_sink and session' from Pavel Emelyanov
    tls: Stop using net::packet in session::put()
    tls: Fix indentation after previous patch
    tls: Split session::do_put()
    tls: Mark some session methods private

Closes scylladb/scylladb#27240
2025-11-27 12:34:22 +02:00

349 lines
16 KiB
C++

/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_assertions.hh"
#include <seastar/core/coroutine.hh>
#include "test/lib/cql_test_env.hh"
#include "test/lib/log.hh"
#include "schema/schema_builder.hh"
#include "utils/UUID_gen.hh"
#include "utils/error_injection.hh"
#include "transport/messages/result_message.hh"
#include "service/migration_manager.hh"
#include <fmt/ranges.h>
#include <seastar/core/metrics_api.hh>
// Executes `cql` through the test environment and returns the rows of the result set.
// Fails the current test (BOOST_REQUIRE) if the response is not a `rows` result message.
static future<utils::chunked_vector<std::vector<managed_bytes_opt>>> fetch_rows(cql_test_env& e, std::string_view cql) {
    auto response = co_await e.execute_cql(cql);
    auto rows_msg = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(response);
    // A failed cast yields a null pointer; stop here rather than dereferencing it below.
    BOOST_REQUIRE(rows_msg);
    co_return rows_msg->rs().result_set().rows();
}
// Returns the number of rows currently stored in `system.group0_history`.
static future<size_t> get_history_size(cql_test_env& e) {
    auto history_rows = co_await fetch_rows(e, "select * from system.group0_history");
    co_return history_rows.size();
}
BOOST_AUTO_TEST_SUITE(group0_test)
// Checks that once the `store_log_entries/test-failure` injection fires, every schema
// change fails with `raft::stopped_error` carrying the expected reason, and that the
// `raft_group0_status` metric moves from 1 to 2 and stays there.
// The test is skipped when the build has no error injection support.
SEASTAR_TEST_CASE(test_abort_server_on_background_error) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
    std::cerr << "Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n";
    return make_ready_future<>();
#else
    return do_with_cql_env([] (cql_test_env& e) -> future<> {
        // Arm the injection point before any schema change is attempted.
        utils::get_local_injector().enable("store_log_entries/test-failure", true);

        // Reads the `raft_group0_<name>` metric registered with label shard="0"
        // and returns its `ui()` value.
        auto read_group0_metric = [&] (sstring name) {
            const auto& families = seastar::metrics::impl::get_value_map();
            const auto& family = families.at("raft_group0_" + name);
            const auto& metric = family.at(
                    make_lw_shared<const metrics::impl::labels_type>({{"shard", metrics::impl::escaped_string("0")}}));
            return (*metric)().ui();
        };

        // NOTE(review): presumably 1 means "running" and 2 means "aborted" - confirm
        // against the definition of the group 0 status metric.
        auto group0_status = [&] {
            return read_group0_metric("status");
        };

        // Alternates between creating and dropping `new_ks`; the flag flips only
        // when the statement did not throw.
        auto toggle_keyspace = [&, ks_exists = false] () mutable -> future<> {
            if (ks_exists) {
                co_await e.execute_cql("drop keyspace new_ks");
            } else {
                co_await e.execute_cql("create keyspace new_ks with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}");
            }
            ks_exists = !ks_exists;
        };

        // Accepts only the exact message produced for the injected background failure.
        auto is_expected_stop = [] (const raft::stopped_error& err) {
            return err.what() == sstring("Raft instance is stopped, reason: \"background error, std::runtime_error (store_log_entries/test-failure)\"");
        };

        BOOST_REQUIRE_EQUAL(group0_status(), 1);

        // The first attempt trips the injection; the following ones verify that the
        // error and the status value are sticky.
        for (int attempt = 0; attempt < 3; ++attempt) {
            BOOST_CHECK_EXCEPTION(co_await toggle_keyspace(), raft::stopped_error, is_expected_stop);
            BOOST_REQUIRE_EQUAL(group0_status(), 2);
        }
    });
#endif
}
// Exercises trimming of `system.group0_history` under different
// `history_gc_duration` settings of the group 0 client: zero duration,
// one week, and a duration equal to a short sleep between two bursts of changes.
SEASTAR_TEST_CASE(test_group0_history_clearing_old_entries) {
    return do_with_cql_env([] (cql_test_env& e) -> future<> {
        using namespace std::chrono;

        // Shorthand for the file-level helper, bound to this environment.
        auto history_size = [&e] {
            return ::get_history_size(e);
        };

        // Alternates between creating and dropping `new_ks`, so every call is
        // one schema change.
        auto toggle_keyspace = [&, ks_exists = false] () mutable -> future<> {
            if (ks_exists) {
                co_await e.execute_cql("drop keyspace new_ks");
            } else {
                co_await e.execute_cql("create keyspace new_ks with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}");
            }
            ks_exists = !ks_exists;
        };

        // A single change adds a single history entry.
        const auto initial_size = co_await history_size();
        co_await toggle_keyspace();
        BOOST_REQUIRE_EQUAL(co_await history_size(), initial_size + 1);

        auto& group0_client = e.get_raft_group0_client();

        // With a GC duration of 0, each change should wipe all earlier entries,
        // leaving only its own.
        group0_client.set_history_gc_duration(gc_clock::duration{0});
        co_await toggle_keyspace();
        BOOST_REQUIRE_EQUAL(co_await history_size(), 1);
        co_await toggle_keyspace();
        BOOST_REQUIRE_EQUAL(co_await history_size(), 1);

        // With a one-week GC duration nothing is old enough to be removed.
        group0_client.set_history_gc_duration(duration_cast<gc_clock::duration>(weeks{1}));
        co_await toggle_keyspace();
        BOOST_REQUIRE_EQUAL(co_await history_size(), 2);

        // First burst of changes.
        for (int i = 0; i < 10; ++i) {
            co_await toggle_keyspace();
        }

        // gc_clock ticks once per second, so a shorter sleep would not be observable.
        auto sleep_dur = seconds{1};
        co_await sleep(sleep_dur);

        // Second burst of changes, separated from the first one by `sleep_dur`.
        for (int i = 0; i < 10; ++i) {
            co_await toggle_keyspace();
        }

        // Reads every `state_id` from the history table and converts it to the
        // unix timestamp (in microseconds) embedded in the timeuuid.
        auto read_history_timestamps = [&] () -> future<std::vector<microseconds>> {
            auto history_rows = co_await fetch_rows(e, "select state_id from system.group0_history");
            std::vector<microseconds> stamps;
            for (auto& history_row : history_rows) {
                auto id = value_cast<utils::UUID>(timeuuid_type->deserialize(*history_row[0]));
                stamps.push_back(utils::UUID_gen::unix_timestamp_micros(id));
            }
            co_return stamps;
        };

        // Snapshot the history, shrink the GC duration to `sleep_dur`, perform one
        // more change and snapshot again.
        auto timestamps1 = co_await read_history_timestamps();
        group0_client.set_history_gc_duration(duration_cast<gc_clock::duration>(sleep_dur));
        co_await toggle_keyspace();
        auto timestamps2 = co_await read_history_timestamps();

        // The history table keeps state IDs in descending order, hence the front
        // element belongs to the most recent schema change.
        auto last_ts = timestamps2.front();

        testlog.info("timestamps1: {}", timestamps1);
        testlog.info("timestamps2: {}", timestamps2);

        // Apart from `last_ts`, the second snapshot must be a subset of the first one.
        BOOST_REQUIRE(std::includes(timestamps1.begin(), timestamps1.end(), timestamps2.begin()+1, timestamps2.end(), std::greater{}));

        // Count entries of the first snapshot that precede `last_ts` by more than `sleep_dur`.
        // About 12 are expected: the sleep separates the two bursts above, and the
        // schema changes themselves should take far less than `sleep_dur`.
        auto expired_count = std::count_if(timestamps1.begin(), timestamps1.end(), [last_ts, sleep_dur] (microseconds ts) {
            return last_ts - ts > sleep_dur;
        });
        testlog.info("older by sleep_dur: {}", expired_count);

        // The final change should have removed exactly the expired entries, so the
        // second snapshot is the first one minus those, plus the new `last_ts` entry.
        BOOST_REQUIRE_EQUAL(timestamps2.size(), timestamps1.size() - expired_count + 1);
    });
}
// Verifies the interplay between concurrent group 0 modifications, DDL retries and
// the group 0 operation mutex:
//  1. with the mutex relaxed and no retries, the number of successful statements
//     equals the number of new history entries;
//  2. with the mutex relaxed and enough retries, every statement succeeds;
//  3. with the mutex restored and no retries, every statement succeeds as well.
SEASTAR_TEST_CASE(test_concurrent_group0_modifications) {
    return do_with_cql_env([] (cql_test_env& e) -> future<> {
        auto& rclient = e.get_raft_group0_client();
        auto& mm = e.migration_manager().local();

        // raft_group0_client::_group0_operation_mutex prevents concurrent group 0 changes to be executed on a single node,
        // so in production `group0_concurrent_modification` never occurs if all changes go through a single node.
        // For this test, give it more units so it doesn't block these concurrent executions
        // in order to simulate a scenario where multiple nodes concurrently send schema changes.
        // The same amount is consumed again further below, hence the named constant.
        constexpr size_t extra_mutex_units = 1337;
        rclient.operation_mutex().signal(extra_mutex_units);

        // Make DDL statement execution fail on the first attempt if it gets a concurrent modification exception.
        mm.set_concurrent_ddl_retries(0);

        auto get_history_size = std::bind_front(::get_history_size, std::ref(e));

        // Performs `n` schema changes in sequence, alternating between creating and dropping
        // the keyspace `new_ks<task_id>`. A statement that fails with
        // `group0_concurrent_modification` is skipped (the keyspace state is not flipped).
        // Returns the number of statements that succeeded.
        auto perform_schema_changes = [] (cql_test_env& e, size_t n, size_t task_id) -> future<size_t> {
            size_t successes = 0;
            bool has_ks = false;
            auto drop_ks_cql = format("drop keyspace new_ks{}", task_id);
            auto create_ks_cql = format("create keyspace new_ks{} with replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}", task_id);

            auto perform = [&] () -> future<> {
                try {
                    if (has_ks) {
                        co_await e.execute_cql(drop_ks_cql);
                    } else {
                        co_await e.execute_cql(create_ks_cql);
                    }
                    has_ks = !has_ks;
                    ++successes;
                } catch (const service::group0_concurrent_modification&) {}
            };

            while (n--) {
                co_await perform();
            }

            co_return successes;
        };

        auto size = co_await get_history_size();

        const size_t N = 4;
        const size_t M = 4;

        // Run N concurrent tasks, each performing M schema changes in sequence.
        // The reduction is seeded with `size_t{0}` so that the accumulator has the same type as
        // the per-task results and as the values it is compared against below.
        auto successes = co_await map_reduce(std::views::iota(size_t{0}, N), std::bind_front(perform_schema_changes, std::ref(e), M), size_t{0}, std::plus{});

        // The number of new entries that appeared in group 0 history table should be exactly equal
        // to the number of successful schema changes.
        BOOST_REQUIRE_EQUAL(successes, (co_await get_history_size()) - size);

        // Make it so that execution of a DDL statement will perform up to (N-1) * M + 1 attempts (first try + up to (N-1) * M retries).
        mm.set_concurrent_ddl_retries((N-1)*M);

        // Run N concurrent tasks, each performing M schema changes in sequence.
        // (use different range of task_ids so the new tasks' statements don't conflict with existing keyspaces from previous tasks)
        successes = co_await map_reduce(std::views::iota(N, 2*N), std::bind_front(perform_schema_changes, std::ref(e), M), size_t{0}, std::plus{});

        // Each task performs M schema changes. There are N tasks.
        // Thus, for each task, all other tasks combined perform (N-1) * M schema changes.
        // Each `group0_concurrent_modification` exception means that some statement executed successfully in another task.
        // Thus, each statement can get at most (N-1) * M `group0_concurrent_modification` exceptions.
        // Since we configured the system to perform (N-1) * M + 1 attempts, the last attempt should always succeed even if all previous
        // ones failed - because that means every other task has finished its work.
        // Thus, `group0_concurrent_modification` should never propagate outside `execute_cql`.
        // Therefore the number of successes should be the number of calls to `execute_cql`, which is N*M in total.
        BOOST_REQUIRE_EQUAL(successes, N*M);

        // Let's verify that the mutex indeed does its job.
        // Take back the units handed out at the beginning of the test.
        rclient.operation_mutex().consume(extra_mutex_units);
        mm.set_concurrent_ddl_retries(0);

        successes = co_await map_reduce(std::views::iota(2*N, 3*N), std::bind_front(perform_schema_changes, std::ref(e), M), size_t{0}, std::plus{});

        // Each execution should have succeeded on first attempt because the mutex serialized them all.
        BOOST_REQUIRE_EQUAL(successes, N*M);
    });
}
// Exercises `service::group0_batch`: committing mutations added directly, in bulk and
// through generators, extracting the collected mutations without committing them, and
// committing an unused batch.
SEASTAR_TEST_CASE(test_group0_batch) {
    return do_with_cql_env([] (cql_test_env& e) -> future<> {
        auto& group0_client = e.get_raft_group0_client();
        abort_source as;

        // Flag the test table as a group0 table, otherwise the mutation apply check
        // rejects it (group0 mutations are not allowed on non-group0 tables).
        schema_builder::register_static_configurator([] (const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
            if (cf_name == "test_group0_batch") {
                props.is_group0_table = true;
            }
        });

        co_await e.execute_cql("CREATE TABLE test_group0_batch (key int, part int, PRIMARY KEY (key, part))");

        // Returns the first mutation produced by `get_modification_mutations` for an
        // INSERT of the row (key, part).
        auto make_insert = [&] (int key, int part) -> future<mutation> {
            auto mutations = co_await e.get_modification_mutations(format("INSERT INTO test_group0_batch (key, part) VALUES ({}, {})", key, part));
            co_return mutations[0];
        };

        // Starts a group 0 operation, lets `fill` populate a batch built from its guard,
        // then commits the batch.
        auto run_in_batch = [&] (std::function<future<>(service::group0_batch&)> fill) -> future<> {
            auto op_guard = co_await group0_client.start_operation(as);
            service::group0_batch batch(std::move(op_guard));
            co_await fill(batch);
            co_await std::move(batch).commit(group0_client, as, ::service::raft_timeout{});
        };

        // Case 1: plain add_mutation.
        co_await run_in_batch([&] (service::group0_batch& batch) -> future<> {
            batch.add_mutation(co_await make_insert(1, 1));
            batch.add_mutation(co_await make_insert(2, 1));
        });

        BOOST_TEST_PASSPOINT();
        assert_that(co_await e.execute_cql("SELECT * FROM test_group0_batch WHERE part = 1 ALLOW FILTERING"))
            .is_rows()
            .with_size(2)
            .with_rows_ignore_order({
                {int32_type->decompose(1), int32_type->decompose(1)},
                {int32_type->decompose(2), int32_type->decompose(1)}});

        // Case 2: extract() hands back the mutations and the guard instead of committing.
        {
            auto op_guard = co_await group0_client.start_operation(as);
            service::group0_batch batch(std::move(op_guard));
            batch.add_mutation(co_await make_insert(1, 2));
            batch.add_generator([&] (api::timestamp_type) -> ::service::mutations_generator {
                co_yield co_await make_insert(2, 2);
                co_yield co_await make_insert(3, 2);
            });

            auto extracted = co_await std::move(batch).extract();
            // One direct mutation plus two generated ones.
            BOOST_REQUIRE_EQUAL(extracted.first.size(), 3);
            BOOST_REQUIRE(extracted.second); // we got the guard too
        }

        // Nothing was committed above, so no row with part = 2 may exist.
        BOOST_TEST_PASSPOINT();
        assert_that(co_await e.execute_cql("SELECT * FROM test_group0_batch WHERE part = 2 ALLOW FILTERING")).is_rows().is_empty();

        // Case 3: add_mutations, add_mutation and add_generator used together.
        co_await run_in_batch([&] (service::group0_batch& batch) -> future<> {
            batch.add_mutations({
                co_await make_insert(1, 3),
                co_await make_insert(2, 3)
            });
            batch.add_mutation(co_await make_insert(3, 3));
            batch.add_generator([&] (api::timestamp_type) -> ::service::mutations_generator {
                co_yield co_await make_insert(4, 3);
                co_yield co_await make_insert(5, 3);
                co_yield co_await make_insert(6, 3);
            });
        });

        BOOST_TEST_PASSPOINT();
        assert_that(co_await e.execute_cql("SELECT * FROM test_group0_batch WHERE part = 3 ALLOW FILTERING"))
            .is_rows()
            .with_size(6)
            .with_rows_ignore_order({
                {int32_type->decompose(1), int32_type->decompose(3)},
                {int32_type->decompose(2), int32_type->decompose(3)},
                {int32_type->decompose(3), int32_type->decompose(3)},
                {int32_type->decompose(4), int32_type->decompose(3)},
                {int32_type->decompose(5), int32_type->decompose(3)},
                {int32_type->decompose(6), int32_type->decompose(3)}});

        // Case 4: a batch that contains nothing but a generator.
        co_await run_in_batch([&] (service::group0_batch& batch) -> future<> {
            batch.add_generator([&] (api::timestamp_type) -> ::service::mutations_generator {
                co_yield co_await make_insert(1, 4);
            });
            co_return;
        });

        BOOST_TEST_PASSPOINT();
        assert_that(co_await e.execute_cql("SELECT * FROM test_group0_batch WHERE part = 4 ALLOW FILTERING"))
            .is_rows()
            .with_size(1)
            .with_rows_ignore_order({
                {int32_type->decompose(1), int32_type->decompose(4)}});

        // Case 5: committing an unused batch (no mutations, no generator) must be a no-op.
        auto unused_batch = service::group0_batch::unused();
        co_await std::move(unused_batch).commit(group0_client, as, ::service::raft_timeout{});
    });
}
BOOST_AUTO_TEST_SUITE_END()