/* * Copyright (C) 2022-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include #include #include #include #include #include "service/direct_failure_detector/failure_detector.hh" #include "test/raft/helpers.hh" future<> ping_shards() { if (smp::count == 1) { return seastar::yield(); } return parallel_for_each(std::views::iota(0u, smp::count), [] (shard_id s) { return smp::submit_to(s, [](){}); }); } struct test_pinger: public direct_failure_detector::pinger { std::unordered_set _responding; std::unordered_map _pings; bool _block = false; virtual future ping(endpoint_id ep, direct_failure_detector::clock::timepoint_t timeout, abort_source& as, direct_failure_detector::clock& c) override { bool ret = false; co_await invoke_abortable_on(0, [this, ep, &ret] (abort_source& as) -> future<> { ++_pings[ep]; if (!_block) { ret = _responding.contains(ep); co_return; } // Simulate a blocking ping that only returns when aborted. co_await sleep_abortable(std::chrono::hours(1), as); }, as); co_return ret; } }; struct test_clock : public direct_failure_detector::clock { std::atomic _ticks{0}; condition_variable _cond; future<> tick() { ++_ticks; _cond.broadcast(); return ping_shards(); } virtual timepoint_t now() noexcept override { return _ticks; } virtual future<> sleep_until(timepoint_t tp, abort_source& as) override { try { co_await invoke_abortable_on(0, [this, tp] (abort_source& as) -> future<> { bool aborted = false; auto sub = as.subscribe([&] () noexcept { aborted = true; _cond.broadcast(); }); if (!sub) { throw sleep_aborted{}; } co_await _cond.wait([&] { return aborted || _ticks >= tp; }); if (_ticks < tp) { throw sleep_aborted{}; } }, as); } catch (abort_requested_exception&) { throw sleep_aborted{}; } } virtual std::chrono::milliseconds to_milliseconds(timepoint_t tp) const override { throw std::logic_error("to_milliseconds is not implemented"); } }; struct test_listener : public direct_failure_detector::listener { using endpoint_id = direct_failure_detector::pinger::endpoint_id; std::unordered_set _live; condition_variable _cond; std::unordered_map _notifications; virtual future<> mark_alive(endpoint_id ep) override { _notifications[ep]++; _live.insert(ep); _cond.signal(); co_return; } virtual future<> mark_dead(endpoint_id ep) override { _notifications[ep]++; _live.erase(ep); _cond.signal(); co_return; } bool is_alive(endpoint_id ep) { return _live.contains(ep); } future<> wait_for(endpoint_id ep, bool alive) { return _cond.wait([this, alive, ep] { return alive == is_alive(ep); }); } }; SEASTAR_TEST_CASE(failure_detector_test) { test_pinger pinger; test_clock clock; sharded fd; co_await fd.start(std::ref(pinger), std::ref(clock), 10, 30, seastar::current_scheduling_group()); test_listener l1, l2; auto sub1 = co_await fd.local().register_listener(l1, 95); auto sub2 = co_await fd.local().register_listener(l2, 45); direct_failure_detector::pinger::endpoint_id ep1{0, 1}; direct_failure_detector::pinger::endpoint_id ep2{0, 2}; BOOST_REQUIRE(!l1.is_alive(ep1)); BOOST_REQUIRE(!l2.is_alive(ep1)); fd.local().add_endpoint(ep1); fd.local().add_endpoint(ep2); auto tick = [&clock] (size_t n) -> future<> { for (size_t i = 0; i < n; ++i) { co_await clock.tick(); } }; auto tick_until_ping = [&] (direct_failure_detector::pinger::endpoint_id ep) -> future<> { auto p = pinger._pings[ep]; while (pinger._pings[ep] == p) { co_await tick(1); } }; co_await tick(200); BOOST_REQUIRE(!l1.is_alive(ep1)); BOOST_REQUIRE(!l2.is_alive(ep1)); pinger._responding.insert(ep1); // 10 ticks (ping_period) must be enough for the fd to mark ep1 alive. co_await tick(10); co_await l1.wait_for(ep1, true); co_await l2.wait_for(ep1, true); BOOST_REQUIRE(l1.is_alive(ep1)); BOOST_REQUIRE(l2.is_alive(ep1)); BOOST_REQUIRE(!l1.is_alive(ep2)); BOOST_REQUIRE(!l2.is_alive(ep2)); pinger._responding.insert(ep2); co_await tick(10); co_await l1.wait_for(ep2, true); co_await l2.wait_for(ep2, true); BOOST_REQUIRE(l1.is_alive(ep1)); BOOST_REQUIRE(l2.is_alive(ep1)); BOOST_REQUIRE(l1.is_alive(ep2)); BOOST_REQUIRE(l2.is_alive(ep2)); pinger._responding.erase(ep1); pinger._responding.erase(ep2); // Wait until the time from last successful ping to ep1 is >= 45 (l2's threshold) co_await tick_until_ping(ep1); co_await tick(45); co_await l2.wait_for(ep1, false); co_await l2.wait_for(ep2, false); BOOST_REQUIRE(l1.is_alive(ep1)); BOOST_REQUIRE(l1.is_alive(ep2)); BOOST_REQUIRE(!l2.is_alive(ep1)); BOOST_REQUIRE(!l2.is_alive(ep2)); pinger._responding.insert(ep2); co_await tick(10); co_await l1.wait_for(ep2, true); co_await l2.wait_for(ep2, true); BOOST_REQUIRE(l1.is_alive(ep1)); BOOST_REQUIRE(!l2.is_alive(ep1)); // >= 55 ticks passed since ep1 stopped responding, wait another 40 for l1's threshold to pass co_await tick(40); co_await l1.wait_for(ep1, false); BOOST_REQUIRE(!l2.is_alive(ep1)); // ep2 is currently marked alive by l2. // Let's periodically make ep2 unresponsive and then responsive, so that ~1 ping is successful // during each period of 45 ticks (= l2's threshold). // If the time between two successful pings does not exceed the threshold, // the fd should not mark the endpoint as dead. { BOOST_REQUIRE(l2.is_alive(ep2)); co_await tick_until_ping(ep2); auto last_successful_ping = clock.now(); auto n = l2._notifications[ep2]; for (int i = 0; i < 100; ++i) { pinger._responding.erase(ep2); co_await tick(30); pinger._responding.insert(ep2); co_await tick_until_ping(ep2); auto diff = clock.now() - last_successful_ping; if (diff < 45) { // The time between two successful pings did not exceed threshold. // There should have been no notifications in between. BOOST_REQUIRE_EQUAL(l2._notifications[ep2], n); } last_successful_ping = clock.now(); n = l2._notifications[ep2]; } } // Destroy the l2 subscription. std::optional sub_opt{std::move(sub2)}; sub_opt.reset(); // Remove a live and responsive endpoint. // l1 should receive a final mark_dead notification. l2 should not, since it's no longer subscribed. { auto n = l2._notifications[ep2]; BOOST_REQUIRE(l1.is_alive(ep2)); BOOST_REQUIRE(l2.is_alive(ep2)); fd.local().remove_endpoint(ep2); co_await l1.wait_for(ep2, false); BOOST_REQUIRE(l2.is_alive(ep2)); BOOST_REQUIRE_EQUAL(l2._notifications[ep2], n); } // Readd a responsive endpoint. // l1 should eventually receive a mark_live notification. BOOST_REQUIRE(!l1.is_alive(ep2)); fd.local().add_endpoint(ep2); co_await l1.wait_for(ep2, true); // Verify that the fd continues working even if a ping blocks until it is aborted. pinger._block = true; co_await tick(95); co_await l1.wait_for(ep2, false); // Destroy the l1 subscription sub_opt.emplace(std::move(sub1)); sub_opt.reset(); co_await fd.stop(); }