This patch is a part of vector_store_client sharded service implementation for a communication with vector-store service. There is a need for abortable sequention_producer operator(). The existing operator() is changed to allow timeout argument with default time_point::max() (as current default usage) and the new operator() is created with abort_source parameter. Reference: VS-47
55 lines
1.4 KiB
C++
55 lines
1.4 KiB
C++
/*
|
|
* Copyright (C) 2021 ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include <functional>
|
|
#include <seastar/core/shared_future.hh>
|
|
#include <stdexcept>
|
|
|
|
/// Invokes a factory to produce an object, but sequentially: only one fiber at a time may be executing the
|
|
/// factory. Any other fiber requesting the object will wait for the existing factory invocation to finish, then
|
|
/// copy the result.
|
|
///
|
|
/// TODO: Move to Seastar.
|
|
template<typename T>
|
|
class sequential_producer {
|
|
public:
|
|
using factory_t = std::function<seastar::future<T>()>;
|
|
using time_point = seastar::shared_future<T>::time_point;
|
|
|
|
private:
|
|
factory_t _factory;
|
|
seastar::shared_future<T> _churning; ///< Resolves when the previous _factory call completes.
|
|
|
|
public:
|
|
sequential_producer(factory_t&& f) : _factory(std::move(f))
|
|
{
|
|
clear();
|
|
}
|
|
|
|
seastar::future<T> operator()(time_point timeout = time_point::max()) {
|
|
if (_churning.available()) {
|
|
_churning = _factory();
|
|
}
|
|
return _churning.get_future(timeout);
|
|
}
|
|
|
|
seastar::future<T> operator()(seastar::abort_source& as) {
|
|
if (_churning.available()) {
|
|
_churning = _factory();
|
|
}
|
|
return _churning.get_future(as);
|
|
}
|
|
|
|
void clear() {
|
|
_churning = seastar::make_exception_future<T>(
|
|
std::logic_error("initial future used in sequential_producer"));
|
|
}
|
|
};
|