Skip to content

iso9660.py

ofrak_components.iso9660

ElToritoISO9660Image (ISO9660Image) dataclass

ElToritoISO9660Image()

ISO9660Entry (ResourceView) dataclass

ISO9660Entry()

ISO9660EntryAttributes (ResourceAttributes) dataclass

ISO9660EntryAttributes(name: str, path: str, is_dir: bool, is_file: bool, is_symlink: bool, is_dot: bool, is_dotdot: bool, iso_version: int)

ISO9660Image (GenericBinary, FilesystemRoot) dataclass

ISO 9660 image. ISO 9660 is a file system for optical disc media.

ISO9660ImageAnalyzer (Analyzer)

analyze(self, resource, config=None) async

Analyze a resource to extract specific ResourceAttributes.

Users should not call this method directly; rather, they should run Resource.run or Resource.analyze.

Parameters:

Name Type Description Default
resource Resource

The resource that is being analyzed

required
config

Optional config for analyzing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run.

None

Returns:

Type Description

The analysis results

Source code in ofrak_components/iso9660.py
async def analyze(self, resource: Resource, config=None):
    """
    Parse the resource's data as an ISO 9660 image and collect image-level metadata.

    :param resource: the resource whose data is opened with PyCdlib
    :param config: not used by this implementation

    :return: an ISO9660ImageAttributes holding the interchange level, the volume/system/
        application identifiers, the XA flag, and which extensions (Joliet, Rock Ridge, UDF,
        El Torito) were detected
    """
    joliet_level = None
    rockridge_version = None
    udf_version = None

    iso = PyCdlib()
    iso.open_fp(BytesIO(await resource.get_data()))

    # Everything after a successful open is wrapped so the PyCdlib object is always closed,
    # even when one of the attribute reads or UTF-8 decodes below raises.
    try:
        interchange_level = iso.interchange_level
        has_joliet = iso.has_joliet()
        has_rockridge = iso.has_rock_ridge()
        has_udf = iso.has_udf()
        has_eltorito = iso.eltorito_boot_catalog is not None
        vol_identifier = iso.pvd.volume_identifier.decode("utf-8").strip()
        sys_identifier = iso.pvd.system_identifier.decode("utf-8").strip()
        app_identifier = iso.pvd.application_identifier.text.decode("utf-8").strip()
        xa = iso.xa

        # NOTE(review): this is an if/elif chain, so only the first detected extension gets its
        # level/version filled in. An image with both Joliet and Rock Ridge reports
        # has_rockridge=True but rockridge_version=None -- confirm this is intended.
        if has_joliet:
            # The Joliet level is derived from the escape sequence stored in the Joliet
            # volume descriptor.
            esc = iso.joliet_vd.escape_sequences
            if b"%/@" in esc:
                joliet_level = 1
            elif b"%/C" in esc:
                joliet_level = 2
            elif b"%/E" in esc:
                joliet_level = 3
        elif has_rockridge:
            rockridge_version = iso.rock_ridge
        elif has_udf:
            # The UDF version is not read from the image; it is always reported as "2.60".
            udf_version = "2.60"
    finally:
        iso.close()

    return ISO9660ImageAttributes(
        interchange_level=interchange_level,
        volume_identifier=vol_identifier,
        system_identifier=sys_identifier,
        app_identifier=app_identifier,
        extended_attributes=xa,
        has_joliet=has_joliet,
        joliet_level=joliet_level,
        has_rockridge=has_rockridge,
        rockridge_version=rockridge_version,
        has_udf=has_udf,
        udf_version=udf_version,
        has_eltorito=has_eltorito,
    )

ISO9660ImageAttributes (ResourceAttributes) dataclass

ISO9660ImageAttributes(interchange_level: int, volume_identifier: str, system_identifier: str, app_identifier: str, extended_attributes: bool, has_joliet: bool, has_rockridge: bool, has_udf: bool, has_eltorito: bool, joliet_level: Union[int, NoneType] = None, rockridge_version: Union[str, NoneType] = None, udf_version: Union[str, NoneType] = None)

ISO9660Packer (Packer)

