Skip to content

job_service.py

ofrak.service.job_service

JobService (JobServiceInterface)

run_component(self, request, job_context=None) async

Run a single component for a job.

Parameters:

Name Type Description Default
request JobComponentRequest required
job_context Optional[ofrak.model.job_model.JobRunContext]

Context of the job to run the component in.

None

Returns:

Type Description
ComponentRunResult

A data structure describing the component run and resources modified/created/deleted.

Source code in ofrak/service/job_service.py
async def run_component(
    self,
    request: JobComponentRequest,
    job_context: Optional[JobRunContext] = None,
) -> ComponentRunResult:
    """
    Run the single component identified by the request on the requested resource.

    :param request: Carries the component ID to look up, plus the job ID, resource ID and
    component config which are forwarded to the component run
    :param job_context: Context of the job to run the component in; when omitted, a new one is
    obtained from the job context factory

    :return: The result of the component run
    """
    target_component = self._component_locator.get_by_id(request.component_id)
    # Only create a fresh context when the caller did not supply one.
    context = self._job_context_factory.create() if job_context is None else job_context
    return await self._run_component(
        request.job_id,
        request.resource_id,
        target_component,
        context,
        request.config,
    )

run_analyzer_by_attribute(self, request, job_context=None) async

Choose one or more Analyzer components to analyze the requested attributes on the given resource.

Parameters:

Name Type Description Default
request JobAnalyzerRequest

Data structure containing the ID of the job to run the components in, the ID of the resource to run components on, which attributes the Analyzers should output, and the tags of the target resource.

required
job_context Optional[ofrak.model.job_model.JobRunContext]

Context of the job to run the component in.

None

Returns:

Type Description
ComponentRunResult

A data structure describing the component(s) run and resources modified/created/deleted.

Exceptions:

Type Description
NotFoundError

If no Analyzers can be found targeting the specified tags and outputting the specified attributes.

Source code in ofrak/service/job_service.py
async def run_analyzer_by_attribute(
    self,
    request: JobAnalyzerRequest,
    job_context: Optional[JobRunContext] = None,
) -> ComponentRunResult:
    """
    Auto-run Analyzer components which output the requested attributes on the given resource.

    The candidate components are restricted to Analyzers, to those whose output matches
    `request.attributes`, and by the tag filter built from the target resource's tags.

    :param request: Carries the job ID, the target resource ID and the attributes to analyze
    :param job_context: Context of the job to run the component(s) in; when omitted, a new one
    is obtained from the job context factory

    :return: The result of the component(s) run

    :raises NotFoundError: If the auto-run reports that no component was run
    """
    context = self._job_context_factory.create() if job_context is None else job_context

    resource_model = await self._resource_service.get_by_id(request.resource_id)
    analyzer_filter: ComponentFilter = ComponentAndMetaFilter(
        ComponentTypeFilter(Analyzer),  # type: ignore
        AnalyzerOutputFilter(request.attributes),
        _build_tag_filter(resource_model.get_tags()),
    )
    auto_run_request = _ComponentAutoRunRequest(request.resource_id, analyzer_filter)

    analysis_result = await self._auto_run_components(
        (auto_run_request,),
        request.job_id,
        context,
    )
    # Nothing ran: report it as "no suitable analyzer" rather than returning an empty result.
    if not analysis_result.components_run:
        raise NotFoundError(
            f"Unable to find any analyzer for attributes {request.attributes.__name__}"
        )
    return analysis_result

run_components(self, request) async

Automatically select one or more components to run on a resource. The components must match the provided component filters and target at least one of the tags of the resource.

Parameters:

Name Type Description Default
request JobMultiComponentRequest

Data structure containing the ID of the job to run the components in, the ID of the resource to run components on, and filters for the components to run.

required

Returns:

Type Description
ComponentRunResult

A data structure describing the components run and resources modified/created/deleted.

Exceptions:

Type Description
ComponentAutoRunFailure

