table: Wire interposer consumer for memtable flush
From now on, memtable flush will use the strategy's interposer consumer iff split_during_flush is enabled (disabled by default). It has effect only for TWCS users as TWCS it's the only strategy that goes on to implement this interposer consumer, which consists of segregating data according to the window configuration. Fixes #4617. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
64
table.cc
64
table.cc
@@ -42,6 +42,7 @@
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include "utils/error_injection.hh"
|
||||
#include "utils/histogram_metrics_helper.hh"
|
||||
#include "mutation_source_metadata.hh"
|
||||
|
||||
static logging::logger tlogger("table");
|
||||
static seastar::metrics::label column_family_label("cf");
|
||||
@@ -522,9 +523,6 @@ table::seal_active_memtable(flush_permit&& permit) {
|
||||
future<stop_iteration>
|
||||
table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_permit&& permit) {
|
||||
return with_scheduling_group(_config.memtable_scheduling_group, [this, old = std::move(old), permit = std::move(permit)] () mutable {
|
||||
auto newtab = make_sstable();
|
||||
|
||||
tlogger.debug("Flushing to {}", newtab->get_filename());
|
||||
// Note that due to our sharded architecture, it is possible that
|
||||
// in the face of a value change some shards will backup sstables
|
||||
// while others won't.
|
||||
@@ -536,31 +534,57 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_
|
||||
//
|
||||
// The code as is guarantees that we'll never partially backup a
|
||||
// single sstable, so that is enough of a guarantee.
|
||||
database_sstable_write_monitor monitor(make_lw_shared<sstable_write_permit>(std::move(permit)), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp());
|
||||
return do_with(std::move(monitor), [this, old, newtab] (auto& monitor) {
|
||||
auto&& priority = service::get_local_memtable_flush_priority();
|
||||
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer();
|
||||
cfg.backup = incremental_backups_enabled();
|
||||
auto f = write_memtable_to_sstable(*old, newtab, monitor, cfg, priority);
|
||||
|
||||
return do_with(std::vector<sstables::shared_sstable>(), [this, old, permit = make_lw_shared(std::move(permit))] (auto& newtabs) {
|
||||
auto metadata = mutation_source_metadata{};
|
||||
metadata.min_timestamp = old->get_min_timestamp();
|
||||
metadata.max_timestamp = old->get_max_timestamp();
|
||||
|
||||
auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs] (flat_mutation_reader reader) mutable {
|
||||
auto&& priority = service::get_local_memtable_flush_priority();
|
||||
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer();
|
||||
cfg.backup = incremental_backups_enabled();
|
||||
|
||||
auto newtab = make_sstable();
|
||||
newtabs.push_back(newtab);
|
||||
tlogger.debug("Flushing to {}", newtab->get_filename());
|
||||
|
||||
auto monitor = database_sstable_write_monitor(permit, newtab, _compaction_manager, _compaction_strategy,
|
||||
old->get_max_timestamp());
|
||||
|
||||
return do_with(std::move(monitor), [newtab, cfg = std::move(cfg), old, reader = std::move(reader), &priority] (auto& monitor) mutable {
|
||||
// FIXME: certain writers may receive only a small subset of the partitions, so bloom filters will be
|
||||
// bigger than needed, due to overestimation. That's eventually adjusted through compaction, though.
|
||||
return write_memtable_to_sstable(std::move(reader), *old, newtab, monitor, cfg, priority);
|
||||
});
|
||||
});
|
||||
|
||||
auto f = consumer(old->make_flush_reader(old->schema(), service::get_local_memtable_flush_priority()));
|
||||
|
||||
// Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush
|
||||
// controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to
|
||||
// priority inversion.
|
||||
return with_scheduling_group(default_scheduling_group(), [this, old = std::move(old), newtab = std::move(newtab), f = std::move(f)] () mutable {
|
||||
return f.then([this, newtab, old] {
|
||||
return newtab->open_data().then([this, old, newtab] () {
|
||||
tlogger.debug("Flushing to {} done", newtab->get_filename());
|
||||
return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, newtab] {
|
||||
return update_cache(old, { newtab });
|
||||
return with_scheduling_group(default_scheduling_group(), [this, old = std::move(old), &newtabs, f = std::move(f)] () mutable {
|
||||
return f.then([this, &newtabs, old] {
|
||||
return parallel_for_each(newtabs, [] (auto& newtab) {
|
||||
return newtab->open_data().then([&newtab] {
|
||||
tlogger.debug("Flushing to {} done", newtab->get_filename());
|
||||
});
|
||||
}).then([this, old, newtab] () noexcept {
|
||||
}).then([this, old, &newtabs] () {
|
||||
return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, &newtabs] {
|
||||
return update_cache(old, newtabs);
|
||||
});
|
||||
}).then([this, old, &newtabs] () noexcept {
|
||||
_memtables->erase(old);
|
||||
tlogger.debug("Memtable for {} replaced", newtab->get_filename());
|
||||
tlogger.debug("Memtable for {}.{} replaced, into {} sstables", old->schema()->ks_name(), old->schema()->cf_name(), newtabs.size());
|
||||
return stop_iteration::yes;
|
||||
});
|
||||
}).handle_exception([this, old, newtab] (auto e) {
|
||||
newtab->mark_for_deletion();
|
||||
}).handle_exception([this, old, &newtabs] (auto e) {
|
||||
for (auto& newtab : newtabs) {
|
||||
newtab->mark_for_deletion();
|
||||
tlogger.error("failed to write sstable {}: {}", newtab->get_filename(), e);
|
||||
}
|
||||
_config.cf_stats->failed_memtables_flushes_count++;
|
||||
tlogger.error("failed to write sstable {}: {}", newtab->get_filename(), e);
|
||||
// If we failed this write we will try the write again and that will create a new flush reader
|
||||
// that will decrease dirty memory again. So we need to reset the accounting.
|
||||
old->revert_flushed_memory();
|
||||
|
||||
Reference in New Issue
Block a user