Revert "Merge 'Block flush until compaction finishes if sstables accumulate' from Mikołaj Sielużycki"
This reverts commit aa8f135f64, reversing changes made to 9a88bc260c. The patch causes hangs during flush. Also reverts parts of 411231da75 that impacted the unit test. Fixes #10897.
This commit is contained in:
@@ -503,7 +503,6 @@ private:
|
||||
|
||||
class table_state;
|
||||
std::unique_ptr<table_state> _table_state;
|
||||
condition_variable _sstables_changed;
|
||||
public:
|
||||
data_dictionary::table as_data_dictionary() const;
|
||||
|
||||
@@ -568,7 +567,6 @@ private:
|
||||
void backlog_tracker_adjust_charges(const std::vector<sstables::shared_sstable>& old_sstables, const std::vector<sstables::shared_sstable>& new_sstables);
|
||||
lw_shared_ptr<memtable> new_memtable();
|
||||
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt, sstable_write_permit&& permit);
|
||||
future<> maybe_wait_for_sstable_count_reduction();
|
||||
// Caller must keep m alive.
|
||||
future<> update_cache(lw_shared_ptr<memtable> m, std::vector<sstables::shared_sstable> ssts);
|
||||
struct merge_comparator;
|
||||
|
||||
@@ -102,7 +102,6 @@ lw_shared_ptr<sstables::sstable_set> table::make_maintenance_sstable_set() const
|
||||
|
||||
void table::refresh_compound_sstable_set() {
|
||||
_sstables = make_compound_sstable_set();
|
||||
_sstables_changed.signal();
|
||||
}
|
||||
|
||||
// Exposed for testing, not performance critical.
|
||||
@@ -698,7 +697,6 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_
|
||||
co_return stop_iteration(_async_gate.is_closed());
|
||||
}
|
||||
|
||||
co_await maybe_wait_for_sstable_count_reduction();
|
||||
auto f = consumer(std::move(reader));
|
||||
|
||||
// Switch back to default scheduling group for post-flush actions, to avoid them being staved by the memtable flush
|
||||
@@ -735,35 +733,6 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_
|
||||
co_return co_await with_scheduling_group(_config.memtable_scheduling_group, std::ref(try_flush));
|
||||
}
|
||||
|
||||
future<> table::maybe_wait_for_sstable_count_reduction() {
|
||||
if (_async_gate.is_closed() || is_auto_compaction_disabled_by_user()) {
|
||||
co_return;
|
||||
}
|
||||
auto sstable_count_below_threshold = [this] {
|
||||
const auto sstable_runs_with_memtable_origin = boost::copy_range<std::unordered_set<utils::UUID>>(
|
||||
*_main_sstables->all()
|
||||
| boost::adaptors::filtered([] (const sstables::shared_sstable& sst) {
|
||||
return sst->get_origin() == "memtable";
|
||||
})
|
||||
| boost::adaptors::transformed(std::mem_fn(&sstables::sstable::run_identifier)));
|
||||
const auto threshold = std::max(schema()->max_compaction_threshold(), 32);
|
||||
return sstable_runs_with_memtable_origin.size() <= threshold;
|
||||
};
|
||||
if (sstable_count_below_threshold()) {
|
||||
co_return;
|
||||
}
|
||||
// Reduce the chances of falling into an endless wait, if compaction
|
||||
// wasn't scheduled for the table due to a problem.
|
||||
trigger_compaction();
|
||||
using namespace std::chrono_literals;
|
||||
auto start = db_clock::now();
|
||||
co_await _sstables_changed.wait(sstable_count_below_threshold);
|
||||
auto end = db_clock::now();
|
||||
auto elapsed_ms = (end - start) / 1ms;
|
||||
tlogger.warn("Memtable flush of {}.{} was blocked for {}ms waiting for compaction to catch up on newly created files",
|
||||
schema()->ks_name(), schema()->cf_name(), elapsed_ms);
|
||||
}
|
||||
|
||||
void
|
||||
table::start() {
|
||||
// FIXME: add option to disable automatic compaction.
|
||||
|
||||
@@ -32,7 +32,6 @@
|
||||
#include "test/lib/reader_concurrency_semaphore.hh"
|
||||
#include "test/lib/simple_schema.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
#include "db/commitlog/commitlog.hh"
|
||||
#include "test/lib/make_random_string.hh"
|
||||
|
||||
static api::timestamp_type next_timestamp() {
|
||||
@@ -1005,81 +1004,3 @@ SEASTAR_TEST_CASE(failed_flush_prevents_writes) {
|
||||
});
|
||||
#endif
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(flushing_rate_is_reduced_if_compaction_doesnt_keep_up) {
|
||||
BOOST_ASSERT(smp::count == 2);
|
||||
// The test simulates a situation where 2 threads issue flushes to 2
|
||||
// tables. Both issue small flushes, but one has injected reactor stalls.
|
||||
// This can lead to a situation where lots of small sstables accumulate on
|
||||
// disk, and, if compaction never has a chance to keep up, resources can be
|
||||
// exhausted.
|
||||
return do_with_cql_env([](cql_test_env& env) -> future<> {
|
||||
struct flusher {
|
||||
cql_test_env& env;
|
||||
const int num_flushes;
|
||||
const int sleep_ms;
|
||||
|
||||
static sstring cf_name(unsigned thread_id) {
|
||||
return format("cf_{}", thread_id);
|
||||
}
|
||||
|
||||
static sstring ks_name() {
|
||||
return "ks";
|
||||
}
|
||||
|
||||
future<> create_table(schema_ptr s) {
|
||||
return env.migration_manager().invoke_on(0, [s = global_schema_ptr(std::move(s))] (service::migration_manager& mm) -> future<> {
|
||||
auto group0_guard = co_await mm.start_group0_operation();
|
||||
auto ts = group0_guard.write_timestamp();
|
||||
auto announcement = co_await mm.prepare_new_column_family_announcement(s, ts);
|
||||
co_await mm.announce(std::move(announcement), std::move(group0_guard));
|
||||
});
|
||||
}
|
||||
|
||||
future<> drop_table() {
|
||||
return env.migration_manager().invoke_on(0, [shard = this_shard_id()] (service::migration_manager& mm) -> future<> {
|
||||
auto group0_guard = co_await mm.start_group0_operation();
|
||||
auto ts = group0_guard.write_timestamp();
|
||||
auto announcement = co_await mm.prepare_column_family_drop_announcement(ks_name(), cf_name(shard), ts);
|
||||
co_await mm.announce(std::move(announcement), std::move(group0_guard));
|
||||
});
|
||||
}
|
||||
|
||||
future<> operator()() {
|
||||
const sstring ks_name = this->ks_name();
|
||||
const sstring cf_name = this->cf_name(this_shard_id());
|
||||
random_mutation_generator gen{
|
||||
random_mutation_generator::generate_counters::no,
|
||||
local_shard_only::yes,
|
||||
random_mutation_generator::generate_uncompactable::no,
|
||||
std::nullopt,
|
||||
ks_name.c_str(),
|
||||
cf_name.c_str()
|
||||
};
|
||||
schema_ptr s = gen.schema();
|
||||
|
||||
co_await create_table(s);
|
||||
replica::database& db = env.local_db();
|
||||
replica::table& t = db.find_column_family(ks_name, cf_name);
|
||||
|
||||
for (int value : boost::irange<int>(0, num_flushes)) {
|
||||
::usleep(sleep_ms * 1000);
|
||||
co_await db.apply(t.schema(), freeze(gen()), tracing::trace_state_ptr(), db::commitlog::force_sync::yes, db::no_timeout);
|
||||
co_await t.flush();
|
||||
BOOST_ASSERT(t.sstables_count() < t.schema()->max_compaction_threshold() * 2);
|
||||
}
|
||||
co_await drop_table();
|
||||
}
|
||||
};
|
||||
|
||||
int sleep_ms = 2;
|
||||
for (int i : boost::irange<int>(8)) {
|
||||
future<> f0 = smp::submit_to(0, flusher{.env=env, .num_flushes=100, .sleep_ms=0});
|
||||
future<> f1 = smp::submit_to(1, flusher{.env=env, .num_flushes=3, .sleep_ms=sleep_ms});
|
||||
co_await std::move(f0);
|
||||
co_await std::move(f1);
|
||||
sleep_ms *= 2;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -1963,8 +1963,8 @@ private:
|
||||
return gc_clock::time_point() + std::chrono::seconds(dist(gen));
|
||||
}
|
||||
|
||||
schema_ptr do_make_schema(data_type type, const char* ks_name, const char* cf_name) {
|
||||
auto builder = schema_builder(ks_name, cf_name)
|
||||
schema_ptr do_make_schema(data_type type) {
|
||||
auto builder = schema_builder("ks", "cf")
|
||||
.with_column("pk", bytes_type, column_kind::partition_key)
|
||||
.with_column("ck1", bytes_type, column_kind::clustering_key)
|
||||
.with_column("ck2", bytes_type, column_kind::clustering_key);
|
||||
@@ -1981,9 +1981,9 @@ private:
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
schema_ptr make_schema(const char* ks_name, const char* cf_name) {
|
||||
return _generate_counters ? do_make_schema(counter_type, ks_name, cf_name)
|
||||
: do_make_schema(bytes_type, ks_name, cf_name);
|
||||
schema_ptr make_schema() {
|
||||
return _generate_counters ? do_make_schema(counter_type)
|
||||
: do_make_schema(bytes_type);
|
||||
}
|
||||
|
||||
api::timestamp_type gen_timestamp(timestamp_level l) {
|
||||
@@ -2001,13 +2001,13 @@ private:
|
||||
}
|
||||
public:
|
||||
explicit impl(generate_counters counters, local_shard_only lso = local_shard_only::yes,
|
||||
generate_uncompactable uc = generate_uncompactable::no, std::optional<uint32_t> seed_opt = std::nullopt, const char* ks_name="ks", const char* cf_name="cf") : _generate_counters(counters), _local_shard_only(lso), _uncompactable(uc) {
|
||||
generate_uncompactable uc = generate_uncompactable::no, std::optional<uint32_t> seed_opt = std::nullopt) : _generate_counters(counters), _local_shard_only(lso), _uncompactable(uc) {
|
||||
// In case of errors, reproduce using the --random-seed command line option with the test_runner seed.
|
||||
auto seed = seed_opt.value_or(tests::random::get_int<uint32_t>());
|
||||
std::cout << "random_mutation_generator seed: " << seed << "\n";
|
||||
_gen = std::mt19937(seed);
|
||||
|
||||
_schema = make_schema(ks_name, cf_name);
|
||||
_schema = make_schema();
|
||||
|
||||
auto keys = _local_shard_only ? make_local_keys(n_blobs, _schema, _external_blob_size) : make_keys(n_blobs, _schema, _external_blob_size);
|
||||
_blobs = boost::copy_range<std::vector<bytes>>(keys | boost::adaptors::transformed([this] (sstring& k) { return to_bytes(k); }));
|
||||
@@ -2300,8 +2300,8 @@ public:
|
||||
|
||||
random_mutation_generator::~random_mutation_generator() {}
|
||||
|
||||
random_mutation_generator::random_mutation_generator(generate_counters counters, local_shard_only lso, generate_uncompactable uc, std::optional<uint32_t> seed_opt, const char* ks_name, const char* cf_name)
|
||||
: _impl(std::make_unique<random_mutation_generator::impl>(counters, lso, uc, seed_opt, ks_name, cf_name))
|
||||
random_mutation_generator::random_mutation_generator(generate_counters counters, local_shard_only lso, generate_uncompactable uc, std::optional<uint32_t> seed_opt)
|
||||
: _impl(std::make_unique<random_mutation_generator::impl>(counters, lso, uc, seed_opt))
|
||||
{ }
|
||||
|
||||
mutation random_mutation_generator::operator()() {
|
||||
|
||||
@@ -52,7 +52,7 @@ public:
|
||||
// tombstone will cover data, i.e. compacting the mutation will not result
|
||||
// in any changes.
|
||||
explicit random_mutation_generator(generate_counters, local_shard_only lso = local_shard_only::yes,
|
||||
generate_uncompactable uc = generate_uncompactable::no, std::optional<uint32_t> seed_opt = std::nullopt, const char* ks_name="ks", const char* cf_name="cf");
|
||||
generate_uncompactable uc = generate_uncompactable::no, std::optional<uint32_t> seed_opt = std::nullopt);
|
||||
random_mutation_generator(generate_counters gc, uint32_t seed)
|
||||
: random_mutation_generator(gc, local_shard_only::yes, generate_uncompactable::no, seed) {}
|
||||
~random_mutation_generator();
|
||||
|
||||
Reference in New Issue
Block a user