From 419fe652598f01fd0cfc19d642313bbb5380f630 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 6 Jul 2022 12:19:02 +0300 Subject: [PATCH] =?UTF-8?q?Revert=20"Merge=20'Block=20flush=20until=20comp?= =?UTF-8?q?action=20finishes=20if=20sstables=20accumulate'=20from=20Miko?= =?UTF-8?q?=C5=82aj=20Sielu=C5=BCycki"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit aa8f135f64402c42def75912fe23dae759c80640, reversing changes made to 9a88bc260ce4d539ecb48efa92f1717d2f75a112. The patch causes hangs during flush. Also reverts parts of 411231da75b that impacted the unit test. Fixes #10897. --- replica/database.hh | 2 - replica/table.cc | 31 ------------- test/boost/memtable_test.cc | 79 -------------------------------- test/lib/mutation_source_test.cc | 18 ++++---- test/lib/mutation_source_test.hh | 2 +- 5 files changed, 10 insertions(+), 122 deletions(-) diff --git a/replica/database.hh b/replica/database.hh index 2dd30e3227..64ddde1756 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -503,7 +503,6 @@ private: class table_state; std::unique_ptr _table_state; - condition_variable _sstables_changed; public: data_dictionary::table as_data_dictionary() const; @@ -568,7 +567,6 @@ private: void backlog_tracker_adjust_charges(const std::vector& old_sstables, const std::vector& new_sstables); lw_shared_ptr new_memtable(); future try_flush_memtable_to_sstable(lw_shared_ptr memt, sstable_write_permit&& permit); - future<> maybe_wait_for_sstable_count_reduction(); // Caller must keep m alive. future<> update_cache(lw_shared_ptr m, std::vector ssts); struct merge_comparator; diff --git a/replica/table.cc b/replica/table.cc index ba32386454..4bd2fa2e10 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -102,7 +102,6 @@ lw_shared_ptr table::make_maintenance_sstable_set() const void table::refresh_compound_sstable_set() { _sstables = make_compound_sstable_set(); - _sstables_changed.signal(); } // Exposed for testing, not performance critical. @@ -698,7 +697,6 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_ co_return stop_iteration(_async_gate.is_closed()); } - co_await maybe_wait_for_sstable_count_reduction(); auto f = consumer(std::move(reader)); // Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush @@ -735,35 +733,6 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_ co_return co_await with_scheduling_group(_config.memtable_scheduling_group, std::ref(try_flush)); } -future<> table::maybe_wait_for_sstable_count_reduction() { - if (_async_gate.is_closed() || is_auto_compaction_disabled_by_user()) { - co_return; - } - auto sstable_count_below_threshold = [this] { - const auto sstable_runs_with_memtable_origin = boost::copy_range>( - *_main_sstables->all() - | boost::adaptors::filtered([] (const sstables::shared_sstable& sst) { - return sst->get_origin() == "memtable"; - }) - | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::run_identifier))); - const auto threshold = std::max(schema()->max_compaction_threshold(), 32); - return sstable_runs_with_memtable_origin.size() <= threshold; - }; - if (sstable_count_below_threshold()) { - co_return; - } - // Reduce the chances of falling into an endless wait, if compaction - // wasn't scheduled for the table due to a problem. - trigger_compaction(); - using namespace std::chrono_literals; - auto start = db_clock::now(); - co_await _sstables_changed.wait(sstable_count_below_threshold); - auto end = db_clock::now(); - auto elapsed_ms = (end - start) / 1ms; - tlogger.warn("Memtable flush of {}.{} was blocked for {}ms waiting for compaction to catch up on newly created files", - schema()->ks_name(), schema()->cf_name(), elapsed_ms); -} - void table::start() { // FIXME: add option to disable automatic compaction. diff --git a/test/boost/memtable_test.cc b/test/boost/memtable_test.cc index 985cafbbaa..e41977817f 100644 --- a/test/boost/memtable_test.cc +++ b/test/boost/memtable_test.cc @@ -32,7 +32,6 @@ #include "test/lib/reader_concurrency_semaphore.hh" #include "test/lib/simple_schema.hh" #include "utils/error_injection.hh" -#include "db/commitlog/commitlog.hh" #include "test/lib/make_random_string.hh" static api::timestamp_type next_timestamp() { @@ -1005,81 +1004,3 @@ SEASTAR_TEST_CASE(failed_flush_prevents_writes) { }); #endif } - -SEASTAR_TEST_CASE(flushing_rate_is_reduced_if_compaction_doesnt_keep_up) { - BOOST_ASSERT(smp::count == 2); - // The test simulates a situation where 2 threads issue flushes to 2 - // tables. Both issue small flushes, but one has injected reactor stalls. - // This can lead to a situation where lots of small sstables accumulate on - // disk, and, if compaction never has a chance to keep up, resources can be - // exhausted. - return do_with_cql_env([](cql_test_env& env) -> future<> { - struct flusher { - cql_test_env& env; - const int num_flushes; - const int sleep_ms; - - static sstring cf_name(unsigned thread_id) { - return format("cf_{}", thread_id); - } - - static sstring ks_name() { - return "ks"; - } - - future<> create_table(schema_ptr s) { - return env.migration_manager().invoke_on(0, [s = global_schema_ptr(std::move(s))] (service::migration_manager& mm) -> future<> { - auto group0_guard = co_await mm.start_group0_operation(); - auto ts = group0_guard.write_timestamp(); - auto announcement = co_await mm.prepare_new_column_family_announcement(s, ts); - co_await mm.announce(std::move(announcement), std::move(group0_guard)); - }); - } - - future<> drop_table() { - return env.migration_manager().invoke_on(0, [shard = this_shard_id()] (service::migration_manager& mm) -> future<> { - auto group0_guard = co_await mm.start_group0_operation(); - auto ts = group0_guard.write_timestamp(); - auto announcement = co_await mm.prepare_column_family_drop_announcement(ks_name(), cf_name(shard), ts); - co_await mm.announce(std::move(announcement), std::move(group0_guard)); - }); - } - - future<> operator()() { - const sstring ks_name = this->ks_name(); - const sstring cf_name = this->cf_name(this_shard_id()); - random_mutation_generator gen{ - random_mutation_generator::generate_counters::no, - local_shard_only::yes, - random_mutation_generator::generate_uncompactable::no, - std::nullopt, - ks_name.c_str(), - cf_name.c_str() - }; - schema_ptr s = gen.schema(); - - co_await create_table(s); - replica::database& db = env.local_db(); - replica::table& t = db.find_column_family(ks_name, cf_name); - - for (int value : boost::irange(0, num_flushes)) { - ::usleep(sleep_ms * 1000); - co_await db.apply(t.schema(), freeze(gen()), tracing::trace_state_ptr(), db::commitlog::force_sync::yes, db::no_timeout); - co_await t.flush(); - BOOST_ASSERT(t.sstables_count() < t.schema()->max_compaction_threshold() * 2); - } - co_await drop_table(); - } - }; - - int sleep_ms = 2; - for (int i : boost::irange(8)) { - future<> f0 = smp::submit_to(0, flusher{.env=env, .num_flushes=100, .sleep_ms=0}); - future<> f1 = smp::submit_to(1, flusher{.env=env, .num_flushes=3, .sleep_ms=sleep_ms}); - co_await std::move(f0); - co_await std::move(f1); - sleep_ms *= 2; - } - }); -} - diff --git a/test/lib/mutation_source_test.cc b/test/lib/mutation_source_test.cc index accc149848..a06ce7888e 100644 --- a/test/lib/mutation_source_test.cc +++ b/test/lib/mutation_source_test.cc @@ -1963,8 +1963,8 @@ private: return gc_clock::time_point() + std::chrono::seconds(dist(gen)); } - schema_ptr do_make_schema(data_type type, const char* ks_name, const char* cf_name) { - auto builder = schema_builder(ks_name, cf_name) + schema_ptr do_make_schema(data_type type) { + auto builder = schema_builder("ks", "cf") .with_column("pk", bytes_type, column_kind::partition_key) .with_column("ck1", bytes_type, column_kind::clustering_key) .with_column("ck2", bytes_type, column_kind::clustering_key); @@ -1981,9 +1981,9 @@ private: return builder.build(); } - schema_ptr make_schema(const char* ks_name, const char* cf_name) { - return _generate_counters ? do_make_schema(counter_type, ks_name, cf_name) - : do_make_schema(bytes_type, ks_name, cf_name); + schema_ptr make_schema() { + return _generate_counters ? do_make_schema(counter_type) + : do_make_schema(bytes_type); } api::timestamp_type gen_timestamp(timestamp_level l) { @@ -2001,13 +2001,13 @@ private: } public: explicit impl(generate_counters counters, local_shard_only lso = local_shard_only::yes, - generate_uncompactable uc = generate_uncompactable::no, std::optional seed_opt = std::nullopt, const char* ks_name="ks", const char* cf_name="cf") : _generate_counters(counters), _local_shard_only(lso), _uncompactable(uc) { + generate_uncompactable uc = generate_uncompactable::no, std::optional seed_opt = std::nullopt) : _generate_counters(counters), _local_shard_only(lso), _uncompactable(uc) { // In case of errors, reproduce using the --random-seed command line option with the test_runner seed. auto seed = seed_opt.value_or(tests::random::get_int()); std::cout << "random_mutation_generator seed: " << seed << "\n"; _gen = std::mt19937(seed); - _schema = make_schema(ks_name, cf_name); + _schema = make_schema(); auto keys = _local_shard_only ? make_local_keys(n_blobs, _schema, _external_blob_size) : make_keys(n_blobs, _schema, _external_blob_size); _blobs = boost::copy_range>(keys | boost::adaptors::transformed([this] (sstring& k) { return to_bytes(k); })); @@ -2300,8 +2300,8 @@ public: random_mutation_generator::~random_mutation_generator() {} -random_mutation_generator::random_mutation_generator(generate_counters counters, local_shard_only lso, generate_uncompactable uc, std::optional seed_opt, const char* ks_name, const char* cf_name) - : _impl(std::make_unique(counters, lso, uc, seed_opt, ks_name, cf_name)) +random_mutation_generator::random_mutation_generator(generate_counters counters, local_shard_only lso, generate_uncompactable uc, std::optional seed_opt) + : _impl(std::make_unique(counters, lso, uc, seed_opt)) { } mutation random_mutation_generator::operator()() { diff --git a/test/lib/mutation_source_test.hh b/test/lib/mutation_source_test.hh index 7aa00cdae9..c397333aa0 100644 --- a/test/lib/mutation_source_test.hh +++ b/test/lib/mutation_source_test.hh @@ -52,7 +52,7 @@ public: // tombstone will cover data, i.e. compacting the mutation will not result // in any changes. explicit random_mutation_generator(generate_counters, local_shard_only lso = local_shard_only::yes, - generate_uncompactable uc = generate_uncompactable::no, std::optional seed_opt = std::nullopt, const char* ks_name="ks", const char* cf_name="cf"); + generate_uncompactable uc = generate_uncompactable::no, std::optional seed_opt = std::nullopt); random_mutation_generator(generate_counters gc, uint32_t seed) : random_mutation_generator(gc, local_shard_only::yes, generate_uncompactable::no, seed) {} ~random_mutation_generator();