gzip.py
ofrak_components.gzip
GzipData (GenericBinary)
A gzip binary blob.
GzipPacker (Packer)
Pack data into a compressed gzip file.
pack(self, resource, config=None)
async
Pack the given resource.
Users should not call this method directly; rather, they should run Resource.run or Resource.pack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource |
Resource |
The resource that is being packed |
required |
config |
Optional config for packing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
None |
Source code in ofrak_components/gzip.py
async def pack(self, resource: Resource, config=None):
    """
    Gzip-compress the data of the resource returned by ``GzipData.get_file()``
    and queue a patch that replaces the resource's entire current data with
    the compressed bytes.

    :param resource: the resource to pack; it is viewed as ``GzipData``
    :param config: not read by this implementation
    """
    view = await resource.view_as(GzipData)

    compressed = BytesIO()
    with GzipFile(fileobj=compressed, mode="w") as writer:
        child = await view.get_file()
        writer.write(await child.get_data())

    # The buffer is read only after the GzipFile context has exited, so the
    # compressed stream is complete. The patch spans [0, current length),
    # i.e. it overwrites all of the existing data.
    old_size = await view.resource.get_data_length()
    resource.queue_patch(Range(0, old_size), compressed.getvalue())
GzipUnpacker (Unpacker)
Unpack (decompress) a gzip file.
unpack(self, resource, config=None)
async
Unpack the given resource.
Users should not call this method directly; rather, they should run Resource.run or Resource.unpack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource |
Resource |
The resource that is being unpacked |
required |
config |
Optional config for unpacking. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
None |
Source code in ofrak_components/gzip.py
async def unpack(self, resource: Resource, config=None):
    """
    Decompress the resource's data with ``pigz`` and attach the result as a
    ``GenericBinary`` child.

    The decompressor is run through asyncio's subprocess support so that the
    event loop is not blocked while ``pigz`` is working.

    :param resource: the gzip resource to decompress
    :param config: not read by this implementation

    :raises subprocess.CalledProcessError: if ``pigz`` exits with a non-zero
        status other than 2 / -2 (those are treated as warnings)
    """
    # Local import keeps this block self-contained; asyncio is standard library.
    import asyncio

    # Create temporary file with .gz extension
    with tempfile.NamedTemporaryFile(suffix=".gz") as temp_file:
        temp_file.write(await resource.get_data())
        temp_file.flush()
        command = ["pigz", "-d", "-c", temp_file.name]  # Decompress to stdout
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # communicate() waits for exit, so the temp file is no longer needed
        # once this returns.
        data, stderr = await process.communicate()

    if process.returncode in (2, -2):
        # Forward any gzip warning message and continue with whatever was
        # written to stdout.
        LOGGER.warning(stderr)
    elif process.returncode != 0:
        raise subprocess.CalledProcessError(
            process.returncode, command, output=data, stderr=stderr
        )

    await resource.create_child(
        tags=(GenericBinary,),
        data=data,
    )