if one of the automatically chosen components raises an error while running.

NoMatchingComponentException

if no components match the filters for the resource.

Source code in ofrak/service/job_service.py
async def run_components(
    self,
    request: JobMultiComponentRequest,
) -> ComponentRunResult:
    """
    Auto-run components on one resource, repeating for as long as runs keep adding tags to it.

    The first round targets the resource's current tags. Each later round targets only the
    tags which the previous round added to that same resource, and the loop ends once a round
    adds none.

    :param request: Carries the job ID, the target resource ID and the component filters

    :return: The merged results of every round
    """
    target_model = await self._resource_service.get_by_id(request.resource_id)
    base_filter = _build_auto_run_filter(request)

    pending_tags = tuple(target_model.tags)
    merged_result = ComponentRunResult()
    while pending_tags:
        # A fresh context per round, so its trackers only record what this round changed.
        round_context = self._job_context_factory.create()
        round_filter = ComponentAndMetaFilter(base_filter, _build_tag_filter(pending_tags))
        round_result = await self._auto_run_components(
            (_ComponentAutoRunRequest(request.resource_id, round_filter),),
            request.job_id,
            round_context,
        )
        merged_result.update(round_result)

        # The next round targets whatever tags this round added to the resource.
        pending_tags = tuple(round_context.trackers[request.resource_id].tags_added)

    return merged_result

run_components_recursively(self, request) async

Start from a resource and run components on it and then on any resources which have tags added as a result of that initial run, then run components on any resources with new tags from those subsequent runs, until an iteration of component runs results in no new tags being added. The component(s) run on each resource are chosen according to the provided filters and which tags were added to that resource in the previous iteration. That is, the filters are applied to the set of components which target those new tags.

Parameters:

Name Type Description Default
request JobMultiComponentRequest

Data structure containing the ID of the job to run the components in, the ID of the resource to start running recursively from, and filters for the components to run.

required

Returns:

Type Description
ComponentRunResult

A data structure describing the components run and resources modified/created/deleted.

Exceptions:

Type Description
ComponentAutoRunFailure

if one of the automatically chosen components raises an error while running.

Source code in ofrak/service/job_service.py
async def run_components_recursively(
    self, request: JobMultiComponentRequest
) -> ComponentRunResult:
    """
    Auto-run components iteratively until an iteration adds no new tags to any resource.

    Each iteration issues one auto-run request per resource tracked in the previous iteration's
    context, targeting the tags recorded as added to that resource in that iteration. The first
    iteration uses a stand-in context in which every initial target resource's existing tags
    are recorded as added.

    :param request: Carries the job ID, the resource ID to start from and the component filters

    :return: The merged results of every iteration
    """
    components_result = ComponentRunResult()
    base_filter = _build_auto_run_filter(request)

    seed_models = await self._get_initial_recursive_target_resources(
        request.resource_id, base_filter
    )

    # Stand-in "previous" context: treat every existing tag as if it had just been added, so
    # the first iteration targets all of them.
    previous_job_context: JobRunContext = self._job_context_factory.create()
    for seed_model in seed_models:
        previous_job_context.trackers[seed_model.id].tags_added.update(seed_model.tags)

    iterations = 0
    while True:
        job_context = self._job_context_factory.create()
        auto_run_requests = [
            _ComponentAutoRunRequest(
                target_id,
                ComponentAndMetaFilter(base_filter, _build_tag_filter(prior_tracker.tags_added)),
            )
            for target_id, prior_tracker in previous_job_context.trackers.items()
        ]

        iteration_result = await self._auto_run_components(
            auto_run_requests,
            request.job_id,
            job_context,
        )
        components_result.update(iteration_result)

        # Total number of tags added across all resources touched in this iteration.
        tags_added_count = sum(
            len(tracker.tags_added) for tracker in job_context.trackers.values()
        )
        previous_job_context = job_context
        LOGGER.info(
            f"Completed iteration {iterations} of run_components_recursively on "
            f"{request.resource_id.hex()}. {len(components_result.resources_modified)} "
            f"resources modified and {tags_added_count} tags added."
        )
        iterations += 1

        # Stop once an iteration added no tags; otherwise feed them to the next iteration.
        if tags_added_count == 0:
            break
    return components_result

