diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 6274ae14bf..0f78ee8320 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -502,6 +502,9 @@ public: void flush_segments(uint64_t size_to_remove); void check_no_data_older_than_allowed(); + // whitebox testing + std::function()> _oversized_pre_wait_memory_func; + private: class shutdown_marker{}; @@ -1597,8 +1600,15 @@ future<> db::commitlog::segment_manager::oversized_allocation(entry_writer& writ scope_increment_counter allocating(totals.active_allocations); + // #27992 - whitebox testing. signal we are trying to lock out + // all allocators + if (_oversized_pre_wait_memory_func) { + co_await _oversized_pre_wait_memory_func(); + } + auto permit = co_await std::move(fut); - SCYLLA_ASSERT(_request_controller.available_units() == 0); + // #27992 - task reordering _can_ force the available units to negative. this is ok. + SCYLLA_ASSERT(_request_controller.available_units() <= 0); decltype(permit) fake_permit; // can't have allocate+sync release semaphore. 
bool failed = false; @@ -1859,13 +1869,15 @@ future<> db::commitlog::segment_manager::oversized_allocation(entry_writer& writ } } } - SCYLLA_ASSERT(_request_controller.available_units() == 0); + + auto avail = _request_controller.available_units(); + SCYLLA_ASSERT(avail <= 0); SCYLLA_ASSERT(permit.count() == max_request_controller_units()); auto nw = _request_controller.waiters(); permit.return_all(); // #20633 cannot guarantee controller avail is now full, since we could have had waiters when doing // return all -> now will be less avail - SCYLLA_ASSERT(nw > 0 || _request_controller.available_units() == ssize_t(max_request_controller_units())); + SCYLLA_ASSERT(nw > 0 || _request_controller.available_units() == (avail + ssize_t(max_request_controller_units()))); if (!failed) { clogger.trace("Oversized allocation succeeded."); @@ -3949,6 +3961,9 @@ void db::commitlog::update_max_data_lifetime(std::optional commitlog_d _segment_manager->cfg.commitlog_data_max_lifetime_in_seconds = commitlog_data_max_lifetime_in_seconds; } +void db::commitlog::set_oversized_pre_wait_memory_func(std::function()> f) { + _segment_manager->_oversized_pre_wait_memory_func = std::move(f); +} future> db::commitlog::get_segments_to_replay() const { return _segment_manager->get_segments_to_replay(); diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index 19a4369475..6f185854a1 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -385,6 +385,9 @@ public: // (Re-)set data mix lifetime. void update_max_data_lifetime(std::optional commitlog_data_max_lifetime_in_seconds); + // Whitebox testing. 
Do not use for production + void set_oversized_pre_wait_memory_func(std::function()>); + using commit_load_reader_func = std::function(buffer_and_replay_position)>; class segment_error : public std::exception {}; diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index 529368359f..1d9d4c14fa 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -46,6 +46,7 @@ #include "test/lib/mutation_source_test.hh" #include "test/lib/key_utils.hh" #include "test/lib/test_utils.hh" +#include "utils/checked-file-impl.hh" BOOST_AUTO_TEST_SUITE(commitlog_test) @@ -1931,7 +1932,7 @@ SEASTAR_TEST_CASE(test_oversized_entry_large) { co_await do_test_oversized_entry(32*3); // bigger segments } -static future<> test_oversized(size_t n_entries, size_t max_size_mb) { +static future<> test_oversized(size_t n_entries, size_t max_size_mb, std::function(commitlog&)> pre_test = {}, db::extensions* exts = nullptr) { commitlog::config cfg; cfg.commitlog_segment_size_in_mb = max_size_mb; @@ -1939,6 +1940,7 @@ static future<> test_oversized(size_t n_entries, size_t max_size_mb) { cfg.allow_going_over_size_limit = false; cfg.allow_fragmented_entries = true; cfg.use_o_dsync = false; + cfg.extensions = exts; // not using cl_test, because we need to be able to abandon // the log. @@ -1955,6 +1957,10 @@ static future<> test_oversized(size_t n_entries, size_t max_size_mb) { auto log = co_await commitlog::create_commitlog(cfg); auto size = log.max_record_size() * 2 + dist(gen) * 1024 + dist(gen) * 64; + if (pre_test) { + co_await pre_test(log); + } + // TODO: we can't create multi-entries using current API. 
for (size_t i = 0; i < n_entries; ++i) { auto buf = fragmented_temporary_buffer::allocate_to_fit(size); @@ -1993,6 +1999,10 @@ static future<> test_oversized(size_t n_entries, size_t max_size_mb) { auto&& buf_in = buf_rp.buffer; auto&& rp_in = buf_rp.position; + if (!rp2buf.count(rp_in)) { + co_return; + } + auto& buf = rp2buf.at(rp_in); BOOST_CHECK_EQUAL(buf.size_bytes(), buf_in.size_bytes()); fragmented_temporary_buffer::view v1(buf); @@ -2024,14 +2034,114 @@ SEASTAR_TEST_CASE(test_oversized_several_small) { co_await test_oversized(8, 1); } +SEASTAR_TEST_CASE(test_oversized_many_small) { + co_await test_oversized(32, 1); +} + SEASTAR_TEST_CASE(test_oversized_several_medium) { co_await test_oversized(8, 8); } +SEASTAR_TEST_CASE(test_oversized_many_medium) { + co_await test_oversized(32, 8); +} + SEASTAR_TEST_CASE(test_oversized_several_large) { co_await test_oversized(8, 32); } +// Test for #27992. +// Does whiteboxing to provoke/fake the race condition when +// the semaphore wait in oversized_alloc has in fact acquired +// all units, bringing the sem count to zero, but task reordering +// causes a segment::terminate() call to allocate a new buffer +// before we do the sanity asserts -> crash +SEASTAR_TEST_CASE(test_oversized_with_terminate_in_buffer_wait) { + auto exts = std::make_unique(); + static auto nada = [](std::exception_ptr) {}; + + // use a file wrapper to enable us to block file writing + // and simulate task reordering like above + class my_file_impl : public checked_file_impl { + public: + promise<> **_promise, **_signal; + my_file_impl(file f, promise<> **p, promise<> **s) + : checked_file_impl(nada, std::move(f)) + , _promise(p) + , _signal(s) + {} + future write_dma(uint64_t pos, const void* buffer, size_t len, io_intent* intent) override { + if (auto p = std::exchange(*_promise, nullptr)) { + if (auto s = std::exchange(*_signal, nullptr)) { + s->set_value(); + } + co_await p->get_future(); + } + co_return co_await 
checked_file_impl::write_dma(pos, buffer, len, intent); + } + }; + // use extensions to wrap all cl files for this + class my_cl_ext : public commitlog_file_extension { + public: + promise<> **_promise, **_signal; + my_cl_ext(promise<> **p, promise<> **s) + : _promise(p) + , _signal(s) + {} + seastar::future wrap_file(const seastar::sstring&, seastar::file f, seastar::open_flags) override { + co_return make_shared(std::move(f), _promise, _signal); + } + seastar::future<> before_delete(const seastar::sstring& filename) override { + return make_ready_future<>(); + } + }; + // our signals + promise<> *waiter = nullptr; + promise<> *signal = nullptr; + exts->add_commitlog_file_extension("delay_files", std::make_unique(&waiter, &signal)); + + co_await test_oversized(1, 32, [&](commitlog& log) -> future<> { + // use whitebox callback interface. No other way to do this + // in any reliable way + log.set_oversized_pre_wait_memory_func([&] -> future<> { + //co_await log.sync_all_segments(); + // set up signals + promise<> p, s; + waiter = &p; + signal = &s; + auto f = s.get_future(); + // this will call segment::terminate(), which + // will find active segment has data but is + // not max size, so will try to write + // a terminating buffer to the file. + // This causes a buffer allocation, which, + // since current count now is zero (no waiters) + // will force _request_controller to negative + (void)log.force_new_active_segment(); + // wait for IO block + co_await std::move(f); + // release IO block. It will not run until next + // co_await. + p.set_value(); + waiter = nullptr; + signal = nullptr; + }); + + // ensure we have a small entry in the first segment + // so that the force_new_active_segment above will actually do + // something. 
+ static const char buster[] = "womprats"; + auto h = co_await log.add_mutation(make_table_id(), sizeof(buster), db::commitlog::force_sync::no, [&](db::commitlog::output& dst) { + dst.write(buster, sizeof(buster)); + }); + h.release(); + + // write all data to disk so only terminate + // will actually do IO + co_await log.sync_all_segments(); + }, exts.get()); +} + // tests #20862 - buffer usage counter not being updated correctly SEASTAR_TEST_CASE(test_commitlog_buffer_size_counter) { commitlog::config cfg;