ubifs.py

ofrak.core.ubifs

SuperblockNode dataclass

Each UBIFS image has a superblock, which describes a large number of filesystem parameters. The minimal set of parameters necessary to re-pack a UBIFS filesystem is stored here (see also: https://elixir.bootlin.com/linux/v6.1.7/source/fs/ubifs/ubifs.h#L1017).

Attributes:

Name Type Description
max_leb_count int

Maximum number of Logical Erase Blocks (LEBs)

default_compr str

Default compression algorithm

fanout int

Fanout of the index tree (number of links per indexing node)

key_hash str

Type of hash function used for keying direntries (typically 'r5', from reiserfs)

orph_lebs int

Number of LEBs used for orphan area (orphans are inodes with no links; see https://elixir.bootlin.com/linux/v6.1.7/source/fs/ubifs/orphan.c#L13)

log_lebs int

LEBs reserved for the journal (see the 'Journal' section in https://www.kernel.org/doc/Documentation/filesystems/ubifs-authentication.rst)

Ubifs (GenericBinary, FilesystemRoot) dataclass

UBIFS is a filesystem designed to run on top of a UBI volume layer. It provides indexing, compression, encryption / authentication and other filesystem-related features.

A UBIFS image is part of a UBI image, so re-packing it requires the 'min_io_size' and 'leb_size' properties, which are stored as part of the UBI header (https://elixir.bootlin.com/linux/v6.1.7/source/drivers/mtd/ubi/ubi.h#L441).

Each UBIFS image has a superblock which is encoded in OFRAK as a SuperblockNode.

Some documentation about the UBIFS layout can also be found at https://www.kernel.org/doc/Documentation/filesystems/ubifs-authentication.rst and http://www.linux-mtd.infradead.org/doc/ubifs.html

Attributes:

Name Type Description
min_io_size int

Minimum number of bytes per I/O transaction (see http://www.linux-mtd.infradead.org/doc/ubi.html#L_min_io_unit)

leb_size int

Size of Logical Erase Blocks

superblock SuperblockNode

The SuperblockNode describing this image's superblock

UbifsAnalyzer (Analyzer)

Extract UBIFS parameters required for packing a resource.

analyze(self, resource, config=None) async

Analyze a resource to extract specific ResourceAttributes.

Users should not call this method directly; rather, they should run Resource.run or Resource.analyze.

Parameters:

Name Type Description Default
resource Resource

The resource that is being analyzed

required
config

Optional config for analyzing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run.

None

Returns:

Type Description
Ubifs

The analysis results

Source code in ofrak/core/ubifs.py
async def analyze(self, resource: Resource, config=None) -> Ubifs:
    async with resource.temp_to_disk() as temp_path:
        ubifs_obj = ubireader_ubifs(
            ubi_io.ubi_file(
                temp_path,
                block_size=guess_leb_size(temp_path),
                start_offset=0,
                end_offset=None,
            )
        )
        return Ubifs(
            ubifs_obj._get_min_io_size(),
            ubifs_obj._get_leb_size(),
            SuperblockNode(
                ubifs_obj.superblock_node.max_leb_cnt,
                PRINT_UBIFS_COMPR[ubifs_obj.superblock_node.default_compr],
                ubifs_obj.superblock_node.fanout,
                PRINT_UBIFS_KEY_HASH[ubifs_obj.superblock_node.key_hash],
                ubifs_obj.superblock_node.orph_lebs,
                ubifs_obj.superblock_node.log_lebs,
            ),
        )
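
The analyzer above turns the numeric `default_compr` and `key_hash` superblock fields into names through the `PRINT_UBIFS_COMPR` and `PRINT_UBIFS_KEY_HASH` lookup tables. The following is a minimal sketch of that translation step, not the actual tables: `COMPR_NAMES`, `KEY_HASH_NAMES` and `describe_superblock` are hypothetical names, and the numeric values are assumed to follow the enum order in the Linux kernel's UBIFS media header.

```python
from typing import Tuple

# Hypothetical stand-ins for the lookup tables used by the analyzer.
# Assumption: indices follow the enum order of the kernel's UBIFS media
# definitions (compression: none, lzo, zlib, zstd; key hash: r5, test).
COMPR_NAMES = {0: "none", 1: "lzo", 2: "zlib", 3: "zstd"}
KEY_HASH_NAMES = {0: "r5", 1: "test"}


def describe_superblock(default_compr: int, key_hash: int) -> Tuple[str, str]:
    """Translate numeric superblock fields into the names a packer can reuse."""
    try:
        return COMPR_NAMES[default_compr], KEY_HASH_NAMES[key_hash]
    except KeyError as err:
        # Fail loudly on values this sketch does not know about.
        raise ValueError(f"unknown superblock value: {err.args[0]}") from err
```

Storing the names rather than the raw numbers means the packer can pass them straight to the `-x` and `-k` options of `mkfs.ubifs`.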

UbifsIdentifier (Identifier)

Check the first four bytes of a resource and tag the resource as Ubifs if it matches the file magic.

identify(self, resource, config=None) async

Perform identification on the given resource.

Users should not call this method directly; rather, they should run Resource.identify.

Parameters:

Name Type Description Default
resource Resource required
config

Optional config for identifying. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run.

None
Source code in ofrak/core/ubifs.py
async def identify(self, resource: Resource, config=None) -> None:
    datalength = await resource.get_data_length()
    if datalength >= 4:
        data = await resource.get_data(Range(0, 4))
        if data == UBIFS_NODE_MAGIC:
            resource.add_tag(Ubifs)
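
The magic check above can be tried outside OFRAK on raw bytes. This is a minimal sketch: `looks_like_ubifs` is a hypothetical helper, and the constant is an assumption based on the kernel's UBIFS node magic (0x06101831, stored little-endian), not a value read from the OFRAK source.

```python
import struct

# Assumption: the UBIFS node magic is 0x06101831, stored little-endian on
# the medium, so an image starts with the bytes 31 18 10 06.
UBIFS_NODE_MAGIC = struct.pack("<I", 0x06101831)


def looks_like_ubifs(data: bytes) -> bool:
    """Return True if the first four bytes match the UBIFS node magic."""
    # Mirror the identifier: data shorter than four bytes never matches.
    return len(data) >= 4 and data[:4] == UBIFS_NODE_MAGIC
```

As in the identifier, the length guard comes first, so very short resources are rejected without comparing any bytes.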

UbifsPacker (Packer)

Generate a UBIFS image from a filesystem representation in OFRAK.

pack(self, resource, config=None) async

Pack the given resource.

Users should not call this method directly; rather, they should run Resource.run or Resource.pack.

Parameters:

Name Type Description Default
resource Resource required
config

Optional config for packing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run.

None
Source code in ofrak/core/ubifs.py
async def pack(self, resource: Resource, config=None) -> None:
    ubifs_view = await resource.view_as(Ubifs)
    flush_dir = await ubifs_view.flush_to_disk()

    with tempfile.NamedTemporaryFile(mode="rb", delete_on_close=False) as temp:
        temp.close()
        cmd = [
            "mkfs.ubifs",
            "-m",
            f"{ubifs_view.min_io_size}",
            "-e",
            f"{ubifs_view.leb_size}",
            "-c",
            f"{ubifs_view.superblock.max_leb_count}",
            "-x",
            f"{ubifs_view.superblock.default_compr}",
            "-f",
            f"{ubifs_view.superblock.fanout}",
            "-k",
            f"{ubifs_view.superblock.key_hash}",
            "-p",
            f"{ubifs_view.superblock.orph_lebs}",
            "-l",
            f"{ubifs_view.superblock.log_lebs}",
            "-F",
            "-r",
            flush_dir,
            temp.name,
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
        )
        returncode = await proc.wait()
        if proc.returncode:
            raise CalledProcessError(returncode=returncode, cmd=cmd)
        with open(temp.name, "rb") as new_fh:
            new_data = new_fh.read()

        resource.queue_patch(Range(0, await resource.get_data_length()), new_data)
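
To make the mapping between the stored attributes and the `mkfs.ubifs` options easier to see, the command construction above can be sketched as a standalone function. `SuperblockParams` and `build_mkfs_ubifs_cmd` are hypothetical names used only for illustration; the flags are the ones that appear in the packer source.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SuperblockParams:
    """Hypothetical stand-in for SuperblockNode, with the same six fields."""

    max_leb_count: int
    default_compr: str
    fanout: int
    key_hash: str
    orph_lebs: int
    log_lebs: int


def build_mkfs_ubifs_cmd(
    min_io_size: int,
    leb_size: int,
    sb: SuperblockParams,
    root_dir: str,
    output_path: str,
) -> List[str]:
    """Build the mkfs.ubifs argument list without running it."""
    return [
        "mkfs.ubifs",
        "-m", str(min_io_size),       # minimum I/O unit size
        "-e", str(leb_size),          # logical erase block size
        "-c", str(sb.max_leb_count),  # maximum LEB count
        "-x", sb.default_compr,       # default compression algorithm
        "-f", str(sb.fanout),         # index tree fanout
        "-k", sb.key_hash,            # key hash type
        "-p", str(sb.orph_lebs),      # LEBs for the orphan area
        "-l", str(sb.log_lebs),       # LEBs for the log
        "-F",                         # request free-space fixup
        "-r", root_dir,               # directory to build the image from
        output_path,
    ]
```

Keeping the command construction separate from the subprocess call makes it possible to inspect or log the arguments before anything is executed.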

UbifsUnpacker (Unpacker)

Unpack the UBIFS image into a filesystem representation.

unpack(self, resource, config=None) async

Unpack the given resource.

Users should not call this method directly; rather, they should run Resource.run or Resource.unpack.

Parameters:

Name Type Description Default
resource Resource

The resource that is being unpacked

required
config

Optional config for unpacking. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run.

None
Source code in ofrak/core/ubifs.py
async def unpack(self, resource: Resource, config=None):
    with tempfile.TemporaryDirectory() as temp_flush_dir:
        # flush to disk
        with open(f"{temp_flush_dir}/input.img", "wb") as temp_file:
            resource_data = await resource.get_data()
            temp_file.write(resource_data)
            temp_file.flush()

        cmd = [
            "ubireader_extract_files",
            "-k",
            "-o",
            f"{temp_flush_dir}/output",
            temp_file.name,
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
        )
        returncode = await proc.wait()
        if proc.returncode:
            raise CalledProcessError(returncode=returncode, cmd=cmd)

        ubifs_view = await resource.view_as(Ubifs)
        await ubifs_view.initialize_from_disk(f"{temp_flush_dir}/output")
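
Both the packer and the unpacker follow the same pattern: start an external tool with asyncio, wait for it, and raise `CalledProcessError` on a non-zero exit code. A minimal sketch of that pattern in isolation is shown below; `run_checked` is a hypothetical helper, not part of OFRAK.

```python
import asyncio
from subprocess import CalledProcessError
from typing import List


async def run_checked(cmd: List[str]) -> None:
    """Run cmd as a subprocess and raise if it exits with a non-zero code."""
    proc = await asyncio.create_subprocess_exec(*cmd)
    # wait() returns the exit code once the process has finished.
    returncode = await proc.wait()
    if returncode:
        raise CalledProcessError(returncode=returncode, cmd=cmd)
```

A caller could, for example, run `await run_checked(["ubireader_extract_files", "-k", "-o", output_dir, image_path])`, where `output_dir` and `image_path` are placeholders for paths prepared beforehand.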