pack_recursively(self, job_id, resource_id) async

Call Packer components on the deepest descendants of a resource (the root of this search), then Packers on the next level up, etc. until the search root resource.

Parameters:

Name Type Description Default
job_id bytes

Job to run the component in.

required
resource_id bytes

ID of the search root resource.

required

Returns:

Type Description
ComponentRunResult

A data structure describing the components run and resources modified/created/deleted.

Source code in ofrak/service/job_service.py
async def pack_recursively(
    self,
    job_id: bytes,
    resource_id: bytes,
) -> ComponentRunResult:
    """
    Call Packer components on the deepest descendants of a resource (the root of this search),
    then Packers on the next level up, etc. until the search root resource.

    :param job_id: Job to run the components in
    :param resource_id: ID of the search root resource

    :return: The merged results of every Packer run performed; empty if the target cache built
    for Packers is empty

    :raises ValueError: If more than one Packer was run on a single resource
    """
    packer_filter = ComponentTypeFilter(Packer)  # type: ignore
    target_cache = self._build_target_cache(packer_filter)
    all_components_result = ComponentRunResult()
    if len(target_cache) == 0:
        return all_components_result
    resources = await self._resource_service.get_descendants_by_id(
        resource_id,
        r_filter=ResourceFilter(
            include_self=True,
            tags=tuple(target_cache.keys()),
            tags_condition=ResourceFilterCondition.OR,
        ),
    )
    resources = list(resources)  # we'll need that Iterable more than once
    job_context = self._job_context_factory.create()

    # We want to start with the deepest packers, so we first ask for the relative depth of each
    # returned resource and group the resources by that depth.
    resource_depths = await self._resource_service.get_depths(
        [resource.id for resource in resources]
    )

    resources_by_depth = defaultdict(list)
    for resource, depth in zip(resources, resource_depths):
        resources_by_depth[depth].append(resource)

    for depth in sorted(resources_by_depth.keys(), reverse=True):
        for resource in resources_by_depth[depth]:
            component_filter: ComponentFilter = ComponentAndMetaFilter(
                packer_filter,
                _build_tag_filter(resource.get_tags()),
            )

            request = _ComponentAutoRunRequest(
                resource.id,
                component_filter,
            )

            component_result = await self._auto_run_components(
                [request],
                job_id,
                job_context,
            )
            n_packers_run = len(component_result.components_run)
            if n_packers_run > 1:
                raise ValueError(f"Multiple packers are targeting resource {resource.id.hex()}")

            # Merge the result of every run. Previously this only happened when no packer had
            # run, so the results of all successful packer runs were dropped from the return
            # value.
            all_components_result.update(component_result)

            if n_packers_run == 0:
                # NOTE(review): this abandons the remaining resources at the current depth
                # (shallower depths are still processed). Kept as-is to preserve the existing
                # control flow; confirm whether `continue` was intended.
                break
    return all_components_result

_ComponentAutoRunRequest dataclass private

_ComponentAutoRunRequest(target_resource_id: bytes, component_filter: ofrak.service.component_locator_i.ComponentFilter)

_wrap_task_return_error_and_metadata(async_task, metadata) async private

Wraps an async task before awaiting it so that, if it raises an error, the error is returned as an object rather than being raised. Whether an error is raised or not, some metadata about the task may be included which will also be returned.

This is useful for tasks which will go into mass gather or wait calls, where the metadata related to them might otherwise be lost, which is problematic for error reporting/handling.

Parameters:

Name Type Description Default
async_task Awaitable[~R]

A task to await

required
metadata ~M

Arbitrary object associated with that task

required

Returns:

Type Description
Tuple[Union[~R, BaseException], ~M]

