iso9660.py
ofrak.core.iso9660
ElToritoISO9660Image (ISO9660Image)
dataclass
ElToritoISO9660Image()
ISO9660Entry (ResourceView)
dataclass
ISO9660Entry(name: str, path: str, is_dir: bool, is_file: bool, is_symlink: bool, is_dot: bool, is_dotdot: bool, iso_version: int)
ISO9660Image (GenericBinary, FilesystemRoot)
dataclass
ISO 9660 image. ISO 9660 is a file system for optical disc media.
ISO9660ImageAnalyzer (Analyzer)
analyze(self, resource, config=None)
async
Analyze a resource to extract specific ResourceAttributes.
Users should not call this method directly; rather, they should run Resource.run or Resource.analyze.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource |
Resource |
The resource that is being analyzed |
required |
config |
Optional config for analyzing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
None |
Returns:
Type | Description |
---|---|
The analysis results |
Source code in ofrak/core/iso9660.py
async def analyze(self, resource: Resource, config=None):
    """
    Open the resource's data as an ISO 9660 image with PyCdlib and collect its volume-level
    attributes.

    :param resource: the resource whose data is parsed as an ISO 9660 image
    :param config: not used by this analyzer

    :return: an ISO9660ImageAttributes holding the interchange level, the volume, system and
    application identifiers, the extended-attributes flag, and which of Joliet, Rock Ridge,
    UDF and El Torito are present on the image
    """
    joliet_level = None
    rockridge_version = None
    udf_version = None

    iso = PyCdlib()
    iso.open_fp(BytesIO(await resource.get_data()))
    # Everything after a successful open runs under try/finally so the PyCdlib object is closed
    # even when reading or decoding one of the descriptors raises.
    try:
        interchange_level = iso.interchange_level
        has_joliet = iso.has_joliet()
        has_rockridge = iso.has_rock_ridge()
        has_udf = iso.has_udf()
        has_eltorito = iso.eltorito_boot_catalog is not None
        vol_identifier = iso.pvd.volume_identifier.decode("utf-8").strip()
        sys_identifier = iso.pvd.system_identifier.decode("utf-8").strip()
        app_identifier = iso.pvd.application_identifier.text.decode("utf-8").strip()
        xa = iso.xa

        # The extensions are checked independently of each other: an image may carry several of
        # them at once, and each version field should be filled whenever its flag is set.
        if has_joliet:
            # The escape sequence stored in the Joliet volume descriptor selects the level.
            esc = iso.joliet_vd.escape_sequences
            if b"%/@" in esc:
                joliet_level = 1
            elif b"%/C" in esc:
                joliet_level = 2
            elif b"%/E" in esc:
                joliet_level = 3
        if has_rockridge:
            rockridge_version = iso.rock_ridge
        if has_udf:
            # The version is a fixed value here; it is not read from the image.
            udf_version = "2.60"
    finally:
        iso.close()

    return ISO9660ImageAttributes(
        interchange_level=interchange_level,
        volume_identifier=vol_identifier,
        system_identifier=sys_identifier,
        app_identifier=app_identifier,
        extended_attributes=xa,
        has_joliet=has_joliet,
        joliet_level=joliet_level,
        has_rockridge=has_rockridge,
        rockridge_version=rockridge_version,
        has_udf=has_udf,
        udf_version=udf_version,
        has_eltorito=has_eltorito,
    )
ISO9660ImageAttributes (ResourceAttributes)
dataclass
ISO9660ImageAttributes(interchange_level: int, volume_identifier: str, system_identifier: str, app_identifier: str, extended_attributes: bool, has_joliet: bool, has_rockridge: bool, has_udf: bool, has_eltorito: bool, joliet_level: Optional[int] = None, rockridge_version: Optional[str] = None, udf_version: Optional[str] = None)
ISO9660Packer (Packer)
pack(self, resource, config=None)
async
Pack the given resource.
Users should not call this method directly; rather, they should run Resource.run or Resource.pack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource |
Resource |
The resource that is being packed |
required |
config |
Optional config for packing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
None |
Source code in ofrak/core/iso9660.py
async def pack(self, resource: Resource, config=None) -> None:
    """
    Rebuild the ISO image by running ``mkisofs`` over the flushed-to-disk contents of the
    resource, then queue a patch that replaces all of the resource's data with the new image.

    :param resource: the ISO 9660 image resource to repack
    :param config: not used by this packer

    :raises CalledProcessError: if ``mkisofs`` exits with a non-zero return code
    """
    image_view = await resource.view_as(ISO9660Image)

    async def boot_option(flag, entry_name):
        # Build [flag, path] for the only descendant entry with the given name, or an empty
        # list when no such entry exists.
        try:
            entry = await resource.get_only_descendant_as_view(
                ISO9660Entry,
                r_filter=ResourceFilter(
                    attribute_filters=(
                        ResourceAttributeValueFilter(ISO9660Entry.Name, entry_name),
                    ),
                ),
            )
            # The leading "/" is not needed in this CLI arg
            return [flag, entry.path.strip("/")]
        except NotFoundError:
            return []

    boot_image_args = await boot_option("-b", "isolinux.bin")
    boot_catalog_args = await boot_option("-c", "boot.cat")

    attrs = resource.get_attributes(ISO9660ImageAttributes)
    flushed_dir = await image_view.flush_to_disk()

    with tempfile.NamedTemporaryFile(suffix=".iso", mode="rb", delete_on_close=False) as out_file:
        # Only the reserved name is needed; mkisofs writes to it and it is re-opened below.
        out_file.close()

        mkisofs_cmd = ["mkisofs"]
        if attrs.has_joliet:
            mkisofs_cmd.append("-J")
        if attrs.has_rockridge:
            mkisofs_cmd.append("-R")
        if attrs.volume_identifier:
            mkisofs_cmd.extend(["-V", attrs.volume_identifier])
        if attrs.system_identifier:
            mkisofs_cmd.extend(["-sysid", attrs.system_identifier])
        if attrs.app_identifier:
            mkisofs_cmd.extend(["-A", attrs.app_identifier])
        if attrs.has_eltorito:
            mkisofs_cmd.append("-no-emul-boot")
            mkisofs_cmd.extend(boot_image_args)
            mkisofs_cmd.extend(boot_catalog_args)
            mkisofs_cmd.extend(["-boot-info-table", "-no-emul-boot"])
        mkisofs_cmd.extend(["-allow-multidot", "-o", out_file.name, flushed_dir])

        process = await asyncio.create_subprocess_exec(*mkisofs_cmd)
        exit_code = await process.wait()
        if process.returncode:
            raise CalledProcessError(returncode=exit_code, cmd=mkisofs_cmd)

        with open(out_file.name, "rb") as built_image:
            rebuilt = built_image.read()

        # Patching the full original range swaps the old image for the rebuilt one
        whole_image = Range(0, await resource.get_data_length())
        resource.queue_patch(whole_image, rebuilt)
