Compare commits

...

2 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
1303567fa4 Fix race conditions in atomic_vector by adding synchronization and snapshots
- Add write lock to add() method to prevent concurrent modifications
- Remove insufficient locked() check in thread_for_each_nested()
- Add vector snapshots in all iteration methods to prevent races
- Update class documentation to reflect atomic operations

Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2025-12-21 16:06:11 +00:00
copilot-swe-agent[bot]
e7d10b5be0 Initial plan 2025-12-21 16:00:55 +00:00

View File

@@ -15,8 +15,8 @@
#include <vector>
// This class supports atomic removes (by using a lock and returning a
// future) and non atomic insert and iteration (by using indexes).
// This class supports atomic inserts, removes, and iteration.
// All operations are synchronized using a read-write lock.
template <typename T>
class atomic_vector {
std::vector<T> _vec;
@@ -24,6 +24,10 @@ class atomic_vector {
public:
void add(const T& value) {
auto lock = _vec_lock.for_write().lock().get();
auto unlock = seastar::defer([this] {
_vec_lock.for_write().unlock();
});
_vec.push_back(value);
}
seastar::future<> remove(const T& value) {
@@ -44,11 +48,14 @@ public:
auto unlock = seastar::defer([this] {
_vec_lock.for_read().unlock();
});
// We grab a lock in remove(), but not in add(), so we
// iterate using indexes to guard against the vector being
// reallocated.
for (size_t i = 0, n = _vec.size(); i < n; ++i) {
func(_vec[i]);
// Take a snapshot of the current contents while holding the read lock,
// so that concurrent add() calls and possible reallocations won't
// affect our iteration.
auto snapshot = _vec;
// We grab locks in both add() and remove(), so we iterate using
// indexes on the snapshot to avoid concurrent modifications.
for (size_t i = 0, n = snapshot.size(); i < n; ++i) {
func(snapshot[i]);
}
}
@@ -59,16 +66,17 @@ public:
void thread_for_each_nested(seastar::noncopyable_function<void(T)> func) const {
// When called in the context of thread_for_each, the read lock is
// already held, so we don't need to acquire it again. Acquiring it
// again could lead to a deadlock.
if (!_vec_lock.locked()) {
utils::on_internal_error("thread_for_each_nested called without holding the vector lock");
}
// again could lead to a deadlock. This function must only be called
// while holding the read lock on _vec_lock.
// We grab a lock in remove(), but not in add(), so we
// iterate using indexes to guard against the vector being
// reallocated.
for (size_t i = 0, n = _vec.size(); i < n; ++i) {
func(_vec[i]);
// Take a snapshot of the current contents while the read lock is held,
// so that concurrent add() calls and possible reallocations won't
// affect our iteration.
auto snapshot = _vec;
// We grab locks in both add() and remove(), so we iterate using
// indexes on the snapshot to avoid concurrent modifications.
for (size_t i = 0, n = snapshot.size(); i < n; ++i) {
func(snapshot[i]);
}
}
@@ -79,11 +87,14 @@ public:
// preemption.
seastar::future<> for_each(seastar::noncopyable_function<seastar::future<>(T)> func) const {
auto holder = co_await _vec_lock.hold_read_lock();
// We grab a lock in remove(), but not in add(), so we
// iterate using indexes to guard against the vector being
// reallocated.
for (size_t i = 0, n = _vec.size(); i < n; ++i) {
co_await func(_vec[i]);
// Take a snapshot of the current contents while holding the read lock,
// so that concurrent add() calls and possible reallocations won't
// affect our iteration.
auto snapshot = _vec;
// We grab locks in both add() and remove(), so we iterate using
// indexes on the snapshot to avoid concurrent modifications.
for (size_t i = 0, n = snapshot.size(); i < n; ++i) {
co_await func(snapshot[i]);
}
}
};