diff --git a/database.cc b/database.cc index 5070c7b4c1..c8b350350c 100644 --- a/database.cc +++ b/database.cc @@ -684,6 +684,10 @@ database::database(const db::config& cfg) : _cfg(std::make_unique(cfg)) , _version(empty_version) { + _memtable_total_space = size_t(_cfg->memtable_total_space_in_mb()) << 20; + if (!_memtable_total_space) { + _memtable_total_space = memory::stats().total_memory() / 2; + } bool durable = cfg.data_file_directories().size() > 0; db::system_keyspace::make(*this, durable, _cfg->volatile_system_keyspace_for_testing()); // Start compaction manager with two tasks for handling compaction jobs. @@ -1262,7 +1266,7 @@ future<> database::apply_in_memory(const frozen_mutation& m, const db::replay_po return make_ready_future<>(); } -future<> database::apply(const frozen_mutation& m) { +future<> database::do_apply(const frozen_mutation& m) { // I'm doing a nullcheck here since the init code path for db etc // is a little in flux and commitlog is created only when db is // initied from datadir. @@ -1288,6 +1292,43 @@ future<> database::apply(const frozen_mutation& m) { return apply_in_memory(m, db::replay_position()); } +future<> database::throttle() { + if (_dirty_memory_region_group.memory_used() < _memtable_total_space + && _throttled_requests.empty()) { + // All is well, go ahead + return make_ready_future<>(); + } + // We must throttle, wait a bit + if (_throttled_requests.empty()) { + _throttling_timer.arm_periodic(10ms); + } + _throttled_requests.emplace_back(); + return _throttled_requests.back().get_future(); +} + +void database::unthrottle() { + // Release one request per free 1MB we have + // FIXME: improve this + if (_dirty_memory_region_group.memory_used() >= _memtable_total_space) { + return; + } + size_t avail = (_memtable_total_space - _dirty_memory_region_group.memory_used()) >> 20; + avail = std::min(_throttled_requests.size(), avail); + for (size_t i = 0; i < avail; ++i) { + _throttled_requests.front().set_value(); + _throttled_requests.pop_front(); + } + if (_throttled_requests.empty()) { + _throttling_timer.cancel(); + } +} + +future<> database::apply(const frozen_mutation& m) { + return throttle().then([this, &m] { + return do_apply(m); + }); +} + keyspace::config database::make_keyspace_config(const keyspace_metadata& ksm) { // FIXME support multiple directories @@ -1298,11 +1339,7 @@ database::make_keyspace_config(const keyspace_metadata& ksm) { cfg.enable_disk_reads = true; // we allways read from disk cfg.enable_commitlog = ksm.durable_writes() && _cfg->enable_commitlog() && !_cfg->enable_in_memory_data_store(); cfg.enable_cache = _cfg->enable_cache(); - auto memtable_total_space = size_t(_cfg->memtable_total_space_in_mb()) << 20; - if (!memtable_total_space) { - memtable_total_space = memory::stats().total_memory() / 2; - } - cfg.max_memtable_size = memtable_total_space * _cfg->memtable_cleanup_threshold(); + cfg.max_memtable_size = _memtable_total_space * _cfg->memtable_cleanup_threshold(); } else { cfg.datadir = ""; cfg.enable_disk_writes = false; diff --git a/database.hh b/database.hh index 8d143b0e8f..8507f07d58 100644 --- a/database.hh +++ b/database.hh @@ -378,10 +378,13 @@ class database { std::unordered_map, utils::UUID, utils::tuple_hash> _ks_cf_to_uuid; std::unique_ptr _commitlog; std::unique_ptr _cfg; + size_t _memtable_total_space = 500 << 20; utils::UUID _version; // compaction_manager object is referenced by all column families of a database. compaction_manager _compaction_manager; std::vector _collectd; + timer<> _throttling_timer{[this] { unthrottle(); }}; + circular_buffer> _throttled_requests; future<> init_commitlog(); future<> apply_in_memory(const frozen_mutation&, const db::replay_position&); @@ -395,13 +398,16 @@ private: void create_in_memory_keyspace(const lw_shared_ptr& ksm); friend void db::system_keyspace::make(database& db, bool durable, bool volatile_testing_only); void setup_collectd(); + future<> throttle(); + future<> do_apply(const frozen_mutation&); + void unthrottle(); public: static utils::UUID empty_version; future<> parse_system_tables(distributed&); database(); database(const db::config&); - database(database&&) = default; + database(database&&) = delete; ~database(); void update_version(const utils::UUID& version);