Skip to content

iso9660.py

ofrak.core.iso9660

ElToritoISO9660Image (ISO9660Image) dataclass

ElToritoISO9660Image()

ISO9660Entry (ResourceView) dataclass

ISO9660Entry(name: str, path: str, is_dir: bool, is_file: bool, is_symlink: bool, is_dot: bool, is_dotdot: bool, iso_version: int)

ISO9660Image (GenericBinary, FilesystemRoot) dataclass

ISO 9660 image. ISO 9660 is a file system for optical disc media.

ISO9660ImageAnalyzer (Analyzer)

analyze(self, resource, config=None) async

Analyze a resource for to extract specific ResourceAttributes.

Users should not call this method directly; rather, they should run Resource.run or Resource.analyze.

Parameters:

Name Type Description Default
resource Resource

The resource that is being analyzed

required
config

Optional config for analyzing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run.

None

Returns:

Type Description

The analysis results

Source code in ofrak/core/iso9660.py
async def analyze(self, resource: Resource, config=None):
    joliet_level = None
    rockridge_version = None
    udf_version = None

    iso = PyCdlib()
    iso.open_fp(BytesIO(await resource.get_data()))

    interchange_level = iso.interchange_level
    has_joliet = iso.has_joliet()
    has_rockridge = iso.has_rock_ridge()
    has_udf = iso.has_udf()
    has_eltorito = iso.eltorito_boot_catalog is not None
    vol_identifier = iso.pvd.volume_identifier.decode("utf-8").strip()
    sys_identifier = iso.pvd.system_identifier.decode("utf-8").strip()
    app_identifier = iso.pvd.application_identifier.text.decode("utf-8").strip()
    xa = iso.xa

    if has_joliet:
        esc = iso.joliet_vd.escape_sequences
        if b"%/@" in esc:
            joliet_level = 1
        elif b"%/C" in esc:
            joliet_level = 2
        elif b"%/E" in esc:
            joliet_level = 3
    elif has_rockridge:
        rockridge_version = iso.rock_ridge
    elif has_udf:
        udf_version = "2.60"

    iso.close()

    return ISO9660ImageAttributes(
        interchange_level=interchange_level,
        volume_identifier=vol_identifier,
        system_identifier=sys_identifier,
        app_identifier=app_identifier,
        extended_attributes=xa,
        has_joliet=has_joliet,
        joliet_level=joliet_level,
        has_rockridge=has_rockridge,
        rockridge_version=rockridge_version,
        has_udf=has_udf,
        udf_version=udf_version,
        has_eltorito=has_eltorito,
    )

ISO9660ImageAttributes (ResourceAttributes) dataclass

ISO9660ImageAttributes(interchange_level: int, volume_identifier: str, system_identifier: str, app_identifier: str, extended_attributes: bool, has_joliet: bool, has_rockridge: bool, has_udf: bool, has_eltorito: bool, joliet_level: Union[int, NoneType] = None, rockridge_version: Union[str, NoneType] = None, udf_version: Union[str, NoneType] = None)

ISO9660Packer (Packer)

pack(self, resource, config=None) async

Pack the given resource.

Users should not call this method directly; rather, they should run Resource.run or Resource.pack.

Parameters:

Name Type Description Default
resource Resource required
config

Optional config for packing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run.

