Skip to content

job_service.py

ofrak.service.job_service

JobService (JobServiceInterface)

_run_component(self, metadata, job_id, resource_id, component, job_context, config) async private

Run a component, return the result as well as some (optional) metadata (such as the request that triggered the component to run). If it raises an error, the error is returned as an object rather than being raised.

Once the component finishes, log it and remove this task from the set of active tasks.

Source code in ofrak/service/job_service.py
async def _run_component(
    self,
    metadata: M,
    job_id: bytes,
    resource_id: bytes,
    component: ComponentInterface,
    job_context: JobRunContext,
    config: CC,
) -> _RunTaskResultT:
    """
    Run a component, return the result as well as some (optional) metadata (such as the request
    that triggered the component to run). If the component raises an ``Exception``, the error is
    returned as an object rather than being raised.

    Once the component finishes, log it and remove this task from the set of active tasks. The
    removal happens in a ``finally`` clause, so it also takes place when the run is interrupted
    by something that is not an ``Exception`` (for example ``asyncio.CancelledError``, which is
    a ``BaseException``); such errors still propagate to the caller.

    :param metadata: Value handed back unchanged next to the result
    :param job_id: ID of the job the component runs in
    :param resource_id: ID of the resource the component runs on
    :param component: The component to run
    :param job_context: Context of the job to run the component in
    :param config: Config passed through to the component

    :return: A tuple of (component run result or the raised exception, ``metadata``)
    """
    LOGGER.info(
        f"JOB {job_id.hex()} - Running {component.get_id().decode()} on "
        f"resource {resource_id.hex()}"
    )
    # Key under which this run is tracked in the set of active component tasks
    component_task_id = (resource_id, component.get_id())

    # Create a new resource context for every component
    fresh_resource_context = self._resource_context_factory.create()
    fresh_resource_view_context = ResourceViewContext()
    result: Union[ComponentRunResult, BaseException]
    try:
        result = await component.run(
            job_id,
            resource_id,
            job_context,
            fresh_resource_context,
            fresh_resource_view_context,
            config,
        )
        _log_component_run_result_info(job_id, resource_id, component, result)
    except Exception as e:
        # Hand the error back to the caller as a value instead of raising it here
        result = e
    finally:
        # Always unregister the task, even on cancellation, so no stale entry is left behind.
        # pop() is used instead of del so that a missing entry cannot mask the in-flight error.
        self._active_component_tasks.pop(component_task_id, None)

    return result, metadata

run_component(self, request, job_context=None) async

Run a single component for a job.

Parameters:

Name Type Description Default
request JobComponentRequest required
job_context Optional[ofrak.model.job_model.JobRunContext]

Context of the job to run the component in.

None

Returns:

Type Description
ComponentRunResult

A data structure describing the component run and resources modified/created/deleted.

Source code in ofrak/service/job_service.py
async def run_component(
    self,
    request: JobComponentRequest,
    job_context: Optional[JobRunContext] = None,
) -> ComponentRunResult:
    """
    Run a single component for a job.

    :param request: Request carrying the component ID, job ID, resource ID and config
    :param job_context: Context of the job to run the component in; a new one is created
    when omitted

    :return: A data structure describing the component run and resources
    modified/created/deleted

    :raises BaseException: Whatever error the component run task handed back as its result
    """
    # Resolve the component first, then fall back to a fresh context if none was supplied
    component = self._component_locator.get_by_id(request.component_id)
    context = self._job_context_factory.create() if job_context is None else job_context

    # The request itself is passed along as the task metadata; it is not needed afterwards
    outcome, _metadata = await self._create_run_component_task(
        request,
        request.job_id,
        request.resource_id,
        component,
        context,
        request.config,
    )
    if not isinstance(outcome, BaseException):
        return outcome
    # The task hands errors back as values; surface them to the caller here
    raise outcome

run_analyzer_by_attribute(self, request, job_context=None) async

Choose one or more Analyzer components to analyze the requested attributes on the given resource.

Parameters:

Name Type Description Default
request JobAnalyzerRequest

Data structure containing the ID of the job to run the components in, the ID of the resource to run components on, which attributes the Analyzers should output, and the tags of the target resource.

required
job_context Optional[ofrak.model.job_model.JobRunContext]

Context of the job to run the component in.

None

Returns:

Type Description
ComponentRunResult

A data structure describing the component(s) run and resources modified/created/deleted.

Exceptions:

Type Description
NotFoundError

If no Analyzers can be found targeting the specified tags and outputting the specified attributes.

