Skip to content

assembler_service_keystone.py

ofrak.service.assembler.assembler_service_keystone

KeystoneAssemblerService (AssemblerServiceInterface)

An assembler service implementation using the keystone engine.

_get_keystone_instance(self, program_attributes, mode=<InstructionSetMode.NONE: 0>) private

Get or build a Keystone instance for the provided processor

Parameters:

Name Type Description Default
program_attributes ProgramAttributes required
mode InstructionSetMode <InstructionSetMode.NONE: 0>
Source code in ofrak/service/assembler/assembler_service_keystone.py
def _get_keystone_instance(
    self,
    program_attributes: ProgramAttributes,
    mode: InstructionSetMode = InstructionSetMode.NONE,
) -> Ks:
    """
    Return the Keystone instance cached for a processor/mode pair, creating it on first use.

    :param program_attributes: processor description; together with ``mode`` it forms the
        key under which the instance is cached in ``self._ks_by_processor``
    :param mode: instruction set mode; the second half of the cache key

    :return: the cached (or newly built and cached) Keystone instance
    """
    cache_key = (program_attributes, mode)
    cached = self._ks_by_processor.get(cache_key)
    if cached is not None:
        return cached

    # Cache miss: translate the processor description into Keystone flags and build it.
    arch_flag = self._get_keystone_arch_flag(program_attributes, mode)
    mode_flag = self._get_keystone_mode_flag(program_attributes, mode)
    created = Ks(arch_flag, mode_flag)
    self._ks_by_processor[cache_key] = created
    return created

assemble(self, assembly, vm_addr, program_attributes, mode=<InstructionSetMode.NONE: 0>) async

Assemble the given assembly code using keystone.

Parameters:

Name Type Description Default
assembly str required
vm_addr int required
program_attributes ProgramAttributes required
mode InstructionSetMode <InstructionSetMode.NONE: 0>

Returns:

Type Description
bytes

machine code

Source code in ofrak/service/assembler/assembler_service_keystone.py
async def assemble(
    self,
    assembly: str,
    vm_addr: int,
    program_attributes: ProgramAttributes,
    mode: InstructionSetMode = InstructionSetMode.NONE,
) -> bytes:
    """
    Assemble the given assembly code using keystone.

    Assembly containing an instruction listed in ``X86_64_SPECIAL_CASES`` is split around
    that instruction; each remaining piece is assembled recursively at its own address and
    the pieces are joined with the hard-coded machine code for the special instruction.

    :param assembly: the assembly source text
    :param vm_addr: the virtual address at which the code is assembled
    :param program_attributes: the processor targeted by the assembly
    :param mode: the instruction set mode used to select the Keystone instance

    :return: machine code

    :raises Exception: if Keystone reports an error; the message names the first line that
        fails to assemble on its own, and the original ``KsError`` is chained as the cause
    """
    # TODO: This is a very temporary fix to T395.
    # TODO: Figure out where to actually handle situations like this
    bad_instruction = next(
        (instruction for instruction in X86_64_SPECIAL_CASES if instruction in assembly),
        None,
    )
    if bad_instruction is not None:
        special_case_code = X86_64_SPECIAL_CASES[bad_instruction]
        machine_code_parts = []
        assembly_size = 0
        for assembly_part in assembly.split(bad_instruction):
            if assembly_part == "":
                machine_code_parts.append(b"")
            else:
                # Forward `mode` so the pieces are assembled exactly like the whole would be.
                # NOTE(review): Keystone appears to return None instead of bytes when a
                # piece holds no statements (e.g. only a newline); normalize to b"".
                machine_code_part = (
                    await self.assemble(
                        assembly_part, vm_addr + assembly_size, program_attributes, mode
                    )
                    or b""
                )
                machine_code_parts.append(machine_code_part)
                assembly_size += len(machine_code_part)
            # The next piece starts after the special instruction's actual encoding.
            assembly_size += len(special_case_code)
        # The split always yields at least two parts, so the joined result always
        # contains the special-case encoding at least once.
        return special_case_code.join(machine_code_parts)

    # special register prefix preprocessing for PPC
    preprocessed_assembly = assembly
    if program_attributes.isa is InstructionSet.PPC:
        for prefix in ["r", "f", "v"]:
            for n in range(32):
                register_operand = f"{prefix}{n}"
                # Rewrite prefixed register names (e.g. "r3") to bare numbers ("3") in
                # the three operand positions handled here; the order is significant.
                preprocessed_assembly = preprocessed_assembly.replace(
                    f" {register_operand},", f" {n},"
                )
                preprocessed_assembly = preprocessed_assembly.replace(
                    f"({register_operand})", f"({n})"
                )
                preprocessed_assembly = preprocessed_assembly.replace(
                    f" {register_operand}", f" {n}"
                )

    try:
        ks = self._get_keystone_instance(program_attributes, mode)
        if program_attributes.isa in (InstructionSet.ARM, InstructionSet.AARCH64):
            # This place is a message... and part of a system of messages ...pay attention to it!
            # Sending this message was important to us. We considered ourselves to be a powerful culture.
            # This place is not a place of honor ... no highly esteemed deed is commemorated here... nothing valued is here.
            # What is here was dangerous and repulsive to us. This message is a warning about danger.
            # The danger is in a particular location... it increases towards a center... the center of danger is here... of a particular size and shape, and below us.
            # The danger is still present, in your time, as it was in ours.
            # The danger is to the body, and it can kill.
            # The form of the danger is an emanation of energy.
            # The danger is unleashed only if you substantially disturb this place. This place is best shunned and left uninhabited.
            # Check for warnings in Keystone standard error.
            # If they appear, reset the Ks objects, as bugs in Keystone error handling
            # sometimes cause segmentation faults at subsequent calls to ks.asm.
            # See T403.
            with StreamCapture(sys.stderr) as stream_capture:
                machine_code, _ = ks.asm(preprocessed_assembly, addr=vm_addr, as_bytes=True)
            if "warning:" in stream_capture.get_captured_stream():
                self._ks_by_processor = {}
        else:
            machine_code, _ = ks.asm(preprocessed_assembly, addr=vm_addr, as_bytes=True)
        return machine_code
    except KsError as error:
        # Re-assemble line by line to find the first instruction that fails on its own,
        # tracking the address each line would be placed at.
        assembly_vm_addr = vm_addr
        failing_instruction = None

        for assembly_line in preprocessed_assembly.splitlines():
            try:
                ks = self._get_keystone_instance(program_attributes, mode)
                line_code, _ = ks.asm(assembly_line, addr=assembly_vm_addr, as_bytes=True)
            except KsError:
                failing_instruction = assembly_line
                break
            # NOTE(review): blank or statement-free lines appear to yield None rather than
            # bytes; skip them so a TypeError cannot mask the original Keystone error.
            if line_code:
                assembly_vm_addr += len(line_code)

        raise Exception(
            "Keystone ERROR in {}:\n[0x{:x}]\n{}\nerror on instruction '{}' @ 0x{:x}: {}".format(
                program_attributes.isa,
                vm_addr,
                preprocessed_assembly,
                failing_instruction,
                assembly_vm_addr,
                error,
            )
        ) from error

