commitlog: Add write/flush limits

Configured on start (for now - and dummy values at that). 
When shard write/flush count reaches limit, and incoming ops will queue
until previous ones finish. 

Consequently, if an allocation op forces a write, which blocks, any 
other incoming allocations will also queue up to provide back pressure.
This commit is contained in:
Calle Wilund
2016-01-26 09:33:51 +00:00
parent 7628a4dfe0
commit f2c5315d33
2 changed files with 206 additions and 101 deletions

View File

@@ -157,6 +157,9 @@ public:
bool _shutdown = false;
semaphore _new_segment_semaphore;
semaphore _write_semaphore;
semaphore _flush_semaphore;
scollectd::registrations _regs;
// TODO: verify that we're ok with not-so-great granularity
@@ -174,6 +177,9 @@ public:
uint64_t segments_destroyed = 0;
uint64_t pending_writes = 0;
uint64_t pending_flushes = 0;
uint64_t pending_allocations = 0;
uint64_t write_limit_exceeded = 0;
uint64_t flush_limit_exceeded = 0;
uint64_t total_size = 0;
uint64_t buffer_list_bytes = 0;
uint64_t total_size_on_disk = 0;
@@ -181,54 +187,73 @@ public:
stats totals;
void begin_write() {
future<> begin_write() {
_gate.enter();
++totals.pending_writes;
++totals.pending_writes; // redundant, given semaphore. but easier to read
if (totals.pending_writes >= cfg.max_active_writes) {
++totals.write_limit_exceeded;
logger.trace("Write ops overflow: {}. Will block.", totals.pending_writes);
}
return _write_semaphore.wait();
}
void end_write() {
_write_semaphore.signal();
--totals.pending_writes;
_gate.leave();
}
void begin_flush() {
future<> begin_flush() {
_gate.enter();
++totals.pending_flushes;
if (totals.pending_flushes >= cfg.max_active_flushes) {
++totals.flush_limit_exceeded;
logger.trace("Flush ops overflow: {}. Will block.", totals.pending_flushes);
}
return _flush_semaphore.wait();
}
void end_flush() {
_flush_semaphore.signal();
--totals.pending_flushes;
_gate.leave();
}
bool should_wait_for_write() const {
return _write_semaphore.waiters() > 0 || _flush_semaphore.waiters() > 0;
}
segment_manager(config c)
: cfg(c), max_size(
std::min<size_t>(std::numeric_limits<position_type>::max(),
std::max<size_t>(cfg.commitlog_segment_size_in_mb,
1) * 1024 * 1024)), max_mutation_size(
max_size >> 1), max_disk_size(
size_t(
std::ceil(
cfg.commitlog_total_space_in_mb
/ double(smp::count))) * 1024 * 1024)
: cfg([&c] {
config cfg(c);
if (cfg.commit_log_location.empty()) {
cfg.commit_log_location = "/var/lib/scylla/commitlog";
}
if (cfg.max_active_writes == 0) {
cfg.max_active_writes = // TODO: call someone to get an idea...
25 * smp::count;
}
cfg.max_active_writes = std::max(uint64_t(1), cfg.max_active_writes / smp::count);
if (cfg.max_active_flushes == 0) {
cfg.max_active_flushes = // TODO: call someone to get an idea...
5 * smp::count;
}
cfg.max_active_flushes = std::max(uint64_t(1), cfg.max_active_flushes / smp::count);
return cfg;
}())
, max_size(std::min<size_t>(std::numeric_limits<position_type>::max(), std::max<size_t>(cfg.commitlog_segment_size_in_mb, 1) * 1024 * 1024))
, max_mutation_size(max_size >> 1)
, max_disk_size(size_t(std::ceil(cfg.commitlog_total_space_in_mb / double(smp::count))) * 1024 * 1024)
, _write_semaphore(cfg.max_active_writes)
, _flush_semaphore(cfg.max_active_flushes)
{
assert(max_size > 0);
if (cfg.commit_log_location.empty()) {
cfg.commit_log_location = "/var/lib/scylla/commitlog";
}
if (cfg.max_active_writes == 0) {
cfg.max_active_writes = // TODO: call someone to get an idea...
25 * smp::count;
}
cfg.max_active_writes = std::max(uint64_t(1), cfg.max_active_writes / smp::count);
if (cfg.max_active_flushes == 0) {
cfg.max_active_flushes = // TODO: call someone to get an idea...
5 * smp::count;
}
cfg.max_active_flushes = std::max(uint64_t(1), cfg.max_active_flushes / smp::count);
logger.trace("Commitlog {} maximum disk size: {} MB / cpu ({} cpus)",
cfg.commit_log_location, max_disk_size / (1024 * 1024),
smp::count);
_regs = create_counters();
}
~segment_manager() {
@@ -367,11 +392,39 @@ class db::commitlog::segment: public enable_lw_shared_from_this<segment> {
std::unordered_map<cf_id_type, position_type> _cf_dirty;
time_point _sync_time;
seastar::gate _gate;
uint64_t _write_waiters = 0;
semaphore _queue;
std::unordered_set<table_schema_version> _known_schema_versions;
friend std::ostream& operator<<(std::ostream&, const segment&);
friend class segment_manager;
future<> begin_flush() {
// This is maintaining the semantica of only using the write-lock
// as a gate for flushing, i.e. once we've begun a flush for position X
// we are ok with writes to positions > X
return _dwrite.write_lock().then(std::bind(&segment_manager::begin_flush, _segment_manager)).finally([this] {
_dwrite.write_unlock();
});
}
void end_flush() {
_segment_manager->end_flush();
}
future<> begin_write() {
// This is maintaining the semantica of only using the write-lock
// as a gate for flushing, i.e. once we've begun a flush for position X
// we are ok with writes to positions > X
return _dwrite.read_lock().then(std::bind(&segment_manager::begin_write, _segment_manager));
}
void end_write() {
_segment_manager->end_write();
_dwrite.read_unlock();
}
public:
struct cf_mark {
const segment& s;
@@ -393,7 +446,7 @@ public:
segment(segment_manager* m, const descriptor& d, file && f, bool active)
: _segment_manager(m), _desc(std::move(d)), _file(std::move(f)), _sync_time(
clock_type::now())
clock_type::now()), _queue(0)
{
++_segment_manager->totals.segments_created;
logger.debug("Created new {} segment {}", active ? "active" : "reserve", *this);
@@ -478,15 +531,13 @@ public:
// This is not 100% neccesary, we really only need the ones below our flush pos,
// but since we pretty much assume that task ordering will make this the case anyway...
return _dwrite.write_lock().then(
return begin_flush().then(
[this, me, pos]() mutable {
_dwrite.write_unlock(); // release it already.
pos = std::max(pos, _file_pos);
if (pos <= _flush_pos) {
logger.trace("{} already synced! ({} < {})", *this, pos, _flush_pos);
return make_ready_future<sseg_ptr>(std::move(me));
}
_segment_manager->begin_flush();
return _file.flush().then_wrapped([this, pos, me](future<> f) {
try {
f.get();
@@ -500,16 +551,50 @@ public:
logger.error("Failed to flush commits to disk: {}", std::current_exception());
throw;
}
}).finally([this, me] {
_segment_manager->end_flush();
});
});
}).finally([this] {
end_flush();
});
}
/**
* Allocate a new buffer
*/
void new_buffer(size_t s) {
assert(_buffer.empty());
auto overhead = segment_overhead_size;
if (_file_pos == 0) {
overhead += descriptor_header_size;
}
auto a = align_up(s + overhead, alignment);
auto k = std::max(a, default_size);
for (;;) {
try {
_buffer = _segment_manager->acquire_buffer(k);
break;
} catch (std::bad_alloc&) {
logger.warn("Could not allocate {} k bytes output buffer ({} k required)", k / 1024, a / 1024);
if (k > a) {
k = std::max(a, k / 2);
logger.debug("Trying reduced size: {} k", k / 1024);
continue;
}
throw;
}
}
_buf_pos = overhead;
auto * p = reinterpret_cast<uint32_t *>(_buffer.get_write());
std::fill(p, p + overhead, 0);
_segment_manager->totals.total_size += k;
}
/**
* Send any buffer contents to disk and get a new tmp buffer
*/
// See class comment for info
future<sseg_ptr> cycle(size_t s = 0) {
future<sseg_ptr> cycle() {
auto size = clear_buffer_slack();
auto buf = std::move(_buffer);
auto off = _file_pos;
@@ -517,36 +602,6 @@ public:
_file_pos += size;
_buf_pos = 0;
// if we need new buffer, get one.
// TODO: keep a queue of available buffers?
if (s > 0) {
auto overhead = segment_overhead_size;
if (_file_pos == 0) {
overhead += descriptor_header_size;
}
auto a = align_up(s + overhead, alignment);
auto k = std::max(a, default_size);
for (;;) {
try {
_buffer = _segment_manager->acquire_buffer(k);
break;
} catch (std::bad_alloc&) {
logger.warn("Could not allocate {} k bytes output buffer ({} k required)", k / 1024, a / 1024);
if (k > a) {
k = std::max(a, k / 2);
logger.debug("Trying reduced size: {} k", k / 1024);
continue;
}
throw;
}
}
_buf_pos = overhead;
auto * p = reinterpret_cast<uint32_t *>(_buffer.get_write());
std::fill(p, p + overhead, 0);
_segment_manager->totals.total_size += k;
}
auto me = shared_from_this();
assert(!me.owned());
@@ -586,10 +641,9 @@ public:
forget_schema_versions();
// acquire read lock
return _dwrite.read_lock().then([this, size, off, buf = std::move(buf), me]() mutable {
return begin_write().then([this, size, off, buf = std::move(buf), me]() mutable {
auto written = make_lw_shared<size_t>(0);
auto p = buf.get();
_segment_manager->begin_write();
return repeat([this, size, off, written, p]() mutable {
auto& priority_class = service::get_local_commitlog_priority();
return _file.dma_write(off + *written, p + *written, size - *written, priority_class).then_wrapped([this, size, written](future<size_t>&& f) {
@@ -616,12 +670,51 @@ public:
});
}).finally([this, buf = std::move(buf)]() mutable {
_segment_manager->release_buffer(std::move(buf));
_segment_manager->end_write();
});
}).then([me] {
return make_ready_future<sseg_ptr>(std::move(me));
}).finally([me, this]() {
_dwrite.read_unlock(); // release
end_write(); // release
});
}
future<sseg_ptr> maybe_wait_for_write(future<sseg_ptr> f) {
if (_segment_manager->should_wait_for_write()) {
++_write_waiters;
logger.trace("Too many pending writes. Must wait.");
return f.finally([this] {
if (--_write_waiters == 0) {
_queue.signal(_queue.waiters());
}
});
}
return make_ready_future<sseg_ptr>(shared_from_this());
}
/**
* If an allocation causes a write, and the write causes a block,
* any allocations post that need to wait for this to finish,
* other wise we will just continue building up more write queue
* eventually (+ loose more ordering)
*
* Some caution here, since maybe_wait_for_write actually
* releases _all_ queued up ops when finishing, we could get
* "bursts" of alloc->write, causing build-ups anyway.
* This should be measured properly. For now I am hoping this
* will work out as these should "block as a group". However,
* buffer memory usage might grow...
*/
bool must_wait_for_alloc() {
return _write_waiters > 0;
}
future<sseg_ptr> wait_for_alloc() {
auto me = shared_from_this();
++_segment_manager->totals.pending_allocations;
logger.trace("Previous allocation is blocking. Must wait.");
return _queue.wait().then([me] { // TODO: do we need a finally?
--me->_segment_manager->totals.pending_allocations;
return make_ready_future<sseg_ptr>(me);
});
}
@@ -638,23 +731,26 @@ public:
+ " bytes is too large for the maxiumum size of "
+ std::to_string(_segment_manager->max_mutation_size)));
}
// would we make the file too big?
for (;;) {
if (position() + s > _segment_manager->max_size) {
// do this in next segment instead.
return finish_and_get_new().then(
[id, writer = std::move(writer)] (sseg_ptr new_seg) mutable {
return new_seg->allocate(id, std::move(writer));
});
}
// enough data?
if (s > (_buffer.size() - _buf_pos)) {
// TODO: iff we have to many writes running, maybe we should
// wait for this?
cycle(s);
continue; // re-check file size overflow
}
break;
std::experimental::optional<future<sseg_ptr>> op;
if (must_sync()) {
op = sync();
} else if (must_wait_for_alloc()) {
op = wait_for_alloc();
} else if (position() + s > _segment_manager->max_size) { // would we make the file too big?
// do this in next segment instead.
op = finish_and_get_new();
} else if (_buffer.empty()) {
new_buffer(s);
} else if (s > (_buffer.size() - _buf_pos)) { // enough data?
op = maybe_wait_for_write(cycle());
}
if (op) {
return op->then([id, writer = std::move(writer)] (sseg_ptr new_seg) mutable {
return new_seg->allocate(id, std::move(writer));
});
}
_gate.enter(); // this might throw. I guess we accept this?
@@ -686,13 +782,6 @@ public:
_gate.leave();
// finally, check if we're required to sync.
if (must_sync()) {
return sync().then([rp](sseg_ptr seg) {
return make_ready_future<replay_position>(rp);
});
}
return make_ready_future<replay_position>(rp);
}
@@ -880,6 +969,16 @@ scollectd::registrations db::commitlog::segment_manager::create_counters() {
, per_cpu_plugin_instance, "queue_length", "pending_flushes")
, make_typed(data_type::GAUGE, totals.pending_flushes)
),
add_polled_metric(type_instance_id("commitlog"
, per_cpu_plugin_instance, "total_operations", "write_limit_exceeded")
, make_typed(data_type::DERIVE, totals.write_limit_exceeded)
),
add_polled_metric(type_instance_id("commitlog"
, per_cpu_plugin_instance, "total_operations", "flush_limit_exceeded")
, make_typed(data_type::DERIVE, totals.flush_limit_exceeded)
),
add_polled_metric(type_instance_id("commitlog"
, per_cpu_plugin_instance, "memory", "total_size")
, make_typed(data_type::GAUGE, totals.total_size)
@@ -1568,6 +1667,18 @@ uint64_t db::commitlog::get_pending_flushes() const {
return _segment_manager->totals.pending_flushes;
}
uint64_t db::commitlog::get_pending_allocations() const {
return _segment_manager->totals.pending_allocations;
}
uint64_t db::commitlog::get_write_limit_exceeded_count() const {
return _segment_manager->totals.write_limit_exceeded;
}
uint64_t db::commitlog::get_flush_limit_exceeded_count() const {
return _segment_manager->totals.flush_limit_exceeded;
}
uint64_t db::commitlog::get_num_segments_created() const {
return _segment_manager->totals.segments_created;
}