unpackers.py
ofrak_ghidra.components.blocks.unpackers
GhidraCodeRegionUnpacker (CodeRegionUnpacker, OfrakGhidraMixin)
unpack(self, resource, config=None)
async
Unpack a code region, extracting all of the complex blocks within it.
The bounds of each complex block should include any trailing data literals. A trailing data literal is considered part of a complex block only if all of the data references to it come from within that complex block.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource | Resource | the code region resource | required |
config | | | None |
Exceptions:
Type | Description |
---|---|
ComplexBlockStructureError | if the unpacker tries to define a complex block which did not match our expectations of complex block structure |
Source code in ofrak_ghidra/components/blocks/unpackers.py
async def unpack(self, resource: Resource, config=None) -> None:
    """
    Unpack a code region, creating one child complex block for each complex block Ghidra
    reports within the region's virtual address range.

    :param resource: the code region resource
    :param config: not used by this unpacker
    """
    code_region = await resource.view_as(CodeRegion)
    # The code region modifier is run for every CodeRegion so that it matches the backend.
    # This is not efficient, but there should not be many CodeRegions.
    await resource.run(GhidraCodeRegionModifier)

    region_start = code_region.virtual_address
    region_end = region_start + code_region.size

    ghidra_project = await resource.get_only_ancestor_as_view(
        GhidraProject, ResourceFilter(tags=[GhidraProject], include_self=True)
    )
    program_attributes = await ghidra_project.resource.analyze(ProgramAttributes)

    # The script receives the region bounds as hexadecimal strings.
    block_infos = await self.get_complex_blocks_script.call_script(
        resource,
        hex(region_start),
        hex(region_end),
    )

    # Collect the child-creation awaitables first, then await them all together.
    pending_children = [
        code_region.create_child_region(
            ComplexBlock(info["loadAddress"], info["size"], info["name"]),
            additional_attributes=(program_attributes,),
        )
        for info in block_infos
    ]
    await asyncio.gather(*pending_children)
GhidraComplexBlockUnpacker (ComplexBlockUnpacker, OfrakGhidraMixin)
unpack(self, resource, config=None)
async
Unpack a complex block, identifying all of the basic blocks and data words which are a part of it.
The identified basic blocks and data words must be within the previously identified range of the complex block. If the analysis engine identifies basic blocks outside of this range, those are ignored - i.e. not unpacked - and the rest of the basic blocks in the function are unpacked as usual.
Source code in ofrak_ghidra/components/blocks/unpackers.py
async def unpack(self, resource: Resource, config=None):
    """
    Unpack a complex block into the basic blocks and data words which are a part of it.

    Any basic block or data word that does not lie entirely within the complex block's
    virtual address range is logged with a warning and skipped; all others are created as
    child regions of the complex block.

    :param resource: the complex block resource
    :param config: not used by this unpacker

    :raises Exception: if a reported basic block has a size of zero
    """
    cb_view = await resource.view_as(ComplexBlock)
    program_attrs = await resource.analyze(ProgramAttributes)
    cb_data_range = await resource.get_data_range_within_root()
    cb_base_vaddr = cb_view.virtual_address

    # Awaitables for every child to create; they are all awaited together at the end.
    pending_children = []

    bb_infos = await self.get_bb_batch_manager.get_result(
        (resource, cb_data_range.start, cb_base_vaddr)
    )
    for bb_info in bb_infos:
        bb_start_vaddr = bb_info["bb_start_vaddr"]
        bb_size = bb_info["bb_size"]
        is_exit_point = bb_info["is_exit_point"]
        mode_name = bb_info["instr_mode"]
        reported_exit_vaddr = bb_info["exit_vaddr"]

        # The Ghidra script initializes exit_vaddr to -1. For an exit point, exit_vaddr
        # should be None instead; this is consistent with the docstring of BasicBlock.
        exit_vaddr = None if is_exit_point else reported_exit_vaddr

        if bb_size == 0:
            raise Exception(f"Basic block 0x{bb_start_vaddr:x} has no size")

        bb_fits_in_cb = (
            cb_view.virtual_address <= bb_start_vaddr
            and (bb_start_vaddr + bb_size) <= cb_view.end_vaddr()
        )
        if not bb_fits_in_cb:
            logging.warning(
                f"Basic Block 0x{bb_start_vaddr:x} does not fall within "
                f"complex block {hex(cb_view.virtual_address)}-{hex(cb_view.end_vaddr())}"
            )
            continue

        pending_children.append(
            cb_view.create_child_region(
                BasicBlock(
                    bb_start_vaddr,
                    bb_size,
                    InstructionSetMode[mode_name],
                    is_exit_point,
                    exit_vaddr,
                ),
                additional_attributes=(program_attrs,),
            )
        )

    # Format character for each word size that is unpacked as a single value.
    size_flag_by_word_size = {1: "B", 2: "H", 4: "L", 8: "Q"}

    dw_infos = await self.get_dw_batch_manager.get_result(
        (resource, cb_view.virtual_address, cb_view.end_vaddr())
    )
    for dw_info in dw_infos:
        word_vaddr = dw_info["word_vaddr"]
        word_size = dw_info["word_size"]
        xrefs = tuple(dw_info["xrefs"])

        dw_fits_in_cb = (
            cb_view.virtual_address <= word_vaddr
            and (word_vaddr + word_size) <= cb_view.end_vaddr()
        )
        if not dw_fits_in_cb:
            logging.warning(
                f"Data Word 0x{word_vaddr:x} does not fall within "
                f"complex block {hex(cb_view.virtual_address)}-{hex(cb_view.end_vaddr())}"
            )
            continue

        size_flag = size_flag_by_word_size.get(word_size)
        if size_flag is None:
            # Any other size is split into word_size consecutive one-byte data words.
            size_flag, num_words, word_size = "B", word_size, 1
        else:
            num_words = 1

        format_string = program_attrs.endianness.get_struct_flag() + size_flag
        pending_children.extend(
            cb_view.create_child_region(
                DataWord(word_vaddr + offset, word_size, format_string, xrefs),
                additional_attributes=(program_attrs,),
            )
            for offset in range(num_words)
        )

    await asyncio.gather(*pending_children)