modifiers.py
ofrak.core.patch_maker.modifiers
FunctionReplacementModifier (Modifier)
Replace one or several functions in a Program
resource.
It takes a mapping from function names (to replace) to source code file paths (to use as replacements), then
for each function to replace, creates a Segment
from the corresponding complex block and uses the
PatchFromSourceModifier
to overwrite this segment with the code taken from the replacement source code file.
modify(self, resource, config)
async
Modify the given resource.
Users should not call this method directly; rather, they should run Resource.run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource |
Resource |
required | |
config |
FunctionReplacementModifierConfig |
Optional config for modification. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
required |
Source code in ofrak/core/patch_maker/modifiers.py
async def modify(self, resource: Resource, config: FunctionReplacementModifierConfig) -> None:
program = await resource.view_as(Program)
function_to_replace_cbs = {
func_name: await program.get_function_complex_block(func_name)
for func_name in config.new_function_sources.keys()
}
await self._verify_modes_are_the_same(list(function_to_replace_cbs.values()))
source_patches: Dict[str, Tuple[Segment, ...]] = {
config.new_function_sources[func_name]: (self._make_text_segment(complex_block),)
for func_name, complex_block in function_to_replace_cbs.items()
}
patch_from_source_config = PatchFromSourceModifierConfig(
config.source_code,
source_patches,
config.toolchain_config,
config.toolchain,
config.header_directories,
config.patch_name,
)
await resource.run(PatchFromSourceModifier, patch_from_source_config)
_make_text_segment(complex_block)
private
staticmethod
Return a new code Segment
corresponding to complex_block
.
Source code in ofrak/core/patch_maker/modifiers.py
@staticmethod
def _make_text_segment(complex_block: ComplexBlock) -> Segment:
"""Return a new code `Segment` corresponding to `complex_block`."""
return Segment(
segment_name=".text",
vm_address=complex_block.virtual_address,
offset=0,
is_entry=False,
length=complex_block.size,
access_perms=MemoryPermissions.RX,
)
_verify_modes_are_the_same(complex_blocks)
async
private
staticmethod
Verify that the InstructionSetMode
of all the complex_blocks
is the same.
Exceptions:
Type | Description |
---|---|
NotImplementedError |
if several |
Source code in ofrak/core/patch_maker/modifiers.py
@staticmethod
async def _verify_modes_are_the_same(complex_blocks: List[ComplexBlock]) -> None:
"""
Verify that the `InstructionSetMode` of all the `complex_blocks` is the same.
:raises NotImplementedError: if several `InstructionSetMode` values are found
"""
modes = {await complex_block.get_mode() for complex_block in complex_blocks}
if len(modes) > 1:
raise NotImplementedError(
f"Several values of InstructionSetMode found in complex blocks {complex_blocks}: {modes}\n"
"This is not currently supported by this component: all complex blocks must have the same mode "
"in order to be processed in the same patch."
)
FunctionReplacementModifierConfig (ComponentConfig)
dataclass
Attributes:
Name | Type | Description |
---|---|---|
source_code |
SourceBundle |
Path to directory containing source code (ideally ONLY source code) |
new_function_sources |
Dict[str, str] |
a mapping from function names (to replace) to source code file paths (to use as replacements). The paths are relative paths within the source code FilesystemRoot. |
toolchain_config |
ToolchainConfig |
configuration for the Toolchain to use |
toolchain |
Type[ofrak_patch_maker.toolchain.abstract.Toolchain] |
the type of which type of Toolchain to use |
patch_name |
Optional[str] |
Optional name of patch |
header_directories |
Tuple[ofrak.core.patch_maker.modifiers.SourceBundle, ...] |
(Optional) paths to directories to search for header files in |
PatchFromSourceModifier (Modifier)
Modifier exposing some basic source code patching capabilities.
modify(self, resource, config)
async
Modify the given resource.
Users should not call this method directly; rather, they should run Resource.run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource |
Resource |
required | |
config |
PatchFromSourceModifierConfig |
Optional config for modification. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
required |
Source code in ofrak/core/patch_maker/modifiers.py
async def modify(self, resource: Resource, config: PatchFromSourceModifierConfig) -> None:
if config.patch_name is None:
patch_name = f"{resource.get_id().hex()}_patch"
else:
patch_name = config.patch_name
build_tmp_dir = tempfile.mkdtemp()
source_tmp_dir = os.path.join(build_tmp_dir, "src")
os.makedirs(source_tmp_dir)
config.source_code.dump(source_tmp_dir)
header_dirs = []
for header_directory in config.header_directories:
header_tmp_dir = os.path.join(build_tmp_dir, "include")
os.makedirs(build_tmp_dir)
header_directory.dump(header_tmp_dir)
header_dirs.append(header_tmp_dir)
absolute_source_list = [
os.path.join(source_tmp_dir, src_file) for src_file in config.source_patches.keys()
]
program_attributes = await resource.analyze(ProgramAttributes)
patch_maker = PatchMaker(
toolchain=config.toolchain(program_attributes, config.toolchain_config),
build_dir=build_tmp_dir,
)
patch_bom = patch_maker.make_bom(
name=patch_name,
source_list=absolute_source_list,
object_list=[],
header_dirs=header_dirs,
)
# Map each object file in the BOM to the segments associated with its source file
patch_bom_segment_mapping = {
patch_bom.object_map[os.path.join(source_tmp_dir, src_file)].path: src_segments
for src_file, src_segments in config.source_patches.items()
}
target_program = await resource.view_as(Program)
target_linkable_bom_info = await target_program.make_linkable_bom(
patch_maker,
build_tmp_dir,
patch_bom.unresolved_symbols,
)
# To support additional dynamic references in user space executables
# Create and use a modifier that will:
# 1. Extend .got, add new entry
# 2. Extend .got.plt, add new stub code
# 3. If the DSO is not already listed in the load list for executable it must be extended and added.
# 4. Provide the additional .got and .got.plt symbols to make_fem now that we have the locations
# NOTE: These external functions will probably be *UND*
p = PatchRegionConfig(patch_bom.name + "_patch", patch_bom_segment_mapping)
exec_path = os.path.join(build_tmp_dir, "output_exec")
fem = patch_maker.make_fem(
[(patch_bom, p), target_linkable_bom_info],
exec_path,
)
await resource.run(
SegmentInjectorModifier,
SegmentInjectorModifierConfig.from_fem(fem),
)
# Refresh LinkableBinary with the LinkableSymbols used in this patch
target_binary = await resource.view_as(LinkableBinary)
await target_binary.define_linkable_symbols_from_patch(
fem.executable.symbols, program_attributes
)
PatchFromSourceModifierConfig (ComponentConfig)
dataclass
Attributes:
Name | Type | Description |
---|---|---|
source_code |
SourceBundle |
Path to directory containing source code (ideally ONLY source code) |
source_patches |
Dict[str, Tuple[ofrak_patch_maker.toolchain.model.Segment, ...]] |
path of each source file to build and inject, with one or more segments defining where to inject one or more of the .text, .data, and .rodata from the build file |
toolchain_config |
ToolchainConfig |
configuration for the Toolchain to use |
toolchain |
Type[ofrak_patch_maker.toolchain.abstract.Toolchain] |
the type of which Toolchain to use to build patch |
header_directories |
Tuple[ofrak.core.patch_maker.modifiers.SourceBundle, ...] |
(Optional) paths to directories to search for header files in |
patch_name |
Optional[str] |
Optional name of patch |
SegmentInjectorModifier (Modifier)
Inject some segments into a Program resource. Only segments with non-zero length are injected, excluding .bss.
modify(self, resource, config)
async
Modify the given resource.
Users should not call this method directly; rather, they should run Resource.run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource |
Resource |
required | |
config |
SegmentInjectorModifierConfig |
Optional config for modification. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
required |
Source code in ofrak/core/patch_maker/modifiers.py
async def modify(self, resource: Resource, config: SegmentInjectorModifierConfig) -> None:
sorted_regions = list(
await resource.get_descendants_as_view(
MemoryRegion,
r_filter=ResourceFilter(
include_self=True,
tags=(MemoryRegion,),
),
r_sort=ResourceSort(
attribute=MemoryRegion.Size,
direction=ResourceSortDirection.DESCENDANT,
),
)
)
injection_tasks: List[Tuple[Resource, BinaryInjectorModifierConfig]] = []
for segment, segment_data in config.segments_and_data:
if segment.length == 0 or segment.vm_address == 0:
continue
if segment.length > 0:
LOGGER.debug(
f" Segment {segment.segment_name} - {segment.length} "
f"bytes @ {hex(segment.vm_address)}",
)
if segment.segment_name.startswith(".bss"):
continue
if segment.segment_name.startswith(".rela"):
continue
if segment.segment_name.startswith(".got"):
# Create new .got and .plt in the exec format here here once we begin supporting
# the addition of new dynamic references in our patches.
# For instance, a new call to kmalloc that never existed before.
# See PatchFromSourceModifier
continue
patches = [(segment.vm_address, segment_data)]
region = MemoryRegion.get_mem_region_with_vaddr_from_sorted(
segment.vm_address, sorted_regions
)
if region is None:
raise ValueError(
f"Cannot inject patch because the memory region at vaddr "
f"{hex(segment.vm_address)} is None"
)
injection_tasks.append((region.resource, BinaryInjectorModifierConfig(patches)))
for injected_resource, injection_config in injection_tasks:
result = await injected_resource.run(BinaryInjectorModifier, injection_config)
# The above can patch data of any of injected_resources' descendants or ancestors
# We don't want to delete injected_resources or its ancestors, so subtract them from the
# set of patched resources
patched_descendants = result.resources_modified.difference(
{
r.get_id()
for r in await injected_resource.get_ancestors(
ResourceFilter(include_self=True)
)
}
)
to_delete = [
r for r in await resource.get_descendants() if r.get_id() in patched_descendants
]
await asyncio.gather(*(r.delete() for r in to_delete))
SegmentInjectorModifierConfig (ComponentConfig)
dataclass
SegmentInjectorModifierConfig(segments_and_data: Tuple[Tuple[ofrak_patch_maker.toolchain.model.Segment, bytes], ...])
from_fem(fem)
staticmethod
Automatically build a config from a FEM by extracting each segment's bytes and metadata.
Source code in ofrak/core/patch_maker/modifiers.py
@staticmethod
def from_fem(fem: FEM) -> "SegmentInjectorModifierConfig":
"""
Automatically build a config from a FEM by extracting each segment's bytes and metadata.
"""
extracted_segments: List[Tuple[Segment, bytes]] = []
with open(fem.executable.path, "rb") as f:
exe_data = f.read()
for segment in fem.executable.segments:
if segment.length == 0:
continue
segment_data = exe_data[segment.offset : segment.offset + segment.length]
extracted_segments.append((segment, segment_data))
return SegmentInjectorModifierConfig(tuple(extracted_segments))
SourceBundle (dict, Generic)
Class used to store filesystem trees of source code as serializable in-memory trees, for transfer between components.
slurp(path)
classmethod
Slurp up a path into a SourceBundle, recursively getting all files and directories and storing them as a tree in memory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
required |
Returns:
Type | Description |
---|---|
SourceBundle |
Source code in ofrak/core/patch_maker/modifiers.py
@classmethod
def slurp(cls, path: str) -> "SourceBundle":
"""
Slurp up a path into a SourceBundle, recursively getting all files and directories and
storing them as a tree in memory.
:param path:
:return:
"""
root, dirs, files = next(os.walk(path, topdown=True))
pairs: List[Tuple[str, Union[bytes, SourceBundle]]] = []
for file_name in files:
file_path = os.path.join(root, file_name)
with open(file_path, "rb") as f:
file_contents = f.read()
pairs.append((file_name, file_contents))
for dir_name in dirs:
dir_path = os.path.join(root, dir_name)
pairs.append((dir_name, SourceBundle.slurp(dir_path)))
return cls(pairs)
dump(self, target_path)
Dump a SourceBundle tree back into the local filesystem, at the given target path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
target_path |
str |
required |
Returns:
Type | Description |
---|---|
Source code in ofrak/core/patch_maker/modifiers.py
def dump(self, target_path: str):
"""
Dump a SourceBundle tree back into the local filesystem, at the given target path.
:param target_path:
:return:
"""
os.makedirs(target_path, exist_ok=True)
for item_name, item_contents in self.items():
item_path = os.path.join(target_path, item_name)
if type(item_contents) is bytes:
# item is a file
with open(item_path, "wb") as f:
f.write(item_contents)
else:
# item is a directory
cast(SourceBundle, item_contents).dump(item_path)