Skip to content

unpackers.py

ofrak_ghidra.components.blocks.unpackers

GhidraCodeRegionUnpacker (CodeRegionUnpacker, OfrakGhidraMixin)

unpack(self, resource, config=None) async

Unpack a code region, extracting all of the complex blocks within it.

The bounds of each complex block should include any trailing data literals. A trailing data literal is considered part of the complex block only if the sole references to it are data references from within that complex block.

Parameters:

Name Type Description Default
resource Resource

the code region resource

required
config None

Exceptions:

Type Description
ComplexBlockStructureError

if the unpacker tries to define a complex block which did not match our expectations of complex block structure

Source code in ofrak_ghidra/components/blocks/unpackers.py
async def unpack(self, resource: Resource, config=None) -> None:
    """
    Unpack a code region by creating one child ComplexBlock for every complex block
    description returned by the complex-blocks Ghidra script for the region's address range.

    :param resource: the code region resource
    :param config: not used by this implementation
    """
    region_view = await resource.view_as(CodeRegion)
    # GhidraCodeRegionModifier is run for every CodeRegion so that it matches the backend.
    # This is not efficient, but there shouldn't be many CodeRegions, so it shouldn't matter.
    await resource.run(GhidraCodeRegionModifier)

    region_start = region_view.virtual_address
    region_end = region_start + region_view.size

    ghidra_project = await resource.get_only_ancestor_as_view(
        GhidraProject, ResourceFilter(tags=[GhidraProject], include_self=True)
    )
    program_attributes = await ghidra_project.resource.analyze(ProgramAttributes)

    # The script receives the region bounds as hex strings.
    cb_descriptions = await self.get_complex_blocks_script.call_script(
        resource, hex(region_start), hex(region_end)
    )

    # Build every child-creation awaitable first, then await them all together.
    pending_children = [
        region_view.create_child_region(
            ComplexBlock(cb_info["loadAddress"], cb_info["size"], cb_info["name"]),
            additional_attributes=(program_attributes,),
        )
        for cb_info in cb_descriptions
    ]

    await asyncio.gather(*pending_children)

GhidraComplexBlockUnpacker (ComplexBlockUnpacker, OfrakGhidraMixin)

unpack(self, resource, config=None) async

Unpack a complex block, identifying all of the basic blocks and data words which are a part of it.

The identified basic blocks and data words must be within the previously identified range of the complex block. If the analysis engine identifies basic blocks outside of this range, those are ignored - i.e. not unpacked - and the rest of the basic blocks in the function are unpacked as usual.

Source code in ofrak_ghidra/components/blocks/unpackers.py
async def unpack(self, resource: Resource, config=None):
    """
    Unpack a complex block into child BasicBlock and DataWord regions.

    Basic blocks and data words that do not lie entirely within the complex block's
    virtual address range are logged with a warning and skipped. A data word whose size is
    not 1, 2, 4 or 8 is emitted as that many consecutive one-byte data words, each carrying
    the same xrefs.

    :param resource: the complex block resource
    :param config: not used by this implementation

    :raises Exception: if a reported basic block has a size of zero
    """
    # Size flag for each word size that can be represented by a single DataWord.
    flag_by_word_size = {1: "B", 2: "H", 4: "L", 8: "Q"}

    cb_view = await resource.view_as(ComplexBlock)
    program_attrs = await resource.analyze(ProgramAttributes)
    cb_data_range = await resource.get_data_range_within_root()
    cb_start_vaddr = cb_view.virtual_address

    # Child-creation awaitables are collected here and awaited together at the end.
    pending_children = []

    bb_infos = await self.get_bb_batch_manager.get_result(
        (resource, cb_data_range.start, cb_start_vaddr)
    )

    for bb_info in bb_infos:
        bb_start_vaddr = bb_info["bb_start_vaddr"]
        bb_size = bb_info["bb_size"]
        is_exit_point = bb_info["is_exit_point"]
        mode_string = bb_info["instr_mode"]
        reported_exit_vaddr = bb_info["exit_vaddr"]
        # The Ghidra script initializes exit_vaddr to -1. For an exit point the BasicBlock
        # should carry None instead; this is consistent with the docstring of BasicBlock.
        exit_vaddr = None if is_exit_point else reported_exit_vaddr

        if bb_size == 0:
            raise Exception(f"Basic block 0x{bb_start_vaddr:x} has no size")

        bb_in_range = (
            bb_start_vaddr >= cb_view.virtual_address
            and (bb_start_vaddr + bb_size) <= cb_view.end_vaddr()
        )
        if not bb_in_range:
            logging.warning(
                f"Basic Block 0x{bb_start_vaddr:x} does not fall within "
                f"complex block {hex(cb_view.virtual_address)}-{hex(cb_view.end_vaddr())}"
            )
            continue

        bb_view = BasicBlock(
            bb_start_vaddr,
            bb_size,
            InstructionSetMode[mode_string],
            is_exit_point,
            exit_vaddr,
        )
        pending_children.append(
            cb_view.create_child_region(bb_view, additional_attributes=(program_attrs,))
        )

    dw_infos = await self.get_dw_batch_manager.get_result(
        (resource, cb_view.virtual_address, cb_view.end_vaddr())
    )

    for dw_info in dw_infos:
        word_vaddr = dw_info["word_vaddr"]
        word_size = dw_info["word_size"]
        xrefs = tuple(dw_info["xrefs"])

        dw_in_range = (
            word_vaddr >= cb_view.virtual_address
            and (word_vaddr + word_size) <= cb_view.end_vaddr()
        )
        if not dw_in_range:
            logging.warning(
                f"Data Word 0x{word_vaddr:x} does not fall within "
                f"complex block {hex(cb_view.virtual_address)}-{hex(cb_view.end_vaddr())}"
            )
            continue

        size_flag = flag_by_word_size.get(word_size)
        if size_flag is None:
            # Any other size is split into word_size individual one-byte words.
            size_flag = "B"
            num_words = word_size
            word_size = 1
        else:
            num_words = 1

        format_string = program_attrs.endianness.get_struct_flag() + size_flag

        for offset in range(num_words):
            dw_view = DataWord(word_vaddr + offset, word_size, format_string, xrefs)
            pending_children.append(
                cb_view.create_child_region(dw_view, additional_attributes=(program_attrs,))
            )

    await asyncio.gather(*pending_children)