sstables: sstable_writer: optionally set replay position via config

And use compaction::make_sstable_writer_config to pass
the compaction's replay_position (`_rp`) to the writer
via sstable_writer_config, instead of via the sstable
metadata_collector, that is going to move from the sstable
to the write_impl.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2020-07-07 14:49:14 +03:00
parent e314eb3f78
commit ac3c33ffca
3 changed files with 10 additions and 4 deletions

View File

@@ -488,7 +488,6 @@ protected:
_info->new_sstables.push_back(sst);
_new_unused_sstables.push_back(sst);
sst->make_metadata_collector();
sst->get_metadata_collector().set_replay_position(_rp);
sst->get_metadata_collector().sstable_level(_sstable_level);
for (auto ancestor : _ancestors) {
sst->add_ancestor(ancestor);
@@ -506,6 +505,7 @@ protected:
cfg.max_sstable_size = _max_sstable_size;
cfg.monitor = &default_write_monitor();
cfg.run_identifier = _run_identifier;
cfg.replay_position = _rp;
return cfg;
}

View File

@@ -2359,6 +2359,9 @@ sstable_writer::sstable_writer(sstable& sst, const schema& s, uint64_t estimated
} else {
_impl = std::make_unique<sstable_writer_k_l>(sst, s, estimated_partitions, cfg, pc, shard);
}
if (cfg.replay_position) {
get_metadata_collector().set_replay_position(cfg.replay_position.value());
}
}
void sstable_writer::consume_new_partition(const dht::decorated_key& dk) {
@@ -2399,6 +2402,10 @@ void sstable_writer::consume_end_of_stream() {
return _impl->consume_end_of_stream();
}
metadata_collector& sstable_writer::get_metadata_collector() {
return _impl->_sst.get_metadata_collector();
}
sstable_writer::sstable_writer(sstable_writer&& o) = default;
sstable_writer& sstable_writer::operator=(sstable_writer&& o) = default;
sstable_writer::~sstable_writer() = default;
@@ -2456,9 +2463,6 @@ future<> sstable::write_components(
assert_large_data_handler_is_running();
return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), cfg, stats, &pc] () mutable {
auto wr = get_writer(*schema, estimated_partitions, cfg, stats, pc);
if (cfg.replay_position) {
get_metadata_collector().set_replay_position(cfg.replay_position.value());
}
auto validator = mutation_fragment_stream_validating_filter(format("sstable writer {}", get_filename()), *schema,
cfg.validate_keys);
mr.consume_in_thread(std::move(wr), std::move(validator), db::no_timeout);

View File

@@ -921,6 +921,8 @@ public:
stop_iteration consume(range_tombstone&& rt);
stop_iteration consume_end_of_partition();
void consume_end_of_stream();
metadata_collector& get_metadata_collector();
};
future<> init_metrics();