db: restrict replica read concurrency
Since reading mutations can consume a large amount of memory, which, moreover, is not predictable at the time the read is initiated, restrict the number of reads to 100 per shard. This is more than enough to saturate the disk, and hopefully enough to prevent allocation failures. Restriction is applied in column_family::make_sstable_reader(), which is called either on a cache miss or if the cache is disabled. This allows cached reads to proceed without restriction, since their memory usage is supposedly low. Reads from the system keyspace use a separate semaphore, to prevent user reads from blocking system reads. Perhaps we should select the semaphore based on the source of the read rather than the keyspace, but for now using the keyspace is sufficient.
This commit is contained in:
Notes:
Avi Kivity
2016-06-27 19:51:24 +03:00
backport: 1.1, 1.2
15
database.cc
15
database.cc
@@ -256,15 +256,24 @@ column_family::make_sstable_reader(schema_ptr s,
|
||||
const query::partition_range& pr,
|
||||
query::clustering_key_filtering_context ck_filtering,
|
||||
const io_priority_class& pc) const {
|
||||
// restricts a reader's concurrency if the configuration specifies it
|
||||
auto restrict_reader = [&] (mutation_reader&& in) {
|
||||
if (_config.read_concurrency_sem) {
|
||||
return make_restricted_reader(*_config.read_concurrency_sem, 1, std::move(in));
|
||||
} else {
|
||||
return std::move(in);
|
||||
}
|
||||
};
|
||||
|
||||
if (pr.is_singular() && pr.start()->value().has_key()) {
|
||||
const dht::ring_position& pos = pr.start()->value();
|
||||
if (dht::shard_of(pos.token()) != engine().cpu_id()) {
|
||||
return make_empty_reader(); // range doesn't belong to this shard
|
||||
}
|
||||
return make_mutation_reader<single_key_sstable_reader>(std::move(s), _sstables, *pos.key(), ck_filtering, pc);
|
||||
return restrict_reader(make_mutation_reader<single_key_sstable_reader>(std::move(s), _sstables, *pos.key(), ck_filtering, pc));
|
||||
} else {
|
||||
// range_sstable_reader is not movable so we need to wrap it
|
||||
return make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr, ck_filtering, pc);
|
||||
return restrict_reader(make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr, ck_filtering, pc));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1732,6 +1741,7 @@ keyspace::make_column_family_config(const schema& s) const {
|
||||
cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size;
|
||||
cfg.dirty_memory_region_group = _config.dirty_memory_region_group;
|
||||
cfg.streaming_dirty_memory_region_group = _config.streaming_dirty_memory_region_group;
|
||||
cfg.read_concurrency_sem = _config.read_concurreny_sem;
|
||||
cfg.cf_stats = _config.cf_stats;
|
||||
cfg.enable_incremental_backups = _config.enable_incremental_backups;
|
||||
|
||||
@@ -2193,6 +2203,7 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
|
||||
}
|
||||
cfg.dirty_memory_region_group = &_dirty_memory_region_group;
|
||||
cfg.streaming_dirty_memory_region_group = &_streaming_dirty_memory_region_group;
|
||||
cfg.read_concurreny_sem = &_read_concurrency_sem;
|
||||
cfg.cf_stats = &_cf_stats;
|
||||
cfg.enable_incremental_backups = _enable_incremental_backups;
|
||||
return cfg;
|
||||
|
||||
@@ -263,6 +263,7 @@ public:
|
||||
size_t max_streaming_memtable_size = 5'000'000;
|
||||
logalloc::region_group* dirty_memory_region_group = nullptr;
|
||||
logalloc::region_group* streaming_dirty_memory_region_group = nullptr;
|
||||
semaphore* read_concurrency_sem = nullptr;
|
||||
::cf_stats* cf_stats = nullptr;
|
||||
};
|
||||
struct no_commitlog {};
|
||||
@@ -789,6 +790,7 @@ public:
|
||||
size_t max_streaming_memtable_size = 5'000'000;
|
||||
logalloc::region_group* dirty_memory_region_group = nullptr;
|
||||
logalloc::region_group* streaming_dirty_memory_region_group = nullptr;
|
||||
semaphore* read_concurreny_sem = nullptr;
|
||||
::cf_stats* cf_stats = nullptr;
|
||||
};
|
||||
private:
|
||||
@@ -868,6 +870,8 @@ public:
|
||||
|
||||
class database {
|
||||
::cf_stats _cf_stats;
|
||||
static constexpr size_t max_concurrent_reads() { return 100; }
|
||||
static constexpr size_t max_system_concurrent_reads() { return 10; }
|
||||
struct db_stats {
|
||||
uint64_t total_writes = 0;
|
||||
uint64_t total_reads = 0;
|
||||
@@ -880,6 +884,8 @@ class database {
|
||||
size_t _streaming_memtable_total_space = 500 << 20;
|
||||
logalloc::region_group _dirty_memory_region_group;
|
||||
logalloc::region_group _streaming_dirty_memory_region_group;
|
||||
semaphore _read_concurrency_sem{max_concurrent_reads()};
|
||||
semaphore _system_read_concurrency_sem{max_system_concurrent_reads()};
|
||||
|
||||
std::unordered_map<sstring, keyspace> _keyspaces;
|
||||
std::unordered_map<utils::UUID, lw_shared_ptr<column_family>> _column_families;
|
||||
@@ -1025,6 +1031,9 @@ public:
|
||||
std::unordered_set<sstring> get_initial_tokens();
|
||||
std::experimental::optional<gms::inet_address> get_replace_address();
|
||||
bool is_replacing();
|
||||
semaphore& system_keyspace_read_concurrency_sem() {
|
||||
return _system_read_concurrency_sem;
|
||||
}
|
||||
};
|
||||
|
||||
// FIXME: stub
|
||||
|
||||
@@ -1022,6 +1022,8 @@ void make(database& db, bool durable, bool volatile_testing_only) {
|
||||
kscfg.enable_disk_writes = !volatile_testing_only;
|
||||
kscfg.enable_commitlog = !volatile_testing_only;
|
||||
kscfg.enable_cache = true;
|
||||
// don't make system keyspace reads wait for user reads
|
||||
kscfg.read_concurreny_sem = &db.system_keyspace_read_concurrency_sem();
|
||||
keyspace _ks{ksm, std::move(kscfg)};
|
||||
auto rs(locator::abstract_replication_strategy::create_replication_strategy(NAME, "LocalStrategy", service::get_local_storage_service().get_token_metadata(), ksm->strategy_options()));
|
||||
_ks.set_replication_strategy(std::move(rs));
|
||||
|
||||
Reference in New Issue
Block a user