316 lines
12 KiB
C++
316 lines
12 KiB
C++
/*
|
|
* Copyright (C) 2016-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
|
|
#include <seastar/core/thread.hh>
|
|
|
|
#undef SEASTAR_TESTING_MAIN
|
|
#include <seastar/testing/test_case.hh>
|
|
#include <seastar/testing/thread_test_case.hh>
|
|
#include <seastar/core/sleep.hh>
|
|
#include <seastar/core/lowres_clock.hh>
|
|
#include "init.hh"
|
|
#include "data_dictionary/user_types_metadata.hh"
|
|
#include "schema/schema_registry.hh"
|
|
#include "schema/schema_builder.hh"
|
|
#include "test/lib/mutation_source_test.hh"
|
|
#include "db/config.hh"
|
|
#include "db/schema_applier.hh"
|
|
#include "db/schema_tables.hh"
|
|
#include "types/list.hh"
|
|
#include "utils/throttle.hh"
|
|
#include "test/lib/cql_test_env.hh"
|
|
#include "gms/feature_service.hh"
|
|
#include "view_info.hh"
|
|
#include "mutation/async_utils.hh"
|
|
|
|
BOOST_AUTO_TEST_SUITE(schema_registry_test)
|
|
|
|
static bytes random_column_name() {
|
|
return to_bytes(to_hex(make_blob(32)));
|
|
}
|
|
|
|
static schema_ptr random_schema() {
|
|
return schema_builder("ks", "cf")
|
|
.with_column("pk", bytes_type, column_kind::partition_key)
|
|
.with_column(random_column_name(), bytes_type)
|
|
.build();
|
|
}
|
|
|
|
struct dummy_init {
|
|
std::unique_ptr<db::config> config;
|
|
gms::feature_service fs;
|
|
seastar::lowres_clock::duration grace_period;
|
|
dummy_init()
|
|
: config(std::make_unique<db::config>())
|
|
, fs({get_disabled_features_from_db_config(*config)})
|
|
, grace_period(std::chrono::seconds(config->schema_registry_grace_period())) {
|
|
local_schema_registry().init(db::schema_ctxt(*config, std::make_shared<data_dictionary::dummy_user_types_storage>(), fs));
|
|
}
|
|
};
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_load_with_non_nantive_type) {
|
|
dummy_init dummy;
|
|
auto my_list_type = list_type_impl::get_instance(utf8_type, true);
|
|
|
|
auto s = schema_builder("ks", "cf")
|
|
.with_column("pk", bytes_type, column_kind::partition_key)
|
|
.with_column("val", my_list_type)
|
|
.build();
|
|
|
|
local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) {
|
|
return make_ready_future<extended_frozen_schema>(s);
|
|
}).get();
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_load_with_cdc_schema) {
|
|
dummy_init dummy;
|
|
auto s_cdc = schema_builder("ks", "cdc_cf")
|
|
.with_column("pk", bytes_type, column_kind::partition_key)
|
|
.with_column("val", bytes_type)
|
|
.build();
|
|
auto s = schema_builder("ks", "cf")
|
|
.with_column("pk", bytes_type, column_kind::partition_key)
|
|
.with_column("val", bytes_type)
|
|
.with_cdc_schema(s_cdc)
|
|
.build();
|
|
|
|
auto s_loaded = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) {
|
|
return make_ready_future<extended_frozen_schema>(s);
|
|
}).get();
|
|
|
|
BOOST_REQUIRE(s_loaded->cdc_schema()->version() == s_cdc->version());
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_learn_schema_with_cdc) {
|
|
dummy_init dummy;
|
|
auto s_cdc = schema_builder("ks", "cdc_cf")
|
|
.with_column("pk", bytes_type, column_kind::partition_key)
|
|
.with_column("val", bytes_type)
|
|
.build();
|
|
auto s = schema_builder("ks", "cf")
|
|
.with_column("pk", bytes_type, column_kind::partition_key)
|
|
.with_column("val", bytes_type)
|
|
.with_cdc_schema(s_cdc)
|
|
.build();
|
|
|
|
s = local_schema_registry().learn(s);
|
|
|
|
BOOST_REQUIRE(s->registry_entry());
|
|
BOOST_REQUIRE(s->cdc_schema()->registry_entry());
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_learn_loaded_schema_with_cdc) {
|
|
dummy_init dummy;
|
|
auto s_cdc = schema_builder("ks", "cdc_cf")
|
|
.with_column("pk", bytes_type, column_kind::partition_key)
|
|
.with_column("val", bytes_type)
|
|
.build();
|
|
auto s = schema_builder("ks", "cf")
|
|
.with_column("pk", bytes_type, column_kind::partition_key)
|
|
.with_column("val", bytes_type)
|
|
.with_cdc_schema(s_cdc)
|
|
.build();
|
|
|
|
local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) {
|
|
return make_ready_future<extended_frozen_schema>(s);
|
|
}).get();
|
|
|
|
s = local_schema_registry().learn(s);
|
|
|
|
BOOST_REQUIRE(s->registry_entry());
|
|
BOOST_REQUIRE(s->cdc_schema()->registry_entry());
|
|
}
|
|
|
|
SEASTAR_TEST_CASE(test_async_loading) {
|
|
return seastar::async([] {
|
|
dummy_init dummy;
|
|
auto s1 = random_schema();
|
|
auto s2 = random_schema();
|
|
|
|
auto s1_loaded = local_schema_registry().get_or_load(s1->version(), [s1] (table_schema_version) {
|
|
return make_ready_future<extended_frozen_schema>(s1);
|
|
}).get();
|
|
|
|
BOOST_REQUIRE(s1_loaded);
|
|
BOOST_REQUIRE(s1_loaded->version() == s1->version());
|
|
auto s1_later = local_schema_registry().get_or_null(s1->version());
|
|
BOOST_REQUIRE(s1_later);
|
|
|
|
auto s2_loaded = local_schema_registry().get_or_load(s2->version(), [s2] (table_schema_version) {
|
|
return yield().then([s2] -> extended_frozen_schema { return s2; });
|
|
}).get();
|
|
|
|
BOOST_REQUIRE(s2_loaded);
|
|
BOOST_REQUIRE(s2_loaded->version() == s2->version());
|
|
auto s2_later = local_schema_registry().get_or_null(s2_loaded->version());
|
|
BOOST_REQUIRE(s2_later);
|
|
});
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_table_is_attached) {
|
|
do_with_cql_env_thread([] (cql_test_env& e) {
|
|
auto s0 = schema_builder("ks", "cf")
|
|
.with_column("pk", bytes_type, column_kind::partition_key)
|
|
.with_column("v1", bytes_type)
|
|
.build();
|
|
|
|
auto s0_wild = schema_builder(s0)
|
|
.with_column(random_column_name(), bytes_type)
|
|
.build();
|
|
|
|
// Simulate fetching schema version before the table is created.
|
|
local_schema_registry().learn(s0);
|
|
|
|
// Use schema mutations so that table id is the same as in s0.
|
|
{
|
|
auto sm0 = db::schema_tables::make_schema_mutations(s0, api::new_timestamp(), true);
|
|
utils::chunked_vector<mutation> muts;
|
|
sm0.copy_to(muts);
|
|
db::schema_tables::merge_schema(e.get_system_keyspace(), e.get_storage_proxy(), e.get_storage_service(),
|
|
muts).get();
|
|
}
|
|
|
|
// This should attach the table
|
|
s0->registry_entry()->mark_synced();
|
|
BOOST_REQUIRE(s0->maybe_table());
|
|
|
|
// mark_synced() should attach the table
|
|
local_schema_registry().learn(s0_wild);
|
|
s0_wild->registry_entry()->mark_synced();
|
|
BOOST_REQUIRE(s0_wild->maybe_table());
|
|
|
|
auto s1 = schema_builder(s0)
|
|
.with_column(random_column_name(), bytes_type)
|
|
.build();
|
|
BOOST_REQUIRE(!s1->maybe_table());
|
|
|
|
e.execute_cql("ALTER TABLE ks.cf ADD dummy int").get();
|
|
|
|
s1 = local_schema_registry().learn(s1);
|
|
s1->registry_entry()->mark_synced();
|
|
BOOST_REQUIRE(&s1->table() == s0->maybe_table());
|
|
|
|
auto learned_s1 = local_schema_registry().learn(s1);
|
|
BOOST_REQUIRE(learned_s1->maybe_table() == s0->maybe_table());
|
|
BOOST_REQUIRE(&learned_s1->table() == s0->maybe_table());
|
|
|
|
auto s2 = schema_builder(s0)
|
|
.with_column(random_column_name(), bytes_type)
|
|
.build();
|
|
|
|
auto learned_s2 = local_schema_registry().get_or_load(s2->version(), [&] (table_schema_version) -> extended_frozen_schema {
|
|
return s2;
|
|
});
|
|
BOOST_REQUIRE(learned_s2->maybe_table() == s0->maybe_table());
|
|
|
|
if (smp::count > 1) {
|
|
smp::submit_to(1, [&e, gs = global_schema_ptr(learned_s2)] {
|
|
schema_ptr s0 = e.local_db().find_column_family("ks", "cf").schema();
|
|
BOOST_REQUIRE(gs.get()->maybe_table());
|
|
BOOST_REQUIRE(gs.get()->maybe_table() == s0->maybe_table());
|
|
}).get();
|
|
}
|
|
|
|
// Simulate concurrent schema version fetch and learn() from schema merge which cuts the race.
|
|
auto s3 = schema_builder(s2)
|
|
.with_column(random_column_name(), bytes_type)
|
|
.build();
|
|
utils::throttle s3_thr;
|
|
auto s3_entered = s3_thr.block();
|
|
auto learned_s3 = local_schema_registry().get_or_load(s3->version(), [&, s3] (table_schema_version) -> future<extended_frozen_schema> {
|
|
co_await s3_thr.enter();
|
|
co_return s3;
|
|
});
|
|
s3_entered.get();
|
|
local_schema_registry().learn(s3);
|
|
s3_thr.unblock();
|
|
auto s3_s = learned_s3.get();
|
|
BOOST_REQUIRE(s3_s->maybe_table() == s0->maybe_table());
|
|
BOOST_REQUIRE(s3->maybe_table() == s0->maybe_table());
|
|
|
|
// Simulate concurrent schema version fetch and get_or_load() from global_schema_ptr which cuts the race.
|
|
auto s4 = schema_builder(s3)
|
|
.with_column(random_column_name(), bytes_type)
|
|
.build();
|
|
utils::throttle s4_thr;
|
|
auto s4_entered = s4_thr.block();
|
|
auto learned_s4 = local_schema_registry().get_or_load(s4->version(), [&, s4] (table_schema_version) -> future<extended_frozen_schema> {
|
|
co_await s4_thr.enter();
|
|
co_return s4;
|
|
});
|
|
s4_entered.get();
|
|
s4 = local_schema_registry().get_or_load(s4->version(), [&, s4] (table_schema_version) -> extended_frozen_schema { return s4; });
|
|
s4_thr.unblock();
|
|
auto s4_s = learned_s4.get();
|
|
BOOST_REQUIRE(s4_s->maybe_table() == s0->maybe_table());
|
|
BOOST_REQUIRE(s4->maybe_table() == s0->maybe_table());
|
|
|
|
e.execute_cql("DROP TABLE ks.cf;").get();
|
|
|
|
BOOST_REQUIRE(!s0->maybe_table());
|
|
BOOST_REQUIRE(!learned_s1->maybe_table());
|
|
BOOST_REQUIRE(!learned_s2->maybe_table());
|
|
BOOST_REQUIRE_THROW(learned_s1->table(), replica::no_such_column_family);
|
|
}).get();
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_schema_is_recovered_after_dying) {
|
|
dummy_init dummy;
|
|
auto base_schema = schema_builder("ks", "cf")
|
|
.with_column("pk", int32_type, column_kind::partition_key)
|
|
.with_column("v", int32_type)
|
|
.build();
|
|
auto base_registry_schema = local_schema_registry().get_or_load(base_schema->version(),
|
|
[base_schema] (table_schema_version) -> extended_frozen_schema { return base_schema; });
|
|
base_registry_schema = nullptr;
|
|
auto recovered_registry_schema = local_schema_registry().get_or_null(base_schema->version());
|
|
BOOST_REQUIRE(recovered_registry_schema->version() == base_schema->version());
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_view_info_is_recovered_after_dying) {
|
|
dummy_init dummy;
|
|
auto base_schema = schema_builder("ks", "cf")
|
|
.with_column("pk", int32_type, column_kind::partition_key)
|
|
.with_column("v", int32_type)
|
|
.build();
|
|
schema_builder view_builder("ks", "cf_view");
|
|
auto view_schema = schema_builder("ks", "cf_view")
|
|
.with_column("v", int32_type, column_kind::partition_key)
|
|
.with_column("pk", int32_type)
|
|
.with_view_info(base_schema, false, "pk IS NOT NULL AND v IS NOT NULL")
|
|
.build();
|
|
local_schema_registry().get_or_load(view_schema->version(),
|
|
[view_schema] (table_schema_version) -> extended_frozen_schema { return view_schema; });
|
|
auto view_registry_schema = local_schema_registry().get_or_null(view_schema->version());
|
|
BOOST_REQUIRE(view_registry_schema);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_merge_schema_with_large_collection_of_mutations) {
|
|
do_with_cql_env_thread([] (cql_test_env& e) {
|
|
utils::chunked_vector<mutation> mutations;
|
|
for (auto i : std::views::iota(0, 1000)) {
|
|
auto s0 = schema_builder("ks", fmt::format("cf_{}", i))
|
|
.with_column("pk", bytes_type, column_kind::partition_key)
|
|
.with_column("v1", bytes_type)
|
|
.build();
|
|
|
|
db::schema_tables::make_schema_mutations(s0, api::new_timestamp(), true).copy_to(mutations);
|
|
}
|
|
|
|
BOOST_REQUIRE(seastar::memory::stats().large_allocations() == 0);
|
|
seastar::memory::scoped_large_allocation_warning_threshold guard((size_t(128) << 10)+1); // 128 KiB + 1 byte
|
|
db::schema_tables::merge_schema(e.get_system_keyspace(), e.get_storage_proxy(), e.get_storage_service(),
|
|
mutations).get();
|
|
BOOST_REQUIRE(seastar::memory::stats().large_allocations() == 0);
|
|
|
|
}).get();
|
|
}
|
|
|
|
BOOST_AUTO_TEST_SUITE_END()
|