/* * Copyright 2015-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include #include #include #include #include #include #include #include #include #include #include "seastarx.hh" #include "test/lib/scylla_test_case.hh" #include #include "utils/flush_queue.hh" SEASTAR_TEST_CASE(test_queue_ordering_random_ops) { struct env { env(size_t n) : promises(n) {} utils::flush_queue queue; std::vector> promises; std::vector result; }; auto r = std::views::iota(0, 100); return do_for_each(r, [](int) { constexpr size_t num_ops = 1000; auto e = make_lw_shared(num_ops); int i = 0; for (auto& p : e->promises) { // FIXME: discarded future. (void)e->queue.run_with_ordered_post_op(i, [&p, i] { return p.get_future().then([i] { return make_ready_future(i); }); }, [e](int i) { e->result.emplace_back(i); }); ++i; } auto res = e->queue.wait_for_pending(); std::uniform_int_distribution dist(0, num_ops - 1); std::bitset set; while (!set.all()) { auto& e1 = seastar::testing::local_random_engine; size_t i = dist(e1); if (!set.test(i)) { set[i] = true; e->promises[i].set_value(); } } return res.then([e] { BOOST_CHECK_EQUAL(e->result.size(), e->promises.size()); BOOST_REQUIRE(std::is_sorted(e->result.begin(), e->result.end())); }).finally([e] { return e->queue.close().finally([e] { }); }); }); } SEASTAR_TEST_CASE(test_queue_ordering_multi_ops) { struct env { env() : sem(0) {} utils::flush_queue queue; std::vector result; semaphore sem; size_t n = 0; }; auto r = std::views::iota(0, 100); return do_for_each(r, [](int) { constexpr size_t num_ops = 1000; auto e = make_lw_shared(); std::uniform_int_distribution dist(0, num_ops - 1); for (size_t k = 0; k < num_ops*10; ++k) { auto& e1 = seastar::testing::local_random_engine; int i = dist(e1); if (e->queue.has_operation(i) || (!e->queue.empty() && e->queue.highest_key() < i)) { // FIXME: discarded future. (void)e->queue.run_with_ordered_post_op(i, [e, i] { return e->sem.wait().then([i] { return make_ready_future(i); }); }, [e](int i) { e->result.emplace_back(i); }); ++e->n; } } auto res = e->queue.wait_for_pending(); e->sem.signal(e->n); return res.then([e] { BOOST_CHECK_EQUAL(e->result.size(), e->n); BOOST_REQUIRE(std::is_sorted(e->result.begin(), e->result.end())); }).finally([e] { return e->queue.close().finally([e] { }); }); }); } static future<> test_propagation(bool propagate, noncopyable_function ()> func, noncopyable_function ()> post, noncopyable_function thn, bool want_except_in_run, bool want_except_in_wait) { auto queue = ::make_shared>(propagate); auto sem = ::make_shared(0); auto xr = ::make_shared(false); auto xw = ::make_shared(false); auto f1 = queue->run_with_ordered_post_op(0, [sem, func = std::move(func)]() mutable { return sem->wait().then(std::move(func)); }, std::move(post)).handle_exception([xr](auto p) { *xr = true; }).discard_result(); auto f2 = queue->wait_for_pending(0).then(std::move(thn)).handle_exception([xw](auto p) { *xw = true; }).discard_result(); sem->signal(); return seastar::when_all_succeed(std::move(f1), std::move(f2)).finally([sem, queue, want_except_in_run, want_except_in_wait, xr, xw] { BOOST_CHECK_EQUAL(want_except_in_run, *xr); BOOST_CHECK_EQUAL(want_except_in_wait, *xw); }).discard_result().finally([queue] { return queue->close().finally([queue] { }); }); } SEASTAR_TEST_CASE(test_propagate_exception_in_op) { return test_propagation(true, // propagate exception to waiter [] { return make_exception_future(std::runtime_error("hej")); }, // ex in op [] { BOOST_FAIL("should not reach (1)"); return make_ready_future(); }, // should not reach post [] { BOOST_FAIL("should not reach (2)"); }, // should not reach waiter "then" true, true ); } SEASTAR_TEST_CASE(test_propagate_exception_in_post) { return test_propagation(true, // propagate exception to waiter [] { return make_ready_future(); }, // ok func [] { return make_exception_future(std::runtime_error("hej")); }, // ex in post [] { BOOST_FAIL("should not reach"); }, // should not reach waiter "then" true, true ); } SEASTAR_TEST_CASE(test_no_propagate_exception_in_op) { return test_propagation(false, // do not propagate exception to waiter [] { return make_exception_future(std::runtime_error("hej")); }, // ex in op [] { BOOST_FAIL("should not reach"); return make_ready_future(); }, // should not reach post [] {}, // should reach waiter "then" true, false ); } SEASTAR_TEST_CASE(test_no_propagate_exception_in_post) { return test_propagation(false, // do not propagate exception to waiter [] { return make_ready_future(); }, // ok func [] { return make_exception_future(std::runtime_error("hej")); }, // ex in post [] {}, // should reach waiter "then" true, false ); }