Files
scylladb/test/boost/schema_registry_test.cc
Gleb Natapov 74b5a8d43d schema: drop schema_registry_entry::maybe_sync() function
Schema is synced through group0 now. Drop all the test of the function
as well.
2026-03-10 10:46:47 +02:00

316 lines
12 KiB
C++

/*
* Copyright (C) 2016-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/thread.hh>
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/lowres_clock.hh>
#include "init.hh"
#include "data_dictionary/user_types_metadata.hh"
#include "schema/schema_registry.hh"
#include "schema/schema_builder.hh"
#include "test/lib/mutation_source_test.hh"
#include "db/config.hh"
#include "db/schema_applier.hh"
#include "db/schema_tables.hh"
#include "types/list.hh"
#include "utils/throttle.hh"
#include "test/lib/cql_test_env.hh"
#include "gms/feature_service.hh"
#include "view_info.hh"
#include "mutation/async_utils.hh"
BOOST_AUTO_TEST_SUITE(schema_registry_test)
static bytes random_column_name() {
return to_bytes(to_hex(make_blob(32)));
}
static schema_ptr random_schema() {
return schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column(random_column_name(), bytes_type)
.build();
}
struct dummy_init {
std::unique_ptr<db::config> config;
gms::feature_service fs;
seastar::lowres_clock::duration grace_period;
dummy_init()
: config(std::make_unique<db::config>())
, fs({get_disabled_features_from_db_config(*config)})
, grace_period(std::chrono::seconds(config->schema_registry_grace_period())) {
local_schema_registry().init(db::schema_ctxt(*config, std::make_shared<data_dictionary::dummy_user_types_storage>(), fs));
}
};
SEASTAR_THREAD_TEST_CASE(test_load_with_non_nantive_type) {
dummy_init dummy;
auto my_list_type = list_type_impl::get_instance(utf8_type, true);
auto s = schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("val", my_list_type)
.build();
local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) {
return make_ready_future<extended_frozen_schema>(s);
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_with_cdc_schema) {
dummy_init dummy;
auto s_cdc = schema_builder("ks", "cdc_cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("val", bytes_type)
.build();
auto s = schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("val", bytes_type)
.with_cdc_schema(s_cdc)
.build();
auto s_loaded = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) {
return make_ready_future<extended_frozen_schema>(s);
}).get();
BOOST_REQUIRE(s_loaded->cdc_schema()->version() == s_cdc->version());
}
SEASTAR_THREAD_TEST_CASE(test_learn_schema_with_cdc) {
dummy_init dummy;
auto s_cdc = schema_builder("ks", "cdc_cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("val", bytes_type)
.build();
auto s = schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("val", bytes_type)
.with_cdc_schema(s_cdc)
.build();
s = local_schema_registry().learn(s);
BOOST_REQUIRE(s->registry_entry());
BOOST_REQUIRE(s->cdc_schema()->registry_entry());
}
SEASTAR_THREAD_TEST_CASE(test_learn_loaded_schema_with_cdc) {
dummy_init dummy;
auto s_cdc = schema_builder("ks", "cdc_cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("val", bytes_type)
.build();
auto s = schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("val", bytes_type)
.with_cdc_schema(s_cdc)
.build();
local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) {
return make_ready_future<extended_frozen_schema>(s);
}).get();
s = local_schema_registry().learn(s);
BOOST_REQUIRE(s->registry_entry());
BOOST_REQUIRE(s->cdc_schema()->registry_entry());
}
SEASTAR_TEST_CASE(test_async_loading) {
return seastar::async([] {
dummy_init dummy;
auto s1 = random_schema();
auto s2 = random_schema();
auto s1_loaded = local_schema_registry().get_or_load(s1->version(), [s1] (table_schema_version) {
return make_ready_future<extended_frozen_schema>(s1);
}).get();
BOOST_REQUIRE(s1_loaded);
BOOST_REQUIRE(s1_loaded->version() == s1->version());
auto s1_later = local_schema_registry().get_or_null(s1->version());
BOOST_REQUIRE(s1_later);
auto s2_loaded = local_schema_registry().get_or_load(s2->version(), [s2] (table_schema_version) {
return yield().then([s2] -> extended_frozen_schema { return s2; });
}).get();
BOOST_REQUIRE(s2_loaded);
BOOST_REQUIRE(s2_loaded->version() == s2->version());
auto s2_later = local_schema_registry().get_or_null(s2_loaded->version());
BOOST_REQUIRE(s2_later);
});
}
SEASTAR_THREAD_TEST_CASE(test_table_is_attached) {
do_with_cql_env_thread([] (cql_test_env& e) {
auto s0 = schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("v1", bytes_type)
.build();
auto s0_wild = schema_builder(s0)
.with_column(random_column_name(), bytes_type)
.build();
// Simulate fetching schema version before the table is created.
local_schema_registry().learn(s0);
// Use schema mutations so that table id is the same as in s0.
{
auto sm0 = db::schema_tables::make_schema_mutations(s0, api::new_timestamp(), true);
utils::chunked_vector<mutation> muts;
sm0.copy_to(muts);
db::schema_tables::merge_schema(e.get_system_keyspace(), e.get_storage_proxy(), e.get_storage_service(),
muts).get();
}
// This should attach the table
s0->registry_entry()->mark_synced();
BOOST_REQUIRE(s0->maybe_table());
// mark_synced() should attach the table
local_schema_registry().learn(s0_wild);
s0_wild->registry_entry()->mark_synced();
BOOST_REQUIRE(s0_wild->maybe_table());
auto s1 = schema_builder(s0)
.with_column(random_column_name(), bytes_type)
.build();
BOOST_REQUIRE(!s1->maybe_table());
e.execute_cql("ALTER TABLE ks.cf ADD dummy int").get();
s1 = local_schema_registry().learn(s1);
s1->registry_entry()->mark_synced();
BOOST_REQUIRE(&s1->table() == s0->maybe_table());
auto learned_s1 = local_schema_registry().learn(s1);
BOOST_REQUIRE(learned_s1->maybe_table() == s0->maybe_table());
BOOST_REQUIRE(&learned_s1->table() == s0->maybe_table());
auto s2 = schema_builder(s0)
.with_column(random_column_name(), bytes_type)
.build();
auto learned_s2 = local_schema_registry().get_or_load(s2->version(), [&] (table_schema_version) -> extended_frozen_schema {
return s2;
});
BOOST_REQUIRE(learned_s2->maybe_table() == s0->maybe_table());
if (smp::count > 1) {
smp::submit_to(1, [&e, gs = global_schema_ptr(learned_s2)] {
schema_ptr s0 = e.local_db().find_column_family("ks", "cf").schema();
BOOST_REQUIRE(gs.get()->maybe_table());
BOOST_REQUIRE(gs.get()->maybe_table() == s0->maybe_table());
}).get();
}
// Simulate concurrent schema version fetch and learn() from schema merge which cuts the race.
auto s3 = schema_builder(s2)
.with_column(random_column_name(), bytes_type)
.build();
utils::throttle s3_thr;
auto s3_entered = s3_thr.block();
auto learned_s3 = local_schema_registry().get_or_load(s3->version(), [&, s3] (table_schema_version) -> future<extended_frozen_schema> {
co_await s3_thr.enter();
co_return s3;
});
s3_entered.get();
local_schema_registry().learn(s3);
s3_thr.unblock();
auto s3_s = learned_s3.get();
BOOST_REQUIRE(s3_s->maybe_table() == s0->maybe_table());
BOOST_REQUIRE(s3->maybe_table() == s0->maybe_table());
// Simulate concurrent schema version fetch and get_or_load() from global_schema_ptr which cuts the race.
auto s4 = schema_builder(s3)
.with_column(random_column_name(), bytes_type)
.build();
utils::throttle s4_thr;
auto s4_entered = s4_thr.block();
auto learned_s4 = local_schema_registry().get_or_load(s4->version(), [&, s4] (table_schema_version) -> future<extended_frozen_schema> {
co_await s4_thr.enter();
co_return s4;
});
s4_entered.get();
s4 = local_schema_registry().get_or_load(s4->version(), [&, s4] (table_schema_version) -> extended_frozen_schema { return s4; });
s4_thr.unblock();
auto s4_s = learned_s4.get();
BOOST_REQUIRE(s4_s->maybe_table() == s0->maybe_table());
BOOST_REQUIRE(s4->maybe_table() == s0->maybe_table());
e.execute_cql("DROP TABLE ks.cf;").get();
BOOST_REQUIRE(!s0->maybe_table());
BOOST_REQUIRE(!learned_s1->maybe_table());
BOOST_REQUIRE(!learned_s2->maybe_table());
BOOST_REQUIRE_THROW(learned_s1->table(), replica::no_such_column_family);
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_schema_is_recovered_after_dying) {
dummy_init dummy;
auto base_schema = schema_builder("ks", "cf")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("v", int32_type)
.build();
auto base_registry_schema = local_schema_registry().get_or_load(base_schema->version(),
[base_schema] (table_schema_version) -> extended_frozen_schema { return base_schema; });
base_registry_schema = nullptr;
auto recovered_registry_schema = local_schema_registry().get_or_null(base_schema->version());
BOOST_REQUIRE(recovered_registry_schema->version() == base_schema->version());
}
SEASTAR_THREAD_TEST_CASE(test_view_info_is_recovered_after_dying) {
dummy_init dummy;
auto base_schema = schema_builder("ks", "cf")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("v", int32_type)
.build();
schema_builder view_builder("ks", "cf_view");
auto view_schema = schema_builder("ks", "cf_view")
.with_column("v", int32_type, column_kind::partition_key)
.with_column("pk", int32_type)
.with_view_info(base_schema, false, "pk IS NOT NULL AND v IS NOT NULL")
.build();
local_schema_registry().get_or_load(view_schema->version(),
[view_schema] (table_schema_version) -> extended_frozen_schema { return view_schema; });
auto view_registry_schema = local_schema_registry().get_or_null(view_schema->version());
BOOST_REQUIRE(view_registry_schema);
}
SEASTAR_THREAD_TEST_CASE(test_merge_schema_with_large_collection_of_mutations) {
do_with_cql_env_thread([] (cql_test_env& e) {
utils::chunked_vector<mutation> mutations;
for (auto i : std::views::iota(0, 1000)) {
auto s0 = schema_builder("ks", fmt::format("cf_{}", i))
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("v1", bytes_type)
.build();
db::schema_tables::make_schema_mutations(s0, api::new_timestamp(), true).copy_to(mutations);
}
BOOST_REQUIRE(seastar::memory::stats().large_allocations() == 0);
seastar::memory::scoped_large_allocation_warning_threshold guard((size_t(128) << 10)+1); // 128 KiB + 1 byte
db::schema_tables::merge_schema(e.get_system_keyspace(), e.get_storage_proxy(), e.get_storage_service(),
mutations).get();
BOOST_REQUIRE(seastar::memory::stats().large_allocations() == 0);
}).get();
}
BOOST_AUTO_TEST_SUITE_END()