Compare commits

..

2 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
1303567fa4 Fix race conditions in atomic_vector by adding synchronization and snapshots
- Add write lock to add() method to prevent concurrent modifications
- Remove insufficient locked() check in thread_for_each_nested()
- Add vector snapshots in all iteration methods to prevent races
- Update class documentation to reflect atomic operations

Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2025-12-21 16:06:11 +00:00
copilot-swe-agent[bot]
e7d10b5be0 Initial plan 2025-12-21 16:00:55 +00:00
3 changed files with 46 additions and 52 deletions

View File

@@ -6,8 +6,6 @@ on:
jobs:
call-jira-status-in-review:
permissions:
contents: read
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_review.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -198,62 +198,47 @@ SEASTAR_TEST_CASE(vector_store_client_test_dns_resolving_repeated) {
auto cfg = config();
cfg.vector_store_primary_uri.set("http://good.authority.here:6080");
auto vs = vector_store_client{cfg};
bool fail_dns_resolution = true;
auto count = 0;
auto as = abort_source_timeout();
auto address = inet_address("127.0.0.1");
configure(vs)
.with_dns_refresh_interval(milliseconds(10))
.with_wait_for_client_timeout(milliseconds(20))
.with_dns_resolver([&](auto const& host) -> future<std::optional<inet_address>> {
.with_dns_resolver([&count](auto const& host) -> future<std::optional<inet_address>> {
BOOST_CHECK_EQUAL(host, "good.authority.here");
if (fail_dns_resolution) {
count++;
if (count % 3 != 0) {
co_return std::nullopt;
}
co_return address;
co_return inet_address(format("127.0.0.{}", count));
});
vs.start_background_tasks();
// Wait for the DNS resolution to fail
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
co_return addrs.empty();
}));
fail_dns_resolution = false;
// Wait for the DNS resolution to succeed
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
co_return addrs.size() == 1;
}));
auto addrs1 = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
BOOST_REQUIRE_EQUAL(addrs1.size(), 1);
BOOST_CHECK_EQUAL(print_addr(addrs1[0]), "127.0.0.1");
BOOST_CHECK_EQUAL(count, 3);
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
BOOST_REQUIRE_EQUAL(addrs.size(), 1);
BOOST_CHECK_EQUAL(print_addr(addrs[0]), "127.0.0.3");
fail_dns_resolution = true;
// Trigger DNS resolver to check for address changes
// Resolver will not re-check automatically after successful resolution
vector_store_client_tester::trigger_dns_resolver(vs);
// Wait for the DNS resolution to fail again
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
co_return addrs.empty();
}));
// Resolve to a different address
address = inet_address("127.0.0.2");
fail_dns_resolution = false;
// Wait for the DNS resolution to succeed
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
co_return addrs.size() == 1;
}));
auto addrs2 = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
BOOST_REQUIRE_EQUAL(addrs2.size(), 1);
BOOST_CHECK_EQUAL(print_addr(addrs2[0]), "127.0.0.2");
BOOST_CHECK_EQUAL(count, 6);
addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
BOOST_REQUIRE_EQUAL(addrs.size(), 1);
BOOST_CHECK_EQUAL(print_addr(addrs[0]), "127.0.0.6");
co_await vs.stop();
}

View File

@@ -15,8 +15,8 @@
#include <vector>
// This class supports atomic removes (by using a lock and returning a
// future) and non atomic insert and iteration (by using indexes).
// This class supports atomic inserts, removes, and iteration.
// All operations are synchronized using a read-write lock.
template <typename T>
class atomic_vector {
std::vector<T> _vec;
@@ -24,6 +24,10 @@ class atomic_vector {
public:
void add(const T& value) {
auto lock = _vec_lock.for_write().lock().get();
auto unlock = seastar::defer([this] {
_vec_lock.for_write().unlock();
});
_vec.push_back(value);
}
seastar::future<> remove(const T& value) {
@@ -44,11 +48,14 @@ public:
auto unlock = seastar::defer([this] {
_vec_lock.for_read().unlock();
});
// We grab a lock in remove(), but not in add(), so we
// iterate using indexes to guard against the vector being
// reallocated.
for (size_t i = 0, n = _vec.size(); i < n; ++i) {
func(_vec[i]);
// Take a snapshot of the current contents while holding the read lock,
// so that concurrent add() calls and possible reallocations won't
// affect our iteration.
auto snapshot = _vec;
// We grab locks in both add() and remove(), so we iterate using
// indexes on the snapshot to avoid concurrent modifications.
for (size_t i = 0, n = snapshot.size(); i < n; ++i) {
func(snapshot[i]);
}
}
@@ -59,16 +66,17 @@ public:
void thread_for_each_nested(seastar::noncopyable_function<void(T)> func) const {
// When called in the context of thread_for_each, the read lock is
// already held, so we don't need to acquire it again. Acquiring it
// again could lead to a deadlock.
if (!_vec_lock.locked()) {
utils::on_internal_error("thread_for_each_nested called without holding the vector lock");
}
// again could lead to a deadlock. This function must only be called
// while holding the read lock on _vec_lock.
// We grab a lock in remove(), but not in add(), so we
// iterate using indexes to guard against the vector being
// reallocated.
for (size_t i = 0, n = _vec.size(); i < n; ++i) {
func(_vec[i]);
// Take a snapshot of the current contents while the read lock is held,
// so that concurrent add() calls and possible reallocations won't
// affect our iteration.
auto snapshot = _vec;
// We grab locks in both add() and remove(), so we iterate using
// indexes on the snapshot to avoid concurrent modifications.
for (size_t i = 0, n = snapshot.size(); i < n; ++i) {
func(snapshot[i]);
}
}
@@ -79,11 +87,14 @@ public:
// preemption.
seastar::future<> for_each(seastar::noncopyable_function<seastar::future<>(T)> func) const {
auto holder = co_await _vec_lock.hold_read_lock();
// We grab a lock in remove(), but not in add(), so we
// iterate using indexes to guard against the vector being
// reallocated.
for (size_t i = 0, n = _vec.size(); i < n; ++i) {
co_await func(_vec[i]);
// Take a snapshot of the current contents while holding the read lock,
// so that concurrent add() calls and possible reallocations won't
// affect our iteration.
auto snapshot = _vec;
// We grab locks in both add() and remove(), so we iterate using
// indexes on the snapshot to avoid concurrent modifications.
for (size_t i = 0, n = snapshot.size(); i < n; ++i) {
co_await func(snapshot[i]);
}
}
};