From c317391f6254236186f7ec06b13d732cfe9d2ec6 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 19 Aug 2015 12:59:52 +0300 Subject: [PATCH 01/10] db: trigger memtable flush based on actual memory usage Rather than using _mutation_count as a poor proxy. --- database.hh | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/database.hh b/database.hh index 090852f050..9f7eaefc3a 100644 --- a/database.hh +++ b/database.hh @@ -88,6 +88,7 @@ public: bool enable_disk_reads = true; bool enable_cache = true; bool enable_commitlog = true; + size_t max_memtable_size = 5'000'000; }; struct no_commitlog {}; struct stats { @@ -475,8 +476,10 @@ column_family::apply(const mutation& m, const db::replay_position& rp) { inline void column_family::seal_on_overflow() { - // FIXME: something better - if (++_mutation_count == 100000) { + ++_mutation_count; + if (active_memtable().occupancy().total_space() >= _config.max_memtable_size) { + // FIXME: if sparse, do some in-memory compaction first + // FIXME: maybe merge with other in-memory memtables _mutation_count = 0; seal_active_memtable(); } From 7b67b0482240a750b9fd4de799134162e5b7ee00 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 19 Aug 2015 13:15:31 +0300 Subject: [PATCH 02/10] db: wire up max memtable size configuration --- database.cc | 8 ++++++++ database.hh | 1 + db/config.hh | 4 ++-- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/database.cc b/database.cc index 1d9b084557..71ebd3f86d 100644 --- a/database.cc +++ b/database.cc @@ -955,6 +955,8 @@ keyspace::make_column_family_config(const schema& s) const { cfg.enable_disk_writes = _config.enable_disk_writes; cfg.enable_commitlog = _config.enable_commitlog; cfg.enable_cache = _config.enable_cache; + cfg.max_memtable_size = _config.max_memtable_size; + return cfg; } @@ -1222,12 +1224,18 @@ database::make_keyspace_config(const keyspace_metadata& ksm) const { cfg.enable_disk_reads = true; // we allways read from disk cfg.enable_commitlog = ksm.durable_writes() && _cfg->enable_commitlog() && !_cfg->enable_in_memory_data_store(); cfg.enable_cache = _cfg->enable_cache(); + auto memtable_total_space = size_t(_cfg->memtable_total_space_in_mb()) << 20; + if (!memtable_total_space) { + memtable_total_space = memory::stats().total_memory() / 2; + } + cfg.max_memtable_size = memtable_total_space * _cfg->memtable_cleanup_threshold(); } else { cfg.datadir = ""; cfg.enable_disk_writes = false; cfg.enable_disk_reads = false; cfg.enable_commitlog = false; cfg.enable_cache = false; + cfg.max_memtable_size = std::numeric_limits::max(); } return cfg; } diff --git a/database.hh b/database.hh index 9f7eaefc3a..a99872124c 100644 --- a/database.hh +++ b/database.hh @@ -308,6 +308,7 @@ public: bool enable_disk_reads = true; bool enable_disk_writes = true; bool enable_cache = true; + size_t max_memtable_size = 5'000'000; }; private: std::unique_ptr _replication_strategy; diff --git a/db/config.hh b/db/config.hh index 519ecec07c..6f256a678e 100644 --- a/db/config.hh +++ b/db/config.hh @@ -220,7 +220,7 @@ public: "Related information: Configuring compaction" \ ) \ /* Common memtable settings */ \ - val(memtable_total_space_in_mb, uint32_t, 0, Unused, \ + val(memtable_total_space_in_mb, uint32_t, 0, Used, \ "Specifies the total memory used for all memtables on a node. This replaces the per-table storage settings memtable_operations_in_millions and memtable_throughput_in_mb." \ ) \ /* Common disk settings */ \ @@ -297,7 +297,7 @@ public: "\toffheap_buffers Off heap (direct) NIO buffers.\n" \ "\toffheap_objects Native memory, eliminating NIO buffer heap overhead." \ ) \ - val(memtable_cleanup_threshold, double, .11, Unused, \ + val(memtable_cleanup_threshold, double, .11, Used, \ "Ratio of occupied non-flushing memtable size to total permitted size for triggering a flush of the largest memtable. Larger values mean larger flushes and less compaction, but also less concurrent flush activity, which can make it difficult to keep your disks saturated under heavy write load." \ ) \ val(file_cache_size_in_mb, uint32_t, 512, Unused, \ From 71aad57ca8d23a21d8376403e6755eaa282094dc Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 19 Aug 2015 14:33:23 +0300 Subject: [PATCH 03/10] lsa: make region::impl a top-level class Makes using forward declarations possible. --- utils/logalloc.cc | 14 +++++++------- utils/logalloc.hh | 4 +++- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/utils/logalloc.cc b/utils/logalloc.cc index 4c383b0b57..6f7fb0672f 100644 --- a/utils/logalloc.cc +++ b/utils/logalloc.cc @@ -382,7 +382,7 @@ segment::heap_handle() { // Per-segment metadata is kept in a separate array, managed by segment_pool // object. // -class region::impl : public allocation_strategy { +class region_impl : public allocation_strategy { static constexpr float max_occupancy_for_compaction = 0.85; // FIXME: make configurable static constexpr size_t max_managed_object_size = segment::size * 0.1; @@ -584,13 +584,13 @@ private: return id.fetch_add(1); } public: - impl() + region_impl() : _id(next_id()) { tracker_instance._impl->register_region(this); } - virtual ~impl() { + virtual ~region_impl() { tracker_instance._impl->unregister_region(this); assert(_segments.empty()); @@ -600,8 +600,8 @@ public: } } - impl(impl&&) = delete; - impl(const impl&) = delete; + region_impl(region_impl&&) = delete; + region_impl(const region_impl&) = delete; bool empty() const { return occupancy().used_space() == 0; @@ -673,7 +673,7 @@ public: // Merges another region into this region. The other region is left empty. // Doesn't invalidate references to allocated objects. - void merge(region::impl& other) { + void merge(region_impl& other) { if (_active && _active->is_empty()) { shard_segment_pool.free_segment(_active); _active = nullptr; @@ -817,7 +817,7 @@ occupancy_stats tracker::impl::occupancy() const { void tracker::impl::full_compaction() { logger.debug("Full compaction on all regions, {}", occupancy()); - for (region::impl* r : _regions) { + for (region_impl* r : _regions) { if (r->is_compactible()) { r->full_compaction(); } diff --git a/utils/logalloc.hh b/utils/logalloc.hh index c7b287ffe6..50f88e2736 100644 --- a/utils/logalloc.hh +++ b/utils/logalloc.hh @@ -12,6 +12,7 @@ namespace logalloc { struct occupancy_stats; +class region_impl; using eviction_fn = std::function; @@ -23,6 +24,7 @@ private: std::unique_ptr _impl; memory::reclaimer _reclaimer; friend class region; + friend class region_impl; public: tracker(); ~tracker(); @@ -123,7 +125,7 @@ public: // class region { public: - class impl; + using impl = region_impl; private: std::unique_ptr _impl; public: From 9ed2bbb25c6779a471bcd8433e6987c7f20f3007 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 19 Aug 2015 15:35:07 +0300 Subject: [PATCH 04/10] lsa: introduce region_group A region_group is a nestable group of regions, for cumulative statistics purposes. --- utils/logalloc.cc | 101 ++++++++++++++++++++++++++++++++++++++++++---- utils/logalloc.hh | 48 +++++++++++++++++++++- 2 files changed, 140 insertions(+), 9 deletions(-) diff --git a/utils/logalloc.cc b/utils/logalloc.cc index 6f7fb0672f..96f7a1ca7f 100644 --- a/utils/logalloc.cc +++ b/utils/logalloc.cc @@ -3,6 +3,7 @@ */ #include +#include #include #include @@ -494,6 +495,7 @@ class region_impl : public allocation_strategy { } } __attribute__((packed)); private: + region_group* _group = nullptr; segment* _active = nullptr; size_t _active_offset; segment_heap _segments; // Contains only closed segments @@ -507,7 +509,7 @@ private: assert(alignment < obj_flags::max_alignment); if (!_active) { - _active = shard_segment_pool.new_segment(); + _active = new_segment(); _active_offset = 0; } @@ -563,17 +565,32 @@ private: _active = nullptr; } + void free_segment(segment* seg) { + shard_segment_pool.free_segment(seg); + if (_group) { + _group->update(-segment::size); + } + } + + segment* new_segment() { + segment* seg = shard_segment_pool.new_segment(); + if (_group) { + _group->update(segment::size); + } + return seg; + } + void compact(segment* seg) { for_each_live(seg, [this] (object_descriptor* desc, void* obj) { auto dst = alloc_small(desc->migrator(), desc->size(), desc->alignment()); desc->migrator()(obj, dst, desc->size()); }); - shard_segment_pool.free_segment(seg); + free_segment(seg); } void close_and_open() { - segment* new_active = shard_segment_pool.new_segment(); + segment* new_active = new_segment(); close_active(); _active = new_active; _active_offset = 0; @@ -583,11 +600,30 @@ private: static std::atomic id{0}; return id.fetch_add(1); } + struct degroup_temporarily { + region_impl* impl; + region_group* group; + explicit degroup_temporarily(region_impl* impl) + : impl(impl), group(impl->_group) { + if (group) { + group->del(impl); + } + } + ~degroup_temporarily() { + if (group) { + group->add(impl); + } + } + }; + public: - region_impl() - : _id(next_id()) + explicit region_impl(region_group* group = nullptr) + : _group(group), _id(next_id()) { tracker_instance._impl->register_region(this); + if (group) { + group->add(this); + } } virtual ~region_impl() { @@ -596,7 +632,10 @@ public: assert(_segments.empty()); if (_active) { assert(_active->is_empty()); - shard_segment_pool.free_segment(_active); + free_segment(_active); + } + if (_group) { + _group->del(this); } } @@ -663,7 +702,7 @@ public: if (seg != _active) { if (seg_desc.is_empty()) { _segments.erase(seg_desc.heap_handle()); - shard_segment_pool.free_segment(seg, seg_desc); + free_segment(seg); } else { _closed_occupancy += seg_desc.occupancy(); _segments.decrease(seg_desc.heap_handle()); @@ -674,6 +713,9 @@ public: // Merges another region into this region. The other region is left empty. // Doesn't invalidate references to allocated objects. void merge(region_impl& other) { + degroup_temporarily dgt1(this); + degroup_temporarily dgt2(&other); + if (_active && _active->is_empty()) { shard_segment_pool.free_segment(_active); _active = nullptr; @@ -768,12 +810,17 @@ public: _evictable = true; _eviction_fn = std::move(fn); } + friend class region_group; }; region::region() : _impl(std::make_unique()) { } +region::region(region_group& group) + : _impl(std::make_unique(&group)) { +} + region::~region() { } @@ -968,4 +1015,44 @@ void tracker::impl::register_collectd_metrics() { }); } +region_group::region_group(region_group&& o) noexcept + : _parent(o._parent), _total_memory(o._total_memory) + , _subgroups(std::move(o._subgroups)), _regions(std::move(o._regions)) { + if (_parent) { + _parent->del(&o); + _parent->add(this); + } + o._total_memory = 0; + for (auto&& sg : _subgroups) { + sg->_parent = this; + } + for (auto&& r : _regions) { + r->_group = this; + } +} + +void +region_group::add(region_group* child) { + _subgroups.push_back(child); + update(child->_total_memory); +} + +void +region_group::del(region_group* child) { + _subgroups.erase(boost::range::remove(_subgroups, child), _subgroups.end()); + update(-child->_total_memory); +} + +void +region_group::add(region_impl* child) { + _regions.push_back(child); + update(child->occupancy().total_space()); +} + +void +region_group::del(region_impl* child) { + _regions.erase(boost::range::remove(_regions, child), _regions.end()); + update(-child->occupancy().total_space()); +} + } diff --git a/utils/logalloc.hh b/utils/logalloc.hh index 50f88e2736..36a8b53808 100644 --- a/utils/logalloc.hh +++ b/utils/logalloc.hh @@ -12,10 +12,51 @@ namespace logalloc { struct occupancy_stats; +class region; class region_impl; using eviction_fn = std::function; +// Groups regions for the purpose of statistics. Can be nested. +class region_group { + region_group* _parent = nullptr; + size_t _total_memory = 0; + std::vector _subgroups; + std::vector _regions; +public: + region_group() = default; + region_group(region_group* parent) : _parent(parent) { + if (_parent) { + _parent->add(this); + } + } + region_group(region_group&& o) noexcept; + region_group(const region_group&) = delete; + ~region_group() { + if (_parent) { + _parent->del(this); + } + } + region_group& operator=(const region_group&) = delete; + region_group& operator=(region_group&&) = delete; + size_t memory_used() const { + return _total_memory; + } + void update(ssize_t delta) { + auto rg = this; + while (rg) { + rg->_total_memory += delta; + rg = rg->_parent; + } + } +private: + void add(region_group* child); + void del(region_group* child); + void add(region_impl* child); + void del(region_impl* child); + friend class region_impl; +}; + // Controller for all LSA regions. There's one per shard. class tracker { public: @@ -130,9 +171,10 @@ private: std::unique_ptr _impl; public: region(); + explicit region(region_group& group); ~region(); - region(region&& other) = default; - region& operator=(region&& other) = default; + region(region&& other); + region& operator=(region&& other); region(const region& other) = delete; occupancy_stats occupancy() const; @@ -156,6 +198,8 @@ public: // when data from this region needs to be evicted in order to reclaim space. // The function should free some space from this region. void make_evictable(eviction_fn); + + friend class region_group; }; } From c175025bb6eb9196e0b4a95c0877526653ecff6b Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 19 Aug 2015 15:36:06 +0300 Subject: [PATCH 05/10] db: place all memtables into a single region_group We can use this to track the amount of unevictable memory in the system. --- database.cc | 6 ++++-- database.hh | 5 ++++- memtable.cc | 5 +++-- memtable.hh | 2 +- 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/database.cc b/database.cc index 71ebd3f86d..7dcdd6c861 100644 --- a/database.cc +++ b/database.cc @@ -382,7 +382,7 @@ void column_family::add_sstable(sstables::sstable&& sstable) { void column_family::add_memtable() { // allow in-progress reads to continue using old list _memtables = make_lw_shared(memtable_list(*_memtables)); - _memtables->emplace_back(make_lw_shared(_schema)); + _memtables->emplace_back(make_lw_shared(_schema, _config.dirty_memory_region_group)); } future<> @@ -956,6 +956,7 @@ keyspace::make_column_family_config(const schema& s) const { cfg.enable_commitlog = _config.enable_commitlog; cfg.enable_cache = _config.enable_cache; cfg.max_memtable_size = _config.max_memtable_size; + cfg.dirty_memory_region_group = _config.dirty_memory_region_group; return cfg; } @@ -1215,7 +1216,7 @@ future<> database::apply(const frozen_mutation& m) { } keyspace::config -database::make_keyspace_config(const keyspace_metadata& ksm) const { +database::make_keyspace_config(const keyspace_metadata& ksm) { // FIXME support multiple directories keyspace::config cfg; if (_cfg->data_file_directories().size() > 0) { @@ -1237,6 +1238,7 @@ database::make_keyspace_config(const keyspace_metadata& ksm) const { cfg.enable_cache = false; cfg.max_memtable_size = std::numeric_limits::max(); } + cfg.dirty_memory_region_group = &_dirty_memory_region_group; return cfg; } diff --git a/database.hh b/database.hh index a99872124c..1927b83fde 100644 --- a/database.hh +++ b/database.hh @@ -89,6 +89,7 @@ public: bool enable_cache = true; bool enable_commitlog = true; size_t max_memtable_size = 5'000'000; + logalloc::region_group* dirty_memory_region_group = nullptr; }; struct no_commitlog {}; struct stats { @@ -309,6 +310,7 @@ public: bool enable_disk_writes = true; bool enable_cache = true; size_t max_memtable_size = 5'000'000; + logalloc::region_group* dirty_memory_region_group = nullptr; }; private: std::unique_ptr _replication_strategy; @@ -365,6 +367,7 @@ class database { utils::UUID _version; // compaction_manager object is referenced by all column families of a database. compaction_manager _compaction_manager; + logalloc::region_group _dirty_memory_region_group; future<> init_commitlog(); future<> apply_in_memory(const frozen_mutation&, const db::replay_position&); @@ -441,7 +444,7 @@ public: future> query(const query::read_command& cmd, const std::vector& ranges); future query_mutations(const query::read_command& cmd, const query::partition_range& range); future<> apply(const frozen_mutation&); - keyspace::config make_keyspace_config(const keyspace_metadata& ksm) const; + keyspace::config make_keyspace_config(const keyspace_metadata& ksm); const sstring& get_snitch_name() const; friend std::ostream& operator<<(std::ostream& out, const database& db); diff --git a/memtable.cc b/memtable.cc index 63e7b0e599..17d525e128 100644 --- a/memtable.cc +++ b/memtable.cc @@ -7,9 +7,10 @@ namespace stdx = std::experimental; -memtable::memtable(schema_ptr schema) +memtable::memtable(schema_ptr schema, logalloc::region_group* dirty_memory_region_group) : _schema(std::move(schema)) - , partitions(partition_entry::compare(_schema)) { + , partitions(partition_entry::compare(_schema)) + , _region(dirty_memory_region_group ? logalloc::region(*dirty_memory_region_group) : logalloc::region()) { } memtable::~memtable() { diff --git a/memtable.hh b/memtable.hh index 8b02f67c62..3ce759bed7 100644 --- a/memtable.hh +++ b/memtable.hh @@ -81,7 +81,7 @@ private: private: boost::iterator_range slice(const query::partition_range& r) const; public: - explicit memtable(schema_ptr schema); + explicit memtable(schema_ptr schema, logalloc::region_group* dirty_memory_region_group = nullptr); ~memtable(); schema_ptr schema() const { return _schema; } mutation_partition& find_or_create_partition(const dht::decorated_key& key); From 5bf5476beb946b1e3afba5e64211c6d910e113d6 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 19 Aug 2015 17:03:29 +0300 Subject: [PATCH 06/10] db: add collectd counter for dirty memory --- database.cc | 12 ++++++++++++ database.hh | 3 ++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/database.cc b/database.cc index 7dcdd6c861..6e2e646a04 100644 --- a/database.cc +++ b/database.cc @@ -643,6 +643,18 @@ database::database(const db::config& cfg) db::system_keyspace::make(*this, durable); // Start compaction manager with two tasks for handling compaction jobs. _compaction_manager.start(2); + setup_collectd(); +} + +void +database::setup_collectd() { + _collectd.push_back( + scollectd::add_polled_metric(scollectd::type_instance_id("memory" + , scollectd::per_cpu_plugin_instance + , "bytes", "dirty") + , scollectd::make_typed(scollectd::data_type::GAUGE, [this] { + return _dirty_memory_region_group.memory_used(); + }))); } database::~database() { diff --git a/database.hh b/database.hh index 1927b83fde..1bcc9489b2 100644 --- a/database.hh +++ b/database.hh @@ -368,6 +368,7 @@ class database { // compaction_manager object is referenced by all column families of a database. compaction_manager _compaction_manager; logalloc::region_group _dirty_memory_region_group; + std::vector _collectd; future<> init_commitlog(); future<> apply_in_memory(const frozen_mutation&, const db::replay_position&); @@ -380,7 +381,7 @@ private: void add_keyspace(sstring name, keyspace k); void create_in_memory_keyspace(const lw_shared_ptr& ksm); friend void db::system_keyspace::make(database& db, bool durable); - + void setup_collectd(); public: static utils::UUID empty_version; From 68469095335f2cd953a46da6964d2d568efcb5ab Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 19 Aug 2015 17:44:31 +0300 Subject: [PATCH 07/10] db: extract sstable flushing code to a function --- database.cc | 91 ++++++++++++++++++++++++++++------------------------- database.hh | 1 + 2 files changed, 49 insertions(+), 43 deletions(-) diff --git a/database.cc b/database.cc index 6e2e646a04..93cb43a7e4 100644 --- a/database.cc +++ b/database.cc @@ -422,54 +422,59 @@ column_family::seal_active_memtable() { auto gen = _sstable_generation++ * smp::count + engine().cpu_id(); return seastar::with_gate(_in_flight_seals, [gen, old, this] { - sstables::sstable newtab = sstables::sstable(_schema->ks_name(), _schema->cf_name(), - _config.datadir, gen, - sstables::sstable::version_types::ka, - sstables::sstable::format_types::big); - - dblog.debug("Flushing to {}", newtab.get_filename()); - return do_with(std::move(newtab), [old, this] (sstables::sstable& newtab) { - // FIXME: write all components - return newtab.write_components(*old).then([this, &newtab, old] { - return newtab.load(); - }).then([this, old, &newtab] { - dblog.debug("Flushing done"); - // We must add sstable before we call update_cache(), because - // memtable's data after moving to cache can be evicted at any time. - auto old_sstables = _sstables; - add_sstable(std::move(newtab)); - return update_cache(*old, std::move(old_sstables)); - }).then_wrapped([this, old] (future<> ret) { - try { - ret.get(); - - // FIXME: until the surrounding function returns a future and - // caller ensures ordering (i.e. finish flushing one or more sequential tables before - // doing the discard), this below is _not_ correct, since the use of replay_position - // depends on us reporting the factual highest position we've actually flushed, - // _and_ all positions (for a given UUID) below having been dealt with. - // - // Note that the whole scheme is also dependent on memtables being "allocated" in order, - // i.e. we may not flush a younger memtable before and older, and we need to use the - // highest rp. - if (_commitlog) { - _commitlog->discard_completed_segments(_schema->id(), old->replay_position()); - } - _memtables->erase(boost::range::find(*_memtables, old)); - dblog.debug("Memtable replaced"); - trigger_compaction(); - } catch (std::exception& e) { - dblog.error("failed to write sstable: {}", e.what()); - } catch (...) { - dblog.error("failed to write sstable: unknown error"); - } - }); - }); + return flush_memtable_to_sstable(gen, old); }); // FIXME: release commit log // FIXME: provide back-pressure to upper layers } +future<> +column_family::flush_memtable_to_sstable(uint64_t gen, lw_shared_ptr old) { + sstables::sstable newtab = sstables::sstable(_schema->ks_name(), _schema->cf_name(), + _config.datadir, gen, + sstables::sstable::version_types::ka, + sstables::sstable::format_types::big); + + dblog.debug("Flushing to {}", newtab.get_filename()); + return do_with(std::move(newtab), [old, this] (sstables::sstable& newtab) { + // FIXME: write all components + return newtab.write_components(*old).then([this, &newtab, old] { + return newtab.load(); + }).then([this, old, &newtab] { + dblog.debug("Flushing done"); + // We must add sstable before we call update_cache(), because + // memtable's data after moving to cache can be evicted at any time. + auto old_sstables = _sstables; + add_sstable(std::move(newtab)); + return update_cache(*old, std::move(old_sstables)); + }).then_wrapped([this, old] (future<> ret) { + try { + ret.get(); + + // FIXME: until the surrounding function returns a future and + // caller ensures ordering (i.e. finish flushing one or more sequential tables before + // doing the discard), this below is _not_ correct, since the use of replay_position + // depends on us reporting the factual highest position we've actually flushed, + // _and_ all positions (for a given UUID) below having been dealt with. + // + // Note that the whole scheme is also dependent on memtables being "allocated" in order, + // i.e. we may not flush a younger memtable before and older, and we need to use the + // highest rp. + if (_commitlog) { + _commitlog->discard_completed_segments(_schema->id(), old->replay_position()); + } + _memtables->erase(boost::range::find(*_memtables, old)); + dblog.debug("Memtable replaced"); + trigger_compaction(); + } catch (std::exception& e) { + dblog.error("failed to write sstable: {}", e.what()); + } catch (...) { + dblog.error("failed to write sstable: unknown error"); + } + }); + }); +} + void column_family::start() { // FIXME: add option to disable automatic compaction. diff --git a/database.hh b/database.hh index 1bcc9489b2..6511140cd9 100644 --- a/database.hh +++ b/database.hh @@ -125,6 +125,7 @@ private: void update_stats_for_new_sstable(uint64_t new_sstable_data_size); void add_sstable(sstables::sstable&& sstable); void add_memtable(); + future<> flush_memtable_to_sstable(uint64_t gen, lw_shared_ptr memt); future<> update_cache(memtable&, lw_shared_ptr old_sstables); struct merge_comparator; private: From c01bc16f5885c1d69838995419766b2f631a88e6 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 19 Aug 2015 18:07:00 +0300 Subject: [PATCH 08/10] db: don't give up flushing a memtable on error We must try again, or the memtable's memory will never be reclaimed. --- database.cc | 32 ++++++++++++++++++++++++-------- database.hh | 3 ++- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/database.cc b/database.cc index 93cb43a7e4..cc4fb7a8cd 100644 --- a/database.cc +++ b/database.cc @@ -16,6 +16,7 @@ #include "nway_merger.hh" #include "cql3/column_identifier.hh" #include "core/seastar.hh" +#include #include #include #include "sstables/sstables.hh" @@ -34,6 +35,8 @@ #include "service/storage_service.hh" #include "mutation_query.hh" +using namespace std::chrono_literals; + logging::logger dblog("database"); column_family::column_family(schema_ptr schema, config config, db::commitlog& cl, compaction_manager& compaction_manager) @@ -417,19 +420,19 @@ column_family::seal_active_memtable() { ); _highest_flushed_rp = old->replay_position(); - // FIXME: better way of ensuring we don't attemt to - // overwrite an existing table. - auto gen = _sstable_generation++ * smp::count + engine().cpu_id(); - - return seastar::with_gate(_in_flight_seals, [gen, old, this] { - return flush_memtable_to_sstable(gen, old); + return seastar::with_gate(_in_flight_seals, [old, this] { + return flush_memtable_to_sstable(old); }); // FIXME: release commit log // FIXME: provide back-pressure to upper layers } -future<> -column_family::flush_memtable_to_sstable(uint64_t gen, lw_shared_ptr old) { +future +column_family::try_flush_memtable_to_sstable(lw_shared_ptr old) { + // FIXME: better way of ensuring we don't attemt to + // overwrite an existing table. + auto gen = _sstable_generation++ * smp::count + engine().cpu_id(); + sstables::sstable newtab = sstables::sstable(_schema->ks_name(), _schema->cf_name(), _config.datadir, gen, sstables::sstable::version_types::ka, @@ -466,11 +469,24 @@ column_family::flush_memtable_to_sstable(uint64_t gen, lw_shared_ptr o _memtables->erase(boost::range::find(*_memtables, old)); dblog.debug("Memtable replaced"); trigger_compaction(); + return make_ready_future(stop_iteration::yes); } catch (std::exception& e) { dblog.error("failed to write sstable: {}", e.what()); } catch (...) { dblog.error("failed to write sstable: unknown error"); } + return sleep(10s).then([] { + return make_ready_future(stop_iteration::no); + }); + }); + }); +} + +future<> +column_family::flush_memtable_to_sstable(lw_shared_ptr memt) { + return repeat([this, memt] { + return seastar::with_gate(_in_flight_seals, [memt, this] { + return try_flush_memtable_to_sstable(memt); }); }); } diff --git a/database.hh b/database.hh index 6511140cd9..3752e64ddb 100644 --- a/database.hh +++ b/database.hh @@ -125,7 +125,8 @@ private: void update_stats_for_new_sstable(uint64_t new_sstable_data_size); void add_sstable(sstables::sstable&& sstable); void add_memtable(); - future<> flush_memtable_to_sstable(uint64_t gen, lw_shared_ptr memt); + future<> flush_memtable_to_sstable(lw_shared_ptr memt); + future try_flush_memtable_to_sstable(lw_shared_ptr memt); future<> update_cache(memtable&, lw_shared_ptr old_sstables); struct merge_comparator; private: From bcff75003ec0f9730a850345e5e72d59768f4160 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 19 Aug 2015 19:06:12 +0300 Subject: [PATCH 09/10] row_cache: yield while moving data to cache If we don't yield, we can run out of memory while moving a memtable into cache. This reduces the chance that writing an sstable will fail because we could not transfer the memtable into the cache. --- row_cache.cc | 48 +++++++++++++++++++++++++----------------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/row_cache.cc b/row_cache.cc index 9ace290ab6..859870088b 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -147,31 +147,33 @@ void row_cache::populate(const mutation& m) { future<> row_cache::update(memtable& m, negative_mutation_reader underlying_negative) { _tracker.region().merge(m._region); // Now all data in memtable belongs to cache - with_allocator(_tracker.allocator(), [this, &m, underlying_negative = std::move(underlying_negative)] { - auto i = m.partitions.begin(); - const schema& s = *m.schema(); - while (i != m.partitions.end()) { - partition_entry& mem_e = *i; - // FIXME: Optimize knowing we lookup in-order. - auto cache_i = _partitions.lower_bound(mem_e.key(), cache_entry::compare(_schema)); - // If cache doesn't contain the entry we cannot insert it because the mutation may be incomplete. - // FIXME: keep a bitmap indicating which sstables we do cover, so we don't have to - // search it. - if (cache_i != _partitions.end() && cache_i->key().equal(s, mem_e.key())) { - cache_entry& entry = *cache_i; - _tracker.touch(entry); - entry.partition().apply(s, std::move(mem_e.partition())); - } else if (underlying_negative(mem_e.key().key()) == negative_mutation_reader_result::definitely_doesnt_exists) { - cache_entry* entry = current_allocator().construct(mem_e.key(), mem_e.partition()); - _tracker.insert(*entry); - _partitions.insert(cache_i, *entry); + return repeat([this, &m, underlying_negative = std::move(underlying_negative)] () mutable { + return with_allocator(_tracker.allocator(), [this, &m, &underlying_negative] () { + unsigned quota = 30; + auto i = m.partitions.begin(); + const schema& s = *m.schema(); + while (i != m.partitions.end() && quota-- != 0) { + partition_entry& mem_e = *i; + // FIXME: Optimize knowing we lookup in-order. + auto cache_i = _partitions.lower_bound(mem_e.key(), cache_entry::compare(_schema)); + // If cache doesn't contain the entry we cannot insert it because the mutation may be incomplete. + // FIXME: keep a bitmap indicating which sstables we do cover, so we don't have to + // search it. + if (cache_i != _partitions.end() && cache_i->key().equal(s, mem_e.key())) { + cache_entry& entry = *cache_i; + _tracker.touch(entry); + entry.partition().apply(s, std::move(mem_e.partition())); + } else if (underlying_negative(mem_e.key().key()) == negative_mutation_reader_result::definitely_doesnt_exists) { + cache_entry* entry = current_allocator().construct(mem_e.key(), mem_e.partition()); + _tracker.insert(*entry); + _partitions.insert(cache_i, *entry); + } + i = m.partitions.erase(i); + current_allocator().destroy(&mem_e); } - i = m.partitions.erase(i); - current_allocator().destroy(&mem_e); - } + return make_ready_future(m.partitions.empty() ? stop_iteration::yes : stop_iteration::no); + }); }); - // FIXME: yield voluntarily every now and then to cap latency. - return make_ready_future<>(); } row_cache::row_cache(schema_ptr s, mutation_source fallback_factory, cache_tracker& tracker) From 9012f991bf0b7eae74ac688d1fa43e72e336e474 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 20 Aug 2015 12:10:03 +0300 Subject: [PATCH 10/10] logalloc: really allow dipping into the emergency pool during reclaim The RAII wrapper for the emergency pool was invoked without an object, and so had no effect. --- utils/logalloc.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/logalloc.cc b/utils/logalloc.cc index 96f7a1ca7f..40c2b2dc1f 100644 --- a/utils/logalloc.cc +++ b/utils/logalloc.cc @@ -938,7 +938,7 @@ size_t tracker::impl::reclaim(size_t bytes) { in_use, in_use * segment::size, segments_to_release, segments_to_release * segment::size); // Allow dipping into reserves while compacting - segment_pool::reservation_goal(shard_segment_pool, 0); + segment_pool::reservation_goal open_emergency_pool(shard_segment_pool, 0); boost::range::make_heap(_regions, cmp);