Pack files in an ISO 9660 image.

pack(self, resource, config=None) async

Pack the given resource.

Users should not call this method directly; rather, they should run Resource.run or Resource.pack.

Parameters:

Name Type Description Default
resource Resource required
config

Optional config for packing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run.

None
Source code in ofrak_components/iso9660.py
async def pack(self, resource: Resource, config=None):
    """
    Rebuild an ISO 9660 image from the resource's ISO9660Entry descendants and patch the
    result over the resource's current data.

    A new image is created from the resource's ISO9660ImageAttributes. Descendant entries are
    visited breadth-first so that every directory is added before the entries it contains.

    :param resource: the image resource; must already carry ISO9660ImageAttributes
    :param config: not used by this implementation
    """
    # Local import: a deque gives O(1) pops from the front of the work queue.
    from collections import deque

    image_attributes = resource.get_attributes(ISO9660ImageAttributes)

    iso_result = PyCdlib()
    iso_result.new(
        interchange_level=image_attributes.interchange_level,
        sys_ident=image_attributes.system_identifier,
        vol_ident=image_attributes.volume_identifier,
        app_ident_str=image_attributes.app_identifier,
        joliet=image_attributes.joliet_level,
        rock_ridge=image_attributes.rockridge_version,
        xa=image_attributes.extended_attributes,
        udf=image_attributes.udf_version,
    )

    # From here on the PyCdlib object is initialized, so make sure it is closed even if adding
    # an entry or writing the image fails.
    try:
        # Pick the facade (and the keyword its methods expect for paths) matching the image.
        if image_attributes.has_joliet:
            resource.add_tag(JolietISO9660Image)
            facade = iso_result.get_joliet_facade()
            path_arg = "joliet_path"
        elif image_attributes.has_udf:
            resource.add_tag(UdfISO9660Image)
            facade = iso_result.get_udf_facade()
            path_arg = "udf_path"
            LOGGER.warning("UDF images are not currently supported")
        elif image_attributes.has_rockridge:
            LOGGER.warning("Rock Ridge images are not currently supported")
            resource.add_tag(RockRidgeISO9660Image)
            facade = iso_result.get_rock_ridge_facade()
            # The pycdlib Rock Ridge facade names its path parameter ``rr_path``; ``rr_name``
            # is not accepted by the facade methods.
            path_arg = "rr_path"
        else:
            facade = iso_result.get_iso9660_facade()
            path_arg = "iso_path"

        if image_attributes.has_eltorito:
            LOGGER.warning("El Torito images are not currently supported")

        entry_filter = ResourceFilter(tags=(ISO9660Entry,))
        child_queue = deque(
            await resource.get_children_as_view(ISO9660Entry, r_filter=entry_filter)
        )

        while child_queue:
            child = child_queue.popleft()
            entry_attributes = child.resource.get_attributes(ISO9660EntryAttributes)

            # An iso_version of -1 means the entry was stored without a ";<version>" suffix.
            path = entry_attributes.path
            if entry_attributes.iso_version != -1:
                path += ";" + str(entry_attributes.iso_version)

            if child.resource.has_tag(Folder):
                facade.add_directory(**{path_arg: path})
                # Queue the folder's own entries; they are handled after the folder exists.
                child_queue.extend(
                    await child.resource.get_children_as_view(
                        ISO9660Entry, r_filter=entry_filter
                    )
                )
            elif child.resource.has_tag(File):
                file_data = await child.resource.get_data()
                facade.add_fp(BytesIO(file_data), len(file_data), **{path_arg: path})

        with BytesIO() as result:
            iso_result.write_fp(result)
            iso_data = result.getvalue()
    finally:
        iso_result.close()

    # Replace the whole of the resource's current data with the freshly built image.
    resource.queue_patch(Range(0, await resource.get_data_length()), iso_data)

ISO9660Unpacker (Unpacker)

Unpack an ISO 9660 image.

unpack(self, resource, config=None) async

Unpack the given resource.

Users should not call this method directly; rather, they should run Resource.run or Resource.unpack.

Parameters:

Name Type Description Default
resource Resource

The resource that is being unpacked

