Reformat constructor initializer lists, brace placement, and line wrapping for consistency. The seastar logger already checks is_enabled() before formatting arguments, so explicit guards around trace calls with simple variable arguments are unnecessary. AI-assisted: OpenCode / Claude Opus 4.6 Signed-off-by: Yaniv Kaul <yaniv.kaul@scylladb.com>
350 lines
11 KiB
C++
350 lines
11 KiB
C++
/*
|
|
* Copyright 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include "db/view/base_info.hh"
|
|
#include "utils/assert.hh"
|
|
#include <seastar/core/sharded.hh>
|
|
|
|
#include "schema_registry.hh"
|
|
#include "utils/error_injection.hh"
|
|
#include "utils/log.hh"
|
|
#include "db/schema_tables.hh"
|
|
#include "view_info.hh"
|
|
#include "replica/database.hh"
|
|
|
|
static logging::logger slogger("schema_registry");
|
|
|
|
static thread_local schema_registry registry;
|
|
|
|
// Thrown when a schema version is unknown to the registry or not yet loaded.
schema_version_not_found::schema_version_not_found(table_schema_version v)
    : std::runtime_error(format("Schema version {} not found", v))
{}
|
|
|
|
// Thrown (with the underlying cause nested) when an asynchronous schema load fails.
schema_version_loading_failed::schema_version_loading_failed(table_schema_version v)
    : std::runtime_error(format("Failed to load schema version {}", v))
{}
|
|
|
|
// Clears the active schema's back-pointer so it does not keep pointing at a
// destroyed entry.
schema_registry_entry::~schema_registry_entry() {
    if (_schema != nullptr) {
        _schema->_registry_entry = nullptr;
    }
}
|
|
|
|
// Creates an INITIAL, not-yet-synced entry for version `v` owned by registry `r`.
// The erase timer (armed by detach_schema()) removes the entry from the registry
// when it fires.
schema_registry_entry::schema_registry_entry(table_schema_version v, schema_registry& r)
    : _state(state::INITIAL)
    , _version(v)
    , _registry(r)
    , _sync_state(sync_state::NOT_SYNCED)
{
    auto drop_from_registry = [this] {
        slogger.debug("Dropping {}", _version);
        // The timer must only fire while no schema object is active for this entry.
        SCYLLA_ASSERT(!_schema);
        try {
            // NOTE: this may release the last reference to the entry; nothing on the
            // success path touches members afterwards.
            _registry._entries.erase(_version);
        } catch (...) {
            slogger.error("Failed to erase schema version {}: {}", _version, std::current_exception());
        }
    };
    _erase_timer.set_callback(std::move(drop_from_registry));
}
|
|
|
|
// Defaulted out of line. NOTE(review): presumably kept in this file so the header does not need complete member types (e.g. db::schema_ctxt held by _ctxt) — confirm against the header.
schema_registry::~schema_registry() = default;
|
|
|
|
// Stores a private copy of `ctxt`; it is later used to unfreeze schemas, look up
// the database and read the registry grace period.
void schema_registry::init(const db::schema_ctxt& ctxt) {
    auto owned_copy = std::make_unique<db::schema_ctxt>(ctxt);
    _ctxt = std::move(owned_copy);
}
|
|
|
|
// Best-effort: links entry `e` to its table (as a weak reference) when a database
// is available. An entry that already has a table is left alone. A missing table
// (replica::no_such_column_family) is tolerated and only logged at debug level.
void schema_registry::attach_table(schema_registry_entry& e) noexcept {
    if (e._table) {
        return; // already attached
    }
    replica::database* const db = _ctxt->get_db();
    if (db == nullptr) {
        return; // no database to look the table up in
    }
    try {
        e.set_table(db->find_column_family(e.get_schema()->id()).weak_from_this());
    } catch (const replica::no_such_column_family&) {
        // The explicit level check is intentional. Call arguments are evaluated before
        // the logger gets to check its level, and current_backtrace() is costly.
        if (slogger.is_enabled(seastar::log_level::debug)) {
            slogger.debug("No table for schema version {} of {}.{}: {}", e._version,
                    e.get_schema()->ks_name(), e.get_schema()->cf_name(), seastar::current_backtrace());
        }
        // Otherwise ignored: the table may simply not exist (yet) on this node.
    }
}
|
|
|
|
// Registers schema `s` with this registry and returns the schema held by the
// registry entry for its version. That may be a different object than `s` if the
// version was already known. An attached CDC schema is learned first (recursively)
// and linked into both `s` and the returned schema.
schema_ptr schema_registry::learn(schema_ptr s) {
    schema_ptr cdc;
    if (s->cdc_schema()) {
        cdc = learn(s->cdc_schema());
    }
    s->_cdc_schema = cdc;

    if (s->registry_entry()) {
        // Already backed by a registry entry; nothing more to do.
        return s;
    }

    if (auto it = _entries.find(s->version()); it != _entries.end()) {
        schema_registry_entry& entry = *it->second;
        if (entry._state == schema_registry_entry::state::LOADING) {
            // An asynchronous load is pending; complete it with the schema we were given.
            entry.load(s);
            attach_table(entry);
        }
        schema_ptr known = entry.get_schema();
        known->_cdc_schema = cdc;
        return known;
    }

    slogger.debug("Learning about version {} of {}.{}", s->version(), s->ks_name(), s->cf_name());
    auto fresh = make_lw_shared<schema_registry_entry>(s->version(), *this);
    schema_ptr loaded = fresh->load(s);
    attach_table(*fresh);
    _entries.emplace(s->version(), fresh);
    return loaded;
}
|
|
|
|
// Returns the LOADED entry for version `v`.
// Throws schema_version_not_found if the version is unknown or still loading, or
// if the "schema_registry_ignore_version" error injection names this version.
schema_registry_entry& schema_registry::get_entry(table_schema_version v) const {
    const auto ignored = utils::get_local_injector().inject_parameter<std::string_view>("schema_registry_ignore_version");
    if (ignored && v == table_schema_version{utils::UUID(*ignored)}) {
        throw schema_version_not_found(v);
    }

    const auto it = _entries.find(v);
    if (it == _entries.end() || it->second->_state != schema_registry_entry::state::LOADED) {
        throw schema_version_not_found(v);
    }
    return *it->second;
}
|
|
|
|
// How long an entry without an active schema is kept before its erase timer drops
// it. The configured value is interpreted as a number of seconds.
schema_registry_entry::erase_clock::duration schema_registry::grace_period() const {
    using std::chrono::seconds;
    return seconds(_ctxt->schema_registry_grace_period());
}
|
|
|
|
// Returns the schema for a loaded version; throws schema_version_not_found otherwise.
schema_ptr schema_registry::get(table_schema_version v) const {
    schema_registry_entry& entry = get_entry(v);
    return entry.get_schema();
}
|
|
|
|
// Returns the frozen form of a loaded version; throws schema_version_not_found otherwise.
frozen_schema schema_registry::get_frozen(table_schema_version v) const {
    const schema_registry_entry& entry = get_entry(v);
    return entry.frozen();
}
|
|
|
|
// Returns the schema for version `v`.
// - Unknown version: a new entry is registered and `loader` is started for it.
// - Load in progress: the caller shares the pending result.
// - Otherwise: the loaded schema is returned in a ready future.
future<schema_ptr> schema_registry::get_or_load(table_schema_version v, const async_schema_loader& loader) {
    auto i = _entries.find(v);
    if (i == _entries.end()) {
        auto e_ptr = make_lw_shared<schema_registry_entry>(v, *this);
        // Publish the entry *before* starting the load. When the loader returns an
        // already-resolved future, seastar may run start_loading()'s continuation
        // immediately. On failure that continuation erases `v` from _entries.
        // Inserting the entry afterwards would resurrect a failed entry stuck in
        // LOADING with an already-failed promise.
        _entries.emplace(v, e_ptr);
        try {
            return e_ptr->start_loading(loader);
        } catch (...) {
            // start_loading() threw before its continuation was attached, so nothing
            // will ever complete or remove this entry. Roll it back.
            _entries.erase(v);
            throw;
        }
    }
    schema_registry_entry& e = *i->second;
    if (e._state == schema_registry_entry::state::LOADING) {
        return e._schema_promise.get_shared_future();
    }
    return make_ready_future<schema_ptr>(e.get_schema());
}
|
|
|
|
// Non-throwing lookup: returns the schema for a LOADED version, or nullptr when
// the version is unknown, still loading, or named by the
// "schema_registry_ignore_version" error injection.
schema_ptr schema_registry::get_or_null(table_schema_version v) const {
    const auto ignored = utils::get_local_injector().inject_parameter<std::string_view>("schema_registry_ignore_version");
    if (ignored && v == table_schema_version{utils::UUID(*ignored)}) {
        return nullptr;
    }

    const auto it = _entries.find(v);
    if (it == _entries.end() || it->second->_state != schema_registry_entry::state::LOADED) {
        return nullptr;
    }
    return it->second->get_schema();
}
|
|
|
|
// Synchronous variant: returns the schema for version `v`, calling `loader` when
// the version is unknown or an asynchronous load for it is still pending.
schema_ptr schema_registry::get_or_load(table_schema_version v, const schema_loader& loader) {
    auto it = _entries.find(v);
    if (it != _entries.end()) {
        schema_registry_entry& entry = *it->second;
        if (entry._state != schema_registry_entry::state::LOADING) {
            return entry.get_schema();
        }
        // Complete the pending asynchronous load with the synchronously loaded schema.
        schema_ptr loaded = entry.load(loader(v));
        attach_table(entry);
        return loaded;
    }

    auto fresh = make_lw_shared<schema_registry_entry>(v, *this);
    schema_ptr loaded = fresh->load(loader(v));
    attach_table(*fresh);
    _entries.emplace(v, fresh);
    return loaded;
}
|
|
|
|
// Drops every entry held by this registry.
void schema_registry::clear() {
    _entries.clear();
}
|
|
|
|
// Loads the entry from its frozen form and returns the activated schema. If an
// asynchronous load was pending, its waiters receive the schema. The entry ends
// up in the LOADED state.
schema_ptr schema_registry_entry::load(extended_frozen_schema fs) {
    _extended_frozen_schema = std::move(fs);
    schema_ptr loaded = get_schema();
    if (_state == state::LOADING) {
        // Resolve everyone waiting on start_loading(), then reset the promise.
        _schema_promise.set_value(loaded);
        _schema_promise = {};
    }
    _state = state::LOADED;
    slogger.trace("Loaded {} = {}", _version, *loaded);
    return loaded;
}
|
|
|
|
// Loads the entry from an already-built schema `s`. `s` becomes the entry's active
// schema, and the entry's frozen form is derived from it. Any pending removal is
// cancelled, pending waiters are resolved, and the entry becomes LOADED.
schema_ptr schema_registry_entry::load(schema_ptr s) {
    _extended_frozen_schema = extended_frozen_schema(s);

    // Make `s` the active schema and link it back to this entry.
    _schema = &*s;
    _schema->_registry_entry = this;
    _erase_timer.cancel();

    const bool was_loading = (_state == state::LOADING);
    if (was_loading) {
        _schema_promise.set_value(s);
        _schema_promise = {};
    }
    _state = state::LOADED;
    slogger.trace("Loaded {} = {}", _version, *s);
    return s;
}
|
|
|
|
// Starts loading this entry's schema with `loader` and returns a future that
// resolves when loading finishes. The loader's result is handled in the background:
// - Success: the schema is loaded and attached to its table.
// - Failure: waiters get schema_version_loading_failed (with the original error
//   nested), and the entry is removed from the registry.
future<schema_ptr> schema_registry_entry::start_loading(async_schema_loader loader) {
    _loader = std::move(loader);
    auto f = _loader(_version);
    auto sf = _schema_promise.get_shared_future();
    _state = state::LOADING;
    slogger.trace("Loading {}", _version);
    // Move to background. `self` keeps this entry alive until the continuation runs.
    (void)f.then_wrapped([self = shared_from_this(), this](future<extended_frozen_schema>&& f) {
        _loader = {};
        if (_state != state::LOADING) {
            // The schema was meanwhile supplied by another path (learn() or the
            // synchronous get_or_load()), which already resolved the waiters.
            // Discard our result explicitly so that a failed load is not reported
            // as an ignored exceptional future.
            f.ignore_ready_future();
            slogger.trace("Loading of {} aborted", _version);
            return;
        }
        try {
            try {
                load(f.get());
                _registry.attach_table(*this);
            } catch (...) {
                // Present the failure as schema_version_loading_failed, keeping the cause nested.
                std::throw_with_nested(schema_version_loading_failed(_version));
            }
        } catch (...) {
            slogger.debug("Loading of {} failed: {}", _version, std::current_exception());
            _schema_promise.set_exception(std::current_exception());
            _registry._entries.erase(_version);
        }
    });
    return sf;
}
|
|
|
|
// Returns a strong reference to this entry's schema. If no schema is currently
// active (first use, or after detach_schema()), it is unfrozen from the stored
// frozen form. Throws std::runtime_error if the unfrozen schema has a different
// version than the entry.
schema_ptr schema_registry_entry::get_schema() {
    if (_schema) {
        // Already active: hand out another reference to the live object.
        return _schema->shared_from_this();
    }
    slogger.trace("Activating {}", _version);
    schema_ptr activated = _extended_frozen_schema->unfreeze(*_registry._ctxt);
    if (activated->version() != _version) {
        throw std::runtime_error(format("Unfrozen schema version doesn't match entry version ({}): {}", _version, *activated));
    }
    // A schema is active again, so the entry must not be dropped by the timer.
    _erase_timer.cancel();
    activated->_registry_entry = this;
    _schema = &*activated;
    return activated;
}
|
|
|
|
// Forgets the active schema and arms the erase timer. When the timer fires after
// the registry's grace period, the timer callback removes this entry from the
// registry.
void schema_registry_entry::detach_schema() noexcept {
    slogger.trace("Deactivating {}", _version);
    const auto grace = _registry.grace_period();
    _schema = nullptr;
    _erase_timer.arm(grace);
}
|
|
|
|
// Returns a copy of the stored extended frozen schema; the entry must be loaded.
extended_frozen_schema schema_registry_entry::extended_frozen() const {
    SCYLLA_ASSERT(_state >= state::LOADED);
    const auto& stored = *_extended_frozen_schema;
    return stored;
}
|
|
|
|
// Returns the plain frozen schema part of the stored form; the entry must be loaded.
frozen_schema schema_registry_entry::frozen() const {
    SCYLLA_ASSERT(_state >= state::LOADED);
    const auto& stored = *_extended_frozen_schema;
    return stored.fs;
}
|
|
|
|
bool schema_registry_entry::is_synced() const {
|
|
return _sync_state == sync_state::SYNCED;
|
|
}
|
|
|
|
// Marks the entry as synced, first trying to attach it to its table.
// Idempotent: later calls are no-ops.
void schema_registry_entry::mark_synced() {
    if (_sync_state != sync_state::SYNCED) {
        _registry.attach_table(*this);
        _sync_state = sync_state::SYNCED;
        slogger.debug("Marked {} as synced", _version);
    }
}
|
|
|
|
// Accessor for the calling thread's registry instance (a thread_local defined in this file).
schema_registry& local_schema_registry() {
    schema_registry& mine = registry;
    return mine;
}
|
|
|
|
global_schema_ptr::global_schema_ptr(const global_schema_ptr& o)
|
|
: global_schema_ptr(o.get()) {
|
|
}
|
|
|
|
// Move construction is allowed only on the shard the source was created on.
global_schema_ptr::global_schema_ptr(global_schema_ptr&& o) noexcept {
    const auto shard = this_shard_id();
    SCYLLA_ASSERT(o._cpu_of_origin == shard);
    _ptr = std::move(o._ptr);
    _cpu_of_origin = shard;
}
|
|
|
|
// Returns the schema as seen from the calling shard.
// - On the origin shard: the wrapped pointer itself.
// - Elsewhere: the local registry's schema for the same version, loaded from the
//   origin entry's frozen form if needed. The origin entry's synced flag is
//   propagated to the local entry.
schema_ptr global_schema_ptr::get() const {
    if (this_shard_id() == _cpu_of_origin) {
        return _ptr;
    }

    // The following code dereferences a registry entry that belongs to a foreign shard.
    // This is safe: the constructor guaranteed that _ptr has a registry entry on the
    // shard where this object originated, and that entry lives at least as long as
    // this object, so it may be referenced from other shards.
    const schema_registry_entry& origin_entry = *_ptr->registry_entry();

    schema_ptr s = local_schema_registry().get_or_null(origin_entry.version());
    if (!s) {
        s = local_schema_registry().get_or_load(origin_entry.version(), [&origin_entry](table_schema_version) -> extended_frozen_schema {
            return origin_entry.extended_frozen();
        });
    }
    if (origin_entry.is_synced()) {
        s->registry_entry()->mark_synced();
    }
    return s;
}
|
|
|
|
// Wraps `ptr`, recording the current shard as the origin. Invariant: _ptr always
// has an associated registry entry. If `ptr` has none, the local registry's schema
// for its version is used instead, and is registered from `ptr` if unknown.
global_schema_ptr::global_schema_ptr(const schema_ptr& ptr)
    : _cpu_of_origin(this_shard_id())
{
    if (ptr->registry_entry()) {
        _ptr = ptr;
        return;
    }
    _ptr = local_schema_registry().get_or_load(ptr->version(), [&ptr](table_schema_version) -> extended_frozen_schema {
        return extended_frozen_schema(ptr);
    });
}
|