Source code in ofrak/service/job_service.py
async def run_analyzer_by_attribute(
    self,
    request: JobAnalyzerRequest,
    job_context: Optional[JobRunContext] = None,
) -> ComponentRunResult:
    """
    Choose one or more Analyzer components to analyze the requested attributes on the given
    resource.

    :param request: Data structure containing the ID of the job to run the components in, the
    ID of the resource to run components on, and which attributes the Analyzers should output
    :param job_context: Context of the job to run the component in; a new one is created when
    omitted

    :return: A data structure describing the component(s) run and resources
    modified/created/deleted

    :raises NotFoundError: If no Analyzer was run for the requested attributes
    """
    if job_context is None:
        job_context = self._job_context_factory.create()

    target_resource_model = await self._resource_service.get_by_id(request.resource_id)
    # The tag filter depends only on the target resource, so build it once rather than once
    # per requested attribute
    tag_filter = _build_tag_filter(tuple(target_resource_model.get_tags()))

    # There may be an analyzer that outputs ALL the requested attributes at once
    # If there is (as is usually the case for views), only run that one
    one_analyzer_for_all_attributes: ComponentFilter = ComponentAndMetaFilter(
        AnalyzerOutputFilter(*request.attributes),
        tag_filter,
    )
    # Otherwise, look for individual analyzers for each attributes type
    analyzer_for_each_attribute: List[ComponentFilter] = [
        ComponentAndMetaFilter(AnalyzerOutputFilter(attr_t), tag_filter)
        for attr_t in request.attributes
    ]

    component_filter: ComponentFilter = ComponentAndMetaFilter(
        ANALYZERS_FILTER,
        ComponentPrioritySelectingMetaFilter(
            one_analyzer_for_all_attributes,
            ComponentOrMetaFilter(*analyzer_for_each_attribute),
        ),
    )

    components_result = await self._auto_run_components(
        (
            _ComponentAutoRunRequest(
                request.resource_id,
                component_filter,
            ),
        ),
        request.job_id,
        job_context,
    )
    # No component run at all means no analyzer could be found for the request
    if not components_result.components_run:
        raise NotFoundError(
            f"Unable to find any analyzer for attributes "
            f"{', '.join(attr_t.__name__ for attr_t in request.attributes)}"
            f"\nFilter: {component_filter}"
        )
    return components_result

run_components(self, request) async

Automatically select one or more components to run on a resource. The components must match the provided component filters and target at least one of the tags of the resource.

Parameters:

Name Type Description Default
request JobMultiComponentRequest

Data structure containing the ID of the job to run the components in, the ID of the resource to run components on, and filters for the components to run.

required

Returns:

Type Description
ComponentRunResult

A data structure describing the components run and resources modified/created/deleted.

Exceptions:

Type Description
ComponentAutoRunFailure

if one of the automatically chosen components raises an error while running.

NoMatchingComponentException

if no components match the filters for the resource.

Source code in ofrak/service/job_service.py
async def run_components(
    self,
    request: JobMultiComponentRequest,
) -> ComponentRunResult:
    """
    Automatically select one or more components to run on a resource, repeating for as long as
    the previous round added new tags to that resource.

    The first round targets the tags the resource already has; every later round targets only
    the tags added to the resource during the round before it.

    :param request: Data structure containing the ID of the job to run the components in, the
    ID of the resource to run components on, and filters for the components to run

    :return: A data structure accumulating the results of every round
    """
    target_model = await self._resource_service.get_by_id(request.resource_id)
    base_filter = _build_auto_run_filter(request)

    pending_tags = tuple(target_model.tags)
    accumulated_result = ComponentRunResult()
    while pending_tags:
        # Each round gets its own context so that only tags added in this round are seen
        round_context = self._job_context_factory.create()
        round_filter = ComponentAndMetaFilter(base_filter, _build_tag_filter(pending_tags))
        round_request = _ComponentAutoRunRequest(request.resource_id, round_filter)

        round_result = await self._auto_run_components(
            (round_request,),
            request.job_id,
            round_context,
        )
        accumulated_result.update(round_result)

        # Tags added to the resource in this round become the targets of the next round
        pending_tags = tuple(round_context.trackers[request.resource_id].tags_added)

    return accumulated_result

run_components_recursively(self, request) async

Start from a resource and run components on it, then on any resources which had tags added as a result of that initial run, then on any resources with new tags from those subsequent runs, and so on until an iteration of component runs results in no new tags being added. The component(s) run on each resource are chosen according to the provided filters and which tags were added to that resource in the previous iteration. That is, the filters are applied to the set of components which target those new tags.

