/* * Copyright (C) 2021-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include #include #include #include #include #include "service/address_map.hh" #include "gms/inet_address.hh" #include "utils/UUID.hh" #include #include #include #include #include #include using namespace service; using namespace std::chrono_literals; using namespace seastar::testing; using server_id = locator::host_id; SEASTAR_THREAD_TEST_CASE(test_address_map_operations) { server_id id1{utils::UUID(0, 1)}; server_id id2{utils::UUID(0, 2)}; gms::inet_address addr1("127.0.0.1"); gms::inet_address addr2("127.0.0.2"); // Expiring entries stay in the cache for 1 hour, so take a bit larger value // in order to trigger cleanup auto expiration_time = 1h + 1s; using seastar::manual_clock; { sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); m.add_or_update_entry(id1, addr1); m.set_nonexpiring(id1); // Check that regular entries don't expire manual_clock::advance(expiration_time); { const auto found = m.find(id1); BOOST_CHECK(!!found && *found == addr1); } m.set_expiring(id1); manual_clock::advance(expiration_time); BOOST_CHECK(!m.find(id1)); } { // m.add_or_update_entry() adds an expiring entry sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); m.add_or_update_entry(id1, addr1); auto found = m.find(id1); BOOST_CHECK(!!found && *found == addr1); // The entry stay in the cache for 1 hour manual_clock::advance(expiration_time); BOOST_CHECK(!m.find(id1)); } { // Set two expirable entries with different timestamps, check for // automatic rearming of expiration timer after the first one expires. sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); m.add_or_update_entry(id1, addr1); manual_clock::advance(30min); m.add_or_update_entry(id2, addr2); // Here the expiration timer will rearm itself automatically since id2 // hasn't expired yet and need to be collected some time later manual_clock::advance(30min + 1s); BOOST_CHECK(!m.find(id1)); BOOST_CHECK(!!m.find(id2) && *m.find(id2) == addr2); // wait for another cleanup period manual_clock::advance(expiration_time); BOOST_CHECK(!m.find(id2)); } { // Do not throw on re-mapping address for the same id // - happens when IP address changes after a node restart sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); m.add_or_update_entry(id1, addr1); m.set_nonexpiring(id1); BOOST_CHECK_NO_THROW(m.add_or_update_entry(id1, addr2)); BOOST_CHECK(m.find(id1) && *m.find(id1) == addr2); } { // Check that add_or_update_entry() doesn't transition the entry type // to expiring. sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); m.add_or_update_entry(id1, addr1); m.set_nonexpiring(id1); m.add_or_update_entry(id1, addr2); manual_clock::advance(expiration_time); BOOST_CHECK(m.find(id1) && *m.find(id1) == addr2); } { // Check that set_expiring() doesn't insert a new entry sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); m.set_expiring(id1); BOOST_CHECK(!m.find(id1)); } { // Check that set_nonexpiring() inserts a new non-expiring entry if the // entry is missing sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); m.set_nonexpiring(id1); manual_clock::advance(expiration_time); // find() returns std::nullopt when an entry doesn't contain an address // or it is not present in the address map, so we have to update the // inserted entry's address and advance the clock by expiration_time // before checking if the entry is in the address map. If set_nonexpiring() // didn't add the entry, add_or_update_entry() would add a new expiring entry. m.add_or_update_entry(id1, addr1); manual_clock::advance(expiration_time); BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1); } { // Check that add_or_update_entry() throws when called without an actual IP address sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); scoped_no_abort_on_internal_error abort_guard; BOOST_CHECK_THROW(m.add_or_update_entry(id1, gms::inet_address{}), std::runtime_error); } { // Check that opt_add_entry() throws when called without an actual IP address sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); scoped_no_abort_on_internal_error abort_guard; BOOST_CHECK_THROW(m.opt_add_entry(id1, gms::inet_address{}), std::runtime_error); } { // Check that add_or_update_entry() doesn't overwrite a new IP address // with an obsolete one sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); m.add_or_update_entry(id1, addr1, gms::generation_type(1)); m.add_or_update_entry(id1, addr2, gms::generation_type(0)); BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1); } { // Check that add_or_update_entry() adds an IP address if the entry // doesn't have it regardless of the generation number sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); m.set_nonexpiring(id1); m.add_or_update_entry(id1, addr1, gms::generation_type(-1)); BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1); } { // Check the basic functionality of find_by_addr() sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); BOOST_CHECK(!m.find_by_addr(addr1)); m.set_nonexpiring(id1); BOOST_CHECK(!m.find_by_addr(addr1)); m.add_or_update_entry(id1, addr1); BOOST_CHECK(m.find_by_addr(addr1) && *m.find_by_addr(addr1) == id1); } { // Check that find_by_addr() properly updates timestamps of entries sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); m.add_or_update_entry(id1, addr1); manual_clock::advance(30min); BOOST_CHECK(m.find_by_addr(addr1) && *m.find_by_addr(addr1) == id1); manual_clock::advance(30min + 1s); BOOST_CHECK(m.find_by_addr(addr1) && *m.find_by_addr(addr1) == id1); manual_clock::advance(expiration_time); BOOST_CHECK(!m.find_by_addr(addr1)); } { // Check that find_by_addr() throws when called without an actual IP address sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); scoped_no_abort_on_internal_error abort_guard; BOOST_CHECK_THROW(m.find_by_addr(gms::inet_address{}), std::runtime_error); } { // Check that an update with smaller generation will not overwrite update with larger one // but other way around works sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); m.add_or_update_entry(id1, addr1, gms::generation_type{2}); m.add_or_update_entry(id1, addr2, gms::generation_type{1}); BOOST_CHECK(m.find(id1).value() == addr1); m.add_or_update_entry(id1, addr2, gms::generation_type{3}); BOOST_CHECK(m.find(id1).value() == addr2); } } SEASTAR_THREAD_TEST_CASE(test_address_map_replication) { if (smp::count < 2) { std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl; return; } static const server_id id1{utils::UUID(0, 1)}; static const server_id id2{utils::UUID(0, 2)}; static const gms::inet_address addr1("127.0.0.1"); static const gms::inet_address addr2("127.0.0.2"); // Expiring entries stay in the cache for 1 hour, so take a bit larger value // in order to trigger cleanup static const auto expiration_time = 1h + 1s; using seastar::manual_clock; { sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); // Add entry on both shards, replicate non-expiration // flag, ensure it doesn't expire on the other shard m.add_or_update_entry(id1, addr1); m.set_nonexpiring(id1); m.barrier().get(); m_svc.invoke_on(1, [] (address_map_t& m) { BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1); manual_clock::advance(expiration_time); BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1); }).get(); // Set it to expiring, ensure it expires on both shards m.set_expiring(id1); BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1); m.barrier().get(); m_svc.invoke_on(1, [] (address_map_t& m) { BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1); manual_clock::advance(expiration_time); BOOST_CHECK(!m.find(id1)); return smp::submit_to(0, []{}); // Ensure shard 0 notices timer is expired. }).get(); BOOST_CHECK(!m.find(id1)); // Expiring entries are replicated m.add_or_update_entry(id1, addr1); m.barrier().get(); m_svc.invoke_on(1, [] (address_map_t& m) { BOOST_CHECK(m.find(id1)); }).get(); // Can't call add_or_update_entry on shard other than 0 m_svc.invoke_on(1, [] (address_map_t& m) { scoped_no_abort_on_internal_error abort_guard; BOOST_CHECK_THROW(m.add_or_update_entry(id1, addr2), std::runtime_error); }).get(); // Can add expiring entries on shard other than 0 - and they indeed expire m_svc.invoke_on(1, [] (address_map_t& m) { m.opt_add_entry(id2, addr2); BOOST_CHECK(m.find(id2) && *m.find(id2) == addr2); manual_clock::advance(expiration_time); BOOST_CHECK(!m.find(id2)); }).get(); // Add entry on two shards, make it non-expiring on shard 0, // the non-expiration must be replicated m_svc.invoke_on(1, [] (address_map_t& m) { m.opt_add_entry(id2, addr2); }).get(); m.set_nonexpiring(id2); m.barrier().get(); m_svc.invoke_on(1, [] (address_map_t& m) { manual_clock::advance(expiration_time); BOOST_CHECK(m.find(id2) && *m.find(id2) == addr2); }).get(); // Cannot set it to expiring on shard 1 m_svc.invoke_on(1, [] (address_map_t& m) { scoped_no_abort_on_internal_error abort_guard; BOOST_CHECK_THROW(m.set_expiring(id2), std::runtime_error); }).get(); // Cannot set it to non-expiring on shard 1 m.set_expiring(id2); m_svc.invoke_on(1, [] (address_map_t& m) { scoped_no_abort_on_internal_error abort_guard; BOOST_CHECK_THROW(m.set_nonexpiring(id2), std::runtime_error); }).get(); } } SEASTAR_THREAD_TEST_CASE(test_address_map_replication_efficiency) { if (smp::count < 2) { std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl; return; } static const server_id id1{utils::UUID(0, 1)}; static const server_id id2{utils::UUID(0, 2)}; static const gms::inet_address addr1("127.0.0.1"); static const gms::inet_address addr2("127.0.0.2"); sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); std::atomic stopped; auto cpu_hogger = smp::submit_to(1, [&] -> future<> { while (!stopped.load()) { co_await coroutine::maybe_yield(); } }); for (int i = 0; i < 1000; ++i) { m.add_or_update_entry(id1, addr1); } m.add_or_update_entry(id2, addr2); // One round trip should be enough for the replicator to execute the updates, // but there could be reordering in the smp queue, and in the task queue on the destination. // So three times to be safe. for (auto i = 0; i < 3; ++i) { smp::submit_to(1, [] {}).get(); } m_svc.invoke_on(1, [] (address_map_t& m) { BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1); BOOST_CHECK(m.find(id2) && *m.find(id2) == addr2); }).get(); stopped.store(true); cpu_hogger.get(); } SEASTAR_THREAD_TEST_CASE(test_address_map_expiry_change_keeps_addr_mapping) { if (smp::count < 2) { std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl; return; } static const server_id id1{utils::UUID(0, 1)}; static const gms::inet_address addr1("127.0.0.1"); static const auto expiration_time = 1h + 1s; sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); m.add_or_update_entry(id1, addr1); m.set_nonexpiring(id1); manual_clock::advance(expiration_time); m.barrier().get(); m_svc.invoke_on(1, [] (address_map_t& m) { BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1); }).get(); } SEASTAR_THREAD_TEST_CASE(test_address_map_exception_safety) { static const server_id id1{utils::UUID(0, 1)}; static const gms::inet_address addr1("127.0.0.1"); sharded> m_svc; m_svc.start().get(); auto stop_map = defer([&m_svc] { m_svc.stop().get(); }); auto& m = m_svc.local(); seastar::memory::with_allocation_failures([&] { try { m.add_or_update_entry(id1, addr1); } catch (...) { BOOST_CHECK(!m.find(id1)); m.barrier().get(); m_svc.invoke_on(1, [] (address_map_t& m) { BOOST_CHECK(!m.find(id1)); }).get(); throw; } }); BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1); m.barrier().get(); m_svc.invoke_on(1, [] (address_map_t& m) { BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1); }).get(); }