mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-26 19:35:12 +00:00
In this series we introduce new system tables and use them for storing the raft metadata for strongly consistent tables. In contrast to the previously used raft group0 tables, the new tables can store data on any shard. The tables also allow specifying the shard where each partition should reside, which enables the tablets of strongly consistent tables to have their raft group metadata co-located on the same shard as the tablet replica. The new tables have almost the same schemas as the raft group0 tables. However, they have an additional column in their partition keys. The additional column is the shard that specifies where the data should be located. While a tablet and its corresponding raft group server resides on some shard, it now writes and reads all requests to the metadata tables using its shard in addition to the group_id. The extra partition key column is used by the new partitioner and sharder which allow this special shard routing. The partitioner encodes the shard in the token and the sharder decodes the shard from the token. This approach for routing avoids any additional lookups (for the tablet mapping) during operations on the new tables and it also doesn't require keeping any state. It also doesn't interact negatively with resharding - as long as tablets (and their corresponding raft metadata) occupy some shard, we do not allow starting the node with a shard count lower than the id of this shard. When increasing the shard count, the routing does not change, similarly to how tablet allocation doesn't change. To use the new tables, a new implementation of `raft::persistence` is added. Currently, it's almost an exact copy of the `raft_sys_table_storage` which just uses the new tables, but in the future we can modify it with changes specific to metadata (or mutation) storage for strongly consistent tables. The new storage is used in the `groups_manager`, which combined with the removal of some `this_shard_id() == 0` checks, allows strongly consistent tables to be used on all shards. This approach for making sure that the reads/writes to the new tables end up on the correct shards won in the balance of complexity/usability/performance against a few other approaches we've considered. They include: 1. Making the Raft server read/write directly to the database, skipping the sharder, on its shard, while using the default partitioner/sharder. This approach could let us avoid changing the schema and there should be no problems for reads and writes performed by the Raft server. However, in this approach we would input data in tables conflicting with the placement determined by the sharder. As a result, any read going through the sharder could miss the rows it was supposed to read. Even when reading all shards to find a specific value, there is a risk of polluting the cache - the rows loaded on incorrect shards may persist in the cache for an unknown amount of time. The cache may also mistakenly remember that a row is missing, even though it's actually present, just on an incorrect shard. Some of the issues with this approach could be worked around using another sharder which always returns this_shard_id() when asked about a shard. It's not clear how such a sharder would implement a method like `token_for_next_shard`, and how much simpler it would be compared to the current "identity" sharder. 2. Using a sharder depending on the current allocation of tablets on the node. This approach relies on the knowledge of group_id -> shard mapping at any point in time in the cluster. For this approach we'd also need to either add a custom partitioner which encodes the group_id in the token, or we'd need to track the token(group_id) -> shard mapping. This approach has the benefit over the one used in the series of keeping the partition key as just group_id. However, it requires more logic, and the access to the live state of the node in the sharder, and it's not static - the same token may be sharded differently depending on the state of the node - it shouldn't occur in practice, but if we changed the state of the node before adjusting the table data, we would be unable to access/fix the stale data without artificially also changing the state of the node. 3. Using metadata tables co-located to the strongly consistent tables. This approach could simplify the metadata migrations in the future, however it would require additional schema management of all co-located metadata tables, and it's not even obvious what could be used as the partition key in these tables - some metadata is per-raft-group, so we couldn't reuse the partition key of the strongly consistent table for it. And finding and remembering a partition key that is routed to a specific shard is not a simple task. Finally, splits and merges will most likely need special handling for metadata anyway, so we wouldn't even make use of co-located table's splits and merges. Fixes [SCYLLADB-361](https://scylladb.atlassian.net/browse/SCYLLADB-361) [SCYLLADB-361]: https://scylladb.atlassian.net/browse/SCYLLADB-361?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ Closes scylladb/scylladb#28509 * github.com:scylladb/scylladb: docs: add strong consistency doc test/cluster: add tests for strongly-consistent tables' metadata persistence raft: enable multi-shard raft groups for strongly consistent tablets test/raft: add unit tests for raft_groups_storage raft: add raft_groups_storage persistence class db: add system tables for strongly consistent tables' raft groups dht: add fixed_shard_partitioner and fixed_shard_sharder raft: add group_id -> shard mapping to raft_group_registry schema: add with_sharder overload accepting static_sharder reference
2412 lines
89 KiB
C++
2412 lines
89 KiB
C++
/*
|
|
* Copyright (C) 2014-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include <seastar/core/on_internal_error.hh>
|
|
#include <map>
|
|
#include "bytes_ostream.hh"
|
|
#include "cql3/description.hh"
|
|
#include "db/tablet_options.hh"
|
|
#include "db/view/view.hh"
|
|
#include "mutation/timestamp.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/UUID_gen.hh"
|
|
#include "cql3/column_identifier.hh"
|
|
#include "cql3/util.hh"
|
|
#include "schema.hh"
|
|
#include "schema_builder.hh"
|
|
#include "db/marshal/type_parser.hh"
|
|
#include "schema_registry.hh"
|
|
#include <type_traits>
|
|
#include "view_info.hh"
|
|
#include "partition_slice_builder.hh"
|
|
#include "replica/database.hh"
|
|
#include "dht/token-sharding.hh"
|
|
#include "cdc/cdc_extension.hh"
|
|
#include "tombstone_gc_extension.hh"
|
|
#include "db/paxos_grace_seconds_extension.hh"
|
|
#include "utils/rjson.hh"
|
|
#include "tombstone_gc_options.hh"
|
|
#include "db/per_partition_rate_limit_extension.hh"
|
|
#include "db/tags/utils.hh"
|
|
#include "db/tags/extension.hh"
|
|
#include "index/target_parser.hh"
|
|
#include "utils/hashing.hh"
|
|
#include "utils/hashers.hh"
|
|
#include "alternator/extract_from_attrs.hh"
|
|
#include "utils/managed_string.hh"
|
|
#include "alternator/ttl_tag.hh"
|
|
|
|
#include <boost/lexical_cast.hpp>
|
|
|
|
extern logging::logger dblog;
|
|
|
|
sstring
|
|
speculative_retry::to_sstring() const {
|
|
if (_t == type::NONE) {
|
|
return "NONE";
|
|
} else if (_t == type::ALWAYS) {
|
|
return "ALWAYS";
|
|
} else if (_t == type::CUSTOM) {
|
|
return format("{:.2f}ms", _v);
|
|
} else if (_t == type::PERCENTILE) {
|
|
return format("{:.1f}PERCENTILE", 100 * _v);
|
|
} else {
|
|
throw std::invalid_argument(format("unknown type: {:d}\n", uint8_t(_t)));
|
|
}
|
|
}
|
|
|
|
speculative_retry
|
|
speculative_retry::from_sstring(sstring str) {
|
|
std::transform(str.begin(), str.end(), str.begin(), ::toupper);
|
|
|
|
sstring ms("MS");
|
|
sstring percentile("PERCENTILE");
|
|
|
|
auto convert = [&str] (sstring& t) {
|
|
try {
|
|
return boost::lexical_cast<double>(str.substr(0, str.size() - t.size()));
|
|
} catch (boost::bad_lexical_cast& e) {
|
|
throw exceptions::configuration_exception(format("cannot convert {} to speculative_retry\n", str));
|
|
}
|
|
};
|
|
|
|
type t;
|
|
double v = 0;
|
|
if (str == "NONE") {
|
|
t = type::NONE;
|
|
} else if (str == "ALWAYS") {
|
|
t = type::ALWAYS;
|
|
} else if (str.compare(str.size() - ms.size(), ms.size(), ms) == 0) {
|
|
t = type::CUSTOM;
|
|
v = convert(ms);
|
|
} else if (str.compare(str.size() - percentile.size(), percentile.size(), percentile) == 0) {
|
|
t = type::PERCENTILE;
|
|
v = convert(percentile) / 100;
|
|
if (v < 0.0 || v > 1.0) {
|
|
throw exceptions::configuration_exception(
|
|
format("Invalid value {} for PERCENTILE option 'speculative_retry': must be between [0.0 and 100.0]", str));
|
|
}
|
|
} else {
|
|
throw exceptions::configuration_exception(format("cannot convert {} to speculative_retry\n", str));
|
|
}
|
|
return speculative_retry(t, v);
|
|
}
|
|
|
|
sstring to_sstring(column_kind k) {
|
|
switch (k) {
|
|
case column_kind::partition_key: return "PARTITION_KEY";
|
|
case column_kind::clustering_key: return "CLUSTERING_COLUMN";
|
|
case column_kind::static_column: return "STATIC";
|
|
case column_kind::regular_column: return "REGULAR";
|
|
}
|
|
throw std::invalid_argument("unknown column kind");
|
|
}
|
|
|
|
bool is_compatible(column_kind k1, column_kind k2) {
|
|
return k1 == k2;
|
|
}
|
|
|
|
column_mapping_entry::column_mapping_entry(bytes name, sstring type_name)
|
|
: column_mapping_entry(std::move(name), db::marshal::type_parser::parse(type_name))
|
|
{
|
|
}
|
|
|
|
column_mapping_entry::column_mapping_entry(const column_mapping_entry& o)
|
|
: column_mapping_entry(o._name, o._type->name())
|
|
{
|
|
}
|
|
|
|
column_mapping_entry& column_mapping_entry::operator=(const column_mapping_entry& o) {
|
|
auto copy = o;
|
|
return operator=(std::move(copy));
|
|
}
|
|
|
|
bool operator==(const column_mapping_entry& lhs, const column_mapping_entry& rhs) {
|
|
return lhs.name() == rhs.name() && lhs.type() == rhs.type();
|
|
}
|
|
|
|
bool operator==(const column_mapping& lhs, const column_mapping& rhs) {
|
|
const auto& lhs_columns = lhs.columns(), rhs_columns = rhs.columns();
|
|
if (lhs_columns.size() != rhs_columns.size()) {
|
|
return false;
|
|
}
|
|
for (size_t i = 0, end = lhs_columns.size(); i < end; ++i) {
|
|
const column_mapping_entry& lhs_entry = lhs_columns[i], rhs_entry = rhs_columns[i];
|
|
if (lhs_entry != rhs_entry) {
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
const column_mapping_entry& column_mapping::column_at(column_kind kind, column_id id) const {
|
|
SCYLLA_ASSERT(kind == column_kind::regular_column || kind == column_kind::static_column);
|
|
return kind == column_kind::regular_column ? regular_column_at(id) : static_column_at(id);
|
|
}
|
|
|
|
const column_mapping_entry& column_mapping::static_column_at(column_id id) const {
|
|
if (id >= _n_static) {
|
|
on_internal_error(dblog, format("static column id {:d} >= {:d}", id, _n_static));
|
|
}
|
|
return _columns[id];
|
|
}
|
|
|
|
const column_mapping_entry& column_mapping::regular_column_at(column_id id) const {
|
|
auto n_regular = _columns.size() - _n_static;
|
|
if (id >= n_regular) {
|
|
on_internal_error(dblog, format("regular column id {:d} >= {:d}", id, n_regular));
|
|
}
|
|
return _columns[id + _n_static];
|
|
}
|
|
|
|
template<typename Sequence>
|
|
std::vector<data_type>
|
|
get_column_types(const Sequence& column_definitions) {
|
|
std::vector<data_type> result;
|
|
for (auto&& col : column_definitions) {
|
|
result.push_back(col.type);
|
|
}
|
|
return result;
|
|
}
|
|
|
|
auto fmt::formatter<column_mapping>::format(const column_mapping& cm, fmt::format_context& ctx) const
|
|
-> decltype(ctx.out()) {
|
|
column_id n_static = cm.n_static();
|
|
column_id n_regular = cm.columns().size() - n_static;
|
|
|
|
auto pr_entry = [] (column_id i, const column_mapping_entry& e) {
|
|
// Without schema we don't know if name is UTF8. If we had schema we could use
|
|
// s->regular_column_name_type()->to_string(e.name()).
|
|
return fmt::format("{{id={}, name=0x{}, type={}}}", i, e.name(), e.type()->name());
|
|
};
|
|
return fmt::format_to(ctx.out(), "{{static=[{}], regular=[{}]}}",
|
|
fmt::join(std::views::iota(column_id(0), n_static) |
|
|
std::views::transform([&] (column_id i) { return pr_entry(i, cm.static_column_at(i)); }), ", "),
|
|
fmt::join(std::views::iota(column_id(0), n_regular) |
|
|
std::views::transform([&] (column_id i) { return pr_entry(i, cm.regular_column_at(i)); }), ", "));
|
|
}
|
|
|
|
thread_local std::map<sstring, std::unique_ptr<dht::i_partitioner>> partitioners;
|
|
thread_local std::map<std::pair<unsigned, unsigned>, std::unique_ptr<dht::static_sharder>> sharders;
|
|
sstring default_partitioner_name = "org.apache.cassandra.dht.Murmur3Partitioner";
|
|
unsigned default_partitioner_ignore_msb = 12;
|
|
|
|
static const dht::i_partitioner& get_partitioner(const sstring& name) {
|
|
auto it = partitioners.find(name);
|
|
if (it == partitioners.end()) {
|
|
auto p = dht::make_partitioner(name);
|
|
it = partitioners.insert({name, std::move(p)}).first;
|
|
}
|
|
return *it->second;
|
|
}
|
|
|
|
void schema::set_default_partitioner(const sstring& class_name, unsigned ignore_msb) {
|
|
default_partitioner_name = class_name;
|
|
default_partitioner_ignore_msb = ignore_msb;
|
|
}
|
|
|
|
static const dht::static_sharder& get_sharder(unsigned shard_count, unsigned ignore_msb) {
|
|
auto it = sharders.find({shard_count, ignore_msb});
|
|
if (it == sharders.end()) {
|
|
auto sharder = std::make_unique<dht::static_sharder>(shard_count, ignore_msb);
|
|
it = sharders.emplace(std::make_pair(shard_count, ignore_msb), std::move(sharder)).first;
|
|
}
|
|
return *it->second;
|
|
}
|
|
|
|
const dht::i_partitioner& schema::get_partitioner() const {
|
|
return _raw._partitioner.get();
|
|
}
|
|
|
|
const dht::static_sharder* schema::try_get_static_sharder() const {
|
|
auto t = maybe_table();
|
|
if (t && !t->uses_static_sharding()) {
|
|
// Use table()->get_effective_replication_map()->get_sharder() instead.
|
|
return nullptr;
|
|
}
|
|
return &_raw._sharder.get();
|
|
}
|
|
|
|
const dht::static_sharder& schema::get_sharder() const {
|
|
auto* s = try_get_static_sharder();
|
|
if (!s) {
|
|
// Use table()->get_effective_replication_map()->get_sharder() instead.
|
|
on_internal_error(dblog, format("Attempted to obtain static sharder for table {}.{}", ks_name(), cf_name()));
|
|
}
|
|
return *s;
|
|
}
|
|
|
|
replica::table* schema::maybe_table() const {
|
|
if (_registry_entry) {
|
|
return _registry_entry->table();
|
|
}
|
|
return nullptr;
|
|
}
|
|
|
|
replica::table& schema::table() const {
|
|
auto t = maybe_table();
|
|
if (!t) {
|
|
seastar::throw_with_backtrace<replica::no_such_column_family>(id());
|
|
}
|
|
return *t;
|
|
}
|
|
|
|
bool schema::has_custom_partitioner() const {
|
|
return _raw._partitioner.get().name() != default_partitioner_name;
|
|
}
|
|
|
|
lw_shared_ptr<cql3::column_specification>
|
|
schema::make_column_specification(const column_definition& def) const {
|
|
auto id = ::make_shared<cql3::column_identifier>(def.name(), column_name_type(def));
|
|
return make_lw_shared<cql3::column_specification>(_raw._ks_name, _raw._cf_name, std::move(id), def.type);
|
|
}
|
|
|
|
v3_columns::v3_columns(std::vector<column_definition> cols, bool is_dense, bool is_compound)
|
|
: _is_dense(is_dense)
|
|
, _is_compound(is_compound)
|
|
, _columns(std::move(cols))
|
|
{
|
|
for (column_definition& def : _columns) {
|
|
_columns_by_name[def.name()] = &def;
|
|
}
|
|
}
|
|
|
|
v3_columns v3_columns::from_v2_schema(const schema& s) {
|
|
data_type static_column_name_type = utf8_type;
|
|
std::vector<column_definition> cols;
|
|
|
|
if (s.is_static_compact_table()) {
|
|
if (s.has_static_columns()) {
|
|
throw std::runtime_error(
|
|
format("v2 static compact table should not have static columns: {}.{}", s.ks_name(), s.cf_name()));
|
|
}
|
|
if (s.clustering_key_size()) {
|
|
throw std::runtime_error(
|
|
format("v2 static compact table should not have clustering columns: {}.{}", s.ks_name(), s.cf_name()));
|
|
}
|
|
static_column_name_type = s.regular_column_name_type();
|
|
for (auto& c : s.all_columns()) {
|
|
// Note that for "static" no-clustering compact storage we use static for the defined columns
|
|
if (c.kind == column_kind::regular_column) {
|
|
auto new_def = c;
|
|
new_def.kind = column_kind::static_column;
|
|
cols.push_back(new_def);
|
|
} else {
|
|
cols.push_back(c);
|
|
}
|
|
}
|
|
schema_builder::default_names names(s._raw);
|
|
cols.emplace_back(to_bytes(names.clustering_name()), static_column_name_type, column_kind::clustering_key, 0);
|
|
cols.emplace_back(to_bytes(names.compact_value_name()), s.make_legacy_default_validator(), column_kind::regular_column, 0);
|
|
} else {
|
|
cols = s.all_columns();
|
|
}
|
|
|
|
for (column_definition& def : cols) {
|
|
data_type name_type = def.is_static() ? static_column_name_type : utf8_type;
|
|
auto id = ::make_shared<cql3::column_identifier>(def.name(), name_type);
|
|
def.column_specification = make_lw_shared<cql3::column_specification>(s.ks_name(), s.cf_name(), std::move(id), def.type);
|
|
}
|
|
|
|
return v3_columns(std::move(cols), s.is_dense(), s.is_compound());
|
|
}
|
|
|
|
void v3_columns::apply_to(schema_builder& builder) const {
|
|
if (is_static_compact()) {
|
|
for (auto& c : _columns) {
|
|
if (c.kind == column_kind::regular_column) {
|
|
builder.set_default_validation_class(c.type);
|
|
} else if (c.kind == column_kind::static_column) {
|
|
auto new_def = c;
|
|
new_def.kind = column_kind::regular_column;
|
|
builder.with_column_ordered(new_def);
|
|
} else if (c.kind == column_kind::clustering_key) {
|
|
builder.set_regular_column_name_type(c.type);
|
|
} else {
|
|
builder.with_column_ordered(c);
|
|
}
|
|
}
|
|
} else {
|
|
for (auto& c : _columns) {
|
|
if (is_compact() && c.kind == column_kind::regular_column) {
|
|
builder.set_default_validation_class(c.type);
|
|
}
|
|
builder.with_column_ordered(c);
|
|
}
|
|
}
|
|
}
|
|
|
|
bool v3_columns::is_static_compact() const {
|
|
return !_is_dense && !_is_compound;
|
|
}
|
|
|
|
bool v3_columns::is_compact() const {
|
|
return _is_dense || !_is_compound;
|
|
}
|
|
|
|
const std::unordered_map<bytes, const column_definition*>& v3_columns::columns_by_name() const {
|
|
return _columns_by_name;
|
|
}
|
|
|
|
const std::vector<column_definition>& v3_columns::all_columns() const {
|
|
return _columns;
|
|
}
|
|
|
|
void schema::rebuild() {
|
|
_partition_key_type = make_lw_shared<compound_type<>>(get_column_types(partition_key_columns()));
|
|
_clustering_key_type = make_lw_shared<compound_prefix>(get_column_types(clustering_key_columns()));
|
|
_clustering_key_size = column_offset(column_kind::static_column) - column_offset(column_kind::clustering_key);
|
|
_regular_column_count = _raw._columns.size() - column_offset(column_kind::regular_column);
|
|
_static_column_count = column_offset(column_kind::regular_column) - column_offset(column_kind::static_column);
|
|
_columns_by_name.clear();
|
|
|
|
for (const column_definition& def : all_columns()) {
|
|
_columns_by_name[def.name()] = &def;
|
|
}
|
|
|
|
static_assert(row_column_ids_are_ordered_by_name::value, "row columns don't need to be ordered by name");
|
|
if (!std::is_sorted(regular_columns().begin(), regular_columns().end(), column_definition::name_comparator(regular_column_name_type()))) {
|
|
throw std::runtime_error("Regular columns should be sorted by name");
|
|
}
|
|
if (!std::is_sorted(static_columns().begin(), static_columns().end(), column_definition::name_comparator(static_column_name_type()))) {
|
|
throw std::runtime_error("Static columns should be sorted by name");
|
|
}
|
|
|
|
{
|
|
std::vector<column_mapping_entry> cm_columns;
|
|
for (const column_definition& def : std::views::join(std::array{static_columns(), regular_columns()})) {
|
|
cm_columns.emplace_back(column_mapping_entry{def.name(), def.type});
|
|
}
|
|
_column_mapping = column_mapping(std::move(cm_columns), static_columns_count());
|
|
}
|
|
|
|
if (is_counter()) {
|
|
for (auto&& cdef : std::views::join(std::array{static_columns(), regular_columns()})) {
|
|
if (!cdef.type->is_counter()) {
|
|
throw exceptions::configuration_exception(format("Cannot add a non counter column ({}) in a counter column family", cdef.name_as_text()));
|
|
}
|
|
}
|
|
} else {
|
|
for (auto&& cdef : all_columns()) {
|
|
if (cdef.type->is_counter()) {
|
|
throw exceptions::configuration_exception(format("Cannot add a counter column ({}) in a non counter column family", cdef.name_as_text()));
|
|
}
|
|
}
|
|
}
|
|
|
|
_v3_columns = v3_columns::from_v2_schema(*this);
|
|
_full_slice = make_shared<query::partition_slice>(partition_slice_builder(*this).build());
|
|
compute_all_columns_in_select_order();
|
|
}
|
|
|
|
const column_mapping& schema::get_column_mapping() const {
|
|
return _column_mapping;
|
|
}
|
|
|
|
schema::raw_schema::raw_schema(table_id id)
|
|
: _id(id)
|
|
, _partitioner(::get_partitioner(default_partitioner_name))
|
|
, _sharder(::get_sharder(smp::count, default_partitioner_ignore_msb))
|
|
{ }
|
|
|
|
schema::schema(private_tag, const raw_schema& raw, const schema_static_props& props, schema_ptr cdc_schema, std::optional<std::variant<schema_ptr, db::view::base_dependent_view_info>> base)
|
|
: _raw(raw)
|
|
, _static_props(props)
|
|
, _cdc_schema(cdc_schema)
|
|
, _offsets([this] {
|
|
if (_raw._columns.size() > std::numeric_limits<column_count_type>::max()) {
|
|
throw std::runtime_error(format("Column count limit ({:d}) overflowed: {:d}",
|
|
std::numeric_limits<column_count_type>::max(), _raw._columns.size()));
|
|
}
|
|
|
|
auto& cols = _raw._columns;
|
|
std::array<column_count_type, 4> count = { 0, 0, 0, 0 };
|
|
auto i = cols.begin();
|
|
auto e = cols.end();
|
|
for (auto k : { column_kind::partition_key, column_kind::clustering_key, column_kind::static_column, column_kind::regular_column }) {
|
|
auto j = std::stable_partition(i, e, [k](const auto& c) {
|
|
return c.kind == k;
|
|
});
|
|
count[column_count_type(k)] = std::distance(i, j);
|
|
i = j;
|
|
}
|
|
return std::array<column_count_type, 3> {
|
|
count[0],
|
|
count[0] + count[1],
|
|
count[0] + count[1] + count[2],
|
|
};
|
|
}())
|
|
{
|
|
std::sort(
|
|
_raw._columns.begin() + column_offset(column_kind::static_column),
|
|
_raw._columns.begin()
|
|
+ column_offset(column_kind::regular_column),
|
|
column_definition::name_comparator(static_column_name_type()));
|
|
std::sort(
|
|
_raw._columns.begin()
|
|
+ column_offset(column_kind::regular_column),
|
|
_raw._columns.end(), column_definition::name_comparator(regular_column_name_type()));
|
|
|
|
std::stable_sort(_raw._columns.begin(),
|
|
_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
|
[] (auto x, auto y) { return x.id < y.id; });
|
|
std::stable_sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
|
_raw._columns.begin() + column_offset(column_kind::static_column),
|
|
[] (auto x, auto y) { return x.id < y.id; });
|
|
|
|
column_id id = 0;
|
|
for (auto& def : _raw._columns) {
|
|
def.column_specification = make_column_specification(def);
|
|
SCYLLA_ASSERT(!def.id || def.id == id - column_offset(def.kind));
|
|
def.ordinal_id = static_cast<ordinal_column_id>(id);
|
|
def.id = id - column_offset(def.kind);
|
|
|
|
auto dropped_at_it = _raw._dropped_columns.find(def.name_as_text());
|
|
if (dropped_at_it != _raw._dropped_columns.end()) {
|
|
def._dropped_at = std::max(def._dropped_at, dropped_at_it->second.timestamp);
|
|
}
|
|
|
|
{
|
|
// is_on_all_components
|
|
// TODO : In origin, this predicate is "componentIndex == null", which is true in
|
|
// a number of cases, some of which I've most likely missed...
|
|
switch (def.kind) {
|
|
case column_kind::partition_key:
|
|
// In origin, ci == null is true for a PK column where CFMetaData "keyValidator" is non-composite.
|
|
// Which is true of #pk == 1
|
|
def._thrift_bits.is_on_all_components = partition_key_size() == 1;
|
|
break;
|
|
case column_kind::regular_column:
|
|
if (_raw._is_dense) {
|
|
// regular values in dense tables are alone, so they have no index
|
|
def._thrift_bits.is_on_all_components = true;
|
|
break;
|
|
}
|
|
[[fallthrough]];
|
|
default:
|
|
// Or any other column where "comparator" is not compound
|
|
break;
|
|
}
|
|
}
|
|
|
|
++id;
|
|
}
|
|
|
|
rebuild();
|
|
if (_raw._view_info) {
|
|
if (!base) {
|
|
on_internal_error(dblog, format("Tried to create schema for view {}.{} without schema or base info of {}",
|
|
_raw._ks_name, _raw._cf_name, _raw._view_info->base_name()));
|
|
}
|
|
_view_info = std::visit(make_visitor(
|
|
[&] (const schema_ptr& base_schema) -> std::unique_ptr<::view_info> {
|
|
return std::make_unique<::view_info>(*this, *_raw._view_info, base_schema);
|
|
},
|
|
[&] (const db::view::base_dependent_view_info& base_info) -> std::unique_ptr<::view_info> {
|
|
return std::make_unique<::view_info>(*this, *_raw._view_info, base_info);
|
|
}
|
|
), *base);
|
|
} else if (base) {
|
|
on_internal_error(dblog, format("Tried to create schema for table/view {}.{} with base info but without view info",
|
|
_raw._ks_name, _raw._cf_name));
|
|
}
|
|
}
|
|
|
|
schema::schema(const schema& o, const std::function<void(schema&)>& transform)
|
|
: _raw(o._raw)
|
|
, _static_props(o._static_props)
|
|
, _cdc_schema(o._cdc_schema)
|
|
, _offsets(o._offsets)
|
|
{
|
|
// Do the transformation after all the raw fields are initialized, but
|
|
// *before* the derived fields are generated (from the raw ones).
|
|
if (transform) {
|
|
transform(*this);
|
|
}
|
|
|
|
rebuild();
|
|
if (o.is_view()) {
|
|
_view_info = std::make_unique<::view_info>(*this, o.view_info()->raw(), o.view_info()->base_info());
|
|
}
|
|
}
|
|
|
|
schema::schema(const schema& o)
|
|
: schema(o, {})
|
|
{
|
|
}
|
|
|
|
schema::schema(reversed_tag, const schema& o)
|
|
: schema(o, [] (schema& s) {
|
|
s._raw._version = reversed(s._raw._version);
|
|
for (auto& col : s._raw._columns) {
|
|
if (col.kind == column_kind::clustering_key) {
|
|
col.type = reversed(col.type);
|
|
}
|
|
}
|
|
})
|
|
{
|
|
}
|
|
|
|
schema::schema(with_cdc_schema_tag, const schema& o, schema_ptr cdc_schema)
|
|
: schema(o, [cdc_schema] (schema& s) {
|
|
s._cdc_schema = cdc_schema;
|
|
})
|
|
{
|
|
}
|
|
|
|
schema::~schema() {
|
|
if (_registry_entry) {
|
|
_registry_entry->detach_schema();
|
|
}
|
|
}
|
|
|
|
schema_registry_entry*
|
|
schema::registry_entry() const noexcept {
|
|
return _registry_entry;
|
|
}
|
|
|
|
bool
|
|
schema::has_multi_cell_collections() const {
|
|
return std::ranges::any_of(all_columns(), [] (const column_definition& cdef) {
|
|
return cdef.type->is_collection() && cdef.type->is_multi_cell();
|
|
});
|
|
}
|
|
|
|
bool operator==(const schema::user_properties& lhs, const schema::user_properties& rhs) {
|
|
return lhs.comment == rhs.comment
|
|
&& lhs.default_time_to_live == rhs.default_time_to_live
|
|
&& lhs.bloom_filter_fp_chance == rhs.bloom_filter_fp_chance
|
|
&& lhs.compressor_params == rhs.compressor_params
|
|
&& lhs.gc_grace_seconds == rhs.gc_grace_seconds
|
|
&& lhs.min_compaction_threshold == rhs.min_compaction_threshold
|
|
&& lhs.max_compaction_threshold == rhs.max_compaction_threshold
|
|
&& lhs.min_index_interval == rhs.min_index_interval
|
|
&& lhs.max_index_interval == rhs.max_index_interval
|
|
&& lhs.memtable_flush_period == rhs.memtable_flush_period
|
|
&& lhs.speculative_retry == rhs.speculative_retry
|
|
&& lhs.compaction_strategy == rhs.compaction_strategy
|
|
&& lhs.compaction_strategy_options == rhs.compaction_strategy_options
|
|
&& lhs.compaction_enabled == rhs.compaction_enabled
|
|
&& lhs.caching_options == rhs.caching_options
|
|
&& lhs.tablet_options == rhs.tablet_options
|
|
&& lhs.get_paxos_grace_seconds() == rhs.get_paxos_grace_seconds()
|
|
&& lhs.get_cdc_options() == rhs.get_cdc_options()
|
|
&& lhs.get_tombstone_gc_options() == rhs.get_tombstone_gc_options();
|
|
}
|
|
|
|
bool operator==(const schema& x, const schema& y)
|
|
{
|
|
return x._raw._id == y._raw._id
|
|
&& x._raw._ks_name == y._raw._ks_name
|
|
&& x._raw._cf_name == y._raw._cf_name
|
|
&& x._raw._columns == y._raw._columns
|
|
&& x._raw._regular_column_name_type == y._raw._regular_column_name_type
|
|
&& x._raw._is_dense == y._raw._is_dense
|
|
&& x._raw._is_compound == y._raw._is_compound
|
|
&& x._raw._type == y._raw._type
|
|
&& x._raw._dropped_columns == y._raw._dropped_columns
|
|
&& x._raw._collections == y._raw._collections
|
|
&& indirect_equal_to<std::unique_ptr<::view_info>>()(x._view_info, y._view_info)
|
|
&& x._raw._indices_by_name == y._raw._indices_by_name
|
|
&& x._raw._is_counter == y._raw._is_counter
|
|
&& x._raw._in_memory == y._raw._in_memory
|
|
&& x._raw._props == y._raw._props
|
|
;
|
|
}
|
|
|
|
|
|
template<>
|
|
struct appending_hash<index_metadata> {
|
|
template<typename H>
|
|
requires Hasher<H>
|
|
void operator()(H& h, const index_metadata& x) const noexcept {
|
|
feed_hash(h, x.id());
|
|
feed_hash(h, x.name());
|
|
feed_hash(h, x.kind());
|
|
feed_hash(h, x.options());
|
|
}
|
|
};
|
|
|
|
|
|
template<>
|
|
struct appending_hash<column_definition> {
|
|
template<typename H>
|
|
requires Hasher<H>
|
|
void operator()(H& h, const column_definition& x) const noexcept {
|
|
feed_hash(h, x.name());
|
|
feed_hash(h, x.type);
|
|
feed_hash(h, x.id);
|
|
feed_hash(h, x.kind);
|
|
feed_hash(h, x.dropped_at());
|
|
}
|
|
};
|
|
|
|
template<>
|
|
struct appending_hash<raw_view_info> {
|
|
template<typename H>
|
|
requires Hasher<H>
|
|
void operator()(H& h, const raw_view_info& x) const noexcept {
|
|
feed_hash(h, x.base_id());
|
|
feed_hash(h, x.base_name());
|
|
feed_hash(h, x.include_all_columns());
|
|
feed_hash(h, x.where_clause());
|
|
}
|
|
};
|
|
|
|
template<>
|
|
struct appending_hash<schema::dropped_column> {
|
|
template<typename H>
|
|
requires Hasher<H>
|
|
void operator()(H& h, const schema::dropped_column& x) const noexcept {
|
|
feed_hash(h, x.type);
|
|
feed_hash(h, x.timestamp);
|
|
}
|
|
};
|
|
|
|
table_schema_version schema::calculate_digest(const schema::raw_schema& r) {
|
|
md5_hasher h;
|
|
feed_hash(h, r._id);
|
|
feed_hash(h, r._ks_name);
|
|
feed_hash(h, r._cf_name);
|
|
feed_hash(h, r._columns);
|
|
feed_hash(h, r._props.comment);
|
|
feed_hash(h, r._props.default_time_to_live.count());
|
|
feed_hash(h, r._regular_column_name_type);
|
|
feed_hash(h, r._props.bloom_filter_fp_chance);
|
|
feed_hash(h, r._props.compressor_params.get_options());
|
|
feed_hash(h, r._is_dense);
|
|
feed_hash(h, r._is_compound);
|
|
feed_hash(h, r._props.gc_grace_seconds);
|
|
feed_hash(h, r._props._paxos_grace_seconds);
|
|
feed_hash(h, r._props.min_compaction_threshold);
|
|
feed_hash(h, r._props.max_compaction_threshold);
|
|
feed_hash(h, r._props.min_index_interval);
|
|
feed_hash(h, r._props.max_index_interval);
|
|
feed_hash(h, r._props.memtable_flush_period);
|
|
feed_hash(h, r._props.speculative_retry.to_sstring());
|
|
feed_hash(h, r._props.compaction_strategy);
|
|
feed_hash(h, r._props.compaction_strategy_options);
|
|
feed_hash(h, r._props.compaction_enabled);
|
|
feed_hash(h, r._props.caching_options.to_map());
|
|
feed_hash(h, r._dropped_columns);
|
|
feed_hash(h, r._collections);
|
|
feed_hash(h, r._view_info);
|
|
feed_hash(h, r._indices_by_name);
|
|
feed_hash(h, r._is_counter);
|
|
|
|
for (auto&& [name, ext] : r._props.extensions) {
|
|
feed_hash(h, name);
|
|
feed_hash(h, ext->options_to_string());
|
|
}
|
|
|
|
feed_hash(h, r._props.tablet_options);
|
|
|
|
return table_schema_version(utils::UUID_gen::get_name_UUID(h.finalize()));
|
|
}
|
|
|
|
index_metadata::index_metadata(const sstring& name,
|
|
const index_options_map& options,
|
|
index_metadata_kind kind,
|
|
is_local_index local)
|
|
: _id{utils::UUID_gen::get_name_UUID(name)}
|
|
, _name{name}
|
|
, _kind{kind}
|
|
, _options{options}
|
|
, _local{bool(local)}
|
|
{}
|
|
|
|
bool index_metadata::operator==(const index_metadata& other) const {
|
|
// Keep consistent with appending_hash<index_metadata>.
|
|
return _id == other._id
|
|
&& _name == other._name
|
|
&& _kind == other._kind
|
|
&& _options == other._options;
|
|
}
|
|
|
|
bool index_metadata::equals_noname(const index_metadata& other) const {
|
|
return _kind == other._kind && _options == other._options;
|
|
}
|
|
|
|
const table_id& index_metadata::id() const {
|
|
return _id;
|
|
}
|
|
|
|
const sstring& index_metadata::name() const {
|
|
return _name;
|
|
}
|
|
|
|
index_metadata_kind index_metadata::kind() const {
|
|
return _kind;
|
|
}
|
|
|
|
const index_options_map& index_metadata::options() const {
|
|
return _options;
|
|
}
|
|
|
|
bool index_metadata::local() const {
|
|
return _local;
|
|
}
|
|
|
|
sstring index_metadata::get_default_index_name(const sstring& cf_name,
|
|
std::optional<sstring> root) {
|
|
if (root) {
|
|
// As noted in issue #3403, because table names in CQL only use word
|
|
// characters [A-Za-z0-9_], the default index name should drop other
|
|
// characters from the column name ("root").
|
|
sstring name = root.value();
|
|
name.erase(std::remove_if(name.begin(), name.end(), [](char c) {
|
|
return !((c >= 'A' && c <= 'Z') ||
|
|
(c >= 'a' && c <= 'z') ||
|
|
(c >= '0' && c <= '9') ||
|
|
(c == '_')); }), name.end());
|
|
return cf_name + "_" + name + "_idx";
|
|
}
|
|
return cf_name + "_idx";
|
|
}
|
|
|
|
column_definition::column_definition(bytes name, data_type type, column_kind kind, column_id component_index, column_view_virtual is_view_virtual, column_computation_ptr computation, api::timestamp_type dropped_at)
|
|
: _name(std::move(name))
|
|
, _dropped_at(dropped_at)
|
|
, _is_atomic(type->is_atomic())
|
|
, _is_counter(type->is_counter())
|
|
, _is_view_virtual(is_view_virtual)
|
|
, _computation(std::move(computation))
|
|
, type(std::move(type))
|
|
, id(component_index)
|
|
, kind(kind)
|
|
{}
|
|
|
|
auto fmt::formatter<column_definition>::format(const column_definition& cd, fmt::format_context& ctx) const
|
|
-> decltype(ctx.out()) {
|
|
auto out = ctx.out();
|
|
out = fmt::format_to(out, "ColumnDefinition{{name={}, type={}, kind={}",
|
|
cd.name_as_text(), cd.type->name(), to_sstring(cd.kind));
|
|
if (cd.is_view_virtual()) {
|
|
out = fmt::format_to(out, ", view_virtual");
|
|
}
|
|
if (cd.is_computed()) {
|
|
out = fmt::format_to(out, ", computed:{}", cd.get_computation().serialize());
|
|
}
|
|
out = fmt::format_to(out, ", componentIndex={}", cd.has_component_index() ? std::to_string(cd.component_index()) : "null");
|
|
out = fmt::format_to(out, ", droppedAt={}", cd._dropped_at);
|
|
return fmt::format_to(out, "}}");
|
|
}
|
|
|
|
const column_definition*
|
|
schema::get_column_definition(const bytes& name) const {
|
|
auto i = _columns_by_name.find(name);
|
|
if (i == _columns_by_name.end()) {
|
|
return nullptr;
|
|
}
|
|
return i->second;
|
|
}
|
|
|
|
const column_definition&
|
|
schema::column_at(column_kind kind, column_id id) const {
|
|
return column_at(static_cast<ordinal_column_id>(column_offset(kind) + id));
|
|
}
|
|
|
|
const column_definition&
|
|
schema::column_at(ordinal_column_id ordinal_id) const {
|
|
if (size_t(ordinal_id) >= _raw._columns.size()) [[unlikely]] {
|
|
on_internal_error(dblog, format("{}.{}@{}: column id {:d} >= {:d}",
|
|
ks_name(), cf_name(), version(), size_t(ordinal_id), _raw._columns.size()));
|
|
}
|
|
return _raw._columns.at(static_cast<column_count_type>(ordinal_id));
|
|
}
|
|
|
|
auto fmt::formatter<schema>::format(const schema& s, fmt::format_context& ctx) const
|
|
-> decltype(ctx.out()) {
|
|
auto out = ctx.out();
|
|
out = fmt::format_to(out, "org.apache.cassandra.config.CFMetaData@{}[", fmt::ptr(&s));
|
|
out = fmt::format_to(out, "cfId={}", s._raw._id);
|
|
out = fmt::format_to(out, ",ksName=={}", s._raw._ks_name);
|
|
out = fmt::format_to(out, ",cfName={}", s._raw._cf_name);
|
|
out = fmt::format_to(out, ",cfType={}", cf_type_to_sstring(s._raw._type));
|
|
out = fmt::format_to(out, ",comparator={}", cell_comparator::to_sstring(s));
|
|
out = fmt::format_to(out, ",comment={}", s._raw._props.comment);
|
|
out = fmt::format_to(out, ",tombstoneGcOptions={}", s.tombstone_gc_options().to_sstring());
|
|
out = fmt::format_to(out, ",gcGraceSeconds={}", s._raw._props.gc_grace_seconds);
|
|
out = fmt::format_to(out, ",minCompactionThreshold={}", s._raw._props.min_compaction_threshold);
|
|
out = fmt::format_to(out, ",maxCompactionThreshold={}", s._raw._props.max_compaction_threshold);
|
|
out = fmt::format_to(out, ",columnMetadata=[");
|
|
int n = 0;
|
|
for (auto& cdef : s._raw._columns) {
|
|
if (n++ != 0) {
|
|
out = fmt::format_to(out, ", ");
|
|
}
|
|
out = fmt::format_to(out, "{}", cdef);
|
|
}
|
|
out = fmt::format_to(out, "]");
|
|
|
|
out = fmt::format_to(out, ",compactionStrategyClass=class org.apache.cassandra.db.compaction.{}",
|
|
compaction::compaction_strategy::name(s._raw._props.compaction_strategy));
|
|
|
|
out = fmt::format_to(out, ",compactionStrategyOptions={{");
|
|
n = 0;
|
|
for (auto& p : s._raw._props.compaction_strategy_options) {
|
|
out = fmt::format_to(out, "{}={}", p.first, p.second);
|
|
out = fmt::format_to(out, ", ");
|
|
}
|
|
out = fmt::format_to(out, "enabled={}", s._raw._props.compaction_enabled);
|
|
out = fmt::format_to(out, "}}");
|
|
|
|
out = fmt::format_to(out, ",compressionParameters={{");
|
|
n = 0;
|
|
for (auto& p : s._raw._props.compressor_params.get_options() ) {
|
|
if (n++ != 0) {
|
|
out = fmt::format_to(out, ", ");
|
|
}
|
|
out = fmt::format_to(out, "{}={}", p.first, p.second);
|
|
}
|
|
out = fmt::format_to(out, "}}");
|
|
|
|
out = fmt::format_to(out, ",bloomFilterFpChance={}", s._raw._props.bloom_filter_fp_chance);
|
|
out = fmt::format_to(out, ",memtableFlushPeriod={}", s._raw._props.memtable_flush_period);
|
|
out = fmt::format_to(out, ",caching={}", s._raw._props.caching_options.to_sstring());
|
|
out = fmt::format_to(out, ",cdc={}", s.cdc_options().to_sstring());
|
|
out = fmt::format_to(out, ",defaultTimeToLive={}", s._raw._props.default_time_to_live.count());
|
|
out = fmt::format_to(out, ",minIndexInterval={}", s._raw._props.min_index_interval);
|
|
out = fmt::format_to(out, ",maxIndexInterval={}", s._raw._props.max_index_interval);
|
|
out = fmt::format_to(out, ",speculativeRetry={}", s._raw._props.speculative_retry.to_sstring());
|
|
out = fmt::format_to(out, ",tablets={{");
|
|
if (s._raw._props.tablet_options) {
|
|
n = 0;
|
|
for (auto& [k, v] : *s._raw._props.tablet_options) {
|
|
if (n++) {
|
|
out = fmt::format_to(out, ", ");
|
|
}
|
|
out = fmt::format_to(out, "{}={}", k, v);
|
|
}
|
|
}
|
|
out = fmt::format_to(out, "}}");
|
|
out = fmt::format_to(out, ",triggers=[]");
|
|
out = fmt::format_to(out, ",isDense={}", s._raw._is_dense);
|
|
out = fmt::format_to(out, ",in_memory={}", s._raw._in_memory);
|
|
out = fmt::format_to(out, ",version={}", s.version());
|
|
|
|
out = fmt::format_to(out, ",droppedColumns={{");
|
|
n = 0;
|
|
for (auto& dc : s._raw._dropped_columns) {
|
|
if (n++ != 0) {
|
|
out = fmt::format_to(out, ", ");
|
|
}
|
|
out = fmt::format_to(out,"{} : {{ {}, {} }}",
|
|
dc.first, dc.second.type->name(), dc.second.timestamp);
|
|
}
|
|
out = fmt::format_to(out, "}}");
|
|
|
|
out = fmt::format_to(out, ",collections={{");
|
|
n = 0;
|
|
for (auto& c : s._raw._collections) {
|
|
if (n++ != 0) {
|
|
out = fmt::format_to(out, ", ");
|
|
}
|
|
out = fmt::format_to(out,"{} : {}", c.first, c.second->name());
|
|
}
|
|
out = fmt::format_to(out, "}}");
|
|
|
|
out = fmt::format_to(out, ",indices={{");
|
|
n = 0;
|
|
for (auto& c : s._raw._indices_by_name) {
|
|
if (n++ != 0) {
|
|
out = fmt::format_to(out, ", ");
|
|
}
|
|
out = fmt::format_to(out,"{} : {}", c.first, c.second.id());
|
|
}
|
|
out = fmt::format_to(out, "}}");
|
|
|
|
if (s.is_view()) {
|
|
out = fmt::format_to(out, ", viewInfo={}", *s.view_info());
|
|
}
|
|
return fmt::format_to(out, "]");
|
|
}
|
|
|
|
template <typename Stream>
|
|
static Stream& map_as_cql_param(Stream& os, const std::map<sstring, sstring>& map, bool first = true) {
|
|
for (auto i: map) {
|
|
if (first) {
|
|
first = false;
|
|
} else {
|
|
os << ", ";
|
|
}
|
|
os << "'" << i.first << "': '" << i.second << "'";
|
|
}
|
|
|
|
return os;
|
|
}
|
|
|
|
std::ostream& operator<<(std::ostream& os, const schema& s) {
|
|
fmt::print(os, "{}", s);
|
|
return os;
|
|
}
|
|
|
|
// default impl assumes options are in a map.
|
|
// implementations should override if not
|
|
std::string schema_extension::options_to_string() const {
|
|
std::ostringstream ss;
|
|
ss << '{';
|
|
map_as_cql_param(ss, ser::deserialize_from_buffer(serialize(), std::type_identity<default_map_type>(), 0));
|
|
ss << '}';
|
|
return ss.str();
|
|
}
|
|
|
|
static fragmented_ostringstream& column_definition_as_cql_key(fragmented_ostringstream& os, const column_definition & cd) {
|
|
os << cd.name_as_cql_string();
|
|
os << " " << cd.type->cql3_type_name();
|
|
|
|
if (cd.kind == column_kind::static_column) {
|
|
os << " STATIC";
|
|
}
|
|
return os;
|
|
}
|
|
|
|
static void describe_index_columns(fragmented_ostringstream& os, bool is_local, const schema& index_schema, schema_ptr base_schema) {
|
|
auto index_name = secondary_index::index_name_from_table_name(index_schema.cf_name());
|
|
if (!base_schema->all_indices().contains(index_name)) {
|
|
on_internal_error(dblog, format("Couldn't find index {} on table {}", index_name, base_schema->cf_name()));
|
|
}
|
|
|
|
int n = 0;
|
|
os << "(";
|
|
|
|
if (is_local) {
|
|
os << "(";
|
|
for (auto& pk: index_schema.partition_key_columns()) {
|
|
if (n++ != 0) {
|
|
os << ", ";
|
|
}
|
|
os << pk.name_as_cql_string();
|
|
}
|
|
os << "), ";
|
|
}
|
|
|
|
// Global and local indexes may only contain one column.
|
|
// Local indexes support multi column indexes via custom indexes, but at least
|
|
// for now, Scylla supports no custom indexes.
|
|
auto index_metadata = base_schema->all_indices().at(index_name);
|
|
auto target_str = secondary_index::target_parser::get_target_column_name_from_string(
|
|
index_metadata.options().at(cql3::statements::index_target::target_option_name)
|
|
);
|
|
auto base_column_name = cql3_parser::index_target::column_name_from_target_string(target_str);
|
|
auto base_column = base_schema->get_column_definition(to_bytes(base_column_name));
|
|
if (!base_column) {
|
|
on_internal_error(dblog, format("Couldn't find base column {} in table {} for index {}", base_column_name, base_schema->cf_name(), index_name));
|
|
}
|
|
auto bk_type = base_column->type;
|
|
|
|
if (bk_type->is_collection() && !bk_type->is_multi_cell()) {
|
|
// Indexes on frozen collection require full() function but the function is dropped while saving the target.
|
|
os << "full(" << cql3::util::maybe_quote(base_column_name) << ")";
|
|
} else if (bk_type->is_set() && target_str.starts_with("keys(")) {
|
|
// Indexes on set are saved with target keys() but only valid targets when creating the index are values() or just the column name.
|
|
// Since indexes on list are always printed with values() target (it's always added even if the index is created only with column name),
|
|
// always add values() target to index to set
|
|
os << "values(" << cql3::util::maybe_quote(base_column_name) << ")";
|
|
} else {
|
|
// Target string is already quoted if needed
|
|
os << target_str;
|
|
}
|
|
|
|
os << ")";
|
|
}
|
|
|
|
managed_string schema::get_create_statement(const schema_describe_helper& helper, bool with_internals) const {
|
|
fragmented_ostringstream os;
|
|
|
|
os << "CREATE ";
|
|
int n = 0;
|
|
|
|
if (is_view()) {
|
|
if (helper.type == schema_describe_helper::type::index) {
|
|
auto is_local = !helper.is_global_index;
|
|
auto custom_index_class = helper.custom_index_class;
|
|
|
|
if (custom_index_class) {
|
|
os << "CUSTOM ";
|
|
}
|
|
|
|
os << "INDEX " << cql3::util::maybe_quote(secondary_index::index_name_from_table_name(cf_name())) << " ON "
|
|
<< cql3::util::maybe_quote(ks_name()) << "." << cql3::util::maybe_quote(view_info()->base_name());
|
|
|
|
describe_index_columns(os, is_local, *this, helper.base_schema);
|
|
|
|
if (custom_index_class) {
|
|
os << " USING '" << *custom_index_class << "'";
|
|
}
|
|
|
|
os << " WITH ";
|
|
if (with_internals) {
|
|
os << "ID = " << id().to_sstring() << "\n AND ";
|
|
}
|
|
if (is_compact_table()) {
|
|
os << "COMPACT STORAGE\n AND ";
|
|
}
|
|
schema_properties(helper, os);
|
|
os << ";\n";
|
|
|
|
return std::move(os).to_managed_string();
|
|
} else {
|
|
os << "MATERIALIZED VIEW " << cql3::util::maybe_quote(ks_name()) << "." << cql3::util::maybe_quote(cf_name()) << " AS\n";
|
|
if (view_info()->include_all_columns()) {
|
|
os << " SELECT *";
|
|
}
|
|
else {
|
|
os << " SELECT ";
|
|
for (auto& cdef : all_columns()) {
|
|
if (cdef.is_hidden_from_cql()) {
|
|
continue;
|
|
}
|
|
if (n++ != 0) {
|
|
os << ", ";
|
|
}
|
|
os << cdef.name_as_cql_string();
|
|
}
|
|
}
|
|
os << "\n FROM " << cql3::util::maybe_quote(ks_name()) << "." << cql3::util::maybe_quote(view_info()->base_name());
|
|
os << "\n WHERE " << view_info()->where_clause();
|
|
}
|
|
} else {
|
|
os << "TABLE " << cql3::util::maybe_quote(ks_name()) << "." << cql3::util::maybe_quote(cf_name()) << " (";
|
|
// Find the name of the per-row TTL column, if there is one, so we
|
|
// can mark it with "TTL".
|
|
std::optional<std::string> ttl_column = db::find_tag(*this, TTL_TAG_KEY);
|
|
for (auto& cdef : all_columns()) {
|
|
if (with_internals && dropped_columns().contains(cdef.name_as_text())) {
|
|
// If the column has been re-added after a drop, we don't include it right away. Instead, we'll add the
|
|
// dropped one first below, then we'll issue the DROP and then the actual ADD for this column, thus
|
|
// simulating the proper sequence of events.
|
|
continue;
|
|
}
|
|
|
|
os << "\n ";
|
|
column_definition_as_cql_key(os, cdef);
|
|
if (ttl_column && *ttl_column == cdef.name_as_text()) {
|
|
os << " TTL";
|
|
}
|
|
os << ",";
|
|
}
|
|
|
|
if (with_internals) {
|
|
for (auto& cdef: dropped_columns()) {
|
|
os << "\n ";
|
|
os << cql3::util::maybe_quote(cdef.first) << " " << cdef.second.type->cql3_type_name() << ",";
|
|
}
|
|
}
|
|
}
|
|
|
|
os << "\n PRIMARY KEY (";
|
|
if (partition_key_columns().size() > 1) {
|
|
os << "(";
|
|
}
|
|
n = 0;
|
|
for (auto& pk : partition_key_columns()) {
|
|
if (n++ != 0) {
|
|
os << ", ";
|
|
}
|
|
os << pk.name_as_cql_string();
|
|
}
|
|
if (partition_key_columns().size() > 1) {
|
|
os << ")";
|
|
}
|
|
for (auto& pk : clustering_key_columns()) {
|
|
os << ", ";
|
|
os << pk.name_as_cql_string();
|
|
}
|
|
os << ")";
|
|
if (is_view()) {
|
|
os << "\n ";
|
|
} else {
|
|
os << "\n) ";
|
|
}
|
|
os << "WITH ";
|
|
if (with_internals) {
|
|
os << "ID = " << id().to_sstring() << "\nAND ";
|
|
}
|
|
if (!clustering_key_columns().empty()) {
|
|
// Adding clustering key order can be optional, but there's no harm in doing so.
|
|
os << "CLUSTERING ORDER BY (";
|
|
n = 0;
|
|
for (auto& pk : clustering_key_columns()) {
|
|
if (n++ != 0) {
|
|
os << ", ";
|
|
}
|
|
os << pk.name_as_cql_string();
|
|
if (pk.type->is_reversed()) {
|
|
os << " DESC";
|
|
} else {
|
|
os << " ASC";
|
|
}
|
|
}
|
|
os << ")\n AND ";
|
|
}
|
|
if (is_compact_table()) {
|
|
os << "COMPACT STORAGE\n AND ";
|
|
}
|
|
schema_properties(helper, os);
|
|
os << ";\n";
|
|
|
|
if (with_internals) {
|
|
for (auto& cdef : dropped_columns()) {
|
|
os << "\nALTER TABLE " << cql3::util::maybe_quote(ks_name()) << "." << cql3::util::maybe_quote(cf_name())
|
|
<< " DROP " << cql3::util::maybe_quote(cdef.first) << " USING TIMESTAMP " << fmt::to_string(cdef.second.timestamp) << ";";
|
|
|
|
auto column = get_column_definition(to_bytes(cdef.first));
|
|
if (column) {
|
|
os << "\nALTER TABLE " << cql3::util::maybe_quote(ks_name()) << "." << cql3::util::maybe_quote(cf_name())
|
|
<< " ADD ";
|
|
column_definition_as_cql_key(os, *column);
|
|
os << ";";
|
|
}
|
|
}
|
|
}
|
|
|
|
return std::move(os).to_managed_string();
|
|
}
|
|
|
|
cql3::description schema::describe(const schema_describe_helper& helper, cql3::describe_option desc_opt) const {
|
|
const sstring type = std::invoke([&] {
|
|
switch (helper.type) {
|
|
case schema_describe_helper::type::view:
|
|
return "view";
|
|
case schema_describe_helper::type::index:
|
|
return "index";
|
|
case schema_describe_helper::type::table:
|
|
return "table";
|
|
}
|
|
});
|
|
|
|
return cql3::description {
|
|
.keyspace = ks_name(),
|
|
.type = std::move(type),
|
|
.name = cf_name(),
|
|
.create_statement = desc_opt == cql3::describe_option::NO_STMTS
|
|
? std::nullopt
|
|
: std::make_optional(get_create_statement(helper, desc_opt == cql3::describe_option::STMTS_AND_INTERNALS))
|
|
};
|
|
}
|
|
|
|
fragmented_ostringstream& schema::schema_properties(const schema_describe_helper& helper, fragmented_ostringstream& os) const {
|
|
os << "bloom_filter_fp_chance = " << fmt::to_string(bloom_filter_fp_chance());
|
|
os << "\n AND caching = {";
|
|
map_as_cql_param(os, caching_options().to_map());
|
|
os << "}";
|
|
os << "\n AND comment = " << cql3::util::single_quote(comment());
|
|
os << "\n AND compaction = {'class': '" << compaction::compaction_strategy::name(compaction_strategy()) << "'";
|
|
map_as_cql_param(os, compaction_strategy_options(), false) << "}";
|
|
os << "\n AND compression = {";
|
|
map_as_cql_param(os, get_compressor_params().get_options());
|
|
os << "}";
|
|
|
|
os << "\n AND crc_check_chance = " << fmt::to_string(crc_check_chance());
|
|
os << "\n AND default_time_to_live = " << fmt::to_string(default_time_to_live().count());
|
|
os << "\n AND gc_grace_seconds = " << fmt::to_string(gc_grace_seconds().count());
|
|
os << "\n AND max_index_interval = " << fmt::to_string(max_index_interval());
|
|
os << "\n AND memtable_flush_period_in_ms = " << fmt::to_string(memtable_flush_period());
|
|
os << "\n AND min_index_interval = " << fmt::to_string(min_index_interval());
|
|
os << "\n AND speculative_retry = '" << speculative_retry().to_sstring() << "'";
|
|
|
|
if (has_tablet_options()) {
|
|
os << "\n AND tablets = {";
|
|
map_as_cql_param(os, tablet_options().to_map());
|
|
os << "}";
|
|
}
|
|
|
|
for (auto& [type, ext] : extensions()) {
|
|
os << "\n AND " << type << " = " << ext->options_to_string();
|
|
}
|
|
if (helper.type == schema_describe_helper::type::view || helper.type == schema_describe_helper::type::index) {
|
|
auto is_sync_update = db::find_tag(*this, db::SYNCHRONOUS_VIEW_UPDATES_TAG_KEY);
|
|
if (is_sync_update.has_value()) {
|
|
os << "\n AND synchronous_updates = " << *is_sync_update;
|
|
}
|
|
}
|
|
return os;
|
|
}
|
|
|
|
fragmented_ostringstream& schema::describe_alter_with_properties(const schema_describe_helper& helper, fragmented_ostringstream& os) const {
|
|
os << "ALTER ";
|
|
switch (helper.type) {
|
|
case schema_describe_helper::type::view:
|
|
os << "MATERIALIZED VIEW ";
|
|
break;
|
|
case schema_describe_helper::type::index:
|
|
on_internal_error(dblog, "ALTER statement is not supported for index");
|
|
break;
|
|
case schema_describe_helper::type::table:
|
|
os << "TABLE ";
|
|
break;
|
|
}
|
|
os << cql3::util::maybe_quote(ks_name()) << "." << cql3::util::maybe_quote(cf_name()) << " WITH ";
|
|
schema_properties(helper, os);
|
|
os << ";\n";
|
|
|
|
return os;
|
|
}
|
|
|
|
const sstring&
|
|
column_definition::name_as_text() const {
|
|
return column_specification->name->text();
|
|
}
|
|
|
|
const bytes&
|
|
column_definition::name() const {
|
|
return _name;
|
|
}
|
|
|
|
sstring column_definition::name_as_cql_string() const {
|
|
return cql3::util::maybe_quote(name_as_text());
|
|
}
|
|
|
|
bool column_definition::is_on_all_components() const {
|
|
return _thrift_bits.is_on_all_components;
|
|
}
|
|
|
|
bool operator==(const column_definition& x, const column_definition& y)
|
|
{
|
|
return x._name == y._name
|
|
&& x.type == y.type
|
|
&& x.id == y.id
|
|
&& x.kind == y.kind
|
|
&& x._dropped_at == y._dropped_at;
|
|
}
|
|
|
|
// Based on org.apache.cassandra.config.CFMetaData#generateLegacyCfId
|
|
table_id
|
|
generate_legacy_id(const sstring& ks_name, const sstring& cf_name) {
|
|
return table_id(utils::UUID_gen::get_name_UUID(ks_name + cf_name));
|
|
}
|
|
|
|
schema_builder& schema_builder::set_compaction_strategy_options(std::map<sstring, sstring>&& options) {
|
|
_raw._props.compaction_strategy_options = std::move(options);
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::with_partitioner(sstring name) {
|
|
_raw._partitioner = get_partitioner(name);
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::with_sharder(unsigned shard_count, unsigned sharding_ignore_msb_bits) {
|
|
_raw._sharder = get_sharder(shard_count, sharding_ignore_msb_bits);
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::with_sharder(const dht::static_sharder& sharder) {
|
|
_raw._sharder = sharder;
|
|
return *this;
|
|
}
|
|
|
|
|
|
schema_builder::schema_builder(std::string_view ks_name, std::string_view cf_name,
|
|
std::optional<table_id> id, data_type rct)
|
|
: _raw(id ? *id : table_id(utils::UUID_gen::get_time_UUID()))
|
|
{
|
|
// Various schema-creation commands (creating tables, indexes, etc.)
|
|
// usually place limits on which characters are allowed in keyspace or
|
|
// table names. But in case we have a hole in those defences (see issue
|
|
// #3403, for example), let's prevent at least the characters "/" and
|
|
// null from being in the keyspace or table name, because those will
|
|
// surely cause serious problems when materialized to directory names.
|
|
// We throw a logic_error because we expect earlier defences to have
|
|
// avoided this case in the first place.
|
|
if (ks_name.find_first_of('/') != std::string_view::npos ||
|
|
ks_name.find_first_of('\0') != std::string_view::npos) {
|
|
throw std::logic_error(fmt::format("Tried to create a schema with illegal characters in keyspace name: {}", ks_name));
|
|
}
|
|
if (cf_name.find_first_of('/') != std::string_view::npos ||
|
|
cf_name.find_first_of('\0') != std::string_view::npos) {
|
|
throw std::logic_error(fmt::format("Tried to create a schema with illegal characters in table name: {}", cf_name));
|
|
}
|
|
_raw._ks_name = sstring(ks_name);
|
|
_raw._cf_name = sstring(cf_name);
|
|
_raw._regular_column_name_type = rct;
|
|
|
|
for (const auto& c : schema_initializers()) {
|
|
c(*this);
|
|
}
|
|
}
|
|
|
|
schema_builder::schema_builder(const schema_ptr s)
|
|
: schema_builder(s->_raw)
|
|
{
|
|
if (s->is_view()) {
|
|
_base_info = s->view_info()->base_info();
|
|
_view_info = s->view_info()->raw();
|
|
}
|
|
|
|
if (s->cdc_schema()) {
|
|
_cdc_schema = s->cdc_schema();
|
|
}
|
|
_static_props = s->static_props();
|
|
}
|
|
|
|
schema_builder::schema_builder(const schema::raw_schema& raw)
|
|
: _raw(raw)
|
|
{
|
|
static_assert(schema::row_column_ids_are_ordered_by_name::value, "row columns don't need to be ordered by name");
|
|
// Schema builder may add or remove columns and their ids need to be
|
|
// recomputed in build().
|
|
for (auto& def : _raw._columns | std::views::filter([] (auto& def) { return !def.is_primary_key(); })) {
|
|
def.id = 0;
|
|
def.ordinal_id = static_cast<ordinal_column_id>(0);
|
|
}
|
|
}
|
|
|
|
schema_builder::schema_builder(
|
|
std::optional<table_id> id,
|
|
std::string_view ks_name,
|
|
std::string_view cf_name,
|
|
std::vector<schema::column> partition_key,
|
|
std::vector<schema::column> clustering_key,
|
|
std::vector<schema::column> regular_columns,
|
|
std::vector<schema::column> static_columns,
|
|
data_type regular_column_name_type,
|
|
sstring comment)
|
|
: schema_builder(ks_name, cf_name, std::move(id), std::move(regular_column_name_type)) {
|
|
for (auto&& column : partition_key) {
|
|
with_column(std::move(column.name), std::move(column.type), column_kind::partition_key);
|
|
}
|
|
for (auto&& column : clustering_key) {
|
|
with_column(std::move(column.name), std::move(column.type), column_kind::clustering_key);
|
|
}
|
|
for (auto&& column : regular_columns) {
|
|
with_column(std::move(column.name), std::move(column.type));
|
|
}
|
|
for (auto&& column : static_columns) {
|
|
with_column(std::move(column.name), std::move(column.type), column_kind::static_column);
|
|
}
|
|
set_comment(comment);
|
|
}
|
|
|
|
column_definition& schema_builder::find_column(const cql3::column_identifier& c) {
|
|
auto i = std::find_if(_raw._columns.begin(), _raw._columns.end(), [c](auto& p) {
|
|
return p.name() == c.name();
|
|
});
|
|
if (i != _raw._columns.end()) {
|
|
return *i;
|
|
}
|
|
throw std::invalid_argument(format("No such column {}", c.name()));
|
|
}
|
|
|
|
bool schema_builder::has_column(const cql3::column_identifier& c) {
|
|
auto i = std::find_if(_raw._columns.begin(), _raw._columns.end(), [c](auto& p) {
|
|
return p.name() == c.name();
|
|
});
|
|
return i != _raw._columns.end();
|
|
}
|
|
|
|
schema_builder& schema_builder::with_column_ordered(const column_definition& c) {
|
|
return with_column(bytes(c.name()), data_type(c.type), column_kind(c.kind), c.position(), c.view_virtual(), c.get_computation_ptr());
|
|
}
|
|
|
|
schema_builder& schema_builder::with_column(bytes name, data_type type, column_kind kind, column_view_virtual is_view_virtual) {
|
|
// component_index will be determined by schema constructor
|
|
return with_column(name, type, kind, 0, is_view_virtual);
|
|
}
|
|
|
|
schema_builder& schema_builder::with_column(bytes name, data_type type, column_kind kind, column_id component_index, column_view_virtual is_view_virtual, column_computation_ptr computation) {
|
|
_raw._columns.emplace_back(name, type, kind, component_index, is_view_virtual, std::move(computation));
|
|
if (type->is_multi_cell()) {
|
|
with_collection(name, type);
|
|
} else if (type->is_counter()) {
|
|
set_is_counter(true);
|
|
}
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::with_computed_column(bytes name, data_type type, column_kind kind, column_computation_ptr computation) {
|
|
return with_column(name, type, kind, 0, column_view_virtual::no, std::move(computation));
|
|
}
|
|
|
|
schema_builder& schema_builder::remove_column(bytes name, std::optional<api::timestamp_type> timestamp)
|
|
{
|
|
auto it = std::ranges::find_if(_raw._columns, [&] (auto& column) {
|
|
return column.name() == name;
|
|
});
|
|
if(it == _raw._columns.end()) {
|
|
throw std::out_of_range(format("Cannot remove: column {} not found.", name));
|
|
}
|
|
auto name_as_text = it->column_specification ? it->name_as_text() : schema::column_name_type(*it, _raw._regular_column_name_type)->get_string(it->name());
|
|
without_column(name_as_text, it->type, timestamp ? *timestamp : api::new_timestamp());
|
|
_raw._columns.erase(it);
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::without_column(sstring name, api::timestamp_type timestamp) {
|
|
return without_column(std::move(name), bytes_type, timestamp);
|
|
}
|
|
|
|
schema_builder& schema_builder::without_column(sstring name, data_type type, api::timestamp_type timestamp)
|
|
{
|
|
auto ret = _raw._dropped_columns.emplace(name, schema::dropped_column{type, timestamp});
|
|
if (!ret.second && ret.first->second.timestamp < timestamp) {
|
|
ret.first->second.type = type;
|
|
ret.first->second.timestamp = timestamp;
|
|
}
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::rename_column(bytes from, bytes to)
|
|
{
|
|
auto it = std::find_if(_raw._columns.begin(), _raw._columns.end(), [&] (auto& col) {
|
|
return col.name() == from;
|
|
});
|
|
SCYLLA_ASSERT(it != _raw._columns.end());
|
|
auto& def = *it;
|
|
column_definition new_def(to, def.type, def.kind, def.component_index());
|
|
_raw._columns.erase(it);
|
|
return with_column_ordered(new_def);
|
|
}
|
|
|
|
schema_builder& schema_builder::alter_column_type(bytes name, data_type new_type)
|
|
{
|
|
auto it = std::ranges::find_if(_raw._columns, [&name] (auto& c) { return c.name() == name; });
|
|
SCYLLA_ASSERT(it != _raw._columns.end());
|
|
it->type = new_type;
|
|
|
|
if (new_type->is_multi_cell()) {
|
|
auto c_it = _raw._collections.find(name);
|
|
SCYLLA_ASSERT(c_it != _raw._collections.end());
|
|
c_it->second = new_type;
|
|
}
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::mark_column_computed(bytes name, column_computation_ptr computation) {
|
|
auto it = std::ranges::find_if(_raw._columns, [&name] (const column_definition& c) { return c.name() == name; });
|
|
SCYLLA_ASSERT(it != _raw._columns.end());
|
|
it->set_computed(std::move(computation));
|
|
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::with_collection(bytes name, data_type type)
|
|
{
|
|
_raw._collections.emplace(name, type);
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::with(compact_storage cs) {
|
|
_compact_storage = cs;
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::with_version(table_schema_version v) {
|
|
_version = v;
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::with_hash_version() {
|
|
_version = from_hash();
|
|
return *this;
|
|
}
|
|
|
|
static const sstring default_partition_key_name = "key";
|
|
static const sstring default_clustering_name = "column";
|
|
static const sstring default_compact_value_name = "value";
|
|
|
|
schema_builder::default_names::default_names(const schema_builder& builder)
|
|
: default_names(builder._raw)
|
|
{}
|
|
|
|
schema_builder::default_names::default_names(const schema::raw_schema& raw)
|
|
: _raw(raw)
|
|
, _partition_index(0)
|
|
, _clustering_index(1)
|
|
, _compact_index(0)
|
|
{}
|
|
|
|
sstring schema_builder::default_names::unique_name(const sstring& base, size_t& idx, size_t off) const {
|
|
for (;;) {
|
|
auto candidate = idx == 0 ? base : base + std::to_string(idx + off);
|
|
++idx;
|
|
auto i = std::find_if(_raw._columns.begin(), _raw._columns.end(), [b = to_bytes(candidate)](const column_definition& c) {
|
|
return c.name() == b;
|
|
});
|
|
if (i == _raw._columns.end()) {
|
|
return candidate;
|
|
}
|
|
}
|
|
}
|
|
|
|
sstring schema_builder::default_names::partition_key_name() {
|
|
// For compatibility sake, we call the first alias 'key' rather than 'key1'. This
|
|
// is inconsistent with column alias, but it's probably not worth risking breaking compatibility now.
|
|
return unique_name(default_partition_key_name, _partition_index, 1);
|
|
}
|
|
|
|
sstring schema_builder::default_names::clustering_name() {
|
|
return unique_name(default_clustering_name, _clustering_index, 0);
|
|
}
|
|
|
|
sstring schema_builder::default_names::compact_value_name() {
|
|
return unique_name(default_compact_value_name, _compact_index, 0);
|
|
}
|
|
|
|
void schema_builder::prepare_dense_schema(schema::raw_schema& raw) {
|
|
auto is_dense = raw._is_dense;
|
|
auto is_compound = raw._is_compound;
|
|
auto is_compact_table = is_dense || !is_compound;
|
|
|
|
if (is_compact_table) {
|
|
auto count_kind = [&raw](column_kind kind) {
|
|
return std::count_if(raw._columns.begin(), raw._columns.end(), [kind](const column_definition& c) {
|
|
return c.kind == kind;
|
|
});
|
|
};
|
|
|
|
default_names names(raw);
|
|
|
|
if (is_dense) {
|
|
auto regular_cols = count_kind(column_kind::regular_column);
|
|
// In Origin, dense CFs always have at least one regular column
|
|
if (regular_cols == 0) {
|
|
raw._columns.emplace_back(to_bytes(names.compact_value_name()),
|
|
empty_type,
|
|
column_kind::regular_column, 0);
|
|
} else if (regular_cols > 1) {
|
|
throw exceptions::configuration_exception(
|
|
format("Expecting exactly one regular column. Found {:d}",
|
|
regular_cols));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
schema_builder& schema_builder::with_view_info(schema_ptr base, bool include_all_columns, sstring where_clause) {
|
|
_base_schema = std::move(base);
|
|
_raw._view_info = raw_view_info(_base_schema.value()->id(), _base_schema.value()->cf_name(), include_all_columns, std::move(where_clause));
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::with_view_info(table_id base_id, sstring base_name, bool include_all_columns, sstring where_clause, db::view::base_dependent_view_info base) {
|
|
_base_info = std::move(base);
|
|
_raw._view_info = raw_view_info(std::move(base_id), std::move(base_name), include_all_columns, std::move(where_clause));
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::with_cdc_schema(schema_ptr cdc_schema) {
|
|
if (cdc_schema) {
|
|
_cdc_schema = std::move(cdc_schema);
|
|
}
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::with_index(const index_metadata& im) {
|
|
_raw._indices_by_name.emplace(im.name(), im);
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::without_index(const sstring& name) {
|
|
if (_raw._indices_by_name.contains(name)) {
|
|
_raw._indices_by_name.erase(name);
|
|
}
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::without_indexes() {
|
|
_raw._indices_by_name.clear();
|
|
return *this;
|
|
}
|
|
|
|
schema_ptr schema_builder::build() && {
|
|
return build(_raw);
|
|
}
|
|
|
|
schema_ptr schema_builder::build() & {
|
|
schema::raw_schema new_raw = _raw; // Copy so that build() remains idempotent.
|
|
return build(new_raw);
|
|
}
|
|
|
|
schema_ptr schema_builder::build(schema::raw_schema& new_raw) {
|
|
if (_static_props.use_schema_commitlog) {
|
|
if (!_static_props.use_null_sharder) {
|
|
on_internal_error(dblog,
|
|
format("{}.{} uses schema commitlog, but not null sharder, "
|
|
"schema commitlog works only on shard 0", ks_name(), cf_name()));
|
|
}
|
|
if (_static_props.wait_for_sync_to_commitlog) {
|
|
on_internal_error(dblog,
|
|
format("{}.{} uses schema commitlog, wait_for_sync_to_commitlog is redundant",
|
|
ks_name(), cf_name()));
|
|
}
|
|
}
|
|
|
|
if (new_raw._is_counter) {
|
|
new_raw._default_validation_class = counter_type;
|
|
}
|
|
|
|
if (_compact_storage) {
|
|
// Dense means that no part of the comparator stores a CQL column name. This means
|
|
// COMPACT STORAGE with at least one columnAliases (otherwise it's a thrift "static" CF).
|
|
auto clustering_key_size = std::count_if(new_raw._columns.begin(), new_raw._columns.end(), [](auto&& col) {
|
|
return col.kind == column_kind::clustering_key;
|
|
});
|
|
new_raw._is_dense = (*_compact_storage == compact_storage::yes) && (clustering_key_size > 0);
|
|
|
|
if (clustering_key_size == 0) {
|
|
if (*_compact_storage == compact_storage::yes) {
|
|
new_raw._is_compound = false;
|
|
} else {
|
|
new_raw._is_compound = true;
|
|
}
|
|
} else {
|
|
if ((*_compact_storage == compact_storage::yes) && clustering_key_size == 1) {
|
|
new_raw._is_compound = false;
|
|
} else {
|
|
new_raw._is_compound = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
prepare_dense_schema(new_raw);
|
|
|
|
// cache `paxos_grace_seconds` value for fast access through the schema object, which is immutable
|
|
if (auto it = new_raw._props.extensions.find(db::paxos_grace_seconds_extension::NAME); it != new_raw._props.extensions.end()) {
|
|
new_raw._props._paxos_grace_seconds =
|
|
dynamic_pointer_cast<db::paxos_grace_seconds_extension>(it->second)->get_paxos_grace_seconds();
|
|
}
|
|
|
|
// cache the `per_partition_rate_limit` parameters for fast access through the schema object.
|
|
if (auto it = new_raw._props.extensions.find(db::per_partition_rate_limit_extension::NAME); it != new_raw._props.extensions.end()) {
|
|
new_raw._per_partition_rate_limit_options =
|
|
dynamic_pointer_cast<db::per_partition_rate_limit_extension>(it->second)->get_options();
|
|
}
|
|
|
|
if (_static_props.use_null_sharder) {
|
|
new_raw._sharder = get_sharder(1, 0);
|
|
}
|
|
|
|
std::visit(make_visitor(
|
|
[&] (from_time) {
|
|
new_raw._version = table_schema_version(utils::UUID_gen::get_time_UUID());
|
|
},
|
|
[&] (from_hash) {
|
|
new_raw._version = schema::calculate_digest(new_raw);
|
|
},
|
|
[&] (table_schema_version v) {
|
|
new_raw._version = v;
|
|
}
|
|
), _version);
|
|
|
|
if (_base_info) {
|
|
return make_lw_shared<schema>(schema::private_tag{}, new_raw, _static_props, nullptr, _base_info);
|
|
} else if (_base_schema) {
|
|
return make_lw_shared<schema>(schema::private_tag{}, new_raw, _static_props, nullptr, _base_schema);
|
|
}
|
|
return make_lw_shared<schema>(schema::private_tag{}, new_raw, _static_props, _cdc_schema ? *_cdc_schema : nullptr);
|
|
}
|
|
|
|
std::vector<schema_builder::schema_initializer>&
|
|
schema_builder::schema_initializers() {
|
|
static std::vector<schema_initializer> result{};
|
|
return result;
|
|
}
|
|
|
|
int schema_builder::register_schema_initializer(schema_initializer&& initializer) {
|
|
schema_initializers().push_back(std::move(initializer));
|
|
return 0;
|
|
}
|
|
|
|
schema_builder::schema_initializers_checkpoint
|
|
schema_builder::capture_schema_initializers_checkpoint() {
|
|
return schema_initializers_checkpoint{schema_initializers().size()};
|
|
}
|
|
|
|
void schema_builder::restore_schema_initializers_checkpoint(schema_initializers_checkpoint checkpoint) {
|
|
auto& initializers = schema_initializers();
|
|
if (checkpoint.size > initializers.size()) {
|
|
throw std::logic_error("Invalid schema initializer checkpoint");
|
|
}
|
|
initializers.resize(checkpoint.size);
|
|
}
|
|
|
|
void schema_builder::set_properties(schema::user_properties props) {
|
|
_raw._props = std::move(props);
|
|
}
|
|
|
|
const cdc::options& schema::user_properties::get_cdc_options() const {
|
|
static const cdc::options default_cdc_options;
|
|
const auto& schema_extensions = extensions;
|
|
|
|
if (auto it = schema_extensions.find(cdc::cdc_extension::NAME); it != schema_extensions.end()) {
|
|
return dynamic_pointer_cast<cdc::cdc_extension>(it->second)->get_options();
|
|
}
|
|
return default_cdc_options;
|
|
}
|
|
|
|
const ::tombstone_gc_options& schema::user_properties::get_tombstone_gc_options() const {
|
|
static const ::tombstone_gc_options default_tombstone_gc_options;
|
|
const auto& schema_extensions = extensions;
|
|
|
|
if (auto it = schema_extensions.find(tombstone_gc_extension::NAME); it != schema_extensions.end()) {
|
|
return dynamic_pointer_cast<tombstone_gc_extension>(it->second)->get_options();
|
|
}
|
|
return default_tombstone_gc_options;
|
|
}
|
|
|
|
bool schema::has_tablet_options() const noexcept {
|
|
return _raw._props.tablet_options.has_value();
|
|
}
|
|
|
|
db::tablet_options schema::tablet_options() const {
|
|
return db::tablet_options(raw_tablet_options());
|
|
}
|
|
|
|
const db::tablet_options::map_type& schema::raw_tablet_options() const noexcept {
|
|
if (!_raw._props.tablet_options) {
|
|
static db::tablet_options::map_type no_options;
|
|
return no_options;
|
|
}
|
|
return *_raw._props.tablet_options;
|
|
}
|
|
|
|
schema_builder& schema_builder::with_cdc_options(const cdc::options& opts) {
|
|
add_extension(cdc::cdc_extension::NAME, ::make_shared<cdc::cdc_extension>(opts));
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::with_tombstone_gc_options(const tombstone_gc_options& opts) {
|
|
add_extension(tombstone_gc_extension::NAME, ::make_shared<tombstone_gc_extension>(opts));
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::with_per_partition_rate_limit_options(const db::per_partition_rate_limit_options& opts) {
|
|
add_extension(db::per_partition_rate_limit_extension::NAME, ::make_shared<db::per_partition_rate_limit_extension>(opts));
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::set_paxos_grace_seconds(int32_t seconds) {
|
|
add_extension(db::paxos_grace_seconds_extension::NAME, ::make_shared<db::paxos_grace_seconds_extension>(seconds));
|
|
return *this;
|
|
}
|
|
|
|
schema_builder& schema_builder::set_tablet_options(std::map<sstring, sstring>&& hints) {
|
|
_raw._props.tablet_options = std::move(hints);
|
|
return *this;
|
|
}
|
|
|
|
gc_clock::duration schema::user_properties::get_paxos_grace_seconds() const {
|
|
return std::chrono::duration_cast<gc_clock::duration>(
|
|
std::chrono::seconds(
|
|
_paxos_grace_seconds ? *_paxos_grace_seconds : DEFAULT_GC_GRACE_SECONDS
|
|
)
|
|
);
|
|
}
|
|
|
|
schema_ptr schema_builder::build(compact_storage cp) {
|
|
return with(cp).build();
|
|
}
|
|
|
|
// Useful functions to manipulate the schema's comparator field
|
|
namespace cell_comparator {
|
|
|
|
static constexpr auto _composite_str = "org.apache.cassandra.db.marshal.CompositeType";
|
|
static constexpr auto _collection_str = "org.apache.cassandra.db.marshal.ColumnToCollectionType";
|
|
|
|
static sstring compound_name(const schema& s) {
|
|
sstring compound(_composite_str);
|
|
|
|
compound += "(";
|
|
if (s.clustering_key_size()) {
|
|
for (auto &t : s.clustering_key_columns()) {
|
|
compound += t.type->name() + ",";
|
|
}
|
|
}
|
|
|
|
if (!s.is_dense()) {
|
|
compound += s.regular_column_name_type()->name() + ",";
|
|
}
|
|
|
|
if (!s.collections().empty()) {
|
|
compound += _collection_str;
|
|
compound += "(";
|
|
for (auto& c : s.collections()) {
|
|
compound += format("{}:{},", to_hex(c.first), c.second->name());
|
|
}
|
|
compound.back() = ')';
|
|
compound += ",";
|
|
}
|
|
// last one will be a ',', just replace it.
|
|
compound.back() = ')';
|
|
return compound;
|
|
}
|
|
|
|
sstring to_sstring(const schema& s) {
|
|
if (s.is_compound()) {
|
|
return compound_name(s);
|
|
} else if (s.clustering_key_size() == 1) {
|
|
SCYLLA_ASSERT(s.is_dense() || s.is_static_compact_table());
|
|
return s.clustering_key_columns().front().type->name();
|
|
} else {
|
|
return s.regular_column_name_type()->name();
|
|
}
|
|
}
|
|
|
|
bool check_compound(sstring comparator) {
|
|
static sstring compound(_composite_str);
|
|
return comparator.compare(0, compound.size(), compound) == 0;
|
|
}
|
|
|
|
void read_collections(schema_builder& builder, sstring comparator)
|
|
{
|
|
// The format of collection entries in the comparator is:
|
|
// org.apache.cassandra.db.marshal.ColumnToCollectionType(<name1>:<type1>, ...)
|
|
|
|
auto find_closing_parenthesis = [] (std::string_view str, size_t start) {
|
|
auto pos = start;
|
|
auto nest_level = 0;
|
|
do {
|
|
pos = str.find_first_of("()", pos);
|
|
if (pos == sstring::npos) {
|
|
throw marshal_exception("read_collections - can't find any parentheses");
|
|
}
|
|
if (str[pos] == ')') {
|
|
nest_level--;
|
|
} else if (str[pos] == '(') {
|
|
nest_level++;
|
|
}
|
|
pos++;
|
|
} while (nest_level > 0);
|
|
return pos - 1;
|
|
};
|
|
|
|
auto collection_str_length = strlen(_collection_str);
|
|
|
|
auto pos = comparator.find(_collection_str);
|
|
if (pos == sstring::npos) {
|
|
return;
|
|
}
|
|
pos += collection_str_length + 1;
|
|
while (pos < comparator.size()) {
|
|
size_t end = comparator.find('(', pos);
|
|
if (end == sstring::npos) {
|
|
throw marshal_exception("read_collections - open parenthesis not found");
|
|
}
|
|
end = find_closing_parenthesis(comparator, end) + 1;
|
|
|
|
auto colon = comparator.find(':', pos);
|
|
if (colon == sstring::npos || colon > end) {
|
|
throw marshal_exception("read_collections - colon not found");
|
|
}
|
|
|
|
auto name = from_hex(std::string_view(comparator.c_str() + pos, colon - pos));
|
|
|
|
colon++;
|
|
auto type_str = std::string_view(comparator.c_str() + colon, end - colon);
|
|
auto type = db::marshal::type_parser::parse(type_str);
|
|
|
|
builder.with_collection(name, type);
|
|
|
|
if (end < comparator.size() && comparator[end] == ',') {
|
|
pos = end + 1;
|
|
} else if (end < comparator.size() && comparator[end] == ')') {
|
|
pos = sstring::npos;
|
|
} else {
|
|
throw marshal_exception("read_collections - invalid collection format");
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
schema::const_iterator
|
|
schema::regular_begin() const {
|
|
return regular_columns().begin();
|
|
}
|
|
|
|
schema::const_iterator
|
|
schema::regular_end() const {
|
|
return regular_columns().end();
|
|
}
|
|
|
|
schema::const_iterator
|
|
schema::regular_lower_bound(const bytes& name) const {
|
|
return std::ranges::lower_bound(regular_columns(), name, std::ranges::less(), std::mem_fn(&column_definition::name));
|
|
}
|
|
|
|
schema::const_iterator
|
|
schema::regular_upper_bound(const bytes& name) const {
|
|
return std::ranges::upper_bound(regular_columns(), name, std::ranges::less(), std::mem_fn(&column_definition::name));
|
|
}
|
|
|
|
schema::const_iterator
|
|
schema::static_begin() const {
|
|
return static_columns().begin();
|
|
}
|
|
|
|
schema::const_iterator
|
|
schema::static_end() const {
|
|
return static_columns().end();
|
|
}
|
|
|
|
schema::const_iterator
|
|
schema::static_lower_bound(const bytes& name) const {
|
|
return std::ranges::lower_bound(static_columns(), name, std::ranges::less(), std::mem_fn(&column_definition::name));
|
|
}
|
|
|
|
schema::const_iterator
|
|
schema::static_upper_bound(const bytes& name) const {
|
|
return std::ranges::upper_bound(static_columns(), name, std::ranges::less(), std::mem_fn(&column_definition::name));
|
|
}
|
|
data_type
|
|
schema::column_name_type(const column_definition& def, const data_type& regular_column_name_type) {
|
|
if (def.kind == column_kind::regular_column) {
|
|
return regular_column_name_type;
|
|
}
|
|
return utf8_type;
|
|
}
|
|
|
|
data_type
|
|
schema::column_name_type(const column_definition& def) const {
|
|
return column_name_type(def, _raw._regular_column_name_type);
|
|
}
|
|
|
|
const column_definition&
|
|
schema::regular_column_at(column_id id) const {
|
|
if (id >= regular_columns_count()) {
|
|
on_internal_error(dblog, format("{}.{}@{}: regular column id {:d} >= {:d}",
|
|
ks_name(), cf_name(), version(), id, regular_columns_count()));
|
|
}
|
|
return _raw._columns.at(column_offset(column_kind::regular_column) + id);
|
|
}
|
|
|
|
const column_definition&
|
|
schema::clustering_column_at(column_id id) const {
|
|
if (id >= clustering_key_size()) {
|
|
on_internal_error(dblog, format("{}.{}@{}: clustering column id {:d} >= {:d}",
|
|
ks_name(), cf_name(), version(), id, clustering_key_size()));
|
|
}
|
|
return _raw._columns.at(column_offset(column_kind::clustering_key) + id);
|
|
}
|
|
|
|
const column_definition&
|
|
schema::static_column_at(column_id id) const {
|
|
if (id >= static_columns_count()) {
|
|
on_internal_error(dblog, format("{}.{}@{}: static column id {:d} >= {:d}",
|
|
ks_name(), cf_name(), version(), id, static_columns_count()));
|
|
}
|
|
return _raw._columns.at(column_offset(column_kind::static_column) + id);
|
|
}
|
|
|
|
bool
|
|
schema::is_last_partition_key(const column_definition& def) const {
|
|
return &_raw._columns.at(partition_key_size() - 1) == &def;
|
|
}
|
|
|
|
bool
|
|
schema::has_static_columns() const {
|
|
return !static_columns().empty();
|
|
}
|
|
|
|
column_count_type
|
|
schema::columns_count(column_kind kind) const {
|
|
switch (kind) {
|
|
case column_kind::partition_key:
|
|
return partition_key_size();
|
|
case column_kind::clustering_key:
|
|
return clustering_key_size();
|
|
case column_kind::static_column:
|
|
return static_columns_count();
|
|
case column_kind::regular_column:
|
|
return regular_columns_count();
|
|
default:
|
|
std::abort();
|
|
}
|
|
}
|
|
column_count_type
|
|
schema::partition_key_size() const {
|
|
return column_offset(column_kind::clustering_key);
|
|
}
|
|
|
|
schema::const_iterator_range_type
|
|
schema::partition_key_columns() const {
|
|
return std::ranges::subrange(_raw._columns.begin() + column_offset(column_kind::partition_key)
|
|
, _raw._columns.begin() + column_offset(column_kind::clustering_key));
|
|
}
|
|
|
|
schema::const_iterator_range_type
|
|
schema::clustering_key_columns() const {
|
|
return std::ranges::subrange(_raw._columns.begin() + column_offset(column_kind::clustering_key)
|
|
, _raw._columns.begin() + column_offset(column_kind::static_column));
|
|
}
|
|
|
|
schema::const_iterator_range_type
|
|
schema::static_columns() const {
|
|
return std::ranges::subrange(_raw._columns.begin() + column_offset(column_kind::static_column)
|
|
, _raw._columns.begin() + column_offset(column_kind::regular_column));
|
|
}
|
|
|
|
schema::const_iterator_range_type
|
|
schema::regular_columns() const {
|
|
return std::ranges::subrange(_raw._columns.begin() + column_offset(column_kind::regular_column)
|
|
, _raw._columns.end());
|
|
}
|
|
|
|
schema::const_iterator_range_type
|
|
schema::primary_key_columns() const {
|
|
return std::ranges::subrange(_raw._columns.begin(), _raw._columns.begin() + column_offset(column_kind::static_column));
|
|
}
|
|
|
|
schema::const_iterator_range_type
|
|
schema::static_and_regular_columns() const {
|
|
return std::ranges::subrange(_raw._columns.begin() + column_offset(column_kind::static_column), _raw._columns.end());
|
|
}
|
|
|
|
schema::const_iterator_range_type
|
|
schema::columns(column_kind kind) const {
|
|
switch (kind) {
|
|
case column_kind::partition_key:
|
|
return partition_key_columns();
|
|
case column_kind::clustering_key:
|
|
return clustering_key_columns();
|
|
case column_kind::static_column:
|
|
return static_columns();
|
|
case column_kind::regular_column:
|
|
return regular_columns();
|
|
}
|
|
throw std::invalid_argument(std::to_string(int(kind)));
|
|
}
|
|
|
|
void schema::compute_all_columns_in_select_order() {
|
|
auto is_static_compact_table = this->is_static_compact_table();
|
|
auto no_non_pk_columns = is_compact_table()
|
|
// Origin: && CompactTables.hasEmptyCompactValue(this);
|
|
&& regular_columns_count() == 1
|
|
&& [](const column_definition& c) {
|
|
// We use empty_type now to match origin, but earlier incarnations
|
|
// set name empty instead. check either.
|
|
return c.type == empty_type || c.name().empty();
|
|
}(regular_column_at(0));
|
|
auto pk_range = const_iterator_range_type(_raw._columns.begin(),
|
|
_raw._columns.begin() + (is_static_compact_table ?
|
|
column_offset(column_kind::clustering_key) :
|
|
column_offset(column_kind::static_column)));
|
|
auto ck_v_range = no_non_pk_columns ? static_columns()
|
|
: const_iterator_range_type(static_columns().begin(), all_columns().end());
|
|
auto to_addr = std::views::transform([] (const column_definition& x) { return &x; });
|
|
std::ranges::copy(pk_range | to_addr, std::back_inserter(_all_columns_in_select_order));
|
|
std::ranges::copy(ck_v_range | to_addr, std::back_inserter(_all_columns_in_select_order));
|
|
}
|
|
|
|
uint32_t
|
|
schema::position(const column_definition& column) const {
|
|
if (column.is_primary_key()) {
|
|
return column.id;
|
|
}
|
|
return clustering_key_size();
|
|
}
|
|
|
|
std::optional<index_metadata> schema::find_index_noname(const index_metadata& target) const {
|
|
const auto& it = std::ranges::find_if(_raw._indices_by_name, [&] (auto&& e) {
|
|
return e.second.equals_noname(target);
|
|
});
|
|
if (it != _raw._indices_by_name.end()) {
|
|
return it->second;
|
|
}
|
|
return {};
|
|
}
|
|
|
|
std::vector<index_metadata> schema::indices() const {
|
|
return _raw._indices_by_name | std::views::values | std::ranges::to<std::vector>();
|
|
}
|
|
|
|
const std::unordered_map<sstring, index_metadata>& schema::all_indices() const {
|
|
return _raw._indices_by_name;
|
|
}
|
|
|
|
bool schema::has_index(const sstring& index_name) const {
|
|
return _raw._indices_by_name.contains(index_name);
|
|
}
|
|
|
|
std::vector<sstring> schema::index_names() const {
|
|
return _raw._indices_by_name | std::views::keys | std::ranges::to<std::vector>();
|
|
}
|
|
|
|
data_type schema::make_legacy_default_validator() const {
|
|
return _raw._default_validation_class;
|
|
}
|
|
|
|
bool schema::is_synced() const {
|
|
return _registry_entry && _registry_entry->is_synced();
|
|
}
|
|
|
|
bool schema::equal_columns(const schema& other) const {
|
|
return std::ranges::equal(all_columns(), other.all_columns());
|
|
}
|
|
|
|
schema_ptr schema::make_reversed() const {
|
|
return make_lw_shared<schema>(schema::reversed_tag{}, *this);
|
|
}
|
|
|
|
schema_ptr schema::get_reversed() const {
|
|
return local_schema_registry().get_or_load(reversed(_raw._version), [this] (table_schema_version) -> extended_frozen_schema {
|
|
auto s = make_reversed();
|
|
return extended_frozen_schema(s);
|
|
});
|
|
}
|
|
|
|
schema_ptr schema::make_with_cdc(schema_ptr cdc_schema) const {
|
|
return make_lw_shared<schema>(schema::with_cdc_schema_tag{}, *this, cdc_schema);
|
|
}
|
|
|
|
raw_view_info::raw_view_info(table_id base_id, sstring base_name, bool include_all_columns, sstring where_clause)
|
|
: _base_id(std::move(base_id))
|
|
, _base_name(std::move(base_name))
|
|
, _include_all_columns(include_all_columns)
|
|
, _where_clause(where_clause)
|
|
{ }
|
|
|
|
column_computation_ptr column_computation::deserialize(bytes_view raw) {
|
|
rjson::value parsed = rjson::parse(std::string_view(reinterpret_cast<const char*>(raw.begin()), reinterpret_cast<const char*>(raw.end())));
|
|
if (!parsed.IsObject()) {
|
|
throw std::runtime_error(format("Invalid column computation value: {}", parsed));
|
|
}
|
|
const rjson::value* type_json = rjson::find(parsed, "type");
|
|
if (!type_json || !type_json->IsString()) {
|
|
throw std::runtime_error(format("Type {} is not convertible to string", *type_json));
|
|
}
|
|
const std::string_view type = rjson::to_string_view(*type_json);
|
|
if (type == "token") {
|
|
return std::make_unique<legacy_token_column_computation>();
|
|
}
|
|
if (type == "token_v2") {
|
|
return std::make_unique<token_column_computation>();
|
|
}
|
|
if (type.starts_with("collection_")) {
|
|
const rjson::value* collection_name = rjson::find(parsed, "collection_name");
|
|
|
|
if (collection_name && collection_name->IsString()) {
|
|
auto collection = rjson::to_string_view(*collection_name);
|
|
auto collection_as_bytes = bytes(collection.begin(), collection.end());
|
|
if (auto collection = collection_column_computation::for_target_type(type, collection_as_bytes)) {
|
|
return collection->clone();
|
|
}
|
|
}
|
|
}
|
|
if (type == alternator::extract_from_attrs_column_computation::TYPE_NAME) {
|
|
return std::make_unique<alternator::extract_from_attrs_column_computation>(parsed);
|
|
}
|
|
throw std::runtime_error(format("Incorrect column computation type {} found when parsing {}", *type_json, parsed));
|
|
}
|
|
|
|
bytes legacy_token_column_computation::serialize() const {
|
|
rjson::value serialized = rjson::empty_object();
|
|
rjson::add(serialized, "type", rjson::from_string("token"));
|
|
return to_bytes(rjson::print(serialized));
|
|
}
|
|
|
|
bytes legacy_token_column_computation::compute_value(const schema& schema, const partition_key& key) const {
|
|
return {dht::get_token(schema, key).data()};
|
|
}
|
|
|
|
bytes token_column_computation::serialize() const {
|
|
rjson::value serialized = rjson::empty_object();
|
|
rjson::add(serialized, "type", rjson::from_string("token_v2"));
|
|
return to_bytes(rjson::print(serialized));
|
|
}
|
|
|
|
bytes token_column_computation::compute_value(const schema& schema, const partition_key& key) const {
|
|
auto long_value = dht::token::to_int64(dht::get_token(schema, key));
|
|
return long_type->decompose(long_value);
|
|
}
|
|
|
|
bytes collection_column_computation::serialize() const {
|
|
rjson::value serialized = rjson::empty_object();
|
|
const char* type = nullptr;
|
|
switch (_kind) {
|
|
case kind::keys:
|
|
type = "collection_keys";
|
|
break;
|
|
case kind::values:
|
|
type = "collection_values";
|
|
break;
|
|
case kind::entries:
|
|
type = "collection_entries";
|
|
break;
|
|
}
|
|
rjson::add(serialized, "type", rjson::from_string(type));
|
|
rjson::add(serialized, "collection_name", rjson::from_string(to_string_view(_collection_name)));
|
|
return to_bytes(rjson::print(serialized));
|
|
}
|
|
|
|
column_computation_ptr collection_column_computation::for_target_type(std::string_view type, const bytes& collection_name) {
|
|
if (type == "collection_keys") {
|
|
return collection_column_computation::for_keys(collection_name).clone();
|
|
}
|
|
if (type == "collection_values") {
|
|
return collection_column_computation::for_values(collection_name).clone();
|
|
}
|
|
if (type == "collection_entries") {
|
|
return collection_column_computation::for_entries(collection_name).clone();
|
|
}
|
|
return {};
|
|
}
|
|
|
|
void collection_column_computation::operate_on_collection_entries(
|
|
std::invocable<collection_kv*, collection_kv*, tombstone> auto&& old_and_new_row_func, const schema& schema,
|
|
const partition_key& key, const db::view::clustering_or_static_row& update, const std::optional<db::view::clustering_or_static_row>& existing) const {
|
|
|
|
const column_definition* cdef = schema.get_column_definition(_collection_name);
|
|
|
|
decltype(collection_mutation_view_description::cells) update_cells, existing_cells;
|
|
|
|
const auto* update_cell = update.cells().find_cell(cdef->id);
|
|
tombstone update_tombstone = update.tomb().tomb();
|
|
if (update_cell) {
|
|
collection_mutation_view update_col_view = update_cell->as_collection_mutation();
|
|
update_col_view.with_deserialized(*(cdef->type), [&update_cells, &update_tombstone] (collection_mutation_view_description descr) {
|
|
update_tombstone.apply(descr.tomb);
|
|
update_cells = descr.cells;
|
|
});
|
|
}
|
|
if (existing) {
|
|
const auto* existing_cell = existing->cells().find_cell(cdef->id);
|
|
if (existing_cell) {
|
|
collection_mutation_view existing_col_view = existing_cell->as_collection_mutation();
|
|
existing_col_view.with_deserialized(*(cdef->type), [&existing_cells] (collection_mutation_view_description descr) {
|
|
existing_cells = descr.cells;
|
|
});
|
|
}
|
|
}
|
|
|
|
auto compare = [](const collection_kv& p1, const collection_kv& p2) {
|
|
return p1.first <=> p2.first;
|
|
};
|
|
|
|
// Both collections are assumed to be sorted by the keys.
|
|
auto existing_it = existing_cells.begin();
|
|
auto update_it = update_cells.begin();
|
|
|
|
auto is_existing_end = [&] {
|
|
return existing_it == existing_cells.end();
|
|
};
|
|
auto is_update_end = [&] {
|
|
return update_it == update_cells.end();
|
|
};
|
|
while (!(is_existing_end() && is_update_end())) {
|
|
std::strong_ordering cmp = [&] {
|
|
if (is_existing_end()) {
|
|
return std::strong_ordering::greater;
|
|
} else if (is_update_end()) {
|
|
return std::strong_ordering::less;
|
|
}
|
|
return compare(*existing_it, *update_it);
|
|
}();
|
|
|
|
auto existing_ptr = [&] () -> collection_kv* {
|
|
return (!is_existing_end() && cmp <= 0) ? &*existing_it : nullptr;
|
|
};
|
|
auto update_ptr = [&] () -> collection_kv* {
|
|
return (!is_update_end() && cmp >= 0) ? &*update_it : nullptr;
|
|
};
|
|
|
|
old_and_new_row_func(existing_ptr(), update_ptr(), update_tombstone);
|
|
if (cmp <= 0) {
|
|
++existing_it;
|
|
}
|
|
if (cmp >= 0) {
|
|
++update_it;
|
|
}
|
|
}
|
|
}
|
|
|
|
bytes collection_column_computation::compute_value(const schema&, const partition_key&) const {
|
|
throw std::runtime_error(fmt::format("{}: not supported", __PRETTY_FUNCTION__));
|
|
}
|
|
|
|
std::vector<db::view::view_key_and_action> collection_column_computation::compute_values_with_action(const schema& schema, const partition_key& key,
|
|
const db::view::clustering_or_static_row& update, const std::optional<db::view::clustering_or_static_row>& existing) const {
|
|
using collection_kv = std::pair<bytes_view, atomic_cell_view>;
|
|
auto serialize_cell = [_kind = _kind](const collection_kv& kv) -> bytes {
|
|
using kind = collection_column_computation::kind;
|
|
auto& [key, value] = kv;
|
|
switch (_kind) {
|
|
case kind::keys:
|
|
return bytes(key);
|
|
case kind::values:
|
|
return value.value().linearize();
|
|
case kind::entries:
|
|
bytes_opt elements[] = {bytes(key), value.value().linearize()};
|
|
return tuple_type_impl::build_value(elements);
|
|
}
|
|
std::abort(); // compiler will error
|
|
};
|
|
|
|
std::vector<db::view::view_key_and_action> ret;
|
|
|
|
auto compute_row_marker = [] (auto&& cell) -> row_marker {
|
|
return cell.is_live_and_has_ttl() ? row_marker(cell.timestamp(), cell.ttl(), cell.expiry()) : row_marker(cell.timestamp());
|
|
};
|
|
|
|
auto fn = [&ret, &compute_row_marker, &serialize_cell] (collection_kv* existing, collection_kv* update, tombstone tomb) {
|
|
api::timestamp_type operation_ts = tomb.timestamp;
|
|
if (existing && update && compare_atomic_cell_for_merge(existing->second, update->second) == 0) {
|
|
return;
|
|
}
|
|
if (update) {
|
|
operation_ts = update->second.timestamp();
|
|
if (update->second.is_live()) {
|
|
row_marker rm = compute_row_marker(update->second);
|
|
ret.push_back({serialize_cell(*update), {rm}});
|
|
}
|
|
}
|
|
operation_ts -= 1;
|
|
if (existing && existing->second.is_live()) {
|
|
db::view::view_key_and_action::shadowable_tombstone_tag tag{operation_ts};
|
|
ret.push_back({serialize_cell(*existing), {tag}});
|
|
}
|
|
};
|
|
operate_on_collection_entries(fn, schema, key, update, existing);
|
|
return ret;
|
|
}
|
|
|
|
bool operator==(const raw_view_info& x, const raw_view_info& y) {
|
|
// Keep consistent with appending_hash<raw_view_info>
|
|
return x._base_id == y._base_id
|
|
&& x._base_name == y._base_name
|
|
&& x._include_all_columns == y._include_all_columns
|
|
&& x._where_clause == y._where_clause;
|
|
}
|
|
|
|
auto fmt::formatter<raw_view_info>::format(const raw_view_info& view, fmt::format_context& ctx) const
|
|
-> decltype(ctx.out()) {
|
|
return fmt::format_to(ctx.out(),
|
|
"ViewInfo{{baseTableId={}, baseTableName={}, includeAllColumns={}, whereClause={}}}",
|
|
view._base_id, view._base_name, view._include_all_columns, view._where_clause);
|
|
}
|
|
|
|
auto fmt::formatter<view_ptr>::format(const view_ptr& view, fmt::format_context& ctx) const
|
|
-> decltype(ctx.out()) {
|
|
if (view) {
|
|
return fmt::format_to(ctx.out(), "{}", *view);
|
|
} else {
|
|
return fmt::format_to(ctx.out(), "null");
|
|
}
|
|
}
|
|
|
|
namespace std {
|
|
|
|
std::ostream& operator<<(std::ostream& os, const table_info& ti) {
|
|
fmt::print(os, "{}", ti);
|
|
return os;
|
|
}
|
|
|
|
} // namespace std
|
|
|
|
auto fmt::formatter<table_info>::format(const table_info& ti,
|
|
fmt::format_context& ctx) const -> decltype(ctx.out()) {
|
|
return fmt::format_to(ctx.out(), "table{{name={}, id={}}}", ti.name, ti.id);
|
|
}
|
|
|
|
schema_mismatch_error::schema_mismatch_error(table_schema_version expected, const schema& access)
|
|
: std::runtime_error(fmt::format("Attempted to deserialize schema-dependent object of version {} using {}.{} {}",
|
|
expected, access.ks_name(), access.cf_name(), access.version()))
|
|
{ }
|