mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 11:10:40 +00:00
Merge "Wire interposer consumer for memtable flush" from Raphael
" Without interposer consumer on flush, it could happen that a new sstable, produced by memtable flush, will not conform to the strategy invariant. For example, with TWCS, this new sstable could span multiple time windows, making it hard for the strategy to purge expired data. If interposer is enabled, the data will be correctly segregated into different sstables, each one spanning a single window. Fixes #4617. tests: - mode(dev). - manually tested it by forcing a flush of memtable spanning many windows " * 'segregation_on_flush_v2' of github.com:raphaelsc/scylla: test: Add test for TWCS interposer on memtable flush table: Wire interposer consumer for memtable flush table: Add write_memtable_to_sstable variant which accepts flat_mutation_reader table: Allow sstable write permit to be shared across monitors memtable: Track min timestamp table: Extend cache update to operate a memtable split into multiple sstables
This commit is contained in:
@@ -546,7 +546,7 @@ private:
|
||||
lw_shared_ptr<memtable> new_memtable();
|
||||
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt, sstable_write_permit&& permit);
|
||||
// Caller must keep m alive.
|
||||
future<> update_cache(lw_shared_ptr<memtable> m, sstables::shared_sstable sst);
|
||||
future<> update_cache(lw_shared_ptr<memtable> m, std::vector<sstables::shared_sstable> ssts);
|
||||
struct merge_comparator;
|
||||
|
||||
// update the sstable generation, making sure that new new sstables don't overwrite this one.
|
||||
|
||||
@@ -38,6 +38,13 @@ class sstables_manager;
|
||||
class sstable_writer_config;
|
||||
}
|
||||
|
||||
future<>
|
||||
write_memtable_to_sstable(flat_mutation_reader reader,
|
||||
memtable& mt, sstables::shared_sstable sst,
|
||||
sstables::write_monitor& monitor,
|
||||
sstables::sstable_writer_config& cfg,
|
||||
const io_priority_class& pc);
|
||||
|
||||
future<>
|
||||
write_memtable_to_sstable(memtable& mt,
|
||||
sstables::shared_sstable sst,
|
||||
|
||||
@@ -34,12 +34,12 @@ thread_local reader_concurrency_semaphore _flush_semaphore(reader_concurrency_se
|
||||
void memtable::memtable_encoding_stats_collector::update_timestamp(api::timestamp_type ts) {
|
||||
if (ts != api::missing_timestamp) {
|
||||
encoding_stats_collector::update_timestamp(ts);
|
||||
max_timestamp.update(ts);
|
||||
min_max_timestamp.update(ts);
|
||||
}
|
||||
}
|
||||
|
||||
memtable::memtable_encoding_stats_collector::memtable_encoding_stats_collector()
|
||||
: max_timestamp(0)
|
||||
: min_max_timestamp(0, 0)
|
||||
{}
|
||||
|
||||
void memtable::memtable_encoding_stats_collector::update(atomic_cell_view cell) {
|
||||
|
||||
12
memtable.hh
12
memtable.hh
@@ -138,7 +138,7 @@ private:
|
||||
|
||||
class memtable_encoding_stats_collector : public encoding_stats_collector {
|
||||
private:
|
||||
max_tracker<api::timestamp_type> max_timestamp;
|
||||
min_max_tracker<api::timestamp_type> min_max_timestamp;
|
||||
|
||||
void update_timestamp(api::timestamp_type ts);
|
||||
|
||||
@@ -154,8 +154,12 @@ private:
|
||||
void update(const ::schema& s, const deletable_row& dr);
|
||||
void update(const ::schema& s, const mutation_partition& mp);
|
||||
|
||||
api::timestamp_type get_min_timestamp() const {
|
||||
return min_max_timestamp.min();
|
||||
}
|
||||
|
||||
api::timestamp_type get_max_timestamp() const {
|
||||
return max_timestamp.get();
|
||||
return min_max_timestamp.max();
|
||||
}
|
||||
} _stats_collector;
|
||||
|
||||
@@ -212,6 +216,10 @@ public:
|
||||
return _stats_collector.get();
|
||||
}
|
||||
|
||||
api::timestamp_type get_min_timestamp() const {
|
||||
return _stats_collector.get_min_timestamp();
|
||||
}
|
||||
|
||||
api::timestamp_type get_max_timestamp() const {
|
||||
return _stats_collector.get_max_timestamp();
|
||||
}
|
||||
|
||||
107
table.cc
107
table.cc
@@ -42,6 +42,7 @@
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include "utils/error_injection.hh"
|
||||
#include "utils/histogram_metrics_helper.hh"
|
||||
#include "mutation_source_metadata.hh"
|
||||
|
||||
static logging::logger tlogger("table");
|
||||
static seastar::metrics::label column_family_label("cf");
|
||||
@@ -369,11 +370,18 @@ table::add_sstable_and_update_cache(sstables::shared_sstable sst) {
|
||||
}
|
||||
|
||||
future<>
|
||||
table::update_cache(lw_shared_ptr<memtable> m, sstables::shared_sstable sst) {
|
||||
auto adder = row_cache::external_updater([this, m, sst] {
|
||||
auto newtab_ms = sst->as_mutation_source();
|
||||
add_sstable(sst);
|
||||
m->mark_flushed(std::move(newtab_ms));
|
||||
table::update_cache(lw_shared_ptr<memtable> m, std::vector<sstables::shared_sstable> ssts) {
|
||||
std::vector<mutation_source> sources;
|
||||
sources.reserve(ssts.size());
|
||||
for (auto& sst : ssts) {
|
||||
sources.push_back(sst->as_mutation_source());
|
||||
}
|
||||
auto new_ssts_ms = make_combined_mutation_source(std::move(sources));
|
||||
auto adder = row_cache::external_updater([this, m, ssts = std::move(ssts), new_ssts_ms = std::move(new_ssts_ms)] () mutable {
|
||||
for (auto& sst : ssts) {
|
||||
add_sstable(sst);
|
||||
}
|
||||
m->mark_flushed(std::move(new_ssts_ms));
|
||||
try_trigger_compaction();
|
||||
});
|
||||
if (cache_enabled()) {
|
||||
@@ -386,9 +394,9 @@ table::update_cache(lw_shared_ptr<memtable> m, sstables::shared_sstable sst) {
|
||||
// Handles permit management only, used for situations where we don't want to inform
|
||||
// the compaction manager about backlogs (i.e., tests)
|
||||
class permit_monitor : public sstables::write_monitor {
|
||||
sstable_write_permit _permit;
|
||||
lw_shared_ptr<sstable_write_permit> _permit;
|
||||
public:
|
||||
permit_monitor(sstable_write_permit&& permit)
|
||||
permit_monitor(lw_shared_ptr<sstable_write_permit> permit)
|
||||
: _permit(std::move(permit)) {
|
||||
}
|
||||
|
||||
@@ -398,7 +406,7 @@ public:
|
||||
// we'll have a period without significant disk activity when the current
|
||||
// SSTable is being sealed, the caches are being updated, etc. To do that,
|
||||
// we ensure the permit doesn't outlive this continuation.
|
||||
_permit = sstable_write_permit::unconditional();
|
||||
*_permit = sstable_write_permit::unconditional();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -411,8 +419,8 @@ class database_sstable_write_monitor : public permit_monitor, public backlog_wri
|
||||
uint64_t _progress_seen = 0;
|
||||
api::timestamp_type _maximum_timestamp;
|
||||
public:
|
||||
database_sstable_write_monitor(sstable_write_permit&& permit, sstables::shared_sstable sst, compaction_manager& manager,
|
||||
sstables::compaction_strategy& strategy, api::timestamp_type max_timestamp)
|
||||
database_sstable_write_monitor(lw_shared_ptr<sstable_write_permit> permit, sstables::shared_sstable sst,
|
||||
compaction_manager& manager, sstables::compaction_strategy& strategy, api::timestamp_type max_timestamp)
|
||||
: permit_monitor(std::move(permit))
|
||||
, _sst(std::move(sst))
|
||||
, _compaction_manager(manager)
|
||||
@@ -515,9 +523,6 @@ table::seal_active_memtable(flush_permit&& permit) {
|
||||
future<stop_iteration>
|
||||
table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_permit&& permit) {
|
||||
return with_scheduling_group(_config.memtable_scheduling_group, [this, old = std::move(old), permit = std::move(permit)] () mutable {
|
||||
auto newtab = make_sstable();
|
||||
|
||||
tlogger.debug("Flushing to {}", newtab->get_filename());
|
||||
// Note that due to our sharded architecture, it is possible that
|
||||
// in the face of a value change some shards will backup sstables
|
||||
// while others won't.
|
||||
@@ -529,31 +534,57 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_
|
||||
//
|
||||
// The code as is guarantees that we'll never partially backup a
|
||||
// single sstable, so that is enough of a guarantee.
|
||||
database_sstable_write_monitor monitor(std::move(permit), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp());
|
||||
return do_with(std::move(monitor), [this, old, newtab] (auto& monitor) {
|
||||
auto&& priority = service::get_local_memtable_flush_priority();
|
||||
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer();
|
||||
cfg.backup = incremental_backups_enabled();
|
||||
auto f = write_memtable_to_sstable(*old, newtab, monitor, cfg, priority);
|
||||
|
||||
return do_with(std::vector<sstables::shared_sstable>(), [this, old, permit = make_lw_shared(std::move(permit))] (auto& newtabs) {
|
||||
auto metadata = mutation_source_metadata{};
|
||||
metadata.min_timestamp = old->get_min_timestamp();
|
||||
metadata.max_timestamp = old->get_max_timestamp();
|
||||
|
||||
auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs] (flat_mutation_reader reader) mutable {
|
||||
auto&& priority = service::get_local_memtable_flush_priority();
|
||||
sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer();
|
||||
cfg.backup = incremental_backups_enabled();
|
||||
|
||||
auto newtab = make_sstable();
|
||||
newtabs.push_back(newtab);
|
||||
tlogger.debug("Flushing to {}", newtab->get_filename());
|
||||
|
||||
auto monitor = database_sstable_write_monitor(permit, newtab, _compaction_manager, _compaction_strategy,
|
||||
old->get_max_timestamp());
|
||||
|
||||
return do_with(std::move(monitor), [newtab, cfg = std::move(cfg), old, reader = std::move(reader), &priority] (auto& monitor) mutable {
|
||||
// FIXME: certain writers may receive only a small subset of the partitions, so bloom filters will be
|
||||
// bigger than needed, due to overestimation. That's eventually adjusted through compaction, though.
|
||||
return write_memtable_to_sstable(std::move(reader), *old, newtab, monitor, cfg, priority);
|
||||
});
|
||||
});
|
||||
|
||||
auto f = consumer(old->make_flush_reader(old->schema(), service::get_local_memtable_flush_priority()));
|
||||
|
||||
// Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush
|
||||
// controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to
|
||||
// priority inversion.
|
||||
return with_scheduling_group(default_scheduling_group(), [this, old = std::move(old), newtab = std::move(newtab), f = std::move(f)] () mutable {
|
||||
return f.then([this, newtab, old] {
|
||||
return newtab->open_data().then([this, old, newtab] () {
|
||||
tlogger.debug("Flushing to {} done", newtab->get_filename());
|
||||
return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, newtab] {
|
||||
return update_cache(old, newtab);
|
||||
return with_scheduling_group(default_scheduling_group(), [this, old = std::move(old), &newtabs, f = std::move(f)] () mutable {
|
||||
return f.then([this, &newtabs, old] {
|
||||
return parallel_for_each(newtabs, [] (auto& newtab) {
|
||||
return newtab->open_data().then([&newtab] {
|
||||
tlogger.debug("Flushing to {} done", newtab->get_filename());
|
||||
});
|
||||
}).then([this, old, newtab] () noexcept {
|
||||
}).then([this, old, &newtabs] () {
|
||||
return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, &newtabs] {
|
||||
return update_cache(old, newtabs);
|
||||
});
|
||||
}).then([this, old, &newtabs] () noexcept {
|
||||
_memtables->erase(old);
|
||||
tlogger.debug("Memtable for {} replaced", newtab->get_filename());
|
||||
tlogger.debug("Memtable for {}.{} replaced, into {} sstables", old->schema()->ks_name(), old->schema()->cf_name(), newtabs.size());
|
||||
return stop_iteration::yes;
|
||||
});
|
||||
}).handle_exception([this, old, newtab] (auto e) {
|
||||
newtab->mark_for_deletion();
|
||||
}).handle_exception([this, old, &newtabs] (auto e) {
|
||||
for (auto& newtab : newtabs) {
|
||||
newtab->mark_for_deletion();
|
||||
tlogger.error("failed to write sstable {}: {}", newtab->get_filename(), e);
|
||||
}
|
||||
_config.cf_stats->failed_memtables_flushes_count++;
|
||||
tlogger.error("failed to write sstable {}: {}", newtab->get_filename(), e);
|
||||
// If we failed this write we will try the write again and that will create a new flush reader
|
||||
// that will decrease dirty memory again. So we need to reset the accounting.
|
||||
old->revert_flushed_memory();
|
||||
@@ -1631,19 +1662,27 @@ table::apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle
|
||||
}
|
||||
|
||||
future<>
|
||||
write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst,
|
||||
write_memtable_to_sstable(flat_mutation_reader reader,
|
||||
memtable& mt, sstables::shared_sstable sst,
|
||||
sstables::write_monitor& monitor,
|
||||
sstables::sstable_writer_config& cfg,
|
||||
const io_priority_class& pc) {
|
||||
cfg.replay_position = mt.replay_position();
|
||||
cfg.monitor = &monitor;
|
||||
return sst->write_components(mt.make_flush_reader(mt.schema(), pc), mt.partition_count(),
|
||||
mt.schema(), cfg, mt.get_encoding_stats(), pc);
|
||||
return sst->write_components(std::move(reader), mt.partition_count(), mt.schema(), cfg, mt.get_encoding_stats(), pc);
|
||||
}
|
||||
|
||||
future<>
|
||||
write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst,
|
||||
sstables::write_monitor& monitor,
|
||||
sstables::sstable_writer_config& cfg,
|
||||
const io_priority_class& pc) {
|
||||
return write_memtable_to_sstable(mt.make_flush_reader(mt.schema(), pc), mt, std::move(sst), monitor, cfg, pc);
|
||||
}
|
||||
|
||||
future<>
|
||||
write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, sstables::sstable_writer_config cfg) {
|
||||
return do_with(permit_monitor(sstable_write_permit::unconditional()), cfg, [&mt, sst] (auto& monitor, auto& cfg) {
|
||||
return do_with(permit_monitor(make_lw_shared(sstable_write_permit::unconditional())), cfg, [&mt, sst] (auto& monitor, auto& cfg) {
|
||||
return write_memtable_to_sstable(mt, std::move(sst), monitor, cfg);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -6745,3 +6745,72 @@ SEASTAR_TEST_CASE(stcs_reshape_test) {
|
||||
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, default_priority_class(), reshape_mode::relaxed).sstables.size());
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_twcs_interposer_on_memtable_flush) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
storage_service_for_tests ssft;
|
||||
|
||||
auto test_interposer_on_flush = [&] (bool split_during_flush) {
|
||||
auto builder = schema_builder("tests", "test_twcs_interposer_on_flush")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("cl", int32_type, column_kind::clustering_key)
|
||||
.with_column("value", int32_type);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
std::map<sstring, sstring> opts = {
|
||||
{ time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY, "HOURS" },
|
||||
{ time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, "1" },
|
||||
};
|
||||
builder.set_compaction_strategy_options(std::move(opts));
|
||||
auto s = builder.build();
|
||||
|
||||
auto next_timestamp = [] (auto step) {
|
||||
using namespace std::chrono;
|
||||
return (gc_clock::now().time_since_epoch() - duration_cast<microseconds>(step)).count();
|
||||
};
|
||||
auto tokens = token_generation_for_shard(1, this_shard_id(), test_db_config.murmur3_partitioner_ignore_msb_bits(), smp::count);
|
||||
|
||||
auto make_row = [&] (std::chrono::hours step) {
|
||||
static thread_local int32_t value = 1;
|
||||
auto key_str = tokens[0].first;
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(key_str)});
|
||||
|
||||
mutation m(s, key);
|
||||
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(value++)});
|
||||
m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(value)), next_timestamp(step));
|
||||
return m;
|
||||
};
|
||||
|
||||
auto tmp = tmpdir();
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
column_family::config cfg = column_family_test_config(env.manager());
|
||||
cfg.datadir = tmp.path().string();
|
||||
cfg.enable_disk_writes = true;
|
||||
cfg.enable_cache = false;
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
cell_locker_stats cl_stats;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm, cl_stats, *tracker);
|
||||
cf->mark_ready_for_writes();
|
||||
cf->start();
|
||||
|
||||
size_t target_windows_span = (split_during_flush) ? 10 : 1;
|
||||
constexpr size_t rows_per_window = 10;
|
||||
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
for (auto i = 1; i <= target_windows_span; i++) {
|
||||
for (auto j = 0; j < rows_per_window; j++) {
|
||||
mt->apply(make_row(std::chrono::hours(i)));
|
||||
}
|
||||
}
|
||||
|
||||
auto ret = column_family_test(cf).try_flush_memtable_to_sstable(mt).get0();
|
||||
BOOST_REQUIRE(ret == stop_iteration::yes);
|
||||
|
||||
auto expected_ssts = (split_during_flush) ? target_windows_span : 1;
|
||||
testlog.info("split_during_flush={}, actual={}, expected={}", split_during_flush, cf->get_sstables()->size(), expected_ssts);
|
||||
BOOST_REQUIRE(cf->get_sstables()->size() == expected_ssts);
|
||||
};
|
||||
|
||||
test_interposer_on_flush(true);
|
||||
test_interposer_on_flush(false);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -66,6 +66,10 @@ public:
|
||||
static int64_t calculate_shard_from_sstable_generation(int64_t generation) {
|
||||
return column_family::calculate_shard_from_sstable_generation(generation);
|
||||
}
|
||||
|
||||
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> mt) {
|
||||
return _cf->try_flush_memtable_to_sstable(mt, sstable_write_permit::unconditional());
|
||||
}
|
||||
};
|
||||
|
||||
namespace sstables {
|
||||
|
||||
Reference in New Issue
Block a user