Compare commits

..

2 Commits

Author SHA1 Message Date
Yaniv Kaul
034aaeb7fa Potential fix for code scanning alert no. 146: Workflow does not contain permissions
Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
2025-12-22 11:54:42 +02:00
Karol Nowacki
addac8b3f7 vector_search: test: Fix flaky DNS resolution test
The `vector_store_client_test_dns_resolving_repeated` test had race
conditions causing it to be flaky. Two main issues were identified:

1. Race between initial refresh and manual trigger: The test assumes
    a specific resolution sequence, but timing variations between the
    initial DNS refresh (on client creation) and the first manual
    trigger (in the test loop) can cause unexpected delayed scheduling.

2. Extra triggers from resolve_hostname fiber: During the client
    refresh phase, the background DNS fiber clears the client list.
    If resolve_hostname executes in the window after clearing but
    before the update completes, pending triggers are processed,
    incrementing the resolution count unexpectedly. At count 6, the
    mock resolver returns a valid address (count % 3 == 0), causing
    the test to fail.

The fix relaxes test assertions to verify retry behavior and client
clearing on DNS address loss, rather than enforcing exact resolution
counts.

Fixes: #27074

Closes scylladb/scylladb#27685
2025-12-21 20:02:16 +02:00
3 changed files with 58 additions and 52 deletions

View File

@@ -10,6 +10,8 @@ on:
jobs:
read-toolchain:
runs-on: ubuntu-latest
permissions:
contents: read
outputs:
image: ${{ steps.read.outputs.image }}
steps:

View File

@@ -198,47 +198,62 @@ SEASTAR_TEST_CASE(vector_store_client_test_dns_resolving_repeated) {
auto cfg = config();
cfg.vector_store_primary_uri.set("http://good.authority.here:6080");
auto vs = vector_store_client{cfg};
auto count = 0;
bool fail_dns_resolution = true;
auto as = abort_source_timeout();
auto address = inet_address("127.0.0.1");
configure(vs)
.with_dns_refresh_interval(milliseconds(10))
.with_wait_for_client_timeout(milliseconds(20))
.with_dns_resolver([&count](auto const& host) -> future<std::optional<inet_address>> {
.with_dns_resolver([&](auto const& host) -> future<std::optional<inet_address>> {
BOOST_CHECK_EQUAL(host, "good.authority.here");
count++;
if (count % 3 != 0) {
if (fail_dns_resolution) {
co_return std::nullopt;
}
co_return inet_address(format("127.0.0.{}", count));
co_return address;
});
vs.start_background_tasks();
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
co_return addrs.size() == 1;
}));
BOOST_CHECK_EQUAL(count, 3);
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
BOOST_REQUIRE_EQUAL(addrs.size(), 1);
BOOST_CHECK_EQUAL(print_addr(addrs[0]), "127.0.0.3");
vector_store_client_tester::trigger_dns_resolver(vs);
// Wait for the DNS resolution to fail
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
co_return addrs.empty();
}));
fail_dns_resolution = false;
// Wait for the DNS resolution to succeed
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
co_return addrs.size() == 1;
}));
BOOST_CHECK_EQUAL(count, 6);
addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
auto addrs1 = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
BOOST_REQUIRE_EQUAL(addrs1.size(), 1);
BOOST_CHECK_EQUAL(print_addr(addrs1[0]), "127.0.0.1");
BOOST_REQUIRE_EQUAL(addrs.size(), 1);
BOOST_CHECK_EQUAL(print_addr(addrs[0]), "127.0.0.6");
fail_dns_resolution = true;
// Trigger DNS resolver to check for address changes
// Resolver will not re-check automatically after successful resolution
vector_store_client_tester::trigger_dns_resolver(vs);
// Wait for the DNS resolution to fail again
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
co_return addrs.empty();
}));
// Resolve to a different address
address = inet_address("127.0.0.2");
fail_dns_resolution = false;
// Wait for the DNS resolution to succeed
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
co_return addrs.size() == 1;
}));
auto addrs2 = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
BOOST_REQUIRE_EQUAL(addrs2.size(), 1);
BOOST_CHECK_EQUAL(print_addr(addrs2[0]), "127.0.0.2");
co_await vs.stop();
}

View File

@@ -15,8 +15,8 @@
#include <vector>
// This class supports atomic inserts, removes, and iteration.
// All operations are synchronized using a read-write lock.
// This class supports atomic removes (by using a lock and returning a
// future) and non atomic insert and iteration (by using indexes).
template <typename T>
class atomic_vector {
std::vector<T> _vec;
@@ -24,10 +24,6 @@ class atomic_vector {
public:
void add(const T& value) {
auto lock = _vec_lock.for_write().lock().get();
auto unlock = seastar::defer([this] {
_vec_lock.for_write().unlock();
});
_vec.push_back(value);
}
seastar::future<> remove(const T& value) {
@@ -48,14 +44,11 @@ public:
auto unlock = seastar::defer([this] {
_vec_lock.for_read().unlock();
});
// Take a snapshot of the current contents while holding the read lock,
// so that concurrent add() calls and possible reallocations won't
// affect our iteration.
auto snapshot = _vec;
// We grab locks in both add() and remove(), so we iterate using
// indexes on the snapshot to avoid concurrent modifications.
for (size_t i = 0, n = snapshot.size(); i < n; ++i) {
func(snapshot[i]);
// We grab a lock in remove(), but not in add(), so we
// iterate using indexes to guard against the vector being
// reallocated.
for (size_t i = 0, n = _vec.size(); i < n; ++i) {
func(_vec[i]);
}
}
@@ -66,17 +59,16 @@ public:
void thread_for_each_nested(seastar::noncopyable_function<void(T)> func) const {
// When called in the context of thread_for_each, the read lock is
// already held, so we don't need to acquire it again. Acquiring it
// again could lead to a deadlock. This function must only be called
// while holding the read lock on _vec_lock.
// again could lead to a deadlock.
if (!_vec_lock.locked()) {
utils::on_internal_error("thread_for_each_nested called without holding the vector lock");
}
// Take a snapshot of the current contents while the read lock is held,
// so that concurrent add() calls and possible reallocations won't
// affect our iteration.
auto snapshot = _vec;
// We grab locks in both add() and remove(), so we iterate using
// indexes on the snapshot to avoid concurrent modifications.
for (size_t i = 0, n = snapshot.size(); i < n; ++i) {
func(snapshot[i]);
// We grab a lock in remove(), but not in add(), so we
// iterate using indexes to guard against the vector being
// reallocated.
for (size_t i = 0, n = _vec.size(); i < n; ++i) {
func(_vec[i]);
}
}
@@ -87,14 +79,11 @@ public:
// preemption.
seastar::future<> for_each(seastar::noncopyable_function<seastar::future<>(T)> func) const {
auto holder = co_await _vec_lock.hold_read_lock();
// Take a snapshot of the current contents while holding the read lock,
// so that concurrent add() calls and possible reallocations won't
// affect our iteration.
auto snapshot = _vec;
// We grab locks in both add() and remove(), so we iterate using
// indexes on the snapshot to avoid concurrent modifications.
for (size_t i = 0, n = snapshot.size(); i < n; ++i) {
co_await func(snapshot[i]);
// We grab a lock in remove(), but not in add(), so we
// iterate using indexes to guard against the vector being
// reallocated.
for (size_t i = 0, n = _vec.size(); i < n; ++i) {
co_await func(_vec[i]);
}
}
};