`replace_dirty` is used to atomically return a dirty object to the pool and then use the space freed by this object to get another object. Unlike `put(is_dirty=True)` followed by `get`, a concurrent waiter cannot take away our space from us. A piece of `get` was refactored into a private function, `_build_and_get`; this piece is also used in `replace_dirty`.
166 lines
6.2 KiB
Python
166 lines
6.2 KiB
Python
import asyncio
|
|
from typing import Generic, Callable, Awaitable, TypeVar, AsyncContextManager, Final, Optional
|
|
|
|
# Type of the objects managed by a Pool.
T = TypeVar('T')


class Pool(Generic[T]):
    """Asynchronous object pool.

    You need a pool of up to N objects, but objects should be created
    on demand, so that if you use less, you don't create anything upfront.
    If there is no object in the pool and all N objects are in use, you want
    to wait until one of the objects is returned to the pool. Expects a
    builder async function to build a new object and a destruction async
    function to clean up after a 'dirty' object (see below).

    Usage example:
        async def start_server():
            return Server()
        async def destroy_server(server):
            await server.free_resources()
        pool = Pool(4, start_server, destroy_server)

        server = await pool.get()
        try:
            await run_test(test, server)
        finally:
            await pool.put(server, is_dirty=False)

    Alternatively:
        async with pool.instance(dirty_on_exception=False) as server:
            await run_test(test, server)

    If the object is considered no longer usable by other users of the pool
    you can pass `is_dirty=True` flag to `put`, which will cause the object
    to be 'destroyed' (by calling the provided `destroy` function on it) and
    will free up space in the pool.
        server = await pool.get()
        dirty = True
        try:
            dirty = await run_test(test, server)
        finally:
            await pool.put(server, is_dirty=dirty)

    Alternatively:
        async with (cm := pool.instance(dirty_on_exception=True)) as server:
            cm.dirty = await run_test(test, server)
            # It will also be considered dirty if run_test throws an exception

    To atomically return a dirty object and use the freed space to obtain
    another object, you can use `replace_dirty`. This is different from a
    `put(is_dirty=True)` call followed by a `get` call, where a concurrent
    waiter might take the space freed up by `put`.
        server = await pool.get()
        dirty = False
        try:
            for _ in range(num_runs):
                if dirty:
                    srv = server
                    # If replace_dirty raises, there is nothing left to put back.
                    server = None
                    server = await pool.replace_dirty(srv)
                dirty = await run_test(test, server)
        finally:
            if server:
                await pool.put(server, is_dirty=dirty)
    """

    def __init__(self, max_size: int,
                 build: Callable[..., Awaitable[T]],
                 destroy: Callable[[T], Awaitable[None]]):
        """Create an empty pool; nothing is built until the first `get`.

        max_size -- upper bound on the number of objects that exist at once
                    (idle in the pool plus leased out). With 0, `get` never
                    finds a free slot and waits indefinitely.
        build    -- async factory, called with the arguments given to `get`,
                    `replace_dirty` or `instance`.
        destroy  -- async cleanup, called on every object returned as dirty.
        """
        assert max_size >= 0
        self.max_size: Final[int] = max_size
        self.build: Final[Callable[..., Awaitable[T]]] = build
        self.destroy: Final[Callable[[T], Awaitable[None]]] = destroy
        # Guards `pool` and `total`; waiters sleep on it until an object
        # or a free slot becomes available.
        self.cond: Final[asyncio.Condition] = asyncio.Condition()
        self.pool: list[T] = []
        self.total: int = 0  # len(self.pool) + leased objects

    async def get(self, *args, **kwargs) -> T:
        """Borrow an object from the pool.

        If a new object must be built first, *args and **kwargs
        will be passed to the build function and the object built
        in this way will be returned. However, remember that there
        is no guarantee whether a new object will be built
        or an existing one will be borrowed.

        Waits while the pool is empty and all `max_size` slots are taken.
        If the build function raises, the reserved slot is released and
        the exception is propagated.
        """
        async with self.cond:
            await self.cond.wait_for(lambda: self.pool or self.total < self.max_size)
            if self.pool:
                return self.pool.pop()

            # No object in pool, but total < max_size so we can construct one.
            # Reserve the slot under the lock; the build itself runs unlocked.
            self.total += 1

        return await self._build_and_get(*args, **kwargs)

    async def put(self, obj: T, is_dirty: bool):
        """Return a previously borrowed object to the pool
        if it's not dirty, otherwise destroy the object
        and free up space in the pool.

        In both cases one waiter (if any) is woken up. If `destroy` raises,
        the slot is still released before the exception is propagated, so
        a failing cleanup cannot permanently shrink the pool.
        """
        if is_dirty:
            try:
                await self.destroy(obj)
            finally:
                # Runs on success, failure and cancellation alike.
                await self._release_slot()
            return

        async with self.cond:
            self.pool.append(obj)
            self.cond.notify()

    async def replace_dirty(self, obj: T, *args, **kwargs) -> T:
        """Atomically `put` a previously borrowed dirty object and `get` another one.

        The 'atomicity' guarantees that the space freed up by the returned object
        is used to return another object to the caller. The caller doesn't need
        to wait for space to be freed by another user of the pool.

        Note: the returned object might have been constructed earlier or it might
        be built right now, as in `get`.
        *args and **kwargs are used as in `get`.

        If `destroy` raises, the slot held by `obj` is released and the
        exception is propagated; no replacement object is obtained.
        """
        try:
            await self.destroy(obj)
        except BaseException:
            # The caller ends up with no object, so give the slot back.
            await self._release_slot()
            raise

        async with self.cond:
            if self.pool:
                # Reuse an idle object: the dirty one is gone, so one object
                # fewer exists in total.
                self.total -= 1
                return self.pool.pop()

        # Need to construct a new object.
        # The space for this object is already accounted for in self.total.
        return await self._build_and_get(*args, **kwargs)

    def instance(self, dirty_on_exception: bool, *args, **kwargs) -> AsyncContextManager[T]:
        """Return an async context manager that borrows an object on entry
        and returns it on exit.

        *args and **kwargs are forwarded to `get`. The object is returned as
        dirty if the manager's `dirty` attribute was set to a true value inside
        the block, or if `dirty_on_exception` is true and the block raised.
        """
        class Instance:
            def __init__(self, pool: Pool[T], dirty_on_exception: bool):
                self.pool = pool
                self.dirty = False
                self.dirty_on_exception = dirty_on_exception
                self.obj: Optional[T] = None

            async def __aenter__(self):
                self.obj = await self.pool.get(*args, **kwargs)
                return self.obj

            async def __aexit__(self, exc_type, exc, tb):
                # Compare against None rather than testing truthiness, so that
                # falsy objects (e.g. empty containers) are returned as well.
                if self.obj is not None:
                    # `dirty` may have been assigned any value by the user;
                    # normalise it instead of relying on `|=`.
                    self.dirty = bool(self.dirty) or (
                        self.dirty_on_exception and exc is not None)
                    await self.pool.put(self.obj, is_dirty=self.dirty)
                    self.obj = None

        return Instance(self, dirty_on_exception)

    async def _release_slot(self) -> None:
        """Give up one slot counted in self.total and wake up one waiter."""
        async with self.cond:
            self.total -= 1
            self.cond.notify()

    async def _build_and_get(self, *args, **kwargs) -> T:
        """Precondition: we allocated space for this object
        (it's included in self.total).

        Builds and returns a new object. On any failure, including
        cancellation, the allocated space is released before re-raising.
        """
        try:
            return await self.build(*args, **kwargs)
        except BaseException:
            # BaseException on purpose: the slot must also be released when
            # the build is cancelled. The exception is always re-raised.
            await self._release_slot()
            raise
|