zip.py
ofrak.core.zip
ZipArchive (GenericBinary, FilesystemRoot)
dataclass
Filesystem stored in a zip archive.
ZipPacker (Packer)
Pack files into a compressed zip archive.
pack(self, resource, config=None)
async
Pack the given resource.
Users should not call this method directly; rather, they should run Resource.run or Resource.pack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource |
Resource |
required | |
config |
Optional config for packing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
None |
Source code in ofrak/core/zip.py
async def pack(self, resource: Resource, config=None):
zip_view: ZipArchive = await resource.view_as(ZipArchive)
flush_dir = await zip_view.flush_to_disk()
temp_archive = f"{flush_dir}.zip"
cwd = os.getcwd()
os.chdir(flush_dir)
cmd = [
"zip",
"-r",
temp_archive,
".",
]
proc = await asyncio.create_subprocess_exec(
*cmd,
)
returncode = await proc.wait()
if proc.returncode:
raise CalledProcessError(returncode=returncode, cmd=cmd)
os.chdir(cwd)
with open(temp_archive, "rb") as fh:
resource.queue_patch(Range(0, await zip_view.resource.get_data_length()), fh.read())
ZipUnpacker (Unpacker)
Unpack (decompress) a zip archive.
unpack(self, resource, config=None)
async
Unpack the given resource.
Users should not call this method directly; rather, they should run Resource.run or Resource.unpack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource |
Resource |
The resource that is being unpacked |
required |
config |
Optional config for unpacking. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
None |
Source code in ofrak/core/zip.py
async def unpack(self, resource: Resource, config=None):
zip_view = await resource.view_as(ZipArchive)
with tempfile.NamedTemporaryFile(suffix=".zip") as temp_archive:
temp_archive.write(await resource.get_data())
temp_archive.flush()
with tempfile.TemporaryDirectory() as temp_dir:
cmd = [
"unzip",
temp_archive.name,
"-d",
temp_dir,
]
proc = await asyncio.create_subprocess_exec(
*cmd,
)
returncode = await proc.wait()
if proc.returncode:
raise CalledProcessError(returncode=returncode, cmd=cmd)
await zip_view.initialize_from_disk(temp_dir)