prevent commitlog replay position reordering during reserve refill
When requests hit the commitlog, each of them will be assigned a replay
position, which we expect to be ordered. If reorders happen, the request
will be discarded and re-applied. Although this is supposed to be rare,
it does increase our latencies, specially when big requests are
involved. Processing big requests is expensive and if we have to do it
twice that adds to the cost.
The commitlog is supposed to issue replay positions in order, and it
coudl be that the code that adds them to the memtables will reorder
them. However, there is one instance in which the commitlog will not
keep its side of the bargain.
That happens when the reserve is exhausted, and we are allocating a
segment directly at the same time the reserve is being replenished. The
following sequence of events with its deferring points will ilustrate
it:
on_timer:
return this->allocate_segment(false). // defer here // then([this](sseg_ptr s) {
At this point, the segment id is already allocated.
new_segment():
if (_reserve_segments.empty()) {
[ ... ]
return allocate_segment(true).then ...
At this point, we have a new segment that has an id that is higher than
the previous id allocated.
Then we resume the execution from the deferring point in on_timer():
i = _reserve_segments.emplace(i, std::move(s));
The next time we need to allocate a segment, we'll pick it from the
reserve. But the segment in the reserve has an id that is lower than the
id that we have already used.
Reorders are bad, but this one is particularly bad: because the reorder
happens with the segment id side of the replay position, that means that
every request that falls into that segment will have to be reinserted.
This bug can be a bit tricky to reproduce. To make it more common, we
can artificially add a sleep() fiber after the allocate_segment(false)
in on_timer(). If we do that, we'll see a sea of reinsertions going on
in the logs (if dblog is set to debug).
Applying this patch (keeping the sleep) will make them all disappear.
We do this by rewriting the reserve logic, so that the segments always
come from the reserve. If we draw from a single pool all the time, there
is no chance of reordering happening. To make that more amenable, we'll
have the reserve filler always running in the background and take it out
of the timer code.
Signed-off-by: Glauber Costa <glauber@scylladb.com>
Message-Id: <49eb7edfcafaef7f1fdceb270639a9a8b50cfce7.1480531446.git.glauber@scylladb.com>
(cherry picked from commit 99a5a77234)
This commit is contained in:
committed by
Tomasz Grabiec
parent
0bce019781
commit
abe7358767
@@ -58,6 +58,8 @@
|
||||
#include <core/fstream.hh>
|
||||
#include <seastar/core/memory.hh>
|
||||
#include <seastar/core/chunked_fifo.hh>
|
||||
#include <seastar/core/queue.hh>
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include <net/byteorder.hh>
|
||||
|
||||
#include "commitlog.hh"
|
||||
@@ -78,6 +80,8 @@
|
||||
|
||||
static logging::logger logger("commitlog");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
class crc32_nbo {
|
||||
crc32 _c;
|
||||
public:
|
||||
@@ -164,6 +168,7 @@ public:
|
||||
const uint64_t max_disk_size; // per-shard
|
||||
|
||||
bool _shutdown = false;
|
||||
std::experimental::optional<shared_promise<>> _shutdown_promise = {};
|
||||
|
||||
semaphore _new_segment_semaphore {1};
|
||||
semaphore _flush_semaphore;
|
||||
@@ -252,7 +257,7 @@ public:
|
||||
|
||||
scollectd::registrations create_counters();
|
||||
|
||||
void orphan_all();
|
||||
future<> orphan_all();
|
||||
|
||||
void discard_unused_segments();
|
||||
void discard_completed_segments(const cf_id_type& id,
|
||||
@@ -288,22 +293,19 @@ public:
|
||||
void flush_segments(bool = false);
|
||||
|
||||
private:
|
||||
future<> clear_reserve_segments();
|
||||
|
||||
size_t max_request_controller_units() const;
|
||||
segment_id_type _ids = 0;
|
||||
std::vector<sseg_ptr> _segments;
|
||||
std::deque<sseg_ptr> _reserve_segments;
|
||||
queue<sseg_ptr> _reserve_segments;
|
||||
std::vector<buffer_type> _temp_buffers;
|
||||
std::unordered_map<flush_handler_id, flush_handler> _flush_handlers;
|
||||
flush_handler_id _flush_ids = 0;
|
||||
replay_position _flush_position;
|
||||
timer<clock_type> _timer;
|
||||
size_t _reserve_allocating = 0;
|
||||
// # segments to try to keep available in reserve
|
||||
// i.e. the amount of segments we expect to consume inbetween timer
|
||||
// callbacks.
|
||||
// The idea is that since the files are 0 len at start, and thus cost little,
|
||||
// it is easier to adapt this value compared to timer freq.
|
||||
size_t _num_reserve_segments = 0;
|
||||
future<> replenish_reserve();
|
||||
future<> _reserve_replenisher;
|
||||
seastar::gate _gate;
|
||||
uint64_t _new_counter = 0;
|
||||
};
|
||||
@@ -913,6 +915,8 @@ db::commitlog::segment_manager::segment_manager(config c)
|
||||
// than default_size at the end of the allocation, that allows for every valid mutation to
|
||||
// always be admitted for processing.
|
||||
, _request_controller(max_request_controller_units())
|
||||
, _reserve_segments(1)
|
||||
, _reserve_replenisher(make_ready_future<>())
|
||||
{
|
||||
assert(max_size > 0);
|
||||
|
||||
@@ -927,6 +931,28 @@ size_t db::commitlog::segment_manager::max_request_controller_units() const {
|
||||
return max_mutation_size + db::commitlog::segment::default_size;
|
||||
}
|
||||
|
||||
future<> db::commitlog::segment_manager::replenish_reserve() {
|
||||
return do_until([this] { return _shutdown; }, [this] {
|
||||
return _reserve_segments.not_full().then([this] {
|
||||
if (_shutdown) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return with_gate(_gate, [this] {
|
||||
return this->allocate_segment(false).then([this](sseg_ptr s) {
|
||||
auto ret = _reserve_segments.push(std::move(s));
|
||||
if (!ret) {
|
||||
logger.error("Segment reserve is full! Ignoring and trying to continue, but shouldn't happen");
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).handle_exception([](std::exception_ptr ep) {
|
||||
logger.warn("Exception in segment reservation: {}", ep);
|
||||
return sleep(100ms);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<std::vector<db::commitlog::descriptor>>
|
||||
db::commitlog::segment_manager::list_descriptors(sstring dirname) {
|
||||
struct helper {
|
||||
@@ -997,6 +1023,9 @@ future<> db::commitlog::segment_manager::init() {
|
||||
_timer.set_callback(std::bind(&segment_manager::on_timer, this));
|
||||
auto delay = engine().cpu_id() * std::ceil(double(cfg.commitlog_sync_period_in_ms) / smp::count);
|
||||
logger.trace("Delaying timer loop {} ms", delay);
|
||||
// We need to wait until we have scanned all other segments to actually start serving new
|
||||
// segments. We are ready now
|
||||
this->_reserve_replenisher = replenish_reserve();
|
||||
this->arm(delay);
|
||||
});
|
||||
}
|
||||
@@ -1144,22 +1173,15 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
||||
|
||||
++_new_counter;
|
||||
|
||||
if (_reserve_segments.empty()) {
|
||||
if (_num_reserve_segments < cfg.max_reserve_segments) {
|
||||
++_num_reserve_segments;
|
||||
logger.trace("Increased segment reserve count to {}", _num_reserve_segments);
|
||||
}
|
||||
return allocate_segment(true).then([this](sseg_ptr s) {
|
||||
_segments.push_back(s);
|
||||
return make_ready_future<sseg_ptr>(s);
|
||||
});
|
||||
if (_reserve_segments.empty() && (_reserve_segments.max_size() < cfg.max_reserve_segments)) {
|
||||
_reserve_segments.set_max_size(_reserve_segments.max_size() + 1);
|
||||
logger.debug("Increased segment reserve count to {}", _reserve_segments.max_size());
|
||||
}
|
||||
|
||||
_segments.push_back(_reserve_segments.front());
|
||||
_reserve_segments.pop_front();
|
||||
_segments.back()->reset_sync_time();
|
||||
logger.trace("Acquired segment {} from reserve", _segments.back());
|
||||
return make_ready_future<sseg_ptr>(_segments.back());
|
||||
return _reserve_segments.pop_eventually().then([this] (auto s) {
|
||||
_segments.push_back(std::move(s));
|
||||
_segments.back()->reset_sync_time();
|
||||
return make_ready_future<sseg_ptr>(_segments.back());
|
||||
});
|
||||
}
|
||||
|
||||
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::active_segment() {
|
||||
@@ -1227,6 +1249,15 @@ void db::commitlog::segment_manager::discard_unused_segments() {
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: pop() will call unlink -> sleeping in reactor thread.
|
||||
// Not urgent since mostly called during shutdown, but have to fix.
|
||||
future<> db::commitlog::segment_manager::clear_reserve_segments() {
|
||||
while (!_reserve_segments.empty()) {
|
||||
_reserve_segments.pop();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> db::commitlog::segment_manager::sync_all_segments(bool shutdown) {
|
||||
logger.debug("Issuing sync for all segments");
|
||||
return parallel_for_each(_segments, [this, shutdown](sseg_ptr s) {
|
||||
@@ -1237,7 +1268,9 @@ future<> db::commitlog::segment_manager::sync_all_segments(bool shutdown) {
|
||||
}
|
||||
|
||||
future<> db::commitlog::segment_manager::shutdown() {
|
||||
if (!_shutdown) {
|
||||
if (!_shutdown_promise) {
|
||||
_shutdown_promise = shared_promise<>();
|
||||
|
||||
// Wait for all pending requests to finish. Need to sync first because segments that are
|
||||
// alive may be holding semaphore permits.
|
||||
auto block_new_requests = get_units(_request_controller, max_request_controller_units());
|
||||
@@ -1249,14 +1282,26 @@ future<> db::commitlog::segment_manager::shutdown() {
|
||||
// segments, flushing out any remaining data.
|
||||
return _gate.close().then(std::bind(&segment_manager::sync_all_segments, this, true));
|
||||
});
|
||||
}).finally([this] {
|
||||
// Now that the gate is closed and requests completed we are sure nobody else will pop()
|
||||
return clear_reserve_segments().finally([this] {
|
||||
return std::move(_reserve_replenisher).then_wrapped([this] (auto f) {
|
||||
// Could be cleaner with proper seastar support
|
||||
if (f.failed()) {
|
||||
_shutdown_promise->set_exception(f.get_exception());
|
||||
} else {
|
||||
_shutdown_promise->set_value();
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
return _shutdown_promise->get_shared_future();
|
||||
}
|
||||
|
||||
void db::commitlog::segment_manager::orphan_all() {
|
||||
future<> db::commitlog::segment_manager::orphan_all() {
|
||||
_segments.clear();
|
||||
_reserve_segments.clear();
|
||||
return clear_reserve_segments();
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1271,7 +1316,7 @@ future<> db::commitlog::segment_manager::clear() {
|
||||
for (auto& s : _segments) {
|
||||
s->mark_clean();
|
||||
}
|
||||
orphan_all();
|
||||
return orphan_all();
|
||||
});
|
||||
}
|
||||
/**
|
||||
@@ -1302,37 +1347,7 @@ void db::commitlog::segment_manager::on_timer() {
|
||||
flush_segments();
|
||||
}
|
||||
}
|
||||
// take outstanding allocations into regard. This is paranoid,
|
||||
// but if for some reason the file::open takes longer than timer period,
|
||||
// we could flood the reserve list with new segments
|
||||
//
|
||||
// #482 - _reserve_allocating is decremented in the finally clause below.
|
||||
// This is needed because if either allocate_segment _or_ emplacing into
|
||||
// _reserve_segments should throw, we still need the counter reset
|
||||
// However, because of this, it might be that emplace was done, but not decrement,
|
||||
// when we get here again. So occasionally we might get a sum of the two that is
|
||||
// not consistent. It should however always just potentially be _to much_, i.e.
|
||||
// just an indicator that we don't need to do anything. So lets do that.
|
||||
auto n = std::min(_reserve_segments.size() + _reserve_allocating, _num_reserve_segments);
|
||||
return parallel_for_each(boost::irange(n, _num_reserve_segments), [this, n](auto i) {
|
||||
++_reserve_allocating;
|
||||
return this->allocate_segment(false).then([this](sseg_ptr s) {
|
||||
if (!_shutdown) {
|
||||
// insertion sort.
|
||||
auto i = std::upper_bound(_reserve_segments.begin(), _reserve_segments.end(), s, [](sseg_ptr s1, sseg_ptr s2) {
|
||||
const descriptor& d1 = s1->_desc;
|
||||
const descriptor& d2 = s2->_desc;
|
||||
return d1.id < d2.id;
|
||||
});
|
||||
i = _reserve_segments.emplace(i, std::move(s));
|
||||
logger.trace("Added reserve segment {}", *i);
|
||||
}
|
||||
}).finally([this] {
|
||||
--_reserve_allocating;
|
||||
});
|
||||
});
|
||||
}).handle_exception([](std::exception_ptr ep) {
|
||||
logger.warn("Exception in segment reservation: {}", ep);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
arm();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user