From 68469095335f2cd953a46da6964d2d568efcb5ab Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 19 Aug 2015 17:44:31 +0300 Subject: [PATCH] db: extract sstable flushing code to a function --- database.cc | 91 ++++++++++++++++++++++++++++------------------------- database.hh | 1 + 2 files changed, 49 insertions(+), 43 deletions(-) diff --git a/database.cc b/database.cc index 6e2e646a04..93cb43a7e4 100644 --- a/database.cc +++ b/database.cc @@ -422,54 +422,59 @@ column_family::seal_active_memtable() { auto gen = _sstable_generation++ * smp::count + engine().cpu_id(); return seastar::with_gate(_in_flight_seals, [gen, old, this] { - sstables::sstable newtab = sstables::sstable(_schema->ks_name(), _schema->cf_name(), - _config.datadir, gen, - sstables::sstable::version_types::ka, - sstables::sstable::format_types::big); - - dblog.debug("Flushing to {}", newtab.get_filename()); - return do_with(std::move(newtab), [old, this] (sstables::sstable& newtab) { - // FIXME: write all components - return newtab.write_components(*old).then([this, &newtab, old] { - return newtab.load(); - }).then([this, old, &newtab] { - dblog.debug("Flushing done"); - // We must add sstable before we call update_cache(), because - // memtable's data after moving to cache can be evicted at any time. - auto old_sstables = _sstables; - add_sstable(std::move(newtab)); - return update_cache(*old, std::move(old_sstables)); - }).then_wrapped([this, old] (future<> ret) { - try { - ret.get(); - - // FIXME: until the surrounding function returns a future and - // caller ensures ordering (i.e. finish flushing one or more sequential tables before - // doing the discard), this below is _not_ correct, since the use of replay_position - // depends on us reporting the factual highest position we've actually flushed, - // _and_ all positions (for a given UUID) below having been dealt with. - // - // Note that the whole scheme is also dependent on memtables being "allocated" in order, - // i.e. we may not flush a younger memtable before and older, and we need to use the - // highest rp. - if (_commitlog) { - _commitlog->discard_completed_segments(_schema->id(), old->replay_position()); - } - _memtables->erase(boost::range::find(*_memtables, old)); - dblog.debug("Memtable replaced"); - trigger_compaction(); - } catch (std::exception& e) { - dblog.error("failed to write sstable: {}", e.what()); - } catch (...) { - dblog.error("failed to write sstable: unknown error"); - } - }); - }); + return flush_memtable_to_sstable(gen, old); }); // FIXME: release commit log // FIXME: provide back-pressure to upper layers } +future<> +column_family::flush_memtable_to_sstable(uint64_t gen, lw_shared_ptr old) { + sstables::sstable newtab = sstables::sstable(_schema->ks_name(), _schema->cf_name(), + _config.datadir, gen, + sstables::sstable::version_types::ka, + sstables::sstable::format_types::big); + + dblog.debug("Flushing to {}", newtab.get_filename()); + return do_with(std::move(newtab), [old, this] (sstables::sstable& newtab) { + // FIXME: write all components + return newtab.write_components(*old).then([this, &newtab, old] { + return newtab.load(); + }).then([this, old, &newtab] { + dblog.debug("Flushing done"); + // We must add sstable before we call update_cache(), because + // memtable's data after moving to cache can be evicted at any time. + auto old_sstables = _sstables; + add_sstable(std::move(newtab)); + return update_cache(*old, std::move(old_sstables)); + }).then_wrapped([this, old] (future<> ret) { + try { + ret.get(); + + // FIXME: until the surrounding function returns a future and + // caller ensures ordering (i.e. finish flushing one or more sequential tables before + // doing the discard), this below is _not_ correct, since the use of replay_position + // depends on us reporting the factual highest position we've actually flushed, + // _and_ all positions (for a given UUID) below having been dealt with. + // + // Note that the whole scheme is also dependent on memtables being "allocated" in order, + // i.e. we may not flush a younger memtable before and older, and we need to use the + // highest rp. + if (_commitlog) { + _commitlog->discard_completed_segments(_schema->id(), old->replay_position()); + } + _memtables->erase(boost::range::find(*_memtables, old)); + dblog.debug("Memtable replaced"); + trigger_compaction(); + } catch (std::exception& e) { + dblog.error("failed to write sstable: {}", e.what()); + } catch (...) { + dblog.error("failed to write sstable: unknown error"); + } + }); + }); +} + void column_family::start() { // FIXME: add option to disable automatic compaction. diff --git a/database.hh b/database.hh index 1bcc9489b2..6511140cd9 100644 --- a/database.hh +++ b/database.hh @@ -125,6 +125,7 @@ private: void update_stats_for_new_sstable(uint64_t new_sstable_data_size); void add_sstable(sstables::sstable&& sstable); void add_memtable(); + future<> flush_memtable_to_sstable(uint64_t gen, lw_shared_ptr memt); future<> update_cache(memtable&, lw_shared_ptr old_sstables); struct merge_comparator; private: