diff --git a/configure.py b/configure.py index 8eb367c180..7e00aca1d0 100755 --- a/configure.py +++ b/configure.py @@ -181,6 +181,7 @@ urchin_tests = [ 'tests/logalloc_test', 'tests/managed_vector_test', 'tests/crc_test', + 'tests/flush_queue_test', ] apps = [ diff --git a/database.cc b/database.cc index dbd48c28a5..d7ddfce389 100644 --- a/database.cc +++ b/database.cc @@ -54,11 +54,15 @@ #include "sstable_mutation_readers.hh" #include #include "utils/latency.hh" +#include "utils/flush_queue.hh" using namespace std::chrono_literals; logging::logger dblog("database"); +// Do this to avoid having to include or forward-declare template in our header. +class column_family::memtable_flush_queue : public utils::flush_queue {}; + column_family::column_family(schema_ptr schema, config config, db::commitlog& cl, compaction_manager& compaction_manager) : _schema(std::move(schema)) , _config(std::move(config)) @@ -67,6 +71,7 @@ column_family::column_family(schema_ptr schema, config config, db::commitlog& cl , _cache(_schema, sstables_as_mutation_source(), global_cache_tracker()) , _commitlog(&cl) , _compaction_manager(compaction_manager) + , _flush_queue(std::make_unique()) { add_memtable(); if (!_config.enable_disk_writes) { @@ -82,6 +87,7 @@ column_family::column_family(schema_ptr schema, config config, no_commitlog cl, , _cache(_schema, sstables_as_mutation_source(), global_cache_tracker()) , _commitlog(nullptr) , _compaction_manager(compaction_manager) + , _flush_queue(std::make_unique()) { add_memtable(); if (!_config.enable_disk_writes) { @@ -477,8 +483,15 @@ column_family::seal_active_memtable() { ); _highest_flushed_rp = old->replay_position(); - return seastar::with_gate(_in_flight_seals, [old, this] { - return flush_memtable_to_sstable(old); + return _flush_queue->run_with_ordered_post_op(old->replay_position(), [old, this] { + return repeat([this, old] { + _flush_queue->check_open_gate(); + return try_flush_memtable_to_sstable(old); + }); + }, [old, this] { + if (_commitlog) { + _commitlog->discard_completed_segments(_schema->id(), old->replay_position()); + } }); // FIXME: release commit log // FIXME: provide back-pressure to upper layers @@ -530,26 +543,13 @@ column_family::try_flush_memtable_to_sstable(lw_shared_ptr old) { try { ret.get(); - // FIXME: until the surrounding function returns a future and - // caller ensures ordering (i.e. finish flushing one or more sequential tables before - // doing the discard), this below is _not_ correct, since the use of replay_position - // depends on us reporting the factual highest position we've actually flushed, - // _and_ all positions (for a given UUID) below having been dealt with. - // - // Note that the whole scheme is also dependent on memtables being "allocated" in order, - // i.e. we may not flush a younger memtable before and older, and we need to use the - // highest rp. - if (_commitlog) { - _commitlog->discard_completed_segments(_schema->id(), old->replay_position()); - } _memtables->erase(boost::range::find(*_memtables, old)); dblog.debug("Memtable replaced"); trigger_compaction(); + return make_ready_future(stop_iteration::yes); - } catch (std::exception& e) { - dblog.error("failed to write sstable: {}", e.what()); } catch (...) { - dblog.error("failed to write sstable: unknown error"); + dblog.error("failed to write sstable: {}", std::current_exception()); } return sleep(10s).then([] { return make_ready_future(stop_iteration::no); @@ -557,15 +557,6 @@ column_family::try_flush_memtable_to_sstable(lw_shared_ptr old) { }); } -future<> -column_family::flush_memtable_to_sstable(lw_shared_ptr memt) { - return repeat([this, memt] { - return seastar::with_gate(_in_flight_seals, [memt, this] { - return try_flush_memtable_to_sstable(memt); - }); - }); -} - void column_family::start() { // FIXME: add option to disable automatic compaction. @@ -575,11 +566,8 @@ column_family::start() { future<> column_family::stop() { seal_active_memtable(); - return _compaction_manager.remove(this).then([this] { - return _in_flight_seals.close().then([this] { - return make_ready_future<>(); - }); + return _flush_queue->close(); }); } diff --git a/database.hh b/database.hh index 0a1ce330f4..f82cd2693b 100644 --- a/database.hh +++ b/database.hh @@ -145,12 +145,13 @@ private: // Whether or not a cf is queued by its compaction manager. bool _compaction_manager_queued = false; int _compaction_disabled = 0; + class memtable_flush_queue; + std::unique_ptr _flush_queue; private: void update_stats_for_new_sstable(uint64_t new_sstable_data_size); void add_sstable(sstables::sstable&& sstable); void add_sstable(lw_shared_ptr sstable); void add_memtable(); - future<> flush_memtable_to_sstable(lw_shared_ptr memt); future try_flush_memtable_to_sstable(lw_shared_ptr memt); future<> update_cache(memtable&, lw_shared_ptr old_sstables); struct merge_comparator; diff --git a/test.py b/test.py index ea45dbca3c..3302a4da68 100755 --- a/test.py +++ b/test.py @@ -55,6 +55,7 @@ boost_tests = [ 'batchlog_manager_test', 'logalloc_test', 'crc_test', + 'flush_queue_test', ] other_tests = [ diff --git a/tests/flush_queue_test.cc b/tests/flush_queue_test.cc new file mode 100644 index 0000000000..8ad2eb871f --- /dev/null +++ b/tests/flush_queue_test.cc @@ -0,0 +1,88 @@ +/* + * Copyright 2015 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#define BOOST_TEST_DYN_LINK + +#include +#include +#include +#include +#include + +#include "tests/test-utils.hh" +#include "utils/flush_queue.hh" + +#include + +std::random_device rd; +std::default_random_engine e1(rd()); + +SEASTAR_TEST_CASE(test_queue_ordering_random_ops) { + struct env { + env(size_t n) : promises(n) {} + + utils::flush_queue queue; + std::vector> promises; + std::vector result; + promise<> end; + }; + + auto r = boost::irange(0, 20); + + return do_for_each(r, [](int) { + constexpr size_t num_ops = 1000; + + auto e = make_lw_shared(num_ops); + + int i = 0; + for (auto& p : e->promises) { + e->queue.run_with_ordered_post_op(i, [&p] { + return p.get_future(); + }, [i, e] { + //std::cout << "POST " << i << "(" << e->result.size() << ")" << std::endl; + e->result.emplace_back(i); + if (e->result.size() == e->promises.size()) { + e->end.set_value(); + } + }); + ++i; + } + + std::uniform_int_distribution dist(0, num_ops - 1); + std::bitset set; + + while (!set.all()) { + size_t i = dist(e1); + if (!set.test(i)) { + //std::cout << "SET " << i << "(" << e->result.size() << ")" << std::endl; + set[i] = true; + e->promises[i].set_value(); + } + } + + return e->end.get_future().then([e] { + BOOST_CHECK_EQUAL(e->result.size(), e->promises.size()); + BOOST_REQUIRE(std::is_sorted(e->result.begin(), e->result.end())); + }).finally([e] { + //std::cout << "Bulle" << std::endl; + }); + }); +} diff --git a/utils/flush_queue.hh b/utils/flush_queue.hh new file mode 100644 index 0000000000..fe23c2438e --- /dev/null +++ b/utils/flush_queue.hh @@ -0,0 +1,178 @@ +/* + * Copyright 2015 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include +#include +#include +#include + +namespace utils { + +/* + * Small utility to order func()->post() operation + * so that the "post" step is guaranteed to only be run + * when all func+post-ops for lower valued keys (T) are + * completed. + */ +template> +class flush_queue { +private: + enum class state { + running, + waiting, + }; + + struct notifier { + state s; + // to signal when "post" action may run + promise<> pr; + // to carry any "func" exception + std::exception_ptr ex; + // to wait for before issuing next post + std::experimental::optional> done; + + future<> signal() { + if (ex) { + pr.set_exception(ex); + } else { + pr.set_value(); + } + return std::move(*done); + } + }; + + typedef std::map map_type; + typedef typename map_type::reverse_iterator reverse_iterator; + + map_type _map; + // embed all ops in a seastar::gate as well + // so we can effectively block incoming ops + seastar::gate _gate; + + // Adds/updates an entry to be called once "rp" maps the lowest, finished + // operation + template + future<> add_callback(T rp, Post&& post, state s = state::running) { + promise<> pr; + auto fut = pr.get_future(); + + notifier n; + n.s = s; + n.done = n.pr.get_future().then(std::forward(post)).then_wrapped([this, pr = std::move(pr)](future<> f) mutable { + f.forward_to(std::move(pr)); + }); + + // Do not use emplace, since we might want to overwrite + _map[rp] = std::move(n); + + // We also go through gate the whole func + post chain + _gate.enter(); + return fut.finally([this] { + _gate.leave(); + }); + } + +public: + /* + * Runs func() followed by post(), but guaranteeing that + * all operations with lower keys have completed before + * post() is executed. + * + * Returns a future containing the result of post() + * + * Func & Post are both restricted to return future<> + * Any exception from Func is forwarded to end result, but + * in case of exception post is _not_ run. + */ + template + future<> run_with_ordered_post_op(T rp, Func&& func, Post&& post) { + assert(!_map.count(rp)); + assert(_map.empty() || _map.rbegin()->first < rp); + + auto fut = add_callback(rp, std::forward(post)); + + using futurator = futurize>; + + futurator::apply(std::forward(func)).then_wrapped([this, rp](future<> f) { + auto i = _map.find(rp); + // mark us as done (waiting for notofication) + i->second.s = state::waiting; + + try { + f.get(); + } catch (...) { + i->second.ex = std::current_exception(); + } + + // if we are the first item, dequeue and signal all + // that are currently waiting, starting with ourself + if (i == _map.begin()) { + return do_until([this] {return _map.empty() || _map.begin()->second.s != state::waiting;}, + [this] { + auto i = _map.begin(); + auto n = std::move(i->second); + _map.erase(i); + return n.signal(); + } + ); + } + return make_ready_future<>(); + }); + return fut; + } +private: + // waits for the entry at "i" to complete (and thus all before it) + future<> wait_for_pending(reverse_iterator i) { + if (i == _map.rend()) { + return make_ready_future<>(); + } + auto n = std::move(i->second); + auto s = n.s; + return add_callback(i->first, [n = std::move(n)]() { + // wait for original callback + return n.signal(); + }, s); + } +public: + // Waits for all operations currently active to finish + future<> wait_for_pending() { + return wait_for_pending(_map.rbegin()); + } + // Waits for all operations whose key is less than or equal to "rp" + // to complete + future<> wait_for_pending(T rp) { + auto i = _map.upper_bound(rp); + return wait_for_pending(reverse_iterator(i)); + } + // Closes this queue + future<> close() { + return _gate.close(); + } + // Poll-check that queue is still open + void check_open_gate() { + _gate.enter(); + _gate.leave(); + } +}; + +}