From ec3d59bf136c96e401b8b507188a2dfad795d2f9 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Fri, 15 Jul 2016 16:01:06 +0200 Subject: [PATCH] Add flag to configure max size of a cached partition. Signed-off-by: Piotr Jastrzebski (cherry picked from commit 636a4acfd0027b2042edfea7b7a6849ad5a96469) --- database.cc | 7 ++++--- database.hh | 3 ++- db/config.hh | 3 +++ db/schema_tables.cc | 2 +- db/system_keyspace.cc | 2 +- row_cache.cc | 17 ++++++++++------- row_cache.hh | 3 ++- 7 files changed, 23 insertions(+), 14 deletions(-) diff --git a/database.cc b/database.cc index c127519857..089984010e 100644 --- a/database.cc +++ b/database.cc @@ -127,7 +127,7 @@ column_family::column_family(schema_ptr schema, config config, db::commitlog* cl , _streaming_memtables(_config.enable_disk_writes ? make_streaming_memtable_list() : make_memory_only_memtable_list()) , _compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options())) , _sstables(make_lw_shared(_compaction_strategy.make_sstable_set(_schema))) - , _cache(_schema, sstables_as_mutation_source(), sstables_as_key_source(), global_cache_tracker()) + , _cache(_schema, sstables_as_mutation_source(), sstables_as_key_source(), global_cache_tracker(), _config.max_cached_partition_size_in_bytes) , _commitlog(cl) , _compaction_manager(compaction_manager) , _flush_queue(std::make_unique()) @@ -1581,7 +1581,7 @@ future<> database::parse_system_tables(distributed& prox return parallel_for_each(tables.begin(), tables.end(), [this] (auto& t) { auto s = t.second; auto& ks = this->find_keyspace(s->ks_name()); - auto cfg = ks.make_column_family_config(*s); + auto cfg = ks.make_column_family_config(*s, this->get_config()); this->add_column_family(s, std::move(cfg)); return ks.make_directory_for_column_family(s->cf_name(), s->id()).then([s] {}); }); @@ -1838,7 +1838,7 @@ void keyspace::update_from(::lw_shared_ptr ksm) { } column_family::config -keyspace::make_column_family_config(const schema& s) const { +keyspace::make_column_family_config(const schema& s, const db::config& db_config) const { column_family::config cfg; cfg.datadir = column_family_directory(s.cf_name(), s.id()); cfg.enable_disk_reads = _config.enable_disk_reads; @@ -1852,6 +1852,7 @@ keyspace::make_column_family_config(const schema& s) const { cfg.read_concurrency_config = _config.read_concurrency_config; cfg.cf_stats = _config.cf_stats; cfg.enable_incremental_backups = _config.enable_incremental_backups; + cfg.max_cached_partition_size_in_bytes = db_config.max_cached_partition_size_in_kb() * 1024; return cfg; } diff --git a/database.hh b/database.hh index 6fc4332d2b..d57b0dcd31 100644 --- a/database.hh +++ b/database.hh @@ -316,6 +316,7 @@ public: ::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager; restricted_mutation_reader_config read_concurrency_config; ::cf_stats* cf_stats = nullptr; + uint64_t max_cached_partition_size_in_bytes; }; struct no_commitlog {}; struct stats { @@ -884,7 +885,7 @@ public: */ locator::abstract_replication_strategy& get_replication_strategy(); const locator::abstract_replication_strategy& get_replication_strategy() const; - column_family::config make_column_family_config(const schema& s) const; + column_family::config make_column_family_config(const schema& s, const db::config& db_config) const; future<> make_directory_for_column_family(const sstring& name, utils::UUID uuid); void add_or_update_column_family(const schema_ptr& s) { _metadata->add_or_update_column_family(s); diff --git a/db/config.hh b/db/config.hh index ba4e9e2dba..a38262ce87 100644 --- a/db/config.hh +++ b/db/config.hh @@ -369,6 +369,9 @@ public: val(reduce_cache_sizes_at, double, .85, Invalid, \ "When Java heap usage (after a full concurrent mark sweep (CMS) garbage collection) exceeds this percentage, Cassandra reduces the cache capacity to the fraction of the current size as specified by reduce_cache_capacity_to. To disable, set the value to 1.0." \ ) \ + val(max_cached_partition_size_in_kb, uint64_t, 10240uLL, Used, \ + "Partitions with size greater than this value won't be cached." \ + ) \ /* Disks settings */ \ val(stream_throughput_outbound_megabits_per_sec, uint32_t, 400, Unused, \ "Throttles all outbound streaming file transfers on a node to the specified throughput. Cassandra does mostly sequential I/O when streaming data during bootstrap or repair, which can lead to saturating the network connection and degrading client (RPC) performance." \ diff --git a/db/schema_tables.cc b/db/schema_tables.cc index ebec85a724..e5b9d436f0 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -748,7 +748,7 @@ static void merge_tables(distributed& proxy, for (auto&& gs : created) { schema_ptr s = gs.get(); auto& ks = db.find_keyspace(s->ks_name()); - auto cfg = ks.make_column_family_config(*s); + auto cfg = ks.make_column_family_config(*s, db.get_config()); db.add_column_family(s, cfg); auto& cf = db.find_column_family(s); cf.mark_ready_for_writes(); diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 5a1885932a..36228ae7d8 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1043,7 +1043,7 @@ void make(database& db, bool durable, bool volatile_testing_only) { db.add_keyspace(NAME, std::move(_ks)); auto& ks = db.find_keyspace(NAME); for (auto&& table : all_tables()) { - db.add_column_family(table, ks.make_column_family_config(*table)); + db.add_column_family(table, ks.make_column_family_config(*table, db.get_config())); } } diff --git a/row_cache.cc b/row_cache.cc index 382457e8ec..6daa1c1c6d 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -40,12 +40,12 @@ thread_local seastar::thread_scheduling_group row_cache::_update_thread_scheduli enum class is_wide_partition { yes, no }; -future try_to_read(streamed_mutation_opt&& sm) { +future +try_to_read(uint64_t max_cached_partition_size_in_bytes, streamed_mutation_opt&& sm) { if (!sm) { return make_ready_future(is_wide_partition::no, mutation_opt()); } - static const size_t max_size_of_cached_partition = 10 * 1024 * 1024; - return mutation_from_streamed_mutation_with_limit(std::move(*sm), max_size_of_cached_partition).then( + return mutation_from_streamed_mutation_with_limit(std::move(*sm), max_cached_partition_size_in_bytes).then( [] (mutation_opt&& omo) mutable { if (omo) { return make_ready_future(is_wide_partition::no, std::move(omo)); @@ -234,7 +234,8 @@ public: return make_ready_future(streamed_mutation_opt()); } dht::decorated_key dk = sm->decorated_key(); - return try_to_read(std::move(sm)).then([this, op = std::move(op), dk = std::move(dk)] + return try_to_read(_cache._max_cached_partition_size_in_bytes, std::move(sm)).then( + [this, op = std::move(op), dk = std::move(dk)] (is_wide_partition wide_partition, mutation_opt&& mo) { if (wide_partition == is_wide_partition::no) { if (mo) { @@ -436,8 +437,9 @@ public: return _reader().then([this, op = std::move(op)] (auto sm) mutable { stdx::optional dk = (sm) ? stdx::optional(sm->decorated_key()) : stdx::optional(stdx::nullopt); - return try_to_read(std::move(sm)).then([this, op = std::move(op), dk = std::move(dk)] - (is_wide_partition wide_partition, mutation_opt&& mo) { + return try_to_read(_cache._max_cached_partition_size_in_bytes, std::move(sm)).then( + [this, op = std::move(op), dk = std::move(dk)] + (is_wide_partition wide_partition, mutation_opt&& mo) mutable { if (wide_partition == is_wide_partition::no) { if (mo) { _cache.populate(*mo); @@ -961,12 +963,13 @@ void row_cache::invalidate_unwrapped(const query::partition_range& range) { } row_cache::row_cache(schema_ptr s, mutation_source fallback_factory, key_source underlying_keys, - cache_tracker& tracker) + cache_tracker& tracker, uint64_t max_cached_partition_size_in_bytes) : _tracker(tracker) , _schema(std::move(s)) , _partitions(cache_entry::compare(_schema)) , _underlying(std::move(fallback_factory)) , _underlying_keys(std::move(underlying_keys)) + , _max_cached_partition_size_in_bytes(max_cached_partition_size_in_bytes) { with_allocator(_tracker.allocator(), [this] { cache_entry* entry = current_allocator().construct(_schema); diff --git a/row_cache.hh b/row_cache.hh index b3d18bf9a8..e04f1ca1f8 100644 --- a/row_cache.hh +++ b/row_cache.hh @@ -211,6 +211,7 @@ private: partitions_type _partitions; // Cached partitions are complete. mutation_source _underlying; key_source _underlying_keys; + uint64_t _max_cached_partition_size_in_bytes; // Synchronizes populating reads with updates of underlying data source to ensure that cache // remains consistent across flushes with the underlying data source. @@ -238,7 +239,7 @@ private: static thread_local seastar::thread_scheduling_group _update_thread_scheduling_group; public: ~row_cache(); - row_cache(schema_ptr, mutation_source underlying, key_source, cache_tracker&); + row_cache(schema_ptr, mutation_source underlying, key_source, cache_tracker&, uint64_t _max_cached_partition_size_in_bytes = 10 * 1024 * 1024); row_cache(row_cache&&) = default; row_cache(const row_cache&) = delete; row_cache& operator=(row_cache&&) = default;