assemble_file(self, assembly_file, vm_addr, program_attributes, mode=<InstructionSetMode.NONE: 0>) async

Assemble the given assembly file.

Parameters:

Name Type Description Default
assembly_file str

The path to the assembly file.

required
vm_addr int

The virtual address at which the assembly file should be assembled.

required
program_attributes ProgramAttributes

The processor targeted by the assembly

required
mode

The mode of the processor for the assembly

<InstructionSetMode.NONE: 0>

Returns:

Type Description
bytes

The assembled machine code

Source code in ofrak/service/assembler/assembler_service_keystone.py
async def assemble_file(
    self,
    assembly_file: str,
    vm_addr: int,
    program_attributes: ProgramAttributes,
    mode: InstructionSetMode = InstructionSetMode.NONE,
) -> bytes:
    """
    Assemble the given assembly file.

    The file is read in full, every occurrence of ``".text\\n"`` is removed, and the
    remaining text is passed to ``assemble``.

    :param assembly_file: The path to the assembly file.
    :param vm_addr: The virtual address at which the assembly file should be assembled.
    :param program_attributes: The processor targeted by the assembly
    :param mode: The mode of the processor for the assembly

    :return: The assembled machine code
    """
    with open(assembly_file) as file_handle:
        assembly = file_handle.read()
    # Keystone Seg faults when trying to assemble '.text'.
    # It is therefore stripped from the assembly here.
    assembly = assembly.replace(".text\n", "")
    return await self.assemble(assembly, vm_addr, program_attributes, mode)

assemble_files(self, assembly_files, vm_addrs, program_attributes, mode=<InstructionSetMode.NONE: 0>)

Assemble the given assembly files.

Parameters:

Name Type Description Default
assembly_files Iterable[str]

The paths to the assembly files.

required
vm_addrs Iterable[int]

The virtual addresses at which the corresponding assembly files should be assembled, in the same order as the files.

required
program_attributes ProgramAttributes

The processor targeted by the assembly

required
mode InstructionSetMode

The mode of the processor for the assembly

<InstructionSetMode.NONE: 0>

Returns:

Type Description
AsyncIterator[bytes]

The assembled machine code

Source code in ofrak/service/assembler/assembler_service_keystone.py
async def assemble_files(
    self,
    assembly_files: Iterable[str],
    vm_addrs: Iterable[int],
    program_attributes: ProgramAttributes,
    mode: InstructionSetMode = InstructionSetMode.NONE,
) -> AsyncIterator[bytes]:
    """
    Assemble several assembly files, yielding the machine code of each one in turn.

    Files and addresses are paired positionally with ``zip``, so iteration stops as soon
    as the shorter of the two iterables is exhausted.

    :param assembly_files: The paths to the assembly files.
    :param vm_addrs: The virtual address for each file, in the same order as the paths.
    :param program_attributes: The processor targeted by the assembly
    :param mode: The mode of the processor for the assembly

    :return: An async iterator over the assembled machine code of each file
    """
    paired_inputs = zip(assembly_files, vm_addrs)
    for path, load_addr in paired_inputs:
        assembled = await self.assemble_file(path, load_addr, program_attributes, mode)
        yield assembled