batch_manager.py
ofrak_io.batch_manager
AbstractBatchManager (_BatchManagerImplementation, ABC)
An implementation of BatchManagerInterface
which allows for defining a pattern of
batch managers as a subclass. See that BatchManagerInterface
's documentation for details on
what this class does. This class is a Generic type with two type arguments, for the type of
each individual request and each individual return type.
Subclassing AbstractBatchManager is preferred when make_batch_manager
is called in multiple
places with the same arguments, or if the handler function needs some persistent state. For
making a one-off batch manager with minimal lines of code, use make_batch_manager
.
This class is implemented for asyncio, but is NOT GENERALLY THREAD-SAFE! That is, instances must not be shared between manually accessed threads.
Attributes:
Name | Type | Description |
---|---|---|
rate_limit |
ClassVar[int] |
maximum number of times |
handle_requests(self, requests)
async
Handle multiple requests at once and return the results as pairs of (request, result).
Equivalent to handler_function
argument to make_batch_manager
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
requests |
Tuple[~Request, ...] |
one or more objects that were passed to |
required |
Returns:
Type | Description |
---|---|
Iterable[Tuple[~Request, ~Result]] |
pairs of (request, result) |
Source code in ofrak_io/batch_manager.py
@abstractmethod
async def handle_requests(
self, requests: Tuple[Request, ...]
) -> Iterable[Tuple[Request, Result]]:
"""
Handle multiple requests at once and return the results as pairs of (request, result).
Equivalent to `handler_function` argument to `make_batch_manager`.
:param requests: one or more objects that were passed to `get_result`
:return: pairs of (request, result)
"""
raise NotImplementedError()
unique_key_for_request(request)
staticmethod
Function to uniquely identify requests. If the Request type is unhashable, this method must be overridden (the default function uses the hash).
Equivalent to request_key_f
argument to make_batch_manager
.
Source code in ofrak_io/batch_manager.py
@staticmethod
def unique_key_for_request(request: Request) -> _RequestKeyT:
"""
Function to uniquely identify requests. If the Request type is unhashable, this method
must be overridden (the default function uses the hash).
Equivalent to `request_key_f` argument to `make_batch_manager`.
"""
return _DEFAULT_REQUEST_KEY(request)
BatchManagerInterface (Generic, ABC)
Class which manages automatically batching async requests to some resource (like a remote server) to limit the number of individual requests.
get_result(self, request)
async
Get the result for a request. The request may be batched with one or more other pending
requests before being passed to the handler_function
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request |
~Request |
request to be passed in the argument of |
required |
Returns:
Type | Description |
---|---|
~Result |
result for the given request |
Exceptions:
Type | Description |
---|---|
NotAllRequestsHandledError |
if this or any other requests were not handled by the |
Source code in ofrak_io/batch_manager.py
@abstractmethod
async def get_result(self, request: Request) -> Result:
"""
Get the result for a request. The request may be batched with one or more other pending
requests before being passed to the `handler_function`.
:param request: request to be passed in the argument of `handler_function`
:return: result for the given request
:raises NotAllRequestsHandledError: if this or any other requests were not handled by the
`handler_function` passed to the constructor.
"""
raise NotImplementedError()
_BatchManagerImplementation (BatchManagerInterface)
private
get_result(self, request)
async
Get the result for a request. The request may be batched with one or more other pending
requests before being passed to the handler_function
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request |
~Request |
request to be passed in the argument of |
required |
Returns:
Type | Description |
---|---|
~Result |
result for the given request |
Exceptions:
Type | Description |
---|---|
NotAllRequestsHandledError |
if this or any other requests were not handled by the |
Source code in ofrak_io/batch_manager.py
async def get_result(self, request: Request) -> Result:
current_batch = self._current_batch
current_batch.add_request(request)
# Gives self._handler_loop_task a chance to raise its errors
done, _ = await asyncio.wait(
(current_batch.result(request), self._handler_loop_task),
return_when=asyncio.FIRST_COMPLETED,
)
return next(iter(done)).result()
make_batch_manager(handler_function, rate_limit=10, request_key_f=<function _DEFAULT_REQUEST_KEY at 0x7f15626b28b0>)
Construct an object which will automatically batch every call to get_result
into periodic
calls to handler_function
.
This function is the preferred way to make a one-off batch manager with minimal lines of
code. If you find yourself calling this function with the same arguments multiple times,
consider instead defining a subclass of AbstractBatchManager
. This is functionally
equivalent to calling make_batch_manager
with the same arguments, but the code is cleaner.
The returned BatchManagerInterface is implemented for asyncio, but is NOT GENERALLY THREAD-SAFE! That is, it must not be shared between manually accessed threads.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
handler_function |
Callable[[Tuple[~Request, ...]], Awaitable[Iterable[Tuple[~Request, ~Result]]]] |
function to handle multiple requests at once and return the results as pairs of (request, result) |
required |
rate_limit |
int |
maximum number of times |
10 |
request_key_f |
Callable[[~Request], str] |
function this manager should use to uniquely identify requests |
<function _DEFAULT_REQUEST_KEY at 0x7f15626b28b0> |
Returns:
Type | Description |
---|---|
ofrak_io.batch_manager.BatchManagerInterface[~Request, ~Result] |
an instance of a |
Source code in ofrak_io/batch_manager.py
def make_batch_manager(
handler_function: _BatchHandlerFunctionT,
rate_limit: int = _DEFAULT_RATE_LIMIT,
request_key_f: Callable[[Request], _RequestKeyT] = _DEFAULT_REQUEST_KEY,
) -> BatchManagerInterface[Request, Result]:
"""
Construct an object which will automatically batch every call to `get_result` into periodic
calls to `handler_function`.
This function is the preferred way to make a one-off batch manager with minimal lines of
code. If you find yourself calling this function with the same arguments multiple times,
consider instead defining a subclass of `AbstractBatchManager`. This is functionally
equivalent to calling `make_batch_manager` with the same arguments, but the code is cleaner.
The returned BatchManagerInterface is implemented for asyncio, but is NOT GENERALLY
THREAD-SAFE! That is, it must not be shared between manually accessed threads.
:param handler_function: function to handle multiple requests at once and return the results
as pairs of (request, result)
:param rate_limit: maximum number of times `handler_function` will be called per second
:param request_key_f: function this manager should use to uniquely identify requests
:return: an instance of a `BatchManagerInterface` using `handler_function` to get results
"""
return _BatchManagerImplementation(handler_function, rate_limit, request_key_f)