diff --git a/database.cc b/database.cc
index 9bdbd0c8fc..69fd597b38 100644
--- a/database.cc
+++ b/database.cc
@@ -256,15 +256,24 @@
 column_family::make_sstable_reader(schema_ptr s,
                                    const query::partition_range& pr,
                                    query::clustering_key_filtering_context ck_filtering,
                                    const io_priority_class& pc) const {
+    // restricts a reader's concurrency if the configuration specifies it
+    auto restrict_reader = [&] (mutation_reader&& in) {
+        if (_config.read_concurrency_sem) {
+            return make_restricted_reader(*_config.read_concurrency_sem, 1, std::move(in));
+        } else {
+            return std::move(in);
+        }
+    };
+
     if (pr.is_singular() && pr.start()->value().has_key()) {
         const dht::ring_position& pos = pr.start()->value();
         if (dht::shard_of(pos.token()) != engine().cpu_id()) {
             return make_empty_reader(); // range doesn't belong to this shard
         }
-        return make_mutation_reader(std::move(s), _sstables, *pos.key(), ck_filtering, pc);
+        return restrict_reader(make_mutation_reader(std::move(s), _sstables, *pos.key(), ck_filtering, pc));
     } else {
         // range_sstable_reader is not movable so we need to wrap it
-        return make_mutation_reader(std::move(s), _sstables, pr, ck_filtering, pc);
+        return restrict_reader(make_mutation_reader(std::move(s), _sstables, pr, ck_filtering, pc));
     }
 }
@@ -1732,6 +1741,7 @@ keyspace::make_column_family_config(const schema& s) const {
     cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size;
     cfg.dirty_memory_region_group = _config.dirty_memory_region_group;
     cfg.streaming_dirty_memory_region_group = _config.streaming_dirty_memory_region_group;
+    cfg.read_concurrency_sem = _config.read_concurrency_sem;
     cfg.cf_stats = _config.cf_stats;
     cfg.enable_incremental_backups = _config.enable_incremental_backups;
 
@@ -2193,6 +2203,7 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
     }
     cfg.dirty_memory_region_group = &_dirty_memory_region_group;
     cfg.streaming_dirty_memory_region_group = &_streaming_dirty_memory_region_group;
+    cfg.read_concurrency_sem = &_read_concurrency_sem;
     cfg.cf_stats = &_cf_stats;
     cfg.enable_incremental_backups = _enable_incremental_backups;
     return cfg;
diff --git a/database.hh b/database.hh
index f9c56c7d21..661cfb4320 100644
--- a/database.hh
+++ b/database.hh
@@ -263,6 +263,7 @@ public:
         size_t max_streaming_memtable_size = 5'000'000;
         logalloc::region_group* dirty_memory_region_group = nullptr;
         logalloc::region_group* streaming_dirty_memory_region_group = nullptr;
+        semaphore* read_concurrency_sem = nullptr;
         ::cf_stats* cf_stats = nullptr;
     };
     struct no_commitlog {};
@@ -789,6 +790,7 @@ public:
         size_t max_streaming_memtable_size = 5'000'000;
         logalloc::region_group* dirty_memory_region_group = nullptr;
         logalloc::region_group* streaming_dirty_memory_region_group = nullptr;
+        semaphore* read_concurrency_sem = nullptr;
         ::cf_stats* cf_stats = nullptr;
     };
 private:
@@ -868,6 +870,8 @@ public:
 class database {
     ::cf_stats _cf_stats;
+    static constexpr size_t max_concurrent_reads() { return 100; }
+    static constexpr size_t max_system_concurrent_reads() { return 10; }
     struct db_stats {
         uint64_t total_writes = 0;
         uint64_t total_reads = 0;
@@ -880,6 +884,8 @@ class database {
     size_t _streaming_memtable_total_space = 500 << 20;
     logalloc::region_group _dirty_memory_region_group;
     logalloc::region_group _streaming_dirty_memory_region_group;
+    semaphore _read_concurrency_sem{max_concurrent_reads()};
+    semaphore _system_read_concurrency_sem{max_system_concurrent_reads()};
     std::unordered_map<sstring, keyspace> _keyspaces;
     std::unordered_map<utils::UUID, lw_shared_ptr<column_family>> _column_families;
 
@@ -1025,6 +1031,9 @@ public:
     std::unordered_set<sstring> get_initial_tokens();
     std::experimental::optional<gms::inet_address> get_replace_address();
     bool is_replacing();
+    semaphore& system_keyspace_read_concurrency_sem() {
+        return _system_read_concurrency_sem;
+    }
 };
 
 // FIXME: stub
diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc
index 3d524c1f9f..411117b2f9 100644
--- a/db/system_keyspace.cc
+++ b/db/system_keyspace.cc
@@ -1022,6 +1022,8 @@ void make(database& db, bool durable, bool volatile_testing_only) {
     kscfg.enable_disk_writes = !volatile_testing_only;
     kscfg.enable_commitlog = !volatile_testing_only;
     kscfg.enable_cache = true;
+    // don't make system keyspace reads wait for user reads
+    kscfg.read_concurrency_sem = &db.system_keyspace_read_concurrency_sem();
     keyspace _ks{ksm, std::move(kscfg)};
     auto rs(locator::abstract_replication_strategy::create_replication_strategy(NAME, "LocalStrategy", service::get_local_storage_service().get_token_metadata(), ksm->strategy_options()));
     _ks.set_replication_strategy(std::move(rs));