Parameters:

Name Type Description Default
request JobMultiComponentRequest

Data structure containing the ID of the job to run the components in, the ID of the resource to start running recursively from, and filters for the components to run.

required

Returns:

Type Description
ComponentRunResult

A data structure describing the components run and resources modified/created/deleted.

Exceptions:

Type Description
ComponentAutoRunFailure

if one of the automatically chosen components raises an error while running.

Source code in ofrak/service/job_service.py
async def run_components_recursively(
    self, request: JobMultiComponentRequest
) -> ComponentRunResult:
    """
    Run components on a set of initial target resources, then keep running components on every
    resource tracked in the previous iteration, targeting the tags added to it in that
    iteration, until an iteration adds no tags at all.

    :param request: Data structure containing the ID of the job to run the components in, the
    ID of the resource to start running recursively from, and filters for the components to
    run

    :return: A data structure accumulating the results of every iteration
    """
    aggregate_result = ComponentRunResult()
    base_filter = _build_auto_run_filter(request)

    seed_models = await self._get_initial_recursive_target_resources(
        request.resource_id, base_filter
    )

    # Seed a stand-in "previous" context in which every existing tag counts as newly added,
    # so that the first iteration targets all tags of the initial resources
    last_context: JobRunContext = self._job_context_factory.create()
    for seed_model in seed_models:
        last_context.trackers[seed_model.id].tags_added.update(seed_model.tags)

    iteration = 0
    while True:
        current_context = self._job_context_factory.create()
        # One request per resource tracked in the previous iteration, limited to components
        # targeting the tags that iteration added to it
        auto_run_requests = [
            _ComponentAutoRunRequest(
                tracked_id,
                ComponentAndMetaFilter(
                    base_filter,
                    _build_tag_filter(tuple(last_tracker.tags_added)),
                ),
            )
            for tracked_id, last_tracker in last_context.trackers.items()
        ]

        round_result = await self._auto_run_components(
            auto_run_requests,
            request.job_id,
            current_context,
        )
        aggregate_result.update(round_result)

        new_tag_total = sum(
            len(tracker.tags_added) for tracker in current_context.trackers.values()
        )
        last_context = current_context
        LOGGER.info(
            f"Completed iteration {iteration} of run_components_recursively on "
            f"{request.resource_id.hex()}. {len(aggregate_result.resources_modified)} "
            f"resources modified and {new_tag_total} tags added."
        )
        iteration += 1

        # Stop as soon as an iteration adds no tags anywhere
        if new_tag_total <= 0:
            break
    return aggregate_result

pack_recursively(self, job_id, resource_id) async

Call Packer components on the deepest descendants of a resource (the root of this search), then Packers on the next level up, etc. until the search root resource.

Parameters:

Name Type Description Default
job_id bytes

Job to run the component in.

required
resource_id bytes

ID of the search root resource.

required

Returns:

Type Description
ComponentRunResult

A data structure describing the components run and resources modified/created/deleted.

Source code in ofrak/service/job_service.py
async def pack_recursively(
    self,
    job_id: bytes,
    resource_id: bytes,
) -> ComponentRunResult:
    """
    Call Packer components on the deepest descendants of a resource (the root of this search),
    then Packers on the next level up, etc. until the search root resource.

    :param job_id: Job to run the component in
    :param resource_id: ID of the search root resource

    :return: A data structure describing the components run and resources
    modified/created/deleted, accumulated over every resource that was processed

    :raises ValueError: If more than one packer ran on a single resource
    """
    packer_filter = PACKERS_FILTER
    target_cache = self._build_target_cache(packer_filter)
    all_components_result = ComponentRunResult()
    if not target_cache:
        # Nothing is targeted by the packer filter, so there is nothing to run
        return all_components_result
    resources = await self._resource_service.get_descendants_by_id(
        resource_id,
        r_filter=ResourceFilter(
            include_self=True,
            tags=tuple(target_cache.keys()),
            tags_condition=ResourceFilterCondition.OR,
        ),
    )
    resources = list(resources)  # we'll need that Iterable more than once
    job_context = self._job_context_factory.create()

    # We want to start with the deepest packers. Packers at the same levels can run
    # concurrently. So we first ask for the relative depth of each returned resource.
    resource_depths = await self._resource_service.get_depths(
        [resource.id for resource in resources]
    )

    resources_by_depth = defaultdict(list)
    for resource, depth in zip(resources, resource_depths):
        resources_by_depth[depth].append(resource)

    for depth in sorted(resources_by_depth, reverse=True):
        for resource in resources_by_depth[depth]:
            component_filter: ComponentFilter = ComponentAndMetaFilter(
                packer_filter,
                _build_tag_filter(tuple(resource.get_tags())),
            )

            request = _ComponentAutoRunRequest(
                resource.id,
                component_filter,
            )

            component_result = await self._auto_run_components(
                [request],
                job_id,
                job_context,
            )
            # Record every run in the returned result. Previously only runs in which no
            # packer ran were merged, so the packers that did run were never reported.
            all_components_result.update(component_result)

            # A resource on which no packer ran is simply passed over; it must not stop the
            # remaining resources at the same depth from being packed.
            n_packers_run = len(component_result.components_run)
            if n_packers_run > 1:
                raise ValueError(f"Multiple packers are targeting resource {resource.id.hex()}")
    return all_components_result

