mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-26 03:20:37 +00:00
Merge "Improve memory reclaim robustness"
This commit is contained in:
129
database.cc
129
database.cc
@@ -16,6 +16,7 @@
|
||||
#include "nway_merger.hh"
|
||||
#include "cql3/column_identifier.hh"
|
||||
#include "core/seastar.hh"
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include "sstables/sstables.hh"
|
||||
@@ -34,6 +35,8 @@
|
||||
#include "service/storage_service.hh"
|
||||
#include "mutation_query.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
logging::logger dblog("database");
|
||||
|
||||
column_family::column_family(schema_ptr schema, config config, db::commitlog& cl, compaction_manager& compaction_manager)
|
||||
@@ -382,7 +385,7 @@ void column_family::add_sstable(sstables::sstable&& sstable) {
|
||||
void column_family::add_memtable() {
|
||||
// allow in-progress reads to continue using old list
|
||||
_memtables = make_lw_shared(memtable_list(*_memtables));
|
||||
_memtables->emplace_back(make_lw_shared<memtable>(_schema));
|
||||
_memtables->emplace_back(make_lw_shared<memtable>(_schema, _config.dirty_memory_region_group));
|
||||
}
|
||||
|
||||
future<>
|
||||
@@ -417,57 +420,75 @@ column_family::seal_active_memtable() {
|
||||
);
|
||||
_highest_flushed_rp = old->replay_position();
|
||||
|
||||
return seastar::with_gate(_in_flight_seals, [old, this] {
|
||||
return flush_memtable_to_sstable(old);
|
||||
});
|
||||
// FIXME: release commit log
|
||||
// FIXME: provide back-pressure to upper layers
|
||||
}
|
||||
|
||||
future<stop_iteration>
|
||||
column_family::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old) {
|
||||
// FIXME: better way of ensuring we don't attemt to
|
||||
// overwrite an existing table.
|
||||
auto gen = _sstable_generation++ * smp::count + engine().cpu_id();
|
||||
|
||||
return seastar::with_gate(_in_flight_seals, [gen, old, this] {
|
||||
sstables::sstable newtab = sstables::sstable(_schema->ks_name(), _schema->cf_name(),
|
||||
_config.datadir, gen,
|
||||
sstables::sstable::version_types::ka,
|
||||
sstables::sstable::format_types::big);
|
||||
sstables::sstable newtab = sstables::sstable(_schema->ks_name(), _schema->cf_name(),
|
||||
_config.datadir, gen,
|
||||
sstables::sstable::version_types::ka,
|
||||
sstables::sstable::format_types::big);
|
||||
|
||||
dblog.debug("Flushing to {}", newtab.get_filename());
|
||||
return do_with(std::move(newtab), [old, this] (sstables::sstable& newtab) {
|
||||
// FIXME: write all components
|
||||
return newtab.write_components(*old).then([this, &newtab, old] {
|
||||
return newtab.load();
|
||||
}).then([this, old, &newtab] {
|
||||
dblog.debug("Flushing done");
|
||||
// We must add sstable before we call update_cache(), because
|
||||
// memtable's data after moving to cache can be evicted at any time.
|
||||
auto old_sstables = _sstables;
|
||||
add_sstable(std::move(newtab));
|
||||
return update_cache(*old, std::move(old_sstables));
|
||||
}).then_wrapped([this, old] (future<> ret) {
|
||||
try {
|
||||
ret.get();
|
||||
dblog.debug("Flushing to {}", newtab.get_filename());
|
||||
return do_with(std::move(newtab), [old, this] (sstables::sstable& newtab) {
|
||||
// FIXME: write all components
|
||||
return newtab.write_components(*old).then([this, &newtab, old] {
|
||||
return newtab.load();
|
||||
}).then([this, old, &newtab] {
|
||||
dblog.debug("Flushing done");
|
||||
// We must add sstable before we call update_cache(), because
|
||||
// memtable's data after moving to cache can be evicted at any time.
|
||||
auto old_sstables = _sstables;
|
||||
add_sstable(std::move(newtab));
|
||||
return update_cache(*old, std::move(old_sstables));
|
||||
}).then_wrapped([this, old] (future<> ret) {
|
||||
try {
|
||||
ret.get();
|
||||
|
||||
// FIXME: until the surrounding function returns a future and
|
||||
// caller ensures ordering (i.e. finish flushing one or more sequential tables before
|
||||
// doing the discard), this below is _not_ correct, since the use of replay_position
|
||||
// depends on us reporting the factual highest position we've actually flushed,
|
||||
// _and_ all positions (for a given UUID) below having been dealt with.
|
||||
//
|
||||
// Note that the whole scheme is also dependent on memtables being "allocated" in order,
|
||||
// i.e. we may not flush a younger memtable before and older, and we need to use the
|
||||
// highest rp.
|
||||
if (_commitlog) {
|
||||
_commitlog->discard_completed_segments(_schema->id(), old->replay_position());
|
||||
}
|
||||
_memtables->erase(boost::range::find(*_memtables, old));
|
||||
dblog.debug("Memtable replaced");
|
||||
trigger_compaction();
|
||||
} catch (std::exception& e) {
|
||||
dblog.error("failed to write sstable: {}", e.what());
|
||||
} catch (...) {
|
||||
dblog.error("failed to write sstable: unknown error");
|
||||
// FIXME: until the surrounding function returns a future and
|
||||
// caller ensures ordering (i.e. finish flushing one or more sequential tables before
|
||||
// doing the discard), this below is _not_ correct, since the use of replay_position
|
||||
// depends on us reporting the factual highest position we've actually flushed,
|
||||
// _and_ all positions (for a given UUID) below having been dealt with.
|
||||
//
|
||||
// Note that the whole scheme is also dependent on memtables being "allocated" in order,
|
||||
// i.e. we may not flush a younger memtable before and older, and we need to use the
|
||||
// highest rp.
|
||||
if (_commitlog) {
|
||||
_commitlog->discard_completed_segments(_schema->id(), old->replay_position());
|
||||
}
|
||||
_memtables->erase(boost::range::find(*_memtables, old));
|
||||
dblog.debug("Memtable replaced");
|
||||
trigger_compaction();
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
} catch (std::exception& e) {
|
||||
dblog.error("failed to write sstable: {}", e.what());
|
||||
} catch (...) {
|
||||
dblog.error("failed to write sstable: unknown error");
|
||||
}
|
||||
return sleep(10s).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
});
|
||||
// FIXME: release commit log
|
||||
// FIXME: provide back-pressure to upper layers
|
||||
}
|
||||
|
||||
future<>
|
||||
column_family::flush_memtable_to_sstable(lw_shared_ptr<memtable> memt) {
|
||||
return repeat([this, memt] {
|
||||
return seastar::with_gate(_in_flight_seals, [memt, this] {
|
||||
return try_flush_memtable_to_sstable(memt);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
@@ -643,6 +664,18 @@ database::database(const db::config& cfg)
|
||||
db::system_keyspace::make(*this, durable);
|
||||
// Start compaction manager with two tasks for handling compaction jobs.
|
||||
_compaction_manager.start(2);
|
||||
setup_collectd();
|
||||
}
|
||||
|
||||
void
|
||||
database::setup_collectd() {
|
||||
_collectd.push_back(
|
||||
scollectd::add_polled_metric(scollectd::type_instance_id("memory"
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "bytes", "dirty")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
|
||||
return _dirty_memory_region_group.memory_used();
|
||||
})));
|
||||
}
|
||||
|
||||
database::~database() {
|
||||
@@ -955,6 +988,9 @@ keyspace::make_column_family_config(const schema& s) const {
|
||||
cfg.enable_disk_writes = _config.enable_disk_writes;
|
||||
cfg.enable_commitlog = _config.enable_commitlog;
|
||||
cfg.enable_cache = _config.enable_cache;
|
||||
cfg.max_memtable_size = _config.max_memtable_size;
|
||||
cfg.dirty_memory_region_group = _config.dirty_memory_region_group;
|
||||
|
||||
return cfg;
|
||||
}
|
||||
|
||||
@@ -1213,7 +1249,7 @@ future<> database::apply(const frozen_mutation& m) {
|
||||
}
|
||||
|
||||
keyspace::config
|
||||
database::make_keyspace_config(const keyspace_metadata& ksm) const {
|
||||
database::make_keyspace_config(const keyspace_metadata& ksm) {
|
||||
// FIXME support multiple directories
|
||||
keyspace::config cfg;
|
||||
if (_cfg->data_file_directories().size() > 0) {
|
||||
@@ -1222,13 +1258,20 @@ database::make_keyspace_config(const keyspace_metadata& ksm) const {
|
||||
cfg.enable_disk_reads = true; // we allways read from disk
|
||||
cfg.enable_commitlog = ksm.durable_writes() && _cfg->enable_commitlog() && !_cfg->enable_in_memory_data_store();
|
||||
cfg.enable_cache = _cfg->enable_cache();
|
||||
auto memtable_total_space = size_t(_cfg->memtable_total_space_in_mb()) << 20;
|
||||
if (!memtable_total_space) {
|
||||
memtable_total_space = memory::stats().total_memory() / 2;
|
||||
}
|
||||
cfg.max_memtable_size = memtable_total_space * _cfg->memtable_cleanup_threshold();
|
||||
} else {
|
||||
cfg.datadir = "";
|
||||
cfg.enable_disk_writes = false;
|
||||
cfg.enable_disk_reads = false;
|
||||
cfg.enable_commitlog = false;
|
||||
cfg.enable_cache = false;
|
||||
cfg.max_memtable_size = std::numeric_limits<size_t>::max();
|
||||
}
|
||||
cfg.dirty_memory_region_group = &_dirty_memory_region_group;
|
||||
return cfg;
|
||||
}
|
||||
|
||||
|
||||
18
database.hh
18
database.hh
@@ -88,6 +88,8 @@ public:
|
||||
bool enable_disk_reads = true;
|
||||
bool enable_cache = true;
|
||||
bool enable_commitlog = true;
|
||||
size_t max_memtable_size = 5'000'000;
|
||||
logalloc::region_group* dirty_memory_region_group = nullptr;
|
||||
};
|
||||
struct no_commitlog {};
|
||||
struct stats {
|
||||
@@ -123,6 +125,8 @@ private:
|
||||
void update_stats_for_new_sstable(uint64_t new_sstable_data_size);
|
||||
void add_sstable(sstables::sstable&& sstable);
|
||||
void add_memtable();
|
||||
future<> flush_memtable_to_sstable(lw_shared_ptr<memtable> memt);
|
||||
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt);
|
||||
future<> update_cache(memtable&, lw_shared_ptr<sstable_list> old_sstables);
|
||||
struct merge_comparator;
|
||||
private:
|
||||
@@ -307,6 +311,8 @@ public:
|
||||
bool enable_disk_reads = true;
|
||||
bool enable_disk_writes = true;
|
||||
bool enable_cache = true;
|
||||
size_t max_memtable_size = 5'000'000;
|
||||
logalloc::region_group* dirty_memory_region_group = nullptr;
|
||||
};
|
||||
private:
|
||||
std::unique_ptr<locator::abstract_replication_strategy> _replication_strategy;
|
||||
@@ -363,6 +369,8 @@ class database {
|
||||
utils::UUID _version;
|
||||
// compaction_manager object is referenced by all column families of a database.
|
||||
compaction_manager _compaction_manager;
|
||||
logalloc::region_group _dirty_memory_region_group;
|
||||
std::vector<scollectd::registration> _collectd;
|
||||
|
||||
future<> init_commitlog();
|
||||
future<> apply_in_memory(const frozen_mutation&, const db::replay_position&);
|
||||
@@ -375,7 +383,7 @@ private:
|
||||
void add_keyspace(sstring name, keyspace k);
|
||||
void create_in_memory_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
|
||||
friend void db::system_keyspace::make(database& db, bool durable);
|
||||
|
||||
void setup_collectd();
|
||||
public:
|
||||
static utils::UUID empty_version;
|
||||
|
||||
@@ -439,7 +447,7 @@ public:
|
||||
future<lw_shared_ptr<query::result>> query(const query::read_command& cmd, const std::vector<query::partition_range>& ranges);
|
||||
future<reconcilable_result> query_mutations(const query::read_command& cmd, const query::partition_range& range);
|
||||
future<> apply(const frozen_mutation&);
|
||||
keyspace::config make_keyspace_config(const keyspace_metadata& ksm) const;
|
||||
keyspace::config make_keyspace_config(const keyspace_metadata& ksm);
|
||||
const sstring& get_snitch_name() const;
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& out, const database& db);
|
||||
@@ -475,8 +483,10 @@ column_family::apply(const mutation& m, const db::replay_position& rp) {
|
||||
inline
|
||||
void
|
||||
column_family::seal_on_overflow() {
|
||||
// FIXME: something better
|
||||
if (++_mutation_count == 100000) {
|
||||
++_mutation_count;
|
||||
if (active_memtable().occupancy().total_space() >= _config.max_memtable_size) {
|
||||
// FIXME: if sparse, do some in-memory compaction first
|
||||
// FIXME: maybe merge with other in-memory memtables
|
||||
_mutation_count = 0;
|
||||
seal_active_memtable();
|
||||
}
|
||||
|
||||
@@ -220,7 +220,7 @@ public:
|
||||
"Related information: Configuring compaction" \
|
||||
) \
|
||||
/* Common memtable settings */ \
|
||||
val(memtable_total_space_in_mb, uint32_t, 0, Unused, \
|
||||
val(memtable_total_space_in_mb, uint32_t, 0, Used, \
|
||||
"Specifies the total memory used for all memtables on a node. This replaces the per-table storage settings memtable_operations_in_millions and memtable_throughput_in_mb." \
|
||||
) \
|
||||
/* Common disk settings */ \
|
||||
@@ -297,7 +297,7 @@ public:
|
||||
"\toffheap_buffers Off heap (direct) NIO buffers.\n" \
|
||||
"\toffheap_objects Native memory, eliminating NIO buffer heap overhead." \
|
||||
) \
|
||||
val(memtable_cleanup_threshold, double, .11, Unused, \
|
||||
val(memtable_cleanup_threshold, double, .11, Used, \
|
||||
"Ratio of occupied non-flushing memtable size to total permitted size for triggering a flush of the largest memtable. Larger values mean larger flushes and less compaction, but also less concurrent flush activity, which can make it difficult to keep your disks saturated under heavy write load." \
|
||||
) \
|
||||
val(file_cache_size_in_mb, uint32_t, 512, Unused, \
|
||||
|
||||
@@ -7,9 +7,10 @@
|
||||
|
||||
namespace stdx = std::experimental;
|
||||
|
||||
memtable::memtable(schema_ptr schema)
|
||||
memtable::memtable(schema_ptr schema, logalloc::region_group* dirty_memory_region_group)
|
||||
: _schema(std::move(schema))
|
||||
, partitions(partition_entry::compare(_schema)) {
|
||||
, partitions(partition_entry::compare(_schema))
|
||||
, _region(dirty_memory_region_group ? logalloc::region(*dirty_memory_region_group) : logalloc::region()) {
|
||||
}
|
||||
|
||||
memtable::~memtable() {
|
||||
|
||||
@@ -81,7 +81,7 @@ private:
|
||||
private:
|
||||
boost::iterator_range<partitions_type::const_iterator> slice(const query::partition_range& r) const;
|
||||
public:
|
||||
explicit memtable(schema_ptr schema);
|
||||
explicit memtable(schema_ptr schema, logalloc::region_group* dirty_memory_region_group = nullptr);
|
||||
~memtable();
|
||||
schema_ptr schema() const { return _schema; }
|
||||
mutation_partition& find_or_create_partition(const dht::decorated_key& key);
|
||||
|
||||
48
row_cache.cc
48
row_cache.cc
@@ -147,31 +147,33 @@ void row_cache::populate(const mutation& m) {
|
||||
|
||||
future<> row_cache::update(memtable& m, negative_mutation_reader underlying_negative) {
|
||||
_tracker.region().merge(m._region); // Now all data in memtable belongs to cache
|
||||
with_allocator(_tracker.allocator(), [this, &m, underlying_negative = std::move(underlying_negative)] {
|
||||
auto i = m.partitions.begin();
|
||||
const schema& s = *m.schema();
|
||||
while (i != m.partitions.end()) {
|
||||
partition_entry& mem_e = *i;
|
||||
// FIXME: Optimize knowing we lookup in-order.
|
||||
auto cache_i = _partitions.lower_bound(mem_e.key(), cache_entry::compare(_schema));
|
||||
// If cache doesn't contain the entry we cannot insert it because the mutation may be incomplete.
|
||||
// FIXME: keep a bitmap indicating which sstables we do cover, so we don't have to
|
||||
// search it.
|
||||
if (cache_i != _partitions.end() && cache_i->key().equal(s, mem_e.key())) {
|
||||
cache_entry& entry = *cache_i;
|
||||
_tracker.touch(entry);
|
||||
entry.partition().apply(s, std::move(mem_e.partition()));
|
||||
} else if (underlying_negative(mem_e.key().key()) == negative_mutation_reader_result::definitely_doesnt_exists) {
|
||||
cache_entry* entry = current_allocator().construct<cache_entry>(mem_e.key(), mem_e.partition());
|
||||
_tracker.insert(*entry);
|
||||
_partitions.insert(cache_i, *entry);
|
||||
return repeat([this, &m, underlying_negative = std::move(underlying_negative)] () mutable {
|
||||
return with_allocator(_tracker.allocator(), [this, &m, &underlying_negative] () {
|
||||
unsigned quota = 30;
|
||||
auto i = m.partitions.begin();
|
||||
const schema& s = *m.schema();
|
||||
while (i != m.partitions.end() && quota-- != 0) {
|
||||
partition_entry& mem_e = *i;
|
||||
// FIXME: Optimize knowing we lookup in-order.
|
||||
auto cache_i = _partitions.lower_bound(mem_e.key(), cache_entry::compare(_schema));
|
||||
// If cache doesn't contain the entry we cannot insert it because the mutation may be incomplete.
|
||||
// FIXME: keep a bitmap indicating which sstables we do cover, so we don't have to
|
||||
// search it.
|
||||
if (cache_i != _partitions.end() && cache_i->key().equal(s, mem_e.key())) {
|
||||
cache_entry& entry = *cache_i;
|
||||
_tracker.touch(entry);
|
||||
entry.partition().apply(s, std::move(mem_e.partition()));
|
||||
} else if (underlying_negative(mem_e.key().key()) == negative_mutation_reader_result::definitely_doesnt_exists) {
|
||||
cache_entry* entry = current_allocator().construct<cache_entry>(mem_e.key(), mem_e.partition());
|
||||
_tracker.insert(*entry);
|
||||
_partitions.insert(cache_i, *entry);
|
||||
}
|
||||
i = m.partitions.erase(i);
|
||||
current_allocator().destroy(&mem_e);
|
||||
}
|
||||
i = m.partitions.erase(i);
|
||||
current_allocator().destroy(&mem_e);
|
||||
}
|
||||
return make_ready_future<stop_iteration>(m.partitions.empty() ? stop_iteration::yes : stop_iteration::no);
|
||||
});
|
||||
});
|
||||
// FIXME: yield voluntarily every now and then to cap latency.
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
row_cache::row_cache(schema_ptr s, mutation_source fallback_factory, cache_tracker& tracker)
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
*/
|
||||
|
||||
#include <boost/range/algorithm/heap_algorithm.hpp>
|
||||
#include <boost/range/algorithm/remove.hpp>
|
||||
#include <boost/heap/binomial_heap.hpp>
|
||||
#include <stack>
|
||||
|
||||
@@ -382,7 +383,7 @@ segment::heap_handle() {
|
||||
// Per-segment metadata is kept in a separate array, managed by segment_pool
|
||||
// object.
|
||||
//
|
||||
class region::impl : public allocation_strategy {
|
||||
class region_impl : public allocation_strategy {
|
||||
static constexpr float max_occupancy_for_compaction = 0.85; // FIXME: make configurable
|
||||
static constexpr size_t max_managed_object_size = segment::size * 0.1;
|
||||
|
||||
@@ -494,6 +495,7 @@ class region::impl : public allocation_strategy {
|
||||
}
|
||||
} __attribute__((packed));
|
||||
private:
|
||||
region_group* _group = nullptr;
|
||||
segment* _active = nullptr;
|
||||
size_t _active_offset;
|
||||
segment_heap _segments; // Contains only closed segments
|
||||
@@ -507,7 +509,7 @@ private:
|
||||
assert(alignment < obj_flags::max_alignment);
|
||||
|
||||
if (!_active) {
|
||||
_active = shard_segment_pool.new_segment();
|
||||
_active = new_segment();
|
||||
_active_offset = 0;
|
||||
}
|
||||
|
||||
@@ -563,17 +565,32 @@ private:
|
||||
_active = nullptr;
|
||||
}
|
||||
|
||||
void free_segment(segment* seg) {
|
||||
shard_segment_pool.free_segment(seg);
|
||||
if (_group) {
|
||||
_group->update(-segment::size);
|
||||
}
|
||||
}
|
||||
|
||||
segment* new_segment() {
|
||||
segment* seg = shard_segment_pool.new_segment();
|
||||
if (_group) {
|
||||
_group->update(segment::size);
|
||||
}
|
||||
return seg;
|
||||
}
|
||||
|
||||
void compact(segment* seg) {
|
||||
for_each_live(seg, [this] (object_descriptor* desc, void* obj) {
|
||||
auto dst = alloc_small(desc->migrator(), desc->size(), desc->alignment());
|
||||
desc->migrator()(obj, dst, desc->size());
|
||||
});
|
||||
|
||||
shard_segment_pool.free_segment(seg);
|
||||
free_segment(seg);
|
||||
}
|
||||
|
||||
void close_and_open() {
|
||||
segment* new_active = shard_segment_pool.new_segment();
|
||||
segment* new_active = new_segment();
|
||||
close_active();
|
||||
_active = new_active;
|
||||
_active_offset = 0;
|
||||
@@ -583,25 +600,47 @@ private:
|
||||
static std::atomic<uint64_t> id{0};
|
||||
return id.fetch_add(1);
|
||||
}
|
||||
struct degroup_temporarily {
|
||||
region_impl* impl;
|
||||
region_group* group;
|
||||
explicit degroup_temporarily(region_impl* impl)
|
||||
: impl(impl), group(impl->_group) {
|
||||
if (group) {
|
||||
group->del(impl);
|
||||
}
|
||||
}
|
||||
~degroup_temporarily() {
|
||||
if (group) {
|
||||
group->add(impl);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
public:
|
||||
impl()
|
||||
: _id(next_id())
|
||||
explicit region_impl(region_group* group = nullptr)
|
||||
: _group(group), _id(next_id())
|
||||
{
|
||||
tracker_instance._impl->register_region(this);
|
||||
if (group) {
|
||||
group->add(this);
|
||||
}
|
||||
}
|
||||
|
||||
virtual ~impl() {
|
||||
virtual ~region_impl() {
|
||||
tracker_instance._impl->unregister_region(this);
|
||||
|
||||
assert(_segments.empty());
|
||||
if (_active) {
|
||||
assert(_active->is_empty());
|
||||
shard_segment_pool.free_segment(_active);
|
||||
free_segment(_active);
|
||||
}
|
||||
if (_group) {
|
||||
_group->del(this);
|
||||
}
|
||||
}
|
||||
|
||||
impl(impl&&) = delete;
|
||||
impl(const impl&) = delete;
|
||||
region_impl(region_impl&&) = delete;
|
||||
region_impl(const region_impl&) = delete;
|
||||
|
||||
bool empty() const {
|
||||
return occupancy().used_space() == 0;
|
||||
@@ -663,7 +702,7 @@ public:
|
||||
if (seg != _active) {
|
||||
if (seg_desc.is_empty()) {
|
||||
_segments.erase(seg_desc.heap_handle());
|
||||
shard_segment_pool.free_segment(seg, seg_desc);
|
||||
free_segment(seg);
|
||||
} else {
|
||||
_closed_occupancy += seg_desc.occupancy();
|
||||
_segments.decrease(seg_desc.heap_handle());
|
||||
@@ -673,7 +712,10 @@ public:
|
||||
|
||||
// Merges another region into this region. The other region is left empty.
|
||||
// Doesn't invalidate references to allocated objects.
|
||||
void merge(region::impl& other) {
|
||||
void merge(region_impl& other) {
|
||||
degroup_temporarily dgt1(this);
|
||||
degroup_temporarily dgt2(&other);
|
||||
|
||||
if (_active && _active->is_empty()) {
|
||||
shard_segment_pool.free_segment(_active);
|
||||
_active = nullptr;
|
||||
@@ -768,12 +810,17 @@ public:
|
||||
_evictable = true;
|
||||
_eviction_fn = std::move(fn);
|
||||
}
|
||||
friend class region_group;
|
||||
};
|
||||
|
||||
region::region()
|
||||
: _impl(std::make_unique<impl>())
|
||||
{ }
|
||||
|
||||
region::region(region_group& group)
|
||||
: _impl(std::make_unique<impl>(&group)) {
|
||||
}
|
||||
|
||||
region::~region() {
|
||||
}
|
||||
|
||||
@@ -817,7 +864,7 @@ occupancy_stats tracker::impl::occupancy() const {
|
||||
void tracker::impl::full_compaction() {
|
||||
logger.debug("Full compaction on all regions, {}", occupancy());
|
||||
|
||||
for (region::impl* r : _regions) {
|
||||
for (region_impl* r : _regions) {
|
||||
if (r->is_compactible()) {
|
||||
r->full_compaction();
|
||||
}
|
||||
@@ -891,7 +938,7 @@ size_t tracker::impl::reclaim(size_t bytes) {
|
||||
in_use, in_use * segment::size, segments_to_release, segments_to_release * segment::size);
|
||||
|
||||
// Allow dipping into reserves while compacting
|
||||
segment_pool::reservation_goal(shard_segment_pool, 0);
|
||||
segment_pool::reservation_goal open_emergency_pool(shard_segment_pool, 0);
|
||||
|
||||
boost::range::make_heap(_regions, cmp);
|
||||
|
||||
@@ -968,4 +1015,44 @@ void tracker::impl::register_collectd_metrics() {
|
||||
});
|
||||
}
|
||||
|
||||
region_group::region_group(region_group&& o) noexcept
|
||||
: _parent(o._parent), _total_memory(o._total_memory)
|
||||
, _subgroups(std::move(o._subgroups)), _regions(std::move(o._regions)) {
|
||||
if (_parent) {
|
||||
_parent->del(&o);
|
||||
_parent->add(this);
|
||||
}
|
||||
o._total_memory = 0;
|
||||
for (auto&& sg : _subgroups) {
|
||||
sg->_parent = this;
|
||||
}
|
||||
for (auto&& r : _regions) {
|
||||
r->_group = this;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
region_group::add(region_group* child) {
|
||||
_subgroups.push_back(child);
|
||||
update(child->_total_memory);
|
||||
}
|
||||
|
||||
void
|
||||
region_group::del(region_group* child) {
|
||||
_subgroups.erase(boost::range::remove(_subgroups, child), _subgroups.end());
|
||||
update(-child->_total_memory);
|
||||
}
|
||||
|
||||
void
|
||||
region_group::add(region_impl* child) {
|
||||
_regions.push_back(child);
|
||||
update(child->occupancy().total_space());
|
||||
}
|
||||
|
||||
void
|
||||
region_group::del(region_impl* child) {
|
||||
_regions.erase(boost::range::remove(_regions, child), _regions.end());
|
||||
update(-child->occupancy().total_space());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -12,9 +12,51 @@
|
||||
namespace logalloc {
|
||||
|
||||
struct occupancy_stats;
|
||||
class region;
|
||||
class region_impl;
|
||||
|
||||
using eviction_fn = std::function<void()>;
|
||||
|
||||
// Groups regions for the purpose of statistics. Can be nested.
|
||||
class region_group {
|
||||
region_group* _parent = nullptr;
|
||||
size_t _total_memory = 0;
|
||||
std::vector<region_group*> _subgroups;
|
||||
std::vector<region_impl*> _regions;
|
||||
public:
|
||||
region_group() = default;
|
||||
region_group(region_group* parent) : _parent(parent) {
|
||||
if (_parent) {
|
||||
_parent->add(this);
|
||||
}
|
||||
}
|
||||
region_group(region_group&& o) noexcept;
|
||||
region_group(const region_group&) = delete;
|
||||
~region_group() {
|
||||
if (_parent) {
|
||||
_parent->del(this);
|
||||
}
|
||||
}
|
||||
region_group& operator=(const region_group&) = delete;
|
||||
region_group& operator=(region_group&&) = delete;
|
||||
size_t memory_used() const {
|
||||
return _total_memory;
|
||||
}
|
||||
void update(ssize_t delta) {
|
||||
auto rg = this;
|
||||
while (rg) {
|
||||
rg->_total_memory += delta;
|
||||
rg = rg->_parent;
|
||||
}
|
||||
}
|
||||
private:
|
||||
void add(region_group* child);
|
||||
void del(region_group* child);
|
||||
void add(region_impl* child);
|
||||
void del(region_impl* child);
|
||||
friend class region_impl;
|
||||
};
|
||||
|
||||
// Controller for all LSA regions. There's one per shard.
|
||||
class tracker {
|
||||
public:
|
||||
@@ -23,6 +65,7 @@ private:
|
||||
std::unique_ptr<impl> _impl;
|
||||
memory::reclaimer _reclaimer;
|
||||
friend class region;
|
||||
friend class region_impl;
|
||||
public:
|
||||
tracker();
|
||||
~tracker();
|
||||
@@ -123,14 +166,15 @@ public:
|
||||
//
|
||||
class region {
|
||||
public:
|
||||
class impl;
|
||||
using impl = region_impl;
|
||||
private:
|
||||
std::unique_ptr<impl> _impl;
|
||||
public:
|
||||
region();
|
||||
explicit region(region_group& group);
|
||||
~region();
|
||||
region(region&& other) = default;
|
||||
region& operator=(region&& other) = default;
|
||||
region(region&& other);
|
||||
region& operator=(region&& other);
|
||||
region(const region& other) = delete;
|
||||
|
||||
occupancy_stats occupancy() const;
|
||||
@@ -154,6 +198,8 @@ public:
|
||||
// when data from this region needs to be evicted in order to reclaim space.
|
||||
// The function should free some space from this region.
|
||||
void make_evictable(eviction_fn);
|
||||
|
||||
friend class region_group;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user