From 4a52b802acbf111a3a48525beafa8cd4a48b668c Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Wed, 19 Jan 2022 18:26:48 +0100 Subject: [PATCH] test: unit test for group 0 concurrent change protection and CQL DDL retries Check that group 0 history grows iff a schema change does not throw `group0_concurrent_modification`. Check that the CQL DDL statement retry mechanism works as expected. --- service/migration_manager.cc | 8 +++ service/migration_manager.hh | 6 ++ test/boost/group0_test.cc | 106 +++++++++++++++++++++++++++++++---- 3 files changed, 109 insertions(+), 11 deletions(-) diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 7ae9f9977f..77fd0f79e5 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -1327,4 +1327,12 @@ void migration_manager::set_group0_history_gc_duration(gc_clock::duration d) { _group0_history_gc_duration = d; } +void migration_manager::set_concurrent_ddl_retries(size_t n) { + _concurrent_ddl_retries = n; +} + +semaphore& migration_manager::group0_operation_mutex() { + return _group0_operation_mutex; +} + } diff --git a/service/migration_manager.hh b/service/migration_manager.hh index 899257a148..b7d6a0a84b 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -251,6 +251,12 @@ private: public: // For tests only. void set_group0_history_gc_duration(gc_clock::duration); + + // For tests only. + void set_concurrent_ddl_retries(size_t); + + // For tests only. + semaphore& group0_operation_mutex(); }; future get_column_mapping(utils::UUID table_id, table_schema_version v); diff --git a/test/boost/group0_test.cc b/test/boost/group0_test.cc index 116c38c9f5..b9cd2cfd82 100644 --- a/test/boost/group0_test.cc +++ b/test/boost/group0_test.cc @@ -16,16 +16,22 @@ #include "transport/messages/result_message.hh" #include "service/migration_manager.hh" +static future>> fetch_rows(cql_test_env& e, std::string_view cql) { + auto msg = co_await e.execute_cql(cql); + auto rows = dynamic_pointer_cast(msg); + BOOST_REQUIRE(rows); + co_return rows->rs().result_set().rows(); +} + +static future get_history_size(cql_test_env& e) { + co_return (co_await fetch_rows(e, "select * from system.group0_history")).size(); +} + SEASTAR_TEST_CASE(test_group0_history_clearing_old_entries) { return do_with_cql_env([] (cql_test_env& e) -> future<> { using namespace std::chrono; - auto get_history_size = [&] () -> future { - auto msg = co_await e.execute_cql("select * from system.group0_history"); - auto rows = dynamic_pointer_cast(msg); - BOOST_REQUIRE(rows); - co_return rows->rs().result_set().rows().size(); - }; + auto get_history_size = std::bind_front(::get_history_size, std::ref(e)); auto perform_schema_change = [&, has_ks = false] () mutable -> future<> { if (has_ks) { @@ -66,12 +72,9 @@ SEASTAR_TEST_CASE(test_group0_history_clearing_old_entries) { } auto get_history_timestamps = [&] () -> future> { - auto msg = co_await e.execute_cql("select state_id from system.group0_history"); - auto rows = dynamic_pointer_cast(msg); - BOOST_REQUIRE(rows); - + auto rows = co_await fetch_rows(e, "select state_id from system.group0_history"); std::vector result; - for (auto& row: rows->rs().result_set().rows()) { + for (auto& row: rows) { auto state_id = value_cast(timeuuid_type->deserialize(*row[0])); result.push_back(utils::UUID_gen::unix_timestamp(state_id)); } @@ -104,3 +107,84 @@ SEASTAR_TEST_CASE(test_group0_history_clearing_old_entries) { }, raft_cql_test_config()); } + +SEASTAR_TEST_CASE(test_concurrent_group0_modifications) { + return do_with_cql_env([] (cql_test_env& e) -> future<> { + auto& mm = e.migration_manager().local(); + + // migration_manager::_group0_operation_mutex prevents concurrent group 0 changes to be executed on a single node, + // so in production `group0_concurrent_modification` never occurs if all changes go through a single node. + // For this test, give it more units so it doesn't block these concurrent executions + // in order to simulate a scenario where multiple nodes concurrently send schema changes. + mm.group0_operation_mutex().signal(1337); + + // Make DDL statement execution fail on the first attempt if it gets a concurrent modification exception. + mm.set_concurrent_ddl_retries(0); + + auto get_history_size = std::bind_front(::get_history_size, std::ref(e)); + + auto perform_schema_changes = [] (cql_test_env& e, size_t n, size_t task_id) -> future { + size_t successes = 0; + bool has_ks = false; + auto drop_ks_cql = format("drop keyspace new_ks{}", task_id); + auto create_ks_cql = format("create keyspace new_ks{} with replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}", task_id); + + auto perform = [&] () -> future<> { + try { + if (has_ks) { + co_await e.execute_cql(drop_ks_cql); + } else { + co_await e.execute_cql(create_ks_cql); + } + has_ks = !has_ks; + ++successes; + } catch (const service::group0_concurrent_modification&) {} + }; + + while (n--) { + co_await perform(); + } + + co_return successes; + }; + + auto size = co_await get_history_size(); + + size_t N = 4; + size_t M = 4; + + // Run N concurrent tasks, each performing M schema changes in sequence. + auto successes = co_await map_reduce(boost::irange(size_t{0}, N), std::bind_front(perform_schema_changes, std::ref(e), M), 0, std::plus{}); + + // The number of new entries that appeared in group 0 history table should be exactly equal + // to the number of successful schema changes. + BOOST_REQUIRE_EQUAL(successes, (co_await get_history_size()) - size); + + // Make it so that execution of a DDL statement will perform up to (N-1) * M + 1 attempts (first try + up to (N-1) * M retries). + mm.set_concurrent_ddl_retries((N-1)*M); + + // Run N concurrent tasks, each performing M schema changes in sequence. + // (use different range of task_ids so the new tasks' statements don't conflict with existing keyspaces from previous tasks) + successes = co_await map_reduce(boost::irange(N, 2*N), std::bind_front(perform_schema_changes, std::ref(e), M), 0, std::plus{}); + + // Each task performs M schema changes. There are N tasks. + // Thus, for each task, all other tasks combined perform (N-1) * M schema changes. + // Each `group0_concurrent_modification` exception means that some statement executed successfully in another task. + // Thus, each statement can get at most (N-1) * M `group0_concurrent_modification` exceptions. + // Since we configured the system to perform (N-1) * M + 1 attempts, the last attempt should always succeed even if all previous + // ones failed - because that means every other task has finished its work. + // Thus, `group0_concurrent_modification` should never propagate outside `execute_cql`. + // Therefore the number of successes should be the number of calls to `execute_cql`, which is N*M in total. + BOOST_REQUIRE_EQUAL(successes, N*M); + + // Let's verify that the mutex indeed does its job. + mm.group0_operation_mutex().consume(1337); + mm.set_concurrent_ddl_retries(0); + + successes = co_await map_reduce(boost::irange(2*N, 3*N), std::bind_front(perform_schema_changes, std::ref(e), M), 0, std::plus{}); + + // Each execution should have succeeded on first attempt because the mutex serialized them all. + BOOST_REQUIRE_EQUAL(successes, N*M); + + }, raft_cql_test_config()); +}