dtb.py
ofrak.core.dtb
Device Tree Blob (or Flattened Device Tree) OFRAK Utilities For more information see: https://devicetree-specification.readthedocs.io/en/stable/flattened-format.html
DeviceTreeBlob (GenericBinary)
A Device Tree Blob (DTB).
DeviceTreeBlobIdentifier (Identifier)
Identify Device Tree Blob files.
identify(self, resource, config=None)
async
Identify DTB files based on the first four bytes being "d00dfeed".
Source code in ofrak/core/dtb.py
async def identify(self, resource: Resource, config: ComponentConfig = None) -> None:
"""
Identify DTB files based on the first four bytes being "d00dfeed".
"""
data = await resource.get_data(Range(0, 4))
if data == struct.pack("<I", DTB_MAGIC_SIGNATURE):
resource.add_tag(DeviceTreeBlob)
DeviceTreeBlobPacker (Packer)
Device Tree Blob Packer
Repacks the Device Tree Blob tree structure into the binary format and patches the original resource.
pack(self, resource, config=None)
async
Pack the given resource.
Users should not call this method directly; rather, they should run Resource.run or Resource.pack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource |
Resource |
required | |
config |
ComponentConfig |
Optional config for packing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
None |
Source code in ofrak/core/dtb.py
async def pack(self, resource: Resource, config: ComponentConfig = None):
header = fdt.Header()
header_view = await resource.get_only_descendant_as_view(
v_type=DtbHeader, r_filter=ResourceFilter(tags=[DtbHeader])
)
header.version = header_view.version
header.total_size = header_view.totalsize
header.off_dt_struct = header_view.off_dt_struct
header.last_comp_version = header_view.last_comp_version
header.boot_cpuid_phys = header_view.boot_cpuid_phys
dtb = fdt.FDT(header=header)
dtb.entries = [
{"address": entry.address, "size": entry.size}
for entry in await resource.get_descendants_as_view(
v_type=DtbEntry, r_filter=ResourceFilter(tags=[DtbEntry])
)
]
root_node_view = await resource.get_only_child_as_view(
DtbNode, r_filter=ResourceFilter(tags=[DtbNode])
)
dtb.root = fdt.Node(name=await root_node_view.get_path())
for prop in await root_node_view.resource.get_children_as_view(
v_type=DtbProperty,
r_filter=ResourceFilter(tags=[DtbProperty]),
r_sort=ResourceSort(DtbProperty.DtbPropertyName),
):
dtb.add_item(await _prop_to_fdt(prop), await root_node_view.get_path())
for node in await root_node_view.resource.get_descendants_as_view(
v_type=DtbNode,
r_filter=ResourceFilter(tags=[DtbNode]),
r_sort=ResourceSort(DtbNode.DtbNodeName),
):
# By default, add_item adds the missing nodes to complete the path of a previous node
if not dtb.exist_node(await node.get_path()):
dtb.add_item(fdt.Node(node.name), posixpath.dirname(await node.get_path()))
for prop in await node.resource.get_children_as_view(
v_type=DtbProperty,
r_filter=ResourceFilter(tags=[DtbProperty]),
r_sort=ResourceSort(DtbProperty.DtbPropertyName),
):
dtb.add_item(await _prop_to_fdt(prop), await node.get_path())
original_size = await resource.get_data_length()
resource.queue_patch(Range(0, original_size), dtb.to_dtb())
DeviceTreeBlobUnpacker (Unpacker)
Unpacks a DeviceTreeBlob:
A DeviceTreeBlob consists of: - 1 DtbHeader - 0 or more DtbEntry instances - 1 root DtbNode which can contain 0 or more DtbNode and DtbProperty children - Each DtbNode can have 0 or more DtbProperty children and 0 or more further nested DtbNode children in it
unpack(self, resource, config=None)
async
Unpack the given resource.
Users should not call this method directly; rather, they should run Resource.run or Resource.unpack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource |
Resource |
The resource that is being unpacked |
required |
config |
ComponentConfig |
Optional config for unpacking. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
None |
Source code in ofrak/core/dtb.py
async def unpack(self, resource: Resource, config: ComponentConfig = None):
dtb_data = await resource.get_data()
dtb_view = await resource.view_as(DeviceTreeBlob)
dtb = fdt.parse_dtb(dtb_data)
# Create DtbHeader
await resource.create_child(
tags=(DtbHeader,),
data=dtb.header.export(),
)
# Create DtbEntry instances
for dtb_entry in dtb.entries:
await resource.create_child_from_view(
DtbEntry(
address=dtb_entry["address"],
size=dtb_entry["size"],
),
data=b"",
)
# Create root node
await resource.create_child_from_view(
DtbNode(name=dtb.root.name),
data=b"",
)
# Create DtbNode and DtbProperty instances and structure by walking the DeviceTreeBlob
for path, nodes, props in dtb.walk():
# Get parent
parent_node = await dtb_view.get_node_by_path(path)
for node in nodes:
await parent_node.resource.create_child_from_view(DtbNode(name=node.name), data=b"")
for prop in props:
p_type, p_data = _prop_from_fdt(prop)
await parent_node.resource.create_child_from_view(
DtbProperty(
name=prop.name,
p_type=p_type,
),
data=p_data,
)
DtbEntry (GenericBinary)
dataclass
Device Tree Entry
DtbHeader (GenericBinary)
dataclass
Device Tree Header
DtbHeaderAnalyzer (Analyzer)
Analyze Device Tree Blob header information and return a DtbHeader
analyze(self, resource, config)
async
Analyze a resource for to extract specific ResourceAttributes.
Users should not call this method directly; rather, they should run Resource.run or Resource.analyze.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource |
Resource |
The resource that is being analyzed |
required |
config |
None |
Optional config for analyzing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
required |
Returns:
Type | Description |
---|---|
DtbHeader |
The analysis results |
Source code in ofrak/core/dtb.py
async def analyze(self, resource: Resource, config: None) -> DtbHeader:
header_data = await resource.get_data()
(
dtb_magic,
totalsize,
off_dt_struct,
off_dt_strings,
off_mem_rsvmap,
version,
last_comp_version,
) = struct.unpack(">IIIIIII", header_data[:28])
assert dtb_magic == DTB_MAGIC_SIGNATURE, (
f"DTB Magic bytes not matching."
f"Expected: {DTB_MAGIC_SIGNATURE} "
f"Unpacked: {dtb_magic}"
)
boot_cpuid_phys = 0
dtb_strings_size = 0
dtb_struct_size = 0
if version >= 2:
boot_cpuid_phys = struct.unpack(">I", header_data[28:32])[0]
if version >= 3:
dtb_strings_size = struct.unpack(">I", header_data[32:36])[0]
if version >= 17:
dtb_struct_size = struct.unpack(">I", header_data[36:40])[0]
return DtbHeader(
dtb_magic,
totalsize,
off_dt_struct,
off_dt_strings,
off_mem_rsvmap,
version,
last_comp_version,
boot_cpuid_phys,
dtb_strings_size,
dtb_struct_size,
)
DtbNode (GenericBinary)
dataclass
Device Tree Node
get_path(self)
async
Get the path of a DtbNode within a DeviceTreeBlob. Root node is always "/" per DTB specifications.
Source code in ofrak/core/dtb.py
async def get_path(self) -> str:
"""
Get the path of a DtbNode within a DeviceTreeBlob.
Root node is always "/" per DTB specifications.
"""
if self.name == "/":
return self.name
parent_node = await self.resource.get_parent_as_view(v_type=DtbNode)
return posixpath.join(await parent_node.get_path(), self.name)
DtbProperty (GenericBinary)
dataclass
DTB Property
DtbPropertyType (Enum)
An enumeration.
_prop_to_fdt(p)
async
private
Generates an fdt.items.property corresponding to a DtbProperty.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
p |
DtbProperty |
required |
Returns:
Type | Description |
---|---|
Property |
Source code in ofrak/core/dtb.py
async def _prop_to_fdt(p: DtbProperty) -> fdt.items.Property:
"""
Generates an fdt.items.property corresponding to a DtbProperty.
:param p:
:return:
"""
value = await p.get_value()
if p.p_type is DtbPropertyType.DtbPropNoValue:
return fdt.items.Property(name=p.name)
elif p.p_type is DtbPropertyType.DtbBytes:
return fdt.items.PropBytes(name=p.name, data=await p.resource.get_data())
elif p.p_type is DtbPropertyType.DtbInt:
return fdt.items.PropWords(p.name, value)
elif p.p_type is DtbPropertyType.DtbIntList:
return fdt.items.PropWords(p.name, *value)
elif p.p_type is DtbPropertyType.DtbStr:
return fdt.items.PropStrings(p.name, value)
elif p.p_type is DtbPropertyType.DtbStrList:
return fdt.items.PropStrings(p.name, *value)
else:
raise TypeError(f"Unsupported type {p.p_type} for property {p.name}")
_prop_from_fdt(p)
private
Converts an fdt.items.property to its p_type and p_data values.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
p |
Property |
required |
Returns:
Type | Description |
---|---|
Tuple[ofrak.core.dtb.DtbPropertyType, bytes] |
Source code in ofrak/core/dtb.py
def _prop_from_fdt(p: fdt.items.Property) -> Tuple[DtbPropertyType, bytes]:
"""
Converts an fdt.items.property to its p_type and p_data values.
:param p:
:return:
"""
if type(p) is fdt.items.Property or len(p.data) == 0:
_p_type = DtbPropertyType.DtbPropNoValue
_p_data = b""
elif type(p) is fdt.items.PropBytes:
_p_type = DtbPropertyType.DtbBytes
_p_data = bytes(p.data)
elif isinstance(p.value, int):
if len(p.data) == 1:
_p_type = DtbPropertyType.DtbInt
else:
_p_type = DtbPropertyType.DtbIntList
_p_data = b"".join([struct.pack(">I", i) for i in p.data])
elif isinstance(p.value, str):
if len(p.data) == 1:
_p_type = DtbPropertyType.DtbStr
_p_data = b"".join([s.encode("ascii") for s in p.data])
else:
_p_type = DtbPropertyType.DtbStrList
_p_data = b"\0".join([s.encode("ascii") for s in p.data])
else:
raise TypeError(f"Unknown type for DTB Property: {p}")
return _p_type, _p_data