Files
scylladb/schema/schema_registry.cc
Michael Litvak ac96e40f13 schema: add pointer to CDC schema
Add to the schema object a member that points to the CDC schema object
that is compatible with this schema, if any.

The compatible CDC schema is created and altered with its base schema in
the same group0 operation.

When generating CDC log mutations for some base mutation we want them to
be created using a compatible schema thas has a CDC column corresponding
to each base column. This change will allow us to find the right CDC
schema given a base mutation.

We also update the relevant structures in the schema registry that are
related to learning about schemas and transporting schemas across
shards or nodes.

When transporting a schema as frozen_schema, we need to transport the
frozen cdc schema as well, and set it again when unfreezing and
reconstructing the schema.

When adding a schema to the registry, we need to ensure its CDC schema
is added to the registry as well.

Currently we always set the CDC schema to nullptr and maintain the
previous behavior. We will change it in a later commit. Until then, we
mark all places where CDC schema is passed clearly so we don't forget
it.
2025-10-21 14:13:43 +02:00

378 lines
12 KiB
C++

/*
* Copyright 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "db/view/base_info.hh"
#include "utils/assert.hh"
#include <seastar/core/sharded.hh>
#include "schema_registry.hh"
#include "utils/log.hh"
#include "db/schema_tables.hh"
#include "view_info.hh"
#include "replica/database.hh"
static logging::logger slogger("schema_registry");
static thread_local schema_registry registry;
schema_version_not_found::schema_version_not_found(table_schema_version v)
: std::runtime_error{format("Schema version {} not found", v)}
{ }
schema_version_loading_failed::schema_version_loading_failed(table_schema_version v)
: std::runtime_error{format("Failed to load schema version {}", v)}
{ }
schema_registry_entry::~schema_registry_entry() {
if (_schema) {
_schema->_registry_entry = nullptr;
}
}
schema_registry_entry::schema_registry_entry(table_schema_version v, schema_registry& r)
: _state(state::INITIAL)
, _version(v)
, _registry(r)
, _sync_state(sync_state::NOT_SYNCED)
{
_erase_timer.set_callback([this] {
slogger.debug("Dropping {}", _version);
SCYLLA_ASSERT(!_schema);
try {
_registry._entries.erase(_version);
} catch (...) {
slogger.error("Failed to erase schema version {}: {}", _version, std::current_exception());
}
});
}
schema_registry::~schema_registry() = default;
void schema_registry::init(const db::schema_ctxt& ctxt) {
_ctxt = std::make_unique<db::schema_ctxt>(ctxt);
}
void schema_registry::attach_table(schema_registry_entry& e) noexcept {
if (e._table) {
return;
}
replica::database* db = _ctxt->get_db();
if (!db) {
return;
}
try {
auto& table = db->find_column_family(e.get_schema()->id());
e.set_table(table.weak_from_this());
} catch (const replica::no_such_column_family&) {
if (slogger.is_enabled(seastar::log_level::debug)) {
slogger.debug("No table for schema version {} of {}.{}: {}", e._version,
e.get_schema()->ks_name(), e.get_schema()->cf_name(), seastar::current_backtrace());
}
// ignore
}
}
schema_ptr schema_registry::learn(schema_ptr s) {
auto learned_cdc_schema = s->cdc_schema() ? local_schema_registry().learn(s->cdc_schema()) : nullptr;
if (learned_cdc_schema != s->cdc_schema()) {
s = s->make_with_cdc(learned_cdc_schema);
}
if (s->registry_entry()) {
return s;
}
auto i = _entries.find(s->version());
if (i != _entries.end()) {
schema_registry_entry& e = *i->second;
if (e._state == schema_registry_entry::state::LOADING) {
e.load(s);
attach_table(e);
}
return e.get_schema();
}
slogger.debug("Learning about version {} of {}.{}", s->version(), s->ks_name(), s->cf_name());
auto e_ptr = make_lw_shared<schema_registry_entry>(s->version(), *this);
auto loaded_s = e_ptr->load(s);
attach_table(*e_ptr);
_entries.emplace(s->version(), e_ptr);
return loaded_s;
}
schema_registry_entry& schema_registry::get_entry(table_schema_version v) const {
auto i = _entries.find(v);
if (i == _entries.end()) {
throw schema_version_not_found(v);
}
schema_registry_entry& e = *i->second;
if (e._state != schema_registry_entry::state::LOADED) {
throw schema_version_not_found(v);
}
return e;
}
schema_registry_entry::erase_clock::duration schema_registry::grace_period() const {
return std::chrono::seconds(_ctxt->schema_registry_grace_period());
}
schema_ptr schema_registry::get(table_schema_version v) const {
return get_entry(v).get_schema();
}
frozen_schema schema_registry::get_frozen(table_schema_version v) const {
return get_entry(v).frozen();
}
future<schema_ptr> schema_registry::get_or_load(table_schema_version v, const async_schema_loader& loader) {
auto i = _entries.find(v);
if (i == _entries.end()) {
auto e_ptr = make_lw_shared<schema_registry_entry>(v, *this);
auto f = e_ptr->start_loading(loader);
_entries.emplace(v, e_ptr);
return f;
}
schema_registry_entry& e = *i->second;
if (e._state == schema_registry_entry::state::LOADING) {
return e._schema_promise.get_shared_future();
}
return make_ready_future<schema_ptr>(e.get_schema());
}
schema_ptr schema_registry::get_or_null(table_schema_version v) const {
auto i = _entries.find(v);
if (i == _entries.end()) {
return nullptr;
}
schema_registry_entry& e = *i->second;
if (e._state != schema_registry_entry::state::LOADED) {
return nullptr;
}
return e.get_schema();
}
schema_ptr schema_registry::get_or_load(table_schema_version v, const schema_loader& loader) {
auto i = _entries.find(v);
if (i == _entries.end()) {
auto e_ptr = make_lw_shared<schema_registry_entry>(v, *this);
auto s = e_ptr->load(loader(v));
attach_table(*e_ptr);
_entries.emplace(v, e_ptr);
return s;
}
schema_registry_entry& e = *i->second;
if (e._state == schema_registry_entry::state::LOADING) {
auto s = e.load(loader(v));
attach_table(e);
return s;
}
return e.get_schema();
}
void schema_registry::clear() {
_entries.clear();
}
schema_ptr schema_registry_entry::load(extended_frozen_schema fs) {
_extended_frozen_schema = std::move(fs);
auto s = get_schema();
if (_state == state::LOADING) {
_schema_promise.set_value(s);
_schema_promise = {};
}
_state = state::LOADED;
slogger.trace("Loaded {} = {}", _version, *s);
return s;
}
schema_ptr schema_registry_entry::load(schema_ptr s) {
_extended_frozen_schema = extended_frozen_schema(s);
_schema = &*s;
_schema->_registry_entry = this;
_erase_timer.cancel();
if (_state == state::LOADING) {
_schema_promise.set_value(s);
_schema_promise = {};
}
_state = state::LOADED;
slogger.trace("Loaded {} = {}", _version, *s);
return s;
}
future<schema_ptr> schema_registry_entry::start_loading(async_schema_loader loader) {
_loader = std::move(loader);
auto f = _loader(_version);
auto sf = _schema_promise.get_shared_future();
_state = state::LOADING;
slogger.trace("Loading {}", _version);
// Move to background.
(void)f.then_wrapped([self = shared_from_this(), this] (future<extended_frozen_schema>&& f) {
_loader = {};
if (_state != state::LOADING) {
slogger.trace("Loading of {} aborted", _version);
return;
}
try {
try {
load(f.get());
_registry.attach_table(*this);
} catch (...) {
std::throw_with_nested(schema_version_loading_failed(_version));
}
} catch (...) {
slogger.debug("Loading of {} failed: {}", _version, std::current_exception());
_schema_promise.set_exception(std::current_exception());
_registry._entries.erase(_version);
}
});
return sf;
}
schema_ptr schema_registry_entry::get_schema() {
if (!_schema) {
slogger.trace("Activating {}", _version);
schema_ptr s;
s = _extended_frozen_schema->unfreeze(*_registry._ctxt);
if (s->version() != _version) {
throw std::runtime_error(format("Unfrozen schema version doesn't match entry version ({}): {}", _version, *s));
}
_erase_timer.cancel();
s->_registry_entry = this;
_schema = &*s;
return s;
} else {
return _schema->shared_from_this();
}
}
void schema_registry_entry::detach_schema() noexcept {
slogger.trace("Deactivating {}", _version);
_schema = nullptr;
_erase_timer.arm(_registry.grace_period());
}
extended_frozen_schema schema_registry_entry::extended_frozen() const {
SCYLLA_ASSERT(_state >= state::LOADED);
return *_extended_frozen_schema;
}
frozen_schema schema_registry_entry::frozen() const {
SCYLLA_ASSERT(_state >= state::LOADED);
return _extended_frozen_schema->fs;
}
future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
switch (_sync_state) {
case schema_registry_entry::sync_state::SYNCED:
return make_ready_future<>();
case schema_registry_entry::sync_state::SYNCING:
return _synced_promise.get_shared_future();
case schema_registry_entry::sync_state::NOT_SYNCED: {
slogger.debug("Syncing {}", _version);
_synced_promise = {};
auto f = do_with(std::move(syncer), [] (auto& syncer) {
return syncer();
});
auto sf = _synced_promise.get_shared_future();
_sync_state = schema_registry_entry::sync_state::SYNCING;
// Move to background.
(void)f.then_wrapped([this, self = shared_from_this()] (auto&& f) {
if (_sync_state != sync_state::SYNCING) {
f.ignore_ready_future();
return;
}
if (f.failed()) {
slogger.debug("Syncing of {} failed", _version);
_sync_state = schema_registry_entry::sync_state::NOT_SYNCED;
_synced_promise.set_exception(f.get_exception());
} else {
slogger.debug("Synced {}", _version);
_registry.attach_table(*this);
_sync_state = schema_registry_entry::sync_state::SYNCED;
_synced_promise.set_value();
}
});
return sf;
}
}
abort();
}
bool schema_registry_entry::is_synced() const {
return _sync_state == sync_state::SYNCED;
}
void schema_registry_entry::mark_synced() {
if (_sync_state == sync_state::SYNCED) {
return;
}
if (_sync_state == sync_state::SYNCING) {
_synced_promise.set_value();
}
_registry.attach_table(*this);
_sync_state = sync_state::SYNCED;
slogger.debug("Marked {} as synced", _version);
}
schema_registry& local_schema_registry() {
return registry;
}
global_schema_ptr::global_schema_ptr(const global_schema_ptr& o)
: global_schema_ptr(o.get())
{ }
global_schema_ptr::global_schema_ptr(global_schema_ptr&& o) noexcept {
auto current = this_shard_id();
SCYLLA_ASSERT(o._cpu_of_origin == current);
_ptr = std::move(o._ptr);
_cpu_of_origin = current;
}
schema_ptr global_schema_ptr::get() const {
if (this_shard_id() == _cpu_of_origin) {
return _ptr;
} else {
auto registered_schema = [](const schema_registry_entry& e) -> schema_ptr {
schema_ptr ret = local_schema_registry().get_or_null(e.version());
if (!ret) {
ret = local_schema_registry().get_or_load(e.version(), [&e](table_schema_version) -> extended_frozen_schema {
return e.extended_frozen();
});
}
return ret;
};
// the following code contains registry entry dereference of a foreign shard
// however, it is guaranteed to succeed since we made sure in the constructor
// that _ptr will have a registry on the foreign shard where this
// object originated so as long as this object lives the registry entries lives too
// and it is safe to reference them on foreign shards.
schema_ptr s = registered_schema(*_ptr->registry_entry());
if (_ptr->registry_entry()->is_synced()) {
s->registry_entry()->mark_synced();
}
return s;
}
}
global_schema_ptr::global_schema_ptr(const schema_ptr& ptr)
: _cpu_of_origin(this_shard_id()) {
// _ptr must always have an associated registry entry,
// if ptr doesn't, we need to load it into the registry.
auto ensure_registry_entry = [] (const schema_ptr& s) {
schema_registry_entry* e = s->registry_entry();
if (e) {
return s;
} else {
return local_schema_registry().get_or_load(s->version(), [&s] (table_schema_version) -> extended_frozen_schema {
return extended_frozen_schema(s);
});
}
};
_ptr = ensure_registry_entry(ptr);
}