Compaction: propagate metadata replay position from compacted tables

This commit is contained in:
Calle Wilund
2015-08-24 19:15:20 +02:00
parent 71204648fb
commit cfcfa34028
2 changed files with 16 additions and 0 deletions

View File

@@ -60,6 +60,8 @@ future<> compact_sstables(std::vector<shared_sstable> sstables,
assert(sstables.size() > 0);
db::replay_position rp;
for (auto sst : sstables) {
// We also capture the sstable, so we keep it alive while the read isn't done
readers.emplace_back([sst, r = make_lw_shared(sst->read_rows(schema))] () mutable { return r->read(); });
@@ -73,6 +75,14 @@ future<> compact_sstables(std::vector<shared_sstable> sstables,
// FIXME: get sstable level
sstable_logger_msg += sprint("%s:level=%d, ", sst->get_filename(), 0);
stats->start_size += sst->data_size();
// TODO:
// Note that this is not fully correct. Since we might be merging sstables that originated on
// another shard (#cpu changed), we might be comparing RP:s with differing shard ids,
// which might vary in "comparable" size quite a bit. However, since the worst that happens
// is that we might miss a high water mark for the commit log replayer,
// this is kind of ok, esp. since we will hopefully not be trying to recover based on
// compacted sstables anyway (CL should be clean by then).
rp = std::max(rp, sst->get_stats_metadata().position);
}
sstable_logger_msg += "]";
stats->sstables = sstables.size();
@@ -111,6 +121,8 @@ future<> compact_sstables(std::vector<shared_sstable> sstables,
return output_reader->read();
};
newtab->get_metadata_collector().set_replay_position(rp);
future<> write_done = newtab->write_components(
std::move(mutation_queue_reader), estimated_partitions, schema).then([newtab, stats, start_time] {
return newtab->load().then([newtab, stats, start_time] {

View File

@@ -244,6 +244,10 @@ public:
const sstring get_filename() {
return filename(component_type::Data);
}
metadata_collector& get_metadata_collector() {
return _collector;
}
private:
void do_write_components(::mutation_reader mr,
uint64_t estimated_partitions, schema_ptr schema, file_writer& out);