streaming memtables: coalesce incoming writes
The repair process will potentially send ranges containing few mutations, definitely not enough to fill a memtable. It wants to know whether or not each of those ranges individually succeeded or failed, so we need a future for each. Small memtables being flushed are bad, and we would like to write bigger memtables so we can better utilize our disks. One of the ways to fix that, is changing the repair itself to send more mutations at a single batch. But relying on that is a bad idea for two reasons: First, the goals of the SSTable writer and the repair sender are at odds. The SSTable writer wants to write as few SSTables as possible, while the repair sender wants to break down the range in pieces as small as it can and checksum them individually, so it doesn't have to send a lot of mutations for no reason. Second, even if the repair process wants to process larger ranges at once, some ranges themselves may be small. So while most ranges would be large, we would still have potentially some fairly small SSTables lying around. The best course of action in this case is to coalesce the incoming streams write-side. repair can now choose whatever strategy - small or big ranges - it wants, resting assure that the incoming memtables will be coalesced together. Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
Notes:
Avi Kivity
2016-03-26 22:06:40 +03:00
backport: 1.0
75
database.cc
75
database.cc
@@ -90,7 +90,7 @@ column_family::column_family(schema_ptr schema, config config, db::commitlog& cl
|
||||
, _config(std::move(config))
|
||||
, _memtables(make_lw_shared<memtable_list>([this] { return seal_active_memtable(); }, [this] { return new_memtable(); }, _config.max_memtable_size))
|
||||
, _streaming_memtables(_config.enable_disk_writes ?
|
||||
make_lw_shared<memtable_list>([this] { return seal_active_streaming_memtable(); }, [this] { return new_memtable(); }, _config.max_memtable_size) :
|
||||
make_lw_shared<memtable_list>([this] { return seal_active_streaming_memtable_delayed(); }, [this] { return new_memtable(); }, _config.max_memtable_size) :
|
||||
make_lw_shared<memtable_list>([this] { return seal_active_memtable(); }, [this] { return new_memtable(); }, _config.max_memtable_size))
|
||||
, _sstables(make_lw_shared<sstable_list>())
|
||||
, _cache(_schema, sstables_as_mutation_source(), sstables_as_key_source(), global_cache_tracker())
|
||||
@@ -108,7 +108,7 @@ column_family::column_family(schema_ptr schema, config config, no_commitlog cl,
|
||||
, _config(std::move(config))
|
||||
, _memtables(make_lw_shared<memtable_list>([this] { return seal_active_memtable(); }, [this] { return new_memtable(); }, _config.max_memtable_size))
|
||||
, _streaming_memtables(_config.enable_disk_writes ?
|
||||
make_lw_shared<memtable_list>([this] { return seal_active_streaming_memtable(); }, [this] { return new_memtable(); }, _config.max_memtable_size) :
|
||||
make_lw_shared<memtable_list>([this] { return seal_active_streaming_memtable_delayed(); }, [this] { return new_memtable(); }, _config.max_memtable_size) :
|
||||
make_lw_shared<memtable_list>([this] { return seal_active_memtable(); }, [this] { return new_memtable(); }, _config.max_memtable_size))
|
||||
, _sstables(make_lw_shared<sstable_list>())
|
||||
, _cache(_schema, sstables_as_mutation_source(), sstables_as_key_source(), global_cache_tracker())
|
||||
@@ -558,6 +558,38 @@ column_family::update_cache(memtable& m, lw_shared_ptr<sstable_list> old_sstable
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: because we are coalescing, it could be that mutations belonging to the same
|
||||
// range end up in two different tables. Technically, we should wait for both. However,
|
||||
// the only way we have to make this happen now is to wait on all previous writes. This
|
||||
// certainly is an overkill, so we won't do it. We can fix this longer term by looking
|
||||
// at the PREPARE messages, and then noting what is the minimum future we should be
|
||||
// waiting for.
|
||||
future<>
|
||||
column_family::seal_active_streaming_memtable_delayed() {
|
||||
auto old = _streaming_memtables->back();
|
||||
if (old->empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
if (_streaming_memtables->should_flush()) {
|
||||
return seal_active_streaming_memtable();
|
||||
}
|
||||
|
||||
if (!_delayed_streaming_flush.armed()) {
|
||||
// We don't want to wait for too long, because the incoming mutations will not be available
|
||||
// until we flush them to SSTables. On top of that, if the sender ran out of messages, it won't
|
||||
// send more until we respond to some - which depends on these futures resolving. Sure enough,
|
||||
// the real fix for that second one is to have better communication between sender and receiver,
|
||||
// but that's not realistic ATM. If we did have better negotiation here, we would not need a timer
|
||||
// at all.
|
||||
_delayed_streaming_flush.arm(2s);
|
||||
}
|
||||
|
||||
return with_gate(_streaming_flush_gate, [this, old] {
|
||||
return _waiting_streaming_flushes.get_shared_future();
|
||||
});
|
||||
}
|
||||
|
||||
future<>
|
||||
column_family::seal_active_streaming_memtable() {
|
||||
auto old = _streaming_memtables->back();
|
||||
@@ -567,7 +599,12 @@ column_family::seal_active_streaming_memtable() {
|
||||
_streaming_memtables->add_memtable();
|
||||
_streaming_memtables->erase(old);
|
||||
return with_gate(_streaming_flush_gate, [this, old] {
|
||||
return with_lock(_sstables_lock.for_read(), [this, old] {
|
||||
_delayed_streaming_flush.cancel();
|
||||
|
||||
auto current_waiters = std::exchange(_waiting_streaming_flushes, shared_promise<>());
|
||||
auto f = current_waiters.get_shared_future(); // for this seal
|
||||
|
||||
with_lock(_sstables_lock.for_read(), [this, old] {
|
||||
auto newtab = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(),
|
||||
_config.datadir, calculate_generation_for_new_table(),
|
||||
sstables::sstable::version_types::ka,
|
||||
@@ -600,7 +637,15 @@ column_family::seal_active_streaming_memtable() {
|
||||
});
|
||||
// We will also not have any retry logic. If we fail here, we'll fail the streaming and let
|
||||
// the upper layers know. They can then apply any logic they want here.
|
||||
}).then_wrapped([this, current_waiters = std::move(current_waiters)] (future <> f) mutable {
|
||||
if (f.failed()) {
|
||||
current_waiters.set_exception(f.get_exception());
|
||||
} else {
|
||||
current_waiters.set_value();
|
||||
}
|
||||
});
|
||||
|
||||
return f;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -713,8 +758,8 @@ column_family::start() {
|
||||
|
||||
future<>
|
||||
column_family::stop() {
|
||||
_memtables->seal_active_memtable();
|
||||
_streaming_memtables->seal_active_memtable();
|
||||
seal_active_memtable();
|
||||
seal_active_streaming_memtable();
|
||||
return _compaction_manager.remove(this).then([this] {
|
||||
// Nest, instead of using when_all, so we don't lose any exceptions.
|
||||
return _flush_queue->close().then([this] {
|
||||
@@ -2372,7 +2417,7 @@ future<> column_family::flush() {
|
||||
// FIXME: this will synchronously wait for this write to finish, but doesn't guarantee
|
||||
// anything about previous writes.
|
||||
_stats.pending_flushes++;
|
||||
return _memtables->seal_active_memtable().finally([this]() mutable {
|
||||
return seal_active_memtable().finally([this]() mutable {
|
||||
_stats.pending_flushes--;
|
||||
// In origin memtable_switch_count is incremented inside
|
||||
// ColumnFamilyMeetrics Flush.run
|
||||
@@ -2394,7 +2439,7 @@ future<> column_family::flush(const db::replay_position& pos) {
|
||||
// We ignore this for now and just say that if we're asked for
|
||||
// a CF and it exists, we pretty much have to have data that needs
|
||||
// flushing. Let's do it.
|
||||
return _memtables->seal_active_memtable();
|
||||
return seal_active_memtable();
|
||||
}
|
||||
|
||||
// FIXME: We can do much better than this in terms of cache management. Right
|
||||
@@ -2406,12 +2451,18 @@ future<> column_family::flush(const db::replay_position& pos) {
|
||||
// be indiscriminately touching the cache during repair. We will just have to
|
||||
// invalidate the entries that are relevant to things we already have in the cache.
|
||||
future<> column_family::flush_streaming_mutations(std::vector<query::partition_range> ranges) {
|
||||
return _streaming_memtables->seal_active_memtable().finally([this, ranges = std::move(ranges)] {
|
||||
if (_config.enable_cache) {
|
||||
for (auto& range : ranges) {
|
||||
_cache.invalidate(range);
|
||||
// This will effectively take the gate twice for this call. The proper way to fix that would
|
||||
// be to change seal_active_streaming_memtable_delayed to take a range parameter. However, we
|
||||
// need this code to go away as soon as we can (see FIXME above). So the double gate is a better
|
||||
// temporary counter measure.
|
||||
return with_gate(_streaming_flush_gate, [this, ranges = std::move(ranges)] {
|
||||
return seal_active_streaming_memtable_delayed().finally([this, ranges = std::move(ranges)] {
|
||||
if (_config.enable_cache) {
|
||||
for (auto& range : ranges) {
|
||||
_cache.invalidate(range);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
32
database.hh
32
database.hh
@@ -71,6 +71,7 @@
|
||||
#include "sstables/compaction.hh"
|
||||
#include "key_reader.hh"
|
||||
#include <seastar/core/rwlock.hh>
|
||||
#include <seastar/core/shared_future.hh>
|
||||
|
||||
class frozen_mutation;
|
||||
class reconcilable_result;
|
||||
@@ -175,8 +176,13 @@ public:
|
||||
void add_memtable() {
|
||||
_memtables.emplace_back(_new_memtable());
|
||||
}
|
||||
|
||||
bool should_flush() {
|
||||
return active_memtable().occupancy().total_space() >= _max_memtable_size;
|
||||
}
|
||||
|
||||
void seal_on_overflow() {
|
||||
if (active_memtable().occupancy().total_space() >= _max_memtable_size) {
|
||||
if (should_flush()) {
|
||||
// FIXME: if sparse, do some in-memory compaction first
|
||||
// FIXME: maybe merge with other in-memory memtables
|
||||
_seal_fn();
|
||||
@@ -527,7 +533,31 @@ private:
|
||||
// waiting on this future. This is useful in situations where we want to
|
||||
// synchronously flush data to disk.
|
||||
future<> seal_active_memtable();
|
||||
|
||||
// I am assuming here that the repair process will potentially send ranges containing
|
||||
// few mutations, definitely not enough to fill a memtable. It wants to know whether or
|
||||
// not each of those ranges individually succeeded or failed, so we need a future for
|
||||
// each.
|
||||
//
|
||||
// One of the ways to fix that, is changing the repair itself to send more mutations at
|
||||
// a single batch. But relying on that is a bad idea for two reasons:
|
||||
//
|
||||
// First, the goals of the SSTable writer and the repair sender are at odds. The SSTable
|
||||
// writer wants to write as few SSTables as possible, while the repair sender wants to
|
||||
// break down the range in pieces as small as it can and checksum them individually, so
|
||||
// it doesn't have to send a lot of mutations for no reason.
|
||||
//
|
||||
// Second, even if the repair process wants to process larger ranges at once, some ranges
|
||||
// themselves may be small. So while most ranges would be large, we would still have
|
||||
// potentially some fairly small SSTables lying around.
|
||||
//
|
||||
// The best course of action in this case is to coalesce the incoming streams write-side.
|
||||
// repair can now choose whatever strategy - small or big ranges - it wants, resting assure
|
||||
// that the incoming memtables will be coalesced together.
|
||||
shared_promise<> _waiting_streaming_flushes;
|
||||
timer<> _delayed_streaming_flush{[this] { seal_active_streaming_memtable(); }};
|
||||
future<> seal_active_streaming_memtable();
|
||||
future<> seal_active_streaming_memtable_delayed();
|
||||
|
||||
// filter manifest.json files out
|
||||
static bool manifest_json_filter(const sstring& fname);
|
||||
|
||||
Reference in New Issue
Block a user