schema_tables: handle 'cdc' options
cdc options will be stored in scylla_tables to preserve compatibility with Cassandra. Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
This commit is contained in:
@@ -60,6 +60,7 @@
|
||||
#include "mutation_query.hh"
|
||||
#include "system_keyspace.hh"
|
||||
#include "cql3/cql3_type.hh"
|
||||
#include "cdc/cdc.hh"
|
||||
|
||||
#include "db/marshal/type_parser.hh"
|
||||
#include "db/config.hh"
|
||||
@@ -300,6 +301,7 @@ schema_ptr scylla_tables() {
|
||||
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
|
||||
.with_column("table_name", utf8_type, column_kind::clustering_key)
|
||||
.with_column("version", uuid_type)
|
||||
.with_column("cdc", map_type_impl::get_instance(utf8_type, utf8_type, false))
|
||||
.set_gc_grace_seconds(schema_gc_grace)
|
||||
.with_version(generate_schema_version(id))
|
||||
.build();
|
||||
@@ -1458,18 +1460,6 @@ lw_shared_ptr<keyspace_metadata> create_keyspace_from_schema_partition(const sch
|
||||
return make_lw_shared<keyspace_metadata>(keyspace_name, strategy_name, strategy_options, durable_writes);
|
||||
}
|
||||
|
||||
template<typename K, typename V>
|
||||
static std::optional<std::map<K, V>> get_map(const query::result_set_row& row, const sstring& name) {
|
||||
if (auto values = row.get<map_type_impl::native_type>(name)) {
|
||||
std::map<K, V> map;
|
||||
for (auto&& entry : *values) {
|
||||
map.emplace(value_cast<K>(entry.first), value_cast<V>(entry.second));
|
||||
};
|
||||
return map;
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
template<typename V>
|
||||
static std::vector<V> get_list(const query::result_set_row& row, const sstring& name) {
|
||||
std::vector<V> list;
|
||||
@@ -1685,6 +1675,7 @@ mutation make_scylla_tables_mutation(schema_ptr table, api::timestamp_type times
|
||||
auto ckey = clustering_key::from_singular(*s, table->cf_name());
|
||||
mutation m(scylla_tables(), pkey);
|
||||
m.set_clustered_cell(ckey, "version", utils::UUID(table->version()), timestamp);
|
||||
store_map(m, ckey, "cdc", timestamp, table->cdc_options().to_map());
|
||||
return m;
|
||||
}
|
||||
|
||||
@@ -2207,6 +2198,11 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
|
||||
} else {
|
||||
builder.with_version(sm.digest());
|
||||
}
|
||||
if (auto map = sm.cdc_options()) {
|
||||
cdc::options cdc_options(*map);
|
||||
builder.set_cdc_options(std::move(cdc_options));
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
||||
@@ -46,6 +46,7 @@
|
||||
#include "schema_features.hh"
|
||||
#include "hashing.hh"
|
||||
#include "schema_mutations.hh"
|
||||
#include "types/map.hh"
|
||||
|
||||
#include <vector>
|
||||
#include <map>
|
||||
@@ -221,5 +222,17 @@ mutation compact_for_schema_digest(const mutation& m);
|
||||
|
||||
void feed_hash_for_schema_digest(hasher&, const mutation&, schema_features);
|
||||
|
||||
template<typename K, typename V>
|
||||
std::optional<std::map<K, V>> get_map(const query::result_set_row& row, const sstring& name) {
|
||||
if (auto values = row.get<map_type_impl::native_type>(name)) {
|
||||
std::map<K, V> map;
|
||||
for (auto&& entry : *values) {
|
||||
map.emplace(value_cast<K>(entry.first), value_cast<V>(entry.second));
|
||||
};
|
||||
return map;
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
} // namespace schema_tables
|
||||
} // namespace db
|
||||
|
||||
@@ -105,6 +105,16 @@ table_schema_version schema_mutations::digest() const {
|
||||
return utils::UUID_gen::get_name_UUID(h.finalize());
|
||||
}
|
||||
|
||||
std::optional<std::map<sstring, sstring>> schema_mutations::cdc_options() const {
|
||||
if (_scylla_tables) {
|
||||
auto rs = query::result_set(*_scylla_tables);
|
||||
if (!rs.empty()) {
|
||||
return db::schema_tables::get_map<sstring, sstring>(rs.row(0), "cdc");
|
||||
}
|
||||
}
|
||||
return { };
|
||||
}
|
||||
|
||||
static mutation_opt compact(const mutation_opt& m) {
|
||||
if (!m) {
|
||||
return m;
|
||||
|
||||
@@ -138,6 +138,7 @@ public:
|
||||
bool is_view() const;
|
||||
|
||||
table_schema_version digest() const;
|
||||
std::optional<std::map<sstring, sstring>> cdc_options() const;
|
||||
|
||||
bool operator==(const schema_mutations&) const;
|
||||
bool operator!=(const schema_mutations&) const;
|
||||
|
||||
@@ -371,8 +371,10 @@ SEASTAR_TEST_CASE(test_merging_does_not_alter_tables_which_didnt_change) {
|
||||
muts2.push_back(db::schema_tables::make_scylla_tables_mutation(s0, api::new_timestamp()));
|
||||
mm.announce(muts2).get();
|
||||
|
||||
BOOST_REQUIRE(s1 == find_table().schema());
|
||||
BOOST_REQUIRE_EQUAL(legacy_version, find_table().schema()->version());
|
||||
// SCYLLA_TABLES have additional columns so announcing its mutation
|
||||
// changes the tables
|
||||
BOOST_REQUIRE(s1 != find_table().schema());
|
||||
BOOST_REQUIRE(legacy_version != find_table().schema()->version());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user