From 012fd41fc045be04f78dcf0fe390e11fc4ed3888 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 27 Aug 2015 15:55:36 +0300 Subject: [PATCH] db: hard dirty memory limit Unlike cache, dirty memory cannot be evicted at will, so we must limit it. This patch establishes a hard limit of 50% of all memory. Above that, new requests are not allowed to start. This allows the system some time to clean up memory. Note that we will need more fine-grained bandwidth control than this; the hard limit is the last line of defense against running our of reclaimable memory. Tested with a mixed read/write load; after reads start to dominate writes (due to the proliferation of small sstables, and the inability of compaction to keep up, dirty memory usage starts to climb until the hard stop prevents it from climbing further and ooming the server). --- database.cc | 49 +++++++++++++++++++++++++++++++++++++++++++------ database.hh | 8 +++++++- 2 files changed, 50 insertions(+), 7 deletions(-) diff --git a/database.cc b/database.cc index 5070c7b4c1..c8b350350c 100644 --- a/database.cc +++ b/database.cc @@ -684,6 +684,10 @@ database::database(const db::config& cfg) : _cfg(std::make_unique(cfg)) , _version(empty_version) { + _memtable_total_space = size_t(_cfg->memtable_total_space_in_mb()) << 20; + if (!_memtable_total_space) { + _memtable_total_space = memory::stats().total_memory() / 2; + } bool durable = cfg.data_file_directories().size() > 0; db::system_keyspace::make(*this, durable, _cfg->volatile_system_keyspace_for_testing()); // Start compaction manager with two tasks for handling compaction jobs. @@ -1262,7 +1266,7 @@ future<> database::apply_in_memory(const frozen_mutation& m, const db::replay_po return make_ready_future<>(); } -future<> database::apply(const frozen_mutation& m) { +future<> database::do_apply(const frozen_mutation& m) { // I'm doing a nullcheck here since the init code path for db etc // is a little in flux and commitlog is created only when db is // initied from datadir. @@ -1288,6 +1292,43 @@ future<> database::apply(const frozen_mutation& m) { return apply_in_memory(m, db::replay_position()); } +future<> database::throttle() { + if (_dirty_memory_region_group.memory_used() < _memtable_total_space + && _throttled_requests.empty()) { + // All is well, go ahead + return make_ready_future<>(); + } + // We must throttle, wait a bit + if (_throttled_requests.empty()) { + _throttling_timer.arm_periodic(10ms); + } + _throttled_requests.emplace_back(); + return _throttled_requests.back().get_future(); +} + +void database::unthrottle() { + // Release one request per free 1MB we have + // FIXME: improve this + if (_dirty_memory_region_group.memory_used() >= _memtable_total_space) { + return; + } + size_t avail = (_memtable_total_space - _dirty_memory_region_group.memory_used()) >> 20; + avail = std::min(_throttled_requests.size(), avail); + for (size_t i = 0; i < avail; ++i) { + _throttled_requests.front().set_value(); + _throttled_requests.pop_front(); + } + if (_throttled_requests.empty()) { + _throttling_timer.cancel(); + } +} + +future<> database::apply(const frozen_mutation& m) { + return throttle().then([this, &m] { + return do_apply(m); + }); +} + keyspace::config database::make_keyspace_config(const keyspace_metadata& ksm) { // FIXME support multiple directories @@ -1298,11 +1339,7 @@ database::make_keyspace_config(const keyspace_metadata& ksm) { cfg.enable_disk_reads = true; // we allways read from disk cfg.enable_commitlog = ksm.durable_writes() && _cfg->enable_commitlog() && !_cfg->enable_in_memory_data_store(); cfg.enable_cache = _cfg->enable_cache(); - auto memtable_total_space = size_t(_cfg->memtable_total_space_in_mb()) << 20; - if (!memtable_total_space) { - memtable_total_space = memory::stats().total_memory() / 2; - } - cfg.max_memtable_size = memtable_total_space * _cfg->memtable_cleanup_threshold(); + cfg.max_memtable_size = _memtable_total_space * _cfg->memtable_cleanup_threshold(); } else { cfg.datadir = ""; cfg.enable_disk_writes = false; diff --git a/database.hh b/database.hh index 8d143b0e8f..8507f07d58 100644 --- a/database.hh +++ b/database.hh @@ -378,10 +378,13 @@ class database { std::unordered_map, utils::UUID, utils::tuple_hash> _ks_cf_to_uuid; std::unique_ptr _commitlog; std::unique_ptr _cfg; + size_t _memtable_total_space = 500 << 20; utils::UUID _version; // compaction_manager object is referenced by all column families of a database. compaction_manager _compaction_manager; std::vector _collectd; + timer<> _throttling_timer{[this] { unthrottle(); }}; + circular_buffer> _throttled_requests; future<> init_commitlog(); future<> apply_in_memory(const frozen_mutation&, const db::replay_position&); @@ -395,13 +398,16 @@ private: void create_in_memory_keyspace(const lw_shared_ptr& ksm); friend void db::system_keyspace::make(database& db, bool durable, bool volatile_testing_only); void setup_collectd(); + future<> throttle(); + future<> do_apply(const frozen_mutation&); + void unthrottle(); public: static utils::UUID empty_version; future<> parse_system_tables(distributed&); database(); database(const db::config&); - database(database&&) = default; + database(database&&) = delete; ~database(); void update_version(const utils::UUID& version);