schema_tables: handle 'cdc' options

cdc options will be stored in scylla_tables to preserve
compatibility with Cassandra.

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
This commit is contained in:
Piotr Jastrzebski
2019-06-03 16:11:30 +02:00
parent 8df942a320
commit 386221da84
5 changed files with 36 additions and 14 deletions

View File

@@ -60,6 +60,7 @@
#include "mutation_query.hh"
#include "system_keyspace.hh"
#include "cql3/cql3_type.hh"
#include "cdc/cdc.hh"
#include "db/marshal/type_parser.hh"
#include "db/config.hh"
@@ -300,6 +301,7 @@ schema_ptr scylla_tables() {
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
.with_column("table_name", utf8_type, column_kind::clustering_key)
.with_column("version", uuid_type)
.with_column("cdc", map_type_impl::get_instance(utf8_type, utf8_type, false))
.set_gc_grace_seconds(schema_gc_grace)
.with_version(generate_schema_version(id))
.build();
@@ -1458,18 +1460,6 @@ lw_shared_ptr<keyspace_metadata> create_keyspace_from_schema_partition(const sch
return make_lw_shared<keyspace_metadata>(keyspace_name, strategy_name, strategy_options, durable_writes);
}
template<typename K, typename V>
static std::optional<std::map<K, V>> get_map(const query::result_set_row& row, const sstring& name) {
if (auto values = row.get<map_type_impl::native_type>(name)) {
std::map<K, V> map;
for (auto&& entry : *values) {
map.emplace(value_cast<K>(entry.first), value_cast<V>(entry.second));
};
return map;
}
return std::nullopt;
}
template<typename V>
static std::vector<V> get_list(const query::result_set_row& row, const sstring& name) {
std::vector<V> list;
@@ -1685,6 +1675,7 @@ mutation make_scylla_tables_mutation(schema_ptr table, api::timestamp_type times
auto ckey = clustering_key::from_singular(*s, table->cf_name());
mutation m(scylla_tables(), pkey);
m.set_clustered_cell(ckey, "version", utils::UUID(table->version()), timestamp);
store_map(m, ckey, "cdc", timestamp, table->cdc_options().to_map());
return m;
}
@@ -2207,6 +2198,11 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
} else {
builder.with_version(sm.digest());
}
if (auto map = sm.cdc_options()) {
cdc::options cdc_options(*map);
builder.set_cdc_options(std::move(cdc_options));
}
return builder.build();
}

View File

@@ -46,6 +46,7 @@
#include "schema_features.hh"
#include "hashing.hh"
#include "schema_mutations.hh"
#include "types/map.hh"
#include <vector>
#include <map>
@@ -221,5 +222,17 @@ mutation compact_for_schema_digest(const mutation& m);
void feed_hash_for_schema_digest(hasher&, const mutation&, schema_features);
template<typename K, typename V>
std::optional<std::map<K, V>> get_map(const query::result_set_row& row, const sstring& name) {
if (auto values = row.get<map_type_impl::native_type>(name)) {
std::map<K, V> map;
for (auto&& entry : *values) {
map.emplace(value_cast<K>(entry.first), value_cast<V>(entry.second));
};
return map;
}
return std::nullopt;
}
} // namespace schema_tables
} // namespace db

View File

@@ -105,6 +105,16 @@ table_schema_version schema_mutations::digest() const {
return utils::UUID_gen::get_name_UUID(h.finalize());
}
std::optional<std::map<sstring, sstring>> schema_mutations::cdc_options() const {
if (_scylla_tables) {
auto rs = query::result_set(*_scylla_tables);
if (!rs.empty()) {
return db::schema_tables::get_map<sstring, sstring>(rs.row(0), "cdc");
}
}
return { };
}
static mutation_opt compact(const mutation_opt& m) {
if (!m) {
return m;

View File

@@ -138,6 +138,7 @@ public:
bool is_view() const;
table_schema_version digest() const;
std::optional<std::map<sstring, sstring>> cdc_options() const;
bool operator==(const schema_mutations&) const;
bool operator!=(const schema_mutations&) const;

View File

@@ -371,8 +371,10 @@ SEASTAR_TEST_CASE(test_merging_does_not_alter_tables_which_didnt_change) {
muts2.push_back(db::schema_tables::make_scylla_tables_mutation(s0, api::new_timestamp()));
mm.announce(muts2).get();
BOOST_REQUIRE(s1 == find_table().schema());
BOOST_REQUIRE_EQUAL(legacy_version, find_table().schema()->version());
// SCYLLA_TABLES have additional columns so announcing its mutation
// changes the tables
BOOST_REQUIRE(s1 != find_table().schema());
BOOST_REQUIRE(legacy_version != find_table().schema()->version());
});
});
}