Skip to content

zip.py

ofrak_components.zip

ZipArchive (GenericBinary, FilesystemRoot) dataclass

Filesystem stored in a zip archive.

ZipPacker (Packer)

Pack files into a compressed zip archive.

pack(self, resource, config=None) async

Pack the given resource.

Users should not call this method directly; rather, they should run Resource.run or Resource.pack.

Parameters:

Name Type Description Default
resource Resource required
config

Optional config for packing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run.

None
Source code in ofrak_components/zip.py
async def pack(self, resource: Resource, config=None):
    zip_view: ZipArchive = await resource.view_as(ZipArchive)
    flush_dir = await zip_view.flush_to_disk()
    temp_archive = f"{flush_dir}.zip"
    cwd = os.getcwd()
    os.chdir(flush_dir)
    command = ["zip", "-r", temp_archive, "."]
    try:
        subprocess.run(command, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        raise PackerError(format_called_process_error(e))
    os.chdir(cwd)
    with open(temp_archive, "rb") as fh:
        resource.queue_patch(Range(0, await zip_view.resource.get_data_length()), fh.read())

ZipUnpacker (Unpacker)

Unpack (decompress) a zip archive.

unpack(self, resource, config=None) async

Unpack the given resource.

Users should not call this method directly; rather, they should run Resource.run or Resource.unpack.

Parameters:

Name Type Description Default
resource Resource

The resource that is being unpacked

required
config

Optional config for unpacking. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run.

None
Source code in ofrak_components/zip.py
async def unpack(self, resource: Resource, config=None):
    zip_view = await resource.view_as(ZipArchive)
    with tempfile.NamedTemporaryFile(suffix=".zip") as temp_archive:
        temp_archive.write(await resource.get_data())
        temp_archive.flush()
        with tempfile.TemporaryDirectory() as temp_dir:
            command = ["unzip", temp_archive.name, "-d", temp_dir]
            try:
                subprocess.run(command, check=True, capture_output=True)
            except subprocess.CalledProcessError as e:
                raise UnpackerError(format_called_process_error(e))
            cwd = os.getcwd()
            os.chdir(temp_dir)
            await zip_view.initialize_from_disk(temp_dir)
            os.chdir(cwd)