db: hard dirty memory limit

Unlike cache, dirty memory cannot be evicted at will, so we must limit it.

This patch establishes a hard limit of 50% of all memory.  Above that,
new requests are not allowed to start.  This allows the system some time
to clean up memory.

Note that we will need more fine-grained bandwidth control than this;
the hard limit is the last line of defense against running our of reclaimable
memory.

Tested with a mixed read/write load; after reads start to dominate writes
(due to the proliferation of small sstables, and the inability of compaction
to keep up, dirty memory usage starts to climb until the hard stop prevents
it from climbing further and ooming the server).
This commit is contained in:
Avi Kivity
2015-08-27 15:55:36 +03:00
committed by Tomasz Grabiec
parent f171d71c16
commit 012fd41fc0
2 changed files with 50 additions and 7 deletions

View File

@@ -684,6 +684,10 @@ database::database(const db::config& cfg)
: _cfg(std::make_unique<db::config>(cfg))
, _version(empty_version)
{
_memtable_total_space = size_t(_cfg->memtable_total_space_in_mb()) << 20;
if (!_memtable_total_space) {
_memtable_total_space = memory::stats().total_memory() / 2;
}
bool durable = cfg.data_file_directories().size() > 0;
db::system_keyspace::make(*this, durable, _cfg->volatile_system_keyspace_for_testing());
// Start compaction manager with two tasks for handling compaction jobs.
@@ -1262,7 +1266,7 @@ future<> database::apply_in_memory(const frozen_mutation& m, const db::replay_po
return make_ready_future<>();
}
future<> database::apply(const frozen_mutation& m) {
future<> database::do_apply(const frozen_mutation& m) {
// I'm doing a nullcheck here since the init code path for db etc
// is a little in flux and commitlog is created only when db is
// initied from datadir.
@@ -1288,6 +1292,43 @@ future<> database::apply(const frozen_mutation& m) {
return apply_in_memory(m, db::replay_position());
}
future<> database::throttle() {
if (_dirty_memory_region_group.memory_used() < _memtable_total_space
&& _throttled_requests.empty()) {
// All is well, go ahead
return make_ready_future<>();
}
// We must throttle, wait a bit
if (_throttled_requests.empty()) {
_throttling_timer.arm_periodic(10ms);
}
_throttled_requests.emplace_back();
return _throttled_requests.back().get_future();
}
void database::unthrottle() {
// Release one request per free 1MB we have
// FIXME: improve this
if (_dirty_memory_region_group.memory_used() >= _memtable_total_space) {
return;
}
size_t avail = (_memtable_total_space - _dirty_memory_region_group.memory_used()) >> 20;
avail = std::min(_throttled_requests.size(), avail);
for (size_t i = 0; i < avail; ++i) {
_throttled_requests.front().set_value();
_throttled_requests.pop_front();
}
if (_throttled_requests.empty()) {
_throttling_timer.cancel();
}
}
future<> database::apply(const frozen_mutation& m) {
return throttle().then([this, &m] {
return do_apply(m);
});
}
keyspace::config
database::make_keyspace_config(const keyspace_metadata& ksm) {
// FIXME support multiple directories
@@ -1298,11 +1339,7 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
cfg.enable_disk_reads = true; // we allways read from disk
cfg.enable_commitlog = ksm.durable_writes() && _cfg->enable_commitlog() && !_cfg->enable_in_memory_data_store();
cfg.enable_cache = _cfg->enable_cache();
auto memtable_total_space = size_t(_cfg->memtable_total_space_in_mb()) << 20;
if (!memtable_total_space) {
memtable_total_space = memory::stats().total_memory() / 2;
}
cfg.max_memtable_size = memtable_total_space * _cfg->memtable_cleanup_threshold();
cfg.max_memtable_size = _memtable_total_space * _cfg->memtable_cleanup_threshold();
} else {
cfg.datadir = "";
cfg.enable_disk_writes = false;

View File

@@ -378,10 +378,13 @@ class database {
std::unordered_map<std::pair<sstring, sstring>, utils::UUID, utils::tuple_hash> _ks_cf_to_uuid;
std::unique_ptr<db::commitlog> _commitlog;
std::unique_ptr<db::config> _cfg;
size_t _memtable_total_space = 500 << 20;
utils::UUID _version;
// compaction_manager object is referenced by all column families of a database.
compaction_manager _compaction_manager;
std::vector<scollectd::registration> _collectd;
timer<> _throttling_timer{[this] { unthrottle(); }};
circular_buffer<promise<>> _throttled_requests;
future<> init_commitlog();
future<> apply_in_memory(const frozen_mutation&, const db::replay_position&);
@@ -395,13 +398,16 @@ private:
void create_in_memory_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
friend void db::system_keyspace::make(database& db, bool durable, bool volatile_testing_only);
void setup_collectd();
future<> throttle();
future<> do_apply(const frozen_mutation&);
void unthrottle();
public:
static utils::UUID empty_version;
future<> parse_system_tables(distributed<service::storage_proxy>&);
database();
database(const db::config&);
database(database&&) = default;
database(database&&) = delete;
~database();
void update_version(const utils::UUID& version);