From 58e7ad20eb237db6456901e2b2124a98dd21a6ea Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 4 Mar 2019 12:14:16 +0100 Subject: [PATCH] sstable/compaction: Use correct schema in the writing consumer Introduced in 2a437ab42731c5801df64b386854dc53bb0eaaa8. regular_compaction::select_sstable_writer() creates the sstable writer when the first partition is consumed from the combined mutation fragment stream. It gets the schema directly from the table object. That may be a different schema than the one used by the readers if there was a concurrent schema alter duringthat small time window. As a result, the writing consumer attached to readers will interpret fragments using the wrong version of the schema. One effect of this is storing values of some columns under a different column. This patch replaces all column_family::schema() accesses with accesses to the _schema memeber which is obtained once per compaction and is the same schema which readers use. Fixes #4304. Tests: - manual tests with hard-coded schema change injection to reproduce the bug - build/dev/scylla boot - tests/sstable_mutation_test Message-Id: <1551698056-23386-1-git-send-email-tgrabiec@scylladb.com> --- sstables/compaction.cc | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/sstables/compaction.cc b/sstables/compaction.cc index d2d3619fa2..7c106d2ac8 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -313,6 +313,7 @@ public: class compaction { protected: column_family& _cf; + schema_ptr _schema; std::vector _sstables; lw_shared_ptr _compacting; uint64_t _max_sstable_size; @@ -325,6 +326,7 @@ protected: protected: compaction(column_family& cf, std::vector sstables, uint64_t max_sstable_size, uint32_t sstable_level) : _cf(cf) + , _schema(cf.schema()) , _sstables(std::move(sstables)) , _max_sstable_size(max_sstable_size) , _sstable_level(sstable_level) @@ -381,10 +383,9 @@ private: virtual flat_mutation_reader make_sstable_reader() const = 0; flat_mutation_reader setup() { - auto ssts = make_lw_shared(_cf.get_compaction_strategy().make_sstable_set(_cf.schema())); - auto schema = _cf.schema(); + auto ssts = make_lw_shared(_cf.get_compaction_strategy().make_sstable_set(_schema)); sstring formatted_msg = "["; - auto fully_expired = get_fully_expired_sstables(_cf, _sstables, gc_clock::now() - schema->gc_grace_seconds()); + auto fully_expired = get_fully_expired_sstables(_cf, _sstables, gc_clock::now() - _schema->gc_grace_seconds()); for (auto& sst : _sstables) { // Compacted sstable keeps track of its ancestors. @@ -416,8 +417,8 @@ private: } formatted_msg += "]"; _info->sstables = _sstables.size(); - _info->ks_name = schema->ks_name(); - _info->cf_name = schema->cf_name(); + _info->ks_name = _schema->ks_name(); + _info->cf_name = _schema->cf_name(); report_start(formatted_msg); _compacting = std::move(ssts); @@ -483,7 +484,7 @@ private: } const schema_ptr& schema() const { - return _cf.schema(); + return _schema; } public: static future run(std::unique_ptr c); @@ -546,10 +547,10 @@ public: } flat_mutation_reader make_sstable_reader() const override { - return ::make_local_shard_sstable_reader(_cf.schema(), + return ::make_local_shard_sstable_reader(_schema, _compacting, query::full_partition_range, - _cf.schema()->full_slice(), + _schema->full_slice(), service::get_local_compaction_priority(), no_resource_tracking(), nullptr, @@ -597,7 +598,7 @@ public: cfg.monitor = &_active_write_monitors.back(); cfg.large_data_handler = _cf.get_large_data_handler(); cfg.run_identifier = _run_identifier; - _writer.emplace(_sst->get_writer(*_cf.schema(), partitions_per_sstable(), cfg, get_encoding_stats(), priority)); + _writer.emplace(_sst->get_writer(*_schema, partitions_per_sstable(), cfg, get_encoding_stats(), priority)); } do_pending_replacements(); return &*_writer; @@ -626,7 +627,7 @@ private: _unreplaced_new_tables.push_back(_sst); // Replace exhausted sstable(s), if any, by new one(s) in the column family. - auto not_exhausted = [s = _cf.schema(), &dk = _sst->get_last_decorated_key()] (shared_sstable& sst) { + auto not_exhausted = [s = _schema, &dk = _sst->get_last_decorated_key()] (shared_sstable& sst) { return sst->get_last_decorated_key().tri_compare(*s, dk) > 0; }; auto exhausted = std::partition(_sstables.begin(), _sstables.end(), not_exhausted); @@ -722,7 +723,7 @@ public: } std::function filter_func() const override { - dht::token_range_vector owned_ranges = service::get_local_storage_service().get_local_ranges(_cf.schema()->ks_name()); + dht::token_range_vector owned_ranges = service::get_local_storage_service().get_local_ranges(_schema->ks_name()); return [this, owned_ranges = std::move(owned_ranges)] (const dht::decorated_key& dk) { if (dht::shard_of(dk.token()) != engine().cpu_id()) { @@ -798,10 +799,10 @@ public: // Use reader that makes sure no non-local mutation will not be filtered out. flat_mutation_reader make_sstable_reader() const override { - return ::make_range_sstable_reader(_cf.schema(), + return ::make_range_sstable_reader(_schema, _compacting, query::full_partition_range, - _cf.schema()->full_slice(), + _schema->full_slice(), service::get_local_compaction_priority(), no_resource_tracking(), nullptr, @@ -832,7 +833,7 @@ public: cfg.max_sstable_size = _max_sstable_size; cfg.large_data_handler = _cf.get_large_data_handler(); auto&& priority = service::get_local_compaction_priority(); - writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(_shard), cfg, get_encoding_stats(), priority, _shard)); + writer.emplace(sst->get_writer(*_schema, partitions_per_sstable(_shard), cfg, get_encoding_stats(), priority, _shard)); } return &*writer; }