lzo.py
ofrak.core.lzo
LzoData (GenericBinary)
An LZO binary blob.
LzoPacker (Packer)
Pack data into a compressed LZO file.
pack(self, resource, config=None)
async
Pack the given resource.
Users should not call this method directly; rather, they should run Resource.run or Resource.pack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource |
Resource |
The resource that is being packed |
required |
config |
ComponentConfig |
Optional config for packing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
None |
Source code in ofrak/core/lzo.py
async def pack(self, resource: Resource, config: ComponentConfig = None):
    """
    Compress the data of the resource's child with ``lzop`` and patch the
    compressed bytes over the resource's entire current data.

    :param resource: the resource to pack; it is viewed as ``LzoData`` and the
        data of the child returned by that view is what gets compressed
    :param config: not read by this implementation

    :raises CalledProcessError: if ``lzop`` exits with a non-zero return code;
        the captured stdout and stderr are attached to the exception
    """
    lzo_view = await resource.view_as(LzoData)
    child_file = await lzo_view.get_child()
    uncompressed_data = await child_file.resource.get_data()

    # lzop is handed a path on the command line, so stage the data in a temp file.
    with tempfile.NamedTemporaryFile(suffix=".lzo") as uncompressed_file:
        uncompressed_file.write(uncompressed_data)
        # Flush so the subprocess sees the full contents when it opens the path.
        uncompressed_file.flush()

        cmd = [
            "lzop",
            "-f",  # force
            "-c",  # write the result to stdout, which is captured below
            uncompressed_file.name,
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode:
            # Attach the captured streams so the failure reason is not lost.
            raise CalledProcessError(
                returncode=proc.returncode,
                cmd=cmd,
                output=stdout,
                stderr=stderr,
            )

        compressed_data = stdout
        # Replace the whole existing range (0..current length) with the new bytes.
        original_size = await lzo_view.resource.get_data_length()
        resource.queue_patch(Range(0, original_size), compressed_data)
LzoUnpacker (Unpacker)
Unpack (decompress) an LZO file.
unpack(self, resource, config=None)
async
Unpack the given resource.
Users should not call this method directly; rather, they should run Resource.run or Resource.unpack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource |
Resource |
The resource that is being unpacked |
required |
config |
ComponentConfig |
Optional config for unpacking. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
None |
Source code in ofrak/core/lzo.py
async def unpack(self, resource: Resource, config: ComponentConfig = None) -> None:
    """
    Decompress the resource's data with ``lzop -d`` and attach the output as a
    child resource tagged ``GenericBinary``.

    :param resource: the resource whose data is written out and decompressed
    :param config: not read by this implementation

    :raises CalledProcessError: if ``lzop`` exits with a non-zero return code;
        the captured stdout and stderr are attached to the exception
    """
    # lzop is handed a path on the command line, so stage the data in a temp file.
    with tempfile.NamedTemporaryFile(suffix=".lzo") as compressed_file:
        compressed_file.write(await resource.get_data())
        # Flush so the subprocess sees the full contents when it opens the path.
        compressed_file.flush()

        cmd = [
            "lzop",
            "-d",  # decompress
            "-f",  # force
            "-c",  # write the result to stdout, which is captured below
            compressed_file.name,
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode:
            # Attach the captured streams so the failure reason is not lost.
            raise CalledProcessError(
                returncode=proc.returncode,
                cmd=cmd,
                output=stdout,
                stderr=stderr,
            )

        await resource.create_child(tags=(GenericBinary,), data=stdout)