assembler_service_keystone.py
ofrak.service.assembler.assembler_service_keystone
KeystoneAssemblerService (AssemblerServiceInterface)
An assembler service implementation using the keystone engine.
_get_keystone_instance(self, program_attributes, mode=<InstructionSetMode.NONE: 0>)
private
Get or build a Keystone instance for the provided processor
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| program_attributes | ProgramAttributes | | required |
| mode | InstructionSetMode | | `<InstructionSetMode.NONE: 0>` |
Source code in ofrak/service/assembler/assembler_service_keystone.py
def _get_keystone_instance(
    self,
    program_attributes: ProgramAttributes,
    mode: InstructionSetMode = InstructionSetMode.NONE,
) -> Ks:
    """
    Return the Keystone instance associated with a processor description and mode.

    Instances are memoized in ``self._ks_by_processor`` under the key
    ``(program_attributes, mode)``; a new one is built only when none is stored yet.

    :param program_attributes: processor description used to pick the Keystone arch/mode flags
    :param mode: instruction set mode, also used to pick the flags and as part of the cache key

    :return: the cached or freshly built Keystone instance
    """
    cache_key = (program_attributes, mode)
    cached_instance = self._ks_by_processor.get(cache_key)
    if cached_instance is not None:
        return cached_instance

    # Nothing cached for this processor/mode pair yet: build and remember one.
    arch_flag = self._get_keystone_arch_flag(program_attributes, mode)
    mode_flag = self._get_keystone_mode_flag(program_attributes, mode)
    new_instance = Ks(arch_flag, mode_flag)
    self._ks_by_processor[cache_key] = new_instance
    return new_instance
assemble(self, assembly, vm_addr, program_attributes, mode=<InstructionSetMode.NONE: 0>)
async
Assemble the given assembly code using keystone.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| assembly | str | | required |
| vm_addr | int | | required |
| program_attributes | ProgramAttributes | | required |
| mode | InstructionSetMode | | `<InstructionSetMode.NONE: 0>` |
Returns:
| Type | Description |
|---|---|
| bytes | machine code |
Source code in ofrak/service/assembler/assembler_service_keystone.py
async def assemble(
    self,
    assembly: str,
    vm_addr: int,
    program_attributes: ProgramAttributes,
    mode: InstructionSetMode = InstructionSetMode.NONE,
) -> bytes:
    """
    Assemble the given assembly code using keystone.

    Three things happen before/around the actual Keystone call:

    - If the assembly contains one of the instructions listed in `X86_64_SPECIAL_CASES`,
      the text is split around that instruction, each fragment is assembled recursively,
      and the fragments are joined with the hard-coded machine code for that instruction.
    - For PPC, register operands such as `r3`, `f1`, `v12` are rewritten to bare numbers.
    - For ARM and AARCH64, standard error is captured while assembling; if a warning is
      seen, the cache of Keystone instances is dropped.

    :param assembly: the assembly source text
    :param vm_addr: the virtual address at which the first instruction will be located
    :param program_attributes: processor description used to select the Keystone instance
    :param mode: instruction set mode used to select the Keystone instance

    :raises Exception: if Keystone raises a KsError; the message names the first line that
        fails to assemble on its own (or None if every line assembles individually)

    :return: machine code
    """
    # TODO: This is a very temporary fix to T395.
    # TODO: Figure out where to actually handle situations like this
    special_instruction = next(
        (candidate for candidate in X86_64_SPECIAL_CASES if candidate in assembly),
        None,
    )
    if special_instruction is not None:
        special_machine_code = X86_64_SPECIAL_CASES[special_instruction]
        machine_code_parts = []
        assembly_size = 0
        for assembly_part in assembly.split(special_instruction):
            if not assembly_part.strip():
                # Empty or whitespace-only fragment: there is nothing to assemble.
                machine_code_part = b""
            else:
                # Forward `mode` so that fragments are assembled the same way as the
                # whole would have been.
                machine_code_part = await self.assemble(
                    assembly_part, vm_addr + assembly_size, program_attributes, mode
                )
            machine_code_parts.append(machine_code_part)
            # The next fragment starts after this fragment and the special instruction
            # that follows it; use the real length of the substituted machine code.
            assembly_size += len(machine_code_part) + len(special_machine_code)
        return special_machine_code.join(machine_code_parts)

    # special register prefix preprocessing for PPC
    preprocessed_assembly = assembly
    if program_attributes.isa is InstructionSet.PPC:
        for prefix in ["r", "f", "v"]:
            for n in range(32):
                register_operand = f"{prefix}{n}"
                preprocessed_assembly = preprocessed_assembly.replace(
                    r" %s," % register_operand, " %u," % n
                )
                preprocessed_assembly = preprocessed_assembly.replace(
                    r"(%s)" % register_operand, "(%u)" % n
                )
                preprocessed_assembly = preprocessed_assembly.replace(
                    r" %s" % register_operand, " %u" % n
                )

    try:
        ks = self._get_keystone_instance(program_attributes, mode)
        if program_attributes.isa in (InstructionSet.ARM, InstructionSet.AARCH64):
            # This place is a message... and part of a system of messages ...pay attention to it!
            # Sending this message was important to us. We considered ourselves to be a powerful culture.
            # This place is not a place of honor ... no highly esteemed deed is commemorated here... nothing valued is here.
            # What is here was dangerous and repulsive to us. This message is a warning about danger.
            # The danger is in a particular location... it increases towards a center... the center of danger is here... of a particular size and shape, and below us.
            # The danger is still present, in your time, as it was in ours.
            # The danger is to the body, and it can kill.
            # The form of the danger is an emanation of energy.
            # The danger is unleashed only if you substantially disturb this place. This place is best shunned and left uninhabited.

            # Check for warnings in Keystone standard error.
            # If they appear, reset the Ks objects, as bugs in Keystone error handling
            # sometimes cause segmentation faults at subsequent calls to ks.asm.
            # See T403.
            with StreamCapture(sys.stderr) as stream_capture:
                machine_code, _ = ks.asm(preprocessed_assembly, addr=vm_addr, as_bytes=True)
            if "warning:" in stream_capture.get_captured_stream():
                self._ks_by_processor = {}
        else:
            machine_code, _ = ks.asm(preprocessed_assembly, addr=vm_addr, as_bytes=True)
        return machine_code
    except KsError as error:
        # Re-assemble line by line to find the first line that fails on its own, and the
        # address it would have been placed at, for a more useful error message.
        assembly_vm_addr = vm_addr
        failing_instruction = None
        for assembly_line in preprocessed_assembly.splitlines():
            try:
                ks = self._get_keystone_instance(program_attributes, mode)
                machine_code, _ = ks.asm(assembly_line, addr=assembly_vm_addr, as_bytes=True)
                assembly_vm_addr += len(machine_code)
            except KsError:
                failing_instruction = assembly_line
                break
        raise Exception(
            "Keystone ERROR in {}:\n[0x{:x}]\n{}\nerror on instruction '{}' @ 0x{:x}: {}".format(
                program_attributes.isa,
                vm_addr,
                preprocessed_assembly,
                failing_instruction,
                assembly_vm_addr,
                error,
            )
        ) from error