Tuple containing the result of async_task, which may be an error, and the unchanged arbitrary metadata object.

Source code in ofrak/service/job_service.py
async def _wrap_task_return_error_and_metadata(
    async_task: Awaitable[R], metadata: M
) -> Tuple[Union[R, BaseException], M]:
    """
    Await a task so that an error it raises comes back as a returned object instead of being
    raised, and pair the outcome with caller-supplied metadata.

    This is useful for tasks which will go into mass `gather` or `wait` calls, where the metadata
    related to them might otherwise be lost, which is problematic for error reporting/handling.

    :param async_task: A task to await
    :param metadata: Arbitrary object associated with that task

    :return: Tuple containing the result of `async_task`, which may be an error, and the unchanged
    arbitrary metadata object.
    """
    # `gather(..., return_exceptions=True)` hands back a one-element list holding either the
    # task's result or the exception it raised.
    (outcome,) = await asyncio.gather(async_task, return_exceptions=True)
    return outcome, metadata

_build_tag_filter(tags) private

When auto-running components, most of the time only the most specific components should be run for a resource. For example, an APK resource is also a ZIP resource; we want to always run the APK Unpacker on resources that are tagged as both ZIP and APK, because APK is a more specific tag. However, Identifiers are a special case because they have benign side-effects, so it is desirable to greedily run all Identifiers that could target a resource, not only the most specific Identifiers.

This function constructs a filter which allows only components that target at least one of the given tags, but for non-identifiers the filter is even stricter so that only the most specific components are allowed.

Parameters:

Name Type Description Default
tags Iterable[ofrak.model.tag_model.ResourceTag]

Tags to target, from the resource that is being auto-run on

required

Returns:

Type Description
ComponentFilter

A filter which allows a component to run if (it is an Identifier, AND it targets at least one of the given tags) OR (it is NOT an Identifier, AND it targets one of the most specific given tags that are targeted by components)

Source code in ofrak/service/job_service.py
def _build_tag_filter(tags: Iterable[ResourceTag]) -> ComponentFilter:
    """
    When auto-running components, most of the time only the *most specific* components should be
    run for a resource. For example, an APK resource is also a ZIP resource; we want to always run
    the APK Unpacker on resources that are tagged as both ZIP and APK, because APK is a more
    specific tag. However, Identifiers are a special case because they have benign side-effects, so
    it is desirable to greedily run all Identifiers that could target a resource, not only the most
    specific Identifiers.

    This function constructs a filter which allows only components that target at least one of the
    given tags, but for non-identifiers the filter is even stricter so that only the most specific
    components are allowed.

    :param tags: Tags to target, from the resource that is being auto-run on. Any iterable is
    accepted, including one-shot iterators.

    :return: A filter which allows a component to run if (it is an Identifier, AND it targets at
    least one of the given tags) OR (it is NOT an Identifier, AND it targets one of the most
    specific given tags that are targeted by components)
    """
    # The tags are consumed twice below (once to sort them into tiers, once to build the
    # Identifier target filter). Materialize them first so that a one-shot iterable does not
    # leave the second use empty.
    target_tags = tuple(tags)
    tags_by_specificity = ResourceTag.sort_tags_into_tiers(target_tags)

    # One target filter per tier, in the reverse of the order returned by
    # `sort_tags_into_tiers`.
    filters_prioritized_by_specificity = [
        ComponentTargetFilter(*tag_specificity_level)
        for tag_specificity_level in reversed(tags_by_specificity)
    ]
    return ComponentOrMetaFilter(
        ComponentAndMetaFilter(
            ComponentTypeFilter(Identifier),  # type: ignore
            ComponentTargetFilter(*target_tags),
        ),
        ComponentAndMetaFilter(
            ComponentNotMetaFilter(
                ComponentTypeFilter(Identifier),  # type: ignore
            ),
            ComponentPrioritySelectingMetaFilter(*filters_prioritized_by_specificity),
        ),
    )