Add a flag to configure the maximum size of a cached partition.

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
(cherry picked from commit 636a4acfd0)
This commit is contained in:
Piotr Jastrzebski
2016-07-15 16:01:06 +02:00
committed by Pekka Enberg
parent 30c72ef3b4
commit ec3d59bf13
7 changed files with 23 additions and 14 deletions

View File

@@ -127,7 +127,7 @@ column_family::column_family(schema_ptr schema, config config, db::commitlog* cl
, _streaming_memtables(_config.enable_disk_writes ? make_streaming_memtable_list() : make_memory_only_memtable_list())
, _compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
, _sstables(make_lw_shared(_compaction_strategy.make_sstable_set(_schema)))
, _cache(_schema, sstables_as_mutation_source(), sstables_as_key_source(), global_cache_tracker())
, _cache(_schema, sstables_as_mutation_source(), sstables_as_key_source(), global_cache_tracker(), _config.max_cached_partition_size_in_bytes)
, _commitlog(cl)
, _compaction_manager(compaction_manager)
, _flush_queue(std::make_unique<memtable_flush_queue>())
@@ -1581,7 +1581,7 @@ future<> database::parse_system_tables(distributed<service::storage_proxy>& prox
return parallel_for_each(tables.begin(), tables.end(), [this] (auto& t) {
auto s = t.second;
auto& ks = this->find_keyspace(s->ks_name());
auto cfg = ks.make_column_family_config(*s);
auto cfg = ks.make_column_family_config(*s, this->get_config());
this->add_column_family(s, std::move(cfg));
return ks.make_directory_for_column_family(s->cf_name(), s->id()).then([s] {});
});
@@ -1838,7 +1838,7 @@ void keyspace::update_from(::lw_shared_ptr<keyspace_metadata> ksm) {
}
column_family::config
keyspace::make_column_family_config(const schema& s) const {
keyspace::make_column_family_config(const schema& s, const db::config& db_config) const {
column_family::config cfg;
cfg.datadir = column_family_directory(s.cf_name(), s.id());
cfg.enable_disk_reads = _config.enable_disk_reads;
@@ -1852,6 +1852,7 @@ keyspace::make_column_family_config(const schema& s) const {
cfg.read_concurrency_config = _config.read_concurrency_config;
cfg.cf_stats = _config.cf_stats;
cfg.enable_incremental_backups = _config.enable_incremental_backups;
cfg.max_cached_partition_size_in_bytes = db_config.max_cached_partition_size_in_kb() * 1024;
return cfg;
}

View File

@@ -316,6 +316,7 @@ public:
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
restricted_mutation_reader_config read_concurrency_config;
::cf_stats* cf_stats = nullptr;
uint64_t max_cached_partition_size_in_bytes;
};
struct no_commitlog {};
struct stats {
@@ -884,7 +885,7 @@ public:
*/
locator::abstract_replication_strategy& get_replication_strategy();
const locator::abstract_replication_strategy& get_replication_strategy() const;
column_family::config make_column_family_config(const schema& s) const;
column_family::config make_column_family_config(const schema& s, const db::config& db_config) const;
future<> make_directory_for_column_family(const sstring& name, utils::UUID uuid);
void add_or_update_column_family(const schema_ptr& s) {
_metadata->add_or_update_column_family(s);

View File

@@ -369,6 +369,9 @@ public:
val(reduce_cache_sizes_at, double, .85, Invalid, \
"When Java heap usage (after a full concurrent mark sweep (CMS) garbage collection) exceeds this percentage, Cassandra reduces the cache capacity to the fraction of the current size as specified by reduce_cache_capacity_to. To disable, set the value to 1.0." \
) \
val(max_cached_partition_size_in_kb, uint64_t, 10240uLL, Used, \
"Partitions with size greater than this value won't be cached." \
) \
/* Disks settings */ \
val(stream_throughput_outbound_megabits_per_sec, uint32_t, 400, Unused, \
"Throttles all outbound streaming file transfers on a node to the specified throughput. Cassandra does mostly sequential I/O when streaming data during bootstrap or repair, which can lead to saturating the network connection and degrading client (RPC) performance." \

View File

@@ -748,7 +748,7 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
for (auto&& gs : created) {
schema_ptr s = gs.get();
auto& ks = db.find_keyspace(s->ks_name());
auto cfg = ks.make_column_family_config(*s);
auto cfg = ks.make_column_family_config(*s, db.get_config());
db.add_column_family(s, cfg);
auto& cf = db.find_column_family(s);
cf.mark_ready_for_writes();

View File

@@ -1043,7 +1043,7 @@ void make(database& db, bool durable, bool volatile_testing_only) {
db.add_keyspace(NAME, std::move(_ks));
auto& ks = db.find_keyspace(NAME);
for (auto&& table : all_tables()) {
db.add_column_family(table, ks.make_column_family_config(*table));
db.add_column_family(table, ks.make_column_family_config(*table, db.get_config()));
}
}

View File

@@ -40,12 +40,12 @@ thread_local seastar::thread_scheduling_group row_cache::_update_thread_scheduli
enum class is_wide_partition { yes, no };
future<is_wide_partition, mutation_opt> try_to_read(streamed_mutation_opt&& sm) {
future<is_wide_partition, mutation_opt>
try_to_read(uint64_t max_cached_partition_size_in_bytes, streamed_mutation_opt&& sm) {
if (!sm) {
return make_ready_future<is_wide_partition, mutation_opt>(is_wide_partition::no, mutation_opt());
}
static const size_t max_size_of_cached_partition = 10 * 1024 * 1024;
return mutation_from_streamed_mutation_with_limit(std::move(*sm), max_size_of_cached_partition).then(
return mutation_from_streamed_mutation_with_limit(std::move(*sm), max_cached_partition_size_in_bytes).then(
[] (mutation_opt&& omo) mutable {
if (omo) {
return make_ready_future<is_wide_partition, mutation_opt>(is_wide_partition::no, std::move(omo));
@@ -234,7 +234,8 @@ public:
return make_ready_future<streamed_mutation_opt>(streamed_mutation_opt());
}
dht::decorated_key dk = sm->decorated_key();
return try_to_read(std::move(sm)).then([this, op = std::move(op), dk = std::move(dk)]
return try_to_read(_cache._max_cached_partition_size_in_bytes, std::move(sm)).then(
[this, op = std::move(op), dk = std::move(dk)]
(is_wide_partition wide_partition, mutation_opt&& mo) {
if (wide_partition == is_wide_partition::no) {
if (mo) {
@@ -436,8 +437,9 @@ public:
return _reader().then([this, op = std::move(op)] (auto sm) mutable {
stdx::optional<dht::decorated_key> dk = (sm) ? stdx::optional<dht::decorated_key>(sm->decorated_key())
: stdx::optional<dht::decorated_key>(stdx::nullopt);
return try_to_read(std::move(sm)).then([this, op = std::move(op), dk = std::move(dk)]
(is_wide_partition wide_partition, mutation_opt&& mo) {
return try_to_read(_cache._max_cached_partition_size_in_bytes, std::move(sm)).then(
[this, op = std::move(op), dk = std::move(dk)]
(is_wide_partition wide_partition, mutation_opt&& mo) mutable {
if (wide_partition == is_wide_partition::no) {
if (mo) {
_cache.populate(*mo);
@@ -961,12 +963,13 @@ void row_cache::invalidate_unwrapped(const query::partition_range& range) {
}
row_cache::row_cache(schema_ptr s, mutation_source fallback_factory, key_source underlying_keys,
cache_tracker& tracker)
cache_tracker& tracker, uint64_t max_cached_partition_size_in_bytes)
: _tracker(tracker)
, _schema(std::move(s))
, _partitions(cache_entry::compare(_schema))
, _underlying(std::move(fallback_factory))
, _underlying_keys(std::move(underlying_keys))
, _max_cached_partition_size_in_bytes(max_cached_partition_size_in_bytes)
{
with_allocator(_tracker.allocator(), [this] {
cache_entry* entry = current_allocator().construct<cache_entry>(_schema);

View File

@@ -211,6 +211,7 @@ private:
partitions_type _partitions; // Cached partitions are complete.
mutation_source _underlying;
key_source _underlying_keys;
uint64_t _max_cached_partition_size_in_bytes;
// Synchronizes populating reads with updates of underlying data source to ensure that cache
// remains consistent across flushes with the underlying data source.
@@ -238,7 +239,7 @@ private:
static thread_local seastar::thread_scheduling_group _update_thread_scheduling_group;
public:
~row_cache();
row_cache(schema_ptr, mutation_source underlying, key_source, cache_tracker&);
row_cache(schema_ptr, mutation_source underlying, key_source, cache_tracker&, uint64_t _max_cached_partition_size_in_bytes = 10 * 1024 * 1024);
row_cache(row_cache&&) = default;
row_cache(const row_cache&) = delete;
row_cache& operator=(row_cache&&) = default;