job_service.py
ofrak.service.job_service
JobService (JobServiceInterface)
_run_component(self, metadata, job_id, resource_id, component, job_context, config)
async
private
Run a component, return the result as well as some (optional) metadata (such as the request that triggered the component to run). If it raises an error, the error is returned as an object rather than being raised.
Once the component finishes, log it and remove this task from the set of active tasks.
Source code in ofrak/service/job_service.py
async def _run_component(
    self,
    metadata: M,
    job_id: bytes,
    resource_id: bytes,
    component: ComponentInterface,
    job_context: JobRunContext,
    config: CC,
) -> _RunTaskResultT:
    """
    Run a component, return the result as well as some (optional) metadata (such as the request
    that triggered the component to run). If it raises an error, the error is returned
    as an object rather than being raised.

    Once the component finishes, log it and remove this task from the set of active tasks. The
    removal happens in a ``finally`` clause so that the entry is also dropped when the run is
    interrupted by something that is not an ``Exception`` (for example task cancellation).

    :param metadata: Opaque value handed back unchanged alongside the result
    :param job_id: ID of the job the component runs in
    :param resource_id: ID of the resource the component runs on
    :param component: The component to run
    :param job_context: Context of the job to run the component in
    :param config: Config passed through to the component

    :return: A tuple of (component result or the exception it raised, metadata)
    """
    LOGGER.info(
        f"JOB {job_id.hex()} - Running {component.get_id().decode()} on "
        f"resource {resource_id.hex()}"
    )

    result: Union[ComponentRunResult, BaseException]
    try:
        # Create a new resource context for every component
        fresh_resource_context = self._resource_context_factory.create()
        fresh_resource_view_context = ResourceViewContext()
        try:
            result = await component.run(
                job_id,
                resource_id,
                job_context,
                fresh_resource_context,
                fresh_resource_view_context,
                config,
            )
            _log_component_run_result_info(job_id, resource_id, component, result)
        except Exception as e:
            # Errors are handed back to the caller as values; the caller decides whether to raise
            result = e
    finally:
        # Always unregister the task, even when the run is cancelled or otherwise aborted,
        # so a stale entry is never left behind for this (resource, component) pair.
        component_task_id = (resource_id, component.get_id())
        del self._active_component_tasks[component_task_id]

    return result, metadata
run_component(self, request, job_context=None)
async
Run a single component for a job.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request |
JobComponentRequest |
required | |
job_context |
Optional[ofrak.model.job_model.JobRunContext] |
Context of the job to run the component in. |
None |
Returns:
Type | Description |
---|---|
ComponentRunResult |
A data structure describing the component run and resources modified/created/deleted. |
Source code in ofrak/service/job_service.py
async def run_component(
    self,
    request: JobComponentRequest,
    job_context: Optional[JobRunContext] = None,
) -> ComponentRunResult:
    """
    Run a single component for a job.

    :param request: Request naming the component, job, resource and config to run with
    :param job_context: Context of the job to run the component in; a new context is created
    when this is None

    :return: A data structure describing the component run and resources
    modified/created/deleted

    :raises BaseException: re-raises whatever error object the component task handed back
    """
    target_component = self._component_locator.get_by_id(request.component_id)
    context = job_context if job_context is not None else self._job_context_factory.create()

    outcome, _ = await self._create_run_component_task(
        request,
        request.job_id,
        request.resource_id,
        target_component,
        context,
        request.config,
    )

    # The task reports failures as values rather than raising; surface them here.
    if not isinstance(outcome, BaseException):
        return outcome
    raise outcome
