Merge 'reader_permit: always forward resources to the semaphore' from Botond

This series is a conceptual revert of 4c8ab10, which turned out to be a
misguided defense mechanism and a hotbed for bugs. That protection was
superseded by 0fe75571d9, which guarantees forward progress at all
times, without the gotchas and bad interactions introduced by 4c8ab10.

The latest such bad interaction, and the trigger for this series, is a
leak of resource units: each time a previously evicted reader is
re-admitted, the resources it already owns are leaked.

To prove that neither the resource leak nor the deadlock that 4c8ab10
was supposed to guard against exists after this series, it includes two
unit tests stressing the respective areas: readmission, and admission
on a highly contended semaphore.
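
To illustrate the invariant this series adopts, here is a minimal,
self-contained sketch (toy types, not the actual Scylla classes): every
unit a permit consumes or signals is mirrored to the semaphore
immediately and unconditionally, so eviction followed by readmission
cannot double-count units the permit already owns.

    #include <cassert>

    struct resources {
        int count = 0;
        long memory = 0;
        resources& operator+=(const resources& o) { count += o.count; memory += o.memory; return *this; }
        resources& operator-=(const resources& o) { count -= o.count; memory -= o.memory; return *this; }
    };

    struct semaphore {
        resources available{10, 1024};
        void consume(const resources& r) { available -= r; }
        void signal(const resources& r) { available += r; }
    };

    // After the change: the permit mirrors every consume/signal to the
    // semaphore, in every state, so the books always balance.
    struct permit {
        semaphore& sem;
        resources owned{};
        void consume(const resources& r) { owned += r; sem.consume(r); }
        void signal(const resources& r) { owned -= r; sem.signal(r); }
    };

    int main() {
        semaphore sem;
        permit p{sem};
        p.consume({1, 100});   // admission cost
        p.consume({0, 50});    // extra memory consumed while reading
        p.signal({1, 100});    // evicted: admission cost returned
        p.consume({1, 100});   // re-admitted: counted exactly once more
        p.signal({1, 150});    // closing: everything still owned is returned
        assert(p.owned.count == 0 && p.owned.memory == 0);
        assert(sem.available.count == 10 && sem.available.memory == 1024);
    }

Under the old scheme, the mirroring was conditional on the permit's
state (see should_forward_cost() in the diff below), so a permit whose
state changed between a consume and the matching signal could leak or
over-return units.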

Fixes: #8493

Also on: https://github.com/denesb/scylla.git
reader-permit-resource-leak-v2

Changelog

v2:
* Rebase over the recently merged reader close series. Fix merge
  conflicts and an exposed bug.

* 'reader-permit-resource-leak-v2' of https://github.com/denesb/scylla:
  test: mutation_reader_test: add test_reader_concurrency_semaphore_forward_progress
  test: mutation_reader_test: add test_reader_concurrency_semaphore_readmission_preserves_units
  reader_concurrency_semaphore: add dump_diagnostics()
  reader_permit: always forward resources
  reader_concurrency_semaphore: inactive_read_handle: abandon(): close reader
Piotr Sarna
2021-04-26 16:30:18 +02:00
5 changed files with 251 additions and 55 deletions


@@ -78,7 +78,7 @@ class reader_permit::impl : public boost::intrusive::list_base_hook<boost::intru
     sstring _op_name;
     std::string_view _op_name_view;
     reader_resources _resources;
-    reader_permit::state _state = reader_permit::state::registered;
+    reader_permit::state _state = reader_permit::state::active;
 public:
     struct value_tag {};
@@ -125,40 +125,25 @@ public:
     }

     void on_admission() {
-        _state = reader_permit::state::admitted;
-        _semaphore.consume(_resources);
+        _state = reader_permit::state::active;
     }

     void on_register_as_inactive() {
-        if (_state != reader_permit::state::admitted) {
-            _state = reader_permit::state::inactive;
-            _semaphore.consume(_resources);
-        }
+        _state = reader_permit::state::inactive;
     }

     void on_unregister_as_inactive() {
-        if (_state == reader_permit::state::inactive) {
-            _state = reader_permit::state::registered;
-            _semaphore.signal(_resources);
-        }
-    }
-
-    bool should_forward_cost() const {
-        return _state == reader_permit::state::admitted || _state == reader_permit::state::inactive;
+        _state = reader_permit::state::active;
     }

     void consume(reader_resources res) {
         _resources += res;
-        if (should_forward_cost()) {
-            _semaphore.consume(res);
-        }
+        _semaphore.consume(res);
     }

     void signal(reader_resources res) {
         _resources -= res;
-        if (should_forward_cost()) {
-            _semaphore.signal(res);
-        }
+        _semaphore.signal(res);
     }

     reader_resources resources() const {
@@ -236,14 +221,11 @@ sstring reader_permit::description() const {

 std::ostream& operator<<(std::ostream& os, reader_permit::state s) {
     switch (s) {
-    case reader_permit::state::registered:
-        os << "registered";
-        break;
     case reader_permit::state::waiting:
         os << "waiting";
         break;
-    case reader_permit::state::admitted:
-        os << "admitted";
+    case reader_permit::state::active:
+        os << "active";
         break;
     case reader_permit::state::inactive:
         os << "inactive";
@@ -283,7 +265,7 @@ struct permit_group_key_hash {

 using permit_groups = std::unordered_map<permit_group_key, permit_stats, permit_group_key_hash>;

-static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const permit_groups& permits, reader_permit::state state, bool sort_by_memory) {
+static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const permit_groups& permits, reader_permit::state state) {
     struct permit_summary {
         const schema* s;
         std::string_view op_name;
@@ -299,25 +281,17 @@ static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const pe
         }
     }

-    std::ranges::sort(permit_summaries, [sort_by_memory] (const permit_summary& a, const permit_summary& b) {
-        if (sort_by_memory) {
-            return a.memory < b.memory;
-        } else {
-            return a.count < b.count;
-        }
+    std::ranges::sort(permit_summaries, [] (const permit_summary& a, const permit_summary& b) {
+        return a.memory < b.memory;
     });

     permit_stats total;

-    auto print_line = [&os, sort_by_memory] (auto col1, auto col2, auto col3) {
-        if (sort_by_memory) {
-            fmt::print(os, "{}\t{}\t{}\n", col2, col1, col3);
-        } else {
-            fmt::print(os, "{}\t{}\t{}\n", col1, col2, col3);
-        }
+    auto print_line = [&os] (auto col1, auto col2, auto col3) {
+        fmt::print(os, "{}\t{}\t{}\n", col2, col1, col3);
     };

-    fmt::print(os, "Permits with state {}, sorted by {}\n", state, sort_by_memory ? "memory" : "count");
+    fmt::print(os, "Permits with state {}\n", state);
     print_line("count", "memory", "name");
     for (const auto& summary : permit_summaries) {
         total.count += summary.count;
@@ -343,13 +317,11 @@ static void do_dump_reader_permit_diagnostics(std::ostream& os, const reader_con
     permit_stats total;

     fmt::print(os, "Semaphore {}: {}, dumping permit diagnostics:\n", semaphore.name(), problem);
-    total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::admitted, true);
+    total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::active);
     fmt::print(os, "\n");
-    total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::inactive, false);
+    total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::inactive);
     fmt::print(os, "\n");
-    total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::waiting, false);
-    fmt::print(os, "\n");
-    total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::registered, false);
+    total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::waiting);
     fmt::print(os, "\n");
     fmt::print(os, "Total: permits: {}, memory: {}\n", total.count, utils::to_hr_size(total.memory));
 }
@@ -385,7 +357,10 @@ void reader_concurrency_semaphore::inactive_read::detach() noexcept {
 }

 void reader_concurrency_semaphore::inactive_read_handle::abandon() noexcept {
-    delete std::exchange(_irp, nullptr);
+    if (_irp) {
+        _sem->close_reader(std::move(_irp->reader));
+        delete std::exchange(_irp, nullptr);
+    }
 }

 void reader_concurrency_semaphore::signal(const resources& r) noexcept {
@@ -429,11 +404,9 @@ reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore:
     auto& permit_impl = *reader.permit()._impl;
     // Implies _inactive_reads.empty(), we don't queue new readers before
    // evicting all inactive reads.
-    // FIXME: #4758, workaround for keeping tabs on un-admitted reads that are
-    // still registered as inactive. Without the below check, these can
-    // accumulate without limit. The real fix is #4758 -- that is to make all
-    // reads pass admission before getting started.
-    if (_wait_list.empty() && (permit_impl.get_state() == reader_permit::state::admitted || _resources >= permit_impl.resources())) {
+    // Checking the _wait_list covers the count resources only, so check memory
+    // separately.
+    if (_wait_list.empty() && _resources.memory > 0) {
         try {
             auto irp = std::make_unique<inactive_read>(std::move(reader));
             auto& ir = *irp;
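
The rewritten admission check above relies on a division of labor: an
empty _wait_list proves that count resources are available (readers
queue up when counts run out), while memory can be drained by permits
that never waited, so it has to be checked explicitly. A hedged,
simplified model of the check (hypothetical names, not the Scylla code):

    struct available_resources {
        int count;
        long memory;
    };

    // Registering a new inactive read is only safe when nobody is queued
    // for admission (count available) and there is memory left to account
    // the read against.
    constexpr bool may_register_inactive(bool wait_list_empty, const available_resources& avail) {
        return wait_list_empty && avail.memory > 0;
    }

    static_assert(may_register_inactive(true, {1, 100}));
    static_assert(!may_register_inactive(true, {1, 0}));
    static_assert(!may_register_inactive(false, {1, 100}));
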
@@ -634,6 +607,12 @@ void reader_concurrency_semaphore::broken(std::exception_ptr ex) {
     }
 }

+std::string reader_concurrency_semaphore::dump_diagnostics() const {
+    std::ostringstream os;
+    do_dump_reader_permit_diagnostics(os, *this, *_permit_list, "user request");
+    return os.str();
+}
+
 // A file that tracks the memory usage of buffers resulting from read
 // operations.
 class tracking_file_impl : public file_impl {
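
The abandon() change earlier in this file follows the close-before-destroy
discipline that the new abandoned-handle unit test exercises: a reader
must be closed before it is destroyed, otherwise an assert() fires in its
destructor. A minimal sketch with hypothetical stand-in types (the real
code hands the reader to the semaphore via close_reader(), presumably to
be closed in the background; here closing is synchronous for brevity):

    #include <cassert>
    #include <utility>

    // Stand-in for a flat_mutation_reader: asserts if destroyed unclosed.
    struct reader {
        bool closed = false;
        void close() noexcept { closed = true; }
        ~reader() { assert(closed && "reader destroyed without having been closed"); }
    };

    struct inactive_read {
        reader r;
    };

    struct inactive_read_handle {
        inactive_read* irp = nullptr;
        void abandon() noexcept {
            if (irp) {
                irp->r.close();                     // close the reader first...
                delete std::exchange(irp, nullptr); // ...only then destroy it
            }
        }
        ~inactive_read_handle() { abandon(); }
    };

    int main() {
        inactive_read_handle h{new inactive_read{}};
        // Dropping the handle without the close() call in abandon() would
        // trip the assert in ~reader(); with it, abandonment is safe.
    }
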


@@ -310,4 +310,6 @@ public:
     }

     void broken(std::exception_ptr ex = {});
+
+    std::string dump_diagnostics() const;
 };
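
A possible call site for the new accessor (hypothetical sketch; within
this series its actual consumer is the watchdog in the forward-progress
test below):

    #include <fmt/core.h>

    // Hypothetical call site: print permit diagnostics when a semaphore
    // looks stuck, before taking more drastic action such as breaking it.
    void report_stuck_semaphore(reader_concurrency_semaphore& sem) {
        fmt::print(stderr, "Semaphore {} appears stuck, permit diagnostics:\n{}\n",
                sem.name(), sem.dump_diagnostics());
    }
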


@@ -91,10 +91,9 @@ public:

     class resource_units;

     enum class state {
-        registered, // read is registered, but didn't attempt admission yet
         waiting, // waiting for admission
-        admitted,
-        inactive, // un-admitted reads that are registered as inactive
+        active,
+        inactive,
     };

     class impl;
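
For orientation, a hedged sketch of the reduced permit lifecycle implied
by the new enum (illustrative helper, not the real implementation): with
resources always forwarded, "registered" and "admitted" collapse into a
single "active" state.

    #include <cassert>

    enum class state { waiting, active, inactive };

    constexpr bool valid_transition(state from, state to) {
        switch (from) {
        case state::active:   return to == state::waiting    // went back for (re)admission
                                  || to == state::inactive;  // registered as inactive
        case state::waiting:  return to == state::active;    // admission granted
        case state::inactive: return to == state::active;    // unregistered or evicted
        }
        return false;
    }

    int main() {
        static_assert(valid_transition(state::inactive, state::active));
        static_assert(!valid_transition(state::waiting, state::inactive));
        assert(valid_transition(state::active, state::waiting));
    }
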


@@ -971,6 +971,219 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele

     BOOST_REQUIRE(semaphore.available_resources() == initial_resources);
 }

+SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_abandoned_handle_closes_reader) {
+    simple_schema s;
+    reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
+    auto stop_sem = deferred_stop(semaphore);
+
+    auto permit = semaphore.make_permit(s.schema().get(), get_name());
+
+    {
+        auto handle = semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), permit));
+        // The handle is destroyed here, triggering the destruction of the inactive read.
+        // If the test fails, an assert() is triggered due to the reader being
+        // destroyed without having been closed before.
+    }
+}
+
+// This unit test passes a read through admission again and again, just
+// as an evictable reader is during its lifetime. When readmitted, the
+// read sometimes has to wait and sometimes not. This is to check that
+// readmitting a previously admitted reader doesn't leak any units.
+SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves_units) {
+    simple_schema s;
+    const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024};
+    reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name());
+    auto permit = semaphore.make_permit(s.schema().get(), get_name());
+    auto stop_sem = deferred_stop(semaphore);
+
+    std::optional<reader_permit::resource_units> residue_units;
+
+    for (int i = 0; i < 10; ++i) {
+        const auto have_residue_units = bool(residue_units);
+        auto current_resources = initial_resources;
+        if (have_residue_units) {
+            current_resources -= residue_units->resources();
+        }
+        BOOST_REQUIRE(semaphore.available_resources() == current_resources);
+
+        std::optional<reader_permit::resource_units> admitted_units;
+        if (i % 2) {
+            // Exhaust the semaphore so that admission has to wait.
+            const auto consumed_resources = semaphore.available_resources();
+            semaphore.consume(consumed_resources);
+
+            auto units_fut = permit.wait_admission(1024, db::no_timeout);
+            BOOST_REQUIRE(!units_fut.available());
+
+            semaphore.signal(consumed_resources);
+            admitted_units = units_fut.get();
+        } else {
+            admitted_units = permit.wait_admission(1024, db::no_timeout).get();
+        }
+        current_resources -= admitted_units->resources();
+        BOOST_REQUIRE(semaphore.available_resources() == current_resources);
+
+        residue_units.emplace(permit.consume_resources(reader_resources(0, 100)));
+        if (!have_residue_units) {
+            current_resources -= residue_units->resources();
+        }
+        BOOST_REQUIRE(semaphore.available_resources() == current_resources);
+
+        auto handle = semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), permit));
+        BOOST_REQUIRE(semaphore.try_evict_one_inactive_read());
+    }
+
+    BOOST_REQUIRE(semaphore.available_resources() == initial_resources - residue_units->resources());
+
+    residue_units.reset();
+    BOOST_REQUIRE(semaphore.available_resources() == initial_resources);
+}
+
+// This unit test checks that the semaphore doesn't get into a deadlock
+// when contended, in the presence of many memory-only reads (that don't
+// wait for admission). This is tested by simulating the 3 kinds of reads
+// we currently have in the system:
+// * memory-only: reads that don't pass admission and only own memory;
+// * admitted: reads that pass admission;
+// * evictable: admitted reads that are, furthermore, evictable.
+//
+// The test creates and runs a large number of these reads in parallel,
+// with read kinds selected randomly, then creates a watchdog which
+// kills the test if no progress is being made.
+SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) {
+    class reader {
+        class skeleton_reader : public flat_mutation_reader::impl {
+            reader_permit::resource_units _base_resources;
+            std::optional<reader_permit::resource_units> _resources;
+        public:
+            skeleton_reader(schema_ptr s, reader_permit permit, reader_permit::resource_units res)
+                : impl(std::move(s), std::move(permit)), _base_resources(std::move(res)) { }
+            virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
+                _resources.emplace(_permit.consume_resources(reader_resources(0, tests::random::get_int(1024, 2048))));
+                return make_ready_future<>();
+            }
+            virtual future<> next_partition() override { return make_ready_future<>(); }
+            virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { return make_ready_future<>(); }
+            virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point timeout) override { return make_ready_future<>(); }
+            virtual future<> close() noexcept override {
+                _resources.reset();
+                return make_ready_future<>();
+            }
+        };
+        struct reader_visitor {
+            reader& r;
+            future<> operator()(std::monostate& ms) { return r.tick(ms); }
+            future<> operator()(flat_mutation_reader& reader) { return r.tick(reader); }
+            future<> operator()(reader_concurrency_semaphore::inactive_read_handle& handle) { return r.tick(handle); }
+        };
+
+    private:
+        schema_ptr _schema;
+        reader_permit _permit;
+        bool _memory_only = true;
+        bool _evictable = false;
+        std::optional<reader_permit::resource_units> _units;
+        std::variant<std::monostate, flat_mutation_reader, reader_concurrency_semaphore::inactive_read_handle> _reader;
+
+    private:
+        future<> make_reader() {
+            auto res = _permit.consume_memory();
+            if (!_memory_only) {
+                res = co_await _permit.wait_admission(1024, db::no_timeout);
+            }
+            _reader = make_flat_mutation_reader<skeleton_reader>(_schema, _permit, std::move(res));
+        }
+        future<> tick(std::monostate&) {
+            co_await make_reader();
+            co_await tick(std::get<flat_mutation_reader>(_reader));
+        }
+        future<> tick(flat_mutation_reader& reader) {
+            co_await reader.fill_buffer(db::no_timeout);
+            if (_evictable) {
+                _reader = _permit.semaphore().register_inactive_read(std::move(reader));
+            }
+        }
+        future<> tick(reader_concurrency_semaphore::inactive_read_handle& handle) {
+            if (auto reader = _permit.semaphore().unregister_inactive_read(std::move(handle)); reader) {
+                _reader = std::move(*reader);
+            } else {
+                co_await make_reader();
+            }
+            co_await tick(std::get<flat_mutation_reader>(_reader));
+        }
+
+    public:
+        reader(schema_ptr s, reader_permit permit, bool memory_only, bool evictable)
+            : _schema(std::move(s))
+            , _permit(std::move(permit))
+            , _memory_only(memory_only)
+            , _evictable(evictable)
+            , _units(_permit.consume_memory(tests::random::get_int(128, 1024)))
+        {
+        }
+        future<> tick() {
+            return std::visit(reader_visitor{*this}, _reader);
+        }
+        future<> close() noexcept {
+            if (auto reader = std::get_if<flat_mutation_reader>(&_reader)) {
+                return reader->close();
+            }
+            return make_ready_future<>();
+        }
+    };
+
+    const auto count = 10;
+    const auto num_readers = 512;
+    const auto ticks = 1000;
+
+    simple_schema s;
+    reader_concurrency_semaphore semaphore(count, count * 1024, get_name());
+    auto stop_sem = deferred_stop(semaphore);
+
+    std::list<std::optional<reader>> readers;
+
+    unsigned nr_memory_only = 0;
+    unsigned nr_admitted = 0;
+    unsigned nr_evictable = 0;
+
+    for (auto i = 0; i < num_readers; ++i) {
+        const auto memory_only = tests::random::get_bool();
+        const auto evictable = !memory_only && tests::random::get_bool();
+        if (memory_only) {
+            ++nr_memory_only;
+        } else if (evictable) {
+            ++nr_evictable;
+        } else {
+            ++nr_admitted;
+        }
+        readers.emplace_back(reader(s.schema(), semaphore.make_permit(s.schema().get(), fmt::format("reader{}", i)), memory_only, evictable));
+    }
+
+    testlog.info("Created {} readers, memory_only={}, admitted={}, evictable={}", readers.size(), nr_memory_only, nr_admitted, nr_evictable);
+
+    bool watchdog_touched = false;
+    auto watchdog = timer<db::timeout_clock>([&semaphore, &watchdog_touched] {
+        if (!watchdog_touched) {
+            testlog.error("Watchdog detected a deadlock, dumping diagnostics before killing the test: {}", semaphore.dump_diagnostics());
+            semaphore.broken(std::make_exception_ptr(std::runtime_error("test killed by watchdog")));
+        }
+        watchdog_touched = false;
+    });
+    watchdog.arm_periodic(std::chrono::seconds(30));
+
+    parallel_for_each(readers, [&] (std::optional<reader>& r) -> future<> {
+        for (auto i = 0; i < ticks; ++i) {
+            watchdog_touched = true;
+            co_await r->tick();
+        }
+        co_await r->close();
+        r.reset();
+        watchdog_touched = true;
+    }).get();
+}

 static
 sstables::shared_sstable create_sstable(sstables::test_env& env, schema_ptr s, std::vector<mutation> mutations) {
     static thread_local auto tmp = tmpdir();


@@ -714,7 +714,10 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
             nullptr,
             db::no_timeout).get();

-    BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 1);
+    // The second read might be evicted too if it consumes more
+    // memory than the first and hence triggers memory control when
+    // saved in the querier cache.
+    BOOST_CHECK_GE(db.get_querier_cache_stats().resource_based_evictions, 1);

     // We want to read the entire partition so that the querier
     // is not saved at the end and thus ensure it is destroyed.