mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-31 20:16:43 +00:00
Merge "Fix database freeze with load for multiple CFs" from Glauber
"Issue 1195 describes a scenario with a fairly easy reproducer in which we can freeze the database. That involves writing simultaneously to multiple CFs, such that the sum of all the memory they are using is larger than the dirty memory limit, without not any of them individually being larger than the memtable size. This patchset rewrites the throttling code, including now active flushes so that this situation cannot happen. Fixes #1195"
This commit is contained in:
154
database.cc
154
database.cc
@@ -92,21 +92,21 @@ lw_shared_ptr<memtable_list>
|
||||
column_family::make_memory_only_memtable_list() {
|
||||
auto seal = [this] (memtable_list::flush_behavior ignored) { return make_ready_future<>(); };
|
||||
auto get_schema = [this] { return schema(); };
|
||||
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group, _memtables_serializer);
|
||||
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_manager);
|
||||
}
|
||||
|
||||
lw_shared_ptr<memtable_list>
|
||||
column_family::make_memtable_list() {
|
||||
auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_memtable(behavior); };
|
||||
auto get_schema = [this] { return schema(); };
|
||||
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_region_group, _memtables_serializer);
|
||||
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_memtable_size, _config.dirty_memory_manager);
|
||||
}
|
||||
|
||||
lw_shared_ptr<memtable_list>
|
||||
column_family::make_streaming_memtable_list() {
|
||||
auto seal = [this] (memtable_list::flush_behavior behavior) { return seal_active_streaming_memtable(behavior); };
|
||||
auto get_schema = [this] { return schema(); };
|
||||
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_region_group, _streaming_serializer);
|
||||
return make_lw_shared<memtable_list>(std::move(seal), std::move(get_schema), _config.max_streaming_memtable_size, _config.streaming_dirty_memory_manager);
|
||||
}
|
||||
|
||||
column_family::column_family(schema_ptr schema, config config, db::commitlog* cl, compaction_manager& compaction_manager)
|
||||
@@ -1329,14 +1329,18 @@ database::database(const db::config& cfg)
|
||||
return memtable_total_space;
|
||||
}())
|
||||
, _streaming_memtable_total_space(_memtable_total_space / 4)
|
||||
, _streaming_dirty_memory_region_group(&_dirty_memory_region_group)
|
||||
// Allow system tables a pool of 10 MB extra memory to write over the threshold. Under normal
|
||||
// circumnstances it won't matter, but when we throttle, some system requests will be able to
|
||||
// keep being serviced even if user requests are not.
|
||||
//
|
||||
// Note that even if we didn't allow extra memory, we would still want to keep system requests
|
||||
// in a different region group. This is because throttled requests are serviced in FIFO order,
|
||||
// and we don't want system requests to be waiting for a long time behind user requests.
|
||||
, _system_dirty_memory_manager(*this, _memtable_total_space + (10 << 20))
|
||||
, _dirty_memory_manager(*this, &_system_dirty_memory_manager, _memtable_total_space)
|
||||
, _streaming_dirty_memory_manager(*this, &_dirty_memory_manager, _streaming_memtable_total_space)
|
||||
, _version(empty_version)
|
||||
, _enable_incremental_backups(cfg.incremental_backups())
|
||||
, _memtables_throttler(_memtable_total_space, _dirty_memory_region_group)
|
||||
, _streaming_throttler(_streaming_memtable_total_space,
|
||||
_streaming_dirty_memory_region_group,
|
||||
&_memtables_throttler
|
||||
)
|
||||
{
|
||||
_compaction_manager.start();
|
||||
setup_collectd();
|
||||
@@ -1351,7 +1355,7 @@ database::setup_collectd() {
|
||||
, scollectd::per_cpu_plugin_instance
|
||||
, "bytes", "dirty")
|
||||
, scollectd::make_typed(scollectd::data_type::GAUGE, [this] {
|
||||
return _dirty_memory_region_group.memory_used();
|
||||
return dirty_memory_region_group().memory_used();
|
||||
})));
|
||||
|
||||
_collectd.push_back(
|
||||
@@ -1780,8 +1784,8 @@ keyspace::make_column_family_config(const schema& s) const {
|
||||
cfg.enable_cache = _config.enable_cache;
|
||||
cfg.max_memtable_size = _config.max_memtable_size;
|
||||
cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size;
|
||||
cfg.dirty_memory_region_group = _config.dirty_memory_region_group;
|
||||
cfg.streaming_dirty_memory_region_group = _config.streaming_dirty_memory_region_group;
|
||||
cfg.dirty_memory_manager = _config.dirty_memory_manager;
|
||||
cfg.streaming_dirty_memory_manager = _config.streaming_dirty_memory_manager;
|
||||
cfg.read_concurrency_config = _config.read_concurrency_config;
|
||||
cfg.cf_stats = _config.cf_stats;
|
||||
cfg.enable_incremental_backups = _config.enable_incremental_backups;
|
||||
@@ -2109,14 +2113,64 @@ column_family::check_valid_rp(const db::replay_position& rp) const {
|
||||
}
|
||||
}
|
||||
|
||||
future<> database::apply_in_memory(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& rp) {
|
||||
try {
|
||||
auto& cf = find_column_family(m.column_family_id());
|
||||
cf.apply(m, m_schema, rp);
|
||||
} catch (no_such_column_family&) {
|
||||
dblog.error("Attempting to mutate non-existent table {}", m.column_family_id());
|
||||
future<> dirty_memory_manager::shutdown() {
|
||||
_db_shutdown_requested = true;
|
||||
return _waiting_flush_gate.close().then([this] {
|
||||
return _region_group.shutdown();
|
||||
});
|
||||
}
|
||||
|
||||
void dirty_memory_manager::maybe_do_active_flush() {
|
||||
if (!under_pressure() || _db_shutdown_requested) {
|
||||
return;
|
||||
}
|
||||
return make_ready_future<>();
|
||||
|
||||
// Flush already ongoing. We don't need to initiate an active flush at this moment.
|
||||
if (_flush_serializer.current() != _concurrency) {
|
||||
return;
|
||||
}
|
||||
|
||||
// There are many criteria that can be used to select what is the best memtable to
|
||||
// flush. Most of the time we want some coordination with the commitlog to allow us to
|
||||
// release commitlog segments as early as we can.
|
||||
//
|
||||
// But during pressure condition, we'll just pick the CF that holds the largest
|
||||
// memtable. The advantage of doing this is that this is objectively the one that will
|
||||
// release the biggest amount of memory and is less likely to be generating tiny
|
||||
// SSTables. The disadvantage is that right now, because we only release memory when the
|
||||
// SSTable is fully written, that may take a bit of time to happen.
|
||||
//
|
||||
// However, since we'll very soon have a mechanism in place to account for the memory
|
||||
// that was already written in one form or another, that disadvantage is mitigated.
|
||||
memtable& biggest_memtable = memtable::from_region(*_region_group.get_largest_region());
|
||||
auto& biggest_cf = _db.find_column_family(biggest_memtable.schema());
|
||||
memtable_list& mtlist = get_memtable_list(biggest_cf);
|
||||
// Please note that this will eventually take the semaphore and prevent two concurrent flushes.
|
||||
// We don't need any other extra protection.
|
||||
mtlist.seal_active_memtable(memtable_list::flush_behavior::immediate);
|
||||
}
|
||||
|
||||
memtable_list& memtable_dirty_memory_manager::get_memtable_list(column_family& cf) {
|
||||
return *(cf._memtables);
|
||||
}
|
||||
|
||||
memtable_list& streaming_dirty_memory_manager::get_memtable_list(column_family& cf) {
|
||||
return *(cf._streaming_memtables);
|
||||
}
|
||||
|
||||
void dirty_memory_manager::start_reclaiming() {
|
||||
maybe_do_active_flush();
|
||||
}
|
||||
|
||||
future<> database::apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position rp) {
|
||||
return _dirty_memory_manager.region_group().run_when_memory_available([this, &m, m_schema = std::move(m_schema), rp = std::move(rp)] {
|
||||
try {
|
||||
auto& cf = find_column_family(m.column_family_id());
|
||||
cf.apply(m, m_schema, rp);
|
||||
} catch (no_such_column_family&) {
|
||||
dblog.error("Attempting to mutate non-existent table {}", m.column_family_id());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::do_apply(schema_ptr s, const frozen_mutation& m) {
|
||||
@@ -2148,43 +2202,11 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m) {
|
||||
return apply_in_memory(m, s, db::replay_position());
|
||||
}
|
||||
|
||||
future<> throttle_state::throttle() {
|
||||
if (!should_throttle() && _throttled_requests.empty()) {
|
||||
// All is well, go ahead
|
||||
return make_ready_future<>();
|
||||
}
|
||||
// We must throttle, wait a bit
|
||||
if (_throttled_requests.empty()) {
|
||||
_throttling_timer.arm_periodic(10ms);
|
||||
}
|
||||
_throttled_requests.emplace_back();
|
||||
return _throttled_requests.back().get_future();
|
||||
}
|
||||
|
||||
void throttle_state::unthrottle() {
|
||||
// Release one request per free 1MB we have
|
||||
// FIXME: improve this
|
||||
if (should_throttle()) {
|
||||
return;
|
||||
}
|
||||
size_t avail = std::max((_max_space - _region_group.memory_used()) >> 20, size_t(1));
|
||||
avail = std::min(_throttled_requests.size(), avail);
|
||||
for (size_t i = 0; i < avail; ++i) {
|
||||
_throttled_requests.front().set_value();
|
||||
_throttled_requests.pop_front();
|
||||
}
|
||||
if (_throttled_requests.empty()) {
|
||||
_throttling_timer.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
future<> database::apply(schema_ptr s, const frozen_mutation& m) {
|
||||
if (dblog.is_enabled(logging::log_level::trace)) {
|
||||
dblog.trace("apply {}", m.pretty_printer(s));
|
||||
}
|
||||
return _memtables_throttler.throttle().then([this, &m, s = std::move(s)] {
|
||||
return do_apply(std::move(s), m);
|
||||
}).then([this, s = _stats] {
|
||||
return do_apply(std::move(s), m).then([this, s = _stats] {
|
||||
++s->total_writes;
|
||||
});
|
||||
}
|
||||
@@ -2194,23 +2216,7 @@ future<> database::apply_streaming_mutation(schema_ptr s, const frozen_mutation&
|
||||
throw std::runtime_error(sprint("attempted to mutate using not synced schema of %s.%s, version=%s",
|
||||
s->ks_name(), s->cf_name(), s->version()));
|
||||
}
|
||||
|
||||
// TODO (maybe): This will use the same memory region group as memtables, so when
|
||||
// one of them throttles, both will.
|
||||
//
|
||||
// It would be possible to provide further QoS for CQL originated memtables
|
||||
// by keeping the streaming memtables into a different region group, with its own
|
||||
// separate limit.
|
||||
//
|
||||
// Because, however, there are many other limits in play that may kick in,
|
||||
// I am not convinced that this will ever be a problem.
|
||||
//
|
||||
// If we do find ourselves in the situation that we are throttling incoming
|
||||
// writes due to high level of streaming writes, and we are sure that this
|
||||
// is the best solution, we can just change the memtable creation method so
|
||||
// that each kind of memtable creates from a different region group - and then
|
||||
// update the throttle conditions accordingly.
|
||||
return _streaming_throttler.throttle().then([this, &m, s = std::move(s)] {
|
||||
return _streaming_dirty_memory_manager.region_group().run_when_memory_available([this, &m, s = std::move(s)] {
|
||||
auto uuid = m.column_family_id();
|
||||
auto& cf = find_column_family(uuid);
|
||||
cf.apply_streaming_mutation(s, std::move(m));
|
||||
@@ -2242,8 +2248,8 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
|
||||
// All writes should go to the main memtable list if we're not durable
|
||||
cfg.max_streaming_memtable_size = 0;
|
||||
}
|
||||
cfg.dirty_memory_region_group = &_dirty_memory_region_group;
|
||||
cfg.streaming_dirty_memory_region_group = &_streaming_dirty_memory_region_group;
|
||||
cfg.dirty_memory_manager = &_dirty_memory_manager;
|
||||
cfg.streaming_dirty_memory_manager = &_streaming_dirty_memory_manager;
|
||||
cfg.read_concurrency_config.sem = &_read_concurrency_sem;
|
||||
cfg.read_concurrency_config.timeout = _cfg->read_request_timeout_in_ms() * 1ms;
|
||||
// Assume a queued read takes up 1kB of memory, and allow 2% of memory to be filled up with such reads.
|
||||
@@ -2316,6 +2322,12 @@ database::stop() {
|
||||
return parallel_for_each(_column_families, [this] (auto& val_pair) {
|
||||
return val_pair.second->stop();
|
||||
});
|
||||
}).then([this] {
|
||||
return _system_dirty_memory_manager.shutdown();
|
||||
}).then([this] {
|
||||
return _dirty_memory_manager.shutdown();
|
||||
}).then([this] {
|
||||
return _streaming_dirty_memory_manager.shutdown();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
162
database.hh
162
database.hh
@@ -99,35 +99,93 @@ void make(database& db, bool durable, bool volatile_testing_only);
|
||||
}
|
||||
}
|
||||
|
||||
class throttle_state {
|
||||
size_t _max_space;
|
||||
logalloc::region_group& _region_group;
|
||||
throttle_state* _parent;
|
||||
class replay_position_reordered_exception : public std::exception {};
|
||||
|
||||
circular_buffer<promise<>> _throttled_requests;
|
||||
timer<> _throttling_timer{[this] { unthrottle(); }};
|
||||
void unthrottle();
|
||||
bool should_throttle() const {
|
||||
if (_region_group.memory_used() > _max_space) {
|
||||
return true;
|
||||
}
|
||||
if (_parent) {
|
||||
return _parent->should_throttle();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
using shared_memtable = lw_shared_ptr<memtable>;
|
||||
class memtable_list;
|
||||
|
||||
class dirty_memory_manager: public logalloc::region_group_reclaimer {
|
||||
// We need a separate boolean, because from the LSA point of view, pressure may still be
|
||||
// mounting, in which case the pressure flag could be set back on if we force it off.
|
||||
bool _db_shutdown_requested = false;
|
||||
|
||||
database& _db;
|
||||
logalloc::region_group _region_group;
|
||||
|
||||
// We would like to serialize the flushing of memtables. While flushing many memtables
|
||||
// simultaneously can sustain high levels of throughput, the memory is not freed until the
|
||||
// memtable is totally gone. That means that if we have throttled requests, they will stay
|
||||
// throttled for a long time. Even when we have virtual dirty, that only provides a rough
|
||||
// estimate, and we can't release requests that early.
|
||||
//
|
||||
// Ideally, we'd allow one memtable flush per shard (or per database object), and write-behind
|
||||
// would take care of the rest. But that still has issues, so we'll limit parallelism to some
|
||||
// number (4), that we will hopefully reduce to 1 when write behind works.
|
||||
//
|
||||
// When streaming is going on, we'll separate half of that for the streaming code, which
|
||||
// effectively increases the total to 6. That is a bit ugly and a bit redundant with the I/O
|
||||
// Scheduler, but it's the easiest way not to hurt the common case (no streaming) and will have
|
||||
// to do for the moment. Hopefully we can set both to 1 soon (with write behind)
|
||||
//
|
||||
// FIXME: enable write behind and set both to 1. Right now we will take advantage of the fact
|
||||
// that memtables and streaming will use different specialized classes here and set them as
|
||||
// default values here.
|
||||
size_t _concurrency;
|
||||
semaphore _flush_serializer;
|
||||
|
||||
seastar::gate _waiting_flush_gate;
|
||||
std::vector<shared_memtable> _pending_flushes;
|
||||
void maybe_do_active_flush();
|
||||
protected:
|
||||
virtual memtable_list& get_memtable_list(column_family& cf) = 0;
|
||||
virtual void start_reclaiming() override;
|
||||
public:
|
||||
throttle_state(size_t max_space, logalloc::region_group& region, throttle_state* parent = nullptr)
|
||||
: _max_space(max_space)
|
||||
, _region_group(region)
|
||||
, _parent(parent)
|
||||
{}
|
||||
future<> shutdown();
|
||||
dirty_memory_manager(database& db, size_t threshold, size_t concurrency)
|
||||
: logalloc::region_group_reclaimer(threshold)
|
||||
, _db(db)
|
||||
, _region_group(*this)
|
||||
, _concurrency(concurrency)
|
||||
, _flush_serializer(concurrency) {}
|
||||
|
||||
future<> throttle();
|
||||
dirty_memory_manager(database& db, dirty_memory_manager *parent, size_t threshold, size_t concurrency)
|
||||
: logalloc::region_group_reclaimer(threshold)
|
||||
, _db(db)
|
||||
, _region_group(&parent->_region_group, *this)
|
||||
, _concurrency(concurrency)
|
||||
, _flush_serializer(concurrency) {}
|
||||
logalloc::region_group& region_group() {
|
||||
return _region_group;
|
||||
}
|
||||
|
||||
const logalloc::region_group& region_group() const {
|
||||
return _region_group;
|
||||
}
|
||||
|
||||
template <typename Func>
|
||||
future<> serialize_flush(Func&& func) {
|
||||
return seastar::with_gate(_waiting_flush_gate, [this, func] () mutable {
|
||||
return with_semaphore(_flush_serializer, 1, func).finally([this] {
|
||||
maybe_do_active_flush();
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
class streaming_dirty_memory_manager: public dirty_memory_manager {
|
||||
virtual memtable_list& get_memtable_list(column_family& cf) override;
|
||||
public:
|
||||
streaming_dirty_memory_manager(database& db, dirty_memory_manager *parent, size_t threshold) : dirty_memory_manager(db, parent, threshold, 2) {}
|
||||
};
|
||||
|
||||
class replay_position_reordered_exception : public std::exception {};
|
||||
class memtable_dirty_memory_manager: public dirty_memory_manager {
|
||||
virtual memtable_list& get_memtable_list(column_family& cf) override;
|
||||
public:
|
||||
memtable_dirty_memory_manager(database& db, dirty_memory_manager* parent, size_t threshold) : dirty_memory_manager(db, parent, threshold, 4) {}
|
||||
// This constructor will be called for the system tables (no parent). Its flushes are usually drive by us
|
||||
// and not the user, and tend to be small in size. So we'll allow only two slots.
|
||||
memtable_dirty_memory_manager(database& db, size_t threshold) : dirty_memory_manager(db, threshold, 2) {}
|
||||
};
|
||||
|
||||
// We could just add all memtables, regardless of types, to a single list, and
|
||||
// then filter them out when we read them. Here's why I have chosen not to do
|
||||
@@ -151,21 +209,18 @@ class memtable_list {
|
||||
public:
|
||||
enum class flush_behavior { delayed, immediate };
|
||||
private:
|
||||
using shared_memtable = lw_shared_ptr<memtable>;
|
||||
std::vector<shared_memtable> _memtables;
|
||||
std::function<future<> (flush_behavior)> _seal_fn;
|
||||
std::function<schema_ptr()> _current_schema;
|
||||
size_t _max_memtable_size;
|
||||
logalloc::region_group* _dirty_memory_region_group;
|
||||
semaphore& _region_group_serializer;
|
||||
dirty_memory_manager* _dirty_memory_manager;
|
||||
public:
|
||||
memtable_list(std::function<future<> (flush_behavior)> seal_fn, std::function<schema_ptr()> cs, size_t max_memtable_size, logalloc::region_group* region_group, semaphore& sem)
|
||||
memtable_list(std::function<future<> (flush_behavior)> seal_fn, std::function<schema_ptr()> cs, size_t max_memtable_size, dirty_memory_manager* dirty_memory_manager)
|
||||
: _memtables({})
|
||||
, _seal_fn(seal_fn)
|
||||
, _current_schema(cs)
|
||||
, _max_memtable_size(max_memtable_size)
|
||||
, _dirty_memory_region_group(region_group)
|
||||
, _region_group_serializer(sem) {
|
||||
, _dirty_memory_manager(dirty_memory_manager) {
|
||||
add_memtable();
|
||||
}
|
||||
|
||||
@@ -191,11 +246,7 @@ public:
|
||||
if (behavior == flush_behavior::delayed) {
|
||||
return _seal_fn(behavior);
|
||||
}
|
||||
return _region_group_serializer.wait().then([this] {
|
||||
return _seal_fn(flush_behavior::immediate);
|
||||
}).finally([this] {
|
||||
_region_group_serializer.signal();
|
||||
});
|
||||
return _dirty_memory_manager->serialize_flush([this] { return _seal_fn(flush_behavior::immediate); });
|
||||
}
|
||||
|
||||
auto begin() noexcept {
|
||||
@@ -235,7 +286,7 @@ public:
|
||||
}
|
||||
private:
|
||||
lw_shared_ptr<memtable> new_memtable() {
|
||||
return make_lw_shared<memtable>(_current_schema(), _dirty_memory_region_group);
|
||||
return make_lw_shared<memtable>(_current_schema(), &(_dirty_memory_manager->region_group()));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -262,8 +313,8 @@ public:
|
||||
bool enable_incremental_backups = false;
|
||||
size_t max_memtable_size = 5'000'000;
|
||||
size_t max_streaming_memtable_size = 5'000'000;
|
||||
logalloc::region_group* dirty_memory_region_group = nullptr;
|
||||
logalloc::region_group* streaming_dirty_memory_region_group = nullptr;
|
||||
::dirty_memory_manager* dirty_memory_manager = nullptr;
|
||||
::dirty_memory_manager* streaming_dirty_memory_manager = nullptr;
|
||||
restricted_mutation_reader_config read_concurrency_config;
|
||||
::cf_stats* cf_stats = nullptr;
|
||||
};
|
||||
@@ -296,24 +347,6 @@ private:
|
||||
config _config;
|
||||
stats _stats;
|
||||
|
||||
// We would like to serialize the flushing of memtables. While flushing many memtables
|
||||
// simultaneously can sustain high levels of throughput, the memory is not freed until the
|
||||
// memtable is totally gone. That means that if we have throttled requests, they will stay
|
||||
// throttled for a long time. Even when we have virtual dirty, that only provides a rough
|
||||
// estimate, and we can't release requests that early.
|
||||
//
|
||||
// Ideally, we'd allow one memtable flush per shard (or per database object), and write-behind
|
||||
// would take care of the rest. But that still has issues, so we'll limit parallelism to some
|
||||
// number (4), that we will hopefully reduce to 1 when write behind works.
|
||||
//
|
||||
// When streaming is going on, we'll separate half of that for the streaming code, which
|
||||
// effectively increases the total to 6. That is a bit ugly and a bit redundant with the I/O
|
||||
// Scheduler, but it's the easiest way not to hurt the common case (no streaming) and will have
|
||||
// to do for the moment. Hopefully we can set both to 1 soon (with write behind)
|
||||
//
|
||||
// FIXME: enable write behind and set both to 1.
|
||||
semaphore _memtables_serializer = { 4 };
|
||||
semaphore _streaming_serializer = { 2 };
|
||||
lw_shared_ptr<memtable_list> _memtables;
|
||||
|
||||
// In older incarnations, we simply commited the mutations to memtables.
|
||||
@@ -335,6 +368,9 @@ private:
|
||||
// server.
|
||||
lw_shared_ptr<memtable_list> _streaming_memtables;
|
||||
|
||||
friend class memtable_dirty_memory_manager;
|
||||
friend class streaming_dirty_memory_manager;
|
||||
|
||||
lw_shared_ptr<memtable_list> make_memory_only_memtable_list();
|
||||
lw_shared_ptr<memtable_list> make_memtable_list();
|
||||
lw_shared_ptr<memtable_list> make_streaming_memtable_list();
|
||||
@@ -793,8 +829,8 @@ public:
|
||||
bool enable_incremental_backups = false;
|
||||
size_t max_memtable_size = 5'000'000;
|
||||
size_t max_streaming_memtable_size = 5'000'000;
|
||||
logalloc::region_group* dirty_memory_region_group = nullptr;
|
||||
logalloc::region_group* streaming_dirty_memory_region_group = nullptr;
|
||||
::dirty_memory_manager* dirty_memory_manager = nullptr;
|
||||
::dirty_memory_manager* streaming_dirty_memory_manager = nullptr;
|
||||
restricted_mutation_reader_config read_concurrency_config;
|
||||
::cf_stats* cf_stats = nullptr;
|
||||
};
|
||||
@@ -888,8 +924,9 @@ class database {
|
||||
std::unique_ptr<db::config> _cfg;
|
||||
size_t _memtable_total_space = 500 << 20;
|
||||
size_t _streaming_memtable_total_space = 500 << 20;
|
||||
logalloc::region_group _dirty_memory_region_group;
|
||||
logalloc::region_group _streaming_dirty_memory_region_group;
|
||||
memtable_dirty_memory_manager _system_dirty_memory_manager;
|
||||
memtable_dirty_memory_manager _dirty_memory_manager;
|
||||
streaming_dirty_memory_manager _streaming_dirty_memory_manager;
|
||||
semaphore _read_concurrency_sem{max_concurrent_reads()};
|
||||
restricted_mutation_reader_config _read_concurrency_config;
|
||||
semaphore _system_read_concurrency_sem{max_system_concurrent_reads()};
|
||||
@@ -906,7 +943,7 @@ class database {
|
||||
bool _enable_incremental_backups = false;
|
||||
|
||||
future<> init_commitlog();
|
||||
future<> apply_in_memory(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position&);
|
||||
future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::replay_position);
|
||||
future<> populate(sstring datadir);
|
||||
future<> populate_keyspace(sstring datadir, sstring ks_name);
|
||||
|
||||
@@ -918,9 +955,6 @@ private:
|
||||
friend void db::system_keyspace::make(database& db, bool durable, bool volatile_testing_only);
|
||||
void setup_collectd();
|
||||
|
||||
throttle_state _memtables_throttler;
|
||||
throttle_state _streaming_throttler;
|
||||
|
||||
future<> do_apply(schema_ptr, const frozen_mutation&);
|
||||
public:
|
||||
static utils::UUID empty_version;
|
||||
@@ -1033,7 +1067,7 @@ public:
|
||||
future<> drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func);
|
||||
|
||||
const logalloc::region_group& dirty_memory_region_group() const {
|
||||
return _dirty_memory_region_group;
|
||||
return _dirty_memory_manager.region_group();
|
||||
}
|
||||
|
||||
std::unordered_set<sstring> get_initial_tokens();
|
||||
|
||||
@@ -1035,6 +1035,8 @@ void make(database& db, bool durable, bool volatile_testing_only) {
|
||||
kscfg.read_concurrency_config.sem = &db.system_keyspace_read_concurrency_sem();
|
||||
kscfg.read_concurrency_config.timeout = {};
|
||||
kscfg.read_concurrency_config.max_queue_length = std::numeric_limits<size_t>::max();
|
||||
// don't make system keyspace writes wait for user writes (if under pressure)
|
||||
kscfg.dirty_memory_manager = &db._system_dirty_memory_manager;
|
||||
keyspace _ks{ksm, std::move(kscfg)};
|
||||
auto rs(locator::abstract_replication_strategy::create_replication_strategy(NAME, "LocalStrategy", service::get_local_storage_service().get_token_metadata(), ksm->strategy_options()));
|
||||
_ks.set_replication_strategy(std::move(rs));
|
||||
|
||||
32
memtable.cc
32
memtable.cc
@@ -26,20 +26,20 @@
|
||||
namespace stdx = std::experimental;
|
||||
|
||||
memtable::memtable(schema_ptr schema, logalloc::region_group* dirty_memory_region_group)
|
||||
: _schema(std::move(schema))
|
||||
, _region(dirty_memory_region_group ? logalloc::region(*dirty_memory_region_group) : logalloc::region())
|
||||
: logalloc::region(dirty_memory_region_group ? logalloc::region(*dirty_memory_region_group) : logalloc::region())
|
||||
, _schema(std::move(schema))
|
||||
, partitions(memtable_entry::compare(_schema)) {
|
||||
}
|
||||
|
||||
memtable::~memtable() {
|
||||
with_allocator(_region.allocator(), [this] {
|
||||
with_allocator(allocator(), [this] {
|
||||
partitions.clear_and_dispose(current_deleter<memtable_entry>());
|
||||
});
|
||||
}
|
||||
|
||||
partition_entry&
|
||||
memtable::find_or_create_partition_slow(partition_key_view key) {
|
||||
assert(!_region.reclaiming_enabled());
|
||||
assert(!reclaiming_enabled());
|
||||
|
||||
// FIXME: Perform lookup using std::pair<token, partition_key_view>
|
||||
// to avoid unconditional copy of the partition key.
|
||||
@@ -59,7 +59,7 @@ memtable::find_or_create_partition_slow(partition_key_view key) {
|
||||
|
||||
partition_entry&
|
||||
memtable::find_or_create_partition(const dht::decorated_key& key) {
|
||||
assert(!_region.reclaiming_enabled());
|
||||
assert(!reclaiming_enabled());
|
||||
|
||||
// call lower_bound so we have a hint for the insert, just in case.
|
||||
auto i = partitions.lower_bound(key, memtable_entry::compare(_schema));
|
||||
@@ -127,7 +127,7 @@ private:
|
||||
}
|
||||
void update_iterators() {
|
||||
// We must be prepared that iterators may get invalidated during compaction.
|
||||
auto current_reclaim_counter = _memtable->_region.reclaim_counter();
|
||||
auto current_reclaim_counter = _memtable->reclaim_counter();
|
||||
auto cmp = memtable_entry::compare(_memtable->_schema);
|
||||
if (_last) {
|
||||
if (current_reclaim_counter != _last_reclaim_counter ||
|
||||
@@ -177,7 +177,7 @@ public:
|
||||
return _delegate();
|
||||
}
|
||||
|
||||
logalloc::reclaim_lock _(_memtable->_region);
|
||||
logalloc::reclaim_lock _(*_memtable);
|
||||
managed_bytes::linearization_context_guard lcg;
|
||||
update_iterators();
|
||||
if (_i == _end) {
|
||||
@@ -202,7 +202,7 @@ memtable::make_reader(schema_ptr s,
|
||||
|
||||
if (query::is_single_partition(range)) {
|
||||
const query::ring_position& pos = range.start()->value();
|
||||
return _read_section(_region, [&] {
|
||||
return _read_section(*this, [&] {
|
||||
managed_bytes::linearization_context_guard lcg;
|
||||
auto i = partitions.find(pos, memtable_entry::compare(_schema));
|
||||
if (i != partitions.end()) {
|
||||
@@ -236,8 +236,8 @@ memtable::apply(memtable& mt) {
|
||||
|
||||
void
|
||||
memtable::apply(const mutation& m, const db::replay_position& rp) {
|
||||
with_allocator(_region.allocator(), [this, &m] {
|
||||
_allocating_section(_region, [&, this] {
|
||||
with_allocator(allocator(), [this, &m] {
|
||||
_allocating_section(*this, [&, this] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
auto& p = find_or_create_partition(m.decorated_key());
|
||||
p.apply(*_schema, m.partition(), *m.schema());
|
||||
@@ -249,8 +249,8 @@ memtable::apply(const mutation& m, const db::replay_position& rp) {
|
||||
|
||||
void
|
||||
memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& rp) {
|
||||
with_allocator(_region.allocator(), [this, &m, &m_schema] {
|
||||
_allocating_section(_region, [&, this] {
|
||||
with_allocator(allocator(), [this, &m, &m_schema] {
|
||||
_allocating_section(*this, [&, this] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
auto& p = find_or_create_partition_slow(m.key(*_schema));
|
||||
p.apply(*_schema, m.partition(), *m_schema);
|
||||
@@ -261,7 +261,7 @@ memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::
|
||||
}
|
||||
|
||||
logalloc::occupancy_stats memtable::occupancy() const {
|
||||
return _region.occupancy();
|
||||
return logalloc::region::occupancy();
|
||||
}
|
||||
|
||||
mutation_source memtable::as_data_source() {
|
||||
@@ -308,13 +308,13 @@ memtable_entry::read(lw_shared_ptr<memtable> mtbl, const schema_ptr& target_sche
|
||||
}
|
||||
auto& cr = ck_filtering.get_ranges(_key.key());
|
||||
auto snp = _pe.read(_schema);
|
||||
return make_partition_snapshot_reader(_schema, _key, ck_filtering, cr, snp, mtbl->_region, mtbl->_read_section, mtbl);
|
||||
return make_partition_snapshot_reader(_schema, _key, ck_filtering, cr, snp, *mtbl, mtbl->_read_section, mtbl);
|
||||
}
|
||||
|
||||
void memtable::upgrade_entry(memtable_entry& e) {
|
||||
if (e._schema != _schema) {
|
||||
assert(!_region.reclaiming_enabled());
|
||||
with_allocator(_region.allocator(), [this, &e] {
|
||||
assert(!reclaiming_enabled());
|
||||
with_allocator(allocator(), [this, &e] {
|
||||
with_linearized_managed_bytes([&] {
|
||||
e.partition().upgrade(e._schema, _schema);
|
||||
e._schema = _schema;
|
||||
|
||||
10
memtable.hh
10
memtable.hh
@@ -91,7 +91,7 @@ public:
|
||||
};
|
||||
|
||||
// Managed by lw_shared_ptr<>.
|
||||
class memtable final : public enable_lw_shared_from_this<memtable> {
|
||||
class memtable final : public enable_lw_shared_from_this<memtable>, private logalloc::region {
|
||||
public:
|
||||
using partitions_type = bi::set<memtable_entry,
|
||||
bi::member_hook<memtable_entry, bi::set_member_hook<>, &memtable_entry::_link>,
|
||||
@@ -99,7 +99,6 @@ public:
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
logalloc::allocating_section _read_section;
|
||||
mutable logalloc::region _region;
|
||||
logalloc::allocating_section _allocating_section;
|
||||
partitions_type partitions;
|
||||
db::replay_position _replay_position;
|
||||
@@ -123,8 +122,13 @@ public:
|
||||
void apply(const mutation& m, const db::replay_position& = db::replay_position());
|
||||
// The mutation is upgraded to current schema.
|
||||
void apply(const frozen_mutation& m, const schema_ptr& m_schema, const db::replay_position& = db::replay_position());
|
||||
|
||||
static memtable& from_region(logalloc::region& r) {
|
||||
return static_cast<memtable&>(r);
|
||||
}
|
||||
|
||||
const logalloc::region& region() const {
|
||||
return _region;
|
||||
return *this;
|
||||
}
|
||||
public:
|
||||
size_t partition_count() const;
|
||||
|
||||
@@ -698,7 +698,7 @@ future<> row_cache::clear() {
|
||||
}
|
||||
|
||||
future<> row_cache::update(memtable& m, partition_presence_checker presence_checker) {
|
||||
_tracker.region().merge(m._region); // Now all data in memtable belongs to cache
|
||||
_tracker.region().merge(m); // Now all data in memtable belongs to cache
|
||||
auto attr = seastar::thread_attributes();
|
||||
attr.scheduling_group = &_update_thread_scheduling_group;
|
||||
auto t = seastar::thread(attr, [this, &m, presence_checker = std::move(presence_checker)] {
|
||||
|
||||
Reference in New Issue
Block a user