Compare commits
2 Commits
copilot/up
...
alert-auto
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
034aaeb7fa | ||
|
|
addac8b3f7 |
2
.github/workflows/read-toolchain.yaml
vendored
2
.github/workflows/read-toolchain.yaml
vendored
@@ -10,6 +10,8 @@ on:
|
||||
jobs:
|
||||
read-toolchain:
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
outputs:
|
||||
image: ${{ steps.read.outputs.image }}
|
||||
steps:
|
||||
|
||||
@@ -198,47 +198,62 @@ SEASTAR_TEST_CASE(vector_store_client_test_dns_resolving_repeated) {
|
||||
auto cfg = config();
|
||||
cfg.vector_store_primary_uri.set("http://good.authority.here:6080");
|
||||
auto vs = vector_store_client{cfg};
|
||||
auto count = 0;
|
||||
bool fail_dns_resolution = true;
|
||||
auto as = abort_source_timeout();
|
||||
auto address = inet_address("127.0.0.1");
|
||||
configure(vs)
|
||||
.with_dns_refresh_interval(milliseconds(10))
|
||||
.with_wait_for_client_timeout(milliseconds(20))
|
||||
.with_dns_resolver([&count](auto const& host) -> future<std::optional<inet_address>> {
|
||||
.with_dns_resolver([&](auto const& host) -> future<std::optional<inet_address>> {
|
||||
BOOST_CHECK_EQUAL(host, "good.authority.here");
|
||||
count++;
|
||||
if (count % 3 != 0) {
|
||||
if (fail_dns_resolution) {
|
||||
co_return std::nullopt;
|
||||
}
|
||||
co_return inet_address(format("127.0.0.{}", count));
|
||||
co_return address;
|
||||
});
|
||||
|
||||
vs.start_background_tasks();
|
||||
|
||||
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
co_return addrs.size() == 1;
|
||||
}));
|
||||
BOOST_CHECK_EQUAL(count, 3);
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
BOOST_REQUIRE_EQUAL(addrs.size(), 1);
|
||||
BOOST_CHECK_EQUAL(print_addr(addrs[0]), "127.0.0.3");
|
||||
|
||||
vector_store_client_tester::trigger_dns_resolver(vs);
|
||||
|
||||
// Wait for the DNS resolution to fail
|
||||
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
co_return addrs.empty();
|
||||
}));
|
||||
|
||||
fail_dns_resolution = false;
|
||||
|
||||
// Wait for the DNS resolution to succeed
|
||||
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
co_return addrs.size() == 1;
|
||||
}));
|
||||
BOOST_CHECK_EQUAL(count, 6);
|
||||
addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
auto addrs1 = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
BOOST_REQUIRE_EQUAL(addrs1.size(), 1);
|
||||
BOOST_CHECK_EQUAL(print_addr(addrs1[0]), "127.0.0.1");
|
||||
|
||||
BOOST_REQUIRE_EQUAL(addrs.size(), 1);
|
||||
BOOST_CHECK_EQUAL(print_addr(addrs[0]), "127.0.0.6");
|
||||
fail_dns_resolution = true;
|
||||
// Trigger DNS resolver to check for address changes
|
||||
// Resolver will not re-check automatically after successful resolution
|
||||
vector_store_client_tester::trigger_dns_resolver(vs);
|
||||
|
||||
// Wait for the DNS resolution to fail again
|
||||
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
co_return addrs.empty();
|
||||
}));
|
||||
|
||||
// Resolve to a different address
|
||||
address = inet_address("127.0.0.2");
|
||||
fail_dns_resolution = false;
|
||||
|
||||
// Wait for the DNS resolution to succeed
|
||||
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
|
||||
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
co_return addrs.size() == 1;
|
||||
}));
|
||||
auto addrs2 = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
|
||||
BOOST_REQUIRE_EQUAL(addrs2.size(), 1);
|
||||
BOOST_CHECK_EQUAL(print_addr(addrs2[0]), "127.0.0.2");
|
||||
|
||||
co_await vs.stop();
|
||||
}
|
||||
|
||||
@@ -15,8 +15,8 @@
|
||||
|
||||
#include <vector>
|
||||
|
||||
// This class supports atomic inserts, removes, and iteration.
|
||||
// All operations are synchronized using a read-write lock.
|
||||
// This class supports atomic removes (by using a lock and returning a
|
||||
// future) and non atomic insert and iteration (by using indexes).
|
||||
template <typename T>
|
||||
class atomic_vector {
|
||||
std::vector<T> _vec;
|
||||
@@ -24,10 +24,6 @@ class atomic_vector {
|
||||
|
||||
public:
|
||||
void add(const T& value) {
|
||||
auto lock = _vec_lock.for_write().lock().get();
|
||||
auto unlock = seastar::defer([this] {
|
||||
_vec_lock.for_write().unlock();
|
||||
});
|
||||
_vec.push_back(value);
|
||||
}
|
||||
seastar::future<> remove(const T& value) {
|
||||
@@ -48,14 +44,11 @@ public:
|
||||
auto unlock = seastar::defer([this] {
|
||||
_vec_lock.for_read().unlock();
|
||||
});
|
||||
// Take a snapshot of the current contents while holding the read lock,
|
||||
// so that concurrent add() calls and possible reallocations won't
|
||||
// affect our iteration.
|
||||
auto snapshot = _vec;
|
||||
// We grab locks in both add() and remove(), so we iterate using
|
||||
// indexes on the snapshot to avoid concurrent modifications.
|
||||
for (size_t i = 0, n = snapshot.size(); i < n; ++i) {
|
||||
func(snapshot[i]);
|
||||
// We grab a lock in remove(), but not in add(), so we
|
||||
// iterate using indexes to guard against the vector being
|
||||
// reallocated.
|
||||
for (size_t i = 0, n = _vec.size(); i < n; ++i) {
|
||||
func(_vec[i]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,17 +59,16 @@ public:
|
||||
void thread_for_each_nested(seastar::noncopyable_function<void(T)> func) const {
|
||||
// When called in the context of thread_for_each, the read lock is
|
||||
// already held, so we don't need to acquire it again. Acquiring it
|
||||
// again could lead to a deadlock. This function must only be called
|
||||
// while holding the read lock on _vec_lock.
|
||||
// again could lead to a deadlock.
|
||||
if (!_vec_lock.locked()) {
|
||||
utils::on_internal_error("thread_for_each_nested called without holding the vector lock");
|
||||
}
|
||||
|
||||
// Take a snapshot of the current contents while the read lock is held,
|
||||
// so that concurrent add() calls and possible reallocations won't
|
||||
// affect our iteration.
|
||||
auto snapshot = _vec;
|
||||
// We grab locks in both add() and remove(), so we iterate using
|
||||
// indexes on the snapshot to avoid concurrent modifications.
|
||||
for (size_t i = 0, n = snapshot.size(); i < n; ++i) {
|
||||
func(snapshot[i]);
|
||||
// We grab a lock in remove(), but not in add(), so we
|
||||
// iterate using indexes to guard against the vector being
|
||||
// reallocated.
|
||||
for (size_t i = 0, n = _vec.size(); i < n; ++i) {
|
||||
func(_vec[i]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,14 +79,11 @@ public:
|
||||
// preemption.
|
||||
seastar::future<> for_each(seastar::noncopyable_function<seastar::future<>(T)> func) const {
|
||||
auto holder = co_await _vec_lock.hold_read_lock();
|
||||
// Take a snapshot of the current contents while holding the read lock,
|
||||
// so that concurrent add() calls and possible reallocations won't
|
||||
// affect our iteration.
|
||||
auto snapshot = _vec;
|
||||
// We grab locks in both add() and remove(), so we iterate using
|
||||
// indexes on the snapshot to avoid concurrent modifications.
|
||||
for (size_t i = 0, n = snapshot.size(); i < n; ++i) {
|
||||
co_await func(snapshot[i]);
|
||||
// We grab a lock in remove(), but not in add(), so we
|
||||
// iterate using indexes to guard against the vector being
|
||||
// reallocated.
|
||||
for (size_t i = 0, n = _vec.size(); i < n; ++i) {
|
||||
co_await func(_vec[i]);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user