run_analyzer_by_attribute(self, request, job_context=None)
async
Choose one or more Analyzer components to analyze the requested attributes on the given resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request |
JobAnalyzerRequest |
Data structure containing the ID of the job to run the components in, the ID of the resource to run components on, which attributes the Analyzers should output, and the tags of the target resource. |
required |
job_context |
Optional[ofrak.model.job_model.JobRunContext] |
Context of the job to run the component in. |
None |
Returns:
Type | Description |
---|---|
ComponentRunResult |
A data structure describing the component(s) run and resources modified/created/deleted. |
Exceptions:
Type | Description |
---|---|
NotFoundError |
If no Analyzers can be found targeting the specified tags and outputting the specified attributes. |
Source code in ofrak/service/job_service.py
async def run_analyzer_by_attribute(
    self,
    request: JobAnalyzerRequest,
    job_context: Optional[JobRunContext] = None,
) -> ComponentRunResult:
    """
    Choose one or more Analyzer components to analyze the requested attributes on the given
    resource.

    :param request: Data structure containing the ID of the job to run the components in, the
    ID of the resource to run components on, and which attributes the Analyzers should output
    :param job_context: Context of the job to run the component in; a new context is created
    when this is None

    :return: A data structure describing the component(s) run and resources
    modified/created/deleted

    :raises NotFoundError: If no analyzer ran for the requested attributes
    """
    if job_context is None:
        job_context = self._job_context_factory.create()

    target_resource_model = await self._resource_service.get_by_id(request.resource_id)

    # The tag filter depends only on the target resource, so build it once and share it between
    # all of the per-attribute filters below instead of rebuilding it for every attribute.
    tag_filter = _build_tag_filter(tuple(target_resource_model.get_tags()))

    # There may be an analyzer that outputs ALL the requested attributes at once
    # If there is (as is usually the case for views), only run that one
    one_analyzer_for_all_attributes: ComponentFilter = ComponentAndMetaFilter(
        AnalyzerOutputFilter(*request.attributes),
        tag_filter,
    )
    # Otherwise, look for individual analyzers for each attribute type
    analyzer_for_each_attribute: List[ComponentFilter] = [
        ComponentAndMetaFilter(AnalyzerOutputFilter(attr_t), tag_filter)
        for attr_t in request.attributes
    ]
    component_filter: ComponentFilter = ComponentAndMetaFilter(
        ANALYZERS_FILTER,
        ComponentPrioritySelectingMetaFilter(
            one_analyzer_for_all_attributes,
            ComponentOrMetaFilter(*analyzer_for_each_attribute),
        ),
    )

    components_result = await self._auto_run_components(
        (
            _ComponentAutoRunRequest(
                request.resource_id,
                component_filter,
            ),
        ),
        request.job_id,
        job_context,
    )

    # Nothing ran: report which attributes could not be served and the filter that was used.
    if not components_result.components_run:
        raise NotFoundError(
            f"Unable to find any analyzer for attributes "
            f"{', '.join(attr_t.__name__ for attr_t in request.attributes)}"
            f"\nFilter: {component_filter}"
        )
    return components_result
run_components(self, request)
async
Automatically select one or more components to run on a resource. The components must match the provided component filters and target at least one of the tags of the resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request |
JobMultiComponentRequest |
Data structure containing the ID of the job to run the components in, the ID of the resource to run components on, and filters for the components to run. |
required |
Returns:
Type | Description |
---|---|
ComponentRunResult |
A data structure describing the components run and resources modified/created/deleted. |
Exceptions:
Type | Description |
---|---|
ComponentAutoRunFailure |
if one of the automatically chosen components raises an error while running. |
NoMatchingComponentException |
if no components match the filters for the resource. |
Source code in ofrak/service/job_service.py
async def run_components(
    self,
    request: JobMultiComponentRequest,
) -> ComponentRunResult:
    """
    Automatically select one or more components to run on a resource, repeating for as long as
    the previous round added new tags to that resource.

    Each round runs in its own job context and targets only the tags added in the previous
    round (the first round targets the tags the resource already has).

    :param request: Data structure containing the ID of the job to run the components in, the
    ID of the resource to run components on, and filters for the components to run

    :return: A data structure describing the components run and resources
    modified/created/deleted, merged over all rounds
    """
    target_model = await self._resource_service.get_by_id(request.resource_id)
    base_filter = _build_auto_run_filter(request)
    combined_result = ComponentRunResult()

    pending_tags = tuple(target_model.tags)
    while pending_tags:
        round_context = self._job_context_factory.create()
        round_filter = ComponentAndMetaFilter(base_filter, _build_tag_filter(pending_tags))
        auto_run_request = _ComponentAutoRunRequest(request.resource_id, round_filter)

        round_result = await self._auto_run_components(
            (auto_run_request,),
            request.job_id,
            round_context,
        )
        combined_result.update(round_result)

        # Only tags added to the target resource during this round drive the next round.
        pending_tags = tuple(round_context.trackers[request.resource_id].tags_added)

    return combined_result