required
config

Optional config for unpacking. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run.

None
Source code in ofrak_components/iso9660.py
async def unpack(self, resource: Resource, config=None):
    """
    Unpack an ISO 9660 image into folder and file child resources.

    The image is analyzed first and the resulting ISO9660ImageAttributes are attached to the
    resource. Every directory and file found while walking the image is then added through the
    ISO9660Image view, tagged as an ISO9660Entry and given ISO9660EntryAttributes.

    For file names carrying a ";<version>" suffix, the suffix is removed from the stored name
    and path and kept in ``iso_version``; a trailing "." left in front of the suffix is also
    removed. Entries without a version get ``iso_version=-1``.

    :param resource: the resource whose data is an ISO 9660 image
    :param config: not used by this implementation
    """
    iso_data = await resource.get_data()

    iso_attributes = await resource.analyze(ISO9660ImageAttributes)
    resource.add_attributes(iso_attributes)
    iso_resource = await resource.view_as(ISO9660Image)

    iso = PyCdlib()
    iso.open_fp(BytesIO(iso_data))

    # The image is open from here on; always close it, even if extraction fails part-way.
    try:
        # Pick the facade (and the keyword used to address paths) matching the image.
        if iso_attributes.has_joliet:
            facade = iso.get_joliet_facade()
            path_var = "joliet_path"
        elif iso_attributes.has_udf:
            LOGGER.warning("UDF images are not currently supported")
            facade = iso.get_udf_facade()
            path_var = "udf_path"
        elif iso_attributes.has_rockridge:
            LOGGER.warning("Rock Ridge images are not currently supported")
            facade = iso.get_rock_ridge_facade()
            # PyCdlib.walk and the Rock Ridge facade take ``rr_path``; ``rr_name`` is rejected
            # as an invalid keyword.
            path_var = "rr_path"
        else:
            facade = iso.get_iso9660_facade()
            path_var = "iso_path"

        if iso_attributes.has_eltorito:
            LOGGER.warning("El Torito images are not currently supported")

        for root, dirs, files in iso.walk(**{path_var: "/"}):
            for d in dirs:
                path = os.path.join(root, d)
                folder_tags = (ISO9660Entry, Folder)
                attributes = (
                    ISO9660EntryAttributes(
                        name=d,
                        path=path,
                        is_dir=True,
                        is_file=False,
                        is_symlink=False,
                        is_dot=(str(d).startswith(".") and not str(d).startswith("..")),
                        is_dotdot=str(d).startswith(".."),
                        iso_version=-1,
                    ),
                )
                await iso_resource.add_folder(path, None, None, folder_tags, attributes)

            for f in files:
                file_tags = (ISO9660Entry, File)

                # The file must be fetched using its exact on-image path (version included).
                with BytesIO() as fp:
                    facade.get_file_from_iso_fp(fp, **{path_var: os.path.join(root, f)})
                    file_data = fp.getvalue()

                # Split off a trailing ";<version>" suffix. Only the last ";" of the file name
                # is considered, so extra ";" characters in the name or in a parent directory
                # neither raise nor truncate the stored path.
                name, sep, version = f.rpartition(";")
                if sep and version.isdecimal():
                    iso_version = int(version)
                    if name.endswith("."):
                        name = name[:-1]
                else:
                    name = f
                    iso_version = -1
                path = os.path.join(root, name)

                attributes = (
                    ISO9660EntryAttributes(
                        name=name,
                        path=path,
                        is_dir=False,
                        is_file=True,
                        is_symlink=False,
                        is_dot=(str(name).startswith(".") and not str(name).startswith("..")),
                        is_dotdot=str(name).startswith(".."),
                        iso_version=iso_version,
                    ),
                )
                await iso_resource.add_file(
                    path, file_data, None, None, file_tags, attributes
                )
    finally:
        iso.close()

JolietISO9660Image (ISO9660Image) dataclass

JolietISO9660Image()

RockRidgeISO9660Image (ISO9660Image) dataclass

RockRidgeISO9660Image()

UdfISO9660Image (ISO9660Image) dataclass

UdfISO9660Image()