sstable/compaction: Use correct schema in the writing consumer

Introduced in 2a437ab427.

regular_compaction::select_sstable_writer() creates the sstable writer
when the first partition is consumed from the combined mutation
fragment stream. It gets the schema directly from the table
object. That may be a different schema than the one used by the
readers if there was a concurrent schema alter duringthat small time
window. As a result, the writing consumer attached to readers will
interpret fragments using the wrong version of the schema.

One effect of this is storing values of some columns under a different
column.

This patch replaces all column_family::schema() accesses with accesses
to the _schema memeber which is obtained once per compaction and is
the same schema which readers use.

Fixes #4304.

Tests:

  - manual tests with hard-coded schema change injection to reproduce the bug
  - build/dev/scylla boot
  - tests/sstable_mutation_test

Message-Id: <1551698056-23386-1-git-send-email-tgrabiec@scylladb.com>
This commit is contained in:
Tomasz Grabiec
2019-03-04 12:14:16 +01:00
committed by Avi Kivity
parent 8f71e7ffd4
commit 58e7ad20eb

View File

@@ -313,6 +313,7 @@ public:
class compaction {
protected:
column_family& _cf;
schema_ptr _schema;
std::vector<shared_sstable> _sstables;
lw_shared_ptr<sstable_set> _compacting;
uint64_t _max_sstable_size;
@@ -325,6 +326,7 @@ protected:
protected:
compaction(column_family& cf, std::vector<shared_sstable> sstables, uint64_t max_sstable_size, uint32_t sstable_level)
: _cf(cf)
, _schema(cf.schema())
, _sstables(std::move(sstables))
, _max_sstable_size(max_sstable_size)
, _sstable_level(sstable_level)
@@ -381,10 +383,9 @@ private:
virtual flat_mutation_reader make_sstable_reader() const = 0;
flat_mutation_reader setup() {
auto ssts = make_lw_shared<sstables::sstable_set>(_cf.get_compaction_strategy().make_sstable_set(_cf.schema()));
auto schema = _cf.schema();
auto ssts = make_lw_shared<sstables::sstable_set>(_cf.get_compaction_strategy().make_sstable_set(_schema));
sstring formatted_msg = "[";
auto fully_expired = get_fully_expired_sstables(_cf, _sstables, gc_clock::now() - schema->gc_grace_seconds());
auto fully_expired = get_fully_expired_sstables(_cf, _sstables, gc_clock::now() - _schema->gc_grace_seconds());
for (auto& sst : _sstables) {
// Compacted sstable keeps track of its ancestors.
@@ -416,8 +417,8 @@ private:
}
formatted_msg += "]";
_info->sstables = _sstables.size();
_info->ks_name = schema->ks_name();
_info->cf_name = schema->cf_name();
_info->ks_name = _schema->ks_name();
_info->cf_name = _schema->cf_name();
report_start(formatted_msg);
_compacting = std::move(ssts);
@@ -483,7 +484,7 @@ private:
}
const schema_ptr& schema() const {
return _cf.schema();
return _schema;
}
public:
static future<compaction_info> run(std::unique_ptr<compaction> c);
@@ -546,10 +547,10 @@ public:
}
flat_mutation_reader make_sstable_reader() const override {
return ::make_local_shard_sstable_reader(_cf.schema(),
return ::make_local_shard_sstable_reader(_schema,
_compacting,
query::full_partition_range,
_cf.schema()->full_slice(),
_schema->full_slice(),
service::get_local_compaction_priority(),
no_resource_tracking(),
nullptr,
@@ -597,7 +598,7 @@ public:
cfg.monitor = &_active_write_monitors.back();
cfg.large_data_handler = _cf.get_large_data_handler();
cfg.run_identifier = _run_identifier;
_writer.emplace(_sst->get_writer(*_cf.schema(), partitions_per_sstable(), cfg, get_encoding_stats(), priority));
_writer.emplace(_sst->get_writer(*_schema, partitions_per_sstable(), cfg, get_encoding_stats(), priority));
}
do_pending_replacements();
return &*_writer;
@@ -626,7 +627,7 @@ private:
_unreplaced_new_tables.push_back(_sst);
// Replace exhausted sstable(s), if any, by new one(s) in the column family.
auto not_exhausted = [s = _cf.schema(), &dk = _sst->get_last_decorated_key()] (shared_sstable& sst) {
auto not_exhausted = [s = _schema, &dk = _sst->get_last_decorated_key()] (shared_sstable& sst) {
return sst->get_last_decorated_key().tri_compare(*s, dk) > 0;
};
auto exhausted = std::partition(_sstables.begin(), _sstables.end(), not_exhausted);
@@ -722,7 +723,7 @@ public:
}
std::function<bool(const dht::decorated_key&)> filter_func() const override {
dht::token_range_vector owned_ranges = service::get_local_storage_service().get_local_ranges(_cf.schema()->ks_name());
dht::token_range_vector owned_ranges = service::get_local_storage_service().get_local_ranges(_schema->ks_name());
return [this, owned_ranges = std::move(owned_ranges)] (const dht::decorated_key& dk) {
if (dht::shard_of(dk.token()) != engine().cpu_id()) {
@@ -798,10 +799,10 @@ public:
// Use reader that makes sure no non-local mutation will not be filtered out.
flat_mutation_reader make_sstable_reader() const override {
return ::make_range_sstable_reader(_cf.schema(),
return ::make_range_sstable_reader(_schema,
_compacting,
query::full_partition_range,
_cf.schema()->full_slice(),
_schema->full_slice(),
service::get_local_compaction_priority(),
no_resource_tracking(),
nullptr,
@@ -832,7 +833,7 @@ public:
cfg.max_sstable_size = _max_sstable_size;
cfg.large_data_handler = _cf.get_large_data_handler();
auto&& priority = service::get_local_compaction_priority();
writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(_shard), cfg, get_encoding_stats(), priority, _shard));
writer.emplace(sst->get_writer(*_schema, partitions_per_sstable(_shard), cfg, get_encoding_stats(), priority, _shard));
}
return &*writer;
}