
batch_manager.py

ofrak_io.batch_manager

AbstractBatchManager (_BatchManagerImplementation, ABC)

An implementation of BatchManagerInterface that lets a batch manager be defined as a subclass. See the BatchManagerInterface documentation for details on what this class does. The class is generic over two type arguments: the type of each individual request and the type of each individual result.

Subclassing AbstractBatchManager is preferred when make_batch_manager would otherwise be called in multiple places with the same arguments, or when the handler function needs persistent state. To make a one-off batch manager with minimal lines of code, use make_batch_manager.

This class is implemented for asyncio, but is NOT GENERALLY THREAD-SAFE! That is, instances must not be shared between manually accessed threads.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| rate_limit | ClassVar[int] | maximum number of times handle_requests will be called per second (equivalent to the rate_limit argument to make_batch_manager) |

handle_requests(self, requests) async

Handle multiple requests at once and return the results as pairs of (request, result). Equivalent to the handler_function argument to make_batch_manager.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| requests | Tuple[~Request, ...] | one or more objects that were passed to get_result | required |

Returns:

| Type | Description |
| --- | --- |
| Iterable[Tuple[~Request, ~Result]] | pairs of (request, result) |

Source code in ofrak_io/batch_manager.py
@abstractmethod
async def handle_requests(
    self, requests: Tuple[Request, ...]
) -> Iterable[Tuple[Request, Result]]:
    """
    Handle multiple requests at once and return the results as pairs of (request, result).
    Equivalent to `handler_function` argument to `make_batch_manager`.

    :param requests: one or more objects that were passed to `get_result`

    :return: pairs of (request, result)
    """
    raise NotImplementedError()
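
As a rough illustration of the (request, result) contract described above, the sketch below shows a handler that keeps persistent state across batches. The names WordLengthHandler, batches_handled, and demo are hypothetical. The class deliberately does not inherit from AbstractBatchManager so that the example runs without OFRAK installed; in real use it would presumably be written as a subclass.

```python
import asyncio
from typing import ClassVar, Dict, Iterable, Tuple


class WordLengthHandler:
    """Hypothetical handler; a real one would subclass AbstractBatchManager."""

    # Mirrors the documented class attribute: at most 5 handler calls per second.
    rate_limit: ClassVar[int] = 5

    def __init__(self) -> None:
        # Persistent state shared across batches (the main reason to use a class).
        self.batches_handled = 0

    async def handle_requests(
        self, requests: Tuple[str, ...]
    ) -> Iterable[Tuple[str, int]]:
        self.batches_handled += 1
        # Return one (request, result) pair per request so every caller gets an answer.
        return [(word, len(word)) for word in requests]


async def demo() -> Dict[str, int]:
    # Calls the handler directly; a batch manager would normally do this for us.
    handler = WordLengthHandler()
    pairs = await handler.handle_requests(("ofrak", "io"))
    return dict(pairs)
```

Calling asyncio.run(demo()) invokes the handler once with a two-element batch and collects the returned pairs into a dictionary.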

unique_key_for_request(request) staticmethod

Function to uniquely identify requests. If the Request type is unhashable, this method must be overridden (the default function uses the hash).

Equivalent to the request_key_f argument to make_batch_manager.

Source code in ofrak_io/batch_manager.py
@staticmethod
def unique_key_for_request(request: Request) -> _RequestKeyT:
    """
    Function to uniquely identify requests. If the Request type is unhashable, this method
    must be overridden (the default function uses the hash).

    Equivalent to `request_key_f` argument to `make_batch_manager`.
    """
    return _DEFAULT_REQUEST_KEY(request)
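
When requests are unhashable, a custom key function is needed. The sketch below shows one possible approach for dictionary requests, assuming their values are JSON-serializable. The function name key_for_dict_request is hypothetical and is not part of ofrak_io; a function like it could serve as an override of unique_key_for_request or as the request_key_f argument.

```python
import json
from typing import Any, Dict


def key_for_dict_request(request: Dict[str, Any]) -> str:
    """Hypothetical key function for dict requests (assumes JSON-serializable values)."""
    # sort_keys=True makes the key independent of the dict's insertion order,
    # so two equal requests always map to the same key.
    return json.dumps(request, sort_keys=True)


first = {"addr": 4096, "size": 16}
second = {"size": 16, "addr": 4096}

# Equal requests produce equal keys, even though hash(first) would raise TypeError.
same_key = key_for_dict_request(first) == key_for_dict_request(second)
```

A string key also matches the documented type of request_key_f, Callable[[~Request], str].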

BatchManagerInterface (Generic, ABC)

Class which manages automatically batching async requests to some resource (like a remote server) to limit the number of individual requests.

get_result(self, request) async

Get the result for a request. The request may be batched with one or more other pending requests before being passed to the handler_function.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request | ~Request | request to be passed in the argument of handler_function | required |

Returns:

| Type | Description |
| --- | --- |
| ~Result | result for the given request |

Exceptions:

| Type | Description |
| --- | --- |
| NotAllRequestsHandledError | if this or any other request was not handled by the handler_function passed to the constructor |

Source code in ofrak_io/batch_manager.py
@abstractmethod
async def get_result(self, request: Request) -> Result:
    """
    Get the result for a request. The request may be batched with one or more other pending
    requests before being passed to the `handler_function`.

    :param request: request to be passed in the argument of `handler_function`

    :return: result for the given request

    :raises NotAllRequestsHandledError: if this or any other requests were not handled by the
    `handler_function` passed to the constructor.
    """
    raise NotImplementedError()

_BatchManagerImplementation (BatchManagerInterface) private

get_result(self, request) async

Get the result for a request. The request may be batched with one or more other pending requests before being passed to the handler_function.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request | ~Request | request to be passed in the argument of handler_function | required |

Returns:

| Type | Description |
| --- | --- |
| ~Result | result for the given request |

Exceptions:

| Type | Description |
| --- | --- |
| NotAllRequestsHandledError | if this or any other request was not handled by the handler_function passed to the constructor |

Source code in ofrak_io/batch_manager.py
async def get_result(self, request: Request) -> Result:
    current_batch = self._current_batch
    current_batch.add_request(request)
    # Gives self._handler_loop_task a chance to raise its errors
    done, _ = await asyncio.wait(
        (current_batch.result(request), self._handler_loop_task),
        return_when=asyncio.FIRST_COMPLETED,
    )
    return next(iter(done)).result()
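
The asyncio.wait call above waits on two things at once, so an error in the background handler loop reaches the caller instead of leaving it waiting forever. A minimal standalone sketch of that pattern follows. The names failing_loop and wait_for_result_or_loop_error are hypothetical, and the background task here is only a stand-in for the real handler loop, whose internals are not shown on this page.

```python
import asyncio


async def failing_loop() -> None:
    # Stand-in for a background handler loop that dies with an error.
    await asyncio.sleep(0.01)
    raise RuntimeError("handler loop failed")


async def wait_for_result_or_loop_error() -> str:
    loop = asyncio.get_running_loop()
    # Stand-in for a pending result; it is never resolved in this demo.
    result_future = loop.create_future()
    loop_task = asyncio.ensure_future(failing_loop())
    # Wake up as soon as EITHER the result is ready OR the loop task finishes.
    done, _ = await asyncio.wait(
        (result_future, loop_task), return_when=asyncio.FIRST_COMPLETED
    )
    try:
        # .result() re-raises the exception stored in a failed task.
        return next(iter(done)).result()
    except RuntimeError as error:
        return f"surfaced: {error}"
    finally:
        # Clean up the future that will never complete.
        result_future.cancel()
```

Both awaitables are futures or tasks here, since recent Python versions no longer accept bare coroutines in asyncio.wait.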

make_batch_manager(handler_function, rate_limit=10, request_key_f=_DEFAULT_REQUEST_KEY)

Construct an object which will automatically batch every call to get_result into periodic calls to handler_function.

This function is the preferred way to make a one-off batch manager with minimal lines of code. If you find yourself calling this function with the same arguments multiple times, consider defining a subclass of AbstractBatchManager instead. Such a subclass is functionally equivalent to calling make_batch_manager with the same arguments, but the code is cleaner.

The returned BatchManagerInterface is implemented for asyncio, but is NOT GENERALLY THREAD-SAFE! That is, it must not be shared between manually accessed threads.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| handler_function | Callable[[Tuple[~Request, ...]], Awaitable[Iterable[Tuple[~Request, ~Result]]]] | function to handle multiple requests at once and return the results as pairs of (request, result) | required |
| rate_limit | int | maximum number of times handler_function will be called per second | 10 |
| request_key_f | Callable[[~Request], str] | function this manager should use to uniquely identify requests | _DEFAULT_REQUEST_KEY |

Returns:

| Type | Description |
| --- | --- |
| ofrak_io.batch_manager.BatchManagerInterface[~Request, ~Result] | an instance of a BatchManagerInterface using handler_function to get results |

Source code in ofrak_io/batch_manager.py
def make_batch_manager(
    handler_function: _BatchHandlerFunctionT,
    rate_limit: int = _DEFAULT_RATE_LIMIT,
    request_key_f: Callable[[Request], _RequestKeyT] = _DEFAULT_REQUEST_KEY,
) -> BatchManagerInterface[Request, Result]:
    """
    Construct an object which will automatically batch every call to `get_result` into periodic
    calls to `handler_function`.

    This function is the preferred way to make a one-off batch manager with minimal lines of
    code. If you find yourself calling this function with the same arguments multiple times,
    consider instead defining a subclass of `AbstractBatchManager`. This is functionally
    equivalent to calling `make_batch_manager` with the same arguments, but the code is cleaner.

    The returned BatchManagerInterface is implemented for asyncio, but is NOT GENERALLY
    THREAD-SAFE! That is, it must not be shared between manually accessed threads.

    :param handler_function: function to handle multiple requests at once and return the results
    as pairs of (request, result)
    :param rate_limit: maximum number of times `handler_function` will be called per second
    :param request_key_f: function this manager should use to uniquely identify requests

    :return: an instance of a `BatchManagerInterface` using `handler_function` to get results
    """
    return _BatchManagerImplementation(handler_function, rate_limit, request_key_f)
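
To make the batching behavior described above more concrete, here is a small pure-asyncio sketch of the general idea: concurrent get_result calls are collected and passed to the handler in one periodic call. This is an illustrative stand-in, not OFRAK's implementation. TinyBatcher, lengths, calls, and demo are hypothetical names, and the sketch assumes string requests that can serve as their own keys.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Iterable, List, Optional, Tuple

Handler = Callable[[Tuple[str, ...]], Awaitable[Iterable[Tuple[str, int]]]]


class TinyBatcher:
    """Illustrative stand-in only; this is NOT OFRAK's implementation."""

    def __init__(self, handler: Handler, rate_limit: int = 10) -> None:
        self._handler = handler
        self._interval = 1.0 / rate_limit  # seconds to collect requests per batch
        self._pending: Dict[str, asyncio.Future] = {}
        self._flush_task: Optional[asyncio.Future] = None

    async def get_result(self, request: str) -> int:
        # Identical requests share one future, so each is sent to the handler once.
        if request not in self._pending:
            self._pending[request] = asyncio.get_running_loop().create_future()
        future = self._pending[request]
        if self._flush_task is None:
            self._flush_task = asyncio.ensure_future(self._flush_after_interval())
        return await future

    async def _flush_after_interval(self) -> None:
        await asyncio.sleep(self._interval)
        # Take ownership of the current batch; later requests start a new one.
        batch, self._pending = self._pending, {}
        self._flush_task = None
        try:
            for request, result in await self._handler(tuple(batch)):
                batch[request].set_result(result)
        except Exception as error:
            # Propagate a handler failure to every caller still waiting.
            for future in batch.values():
                if not future.done():
                    future.set_exception(error)
        # Any request the handler skipped fails instead of hanging forever.
        for request, future in batch.items():
            if not future.done():
                future.set_exception(LookupError(f"unhandled request: {request!r}"))


calls: List[Tuple[str, ...]] = []  # records each batch the handler receives


async def lengths(requests: Tuple[str, ...]) -> Iterable[Tuple[str, int]]:
    calls.append(requests)
    return [(word, len(word)) for word in requests]


async def demo() -> List[int]:
    batcher = TinyBatcher(lengths, rate_limit=20)
    words = ("a", "bb", "ccc", "bb")
    return await asyncio.gather(*(batcher.get_result(word) for word in words))
```

In this sketch, the four concurrent get_result calls in demo reach the handler as a single batch of three distinct requests, and the duplicate "bb" request reuses the same pending result.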