Files
scylladb/utils/sequential_producer.hh
Pawel Pery 8d3c33f74a utils: refactor sequential_producer as abortable
This patch is part of the vector_store_client sharded service
implementation, which communicates with the vector-store service.

There is a need for an abortable sequential_producer operator(). The
existing operator() is changed to accept a timeout argument defaulting to
time_point::max() (matching the current default usage), and a new
operator() overload is added that takes an abort_source parameter.

Reference: VS-47
2025-07-08 16:29:55 +02:00

55 lines
1.4 KiB
C++

/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <functional>
#include <seastar/core/shared_future.hh>
#include <stdexcept>
/// Invokes a factory to produce an object, but sequentially: only one fiber at a time may be executing the
/// factory. Any other fiber requesting the object will wait for the existing factory invocation to finish, then
/// copy the result.
///
/// TODO: Move to Seastar.
template<typename T>
class sequential_producer {
public:
using factory_t = std::function<seastar::future<T>()>;
using time_point = seastar::shared_future<T>::time_point;
private:
factory_t _factory;
seastar::shared_future<T> _churning; ///< Resolves when the previous _factory call completes.
public:
sequential_producer(factory_t&& f) : _factory(std::move(f))
{
clear();
}
seastar::future<T> operator()(time_point timeout = time_point::max()) {
if (_churning.available()) {
_churning = _factory();
}
return _churning.get_future(timeout);
}
seastar::future<T> operator()(seastar::abort_source& as) {
if (_churning.available()) {
_churning = _factory();
}
return _churning.get_future(as);
}
void clear() {
_churning = seastar::make_exception_future<T>(
std::logic_error("initial future used in sequential_producer"));
}
};