run_components_recursively(self, request)
async
Start from a resource and run components on it and then on any resources which have tags added as a result of that initial run, then run components on any resources with new tags from those subsequent runs, until an iteration of component runs results in no new tags being added. The component(s) run on each resource are chosen according to the provided filters and which tags were added to that resource in the previous iteration. That is, the filters are applied to the set of components which target those new tags.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request |
JobMultiComponentRequest |
Data structure containing the ID of the job to run the components in, the ID of the resource to start running recursively from, and filters for the components to run. |
required |
Returns:
Type | Description |
---|---|
ComponentRunResult |
A data structure describing the components run and resources modified/created/deleted. |
Exceptions:
Type | Description |
---|---|
ComponentAutoRunFailure |
if one of the automatically chosen components raises an error while running. |
Source code in ofrak/service/job_service.py
async def run_components_recursively(
    self, request: JobMultiComponentRequest
) -> ComponentRunResult:
    """
    Run components on an initial set of resources, then keep running components on every
    resource tracked in the previous iteration, targeting the tags added to it in that
    iteration, until an iteration adds no tags at all.

    :param request: Data structure containing the ID of the job to run the components in, the
    ID of the resource to start running recursively from, and filters for the components to run

    :return: A data structure describing the components run and resources
    modified/created/deleted, merged over all iterations
    """
    aggregate_result = ComponentRunResult()
    base_filter = _build_auto_run_filter(request)
    seed_models = await self._get_initial_recursive_target_resources(
        request.resource_id, base_filter
    )

    # Seed a stand-in "previous" context in which every existing tag counts as newly added,
    # so the first iteration targets all existing tags.
    last_context: JobRunContext = self._job_context_factory.create()
    for seed_model in seed_models:
        last_context.trackers[seed_model.id].tags_added.update(seed_model.tags)

    iteration = 0
    while True:  # always runs at least once; exits when an iteration adds no tags
        current_context = self._job_context_factory.create()
        pending_requests = [
            _ComponentAutoRunRequest(
                tracked_id,
                ComponentAndMetaFilter(
                    base_filter,
                    _build_tag_filter(tuple(last_tracker.tags_added)),
                ),
            )
            for tracked_id, last_tracker in last_context.trackers.items()
        ]

        pass_result = await self._auto_run_components(
            pending_requests,
            request.job_id,
            current_context,
        )
        aggregate_result.update(pass_result)

        new_tag_total = sum(
            len(tracker.tags_added) for tracker in current_context.trackers.values()
        )
        last_context = current_context

        LOGGER.info(
            f"Completed iteration {iteration} of run_components_recursively on "
            f"{request.resource_id.hex()}. {len(aggregate_result.resources_modified)} "
            f"resources modified and {new_tag_total} tags added."
        )
        iteration += 1

        if new_tag_total == 0:
            break

    return aggregate_result
pack_recursively(self, job_id, resource_id)
async
Call Packer components on the deepest descendants of a resource (the root of this search), then Packers on the next level up, etc. until the search root resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
job_id |
bytes |
Job to run the component in. |
required |
resource_id |
bytes |
ID of the search root resource. |
required |
Returns:
Type | Description |
---|---|
ComponentRunResult |
A data structure describing the components run and resources modified/created/deleted. |
Source code in ofrak/service/job_service.py
async def pack_recursively(
    self,
    job_id: bytes,
    resource_id: bytes,
) -> ComponentRunResult:
    """
    Call Packer components on the deepest descendants of a resource (the root of this search),
    then Packers on the next level up, etc. until the search root resource.

    :param job_id: Job to run the component in
    :param resource_id: ID of the search root resource

    :return: A data structure describing the components run and resources
    modified/created/deleted, merged over every packer run

    :raises ValueError: if more than one packer ran on a single resource
    """
    packer_filter = PACKERS_FILTER
    target_cache = self._build_target_cache(packer_filter)
    all_components_result = ComponentRunResult()
    if len(target_cache) == 0:
        # No packer targets anything, so there is nothing to do
        return all_components_result

    resources = await self._resource_service.get_descendants_by_id(
        resource_id,
        r_filter=ResourceFilter(
            include_self=True,
            tags=tuple(target_cache.keys()),
            tags_condition=ResourceFilterCondition.OR,
        ),
    )
    resources = list(resources)  # we'll need that Iterable more than once
    job_context = self._job_context_factory.create()

    # We want to start with the deepest packers, so we first ask for the relative depth of
    # each returned resource and group the resources by depth.
    resource_depths = await self._resource_service.get_depths(
        [resource.id for resource in resources]
    )
    resources_by_depth = defaultdict(list)
    for resource, depth in zip(resources, resource_depths):
        resources_by_depth[depth].append(resource)

    for depth in sorted(resources_by_depth, reverse=True):
        for resource in resources_by_depth[depth]:
            component_filter: ComponentFilter = ComponentAndMetaFilter(
                packer_filter,
                _build_tag_filter(tuple(resource.get_tags())),
            )
            request = _ComponentAutoRunRequest(
                resource.id,
                component_filter,
            )
            component_result = await self._auto_run_components(
                [request],
                job_id,
                job_context,
            )

            n_packers_run = len(component_result.components_run)
            if n_packers_run > 1:
                raise ValueError(f"Multiple packers are targeting resource {resource.id.hex()}")

            # Merge the result of every run. Previously only runs in which no packer ran were
            # merged, so the returned result never described the packers that actually ran.
            all_components_result.update(component_result)

            if n_packers_run == 0:
                # NOTE(review): this stops processing the remaining resources at this depth
                # as soon as one resource has no matching packer. Kept as-is; confirm that
                # skipping the siblings is intended.
                break

    return all_components_result
