frozen_mutation: Guard against unfreezing using wrong schema

Currently, calling unfreeze() using the wrong version of the schema
results in undefined behavior. That can cause hard-to-debug
problems. Better to throw in such cases.

Refs #4549.

Tests:
  - unit (dev)
Message-Id: <1560459022-23786-1-git-send-email-tgrabiec@scylladb.com>
This commit is contained in:
Tomasz Grabiec
2019-06-13 22:50:22 +02:00
committed by Avi Kivity
parent f32371727b
commit f798f724c8
5 changed files with 69 additions and 0 deletions

View File

@@ -107,6 +107,7 @@ frozen_mutation::frozen_mutation(const mutation& m)
mutation
frozen_mutation::unfreeze(schema_ptr schema) const {
check_schema_version(schema_version(), *schema);
mutation m(schema, key(*schema));
partition_builder b(*schema, m.partition());
partition().accept(*schema, b);

View File

@@ -65,6 +65,9 @@ public:
partition_key_view key(const schema& s) const;
dht::decorated_key decorated_key(const schema& s) const;
mutation_partition_view partition() const;
// The supplied schema must be of the same version as the schema of
// the mutation which was used to create this instance.
// throws schema_mismatch_error otherwise.
mutation unfreeze(schema_ptr s) const;
struct printer {
@@ -73,6 +76,7 @@ public:
friend std::ostream& operator<<(std::ostream&, const printer&);
};
// Same requirements about the schema as unfreeze().
printer pretty_printer(schema_ptr) const;
};

View File

@@ -1264,3 +1264,8 @@ std::ostream& operator<<(std::ostream& os, const raw_view_info& view) {
std::ostream& operator<<(std::ostream& os, const view_ptr& view) {
return view ? os << *view : os << "null";
}
schema_mismatch_error::schema_mismatch_error(table_schema_version expected, const schema& access)
: std::runtime_error(fmt::format("Attempted to deserialize schema-dependent object of version {} using {}.{} {}",
expected, access.ks_name(), access.cf_name(), access.version()))
{ }

View File

@@ -30,6 +30,7 @@
#include "cql3/column_specification.hh"
#include <seastar/core/shared_ptr.hh>
#include <seastar/util/backtrace.hh>
#include "types.hh"
#include "compound.hh"
#include "gc_clock.hh"
@@ -848,3 +849,19 @@ public:
std::ostream& operator<<(std::ostream& os, const view_ptr& view);
utils::UUID generate_legacy_id(const sstring& ks_name, const sstring& cf_name);
// Thrown when attempted to access a schema-dependent object using
// an incompatible version of the schema object.
class schema_mismatch_error : public std::runtime_error {
public:
schema_mismatch_error(table_schema_version expected, const schema& access);
};
// Throws schema_mismatch_error when a schema-dependent object of "expected" version
// cannot be accessed using "access" schema.
inline void check_schema_version(table_schema_version expected, const schema& access) {
if (expected != access.version()) {
throw_with_backtrace<schema_mismatch_error>(expected, access);
}
}

View File

@@ -120,3 +120,45 @@ SEASTAR_THREAD_TEST_CASE(test_frozen_mutation_fragment) {
}
});
}
SEASTAR_TEST_CASE(test_deserialization_using_wrong_schema_throws) {
return seastar::async([] {
storage_service_for_tests ssft;
schema_ptr s1 = new_table()
.with_column("pk_col", bytes_type, column_kind::partition_key)
.with_column("reg_1", bytes_type)
.with_column("reg_2", bytes_type)
.build();
schema_ptr s2 = new_table()
.with_column("pk_col", bytes_type, column_kind::partition_key)
.with_column("reg_0", bytes_type)
.with_column("reg_1", bytes_type)
.with_column("reg_2", bytes_type)
.build();
schema_ptr s3 = new_table()
.with_column("pk_col", bytes_type, column_kind::partition_key)
.with_column("reg_3", bytes_type)
.without_column("reg_0", new_timestamp())
.without_column("reg_1", new_timestamp())
.build();
schema_ptr s4 = new_table()
.with_column("pk_col", bytes_type, column_kind::partition_key)
.with_column("reg_1", int32_type)
.with_column("reg_2", int32_type)
.build();
partition_key key = partition_key::from_single_value(*s1, bytes("key"));
clustering_key ck = clustering_key::make_empty();
mutation m(s1, key);
m.set_clustered_cell(ck, "reg_1", data_value(bytes("val1")), new_timestamp());
m.set_clustered_cell(ck, "reg_2", data_value(bytes("val2")), new_timestamp());
for (auto s : {s2, s3, s4}) {
BOOST_REQUIRE_THROW(freeze(m).unfreeze(s), schema_mismatch_error);
}
});
}