db: restrict replica read concurrency

Since reading mutations can consume a large amount of memory, which, moreover,
is not predictable at the time the read is initiated, restrict the number
of reads to 100 per shard.  This is more than enough to saturate the disk,
and hopefully enough to prevent allocation failures.

Restriction is applied in column_family::make_sstable_reader(), which is
called either on a cache miss or if the cache is disabled.  This allows
cached reads to proceed without restriction, since their memory usage is
supposedly low.

Reads from the system keyspace use a separate semaphore, to prevent
user reads from blocking system reads.  Perhaps we should select the
semaphore based on the source of the read rather than the keyspace,
but for now using the keyspace is sufficient.
This commit is contained in:
Avi Kivity
2016-06-19 16:30:30 +03:00
parent bea7d7ee94
commit edeef03b34
Notes: Avi Kivity 2016-06-27 19:51:24 +03:00
backport: 1.1, 1.2
3 changed files with 24 additions and 2 deletions

View File

@@ -256,15 +256,24 @@ column_family::make_sstable_reader(schema_ptr s,
const query::partition_range& pr,
query::clustering_key_filtering_context ck_filtering,
const io_priority_class& pc) const {
// restricts a reader's concurrency if the configuration specifies it
auto restrict_reader = [&] (mutation_reader&& in) {
if (_config.read_concurrency_sem) {
return make_restricted_reader(*_config.read_concurrency_sem, 1, std::move(in));
} else {
return std::move(in);
}
};
if (pr.is_singular() && pr.start()->value().has_key()) {
const dht::ring_position& pos = pr.start()->value();
if (dht::shard_of(pos.token()) != engine().cpu_id()) {
return make_empty_reader(); // range doesn't belong to this shard
}
return make_mutation_reader<single_key_sstable_reader>(std::move(s), _sstables, *pos.key(), ck_filtering, pc);
return restrict_reader(make_mutation_reader<single_key_sstable_reader>(std::move(s), _sstables, *pos.key(), ck_filtering, pc));
} else {
// range_sstable_reader is not movable so we need to wrap it
return make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr, ck_filtering, pc);
return restrict_reader(make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr, ck_filtering, pc));
}
}
@@ -1732,6 +1741,7 @@ keyspace::make_column_family_config(const schema& s) const {
cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size;
cfg.dirty_memory_region_group = _config.dirty_memory_region_group;
cfg.streaming_dirty_memory_region_group = _config.streaming_dirty_memory_region_group;
cfg.read_concurrency_sem = _config.read_concurreny_sem;
cfg.cf_stats = _config.cf_stats;
cfg.enable_incremental_backups = _config.enable_incremental_backups;
@@ -2193,6 +2203,7 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
}
cfg.dirty_memory_region_group = &_dirty_memory_region_group;
cfg.streaming_dirty_memory_region_group = &_streaming_dirty_memory_region_group;
cfg.read_concurreny_sem = &_read_concurrency_sem;
cfg.cf_stats = &_cf_stats;
cfg.enable_incremental_backups = _enable_incremental_backups;
return cfg;

View File

@@ -263,6 +263,7 @@ public:
size_t max_streaming_memtable_size = 5'000'000;
logalloc::region_group* dirty_memory_region_group = nullptr;
logalloc::region_group* streaming_dirty_memory_region_group = nullptr;
semaphore* read_concurrency_sem = nullptr;
::cf_stats* cf_stats = nullptr;
};
struct no_commitlog {};
@@ -789,6 +790,7 @@ public:
size_t max_streaming_memtable_size = 5'000'000;
logalloc::region_group* dirty_memory_region_group = nullptr;
logalloc::region_group* streaming_dirty_memory_region_group = nullptr;
semaphore* read_concurreny_sem = nullptr;
::cf_stats* cf_stats = nullptr;
};
private:
@@ -868,6 +870,8 @@ public:
class database {
::cf_stats _cf_stats;
static constexpr size_t max_concurrent_reads() { return 100; }
static constexpr size_t max_system_concurrent_reads() { return 10; }
struct db_stats {
uint64_t total_writes = 0;
uint64_t total_reads = 0;
@@ -880,6 +884,8 @@ class database {
size_t _streaming_memtable_total_space = 500 << 20;
logalloc::region_group _dirty_memory_region_group;
logalloc::region_group _streaming_dirty_memory_region_group;
semaphore _read_concurrency_sem{max_concurrent_reads()};
semaphore _system_read_concurrency_sem{max_system_concurrent_reads()};
std::unordered_map<sstring, keyspace> _keyspaces;
std::unordered_map<utils::UUID, lw_shared_ptr<column_family>> _column_families;
@@ -1025,6 +1031,9 @@ public:
std::unordered_set<sstring> get_initial_tokens();
std::experimental::optional<gms::inet_address> get_replace_address();
bool is_replacing();
semaphore& system_keyspace_read_concurrency_sem() {
return _system_read_concurrency_sem;
}
};
// FIXME: stub

View File

@@ -1022,6 +1022,8 @@ void make(database& db, bool durable, bool volatile_testing_only) {
kscfg.enable_disk_writes = !volatile_testing_only;
kscfg.enable_commitlog = !volatile_testing_only;
kscfg.enable_cache = true;
// don't make system keyspace reads wait for user reads
kscfg.read_concurreny_sem = &db.system_keyspace_read_concurrency_sem();
keyspace _ks{ksm, std::move(kscfg)};
auto rs(locator::abstract_replication_strategy::create_replication_strategy(NAME, "LocalStrategy", service::get_local_storage_service().get_token_metadata(), ksm->strategy_options()));
_ks.set_replication_strategy(std::move(rs));