diff --git a/database.hh b/database.hh index d15166e3df..ec425788d1 100644 --- a/database.hh +++ b/database.hh @@ -546,7 +546,7 @@ private: lw_shared_ptr new_memtable(); future try_flush_memtable_to_sstable(lw_shared_ptr memt, sstable_write_permit&& permit); // Caller must keep m alive. - future<> update_cache(lw_shared_ptr m, sstables::shared_sstable sst); + future<> update_cache(lw_shared_ptr m, std::vector ssts); struct merge_comparator; // update the sstable generation, making sure that new new sstables don't overwrite this one. diff --git a/memtable-sstable.hh b/memtable-sstable.hh index 9346d80f6d..a522f9bbd8 100644 --- a/memtable-sstable.hh +++ b/memtable-sstable.hh @@ -38,6 +38,13 @@ class sstables_manager; class sstable_writer_config; } +future<> +write_memtable_to_sstable(flat_mutation_reader reader, + memtable& mt, sstables::shared_sstable sst, + sstables::write_monitor& monitor, + sstables::sstable_writer_config& cfg, + const io_priority_class& pc); + future<> write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, diff --git a/memtable.cc b/memtable.cc index 4965e676f4..0accd0e301 100644 --- a/memtable.cc +++ b/memtable.cc @@ -34,12 +34,12 @@ thread_local reader_concurrency_semaphore _flush_semaphore(reader_concurrency_se void memtable::memtable_encoding_stats_collector::update_timestamp(api::timestamp_type ts) { if (ts != api::missing_timestamp) { encoding_stats_collector::update_timestamp(ts); - max_timestamp.update(ts); + min_max_timestamp.update(ts); } } memtable::memtable_encoding_stats_collector::memtable_encoding_stats_collector() - : max_timestamp(0) + : min_max_timestamp(0, 0) {} void memtable::memtable_encoding_stats_collector::update(atomic_cell_view cell) { diff --git a/memtable.hh b/memtable.hh index fe63a867da..3a78c734f7 100644 --- a/memtable.hh +++ b/memtable.hh @@ -138,7 +138,7 @@ private: class memtable_encoding_stats_collector : public encoding_stats_collector { private: - max_tracker max_timestamp; + min_max_tracker min_max_timestamp; void update_timestamp(api::timestamp_type ts); @@ -154,8 +154,12 @@ private: void update(const ::schema& s, const deletable_row& dr); void update(const ::schema& s, const mutation_partition& mp); + api::timestamp_type get_min_timestamp() const { + return min_max_timestamp.min(); + } + api::timestamp_type get_max_timestamp() const { - return max_timestamp.get(); + return min_max_timestamp.max(); } } _stats_collector; @@ -212,6 +216,10 @@ public: return _stats_collector.get(); } + api::timestamp_type get_min_timestamp() const { + return _stats_collector.get_min_timestamp(); + } + api::timestamp_type get_max_timestamp() const { return _stats_collector.get_max_timestamp(); } diff --git a/table.cc b/table.cc index ae252f2684..1e2aca6673 100644 --- a/table.cc +++ b/table.cc @@ -42,6 +42,7 @@ #include #include "utils/error_injection.hh" #include "utils/histogram_metrics_helper.hh" +#include "mutation_source_metadata.hh" static logging::logger tlogger("table"); static seastar::metrics::label column_family_label("cf"); @@ -369,11 +370,18 @@ table::add_sstable_and_update_cache(sstables::shared_sstable sst) { } future<> -table::update_cache(lw_shared_ptr m, sstables::shared_sstable sst) { - auto adder = row_cache::external_updater([this, m, sst] { - auto newtab_ms = sst->as_mutation_source(); - add_sstable(sst); - m->mark_flushed(std::move(newtab_ms)); +table::update_cache(lw_shared_ptr m, std::vector ssts) { + std::vector sources; + sources.reserve(ssts.size()); + for (auto& sst : ssts) { + sources.push_back(sst->as_mutation_source()); + } + auto new_ssts_ms = make_combined_mutation_source(std::move(sources)); + auto adder = row_cache::external_updater([this, m, ssts = std::move(ssts), new_ssts_ms = std::move(new_ssts_ms)] () mutable { + for (auto& sst : ssts) { + add_sstable(sst); + } + m->mark_flushed(std::move(new_ssts_ms)); try_trigger_compaction(); }); if (cache_enabled()) { @@ -386,9 +394,9 @@ table::update_cache(lw_shared_ptr m, sstables::shared_sstable sst) { // Handles permit management only, used for situations where we don't want to inform // the compaction manager about backlogs (i.e., tests) class permit_monitor : public sstables::write_monitor { - sstable_write_permit _permit; + lw_shared_ptr _permit; public: - permit_monitor(sstable_write_permit&& permit) + permit_monitor(lw_shared_ptr permit) : _permit(std::move(permit)) { } @@ -398,7 +406,7 @@ public: // we'll have a period without significant disk activity when the current // SSTable is being sealed, the caches are being updated, etc. To do that, // we ensure the permit doesn't outlive this continuation. - _permit = sstable_write_permit::unconditional(); + *_permit = sstable_write_permit::unconditional(); } }; @@ -411,8 +419,8 @@ class database_sstable_write_monitor : public permit_monitor, public backlog_wri uint64_t _progress_seen = 0; api::timestamp_type _maximum_timestamp; public: - database_sstable_write_monitor(sstable_write_permit&& permit, sstables::shared_sstable sst, compaction_manager& manager, - sstables::compaction_strategy& strategy, api::timestamp_type max_timestamp) + database_sstable_write_monitor(lw_shared_ptr permit, sstables::shared_sstable sst, + compaction_manager& manager, sstables::compaction_strategy& strategy, api::timestamp_type max_timestamp) : permit_monitor(std::move(permit)) , _sst(std::move(sst)) , _compaction_manager(manager) @@ -515,9 +523,6 @@ table::seal_active_memtable(flush_permit&& permit) { future table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_permit&& permit) { return with_scheduling_group(_config.memtable_scheduling_group, [this, old = std::move(old), permit = std::move(permit)] () mutable { - auto newtab = make_sstable(); - - tlogger.debug("Flushing to {}", newtab->get_filename()); // Note that due to our sharded architecture, it is possible that // in the face of a value change some shards will backup sstables // while others won't. @@ -529,31 +534,57 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_ // // The code as is guarantees that we'll never partially backup a // single sstable, so that is enough of a guarantee. - database_sstable_write_monitor monitor(std::move(permit), newtab, _compaction_manager, _compaction_strategy, old->get_max_timestamp()); - return do_with(std::move(monitor), [this, old, newtab] (auto& monitor) { - auto&& priority = service::get_local_memtable_flush_priority(); - sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer(); - cfg.backup = incremental_backups_enabled(); - auto f = write_memtable_to_sstable(*old, newtab, monitor, cfg, priority); + + return do_with(std::vector(), [this, old, permit = make_lw_shared(std::move(permit))] (auto& newtabs) { + auto metadata = mutation_source_metadata{}; + metadata.min_timestamp = old->get_min_timestamp(); + metadata.max_timestamp = old->get_max_timestamp(); + + auto consumer = _compaction_strategy.make_interposer_consumer(metadata, [this, old, permit, &newtabs] (flat_mutation_reader reader) mutable { + auto&& priority = service::get_local_memtable_flush_priority(); + sstables::sstable_writer_config cfg = get_sstables_manager().configure_writer(); + cfg.backup = incremental_backups_enabled(); + + auto newtab = make_sstable(); + newtabs.push_back(newtab); + tlogger.debug("Flushing to {}", newtab->get_filename()); + + auto monitor = database_sstable_write_monitor(permit, newtab, _compaction_manager, _compaction_strategy, + old->get_max_timestamp()); + + return do_with(std::move(monitor), [newtab, cfg = std::move(cfg), old, reader = std::move(reader), &priority] (auto& monitor) mutable { + // FIXME: certain writers may receive only a small subset of the partitions, so bloom filters will be + // bigger than needed, due to overestimation. That's eventually adjusted through compaction, though. + return write_memtable_to_sstable(std::move(reader), *old, newtab, monitor, cfg, priority); + }); + }); + + auto f = consumer(old->make_flush_reader(old->schema(), service::get_local_memtable_flush_priority())); + // Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush // controller. Cache update does not affect the input of the memtable cpu controller, so it can be subject to // priority inversion. - return with_scheduling_group(default_scheduling_group(), [this, old = std::move(old), newtab = std::move(newtab), f = std::move(f)] () mutable { - return f.then([this, newtab, old] { - return newtab->open_data().then([this, old, newtab] () { - tlogger.debug("Flushing to {} done", newtab->get_filename()); - return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, newtab] { - return update_cache(old, newtab); + return with_scheduling_group(default_scheduling_group(), [this, old = std::move(old), &newtabs, f = std::move(f)] () mutable { + return f.then([this, &newtabs, old] { + return parallel_for_each(newtabs, [] (auto& newtab) { + return newtab->open_data().then([&newtab] { + tlogger.debug("Flushing to {} done", newtab->get_filename()); }); - }).then([this, old, newtab] () noexcept { + }).then([this, old, &newtabs] () { + return with_scheduling_group(_config.memtable_to_cache_scheduling_group, [this, old, &newtabs] { + return update_cache(old, newtabs); + }); + }).then([this, old, &newtabs] () noexcept { _memtables->erase(old); - tlogger.debug("Memtable for {} replaced", newtab->get_filename()); + tlogger.debug("Memtable for {}.{} replaced, into {} sstables", old->schema()->ks_name(), old->schema()->cf_name(), newtabs.size()); return stop_iteration::yes; }); - }).handle_exception([this, old, newtab] (auto e) { - newtab->mark_for_deletion(); + }).handle_exception([this, old, &newtabs] (auto e) { + for (auto& newtab : newtabs) { + newtab->mark_for_deletion(); + tlogger.error("failed to write sstable {}: {}", newtab->get_filename(), e); + } _config.cf_stats->failed_memtables_flushes_count++; - tlogger.error("failed to write sstable {}: {}", newtab->get_filename(), e); // If we failed this write we will try the write again and that will create a new flush reader // that will decrease dirty memory again. So we need to reset the accounting. old->revert_flushed_memory(); @@ -1631,19 +1662,27 @@ table::apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle } future<> -write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, +write_memtable_to_sstable(flat_mutation_reader reader, + memtable& mt, sstables::shared_sstable sst, sstables::write_monitor& monitor, sstables::sstable_writer_config& cfg, const io_priority_class& pc) { cfg.replay_position = mt.replay_position(); cfg.monitor = &monitor; - return sst->write_components(mt.make_flush_reader(mt.schema(), pc), mt.partition_count(), - mt.schema(), cfg, mt.get_encoding_stats(), pc); + return sst->write_components(std::move(reader), mt.partition_count(), mt.schema(), cfg, mt.get_encoding_stats(), pc); +} + +future<> +write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, + sstables::write_monitor& monitor, + sstables::sstable_writer_config& cfg, + const io_priority_class& pc) { + return write_memtable_to_sstable(mt.make_flush_reader(mt.schema(), pc), mt, std::move(sst), monitor, cfg, pc); } future<> write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, sstables::sstable_writer_config cfg) { - return do_with(permit_monitor(sstable_write_permit::unconditional()), cfg, [&mt, sst] (auto& monitor, auto& cfg) { + return do_with(permit_monitor(make_lw_shared(sstable_write_permit::unconditional())), cfg, [&mt, sst] (auto& monitor, auto& cfg) { return write_memtable_to_sstable(mt, std::move(sst), monitor, cfg); }); } diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 5abd8ceaea..46f56d9042 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -6745,3 +6745,72 @@ SEASTAR_TEST_CASE(stcs_reshape_test) { BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, default_priority_class(), reshape_mode::relaxed).sstables.size()); }); } + +SEASTAR_TEST_CASE(test_twcs_interposer_on_memtable_flush) { + return test_env::do_with_async([] (test_env& env) { + storage_service_for_tests ssft; + + auto test_interposer_on_flush = [&] (bool split_during_flush) { + auto builder = schema_builder("tests", "test_twcs_interposer_on_flush") + .with_column("id", utf8_type, column_kind::partition_key) + .with_column("cl", int32_type, column_kind::clustering_key) + .with_column("value", int32_type); + builder.set_compaction_strategy(sstables::compaction_strategy_type::time_window); + std::map opts = { + { time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY, "HOURS" }, + { time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, "1" }, + }; + builder.set_compaction_strategy_options(std::move(opts)); + auto s = builder.build(); + + auto next_timestamp = [] (auto step) { + using namespace std::chrono; + return (gc_clock::now().time_since_epoch() - duration_cast(step)).count(); + }; + auto tokens = token_generation_for_shard(1, this_shard_id(), test_db_config.murmur3_partitioner_ignore_msb_bits(), smp::count); + + auto make_row = [&] (std::chrono::hours step) { + static thread_local int32_t value = 1; + auto key_str = tokens[0].first; + auto key = partition_key::from_exploded(*s, {to_bytes(key_str)}); + + mutation m(s, key); + auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(value++)}); + m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(value)), next_timestamp(step)); + return m; + }; + + auto tmp = tmpdir(); + auto cm = make_lw_shared(); + column_family::config cfg = column_family_test_config(env.manager()); + cfg.datadir = tmp.path().string(); + cfg.enable_disk_writes = true; + cfg.enable_cache = false; + auto tracker = make_lw_shared(); + cell_locker_stats cl_stats; + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), *cm, cl_stats, *tracker); + cf->mark_ready_for_writes(); + cf->start(); + + size_t target_windows_span = (split_during_flush) ? 10 : 1; + constexpr size_t rows_per_window = 10; + + auto mt = make_lw_shared(s); + for (auto i = 1; i <= target_windows_span; i++) { + for (auto j = 0; j < rows_per_window; j++) { + mt->apply(make_row(std::chrono::hours(i))); + } + } + + auto ret = column_family_test(cf).try_flush_memtable_to_sstable(mt).get0(); + BOOST_REQUIRE(ret == stop_iteration::yes); + + auto expected_ssts = (split_during_flush) ? target_windows_span : 1; + testlog.info("split_during_flush={}, actual={}, expected={}", split_during_flush, cf->get_sstables()->size(), expected_ssts); + BOOST_REQUIRE(cf->get_sstables()->size() == expected_ssts); + }; + + test_interposer_on_flush(true); + test_interposer_on_flush(false); + }); +} diff --git a/test/boost/sstable_test.hh b/test/boost/sstable_test.hh index e1065ce015..30790ab29c 100644 --- a/test/boost/sstable_test.hh +++ b/test/boost/sstable_test.hh @@ -66,6 +66,10 @@ public: static int64_t calculate_shard_from_sstable_generation(int64_t generation) { return column_family::calculate_shard_from_sstable_generation(generation); } + + future try_flush_memtable_to_sstable(lw_shared_ptr mt) { + return _cf->try_flush_memtable_to_sstable(mt, sstable_write_permit::unconditional()); + } }; namespace sstables {