Files
scylladb/utils/observable.hh
Avi Kivity f3eade2f62 treewide: relicense to ScyllaDB-Source-Available-1.0
Drop the AGPL license in favor of a source-available license.
See the blog post [1] for details.

[1] https://www.scylladb.com/2024/12/18/why-were-moving-to-a-source-available-license/
2024-12-18 17:45:13 +02:00

149 lines
4.4 KiB
C++

/*
* Copyright (C) 2018-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <seastar/util/noncopyable_function.hh>
#include <vector>
#include <algorithm>
namespace utils {
// observable/observer - a publish/subscribe utility
//
// An observable is an object that can broadcast notifications
// about changes in some state. An observer listens for such notifications
// in a particular observable it is connected to. Multiple observers can
// observe a single observable.
//
// A connection between an observer and an observable is established when
// the observer is constructed (using observable::observe()); from then
// on their life cycles are separate, either can be moved or destroyed
// without affecting the other.
//
// During construction, the observer specifies how to react to a change
// in the observable's state by specifying a function to be called on
// a state change. An observable causes the function to be executed
// by calling its operator()() method.
//
// All observers are called without preemption, so an observable should have
// a small number of observers.
// Publisher side of the observable/observer pair.
//
// An observable keeps a list of non-owning pointers to the observers that are
// currently connected to it and invokes their callbacks from operator()().
// It is movable (connected observers follow the moved-to object) and, since
// it declares move operations, not copyable.
//
// NOTE: a callback must not connect, disconnect, move or destroy observers of
// the observable that is currently notifying it: doing so modifies the
// observer list while operator()() is iterating over it.
template <typename... Args>
class observable {
public:
    class observer;
private:
    // Non-owning pointers to the connected observers, in connection order.
    // Entries are re-pointed when an observer is moved and removed when an
    // observer disconnects or is destroyed.
    std::vector<observer*> _observers;
public:
    // Subscriber side: holds the callback and a back-pointer to the
    // observable it is connected to. Move-only; destroying an observer
    // disconnects it.
    class observer {
        friend class observable;
        // Observable this observer is connected to; nullptr when disconnected.
        observable* _observable;
        // Function invoked by observable::operator()().
        seastar::noncopyable_function<void (Args...)> _callback;
    private:
        // Tells the observable (if any) that the observer previously located
        // at `from` now lives at `this`.
        void moved(observer* from) noexcept {
            if (_observable) {
                _observable->moved(from, this);
            }
        }
    public:
        // Stores the back-pointer and the callback. This constructor does not
        // add the new observer to `o`'s observer list; observable::observe()
        // does that.
        observer(observable* o, seastar::noncopyable_function<void (Args...)> callback) noexcept
                : _observable(o), _callback(std::move(callback)) {
        }
        // Takes over the connection and callback of `o`; `o` is left
        // disconnected.
        observer(observer&& o) noexcept
                : _observable(std::exchange(o._observable, nullptr))
                , _callback(std::move(o._callback)) {
            moved(&o);
        }
        // Drops the current connection (if any), then takes over the
        // connection and callback of `o`; `o` is left disconnected.
        observer& operator=(observer&& o) noexcept {
            if (this != &o) {
                // Must happen before _observable is overwritten, otherwise the
                // old observable would keep a pointer to us.
                disconnect();
                _observable = std::exchange(o._observable, nullptr);
                _callback = std::move(o._callback);
                moved(&o);
            }
            return *this;
        }
        ~observer() {
            disconnect();
        }
        // Stops observing the observable immediately, instead of
        // during destruction. Calling it on a disconnected observer
        // is a no-op.
        void disconnect() noexcept {
            if (_observable) {
                _observable->destroyed(this);
            }
            _observable = nullptr;
        }
    };
    friend class observer;
private:
    // Removes every entry pointing at `dead` from the observer list.
    void destroyed(observer* dead) noexcept {
        _observers.erase(std::ranges::remove(_observers, dead).begin(), _observers.end());
    }
    // Re-points entries for an observer that moved from `from` to `to`,
    // keeping its position in the list.
    void moved(observer* from, observer* to) noexcept {
        std::ranges::replace(_observers, from, to);
    }
    // Sets the back-pointer of every listed observer to `ob`
    // (nullptr disconnects them all).
    void update_observers(observable* ob) noexcept {
        for (auto&& c : _observers) {
            c->_observable = ob;
        }
    }
public:
    observable() = default;
    // Takes over all observers of `o` and re-points them at this object.
    observable(observable&& o) noexcept
            // std::exchange leaves o._observers explicitly empty, so o's
            // destructor can never touch the observers we just adopted.
            : _observers(std::exchange(o._observers, {})) {
        update_observers(this);
    }
    // Disconnects the observers currently connected to this object, then
    // takes over all observers of `o` and re-points them at this object.
    observable& operator=(observable&& o) noexcept {
        if (this != &o) {
            update_observers(nullptr);
            // A moved-from vector is only guaranteed to be in a valid but
            // unspecified state after move assignment; empty the source
            // explicitly so that o's destructor cannot null out the
            // back-pointers of observers that now belong to us.
            _observers = std::exchange(o._observers, {});
            update_observers(this);
        }
        return *this;
    }
    // Disconnects all observers; they stay valid but no longer receive
    // notifications.
    ~observable() {
        update_observers(nullptr);
    }
    // Send args to all connected observers, in connection order.
    //
    // args are passed to each callback as lvalues, so they stay intact for
    // the next observer. A throwing callback does not prevent the remaining
    // observers from being notified: the first exception is remembered and
    // rethrown once all observers have been called; any later exceptions are
    // dropped.
    void operator()(Args... args) const {
        std::exception_ptr e;
        for (auto&& ob : _observers) {
            try {
                ob->_callback(args...);
            } catch (...) {
                if (!e) {
                    e = std::current_exception();
                }
            }
        }
        if (e) {
            std::rethrow_exception(std::move(e));
        }
    }
    // Adds an observer to an observable.
    //
    // Returns the observer that owns the connection: `callback` is invoked on
    // every notification for as long as the returned observer (or an observer
    // it was moved into) is alive and connected. Discarding the result
    // destroys the observer at once, which disconnects it, hence
    // [[nodiscard]].
    [[nodiscard]] observer observe(std::function<void (Args...)> callback) {
        observer ob(this, std::move(callback));
        // Registering the local's address is fine: if the return below is not
        // elided, observer's move constructor re-points the entry at the
        // returned object.
        _observers.push_back(&ob);
        return ob;
    }
};
// Convenience alias that lets callers spell the nested type
// observable<Args...>::observer as utils::observer<Args...>.
//
// An observer<Args...> can receive notifications about changes
// in an observable<Args...>'s state.
template <typename... Args>
using observer = typename observable<Args...>::observer;
// Builds an observer<Args...> that is not connected to any observable:
// it is constructed with a null observable pointer and a
// default-constructed callback.
template <typename... Args>
inline observer<Args...> dummy_observer() {
    using callback_type = seastar::noncopyable_function<void(Args...)>;
    using observer_type = observer<Args...>;
    return observer_type(nullptr, callback_type());
}
}