None
Source code in ofrak/core/iso9660.py
async def pack(self, resource: Resource, config=None) -> None:
    iso_view = await resource.view_as(ISO9660Image)

    try:
        isolinux_bin = await resource.get_only_descendant_as_view(
            ISO9660Entry,
            r_filter=ResourceFilter(
                attribute_filters=(
                    (ResourceAttributeValueFilter(ISO9660Entry.Name, "isolinux.bin"),)
                ),
            ),
        )
        isolinux_bin_cmd = [
            "-b",
            isolinux_bin.path.strip("/"),
        ]  # The leading "/" is not needed in this CLI arg
    except NotFoundError:
        isolinux_bin_cmd = list()
    try:
        boot_cat = await resource.get_only_descendant_as_view(
            ISO9660Entry,
            r_filter=ResourceFilter(
                attribute_filters=(
                    (ResourceAttributeValueFilter(ISO9660Entry.Name, "boot.cat"),)
                ),
            ),
        )
        boot_cat_cmd = [
            "-c",
            boot_cat.path.strip("/"),
        ]  # The leading "/" is not needed in this CLI arg
    except NotFoundError:
        boot_cat_cmd = list()

    iso_attrs = resource.get_attributes(ISO9660ImageAttributes)
    temp_flush_dir = await iso_view.flush_to_disk()
    with tempfile.NamedTemporaryFile(suffix=".iso", mode="rb") as temp:
        cmd = [
            "mkisofs",
            *(["-J"] if iso_attrs.has_joliet else []),
            *(["-R"] if iso_attrs.has_rockridge else []),
            *(["-V", iso_attrs.volume_identifier] if iso_attrs.volume_identifier else []),
            *(["-sysid", iso_attrs.system_identifier] if iso_attrs.system_identifier else []),
            *(["-A", iso_attrs.app_identifier] if iso_attrs.app_identifier else []),
            *(
                [
                    "-no-emul-boot",
                    *isolinux_bin_cmd,
                    *boot_cat_cmd,
                    "-boot-info-table",
                    "-no-emul-boot",
                ]
                if iso_attrs.has_eltorito
                else []
            ),
            "-allow-multidot",
            "-o",
            temp.name,
            temp_flush_dir,
        ]
        proc = await asyncio.create_subprocess_exec(*cmd)
        returncode = await proc.wait()
        if proc.returncode:
            raise CalledProcessError(returncode=returncode, cmd=cmd)
        new_data = temp.read()
        # Passing in the original range effectively replaces the original data with the new data
        resource.queue_patch(Range(0, await resource.get_data_length()), new_data)

ISO9660Unpacker (Unpacker)

Unpack an ISO 9660 image.

unpack(self, resource, config=None) async

Unpack the given resource.

Users should not call this method directly; rather, they should run Resource.run or Resource.unpack.

Parameters:

Name Type Description Default
resource Resource

The resource that is being unpacked

required
config

Optional config for unpacking. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run.

None
Source code in ofrak/core/iso9660.py
async def unpack(self, resource: Resource, config=None):
    iso_data = await resource.get_data()

    iso_attributes = await resource.analyze(ISO9660ImageAttributes)
    resource.add_attributes(iso_attributes)
    iso_resource = await resource.view_as(ISO9660Image)

    iso = PyCdlib()
    iso.open_fp(BytesIO(iso_data))

    if iso_attributes.has_joliet:
        facade = iso.get_joliet_facade()
        path_var = "joliet_path"
    elif iso_attributes.has_udf:
        LOGGER.warning("UDF images are not currently supported")
        facade = iso.get_udf_facade()
        path_var = "udf_path"
    elif iso_attributes.has_rockridge:
        LOGGER.warning("Rock Ridge images are not currently supported")
        facade = iso.get_rock_ridge_facade()
        path_var = "rr_name"
    else:
        facade = iso.get_iso9660_facade()
        path_var = "iso_path"

    if iso_attributes.has_eltorito:
        LOGGER.warning("El Torito images are not currently supported")

    for root, dirs, files in iso.walk(**{path_var: "/"}):
        for d in dirs:
            path = os.path.join(root, d)
            folder_tags = (ISO9660Entry, Folder)
            entry = ISO9660Entry(
                name=d,
                path=path,
                is_dir=True,
                is_file=False,
                is_symlink=False,
                is_dot=(str(d).startswith(".") and not str(d).startswith("..")),
                is_dotdot=str(d).startswith(".."),
                iso_version=-1,
            )
            await iso_resource.add_folder(
                path, None, None, folder_tags, entry.get_attributes_instances().values()
            )
        for f in files:
            path = os.path.join(root, f)
            file_tags = (ISO9660Entry, File)
            fp = BytesIO()

            facade.get_file_from_iso_fp(fp, **{path_var: path})
            file_data = fp.getvalue()

            if ";" in f:
                f, iso_version = f.split(";")
                iso_version = int(iso_version)
                path = path.split(";")[0]

                if f.endswith("."):
                    f = f[:-1]
                    path = path[:-1]
            else:
                iso_version = -1

            entry = ISO9660Entry(
                name=f,
                path=path,
                is_dir=False,
                is_file=True,
                is_symlink=False,
                is_dot=(str(f).startswith(".") and not str(f).startswith("..")),
                is_dotdot=str(f).startswith(".."),
                iso_version=iso_version,
            )
            await iso_resource.add_file(
                path,
                file_data,
                None,
                None,
                file_tags,
                entry.get_attributes_instances().values(),
            )
            fp.close()

    iso.close()

JolietISO9660Image (ISO9660Image) dataclass

JolietISO9660Image()

RockRidgeISO9660Image (ISO9660Image) dataclass

RockRidgeISO9660Image()

UdfISO9660Image (ISO9660Image) dataclass

UdfISO9660Image()