mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Merge '[branch 4.4] Backport reader_permit: always forward resources to the semaphore ' from Botond Dénes
This is a backport of 8aaa3a7 to branch-4.4. The main conflicts were around Benny's reader close series (fa43d76), but it also turned out that an additional patch (2f1d65c) also has to be backported to make sure admission on signaling resources doesn't deadlock. Refs: #8493 Closes #8571 * github.com:scylladb/scylla: test: mutation_reader_test: add test_reader_concurrency_semaphore_forward_progress test: mutation_reader_test: add test_reader_concurrency_semaphore_readmission_preserves_units reader_concurrency_semaphore: add dump_diagnostics() reader_permit: always forward resources test: multishard_mutation_query_test: fuzzy-test: don't consume resource up-front reader_concurrency_semaphore: make admission conditions consistent
This commit is contained in:
@@ -76,7 +76,7 @@ class reader_permit::impl : public boost::intrusive::list_base_hook<boost::intru
|
||||
sstring _op_name;
|
||||
std::string_view _op_name_view;
|
||||
reader_resources _resources;
|
||||
reader_permit::state _state = reader_permit::state::registered;
|
||||
reader_permit::state _state = reader_permit::state::active;
|
||||
|
||||
public:
|
||||
struct value_tag {};
|
||||
@@ -124,22 +124,17 @@ public:
|
||||
}
|
||||
|
||||
// Marks the permit as active once it has passed admission.
//
// Only the state changes here. consume() and signal() forward every
// resource change to the semaphore at the time it happens, so whatever
// the permit holds is already accounted for; consuming _resources once
// more at this point would count it twice.
void on_admission() {
    _state = reader_permit::state::active;
}
|
||||
|
||||
// Adds `res` to the resources tracked by this permit and forwards the
// same amount to the semaphore.
//
// The forwarding is unconditional and happens exactly once per call,
// independently of the state the permit is in.
void consume(reader_resources res) {
    _resources += res;
    _semaphore.consume(res);
}
|
||||
|
||||
// Removes `res` from the resources tracked by this permit and returns
// the same amount to the semaphore.
//
// Mirrors consume(): the forwarding is unconditional and happens exactly
// once per call, independently of the state the permit is in.
void signal(reader_resources res) {
    _resources -= res;
    _semaphore.signal(res);
}
|
||||
|
||||
reader_resources resources() const {
|
||||
@@ -206,14 +201,11 @@ reader_resources reader_permit::consumed_resources() const {
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, reader_permit::state s) {
|
||||
switch (s) {
|
||||
case reader_permit::state::registered:
|
||||
os << "registered";
|
||||
break;
|
||||
case reader_permit::state::waiting:
|
||||
os << "waiting";
|
||||
break;
|
||||
case reader_permit::state::admitted:
|
||||
os << "admitted";
|
||||
case reader_permit::state::active:
|
||||
os << "active";
|
||||
break;
|
||||
}
|
||||
return os;
|
||||
@@ -250,7 +242,7 @@ struct permit_group_key_hash {
|
||||
|
||||
using permit_groups = std::unordered_map<permit_group_key, permit_stats, permit_group_key_hash>;
|
||||
|
||||
static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const permit_groups& permits, reader_permit::state state, bool sort_by_memory) {
|
||||
static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const permit_groups& permits, reader_permit::state state) {
|
||||
struct permit_summary {
|
||||
const schema* s;
|
||||
std::string_view op_name;
|
||||
@@ -266,25 +258,17 @@ static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const pe
|
||||
}
|
||||
}
|
||||
|
||||
std::ranges::sort(permit_summaries, [sort_by_memory] (const permit_summary& a, const permit_summary& b) {
|
||||
if (sort_by_memory) {
|
||||
return a.memory < b.memory;
|
||||
} else {
|
||||
return a.count < b.count;
|
||||
}
|
||||
std::ranges::sort(permit_summaries, [] (const permit_summary& a, const permit_summary& b) {
|
||||
return a.memory < b.memory;
|
||||
});
|
||||
|
||||
permit_stats total;
|
||||
|
||||
auto print_line = [&os, sort_by_memory] (auto col1, auto col2, auto col3) {
|
||||
if (sort_by_memory) {
|
||||
fmt::print(os, "{}\t{}\t{}\n", col2, col1, col3);
|
||||
} else {
|
||||
fmt::print(os, "{}\t{}\t{}\n", col1, col2, col3);
|
||||
}
|
||||
auto print_line = [&os] (auto col1, auto col2, auto col3) {
|
||||
fmt::print(os, "{}\t{}\t{}\n", col2, col1, col3);
|
||||
};
|
||||
|
||||
fmt::print(os, "Permits with state {}, sorted by {}\n", state, sort_by_memory ? "memory" : "count");
|
||||
fmt::print(os, "Permits with state {}\n", state);
|
||||
print_line("count", "memory", "name");
|
||||
for (const auto& summary : permit_summaries) {
|
||||
total.count += summary.count;
|
||||
@@ -310,11 +294,9 @@ static void do_dump_reader_permit_diagnostics(std::ostream& os, const reader_con
|
||||
permit_stats total;
|
||||
|
||||
fmt::print(os, "Semaphore {}: {}, dumping permit diagnostics:\n", semaphore.name(), problem);
|
||||
total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::admitted, true);
|
||||
total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::active);
|
||||
fmt::print(os, "\n");
|
||||
total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::waiting, false);
|
||||
fmt::print(os, "\n");
|
||||
total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::registered, false);
|
||||
total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::waiting);
|
||||
fmt::print(os, "\n");
|
||||
fmt::print(os, "Total: permits: {}, memory: {}\n", total.count, utils::to_hr_size(total.memory));
|
||||
}
|
||||
@@ -375,7 +357,7 @@ reader_concurrency_semaphore::~reader_concurrency_semaphore() {
|
||||
reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore::register_inactive_read(std::unique_ptr<inactive_read> ir) {
|
||||
// Implies _inactive_reads.empty(), we don't queue new readers before
|
||||
// evicting all inactive reads.
|
||||
if (_wait_list.empty()) {
|
||||
if (_wait_list.empty() && _resources.memory > 0) {
|
||||
const auto [it, _] = _inactive_reads.emplace(_next_id++, std::move(ir));
|
||||
(void)_;
|
||||
++_stats.inactive_reads;
|
||||
@@ -425,13 +407,13 @@ bool reader_concurrency_semaphore::try_evict_one_inactive_read() {
|
||||
}
|
||||
|
||||
bool reader_concurrency_semaphore::has_available_units(const resources& r) const {
|
||||
return bool(_resources) && _resources >= r;
|
||||
// Special case: when there is no active reader (based on count) admit one
|
||||
// regardless of availability of memory.
|
||||
return (bool(_resources) && _resources >= r) || _resources.count == _initial_resources.count;
|
||||
}
|
||||
|
||||
bool reader_concurrency_semaphore::may_proceed(const resources& r) const {
|
||||
// Special case: when there is no active reader (based on count) admit one
|
||||
// regardless of availability of memory.
|
||||
return _wait_list.empty() && (has_available_units(r) || _resources.count == _initial_resources.count);
|
||||
return _wait_list.empty() && has_available_units(r);
|
||||
}
|
||||
|
||||
future<reader_permit::resource_units> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, size_t memory,
|
||||
@@ -482,6 +464,12 @@ void reader_concurrency_semaphore::broken(std::exception_ptr ex) {
|
||||
}
|
||||
}
|
||||
|
||||
// Renders the permit diagnostics of this semaphore into a string.
// "user request" is passed along as the description of why the dump
// was produced.
std::string reader_concurrency_semaphore::dump_diagnostics() const {
    std::ostringstream report;
    do_dump_reader_permit_diagnostics(report, *this, *_permit_list, "user request");
    return report.str();
}
|
||||
|
||||
// A file that tracks the memory usage of buffers resulting from read
|
||||
// operations.
|
||||
class tracking_file_impl : public file_impl {
|
||||
|
||||
@@ -237,4 +237,6 @@ public:
|
||||
}
|
||||
|
||||
void broken(std::exception_ptr ex);
|
||||
|
||||
std::string dump_diagnostics() const;
|
||||
};
|
||||
|
||||
@@ -91,9 +91,8 @@ public:
|
||||
class resource_units;
|
||||
|
||||
// States a reader permit can be in. Values are streamed by name through
// operator<<(std::ostream&, reader_permit::state).
enum class state {
|
||||
registered, // read is registered, but didn't attempt admission yet
|
||||
waiting, // waiting for admission
|
||||
admitted, // NOTE(review): undocumented here - presumably the permit has passed admission; confirm
|
||||
active, // NOTE(review): undocumented here - how this relates to `admitted` is not visible in this enum; confirm
|
||||
};
|
||||
|
||||
class impl;
|
||||
|
||||
@@ -977,14 +977,7 @@ SEASTAR_THREAD_TEST_CASE(fuzzy_test) {
|
||||
|
||||
const auto& partitions = pop_desc.partitions;
|
||||
smp::invoke_on_all([cfg, db = &env.db(), gs = global_schema_ptr(pop_desc.schema), &partitions] {
|
||||
auto s = gs.get();
|
||||
auto& sem = db->local().get_reader_concurrency_semaphore();
|
||||
|
||||
auto resources = sem.available_resources();
|
||||
resources -= reader_concurrency_semaphore::resources{1, 0};
|
||||
auto permit = sem.make_permit(s.get(), "fuzzy-test");
|
||||
|
||||
return run_fuzzy_test_workload(cfg, *db, std::move(s), partitions).finally([units = permit.consume_resources(resources)] {});
|
||||
return run_fuzzy_test_workload(cfg, *db, gs.get(), partitions);
|
||||
}).handle_exception([seed] (std::exception_ptr e) {
|
||||
testlog.error("Test workload failed with exception {}."
|
||||
" To repeat this particular run, replace the random seed of the test, with that of this run ({})."
|
||||
|
||||
@@ -896,6 +896,221 @@ sstables::shared_sstable create_sstable(sstables::test_env& env, simple_schema&
|
||||
, mutations);
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
class generic_inactive_read : public reader_concurrency_semaphore::inactive_read {
|
||||
flat_mutation_reader_opt _reader;
|
||||
|
||||
private:
|
||||
explicit generic_inactive_read(flat_mutation_reader&& rd) : _reader(std::move(rd)) { }
|
||||
|
||||
virtual void evict() override {
|
||||
_reader = {};
|
||||
}
|
||||
|
||||
public:
|
||||
static std::unique_ptr<inactive_read> make(flat_mutation_reader&& rd) {
|
||||
return std::make_unique<generic_inactive_read>(generic_inactive_read(std::move(rd)));
|
||||
}
|
||||
|
||||
static flat_mutation_reader_opt get_reader(std::unique_ptr<inactive_read>&& ir) {
|
||||
if (!ir) {
|
||||
return {};
|
||||
}
|
||||
auto gir = dynamic_cast<generic_inactive_read*>(ir.get());
|
||||
BOOST_REQUIRE(gir);
|
||||
return std::move(gir->_reader);
|
||||
}
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
// This unit test passes a read through admission again-and-again, just
|
||||
// like an evictable reader would be during its lifetime. When readmitted
|
||||
// the read sometimes has to wait and sometimes not. This is to check that
|
||||
// the readmitting a previously admitted reader doesn't leak any units.
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves_units) {
|
||||
simple_schema s;
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024};
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name());
|
||||
|
||||
auto permit = semaphore.make_permit(s.schema().get(), get_name());
|
||||
|
||||
std::optional<reader_permit::resource_units> residue_units;
|
||||
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
const auto have_residue_units = bool(residue_units);
|
||||
|
||||
auto current_resources = initial_resources;
|
||||
if (have_residue_units) {
|
||||
current_resources -= residue_units->resources();
|
||||
}
|
||||
BOOST_REQUIRE(semaphore.available_resources() == current_resources);
|
||||
|
||||
std::optional<reader_permit::resource_units> admitted_units;
|
||||
if (i % 2) {
|
||||
const auto consumed_resources = semaphore.available_resources();
|
||||
semaphore.consume(consumed_resources);
|
||||
|
||||
auto units_fut = permit.wait_admission(1024, db::no_timeout);
|
||||
BOOST_REQUIRE(!units_fut.available());
|
||||
|
||||
semaphore.signal(consumed_resources);
|
||||
admitted_units = units_fut.get();
|
||||
} else {
|
||||
admitted_units = permit.wait_admission(1024, db::no_timeout).get();
|
||||
}
|
||||
|
||||
current_resources -= admitted_units->resources();
|
||||
BOOST_REQUIRE(semaphore.available_resources() == current_resources);
|
||||
|
||||
residue_units.emplace(permit.consume_resources(reader_resources(0, 100)));
|
||||
if (!have_residue_units) {
|
||||
current_resources -= residue_units->resources();
|
||||
}
|
||||
BOOST_REQUIRE(semaphore.available_resources() == current_resources);
|
||||
|
||||
auto handle = semaphore.register_inactive_read(generic_inactive_read::make(make_empty_flat_reader(s.schema(), permit)));
|
||||
BOOST_REQUIRE(semaphore.try_evict_one_inactive_read());
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(semaphore.available_resources() == initial_resources - residue_units->resources());
|
||||
|
||||
residue_units.reset();
|
||||
|
||||
BOOST_REQUIRE(semaphore.available_resources() == initial_resources);
|
||||
}
|
||||
|
||||
// This unit test checks that the semaphore doesn't get into a deadlock
|
||||
// when contended, in the presence of many memory-only reads (that don't
|
||||
// wait for admission). This is tested by simulating the 3 kind of reads we
|
||||
// currently have in the system:
|
||||
// * memory-only: reads that don't pass admission and only own memory.
|
||||
// * admitted: reads that pass admission.
|
||||
// * evictable: admitted reads that are furthermore evictable.
|
||||
//
|
||||
// The test creates and runs a large number of these reads in parallel,
|
||||
// read kinds being selected randomly, then creates a watchdog which
|
||||
// kills the test if no progress is being made.
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) {
|
||||
class reader {
|
||||
class skeleton_reader : public flat_mutation_reader::impl {
|
||||
reader_permit::resource_units _base_resources;
|
||||
std::optional<reader_permit::resource_units> _resources;
|
||||
public:
|
||||
skeleton_reader(schema_ptr s, reader_permit permit, reader_permit::resource_units res)
|
||||
: impl(std::move(s), std::move(permit)), _base_resources(std::move(res)) { }
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
|
||||
_resources.emplace(_permit.consume_resources(reader_resources(0, tests::random::get_int(1024, 2048))));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
virtual void next_partition() override { }
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { return make_ready_future<>(); }
|
||||
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point timeout) override { return make_ready_future<>(); }
|
||||
};
|
||||
struct reader_visitor {
|
||||
reader& r;
|
||||
future<> operator()(std::monostate& ms) { return r.tick(ms); }
|
||||
future<> operator()(flat_mutation_reader& reader) { return r.tick(reader); }
|
||||
future<> operator()(reader_concurrency_semaphore::inactive_read_handle& handle) { return r.tick(handle); }
|
||||
};
|
||||
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
reader_permit _permit;
|
||||
bool _memory_only = true;
|
||||
bool _evictable = false;
|
||||
std::optional<reader_permit::resource_units> _units;
|
||||
std::variant<std::monostate, flat_mutation_reader, reader_concurrency_semaphore::inactive_read_handle> _reader;
|
||||
|
||||
private:
|
||||
future<> make_reader() {
|
||||
auto res = _permit.consume_memory();
|
||||
if (!_memory_only) {
|
||||
res = co_await _permit.wait_admission(1024, db::no_timeout);
|
||||
}
|
||||
_reader = make_flat_mutation_reader<skeleton_reader>(_schema, _permit, std::move(res));
|
||||
}
|
||||
future<> tick(std::monostate&) {
|
||||
co_await make_reader();
|
||||
co_await tick(std::get<flat_mutation_reader>(_reader));
|
||||
}
|
||||
future<> tick(flat_mutation_reader& reader) {
|
||||
co_await reader.fill_buffer(db::no_timeout);
|
||||
if (_evictable) {
|
||||
_reader = _permit.semaphore().register_inactive_read(generic_inactive_read::make(std::move(reader)));
|
||||
}
|
||||
}
|
||||
future<> tick(reader_concurrency_semaphore::inactive_read_handle& handle) {
|
||||
if (auto reader = generic_inactive_read::get_reader(_permit.semaphore().unregister_inactive_read(std::move(handle))); reader) {
|
||||
_reader = std::move(*reader);
|
||||
} else {
|
||||
co_await make_reader();
|
||||
}
|
||||
co_await tick(std::get<flat_mutation_reader>(_reader));
|
||||
}
|
||||
|
||||
public:
|
||||
reader(schema_ptr s, reader_permit permit, bool memory_only, bool evictable)
|
||||
: _schema(std::move(s))
|
||||
, _permit(std::move(permit))
|
||||
, _memory_only(memory_only)
|
||||
, _evictable(evictable)
|
||||
, _units(_permit.consume_memory(tests::random::get_int(128, 1024)))
|
||||
{
|
||||
}
|
||||
future<> tick() {
|
||||
return std::visit(reader_visitor{*this}, _reader);
|
||||
}
|
||||
};
|
||||
|
||||
const auto count = 10;
|
||||
const auto num_readers = 512;
|
||||
const auto ticks = 1000;
|
||||
|
||||
simple_schema s;
|
||||
reader_concurrency_semaphore semaphore(count, count * 1024, get_name());
|
||||
|
||||
std::list<std::optional<reader>> readers;
|
||||
unsigned nr_memory_only = 0;
|
||||
unsigned nr_admitted = 0;
|
||||
unsigned nr_evictable = 0;
|
||||
|
||||
for (auto i = 0; i < num_readers; ++i) {
|
||||
const auto memory_only = tests::random::get_bool();
|
||||
const auto evictable = !memory_only && tests::random::get_bool();
|
||||
if (memory_only) {
|
||||
++nr_memory_only;
|
||||
} else if (evictable) {
|
||||
++nr_evictable;
|
||||
} else {
|
||||
++nr_admitted;
|
||||
}
|
||||
readers.emplace_back(reader(s.schema(), semaphore.make_permit(s.schema().get(), fmt::format("reader{}", i)), memory_only, evictable));
|
||||
}
|
||||
|
||||
testlog.info("Created {} readers, memory_only={}, admitted={}, evictable={}", readers.size(), nr_memory_only, nr_admitted, nr_evictable);
|
||||
|
||||
bool watchdog_touched = false;
|
||||
auto watchdog = timer<db::timeout_clock>([&semaphore, &watchdog_touched] {
|
||||
if (!watchdog_touched) {
|
||||
testlog.error("Watchdog detected a deadlock, dumping diagnostics before killing the test: {}", semaphore.dump_diagnostics());
|
||||
semaphore.broken(std::make_exception_ptr(std::runtime_error("test killed by watchdog")));
|
||||
}
|
||||
watchdog_touched = false;
|
||||
});
|
||||
watchdog.arm_periodic(std::chrono::seconds(30));
|
||||
|
||||
parallel_for_each(readers, [&] (std::optional<reader>& r) -> future<> {
|
||||
for (auto i = 0; i < ticks; ++i) {
|
||||
watchdog_touched = true;
|
||||
co_await r->tick();
|
||||
}
|
||||
r.reset();
|
||||
watchdog_touched = true;
|
||||
}).get();
|
||||
}
|
||||
|
||||
static
|
||||
sstables::shared_sstable create_sstable(sstables::test_env& env, schema_ptr s, std::vector<mutation> mutations) {
|
||||
static thread_local auto tmp = tmpdir();
|
||||
|
||||
@@ -712,7 +712,10 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
|
||||
nullptr,
|
||||
db::no_timeout).get();
|
||||
|
||||
BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 1);
|
||||
// The second read might be evicted too if it consumes more
|
||||
// memory than the first and hence triggers memory control when
|
||||
// saved in the querier cache.
|
||||
BOOST_CHECK_GE(db.get_querier_cache_stats().resource_based_evictions, 1);
|
||||
|
||||
// We want to read the entire partition so that the querier
|
||||
// is not saved at the end and thus ensure it is destroyed.
|
||||
|
||||
Reference in New Issue
Block a user