_ComponentAutoRunRequest
dataclass
private
_ComponentAutoRunRequest(target_resource_id: bytes, component_filter: ofrak.service.component_locator_i.ComponentFilter)
_build_tag_filter(tags)
private
When auto-running components, most of the time only the most specific components should be run for a resource. For example, an APK resource is also a ZIP resource; we want to always run the APK Unpacker on resources that are tagged as both ZIP and APK, because APK is a more specific tag. However, Identifiers are a special case because they have benign side-effects, so it is desirable to greedily run all Identifiers that could target a resource, not only the most specific Identifiers.
This function constructs a filter which allows only components that target at least one of the given tags, but for non-identifiers the filter is even stricter so that only the most specific components are allowed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tags |
Tuple[ofrak.model.tag_model.ResourceTag] |
Tags to target, from the resource that is being auto-run on |
required |
Returns:
Type | Description |
---|---|
ComponentFilter |
A filter which allows a component to run if (it is an Identifier, AND it targets at least one of the given tags) OR (it is NOT an Identifier, AND it targets one of the most specific given tags that are targeted by components) |
Source code in ofrak/service/job_service.py
@lru_cache(None)
def _build_tag_filter(tags: Tuple[ResourceTag, ...]) -> ComponentFilter:
    """
    When auto-running components, most of the time only the *most specific* components should be
    run for a resource. For example, an APK resource is also a ZIP resource; we want to always run
    the APK Unpacker on resources that are tagged as both ZIP and APK, because APK is a more
    specific tag. However, Identifiers are a special case because they have benign side-effects, so
    it is desirable to greedily run all Identifiers that could target a resource, not only the most
    specific Identifiers.

    This function constructs a filter which allows only components that target at least one of the
    given tags, but for non-identifiers the filter is even stricter so that only the most specific
    components are allowed.

    Results are memoized on the ``tags`` tuple, so the tags must be hashable and the returned
    filter object may be shared between callers.

    :param tags: Tags to target, from the resource that is being auto-run on (any number of tags)

    :return: A filter which allows a component to run if (it is an Identifier, AND it targets at
    least one of the given tags) OR (it is NOT an Identifier, AND it targets one of the most
    specific given tags that are targeted by components)
    """
    # One target filter per specificity tier, in the order the tiers are returned
    # (presumably most specific first -- confirm against ResourceTag.sort_tags_into_tiers).
    tags_by_specificity = ResourceTag.sort_tags_into_tiers(tags)
    filters_prioritized_by_specificity = tuple(
        ComponentTargetFilter(*tag_specificity_level)
        for tag_specificity_level in tags_by_specificity
    )

    # Identifiers: greedy, any of the given tags is enough
    identifiers_targeting_any_tag = ComponentAndMetaFilter(
        IDENTIFIERS_FILTER,
        ComponentTargetFilter(*tags),
    )
    # Everything else: restricted by the prioritized per-tier filters
    non_identifiers_by_specificity = ComponentAndMetaFilter(
        ComponentNotMetaFilter(
            IDENTIFIERS_FILTER,
        ),
        ComponentPrioritySelectingMetaFilter(*filters_prioritized_by_specificity),
    )

    return ComponentOrMetaFilter(
        identifiers_targeting_any_tag,
        non_identifiers_by_specificity,
    )