db: extract sstable flushing code to a function

This commit is contained in:
Avi Kivity
2015-08-19 17:44:31 +03:00
parent 5bf5476beb
commit 6846909533
2 changed files with 49 additions and 43 deletions

View File

@@ -422,54 +422,59 @@ column_family::seal_active_memtable() {
auto gen = _sstable_generation++ * smp::count + engine().cpu_id();
return seastar::with_gate(_in_flight_seals, [gen, old, this] {
sstables::sstable newtab = sstables::sstable(_schema->ks_name(), _schema->cf_name(),
_config.datadir, gen,
sstables::sstable::version_types::ka,
sstables::sstable::format_types::big);
dblog.debug("Flushing to {}", newtab.get_filename());
return do_with(std::move(newtab), [old, this] (sstables::sstable& newtab) {
// FIXME: write all components
return newtab.write_components(*old).then([this, &newtab, old] {
return newtab.load();
}).then([this, old, &newtab] {
dblog.debug("Flushing done");
// We must add sstable before we call update_cache(), because
// memtable's data after moving to cache can be evicted at any time.
auto old_sstables = _sstables;
add_sstable(std::move(newtab));
return update_cache(*old, std::move(old_sstables));
}).then_wrapped([this, old] (future<> ret) {
try {
ret.get();
// FIXME: until the surrounding function returns a future and
// caller ensures ordering (i.e. finish flushing one or more sequential tables before
// doing the discard), this below is _not_ correct, since the use of replay_position
// depends on us reporting the factual highest position we've actually flushed,
// _and_ all positions (for a given UUID) below having been dealt with.
//
// Note that the whole scheme is also dependent on memtables being "allocated" in order,
// i.e. we may not flush a younger memtable before and older, and we need to use the
// highest rp.
if (_commitlog) {
_commitlog->discard_completed_segments(_schema->id(), old->replay_position());
}
_memtables->erase(boost::range::find(*_memtables, old));
dblog.debug("Memtable replaced");
trigger_compaction();
} catch (std::exception& e) {
dblog.error("failed to write sstable: {}", e.what());
} catch (...) {
dblog.error("failed to write sstable: unknown error");
}
});
});
return flush_memtable_to_sstable(gen, old);
});
// FIXME: release commit log
// FIXME: provide back-pressure to upper layers
}
future<>
column_family::flush_memtable_to_sstable(uint64_t gen, lw_shared_ptr<memtable> old) {
sstables::sstable newtab = sstables::sstable(_schema->ks_name(), _schema->cf_name(),
_config.datadir, gen,
sstables::sstable::version_types::ka,
sstables::sstable::format_types::big);
dblog.debug("Flushing to {}", newtab.get_filename());
return do_with(std::move(newtab), [old, this] (sstables::sstable& newtab) {
// FIXME: write all components
return newtab.write_components(*old).then([this, &newtab, old] {
return newtab.load();
}).then([this, old, &newtab] {
dblog.debug("Flushing done");
// We must add sstable before we call update_cache(), because
// memtable's data after moving to cache can be evicted at any time.
auto old_sstables = _sstables;
add_sstable(std::move(newtab));
return update_cache(*old, std::move(old_sstables));
}).then_wrapped([this, old] (future<> ret) {
try {
ret.get();
// FIXME: until the surrounding function returns a future and
// caller ensures ordering (i.e. finish flushing one or more sequential tables before
// doing the discard), this below is _not_ correct, since the use of replay_position
// depends on us reporting the factual highest position we've actually flushed,
// _and_ all positions (for a given UUID) below having been dealt with.
//
// Note that the whole scheme is also dependent on memtables being "allocated" in order,
// i.e. we may not flush a younger memtable before and older, and we need to use the
// highest rp.
if (_commitlog) {
_commitlog->discard_completed_segments(_schema->id(), old->replay_position());
}
_memtables->erase(boost::range::find(*_memtables, old));
dblog.debug("Memtable replaced");
trigger_compaction();
} catch (std::exception& e) {
dblog.error("failed to write sstable: {}", e.what());
} catch (...) {
dblog.error("failed to write sstable: unknown error");
}
});
});
}
void
column_family::start() {
// FIXME: add option to disable automatic compaction.

View File

@@ -125,6 +125,7 @@ private:
void update_stats_for_new_sstable(uint64_t new_sstable_data_size);
void add_sstable(sstables::sstable&& sstable);
void add_memtable();
future<> flush_memtable_to_sstable(uint64_t gen, lw_shared_ptr<memtable> memt);
future<> update_cache(memtable&, lw_shared_ptr<sstable_list> old_sstables);
struct merge_comparator;
private: