ubifs.py
ofrak.core.ubifs
SuperblockNode
dataclass
Each UBIFS image has a superblock which describes a large number of parameters regarding the filesystem. The minimal set of parameters necessary to re-pack a UBIFS filesystem is stored here (see also: https://elixir.bootlin.com/linux/v6.1.7/source/fs/ubifs/ubifs.h#L1017).
Attributes:
Name | Type | Description |
---|---|---|
max_leb_count | int | Maximum number / limit of Logical Erase Blocks |
default_compr | str | Default compression algorithm |
fanout | int | Fanout of the index tree (number of links per indexing node) |
key_hash | str | Type of hash function used for keying direntries (typically 'r5' of reiserfs) |
orph_lebs | int | Number of LEBs used for orphan area (orphans are inodes with no links; see https://elixir.bootlin.com/linux/v6.1.7/source/fs/ubifs/orphan.c#L13) |
log_lebs | int | LEBs reserved for the journal (see the 'Journal' section in https://www.kernel.org/doc/Documentation/filesystems/ubifs-authentication.rst) |
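As a rough illustration of how these six parameters fit together, the sketch below mirrors the attribute table with a plain dataclass. It is a standalone stand-in, not the OFRAK class itself: the name `SuperblockParams` is hypothetical, and the example values are made up rather than read from a real image.

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class SuperblockParams:
    """Hypothetical stand-in mirroring the SuperblockNode attribute table."""

    max_leb_count: int  # upper limit on the number of Logical Erase Blocks
    default_compr: str  # default compression algorithm name, e.g. "lzo"
    fanout: int  # number of links per index node
    key_hash: str  # hash used for directory-entry keys, e.g. "r5"
    orph_lebs: int  # LEBs set aside for the orphan area
    log_lebs: int  # LEBs reserved for the journal


# Example values are illustrative only; they are not taken from a real image.
params = SuperblockParams(
    max_leb_count=1024,
    default_compr="lzo",
    fanout=8,
    key_hash="r5",
    orph_lebs=1,
    log_lebs=4,
)
print(asdict(params))
```

The field order matches the table, which is also the positional order in which the analyzer listing on this page constructs a SuperblockNode.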
Ubifs (GenericBinary, FilesystemRoot)
dataclass
UBIFS is a filesystem designed to run on top of a UBI volume layer. UBIFS provides indexing, compression, encryption / authentication and some other filesystem-related features.
Because a UBIFS image is part of a UBI image, re-packing it requires the 'min_io_size' and 'leb_size' properties, which are stored as part of the UBI header (https://elixir.bootlin.com/linux/v6.1.7/source/drivers/mtd/ubi/ubi.h#L441).
Each UBIFS image has a superblock which is encoded in OFRAK as a SuperblockNode.
Some documentation about the UBIFS layout can also be found at https://www.kernel.org/doc/Documentation/filesystems/ubifs-authentication.rst and http://www.linux-mtd.infradead.org/doc/ubifs.html.
Attributes:
Name | Type | Description |
---|---|---|
min_io_size | int | Minimum number of bytes per I/O transaction (see http://www.linux-mtd.infradead.org/doc/ubi.html#L_min_io_unit) |
leb_size | int | Size of Logical Erase Blocks |
superblock | SuperblockNode | A SuperblockNode |
UbifsAnalyzer (Analyzer)
Extract UBIFS parameters required for packing a resource.
analyze(self, resource, config=None)
async
Analyze a resource to extract specific ResourceAttributes.
Users should not call this method directly; rather, they should run Resource.run or Resource.analyze.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource | Resource | The resource that is being analyzed | required |
config | | Optional config for analyzing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. | None |
Returns:
Type | Description |
---|---|
Ubifs | The analysis results |
Source code in ofrak/core/ubifs.py
async def analyze(self, resource: Resource, config=None) -> Ubifs:
    async with resource.temp_to_disk() as temp_path:
        ubifs_obj = ubireader_ubifs(
            ubi_io.ubi_file(
                temp_path,
                block_size=guess_leb_size(temp_path),
                start_offset=0,
                end_offset=None,
            )
        )
        return Ubifs(
            ubifs_obj._get_min_io_size(),
            ubifs_obj._get_leb_size(),
            SuperblockNode(
                ubifs_obj.superblock_node.max_leb_cnt,
                PRINT_UBIFS_COMPR[ubifs_obj.superblock_node.default_compr],
                ubifs_obj.superblock_node.fanout,
                PRINT_UBIFS_KEY_HASH[ubifs_obj.superblock_node.key_hash],
                ubifs_obj.superblock_node.orph_lebs,
                ubifs_obj.superblock_node.log_lebs,
            ),
        )
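The listing above stores `default_compr` and `key_hash` as strings by indexing the lookup tables `PRINT_UBIFS_COMPR` and `PRINT_UBIFS_KEY_HASH` with the numeric superblock fields. A minimal sketch of that translation step follows. The tables here are local stand-ins (the names `COMPR_NAMES`, `KEY_HASH_NAMES` and `decode_superblock_fields` are hypothetical), and their numeric values are assumed from the enums in the Linux kernel's fs/ubifs/ubifs-media.h; the tables shipped with ubi_reader may cover a different set of entries.

```python
from typing import Tuple

# Local stand-ins for the analyzer's lookup tables. Values are assumed from
# the compression and key-hash enums in fs/ubifs/ubifs-media.h.
COMPR_NAMES = {0: "none", 1: "lzo", 2: "zlib", 3: "zstd"}
KEY_HASH_NAMES = {0: "r5", 1: "test"}


def decode_superblock_fields(default_compr: int, key_hash: int) -> Tuple[str, str]:
    """Translate numeric superblock fields into the names stored as strings."""
    if default_compr not in COMPR_NAMES:
        raise ValueError(f"unknown compression type: {default_compr}")
    if key_hash not in KEY_HASH_NAMES:
        raise ValueError(f"unknown key hash type: {key_hash}")
    return COMPR_NAMES[default_compr], KEY_HASH_NAMES[key_hash]


compr_name, hash_name = decode_superblock_fields(default_compr=1, key_hash=0)
print(compr_name, hash_name)
```

Keeping the names as strings means the packer can pass them straight to the `-x` and `-k` options without a reverse lookup.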
UbifsPacker (Packer)
Generate a UBIFS image from a filesystem representation in OFRAK.
pack(self, resource, config=None)
async
Pack the given resource.
Users should not call this method directly; rather, they should run Resource.run or Resource.pack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource | Resource | | required |
config | | Optional config for packing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. | None |
Source code in ofrak/core/ubifs.py
async def pack(self, resource: Resource, config=None) -> None:
    ubifs_view = await resource.view_as(Ubifs)
    # Write the filesystem tree to a directory that mkfs.ubifs can read
    flush_dir = await ubifs_view.flush_to_disk()
    # Note: delete_on_close requires Python 3.12 or later
    with tempfile.NamedTemporaryFile(mode="rb", delete_on_close=False) as temp:
        # Close our handle so that mkfs.ubifs can write the image to temp.name
        temp.close()
        cmd = [
            "mkfs.ubifs",
            "-m",
            f"{ubifs_view.min_io_size}",
            "-e",
            f"{ubifs_view.leb_size}",
            "-c",
            f"{ubifs_view.superblock.max_leb_count}",
            "-x",
            f"{ubifs_view.superblock.default_compr}",
            "-f",
            f"{ubifs_view.superblock.fanout}",
            "-k",
            f"{ubifs_view.superblock.key_hash}",
            "-p",
            f"{ubifs_view.superblock.orph_lebs}",
            "-l",
            f"{ubifs_view.superblock.log_lebs}",
            "-F",
            "-r",
            flush_dir,
            temp.name,
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
        )
        returncode = await proc.wait()
        if proc.returncode:
            raise CalledProcessError(returncode=returncode, cmd=cmd)
        with open(temp.name, "rb") as new_fh:
            new_data = new_fh.read()
        # Patching the full original range replaces the old image with the new one
        resource.queue_patch(Range(0, await resource.get_data_length()), new_data)
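To make the mapping from the stored attributes to the `mkfs.ubifs` options easier to follow, the sketch below builds the same argument list from plain values. The function name `build_mkfs_ubifs_cmd` and the example values and paths are hypothetical; the option letters are the ones used in the listing above, and the comments describing them reflect my reading of the `mkfs.ubifs` help text rather than anything stated on this page.

```python
from typing import List


def build_mkfs_ubifs_cmd(
    min_io_size: int,
    leb_size: int,
    max_leb_count: int,
    default_compr: str,
    fanout: int,
    key_hash: str,
    orph_lebs: int,
    log_lebs: int,
    root_dir: str,
    output_path: str,
) -> List[str]:
    """Build the mkfs.ubifs argument list without running anything."""
    return [
        "mkfs.ubifs",
        "-m", str(min_io_size),  # minimum I/O unit size
        "-e", str(leb_size),  # logical erase block size
        "-c", str(max_leb_count),  # maximum LEB count
        "-x", default_compr,  # compression type
        "-f", str(fanout),  # index tree fanout
        "-k", key_hash,  # key hash type
        "-p", str(orph_lebs),  # LEBs for the orphan area
        "-l", str(log_lebs),  # LEBs for the log (journal)
        "-F",  # space fixup flag
        "-r", root_dir,  # directory to build the image from
        output_path,  # target image file
    ]


# Illustrative values only; real values come from the analyzed Ubifs view.
cmd = build_mkfs_ubifs_cmd(
    2048, 126976, 1024, "lzo", 8, "r5", 1, 4, "/tmp/rootfs", "/tmp/out.ubifs"
)
print(" ".join(cmd))
```

Separating command construction from execution makes it possible to inspect or log the exact invocation before any external tool is started.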
UbifsUnpacker (Unpacker)
Unpack the UBIFS image into a filesystem representation.
unpack(self, resource, config=None)
async
Unpack the given resource.
Users should not call this method directly; rather, they should run Resource.run or Resource.unpack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource | Resource | The resource that is being unpacked | required |
config | | Optional config for unpacking. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. | None |
Source code in ofrak/core/ubifs.py
async def unpack(self, resource: Resource, config=None):
    with tempfile.TemporaryDirectory() as temp_flush_dir:
        # flush to disk
        with open(f"{temp_flush_dir}/input.img", "wb") as temp_file:
            resource_data = await resource.get_data()
            temp_file.write(resource_data)
            temp_file.flush()
        cmd = [
            "ubireader_extract_files",
            "-k",
            "-o",
            f"{temp_flush_dir}/output",
            temp_file.name,
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
        )
        returncode = await proc.wait()
        if proc.returncode:
            raise CalledProcessError(returncode=returncode, cmd=cmd)
        ubifs_view = await resource.view_as(Ubifs)
        await ubifs_view.initialize_from_disk(f"{temp_flush_dir}/output")
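Both the packer and the unpacker follow the same pattern for external tools: start the process with `asyncio.create_subprocess_exec`, wait for it, and raise `CalledProcessError` on a nonzero exit status. The sketch below isolates that pattern in a helper. The name `run_checked` is hypothetical, and the Python interpreter is used as a stand-in command so that the example runs without `mkfs.ubifs` or `ubireader_extract_files` installed.

```python
import asyncio
import sys
from subprocess import CalledProcessError
from typing import Sequence


async def run_checked(cmd: Sequence[str]) -> None:
    """Run cmd and raise CalledProcessError if it exits with a nonzero status."""
    proc = await asyncio.create_subprocess_exec(*cmd)
    returncode = await proc.wait()
    if returncode:
        raise CalledProcessError(returncode=returncode, cmd=list(cmd))


async def main() -> None:
    # A command that succeeds returns normally.
    await run_checked([sys.executable, "-c", "raise SystemExit(0)"])
    # A command that fails surfaces its exit status in the exception.
    try:
        await run_checked([sys.executable, "-c", "raise SystemExit(3)"])
    except CalledProcessError as err:
        print(f"command failed with exit status {err.returncode}")


if __name__ == "__main__":
    asyncio.run(main())
```

Raising an exception instead of returning the status keeps a failed tool run from being silently followed by reading a missing or incomplete output.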