Merge 'Fix schema version mismatch during rolling upgrade from 1.7' from Tomasz
"When there are at least 2 nodes upgraded to 2.0, and the two exchanged schema
for some reason, reads or writes which involve both 1.7 and 2.0 nodes may
start to fail with the following error logged:
storage_proxy - Exception when communicating with 127.0.0.3: Failed to load schema version 58fc9b89-74ab-37ca-8640-8b38a1204f8d
The situation should heal after the whole cluster is upgraded.
Table schema versions are calculated by 2.0 nodes differently than by 1.7 nodes
due to a change in the schema tables format. The mismatch is meant to be avoided by
having 2.0 nodes calculate the old digest on schema migration during upgrade,
and use that version until the next time the table is altered. It is thus not
allowed to alter tables during the rolling upgrade.
Two 2.0 nodes may exchange schema if they detect through gossip that their
schema versions don't match. They may not match temporarily during boot, until
the upgraded node completes the bootstrap and propagates its new schema
through gossip. One source of such a temporary mismatch is the construction of new
tracing tables, which didn't exist on 1.7. Such a schema pull will result in a
schema merge, which causes all tables to be altered and their schema versions to
be recalculated. The new schema will not match the one used by 1.7 nodes,
causing reads and writes to fail, because requesting the schema won't work during
a rolling upgrade from 1.7 to 2.0.
The main fix employed here is to hold schema pulls, even among 2.0 nodes,
until the rolling upgrade is complete."
* 'tgrabiec/fix-schema-mismatch' of github.com:scylladb/seastar-dev:
tests: schema_change_test: Add test_merging_does_not_alter_tables_which_didnt_change test case
tests: cql_test_env: Enable all features in tests
schema_tables: Make make_scylla_tables_mutation() visible
migration_manager: Disable pulls during rolling upgrade from 1.7
storage_service: Introduce SCHEMA_TABLES_V3 feature
schema_tables: Don't alter tables which differ only in version
schema_mutations: Use mutation_opt instead of stdx::optional<mutation>
This commit is contained in:
@@ -801,6 +801,15 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
|
||||
/*auto& old_aggregates = */read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces).get0();
|
||||
#endif
|
||||
|
||||
// Incoming mutations have the version field deleted. Delete here as well so that
|
||||
// schemas which are otherwise equal don't appear as differing.
|
||||
for (auto&& e : old_column_families) {
|
||||
schema_mutations& sm = e.second;
|
||||
if (sm.scylla_tables()) {
|
||||
delete_schema_version(*sm.scylla_tables());
|
||||
}
|
||||
}
|
||||
|
||||
proxy.local().mutate_locally(std::move(mutations)).get0();
|
||||
|
||||
if (do_flush) {
|
||||
@@ -1513,7 +1522,7 @@ static void add_dropped_column_to_schema_mutation(schema_ptr table, const sstrin
|
||||
m.set_clustered_cell(ckey, "type", expand_user_type(column.type)->as_cql3_type()->to_string(), timestamp);
|
||||
}
|
||||
|
||||
static mutation make_scylla_tables_mutation(schema_ptr table, api::timestamp_type timestamp) {
|
||||
mutation make_scylla_tables_mutation(schema_ptr table, api::timestamp_type timestamp) {
|
||||
schema_ptr s = tables();
|
||||
auto pkey = partition_key::from_singular(*s, table->ks_name());
|
||||
auto ckey = clustering_key::from_singular(*s, table->cf_name());
|
||||
@@ -1943,7 +1952,7 @@ schema_ptr create_table_from_mutations(schema_mutations sm, std::experimental::o
|
||||
|
||||
std::vector<index_metadata> index_defs;
|
||||
if (sm.indices_mutation()) {
|
||||
index_defs = create_indices_from_index_rows(query::result_set(sm.indices_mutation().value()), ks_name, cf_name);
|
||||
index_defs = create_indices_from_index_rows(query::result_set(*sm.indices_mutation()), ks_name, cf_name);
|
||||
}
|
||||
for (auto&& index : index_defs) {
|
||||
builder.with_index(index);
|
||||
|
||||
@@ -165,6 +165,7 @@ view_ptr create_view_from_mutations(schema_mutations, std::experimental::optiona
|
||||
future<std::vector<view_ptr>> create_views_from_schema_partition(distributed<service::storage_proxy>& proxy, const schema_result::mapped_type& result);
|
||||
|
||||
schema_mutations make_schema_mutations(schema_ptr s, api::timestamp_type timestamp, bool with_columns);
|
||||
mutation make_scylla_tables_mutation(schema_ptr, api::timestamp_type timestamp);
|
||||
|
||||
void add_table_or_view_to_schema_mutation(schema_ptr view, api::timestamp_type timestamp, bool with_columns, std::vector<mutation>& mutations);
|
||||
|
||||
|
||||
@@ -32,22 +32,22 @@ schema_mutations::schema_mutations(canonical_mutation columnfamilies,
|
||||
stdx::optional<canonical_mutation> scylla_tables)
|
||||
: _columnfamilies(columnfamilies.to_mutation(is_view ? db::schema_tables::views() : db::schema_tables::tables()))
|
||||
, _columns(columns.to_mutation(db::schema_tables::columns()))
|
||||
, _indices(indices ? stdx::optional<mutation>{indices.value().to_mutation(db::schema_tables::indexes())} : stdx::nullopt)
|
||||
, _dropped_columns(dropped_columns ? stdx::optional<mutation>{dropped_columns.value().to_mutation(db::schema_tables::dropped_columns())} : stdx::nullopt)
|
||||
, _scylla_tables(scylla_tables ? stdx::optional<mutation>{scylla_tables.value().to_mutation(db::schema_tables::scylla_tables())} : stdx::nullopt)
|
||||
, _indices(indices ? mutation_opt{indices.value().to_mutation(db::schema_tables::indexes())} : stdx::nullopt)
|
||||
, _dropped_columns(dropped_columns ? mutation_opt{dropped_columns.value().to_mutation(db::schema_tables::dropped_columns())} : stdx::nullopt)
|
||||
, _scylla_tables(scylla_tables ? mutation_opt{scylla_tables.value().to_mutation(db::schema_tables::scylla_tables())} : stdx::nullopt)
|
||||
{}
|
||||
|
||||
void schema_mutations::copy_to(std::vector<mutation>& dst) const {
|
||||
dst.push_back(_columnfamilies);
|
||||
dst.push_back(_columns);
|
||||
if (_indices) {
|
||||
dst.push_back(_indices.value());
|
||||
dst.push_back(*_indices);
|
||||
}
|
||||
if (_dropped_columns) {
|
||||
dst.push_back(_dropped_columns.value());
|
||||
dst.push_back(*_dropped_columns);
|
||||
}
|
||||
if (_scylla_tables) {
|
||||
dst.push_back(_scylla_tables.value());
|
||||
dst.push_back(*_scylla_tables);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,26 +68,26 @@ table_schema_version schema_mutations::digest() const {
|
||||
md5_hasher h;
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, _columnfamilies);
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, _columns);
|
||||
if (_indices && !_indices.value().partition().empty()) {
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, _indices.value());
|
||||
if (_indices && !_indices->partition().empty()) {
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, *_indices);
|
||||
}
|
||||
if (_dropped_columns && !_dropped_columns.value().partition().empty()) {
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, _dropped_columns.value());
|
||||
if (_dropped_columns && !_dropped_columns->partition().empty()) {
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, *_dropped_columns);
|
||||
}
|
||||
if (_scylla_tables) {
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, _scylla_tables.value());
|
||||
db::schema_tables::feed_hash_for_schema_digest(h, *_scylla_tables);
|
||||
}
|
||||
return utils::UUID_gen::get_name_UUID(h.finalize());
|
||||
}
|
||||
|
||||
static stdx::optional<mutation> compact(const stdx::optional<mutation>& m) {
|
||||
static mutation_opt compact(const mutation_opt& m) {
|
||||
if (!m) {
|
||||
return m;
|
||||
}
|
||||
return db::schema_tables::compact_for_schema_digest(*m);
|
||||
}
|
||||
|
||||
static stdx::optional<mutation> compact(const mutation& m) {
|
||||
static mutation_opt compact(const mutation& m) {
|
||||
return db::schema_tables::compact_for_schema_digest(m);
|
||||
}
|
||||
|
||||
|
||||
@@ -31,12 +31,12 @@
|
||||
class schema_mutations {
|
||||
mutation _columnfamilies;
|
||||
mutation _columns;
|
||||
stdx::optional<mutation> _indices;
|
||||
stdx::optional<mutation> _dropped_columns;
|
||||
stdx::optional<mutation> _scylla_tables;
|
||||
mutation_opt _indices;
|
||||
mutation_opt _dropped_columns;
|
||||
mutation_opt _scylla_tables;
|
||||
public:
|
||||
schema_mutations(mutation columnfamilies, mutation columns, stdx::optional<mutation> indices, stdx::optional<mutation> dropped_columns,
|
||||
stdx::optional<mutation> scylla_tables)
|
||||
schema_mutations(mutation columnfamilies, mutation columns, mutation_opt indices, mutation_opt dropped_columns,
|
||||
mutation_opt scylla_tables)
|
||||
: _columnfamilies(std::move(columnfamilies))
|
||||
, _columns(std::move(columns))
|
||||
, _indices(std::move(indices))
|
||||
@@ -65,14 +65,18 @@ public:
|
||||
return _columns;
|
||||
}
|
||||
|
||||
const stdx::optional<mutation>& scylla_tables() const {
|
||||
const mutation_opt& scylla_tables() const {
|
||||
return _scylla_tables;
|
||||
}
|
||||
|
||||
const stdx::optional<mutation>& indices_mutation() const {
|
||||
mutation_opt& scylla_tables() {
|
||||
return _scylla_tables;
|
||||
}
|
||||
|
||||
const mutation_opt& indices_mutation() const {
|
||||
return _indices;
|
||||
}
|
||||
const stdx::optional<mutation>& dropped_columns_mutation() const {
|
||||
const mutation_opt& dropped_columns_mutation() const {
|
||||
return _dropped_columns;
|
||||
}
|
||||
|
||||
@@ -86,19 +90,19 @@ public:
|
||||
|
||||
stdx::optional<canonical_mutation> indices_canonical_mutation() const {
|
||||
if (_indices) {
|
||||
return canonical_mutation(_indices.value());
|
||||
return canonical_mutation(*_indices);
|
||||
}
|
||||
return {};
|
||||
}
|
||||
stdx::optional<canonical_mutation> dropped_columns_canonical_mutation() const {
|
||||
if (_dropped_columns) {
|
||||
return canonical_mutation(_dropped_columns.value());
|
||||
return canonical_mutation(*_dropped_columns);
|
||||
}
|
||||
return {};
|
||||
}
|
||||
stdx::optional<canonical_mutation> scylla_tables_canonical_mutation() const {
|
||||
if (_scylla_tables) {
|
||||
return canonical_mutation(_scylla_tables.value());
|
||||
return canonical_mutation(*_scylla_tables);
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
@@ -261,7 +261,10 @@ bool migration_manager::has_compatible_schema_tables_version(const gms::inet_add
|
||||
|
||||
bool migration_manager::should_pull_schema_from(const gms::inet_address& endpoint) {
|
||||
return has_compatible_schema_tables_version(endpoint)
|
||||
&& !gms::get_local_gossiper().is_gossip_only_member(endpoint);
|
||||
&& !gms::get_local_gossiper().is_gossip_only_member(endpoint)
|
||||
// Disable pulls during rolling upgrade from 1.7 to 2.0 to avoid
|
||||
// schema version inconsistency. See https://github.com/scylladb/scylla/issues/2802.
|
||||
&& get_storage_service().local().cluster_supports_schema_tables_v3();
|
||||
}
|
||||
|
||||
future<> migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
|
||||
|
||||
@@ -89,6 +89,7 @@ static const sstring COUNTERS_FEATURE = "COUNTERS";
|
||||
static const sstring INDEXES_FEATURE = "INDEXES";
|
||||
static const sstring DIGEST_MULTIPARTITION_READ_FEATURE = "DIGEST_MULTIPARTITION_READ";
|
||||
static const sstring CORRECT_COUNTER_ORDER_FEATURE = "CORRECT_COUNTER_ORDER";
|
||||
static const sstring SCHEMA_TABLES_V3 = "SCHEMA_TABLES_V3";
|
||||
|
||||
distributed<storage_service> _the_storage_service;
|
||||
|
||||
@@ -131,6 +132,7 @@ sstring storage_service::get_config_supported_features() {
|
||||
COUNTERS_FEATURE,
|
||||
DIGEST_MULTIPARTITION_READ_FEATURE,
|
||||
CORRECT_COUNTER_ORDER_FEATURE,
|
||||
SCHEMA_TABLES_V3
|
||||
};
|
||||
if (service::get_local_storage_service()._db.local().get_config().experimental()) {
|
||||
features.push_back(MATERIALIZED_VIEWS_FEATURE);
|
||||
@@ -1357,6 +1359,7 @@ future<> storage_service::init_server(int delay) {
|
||||
ss._counters_feature = gms::feature(COUNTERS_FEATURE);
|
||||
ss._digest_multipartition_read_feature = gms::feature(DIGEST_MULTIPARTITION_READ_FEATURE);
|
||||
ss._correct_counter_order_feature = gms::feature(CORRECT_COUNTER_ORDER_FEATURE);
|
||||
ss._schema_tables_v3 = gms::feature(SCHEMA_TABLES_V3);
|
||||
|
||||
if (ss._db.local().get_config().experimental()) {
|
||||
ss._materialized_views_feature = gms::feature(MATERIALIZED_VIEWS_FEATURE);
|
||||
|
||||
@@ -265,6 +265,7 @@ private:
|
||||
gms::feature _indexes_feature;
|
||||
gms::feature _digest_multipartition_read_feature;
|
||||
gms::feature _correct_counter_order_feature;
|
||||
gms::feature _schema_tables_v3;
|
||||
public:
|
||||
void enable_all_features() {
|
||||
_range_tombstones_feature.enable();
|
||||
@@ -274,6 +275,7 @@ public:
|
||||
_indexes_feature.enable();
|
||||
_digest_multipartition_read_feature.enable();
|
||||
_correct_counter_order_feature.enable();
|
||||
_schema_tables_v3.enable();
|
||||
}
|
||||
|
||||
void finish_bootstrapping() {
|
||||
@@ -2247,6 +2249,10 @@ public:
|
||||
bool cluster_supports_correct_counter_order() const {
|
||||
return bool(_correct_counter_order_feature);
|
||||
}
|
||||
|
||||
bool cluster_supports_schema_tables_v3() const {
|
||||
return bool(_schema_tables_v3);
|
||||
}
|
||||
};
|
||||
|
||||
inline future<> init_storage_service(distributed<database>& db) {
|
||||
|
||||
@@ -309,6 +309,10 @@ public:
|
||||
gms::get_failure_detector().stop().get();
|
||||
});
|
||||
|
||||
ss.invoke_on_all([] (auto&& ss) {
|
||||
ss.enable_all_features();
|
||||
}).get();
|
||||
|
||||
distributed<service::storage_proxy>& proxy = service::get_storage_proxy();
|
||||
distributed<service::migration_manager>& mm = service::get_migration_manager();
|
||||
distributed<db::batchlog_manager>& bm = db::get_batchlog_manager();
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
#include "tests/result_set_assertions.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "schema_builder.hh"
|
||||
#include "schema_registry.hh"
|
||||
|
||||
#include "disk-error-handler.hh"
|
||||
|
||||
@@ -246,6 +247,49 @@ SEASTAR_TEST_CASE(test_combined_column_add_and_drop) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_merging_does_not_alter_tables_which_didnt_change) {
|
||||
return do_with_cql_env([](cql_test_env& e) {
|
||||
return seastar::async([&] {
|
||||
service::migration_manager& mm = service::get_local_migration_manager();
|
||||
|
||||
auto&& keyspace = e.db().local().find_keyspace("ks").metadata();
|
||||
|
||||
auto legacy_version = utils::UUID_gen::get_time_UUID();
|
||||
auto s0 = schema_builder("ks", "table1")
|
||||
.with_column("pk", bytes_type, column_kind::partition_key)
|
||||
.with_column("v1", bytes_type)
|
||||
.with_version(legacy_version)
|
||||
.build();
|
||||
|
||||
auto find_table = [&] () -> column_family& {
|
||||
return e.db().local().find_column_family("ks", "table1");
|
||||
};
|
||||
|
||||
auto muts1 = db::schema_tables::make_create_table_mutations(keyspace, s0, api::new_timestamp()).get0();
|
||||
service::get_storage_proxy().local().mutate_locally(muts1).get();
|
||||
e.db().invoke_on_all([gs = global_schema_ptr(s0)] (database& db) {
|
||||
return db.add_column_family_and_make_directory(gs);
|
||||
}).get();
|
||||
|
||||
auto s1 = find_table().schema();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(legacy_version, s1->version());
|
||||
|
||||
mm.announce(muts1).get();
|
||||
|
||||
BOOST_REQUIRE(s1 == find_table().schema());
|
||||
BOOST_REQUIRE_EQUAL(legacy_version, find_table().schema()->version());
|
||||
|
||||
auto muts2 = muts1;
|
||||
muts2.push_back(db::schema_tables::make_scylla_tables_mutation(s0, api::new_timestamp()));
|
||||
mm.announce(muts2).get();
|
||||
|
||||
BOOST_REQUIRE(s1 == find_table().schema());
|
||||
BOOST_REQUIRE_EQUAL(legacy_version, find_table().schema()->version());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
class counting_migration_listener : public service::migration_listener {
|
||||
public:
|
||||
int create_keyspace_count = 0;
|
||||
|
||||
Reference in New Issue
Block a user