ISO9660Unpacker (Unpacker)
Unpack an ISO 9660 image.
unpack(self, resource, config=None)
async
Unpack the given resource.
Users should not call this method directly; rather, they should run Resource.run or Resource.unpack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource |
Resource |
The resource that is being unpacked |
required |
config |
Optional config for unpacking. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
None |
Source code in ofrak/core/iso9660.py
async def unpack(self, resource: Resource, config=None):
    """
    Walk the ISO 9660 image and add every directory and file found in it as a child of the
    resource, each tagged as an ISO9660Entry.

    The directory tree that is walked is chosen from the analyzed attributes, in this order of
    preference: Joliet, UDF, Rock Ridge, plain ISO 9660. A warning is logged for UDF, Rock Ridge
    and El Torito images.

    :param resource: the ISO 9660 image resource to unpack
    :param config: not used by this unpacker
    """
    iso_data = await resource.get_data()

    iso_attributes = await resource.analyze(ISO9660ImageAttributes)
    resource.add_attributes(iso_attributes)
    iso_resource = await resource.view_as(ISO9660Image)

    iso = PyCdlib()
    iso.open_fp(BytesIO(iso_data))
    # Everything after a successful open runs under try/finally so the PyCdlib object is closed
    # even when walking the image or adding a child raises.
    try:
        # path_var is the keyword under which both iso.walk() and the facade's
        # get_file_from_iso_fp() receive the path.
        if iso_attributes.has_joliet:
            facade = iso.get_joliet_facade()
            path_var = "joliet_path"
        elif iso_attributes.has_udf:
            LOGGER.warning("UDF images are not currently supported")
            facade = iso.get_udf_facade()
            path_var = "udf_path"
        elif iso_attributes.has_rockridge:
            LOGGER.warning("Rock Ridge images are not currently supported")
            facade = iso.get_rock_ridge_facade()
            # pycdlib names this keyword "rr_path"; "rr_name" is rejected as an invalid keyword.
            path_var = "rr_path"
        else:
            facade = iso.get_iso9660_facade()
            path_var = "iso_path"

        if iso_attributes.has_eltorito:
            LOGGER.warning("El Torito images are not currently supported")

        for root, dirs, files in iso.walk(**{path_var: "/"}):
            for d in dirs:
                path = posixpath.join(root, d)
                folder_tags = (ISO9660Entry, Folder)
                entry = ISO9660Entry(
                    name=d,
                    path=path,
                    is_dir=True,
                    is_file=False,
                    is_symlink=False,
                    is_dot=(str(d).startswith(".") and not str(d).startswith("..")),
                    is_dotdot=str(d).startswith(".."),
                    iso_version=-1,
                )
                await iso_resource.add_folder(
                    path, None, None, folder_tags, entry.get_attributes_instances().values()
                )
            for f in files:
                path = posixpath.join(root, f)
                file_tags = (ISO9660Entry, File)
                # Read the file contents using the name exactly as stored on the image, before
                # the version suffix is stripped below.
                with BytesIO() as fp:
                    facade.get_file_from_iso_fp(fp, **{path_var: path})
                    file_data = fp.getvalue()

                if ";" in f:
                    # Names of the form "NAME;N" carry the file version after the semicolon.
                    f, iso_version = f.split(";")
                    iso_version = int(iso_version)
                    path = path.split(";")[0]

                    # Drop the trailing "." left on names that have no extension.
                    if f.endswith("."):
                        f = f[:-1]
                        path = path[:-1]
                else:
                    iso_version = -1

                entry = ISO9660Entry(
                    name=f,
                    path=path,
                    is_dir=False,
                    is_file=True,
                    is_symlink=False,
                    is_dot=(str(f).startswith(".") and not str(f).startswith("..")),
                    is_dotdot=str(f).startswith(".."),
                    iso_version=iso_version,
                )
                await iso_resource.add_file(
                    path,
                    file_data,
                    None,
                    None,
                    file_tags,
                    entry.get_attributes_instances().values(),
                )
    finally:
        iso.close()
JolietISO9660Image (ISO9660Image)
dataclass
JolietISO9660Image()
RockRidgeISO9660Image (ISO9660Image)
dataclass
RockRidgeISO9660Image()
UdfISO9660Image (ISO9660Image)
dataclass
UdfISO9660Image()