mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Merge 'Add tests for schema changes' from Paweł
This series adds a generic test for schema changes that generates
various schema and data before and after an ALTER TABLE operation. It is
then used to check correctness of mutation::upgrade() and sstable
readers and led to the discovery of #3924 and #3925.
Fixes #3925.
* https://github.com/pdziepak/scylla.git schema-change-test/v3.1
schema_builder: make member function names less confusing
converting_mutation_partition_applier: fix collection type changes
converting_mutation_partition_applier: do not emit empty collections
sstable: use format() instead of sprint()
tests/random-utils: make functions and variables inline
tests: add models for schemas and data
tests: generate schema changes
tests/mutation: add test for schema changes
tests/sstable: add test for schema changes
(cherry picked from commit 564b328b2e)
This commit is contained in:
committed by
Paweł Dziepak
parent
28cca751d1
commit
f124b7026f
@@ -38,44 +38,44 @@ private:
|
||||
static bool is_compatible(const column_definition& new_def, const data_type& old_type, column_kind kind) {
|
||||
return ::is_compatible(new_def.kind, kind) && new_def.type->is_value_compatible_with(*old_type);
|
||||
}
|
||||
// Re-creates `cell`, which was written under `old_type`, as a cell of
// `new_type`.
//
// Live cells of non-counter types are rebuilt from their linearized value,
// keeping expiry and TTL when the cell has them. Dead cells and cells of
// counter types are handed to the atomic_cell(new_type, cell) constructor.
//
// `cm` is forwarded to atomic_cell::make_live() and tells whether the cell is
// a member of a collection.
static atomic_cell upgrade_cell(const abstract_type& new_type, const abstract_type& old_type, atomic_cell_view cell,
                                atomic_cell::collection_member cm = atomic_cell::collection_member::no) {
    if (!cell.is_live() || old_type.is_counter()) {
        return atomic_cell(new_type, cell);
    }
    if (!cell.is_live_and_has_ttl()) {
        return atomic_cell::make_live(new_type, cell.timestamp(), cell.value().linearize(), cm);
    }
    return atomic_cell::make_live(new_type, cell.timestamp(), cell.value().linearize(), cell.expiry(), cell.ttl(), cm);
}
|
||||
// Applies an atomic `cell`, written for a column of kind `kind` and type
// `old_type`, to `dst` under the new column definition `new_def`.
//
// The cell is dropped when the old and new columns are not compatible, or
// when its timestamp is not newer than new_def.dropped_at().
static void accept_cell(row& dst, column_kind kind, const column_definition& new_def, const data_type& old_type, atomic_cell_view cell) {
    if (!is_compatible(new_def, old_type, kind) || cell.timestamp() <= new_def.dropped_at()) {
        return;
    }
    // Convert and apply exactly once; upgrade_cell() handles the
    // live / expiring / dead / counter cases.
    dst.apply(new_def, upgrade_cell(*new_def.type, *old_type, cell));
}
|
||||
// Applies a collection `cell`, written for a column of kind `kind` and type
// `old_type`, to `dst` under the new column definition `new_def`.
//
// The collection is deserialized with the old collection type, its tombstone
// and member cells are filtered against new_def.dropped_at(), every surviving
// member is converted with upgrade_cell() to the new value type, and the
// result is serialized with the new collection type.
static void accept_cell(row& dst, column_kind kind, const column_definition& new_def, const data_type& old_type, collection_mutation_view cell) {
    if (!is_compatible(new_def, old_type, kind)) {
        return;
    }
    cell.data.with_linearized([&] (bytes_view cell_bv) {
        auto new_ctype = static_pointer_cast<const collection_type_impl>(new_def.type);
        auto old_ctype = static_pointer_cast<const collection_type_impl>(old_type);
        auto old_view = old_ctype->deserialize_mutation_form(cell_bv);

        collection_type_impl::mutation new_view;
        // Keep only the parts written after the column was (last) dropped.
        if (old_view.tomb.timestamp > new_def.dropped_at()) {
            new_view.tomb = old_view.tomb;
        }
        for (auto& c : old_view.cells) {
            if (c.second.timestamp() > new_def.dropped_at()) {
                new_view.cells.emplace_back(c.first, upgrade_cell(*new_ctype->value_comparator(), *old_ctype->value_comparator(), c.second, atomic_cell::collection_member::yes));
            }
        }
        // Do not emit a collection that ended up with neither a tombstone
        // nor any cells.
        if (new_view.tomb || !new_view.cells.empty()) {
            dst.apply(new_def, new_ctype->serialize_mutation_form(std::move(new_view)));
        }
    });
}
|
||||
public:
|
||||
|
||||
@@ -276,7 +276,7 @@ future<shared_ptr<cql_transport::event::schema_change>> alter_table_statement::a
|
||||
|
||||
auto type = validate_alter(schema, *def, *validator);
|
||||
// In any case, we update the column definition
|
||||
cfm.with_altered_column_type(column_name->name(), type);
|
||||
cfm.alter_column_type(column_name->name(), type);
|
||||
|
||||
// We also have to validate the view types here. If we have a view which includes a column as part of
|
||||
// the clustering key, we need to make sure that it is indeed compatible.
|
||||
@@ -285,7 +285,7 @@ future<shared_ptr<cql_transport::event::schema_change>> alter_table_statement::a
|
||||
if (view_def) {
|
||||
schema_builder builder(view);
|
||||
auto view_type = validate_alter(view, *view_def, *validator);
|
||||
builder.with_altered_column_type(column_name->name(), std::move(view_type));
|
||||
builder.alter_column_type(column_name->name(), std::move(view_type));
|
||||
view_updates.push_back(view_ptr(builder.build()));
|
||||
}
|
||||
}
|
||||
@@ -306,7 +306,7 @@ future<shared_ptr<cql_transport::event::schema_change>> alter_table_statement::a
|
||||
} else {
|
||||
for (auto&& column_def : boost::range::join(schema->static_columns(), schema->regular_columns())) { // find
|
||||
if (column_def.name() == column_name->name()) {
|
||||
cfm.without_column(column_name->name());
|
||||
cfm.remove_column(column_name->name());
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -349,7 +349,7 @@ future<shared_ptr<cql_transport::event::schema_change>> alter_table_statement::a
|
||||
auto to = entry.second->prepare_column_identifier(schema);
|
||||
|
||||
validate_column_rename(db, *schema, *from, *to);
|
||||
cfm.with_column_rename(from->name(), to->name());
|
||||
cfm.rename_column(from->name(), to->name());
|
||||
|
||||
// If the view includes a renamed column, it must be renamed in
|
||||
// the view table and the definition.
|
||||
@@ -360,7 +360,7 @@ future<shared_ptr<cql_transport::event::schema_change>> alter_table_statement::a
|
||||
auto view_from = entry.first->prepare_column_identifier(view);
|
||||
auto view_to = entry.second->prepare_column_identifier(view);
|
||||
validate_column_rename(db, *view, *view_from, *view_to);
|
||||
builder.with_column_rename(view_from->name(), view_to->name());
|
||||
builder.rename_column(view_from->name(), view_to->name());
|
||||
|
||||
auto new_where = util::rename_column_in_where_clause(
|
||||
view->view_info()->where_clause(),
|
||||
|
||||
@@ -110,7 +110,7 @@ void alter_type_statement::do_announce_migration(database& db, ::keyspace& ks, b
|
||||
if (t_opt) {
|
||||
modified = true;
|
||||
// We need to update this column
|
||||
cfm.with_altered_column_type(column.name(), *t_opt);
|
||||
cfm.alter_column_type(column.name(), *t_opt);
|
||||
}
|
||||
}
|
||||
if (modified) {
|
||||
|
||||
@@ -713,7 +713,7 @@ schema_builder& schema_builder::with_column(bytes name, data_type type, column_k
|
||||
return *this;
|
||||
}
|
||||
|
||||
schema_builder& schema_builder::without_column(bytes name)
|
||||
schema_builder& schema_builder::remove_column(bytes name)
|
||||
{
|
||||
auto it = boost::range::find_if(_raw._columns, [&] (auto& column) {
|
||||
return column.name() == name;
|
||||
@@ -738,7 +738,7 @@ schema_builder& schema_builder::without_column(sstring name, data_type type, api
|
||||
return *this;
|
||||
}
|
||||
|
||||
schema_builder& schema_builder::with_column_rename(bytes from, bytes to)
|
||||
schema_builder& schema_builder::rename_column(bytes from, bytes to)
|
||||
{
|
||||
auto it = std::find_if(_raw._columns.begin(), _raw._columns.end(), [&] (auto& col) {
|
||||
return col.name() == from;
|
||||
@@ -750,7 +750,7 @@ schema_builder& schema_builder::with_column_rename(bytes from, bytes to)
|
||||
return with_column(new_def);
|
||||
}
|
||||
|
||||
schema_builder& schema_builder::with_altered_column_type(bytes name, data_type new_type)
|
||||
schema_builder& schema_builder::alter_column_type(bytes name, data_type new_type)
|
||||
{
|
||||
auto it = boost::find_if(_raw._columns, [&name] (auto& c) { return c.name() == name; });
|
||||
assert(it != _raw._columns.end());
|
||||
|
||||
@@ -240,11 +240,11 @@ public:
|
||||
schema_builder& with_column(const column_definition& c);
|
||||
schema_builder& with_column(bytes name, data_type type, column_kind kind = column_kind::regular_column, column_view_virtual view_virtual = column_view_virtual::no);
|
||||
schema_builder& with_column(bytes name, data_type type, column_kind kind, column_id component_index, column_view_virtual view_virtual = column_view_virtual::no);
|
||||
schema_builder& without_column(bytes name);
|
||||
schema_builder& remove_column(bytes name);
|
||||
schema_builder& without_column(sstring name, api::timestamp_type timestamp);
|
||||
schema_builder& without_column(sstring name, data_type, api::timestamp_type timestamp);
|
||||
schema_builder& with_column_rename(bytes from, bytes to);
|
||||
schema_builder& with_altered_column_type(bytes name, data_type new_type);
|
||||
schema_builder& rename_column(bytes from, bytes to);
|
||||
schema_builder& alter_column_type(bytes name, data_type new_type);
|
||||
|
||||
// Adds information about collection that existed in the past but the column
|
||||
// has since been removed. For adding collections that are still alive
|
||||
|
||||
@@ -96,7 +96,7 @@ private:
|
||||
std::optional<column_id> id;
|
||||
if (def) {
|
||||
if (def->is_multi_cell() != type->is_multi_cell() || def->is_counter() != type->is_counter()) {
|
||||
throw malformed_sstable_exception(sprint(
|
||||
throw malformed_sstable_exception(format(
|
||||
"{} definition in serialization header does not match schema. "
|
||||
"Schema collection = {}, counter = {}. Header collection = {}, counter = {}",
|
||||
def->name(),
|
||||
|
||||
@@ -30,6 +30,9 @@
|
||||
#include "flat_mutation_reader_assertions.hh"
|
||||
#include "mutation_query.hh"
|
||||
#include "mutation_rebuilder.hh"
|
||||
#include "random-utils.hh"
|
||||
#include "cql3/cql3_type.hh"
|
||||
#include <boost/algorithm/string/join.hpp>
|
||||
|
||||
// partitions must be sorted by decorated key
|
||||
static void require_no_token_duplicates(const std::vector<mutation>& partitions) {
|
||||
@@ -1709,3 +1712,596 @@ clustering_key random_mutation_generator::make_random_key() {
|
||||
std::vector<query::clustering_range> random_mutation_generator::make_random_ranges(unsigned n_ranges) {
|
||||
return _impl->make_random_ranges(n_ranges);
|
||||
}
|
||||
|
||||
namespace tests::data_model {
|
||||
|
||||
// Timestamps used by the generated schemas and data. Their ordering matters:
// data (200) is newer than columns that were dropped "in the past" (100) and
// older than columns dropped by a schema change under test (300).

// Drop time of columns registered as already removed; also used for the
// range tombstones emitted by mutation_description::build().
static constexpr api::timestamp_type previously_removed_column_timestamp = 100;
// Write time of all generated live cells and row markers.
static constexpr api::timestamp_type data_timestamp = 200;
// Drop time recorded when a column is removed from a table_description.
static constexpr api::timestamp_type column_removal_timestamp = 300;
|
||||
|
||||
class mutation_description {
|
||||
public:
|
||||
using key = std::vector<bytes>;
|
||||
struct collection_element {
|
||||
bytes key;
|
||||
bytes value;
|
||||
};
|
||||
using collection = std::vector<collection_element>;
|
||||
using atomic_value = bytes;
|
||||
using value = std::variant<atomic_value, collection>;
|
||||
struct cell {
|
||||
sstring column_name;
|
||||
value data_value;
|
||||
};
|
||||
using row = std::vector<cell>;
|
||||
struct clustered_row {
|
||||
api::timestamp_type marker;
|
||||
row cells;
|
||||
};
|
||||
struct range_tombstone {
|
||||
key first;
|
||||
key last;
|
||||
};
|
||||
|
||||
private:
|
||||
key _partition_key;
|
||||
row _static_row;
|
||||
std::map<key, clustered_row> _clustered_rows;
|
||||
std::vector<range_tombstone> _range_tombstones;
|
||||
|
||||
private:
|
||||
static void remove_column(row& r, const sstring& name) {
|
||||
auto it = boost::range::find_if(r, [&] (const cell& c) {
|
||||
return c.column_name == name;
|
||||
});
|
||||
if (it != r.end()) {
|
||||
r.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
explicit mutation_description(key partition_key)
|
||||
: _partition_key(std::move(partition_key))
|
||||
{ }
|
||||
|
||||
void add_static_cell(const sstring& column, value v) {
|
||||
_static_row.emplace_back(cell { column, std::move(v) });
|
||||
}
|
||||
|
||||
void add_clustered_cell(const key& ck, const sstring& column, value v) {
|
||||
_clustered_rows[ck].cells.emplace_back(cell { column, std::move(v) });
|
||||
}
|
||||
|
||||
void add_clustered_row_marker(const key& ck) {
|
||||
_clustered_rows[ck].marker = data_timestamp;
|
||||
}
|
||||
|
||||
void remove_static_column(const sstring& name) {
|
||||
remove_column(_static_row, name);
|
||||
}
|
||||
|
||||
void remove_regular_column(const sstring& name) {
|
||||
for (auto& [ ckey, cr ] : _clustered_rows) {
|
||||
(void)ckey;
|
||||
remove_column(cr.cells, name);
|
||||
}
|
||||
}
|
||||
|
||||
void add_range_tombstone(const key& start, const key& end) {
|
||||
_range_tombstones.emplace_back(range_tombstone { start, end });
|
||||
}
|
||||
|
||||
mutation build(schema_ptr s) const {
|
||||
auto m = mutation(s, partition_key::from_exploded(*s, _partition_key));
|
||||
for (auto& [ column, value_or_collection ] : _static_row) {
|
||||
auto cdef = s->get_column_definition(utf8_type->decompose(column));
|
||||
assert(cdef);
|
||||
std::visit(make_visitor(
|
||||
[&] (const atomic_value& v) {
|
||||
assert(cdef->is_atomic());
|
||||
m.set_static_cell(*cdef, atomic_cell::make_live(*cdef->type, data_timestamp, v));
|
||||
},
|
||||
[&] (const collection& c) {
|
||||
assert(!cdef->is_atomic());
|
||||
auto ctype = static_pointer_cast<const collection_type_impl>(cdef->type);
|
||||
collection_type_impl::mutation mut;
|
||||
for (auto& [ key, value ] : c) {
|
||||
mut.cells.emplace_back(key, atomic_cell::make_live(*ctype->value_comparator(), data_timestamp,
|
||||
value, atomic_cell::collection_member::yes));
|
||||
}
|
||||
m.set_static_cell(*cdef, ctype->serialize_mutation_form(std::move(mut)));
|
||||
}
|
||||
), value_or_collection);
|
||||
}
|
||||
for (auto& [ ckey, cr ] : _clustered_rows) {
|
||||
auto& [ marker, cells ] = cr;
|
||||
auto ck = clustering_key::from_exploded(*s, ckey);
|
||||
for (auto& [ column, value_or_collection ] : cells) {
|
||||
auto cdef = s->get_column_definition(utf8_type->decompose(column));
|
||||
assert(cdef);
|
||||
std::visit(make_visitor(
|
||||
[&] (const atomic_value& v) {
|
||||
assert(cdef->is_atomic());
|
||||
m.set_clustered_cell(ck, *cdef, atomic_cell::make_live(*cdef->type, data_timestamp, v));
|
||||
},
|
||||
[&] (const collection& c) {
|
||||
assert(!cdef->is_atomic());
|
||||
auto ctype = static_pointer_cast<const collection_type_impl>(cdef->type);
|
||||
collection_type_impl::mutation mut;
|
||||
for (auto& [ key, value ] : c) {
|
||||
mut.cells.emplace_back(key, atomic_cell::make_live(*ctype->value_comparator(), data_timestamp,
|
||||
value, atomic_cell::collection_member::yes));
|
||||
}
|
||||
m.set_clustered_cell(ck, *cdef, ctype->serialize_mutation_form(std::move(mut)));
|
||||
}
|
||||
), value_or_collection);
|
||||
}
|
||||
if (marker != api::missing_timestamp) {
|
||||
m.partition().clustered_row(*s, ckey).apply(row_marker(marker));
|
||||
}
|
||||
}
|
||||
clustering_key::less_compare cmp(*s);
|
||||
for (auto& [ a, b ] : _range_tombstones) {
|
||||
auto start = clustering_key::from_exploded(*s, a);
|
||||
auto stop = clustering_key::from_exploded(*s, b);
|
||||
if (cmp(stop, start)) {
|
||||
std::swap(start, stop);
|
||||
}
|
||||
auto rt = ::range_tombstone(std::move(start), bound_kind::excl_start,
|
||||
std::move(stop), bound_kind::excl_end,
|
||||
tombstone(previously_removed_column_timestamp, gc_clock::time_point()));
|
||||
m.partition().apply_delete(*s, std::move(rt));
|
||||
}
|
||||
return m;
|
||||
}
|
||||
};
|
||||
|
||||
class table_description {
|
||||
public:
|
||||
using column = std::tuple<sstring, data_type>;
|
||||
struct removed_column {
|
||||
sstring name;
|
||||
data_type type;
|
||||
api::timestamp_type removal_timestamp;
|
||||
};
|
||||
|
||||
private:
|
||||
std::vector<column> _partition_key;
|
||||
std::vector<column> _clustering_key;
|
||||
std::vector<column> _static_columns;
|
||||
std::vector<column> _regular_columns;
|
||||
|
||||
std::vector<removed_column> _removed_columns;
|
||||
|
||||
std::vector<mutation_description> _mutations;
|
||||
|
||||
std::vector<sstring> _change_log;
|
||||
|
||||
private:
|
||||
static std::vector<column>::iterator find_column(std::vector<column>& columns, const sstring& name) {
|
||||
return boost::range::find_if(columns, [&] (const column& c) {
|
||||
return std::get<sstring>(c) == name;
|
||||
});
|
||||
}
|
||||
|
||||
static void add_column(std::vector<column>& columns, const sstring& name, data_type type) {
|
||||
assert(find_column(columns, name) == columns.end());
|
||||
columns.emplace_back(name, type);
|
||||
}
|
||||
|
||||
void add_old_column(const sstring& name, data_type type) {
|
||||
_removed_columns.emplace_back(removed_column { name, type, previously_removed_column_timestamp });
|
||||
}
|
||||
|
||||
void remove_column(std::vector<column>& columns, const sstring& name) {
|
||||
auto it = find_column(columns, name);
|
||||
assert(it != columns.end());
|
||||
_removed_columns.emplace_back(removed_column { name, std::get<data_type>(*it), column_removal_timestamp });
|
||||
columns.erase(it);
|
||||
}
|
||||
|
||||
static void alter_column_type(std::vector<column>& columns, const sstring& name, data_type new_type) {
|
||||
auto it = find_column(columns, name);
|
||||
assert(it != columns.end());
|
||||
std::get<data_type>(*it) = new_type;
|
||||
}
|
||||
|
||||
schema_ptr build_schema() const {
|
||||
auto sb = schema_builder("ks", "cf");
|
||||
for (auto&& [ name, type ] : _partition_key) {
|
||||
sb.with_column(utf8_type->decompose(name), type, column_kind::partition_key);
|
||||
}
|
||||
for (auto&& [ name, type ] : _clustering_key) {
|
||||
sb.with_column(utf8_type->decompose(name), type, column_kind::clustering_key);
|
||||
}
|
||||
for (auto&& [ name, type ] : _static_columns) {
|
||||
sb.with_column(utf8_type->decompose(name), type, column_kind::static_column);
|
||||
}
|
||||
for (auto&& [ name, type ] : _regular_columns) {
|
||||
sb.with_column(utf8_type->decompose(name), type);
|
||||
}
|
||||
|
||||
for (auto&& [ name, type, timestamp ] : _removed_columns) {
|
||||
sb.without_column(name, type, timestamp);
|
||||
}
|
||||
|
||||
return sb.build();
|
||||
}
|
||||
|
||||
std::vector<mutation> build_mutations(schema_ptr s) const {
|
||||
auto ms = boost::copy_range<std::vector<mutation>>(
|
||||
_mutations | boost::adaptors::transformed([&] (const mutation_description& md) {
|
||||
return md.build(s);
|
||||
})
|
||||
);
|
||||
boost::sort(ms, mutation_decorated_key_less_comparator());
|
||||
return ms;
|
||||
}
|
||||
|
||||
public:
|
||||
explicit table_description(std::vector<column> partition_key, std::vector<column> clustering_key)
|
||||
: _partition_key(std::move(partition_key))
|
||||
, _clustering_key(std::move(clustering_key))
|
||||
{ }
|
||||
|
||||
void add_static_column(const sstring& name, data_type type) {
|
||||
_change_log.emplace_back(format("added static column \'{}\' of type \'{}\'", name, type->as_cql3_type()->to_string()));
|
||||
add_column(_static_columns, name, type);
|
||||
}
|
||||
|
||||
void add_regular_column(const sstring& name, data_type type) {
|
||||
_change_log.emplace_back(format("added regular column \'{}\' of type \'{}\'", name, type->as_cql3_type()->to_string()));
|
||||
add_column(_regular_columns, name, type);
|
||||
}
|
||||
|
||||
void add_old_static_column(const sstring& name, data_type type) {
|
||||
add_old_column(name, type);
|
||||
}
|
||||
|
||||
void add_old_regular_column(const sstring& name, data_type type) {
|
||||
add_old_column(name, type);
|
||||
}
|
||||
|
||||
void remove_static_column(const sstring& name) {
|
||||
_change_log.emplace_back(format("removed static column \'{}\'", name));
|
||||
remove_column(_static_columns, name);
|
||||
for (auto& m : _mutations) {
|
||||
m.remove_static_column(name);
|
||||
}
|
||||
}
|
||||
|
||||
void remove_regular_column(const sstring& name) {
|
||||
_change_log.emplace_back(format("removed regular column \'{}\'", name));
|
||||
remove_column(_regular_columns, name);
|
||||
for (auto& m : _mutations) {
|
||||
m.remove_regular_column(name);
|
||||
}
|
||||
}
|
||||
|
||||
void alter_partition_column_type(const sstring& name, data_type new_type) {
|
||||
_change_log.emplace_back(format("altered partition column \'{}\' type to \'{}\'", name, new_type->as_cql3_type()->to_string()));
|
||||
alter_column_type(_partition_key, name, new_type);
|
||||
}
|
||||
|
||||
void alter_clustering_column_type(const sstring& name, data_type new_type) {
|
||||
_change_log.emplace_back(format("altered clustering column \'{}\' type to \'{}\'", name, new_type->as_cql3_type()->to_string()));
|
||||
alter_column_type(_clustering_key, name, new_type);
|
||||
}
|
||||
|
||||
void alter_static_column_type(const sstring& name, data_type new_type) {
|
||||
_change_log.emplace_back(format("altered static column \'{}\' type to \'{}\'", name, new_type->as_cql3_type()->to_string()));
|
||||
alter_column_type(_static_columns, name, new_type);
|
||||
}
|
||||
|
||||
void alter_regular_column_type(const sstring& name, data_type new_type) {
|
||||
_change_log.emplace_back(format("altered regular column \'{}\' type to \'{}\'", name, new_type->as_cql3_type()->to_string()));
|
||||
alter_column_type(_regular_columns, name, new_type);
|
||||
}
|
||||
|
||||
void rename_partition_column(const sstring& from, const sstring& to) {
|
||||
_change_log.emplace_back(format("renamed partition column \'{}\' to \'{}\'", from, to));
|
||||
auto it = find_column(_partition_key, from);
|
||||
assert(it != _partition_key.end());
|
||||
std::get<sstring>(*it) = to;
|
||||
}
|
||||
void rename_clustering_column(const sstring& from, const sstring& to) {
|
||||
_change_log.emplace_back(format("renamed clustering column \'{}\' to \'{}\'", from, to));
|
||||
auto it = find_column(_clustering_key, from);
|
||||
assert(it != _clustering_key.end());
|
||||
std::get<sstring>(*it) = to;
|
||||
}
|
||||
|
||||
std::vector<mutation_description>& unordered_mutations() { return _mutations; }
|
||||
const std::vector<mutation_description>& unordered_mutations() const { return _mutations; }
|
||||
|
||||
struct table {
|
||||
sstring schema_changes_log;
|
||||
schema_ptr schema;
|
||||
std::vector<mutation> mutations;
|
||||
};
|
||||
table build() const {
|
||||
auto s = build_schema();
|
||||
return { boost::algorithm::join(_change_log, "\n"), s, build_mutations(s) };
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
void for_each_schema_change(std::function<void(schema_ptr, const std::vector<mutation>&,
|
||||
schema_ptr, const std::vector<mutation>&)> fn) {
|
||||
auto map_of_int_to_int = map_type_impl::get_instance(int32_type, int32_type, true);
|
||||
auto map_of_int_to_bytes = map_type_impl::get_instance(int32_type, bytes_type, true);
|
||||
auto frozen_map_of_int_to_int = map_type_impl::get_instance(int32_type, int32_type, false);
|
||||
auto frozen_map_of_int_to_bytes = map_type_impl::get_instance(int32_type, bytes_type, false);
|
||||
auto tuple_of_int_long = tuple_type_impl::get_instance({ int32_type, long_type });
|
||||
auto tuple_of_bytes_long = tuple_type_impl::get_instance( { bytes_type, long_type });
|
||||
auto tuple_of_bytes_bytes = tuple_type_impl::get_instance( { bytes_type, bytes_type });
|
||||
auto set_of_text = set_type_impl::get_instance(utf8_type, true);
|
||||
auto set_of_bytes = set_type_impl::get_instance(bytes_type, true);
|
||||
auto udt_int_text = user_type_impl::get_instance("ks", "udt",
|
||||
{ utf8_type->decompose("f1"), utf8_type->decompose("f2"), },
|
||||
{ int32_type, utf8_type });
|
||||
auto udt_int_blob_long = user_type_impl::get_instance("ks", "udt",
|
||||
{ utf8_type->decompose("v1"), utf8_type->decompose("v2"), utf8_type->decompose("v3"), },
|
||||
{ int32_type, bytes_type, long_type });
|
||||
|
||||
auto random_int32_value = [] {
|
||||
return int32_type->decompose(tests::random::get_int<int32_t>());
|
||||
};
|
||||
int32_t key_id = 0;
|
||||
auto random_partition_key = [&] () -> tests::data_model::mutation_description::key {
|
||||
return { random_int32_value(), random_int32_value(), int32_type->decompose(key_id++), };
|
||||
};
|
||||
auto random_clustering_key = [&] () -> tests::data_model::mutation_description::key {
|
||||
return {
|
||||
utf8_type->decompose(tests::random::get_sstring()),
|
||||
utf8_type->decompose(tests::random::get_sstring()),
|
||||
utf8_type->decompose(format("{}", key_id++)),
|
||||
};
|
||||
};
|
||||
auto random_map = [&] () -> tests::data_model::mutation_description::collection {
|
||||
return {
|
||||
{ int32_type->decompose(1), random_int32_value() },
|
||||
{ int32_type->decompose(2), random_int32_value() },
|
||||
{ int32_type->decompose(3), random_int32_value() },
|
||||
};
|
||||
};
|
||||
auto random_frozen_map = [&] {
|
||||
return map_of_int_to_int->decompose(make_map_value(map_of_int_to_int, map_type_impl::native_type({
|
||||
{ 1, tests::random::get_int<int32_t>() },
|
||||
{ 2, tests::random::get_int<int32_t>() },
|
||||
{ 3, tests::random::get_int<int32_t>() },
|
||||
})));
|
||||
};
|
||||
auto random_tuple = [&] {
|
||||
return tuple_of_int_long->decompose(make_tuple_value(tuple_of_int_long, tuple_type_impl::native_type{
|
||||
tests::random::get_int<int32_t>(), tests::random::get_int<int64_t>(),
|
||||
}));
|
||||
};
|
||||
auto random_set = [&] () -> tests::data_model::mutation_description::collection {
|
||||
return {
|
||||
{ utf8_type->decompose("a"), bytes() },
|
||||
{ utf8_type->decompose("b"), bytes() },
|
||||
{ utf8_type->decompose("c"), bytes() },
|
||||
};
|
||||
};
|
||||
auto random_udt = [&] {
|
||||
return udt_int_text->decompose(make_user_value(udt_int_text, user_type_impl::native_type{
|
||||
tests::random::get_int<int32_t>(),
|
||||
tests::random::get_sstring(),
|
||||
}));
|
||||
};
|
||||
|
||||
struct column_description {
|
||||
int id;
|
||||
data_type type;
|
||||
std::vector<data_type> alter_to;
|
||||
std::vector<std::function<tests::data_model::mutation_description::value()>> data_generators;
|
||||
data_type old_type;
|
||||
};
|
||||
|
||||
auto columns = std::vector<column_description> {
|
||||
{ 100, int32_type, { varint_type, bytes_type }, { [&] { return random_int32_value(); }, [&] { return bytes(); } }, uuid_type },
|
||||
{ 200, map_of_int_to_int, { map_of_int_to_bytes }, { [&] { return random_map(); } }, empty_type },
|
||||
{ 300, int32_type, { varint_type, bytes_type }, { [&] { return random_int32_value(); }, [&] { return bytes(); } }, empty_type },
|
||||
{ 400, frozen_map_of_int_to_int, { frozen_map_of_int_to_bytes }, { [&] { return random_frozen_map(); } }, empty_type },
|
||||
{ 500, tuple_of_int_long, { tuple_of_bytes_long, tuple_of_bytes_bytes }, { [&] { return random_tuple(); } }, empty_type },
|
||||
{ 600, set_of_text, { set_of_bytes }, { [&] { return random_set(); } }, empty_type },
|
||||
{ 700, udt_int_text, { udt_int_blob_long }, { [&] { return random_udt(); } }, empty_type },
|
||||
};
|
||||
auto static_columns = columns;
|
||||
auto regular_columns = columns;
|
||||
|
||||
// Base schema
|
||||
auto s = tests::data_model::table_description({ { "pk1", int32_type }, { "pk2", int32_type }, { "pk3", int32_type }, },
|
||||
{ { "ck1", utf8_type }, { "ck2", utf8_type }, { "ck3", utf8_type }, });
|
||||
for (auto& sc : static_columns) {
|
||||
auto name = format("s{}", sc.id);
|
||||
s.add_static_column(name, sc.type);
|
||||
if (sc.old_type != empty_type) {
|
||||
s.add_old_static_column(name, sc.old_type);
|
||||
}
|
||||
}
|
||||
for (auto& rc : regular_columns) {
|
||||
auto name = format("r{}", rc.id);
|
||||
s.add_regular_column(name, rc.type);
|
||||
if (rc.old_type != empty_type) {
|
||||
s.add_old_regular_column(name, rc.old_type);
|
||||
}
|
||||
}
|
||||
|
||||
auto max_generator_count = std::max(
|
||||
// boost::max_element wants the iterators to be copy-assignable. The ones we get
|
||||
// from boost::adaptors::transformed aren't.
|
||||
boost::accumulate(static_columns | boost::adaptors::transformed([] (const column_description& c) {
|
||||
return c.data_generators.size();
|
||||
}), 0u, [] (size_t a, size_t b) { return std::max(a, b); }),
|
||||
boost::accumulate(regular_columns | boost::adaptors::transformed([] (const column_description& c) {
|
||||
return c.data_generators.size();
|
||||
}), 0u, [] (size_t a, size_t b) { return std::max(a, b); })
|
||||
);
|
||||
|
||||
// Base data
|
||||
|
||||
// Single column in a static row, nothing else
|
||||
for (auto& [id, type, alter_to, data_generators, old_type] : static_columns) {
|
||||
auto name = format("s{}", id);
|
||||
for (auto& dg : data_generators) {
|
||||
auto m = tests::data_model::mutation_description(random_partition_key());
|
||||
m.add_static_cell(name, dg());
|
||||
s.unordered_mutations().emplace_back(std::move(m));
|
||||
}
|
||||
}
|
||||
|
||||
// Partition with rows each having a single column
|
||||
auto m = tests::data_model::mutation_description(random_partition_key());
|
||||
for (auto& [id, type, alter_to, data_generators, old_type] : regular_columns) {
|
||||
auto name = format("r{}", id);
|
||||
for (auto& dg : data_generators) {
|
||||
m.add_clustered_cell(random_clustering_key(), name, dg());
|
||||
}
|
||||
}
|
||||
s.unordered_mutations().emplace_back(std::move(m));
|
||||
|
||||
// Absolutely everything
|
||||
for (auto i = 0u; i < max_generator_count; i++) {
|
||||
auto m = tests::data_model::mutation_description(random_partition_key());
|
||||
for (auto& [id, type, alter_to, data_generators, old_type] : static_columns) {
|
||||
auto name = format("s{}", id);
|
||||
m.add_static_cell(name, data_generators[std::min<size_t>(i, data_generators.size() - 1)]());
|
||||
}
|
||||
for (auto& [id, type, alter_to, data_generators, old_type] : regular_columns) {
|
||||
auto name = format("r{}", id);
|
||||
m.add_clustered_cell(random_clustering_key(), name, data_generators[std::min<size_t>(i, data_generators.size() - 1)]());
|
||||
}
|
||||
|
||||
m.add_range_tombstone(random_clustering_key(), random_clustering_key());
|
||||
m.add_range_tombstone(random_clustering_key(), random_clustering_key());
|
||||
m.add_range_tombstone(random_clustering_key(), random_clustering_key());
|
||||
|
||||
s.unordered_mutations().emplace_back(std::move(m));
|
||||
}
|
||||
|
||||
// Transformations
|
||||
auto base = s.build();
|
||||
|
||||
std::vector<tests::data_model::table_description::table> schemas;
|
||||
schemas.emplace_back(base);
|
||||
|
||||
auto test_mutated_schemas = [&] {
|
||||
auto& [ base_change_log, base_schema, base_mutations ] = base;
|
||||
for (auto&& [ mutated_change_log, mutated_schema, mutated_mutations ] : schemas) {
|
||||
BOOST_TEST_MESSAGE(format("\nSchema change from:\n\n{}\n\nto:\n\n{}\n", base_change_log, mutated_change_log));
|
||||
fn(base_schema, base_mutations, mutated_schema, mutated_mutations);
|
||||
}
|
||||
for (auto i = 2u; i < schemas.size(); i++) {
|
||||
auto& [ base_change_log, base_schema, base_mutations ] = schemas[i - 1];
|
||||
auto& [ mutated_change_log, mutated_schema, mutated_mutations ] = schemas[i];
|
||||
BOOST_TEST_MESSAGE(format("\nSchema change from:\n\n{}\n\nto:\n\n{}\n", base_change_log, mutated_change_log));
|
||||
fn(base_schema, base_mutations, mutated_schema, mutated_mutations);
|
||||
}
|
||||
schemas.clear();
|
||||
schemas.emplace_back(base);
|
||||
};
|
||||
|
||||
auto original_s = s;
|
||||
// Remove and add back all static columns
|
||||
for (auto& sc : static_columns) {
|
||||
s.remove_static_column(format("s{}", sc.id));
|
||||
schemas.emplace_back(s.build());
|
||||
}
|
||||
for (auto& sc : static_columns) {
|
||||
s.add_static_column(format("s{}", sc.id), uuid_type);
|
||||
auto mutated = s.build();
|
||||
schemas.emplace_back(s.build());
|
||||
}
|
||||
test_mutated_schemas();
|
||||
|
||||
s = original_s;
|
||||
// Remove and add back all regular columns
|
||||
for (auto& rc : regular_columns) {
|
||||
s.remove_regular_column(format("r{}", rc.id));
|
||||
schemas.emplace_back(s.build());
|
||||
}
|
||||
auto temp_s = s;
|
||||
auto temp_schemas = schemas;
|
||||
for (auto& rc : regular_columns) {
|
||||
s.add_regular_column(format("r{}", rc.id), uuid_type);
|
||||
schemas.emplace_back(s.build());
|
||||
}
|
||||
test_mutated_schemas();
|
||||
|
||||
s = temp_s;
|
||||
schemas = temp_schemas;
|
||||
// Add back all regular columns as collections
|
||||
for (auto& rc : regular_columns) {
|
||||
s.add_regular_column(format("r{}", rc.id), map_of_int_to_bytes);
|
||||
schemas.emplace_back(s.build());
|
||||
}
|
||||
test_mutated_schemas();
|
||||
|
||||
s = temp_s;
|
||||
schemas = temp_schemas;
|
||||
// Add back all regular columns as frozen collections
|
||||
for (auto& rc : regular_columns) {
|
||||
s.add_regular_column(format("r{}", rc.id), frozen_map_of_int_to_int);
|
||||
schemas.emplace_back(s.build());
|
||||
}
|
||||
test_mutated_schemas();
|
||||
|
||||
s = original_s;
|
||||
// Add more static columns
|
||||
for (auto& sc : static_columns) {
|
||||
s.add_static_column(format("s{}", sc.id + 1), uuid_type);
|
||||
schemas.emplace_back(s.build());
|
||||
}
|
||||
test_mutated_schemas();
|
||||
|
||||
s = original_s;
|
||||
// Add more regular columns
|
||||
for (auto& rc : regular_columns) {
|
||||
s.add_regular_column(format("r{}", rc.id + 1), uuid_type);
|
||||
schemas.emplace_back(s.build());
|
||||
}
|
||||
test_mutated_schemas();
|
||||
|
||||
s = original_s;
|
||||
// Alter column types
|
||||
for (auto& sc : static_columns) {
|
||||
for (auto& target : sc.alter_to) {
|
||||
s.alter_static_column_type(format("s{}", sc.id), target);
|
||||
schemas.emplace_back(s.build());
|
||||
}
|
||||
}
|
||||
for (auto& rc : regular_columns) {
|
||||
for (auto& target : rc.alter_to) {
|
||||
s.alter_regular_column_type(format("r{}", rc.id), target);
|
||||
schemas.emplace_back(s.build());
|
||||
}
|
||||
}
|
||||
for (auto i = 1; i <= 3; i++) {
|
||||
s.alter_clustering_column_type(format("ck{}", i), bytes_type);
|
||||
schemas.emplace_back(s.build());
|
||||
}
|
||||
for (auto i = 1; i <= 3; i++) {
|
||||
s.alter_partition_column_type(format("pk{}", i), bytes_type);
|
||||
schemas.emplace_back(s.build());
|
||||
}
|
||||
test_mutated_schemas();
|
||||
|
||||
s = original_s;
|
||||
// Rename clustering key
|
||||
for (auto i = 1; i <= 3; i++) {
|
||||
s.rename_clustering_column(format("ck{}", i), format("ck{}", 100 - i));
|
||||
schemas.emplace_back(s.build());
|
||||
}
|
||||
test_mutated_schemas();
|
||||
|
||||
s = original_s;
|
||||
// Rename partition key
|
||||
for (auto i = 1; i <= 3; i++) {
|
||||
s.rename_partition_column(format("pk{}", i), format("pk{}", 100 - i));
|
||||
schemas.emplace_back(s.build());
|
||||
}
|
||||
test_mutated_schemas();
|
||||
}
|
||||
|
||||
@@ -64,3 +64,6 @@ public:
|
||||
};
|
||||
|
||||
/// NOTE(review): only the declaration is visible here; presumably returns a
/// blob that is `blob_size` bytes long — confirm against the definition.
bytes make_blob(size_t blob_size);
|
||||
|
||||
/// Invokes the given callback for each generated schema change.
///
/// The callback receives, in order: the schema before the change, the
/// mutations built for that schema, the schema after the change, and the
/// mutations built for the changed schema.
void for_each_schema_change(std::function<void(schema_ptr, const std::vector<mutation>&,
|
||||
schema_ptr, const std::vector<mutation>&)>);
|
||||
|
||||
@@ -1792,3 +1792,15 @@ SEASTAR_THREAD_TEST_CASE(test_row_size_is_immune_to_application_order) {
|
||||
|
||||
BOOST_REQUIRE_EQUAL(size1, size2);
|
||||
}
|
||||
|
||||
// Checks mutation::upgrade(): a mutation built for the base schema, once
// upgraded to the changed schema, must compare equal to the mutation that was
// built for the changed schema directly.
SEASTAR_THREAD_TEST_CASE(test_schema_changes) {
    auto verify_upgrade = [] (schema_ptr base, const std::vector<mutation>& base_mutations,
                              schema_ptr changed, const std::vector<mutation>& changed_mutations) {
        // Both sides must describe the same number of partitions, otherwise
        // the pairwise comparison below is meaningless.
        BOOST_REQUIRE_EQUAL(base_mutations.size(), changed_mutations.size());
        for (size_t idx = 0; idx < base_mutations.size(); ++idx) {
            // Work on a copy: upgrade() modifies the mutation and the input is const.
            mutation upgraded = base_mutations[idx];
            upgraded.upgrade(changed);
            BOOST_CHECK_EQUAL(upgraded, changed_mutations[idx]);
        }
    };
    for_each_schema_change(verify_upgrade);
}
|
||||
@@ -29,7 +29,7 @@ namespace tests::random {
|
||||
|
||||
namespace internal {
|
||||
|
||||
std::random_device::result_type get_seed()
|
||||
inline std::random_device::result_type get_seed()
|
||||
{
|
||||
std::random_device rd;
|
||||
auto seed = rd();
|
||||
@@ -39,7 +39,7 @@ std::random_device::result_type get_seed()
|
||||
|
||||
}
|
||||
|
||||
// Pseudo-random engine used by the helpers in this header, seeded once from
// internal::get_seed(). It is an `inline` variable (C++17) so that every
// translation unit including the header refers to the same engine; a `static`
// definition would give each translation unit its own separately seeded copy.
inline std::default_random_engine gen(internal::get_seed());
|
||||
|
||||
template<typename T>
|
||||
T get_int() {
|
||||
@@ -59,28 +59,28 @@ T get_int(T min, T max) {
|
||||
return dist(gen);
|
||||
}
|
||||
|
||||
bool get_bool() {
|
||||
inline bool get_bool() {
|
||||
static std::bernoulli_distribution dist;
|
||||
return dist(gen);
|
||||
}
|
||||
|
||||
/// Returns a `bytes` object constructed for `n` elements, each filled with
/// the result of a separate get_int<bytes::value_type>() call.
///
/// Defined `inline` because the definition lives in a header.
inline bytes get_bytes(size_t n) {
    // The contents are overwritten right below, so skip initialisation.
    bytes b(bytes::initialized_later(), n);
    boost::generate(b, [] { return get_int<bytes::value_type>(); });
    return b;
}
|
||||
|
||||
/// Returns random bytes whose length is chosen by get_int<unsigned>(128 * 1024).
/// NOTE(review): the single-argument get_int overload is not visible here;
/// presumably 128 KiB is the upper bound of the length — confirm.
///
/// Defined `inline` because the definition lives in a header.
inline bytes get_bytes() {
    return get_bytes(get_int<unsigned>(128 * 1024));
}
|
||||
|
||||
/// Returns an `sstring` constructed for `n` characters, each produced by
/// get_int<sstring::value_type>('a', 'z').
///
/// Defined `inline` because the definition lives in a header.
inline sstring get_sstring(size_t n) {
    // The contents are overwritten right below, so skip initialisation.
    sstring str(sstring::initialized_later(), n);
    boost::generate(str, [] { return get_int<sstring::value_type>('a', 'z'); });
    return str;
}
|
||||
|
||||
/// Returns a random string whose length is chosen by get_int<unsigned>(1024).
/// NOTE(review): the single-argument get_int overload is not visible here;
/// presumably 1024 is the upper bound of the length — confirm.
///
/// Defined `inline` because the definition lives in a header.
inline sstring get_sstring() {
    return get_sstring(get_int<unsigned>(1024));
}
|
||||
|
||||
|
||||
@@ -1368,3 +1368,57 @@ SEASTAR_THREAD_TEST_CASE(test_large_index_pages_do_not_cause_large_allocations)
|
||||
assert_that(actual).is_equal_to(expected);
|
||||
BOOST_REQUIRE_EQUAL(large_allocs_after - large_allocs_before, 0);
|
||||
}
|
||||
|
||||
// Checks that sstables written with a base schema can be read using a changed
// schema. For every schema change and every tested sstable version, the data
// is written once with the base schema and then read through the changed
// schema twice: via the sstable object created with the base schema and via a
// second object opened on the same files with the changed schema. Both reads
// must produce exactly the mutations expected for the changed schema.
SEASTAR_THREAD_TEST_CASE(test_schema_changes) {
    // Keep the range in a named object declared before everything else, so it
    // outlives every reader and the deferred background-job wait below.
    // Passing a temporary to make_reader() would leave the reader with a
    // dangling reference if it retains the range by reference.
    // NOTE(review): confirm against the mutation_source::make_reader signature.
    const auto full_range = dht::partition_range::make_open_ended_both_sides();

    auto dir = make_lw_shared<tmpdir>();
    storage_service_for_tests ssft;
    auto wait_bg = seastar::defer([] { sstables::await_background_jobs().get(); });
    int gen = 1;

    // (version, base schema) -> (sstable written with that schema, its generation).
    // Lets repeated callbacks for the same base schema reuse the files on disk.
    std::map<std::tuple<sstables::sstable::version_types, schema_ptr>, std::tuple<shared_sstable, int>> cache;
    for_each_schema_change([&] (schema_ptr base, const std::vector<mutation>& base_mutations,
                                schema_ptr changed, const std::vector<mutation>& changed_mutations) {
        // Reads `sst` through the changed schema and expects changed_mutations,
        // in order, followed by end of stream.
        auto expect_changed_mutations = [&] (const shared_sstable& sst) {
            auto mr = assert_that(sst->as_mutation_source()
                        .make_reader(changed, full_range, changed->full_slice()));
            for (auto& m : changed_mutations) {
                mr.produces(m);
            }
            mr.produces_end_of_stream();
        };

        for (auto version : { sstables::sstable::version_types::ka, sstables::sstable::version_types::la }) {
            auto it = cache.find(std::tuple { version, base });

            shared_sstable created_with_base_schema;
            int generation = 0;
            if (it == cache.end()) {
                // First time this base schema is seen for this version:
                // flush its mutations to a new sstable.
                auto mt = make_lw_shared<memtable>(base);
                for (auto& m : base_mutations) {
                    mt->apply(m);
                }
                generation = gen++;
                created_with_base_schema = sstables::make_sstable(base, dir->path, generation, version, sstables::sstable::format_types::big);
                sstable_writer_config cfg;
                cfg.large_partition_handler = &nop_lp_handler;
                created_with_base_schema->write_components(mt->make_flat_reader(base), base_mutations.size(), base, cfg).get();
                created_with_base_schema->load().get();

                cache.emplace(std::tuple { version, base }, std::tuple { created_with_base_schema, generation });
            } else {
                created_with_base_schema = std::get<shared_sstable>(it->second);
                generation = std::get<int>(it->second);
            }

            // Open the very same files (same directory and generation) with the changed schema.
            auto created_with_changed_schema = sstables::make_sstable(changed, dir->path, generation, version, sstables::sstable::format_types::big);
            created_with_changed_schema->load().get();

            expect_changed_mutations(created_with_base_schema);
            expect_changed_mutations(created_with_changed_schema);
        }
    });
}
|
||||
|
||||
Reference in New Issue
Block a user