_ComponentAutoRunRequest dataclass private

_ComponentAutoRunRequest(target_resource_id: bytes, component_filter: ofrak.service.component_locator_i.ComponentFilter)

_build_tag_filter(tags) private

When auto-running components, most of the time only the most specific components should be run for a resource. For example, an APK resource is also a ZIP resource; we want to always run the APK Unpacker on resources that are tagged as both ZIP and APK, because APK is a more specific tag. However, Identifiers and Analyzers are a special case because they have benign side-effects, so it is desirable to greedily run all Identifiers and Analyzers that could target a resource, not only the most specific Identifiers and Analyzers.

This function constructs a filter which allows only components that target at least one of the given tags, but for components that are neither Identifiers nor Analyzers the filter is even stricter, so that only the most specific components are allowed.

Parameters:

Name Type Description Default
tags Tuple[ofrak.model.tag_model.ResourceTag]

Tags to target, from the resource that is being auto-run on

required

Returns:

Type Description
ComponentFilter

A filter which allows a component to run if (it is an Identifier or an Analyzer, AND it targets at least one of the given tags) OR (it is neither an Identifier nor an Analyzer, AND it targets one of the most specific given tags that are targeted by components)

Source code in ofrak/service/job_service.py
@lru_cache(None)
def _build_tag_filter(tags: Tuple[ResourceTag, ...]) -> ComponentFilter:
    """
    When auto-running components, most of the time only the *most specific* components should be
    run for a resource. For example, an APK resource is also a ZIP resource; we want to always run
    the APK Unpacker on resources that are tagged as both ZIP and APK, because APK is a more
    specific tag. However, Identifiers and Analyzers are a special case because they have benign
    side-effects, so it is desirable to greedily run all Identifiers and Analyzers that could
    target a resource, not only the most specific Identifiers and Analyzers.

    This function constructs a filter which allows only components that target at least one of the
    given tags, but for components that are neither Identifiers nor Analyzers the filter is even
    stricter, so that only the most specific components are allowed.

    Results are memoized on the ``tags`` tuple, so ``tags`` must be hashable.

    :param tags: Tags to target, from the resource that is being auto-run on

    :return: A filter which allows a component to run if (it is an Identifier or an Analyzer, AND
    it targets at least one of the given tags) OR (it is neither an Identifier nor an Analyzer,
    AND it targets one of the most specific given tags that are targeted by components)
    """
    # One target filter per specificity tier, in the order the tiers are returned
    # (presumably most specific first; confirm against ResourceTag.sort_tags_into_tiers)
    tag_tiers = ResourceTag.sort_tags_into_tiers(tags)
    tier_target_filters = tuple(ComponentTargetFilter(*tier) for tier in tag_tiers)

    # Identifiers and Analyzers are matched greedily against any of the given tags
    greedy_identifiers = ComponentAndMetaFilter(
        IDENTIFIERS_FILTER,
        ComponentTargetFilter(*tags),
    )
    greedy_analyzers = ComponentAndMetaFilter(
        ANALYZERS_FILTER,
        ComponentTargetFilter(*tags),
    )
    # Every other component goes through the prioritized per-tier filters
    most_specific_others = ComponentAndMetaFilter(
        ComponentNotMetaFilter(
            IDENTIFIERS_FILTER,
        ),
        ComponentNotMetaFilter(
            ANALYZERS_FILTER,
        ),
        ComponentPrioritySelectingMetaFilter(*tier_target_filters),
    )
    return ComponentOrMetaFilter(
        greedy_identifiers,
        greedy_analyzers,
        most_specific_others,
    )