From 5519fdba72f264c2682fe097f132fe652156e082 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 23 Dec 2020 14:27:43 -0300 Subject: [PATCH 1/6] table: Extend cache update to operate a memtable split into multiple sstables This extension is needed for future work where a memtable will be segregated during flush into one sstable or more. So now multiple sstables can be added to the set after a memtable flush, and compaction is only triggered at the end. Signed-off-by: Raphael S. Carvalho --- database.hh | 2 +- table.cc | 19 +++++++++++++------ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/database.hh b/database.hh index f49950ad4c..5a534b4b4f 100644 --- a/database.hh +++ b/database.hh @@ -546,7 +546,7 @@ private: lw_shared_ptr new_memtable(); future try_flush_memtable_to_sstable(lw_shared_ptr memt, sstable_write_permit&& permit); // Caller must keep m alive. - future<> update_cache(lw_shared_ptr m, sstables::shared_sstable sst); + future<> update_cache(lw_shared_ptr m, std::vector ssts); struct merge_comparator; // update the sstable generation, making sure that new new sstables don't overwrite this one. diff --git a/table.cc b/table.cc index ae252f2684..6bbe6c830a 100644 --- a/table.cc +++ b/table.cc @@ -369,11 +369,18 @@ table::add_sstable_and_update_cache(sstables::shared_sstable sst) { } future<> -table::update_cache(lw_shared_ptr m, sstables::shared_sstable sst) { - auto adder = row_cache::external_updater([this, m, sst] { - auto newtab_ms = sst->as_mutation_source(); - add_sstable(sst); - m->mark_flushed(std::move(newtab_ms)); +table::update_cache(lw_shared_ptr m, std::vector ssts) { + std::vector sources; + sources.reserve(ssts.size()); + for (auto& sst : ssts) { + sources.push_back(sst->as_mutation_source()); + } + auto new_ssts_ms = make_combined_mutation_source(std::move(sources)); + auto adder = row_cache::external_updater([this, m, ssts = std::move(ssts), new_ssts_ms = std::move(new_ssts_ms)] () mutable { + for (auto& sst : ssts) { + add_sstable(sst); + } + m->mark_flushed(std::move(new_ssts_ms)); try_trigger_compaction(); }); if (cache_enabled()) { @@ -543,7 +550,7 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_ return newtab->open_data().then([this, old, newtab] () { tlogger.debug("Flushing to {} done", newtab->get_filename()); return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, newtab] { - return update_cache(old, newtab); + return update_cache(old, { newtab }); }); }).then([this, old, newtab] () noexcept { _memtables->erase(old); From 738049cba2d0821193754a0bfb29e235c4d73c03 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 23 Dec 2020 17:04:35 -0300 Subject: [PATCH 2/6] memtable: Track min timestamp Tracking both min and max timestamp will be required for memtable flush to short-circuit interposer consumer if needed. Signed-off-by: Raphael S. Carvalho --- memtable.cc | 4 ++-- memtable.hh | 12 ++++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/memtable.cc b/memtable.cc index 4965e676f4..0accd0e301 100644 --- a/memtable.cc +++ b/memtable.cc @@ -34,12 +34,12 @@ thread_local reader_concurrency_semaphore _flush_semaphore(reader_concurrency_se void memtable::memtable_encoding_stats_collector::update_timestamp(api::timestamp_type ts) { if (ts != api::missing_timestamp) { encoding_stats_collector::update_timestamp(ts); - max_timestamp.update(ts); + min_max_timestamp.update(ts); } } memtable::memtable_encoding_stats_collector::memtable_encoding_stats_collector() - : max_timestamp(0) + : min_max_timestamp(0, 0) {} void memtable::memtable_encoding_stats_collector::update(atomic_cell_view cell) { diff --git a/memtable.hh b/memtable.hh index fe63a867da..3a78c734f7 100644 --- a/memtable.hh +++ b/memtable.hh @@ -138,7 +138,7 @@ private: class memtable_encoding_stats_collector : public encoding_stats_collector { private: - max_tracker max_timestamp; + min_max_tracker min_max_timestamp; void update_timestamp(api::timestamp_type ts); @@ -154,8 +154,12 @@ private: void update(const ::schema& s, const deletable_row& dr); void update(const ::schema& s, const mutation_partition& mp); + api::timestamp_type get_min_timestamp() const { + return min_max_timestamp.min(); + } + api::timestamp_type get_max_timestamp() const { - return max_timestamp.get(); + return min_max_timestamp.max(); } } _stats_collector; @@ -212,6 +216,10 @@ public: return _stats_collector.get(); } + api::timestamp_type get_min_timestamp() const { + return _stats_collector.get_min_timestamp(); + } + api::timestamp_type get_max_timestamp() const { return _stats_collector.get_max_timestamp(); } From 32acb44fecbf48ca15ba7a0bbf03ae6d3a1a18e1 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 4 Jan 2021 14:46:43 -0300 Subject: [PATCH 3/6] table: Allow sstable write permit to be shared across monitors As a preparation for interposer on flush, let's allow database write monitor to store a shared sstable write permit, which will be released as soon as any of the sstable writers reach the sealing stage. Signed-off-by: Raphael S. Carvalho --- table.cc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/table.cc b/table.cc index 6bbe6c830a..111363853a 100644 --- a/table.cc +++ b/table.cc @@ -393,9 +393,9 @@ table::update_cache(lw_shared_ptr m, std::vector _permit; public: - permit_monitor(sstable_write_permit&& permit) + permit_monitor(lw_shared_ptr permit) : _permit(std::move(permit)) { } @@ -405,7 +405,7 @@ public: // we'll have a period without significant disk activity when the current // SSTable is being sealed, the caches are being updated, etc. To do that, // we ensure the permit doesn't outlive this continuation. - _permit = sstable_write_permit::unconditional(); + *_permit = sstable_write_permit::unconditional(); } }; @@ -418,8 +418,8 @@ class database_sstable_write_monitor : public permit_monitor, public backlog_wri uint64_t _progress_seen = 0; api::timestamp_type _maximum_timestamp; public: - database_sstable_write_monitor(sstable_write_permit&& permit, sstables::shared_sstable sst, compaction_manager& manager, - sstables::compaction_strategy& strategy, api::timestamp_type max_timestamp) + database_sstable_write_monitor(lw_shared_ptr permit, sstables::shared_sstable sst, + compaction_manager& manager, sstables::compaction_strategy& strategy, api::timestamp_type max_timestamp) : permit_monitor(std::move(permit)) , _sst(std::move(sst)) , _compaction_manager(manager) @@ -536,7 +536,7 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_ // // The code as is guarantees that we'll never partially backup a // single sstable, so that is enough of a guarantee. - database_sstable_write_monitor monitor(std::move(permit), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); + database_sstable_write_monitor monitor(make_lw_shared(std::move(permit)), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); return do_with(std::move(monitor), [this, old, newtab] (auto& monitor) { auto&& priority = service::get_local_memtable_flush_priority(); sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer(); @@ -1650,7 +1650,7 @@ write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, future<> write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, sstables::sstable_writer_config cfg) { - return do_with(permit_monitor(sstable_write_permit::unconditional()), cfg, [&mt, sst] (auto& monitor, auto& cfg) { + return do_with(permit_monitor(make_lw_shared(sstable_write_permit::unconditional())), cfg, [&mt, sst] (auto& monitor, auto& cfg) { return write_memtable_to_sstable(mt, std::move(sst), monitor, cfg); }); } From c926a948e5984fa2271fe7e6c0e3bee0e9e44bad Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 4 Jan 2021 16:23:00 -0300 Subject: [PATCH 4/6] table: Add write_memtable_to_sstable variant which accepts flat_mutation_reader This new variant will be needed for interposer consumer. Signed-off-by: Raphael S. Carvalho --- memtable-sstable.hh | 7 +++++++ table.cc | 14 +++++++++++--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/memtable-sstable.hh b/memtable-sstable.hh index 9346d80f6d..a522f9bbd8 100644 --- a/memtable-sstable.hh +++ b/memtable-sstable.hh @@ -38,6 +38,13 @@ class sstables_manager; class sstable_writer_config; } +future<> +write_memtable_to_sstable(flat_mutation_reader reader, + memtable& mt, sstables::shared_sstable sst, + sstables::write_monitor& monitor, + sstables::sstable_writer_config& cfg, + const io_priority_class& pc); + future<> write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, diff --git a/table.cc b/table.cc index 111363853a..268cb8f4ad 100644 --- a/table.cc +++ b/table.cc @@ -1638,14 +1638,22 @@ table::apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle } future<> -write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, +write_memtable_to_sstable(flat_mutation_reader reader, + memtable& mt, sstables::shared_sstable sst, sstables::write_monitor& monitor, sstables::sstable_writer_config& cfg, const io_priority_class& pc) { cfg.replay_position = mt.replay_position(); cfg.monitor = &monitor; - return sst->write_components(mt.make_flush_reader(mt.schema(), pc), mt.partition_count(), - mt.schema(), cfg, mt.get_encoding_stats(), pc); + return sst->write_components(std::move(reader), mt.partition_count(), mt.schema(), cfg, mt.get_encoding_stats(), pc); +} + +future<> +write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, + sstables::write_monitor& monitor, + sstables::sstable_writer_config& cfg, + const io_priority_class& pc) { + return write_memtable_to_sstable(mt.make_flush_reader(mt.schema(), pc), mt, std::move(sst), monitor, cfg, pc); } future<> From 9124a708f16a6b7644fc643456f40494a6cb5b1a Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 4 Jan 2021 16:23:50 -0300 Subject: [PATCH 5/6] table: Wire interposer consumer for memtable flush From now on, memtable flush will use the strategy's interposer consumer iff split_during_flush is enabled (disabled by default). It has effect only for TWCS users as TWCS it's the only strategy that goes on to implement this interposer consumer, which consists of segregating data according to the window configuration. Fixes #4617. Signed-off-by: Raphael S. Carvalho --- table.cc | 64 ++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/table.cc b/table.cc index 268cb8f4ad..1e2aca6673 100644 --- a/table.cc +++ b/table.cc @@ -42,6 +42,7 @@ #include #include "utils/error_injection.hh" #include "utils/histogram_metrics_helper.hh" +#include "mutation_source_metadata.hh" static logging::logger tlogger("table"); static seastar::metrics::label column_family_label("cf"); @@ -522,9 +523,6 @@ table::seal_active_memtable(flush_permit&& permit) { future table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_permit&& permit) { return with_scheduling_group(_config.memtable_scheduling_group, [this, old = std::move(old), permit = std::move(permit)] () mutable { - auto newtab = make_sstable(); - - tlogger.debug("Flushing to {}", newtab->get_filename()); // Note that due to our sharded architecture, it is possible that // in the face of a value change some shards will backup sstables // while others won't. @@ -536,31 +534,57 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_ // // The code as is guarantees that we'll never partially backup a // single sstable, so that is enough of a guarantee. - database_sstable_write_monitor monitor(make_lw_shared(std::move(permit)), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); - return do_with(std::move(monitor), [this, old, newtab] (auto& monitor) { - auto&& priority = service::get_local_memtable_flush_priority(); - sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer(); - cfg.backup = incremental_backups_enabled(); - auto f = write_memtable_to_sstable(*old, newtab, monitor, cfg, priority); + + return do_with(std::vector(), [this, old, permit = make_lw_shared(std::move(permit))] (auto& newtabs) { + auto metadata = mutation_source_metadata{}; + metadata.min_timestamp = old->get_min_timestamp(); + metadata.max_timestamp = old->get_max_timestamp(); + + auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs] (flat_mutation_reader reader) mutable { + auto&& priority = service::get_local_memtable_flush_priority(); + sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer(); + cfg.backup = incremental_backups_enabled(); + + auto newtab = make_sstable(); + newtabs.push_back(newtab); + tlogger.debug("Flushing to {}", newtab->get_filename()); + + auto monitor = database_sstable_write_monitor(permit, newtab, _compaction_manager, _compaction_strategy, + old->get_max_timestamp()); + + return do_with(std::move(monitor), [newtab, cfg = std::move(cfg), old, reader = std::move(reader), &priority] (auto& monitor) mutable { + // FIXME: certain writers may receive only a small subset of the partitions, so bloom filters will be + // bigger than needed, due to overestimation. That's eventually adjusted through compaction, though. + return write_memtable_to_sstable(std::move(reader), *old, newtab, monitor, cfg, priority); + }); + }); + + auto f = consumer(old->make_flush_reader(old->schema(), service::get_local_memtable_flush_priority())); + // Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush // controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to // priority inversion. - return with_scheduling_group(default_scheduling_group(), [this, old = std::move(old), newtab = std::move(newtab), f = std::move(f)] () mutable { - return f.then([this, newtab, old] { - return newtab->open_data().then([this, old, newtab] () { - tlogger.debug("Flushing to {} done", newtab->get_filename()); - return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, newtab] { - return update_cache(old, { newtab }); + return with_scheduling_group(default_scheduling_group(), [this, old = std::move(old), &newtabs, f = std::move(f)] () mutable { + return f.then([this, &newtabs, old] { + return parallel_for_each(newtabs, [] (auto& newtab) { + return newtab->open_data().then([&newtab] { + tlogger.debug("Flushing to {} done", newtab->get_filename()); }); - }).then([this, old, newtab] () noexcept { + }).then([this, old, &newtabs] () { + return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, &newtabs] { + return update_cache(old, newtabs); + }); + }).then([this, old, &newtabs] () noexcept { _memtables->erase(old); - tlogger.debug("Memtable for {} replaced", newtab->get_filename()); + tlogger.debug("Memtable for {}.{} replaced, into {} sstables", old->schema()->ks_name(), old->schema()->cf_name(), newtabs.size()); return stop_iteration::yes; }); - }).handle_exception([this, old, newtab] (auto e) { - newtab->mark_for_deletion(); + }).handle_exception([this, old, &newtabs] (auto e) { + for (auto& newtab : newtabs) { + newtab->mark_for_deletion(); + tlogger.error("failed to write sstable {}: {}", newtab->get_filename(), e); + } _config.cf_stats->failed_memtables_flushes_count++; - tlogger.error("failed to write sstable {}: {}", newtab->get_filename(), e); // If we failed this write we will try the write again and that will create a new flush reader // that will decrease dirty memory again. So we need to reset the accounting. old->revert_flushed_memory(); From d265bb9bdbcd03251244faab281d2c9b52de7d98 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 4 Jan 2021 16:55:06 -0300 Subject: [PATCH 6/6] test: Add test for TWCS interposer on memtable flush Signed-off-by: Raphael S. Carvalho --- test/boost/sstable_datafile_test.cc | 69 +++++++++++++++++++++++++++++ test/boost/sstable_test.hh | 4 ++ 2 files changed, 73 insertions(+) diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 246a3e0f6f..e0a4f13169 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -6744,3 +6744,72 @@ SEASTAR_TEST_CASE(stcs_reshape_test) { BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, default_priority_class(), reshape_mode::relaxed).sstables.size()); }); } + +SEASTAR_TEST_CASE(test_twcs_interposer_on_memtable_flush) { + return test_env::do_with_async([] (test_env& env) { + storage_service_for_tests ssft; + + auto test_interposer_on_flush = [&] (bool split_during_flush) { + auto builder = schema_builder("tests", "test_twcs_interposer_on_flush") + .with_column("id", utf8_type, column_kind::partition_key) + .with_column("cl", int32_type, column_kind::clustering_key) + .with_column("value", int32_type); + builder.set_compaction_strategy(sstables::compaction_strategy_type::time_window); + std::map opts = { + { time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY, "HOURS" }, + { time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, "1" }, + }; + builder.set_compaction_strategy_options(std::move(opts)); + auto s = builder.build(); + + auto next_timestamp = [] (auto step) { + using namespace std::chrono; + return (gc_clock::now().time_since_epoch() - duration_cast(step)).count(); + }; + auto tokens = token_generation_for_shard(1, this_shard_id(), test_db_config.murmur3_partitioner_ignore_msb_bits(), smp::count); + + auto make_row = [&] (std::chrono::hours step) { + static thread_local int32_t value = 1; + auto key_str = tokens[0].first; + auto key = partition_key::from_exploded(*s, {to_bytes(key_str)}); + + mutation m(s, key); + auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(value++)}); + m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(value)), next_timestamp(step)); + return m; + }; + + auto tmp = tmpdir(); + auto cm = make_lw_shared(); + column_family::config cfg = column_family_test_config(env.manager()); + cfg.datadir = tmp.path().string(); + cfg.enable_disk_writes = true; + cfg.enable_cache = false; + auto tracker = make_lw_shared(); + cell_locker_stats cl_stats; + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), *cm, cl_stats, *tracker); + cf->mark_ready_for_writes(); + cf->start(); + + size_t target_windows_span = (split_during_flush) ? 10 : 1; + constexpr size_t rows_per_window = 10; + + auto mt = make_lw_shared(s); + for (auto i = 1; i <= target_windows_span; i++) { + for (auto j = 0; j < rows_per_window; j++) { + mt->apply(make_row(std::chrono::hours(i))); + } + } + + auto ret = column_family_test(cf).try_flush_memtable_to_sstable(mt).get0(); + BOOST_REQUIRE(ret == stop_iteration::yes); + + auto expected_ssts = (split_during_flush) ? target_windows_span : 1; + testlog.info("split_during_flush={}, actual={}, expected={}", split_during_flush, cf->get_sstables()->size(), expected_ssts); + BOOST_REQUIRE(cf->get_sstables()->size() == expected_ssts); + }; + + test_interposer_on_flush(true); + test_interposer_on_flush(false); + }); +} diff --git a/test/boost/sstable_test.hh b/test/boost/sstable_test.hh index e1065ce015..30790ab29c 100644 --- a/test/boost/sstable_test.hh +++ b/test/boost/sstable_test.hh @@ -66,6 +66,10 @@ public: static int64_t calculate_shard_from_sstable_generation(int64_t generation) { return column_family::calculate_shard_from_sstable_generation(generation); } + + future try_flush_memtable_to_sstable(lw_shared_ptr mt) { + return _cf->try_flush_memtable_to_sstable(mt, sstable_write_permit::unconditional()); + } }; namespace sstables {