/* * Copyright (C) 2020-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #pragma once #include "utils/on_internal_error.hh" #include #include #include #include // This class supports atomic removes (by using a lock and returning a // future) and non atomic insert and iteration (by using indexes). template class atomic_vector { std::vector _vec; mutable seastar::rwlock _vec_lock; public: void add(const T& value) { _vec.push_back(value); } seastar::future<> remove(const T& value) { return with_lock(_vec_lock.for_write(), [this, value] { _vec.erase(std::remove(_vec.begin(), _vec.end(), value), _vec.end()); }); } // This must be called on a thread. The callback function must not // call remove or thread_for_each. If the callback function needs to // call this, use thread_for_each_nested instead. // // We would take callbacks that take a T&, but we had bugs in the // past with some of those callbacks holding that reference past a // preemption. void thread_for_each(seastar::noncopyable_function func) const { _vec_lock.for_read().lock().get(); auto unlock = seastar::defer([this] { _vec_lock.for_read().unlock(); }); // We grab a lock in remove(), but not in add(), so we // iterate using indexes to guard against the vector being // reallocated. for (size_t i = 0, n = _vec.size(); i < n; ++i) { func(_vec[i]); } } // This must be called on a thread. This should be used only from // the context of a thread_for_each callback, when the read lock is // already held. The callback function must not call remove or // thread_for_each. void thread_for_each_nested(seastar::noncopyable_function func) const { // When called in the context of thread_for_each, the read lock is // already held, so we don't need to acquire it again. Acquiring it // again could lead to a deadlock. if (!_vec_lock.locked()) { utils::on_internal_error("thread_for_each_nested called without holding the vector lock"); } // We grab a lock in remove(), but not in add(), so we // iterate using indexes to guard against the vector being // reallocated. for (size_t i = 0, n = _vec.size(); i < n; ++i) { func(_vec[i]); } } // The callback function must not call remove. // // We would take callbacks that take a T&, but we had bugs in the // past with some of those callbacks holding that reference past a // preemption. seastar::future<> for_each(seastar::noncopyable_function(T)> func) const { auto holder = co_await _vec_lock.hold_read_lock(); // We grab a lock in remove(), but not in add(), so we // iterate using indexes to guard against the vector being // reallocated. for (size_t i = 0, n = _vec.size(); i < n; ++i) { co_await func(_vec[i]); } } };