mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-31 12:06:44 +00:00
Merge "Flush queue ordering" from Calle
"Adds a small utility queue and through this enforces memtable flush ordering
such that a flush may _run_ unchecked, however the "post" operation may
execute once all "lower numbered" (i.e. lower replay position) post ops
has finished.
This means that:
a.) Callbacks to commitlog are now guaranteed to fulfill ordering criteria
b.) Calling column_family::flush() and waiting for the result will also
wait for any previously initiated flushes to finish. But not those
initiated _after_."
This commit is contained in:
@@ -181,6 +181,7 @@ urchin_tests = [
|
||||
'tests/logalloc_test',
|
||||
'tests/managed_vector_test',
|
||||
'tests/crc_test',
|
||||
'tests/flush_queue_test',
|
||||
]
|
||||
|
||||
apps = [
|
||||
|
||||
48
database.cc
48
database.cc
@@ -54,11 +54,15 @@
|
||||
#include "sstable_mutation_readers.hh"
|
||||
#include <core/fstream.hh>
|
||||
#include "utils/latency.hh"
|
||||
#include "utils/flush_queue.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
logging::logger dblog("database");
|
||||
|
||||
// Do this to avoid having to include or forward-declare template in our header.
|
||||
class column_family::memtable_flush_queue : public utils::flush_queue<db::replay_position> {};
|
||||
|
||||
column_family::column_family(schema_ptr schema, config config, db::commitlog& cl, compaction_manager& compaction_manager)
|
||||
: _schema(std::move(schema))
|
||||
, _config(std::move(config))
|
||||
@@ -67,6 +71,7 @@ column_family::column_family(schema_ptr schema, config config, db::commitlog& cl
|
||||
, _cache(_schema, sstables_as_mutation_source(), global_cache_tracker())
|
||||
, _commitlog(&cl)
|
||||
, _compaction_manager(compaction_manager)
|
||||
, _flush_queue(std::make_unique<memtable_flush_queue>())
|
||||
{
|
||||
add_memtable();
|
||||
if (!_config.enable_disk_writes) {
|
||||
@@ -82,6 +87,7 @@ column_family::column_family(schema_ptr schema, config config, no_commitlog cl,
|
||||
, _cache(_schema, sstables_as_mutation_source(), global_cache_tracker())
|
||||
, _commitlog(nullptr)
|
||||
, _compaction_manager(compaction_manager)
|
||||
, _flush_queue(std::make_unique<memtable_flush_queue>())
|
||||
{
|
||||
add_memtable();
|
||||
if (!_config.enable_disk_writes) {
|
||||
@@ -477,8 +483,15 @@ column_family::seal_active_memtable() {
|
||||
);
|
||||
_highest_flushed_rp = old->replay_position();
|
||||
|
||||
return seastar::with_gate(_in_flight_seals, [old, this] {
|
||||
return flush_memtable_to_sstable(old);
|
||||
return _flush_queue->run_with_ordered_post_op(old->replay_position(), [old, this] {
|
||||
return repeat([this, old] {
|
||||
_flush_queue->check_open_gate();
|
||||
return try_flush_memtable_to_sstable(old);
|
||||
});
|
||||
}, [old, this] {
|
||||
if (_commitlog) {
|
||||
_commitlog->discard_completed_segments(_schema->id(), old->replay_position());
|
||||
}
|
||||
});
|
||||
// FIXME: release commit log
|
||||
// FIXME: provide back-pressure to upper layers
|
||||
@@ -530,26 +543,13 @@ column_family::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old) {
|
||||
try {
|
||||
ret.get();
|
||||
|
||||
// FIXME: until the surrounding function returns a future and
|
||||
// caller ensures ordering (i.e. finish flushing one or more sequential tables before
|
||||
// doing the discard), this below is _not_ correct, since the use of replay_position
|
||||
// depends on us reporting the factual highest position we've actually flushed,
|
||||
// _and_ all positions (for a given UUID) below having been dealt with.
|
||||
//
|
||||
// Note that the whole scheme is also dependent on memtables being "allocated" in order,
|
||||
// i.e. we may not flush a younger memtable before and older, and we need to use the
|
||||
// highest rp.
|
||||
if (_commitlog) {
|
||||
_commitlog->discard_completed_segments(_schema->id(), old->replay_position());
|
||||
}
|
||||
_memtables->erase(boost::range::find(*_memtables, old));
|
||||
dblog.debug("Memtable replaced");
|
||||
trigger_compaction();
|
||||
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
} catch (std::exception& e) {
|
||||
dblog.error("failed to write sstable: {}", e.what());
|
||||
} catch (...) {
|
||||
dblog.error("failed to write sstable: unknown error");
|
||||
dblog.error("failed to write sstable: {}", std::current_exception());
|
||||
}
|
||||
return sleep(10s).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
@@ -557,15 +557,6 @@ column_family::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old) {
|
||||
});
|
||||
}
|
||||
|
||||
future<>
|
||||
column_family::flush_memtable_to_sstable(lw_shared_ptr<memtable> memt) {
|
||||
return repeat([this, memt] {
|
||||
return seastar::with_gate(_in_flight_seals, [memt, this] {
|
||||
return try_flush_memtable_to_sstable(memt);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
column_family::start() {
|
||||
// FIXME: add option to disable automatic compaction.
|
||||
@@ -575,11 +566,8 @@ column_family::start() {
|
||||
future<>
|
||||
column_family::stop() {
|
||||
seal_active_memtable();
|
||||
|
||||
return _compaction_manager.remove(this).then([this] {
|
||||
return _in_flight_seals.close().then([this] {
|
||||
return make_ready_future<>();
|
||||
});
|
||||
return _flush_queue->close();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -145,12 +145,13 @@ private:
|
||||
// Whether or not a cf is queued by its compaction manager.
|
||||
bool _compaction_manager_queued = false;
|
||||
int _compaction_disabled = 0;
|
||||
class memtable_flush_queue;
|
||||
std::unique_ptr<memtable_flush_queue> _flush_queue;
|
||||
private:
|
||||
void update_stats_for_new_sstable(uint64_t new_sstable_data_size);
|
||||
void add_sstable(sstables::sstable&& sstable);
|
||||
void add_sstable(lw_shared_ptr<sstables::sstable> sstable);
|
||||
void add_memtable();
|
||||
future<> flush_memtable_to_sstable(lw_shared_ptr<memtable> memt);
|
||||
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt);
|
||||
future<> update_cache(memtable&, lw_shared_ptr<sstable_list> old_sstables);
|
||||
struct merge_comparator;
|
||||
|
||||
1
test.py
1
test.py
@@ -55,6 +55,7 @@ boost_tests = [
|
||||
'batchlog_manager_test',
|
||||
'logalloc_test',
|
||||
'crc_test',
|
||||
'flush_queue_test',
|
||||
]
|
||||
|
||||
other_tests = [
|
||||
|
||||
88
tests/flush_queue_test.cc
Normal file
88
tests/flush_queue_test.cc
Normal file
@@ -0,0 +1,88 @@
|
||||
/*
|
||||
* Copyright 2015 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define BOOST_TEST_DYN_LINK
|
||||
|
||||
#include <random>
|
||||
#include <bitset>
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <boost/range/irange.hpp>
|
||||
#include <seastar/core/semaphore.hh>
|
||||
|
||||
#include "tests/test-utils.hh"
|
||||
#include "utils/flush_queue.hh"
|
||||
|
||||
#include <iostream>
|
||||
|
||||
std::random_device rd;
|
||||
std::default_random_engine e1(rd());
|
||||
|
||||
SEASTAR_TEST_CASE(test_queue_ordering_random_ops) {
|
||||
struct env {
|
||||
env(size_t n) : promises(n) {}
|
||||
|
||||
utils::flush_queue<int> queue;
|
||||
std::vector<promise<>> promises;
|
||||
std::vector<int> result;
|
||||
promise<> end;
|
||||
};
|
||||
|
||||
auto r = boost::irange(0, 20);
|
||||
|
||||
return do_for_each(r, [](int) {
|
||||
constexpr size_t num_ops = 1000;
|
||||
|
||||
auto e = make_lw_shared<env>(num_ops);
|
||||
|
||||
int i = 0;
|
||||
for (auto& p : e->promises) {
|
||||
e->queue.run_with_ordered_post_op(i, [&p] {
|
||||
return p.get_future();
|
||||
}, [i, e] {
|
||||
//std::cout << "POST " << i << "(" << e->result.size() << ")" << std::endl;
|
||||
e->result.emplace_back(i);
|
||||
if (e->result.size() == e->promises.size()) {
|
||||
e->end.set_value();
|
||||
}
|
||||
});
|
||||
++i;
|
||||
}
|
||||
|
||||
std::uniform_int_distribution<size_t> dist(0, num_ops - 1);
|
||||
std::bitset<num_ops> set;
|
||||
|
||||
while (!set.all()) {
|
||||
size_t i = dist(e1);
|
||||
if (!set.test(i)) {
|
||||
//std::cout << "SET " << i << "(" << e->result.size() << ")" << std::endl;
|
||||
set[i] = true;
|
||||
e->promises[i].set_value();
|
||||
}
|
||||
}
|
||||
|
||||
return e->end.get_future().then([e] {
|
||||
BOOST_CHECK_EQUAL(e->result.size(), e->promises.size());
|
||||
BOOST_REQUIRE(std::is_sorted(e->result.begin(), e->result.end()));
|
||||
}).finally([e] {
|
||||
//std::cout << "Bulle" << std::endl;
|
||||
});
|
||||
});
|
||||
}
|
||||
178
utils/flush_queue.hh
Normal file
178
utils/flush_queue.hh
Normal file
@@ -0,0 +1,178 @@
|
||||
/*
|
||||
* Copyright 2015 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <map>
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include <seastar/core/gate.hh>
|
||||
|
||||
namespace utils {
|
||||
|
||||
/*
|
||||
* Small utility to order func()->post() operation
|
||||
* so that the "post" step is guaranteed to only be run
|
||||
* when all func+post-ops for lower valued keys (T) are
|
||||
* completed.
|
||||
*/
|
||||
template<typename T, typename Comp = std::less<T>>
|
||||
class flush_queue {
|
||||
private:
|
||||
enum class state {
|
||||
running,
|
||||
waiting,
|
||||
};
|
||||
|
||||
struct notifier {
|
||||
state s;
|
||||
// to signal when "post" action may run
|
||||
promise<> pr;
|
||||
// to carry any "func" exception
|
||||
std::exception_ptr ex;
|
||||
// to wait for before issuing next post
|
||||
std::experimental::optional<future<>> done;
|
||||
|
||||
future<> signal() {
|
||||
if (ex) {
|
||||
pr.set_exception(ex);
|
||||
} else {
|
||||
pr.set_value();
|
||||
}
|
||||
return std::move(*done);
|
||||
}
|
||||
};
|
||||
|
||||
typedef std::map<T, notifier, Comp> map_type;
|
||||
typedef typename map_type::reverse_iterator reverse_iterator;
|
||||
|
||||
map_type _map;
|
||||
// embed all ops in a seastar::gate as well
|
||||
// so we can effectively block incoming ops
|
||||
seastar::gate _gate;
|
||||
|
||||
// Adds/updates an entry to be called once "rp" maps the lowest, finished
|
||||
// operation
|
||||
template<typename Post>
|
||||
future<> add_callback(T rp, Post&& post, state s = state::running) {
|
||||
promise<> pr;
|
||||
auto fut = pr.get_future();
|
||||
|
||||
notifier n;
|
||||
n.s = s;
|
||||
n.done = n.pr.get_future().then(std::forward<Post>(post)).then_wrapped([this, pr = std::move(pr)](future<> f) mutable {
|
||||
f.forward_to(std::move(pr));
|
||||
});
|
||||
|
||||
// Do not use emplace, since we might want to overwrite
|
||||
_map[rp] = std::move(n);
|
||||
|
||||
// We also go through gate the whole func + post chain
|
||||
_gate.enter();
|
||||
return fut.finally([this] {
|
||||
_gate.leave();
|
||||
});
|
||||
}
|
||||
|
||||
public:
|
||||
/*
|
||||
* Runs func() followed by post(), but guaranteeing that
|
||||
* all operations with lower <T> keys have completed before
|
||||
* post() is executed.
|
||||
*
|
||||
* Returns a future containing the result of post()
|
||||
*
|
||||
* Func & Post are both restricted to return future<>
|
||||
* Any exception from Func is forwarded to end result, but
|
||||
* in case of exception post is _not_ run.
|
||||
*/
|
||||
template<typename Func, typename Post>
|
||||
future<> run_with_ordered_post_op(T rp, Func&& func, Post&& post) {
|
||||
assert(!_map.count(rp));
|
||||
assert(_map.empty() || _map.rbegin()->first < rp);
|
||||
|
||||
auto fut = add_callback(rp, std::forward<Post>(post));
|
||||
|
||||
using futurator = futurize<std::result_of_t<Func()>>;
|
||||
|
||||
futurator::apply(std::forward<Func>(func)).then_wrapped([this, rp](future<> f) {
|
||||
auto i = _map.find(rp);
|
||||
// mark us as done (waiting for notofication)
|
||||
i->second.s = state::waiting;
|
||||
|
||||
try {
|
||||
f.get();
|
||||
} catch (...) {
|
||||
i->second.ex = std::current_exception();
|
||||
}
|
||||
|
||||
// if we are the first item, dequeue and signal all
|
||||
// that are currently waiting, starting with ourself
|
||||
if (i == _map.begin()) {
|
||||
return do_until([this] {return _map.empty() || _map.begin()->second.s != state::waiting;},
|
||||
[this] {
|
||||
auto i = _map.begin();
|
||||
auto n = std::move(i->second);
|
||||
_map.erase(i);
|
||||
return n.signal();
|
||||
}
|
||||
);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
return fut;
|
||||
}
|
||||
private:
|
||||
// waits for the entry at "i" to complete (and thus all before it)
|
||||
future<> wait_for_pending(reverse_iterator i) {
|
||||
if (i == _map.rend()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto n = std::move(i->second);
|
||||
auto s = n.s;
|
||||
return add_callback(i->first, [n = std::move(n)]() {
|
||||
// wait for original callback
|
||||
return n.signal();
|
||||
}, s);
|
||||
}
|
||||
public:
|
||||
// Waits for all operations currently active to finish
|
||||
future<> wait_for_pending() {
|
||||
return wait_for_pending(_map.rbegin());
|
||||
}
|
||||
// Waits for all operations whose key is less than or equal to "rp"
|
||||
// to complete
|
||||
future<> wait_for_pending(T rp) {
|
||||
auto i = _map.upper_bound(rp);
|
||||
return wait_for_pending(reverse_iterator(i));
|
||||
}
|
||||
// Closes this queue
|
||||
future<> close() {
|
||||
return _gate.close();
|
||||
}
|
||||
// Poll-check that queue is still open
|
||||
void check_open_gate() {
|
||||
_gate.enter();
|
||||
_gate.leave();
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user