yaffs.py
ofrak.core.yaffs
Yaffs2Filesystem (GenericBinary, FilesystemRoot)
dataclass
Filesystem stored in YAFFS (Yet Another Flash File System) format.
Yaffs2FilesystemAttributes (ResourceAttributes)
dataclass
Geometry of a YAFFS2 image.
Yaffs2Identifier (Identifier)
Identify YAFFSv2 filesystem images by checking for valid YAFFS2 object header magic bytes at offset 0, valid spare area magic at the detected page boundary, and a valid subsequent object header at the detected block size.
identify(self, resource, config=None)
async
Perform identification on the given resource.
Users should not call this method directly; rather, they should run Resource.identify.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
resource |
Resource |
The resource to identify |
required |
config |
Optional config for identifying. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
None |
Source code in ofrak/core/yaffs.py
async def identify(self, resource: Resource, config=None) -> None:
    """
    Tag the resource as a YAFFS2 filesystem if the start of its data looks like one.

    The first 10 bytes must be one of ``YAFFS_HEADER_MAGICS``. The page size and
    spare size are then detected from a larger read of the image start. If any
    of these steps fails, the method returns without modifying the resource.
    On success the resource receives the ``Yaffs2Filesystem`` tag and a
    ``Yaffs2FilesystemAttributes`` carrying the detected geometry.
    """
    magic = await resource.get_data(range=Range(0, 10))
    if len(magic) < 10:
        return
    if magic not in YAFFS_HEADER_MAGICS:
        return

    # A leading 0x00 byte selects big-endian; any other value selects little-endian.
    big_endian = magic[0] == 0x00
    struct_prefix = ">" if big_endian else "<"

    # Upper bound: (max_page + max_spare) * max_confirm_blocks + 10
    largest_block = max(PAGE_SIZES) + MAX_SPARE_SIZE
    probe_length = largest_block * NUM_CONFIRM_BLOCKS + 10
    probe = await resource.get_data(range=Range(0, probe_length))

    page_size = detect_page_size(probe)
    if page_size == 0:
        return

    spare_size = detect_spare_size(probe, page_size, struct_prefix)
    if spare_size == 0:
        return

    resource.add_tag(Yaffs2Filesystem)
    byte_order = Endianness.BIG_ENDIAN if big_endian else Endianness.LITTLE_ENDIAN
    geometry = Yaffs2FilesystemAttributes(
        page_size=page_size,
        spare_size=spare_size,
        endian=byte_order,
    )
    resource.add_attributes(geometry)
Yaffs2Packer (Packer)
Packages files into a YAFFS2 image. The packer preserves Unix permissions, ownership, symbolic links, and special files, and writes the image using the same page size, spare size, and endianness detected at unpack time.
pack(self, resource, config=None)
async
Pack the given resource.
Users should not call this method directly; rather, they should run Resource.run or Resource.pack.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
resource |
Resource |
The resource to pack |
required |
config |
Optional config for packing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
None |
Source code in ofrak/core/yaffs.py
async def pack(self, resource: Resource, config=None):
    """
    Rebuild the YAFFS2 image from the resource's filesystem view using ``mkyaffs2``.

    The filesystem is flushed to disk, ``mkyaffs2`` is invoked with the page size,
    spare size and endianness arguments derived from the resource's YAFFS2
    attributes, and the resulting image replaces the resource's entire data.

    Raises ``CalledProcessError`` if ``mkyaffs2`` exits with a non-zero status.
    """
    geometry = await _get_yaffs2_attributes(resource)
    fs_view: Yaffs2Filesystem = await resource.view_as(Yaffs2Filesystem)
    flushed_root = await fs_view.flush_to_disk()

    with tempfile.NamedTemporaryFile(
        suffix=".yaffs2", mode="rb", delete_on_close=False
    ) as image_file:
        # Close our own handle right away; the path is then handed to mkyaffs2
        # and reopened below. delete_on_close=False keeps the file until the
        # with-block exits.
        image_file.close()
        image_path = image_file.name

        cmd = [
            "mkyaffs2",
            "-p",
            str(geometry.page_size),
            "-s",
            str(geometry.spare_size),
        ]
        cmd.extend(endian_arg(geometry))
        cmd.extend((flushed_root, image_path))

        proc = await asyncio.create_subprocess_exec(*cmd)
        returncode = await proc.wait()
        if proc.returncode:
            raise CalledProcessError(returncode=returncode, cmd=cmd)

        with open(image_path, "rb") as packed_fh:
            packed_image = packed_fh.read()

        # Patch over the full current extent of the resource.
        current_length = await resource.get_data_length()
        resource.queue_patch(Range(0, current_length), packed_image)
Yaffs2Unpacker (Unpacker)
Extracts files and directories from YAFFS2 images. YAFFS2 is commonly used as the root filesystem in embedded Linux devices built on NAND flash, notably older Android devices and various industrial firmware. The unpacker preserves file permissions, ownership, symbolic links, and special files.
unpack(self, resource, config=None)
async
Unpack the given resource.
Users should not call this method directly; rather, they should run Resource.run or Resource.unpack.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
resource |
Resource |
The resource that is being unpacked |
required |
config |
Optional config for unpacking. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. |
None |
Source code in ofrak/core/yaffs.py
async def unpack(self, resource: Resource, config=None):
    """
    Extract the YAFFS2 image with ``unyaffs2`` and load the result into the resource.

    The resource data is written to a temporary file, ``unyaffs2`` extracts it
    into a temporary directory using the page size, spare size and endianness
    arguments derived from the resource's YAFFS2 attributes, and the extracted
    tree is loaded through the ``Yaffs2Filesystem`` view.

    Raises ``CalledProcessError`` if ``unyaffs2`` exits with a non-zero status.
    """
    geometry = await _get_yaffs2_attributes(resource)

    async with resource.temp_to_disk() as image_path:
        with tempfile.TemporaryDirectory() as extract_dir:
            cmd = [
                "unyaffs2",
                "-p",
                str(geometry.page_size),
                "-s",
                str(geometry.spare_size),
            ]
            cmd.extend(endian_arg(geometry))
            cmd.extend((image_path, extract_dir))

            proc = await asyncio.create_subprocess_exec(*cmd)
            returncode = await proc.wait()
            if proc.returncode:
                raise CalledProcessError(returncode=returncode, cmd=cmd)

            # Must happen while the temporary directory still exists.
            fs_view = await resource.view_as(Yaffs2Filesystem)
            await fs_view.initialize_from_disk(extract_dir)
_Yaffs2UtilTool (ComponentExternalTool)
private
yaffs2utils binaries (mkyaffs2/unyaffs2) never exit 0 when run for help (they return 255 regardless), so detect installation by running with no arguments and checking the banner in stdout.
__init__(self, tool)
special
Register the named yaffs2utils binary as an external tool, passing the yaffs2utils project URL and an empty string to the ComponentExternalTool initializer.
Source code in ofrak/core/yaffs.py
def __init__(self, tool: str):
    """
    Set up the external-tool record for one yaffs2utils binary.

    ``tool`` is the executable name; it is forwarded to the parent initializer
    together with the yaffs2utils project URL and an empty string.
    """
    project_url = "https://code.google.com/archive/p/yaffs2utils/"
    # Presumably the install-check argument; left empty because
    # is_tool_installed runs the tool with no arguments — TODO confirm
    # against the parent signature.
    no_check_arg = ""
    super().__init__(tool, project_url, no_check_arg)
is_tool_installed(self)
async
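Check whether the tool is installed by running it with no arguments and looking for its own name in the output.
This override does not pass an install_check_arg: it runs <tool> on its own, discards stderr, and searches stdout for the tool name.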
Returns:
| Type | Description |
|---|---|
bool |
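True if the tool could be launched and its name appears in its stdout output; False otherwise, including when the executable is not found. |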
Source code in ofrak/core/yaffs.py
async def is_tool_installed(self) -> bool:
    """
    Report whether ``self.tool`` can be launched and prints its own name.

    The tool is started with no arguments, stderr is discarded, and stdout is
    captured. The exit status is not checked; only the captured output is
    inspected.

    Returns ``False`` if the executable cannot be found, otherwise whether the
    encoded tool name occurs anywhere in the captured stdout.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            self.tool,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
        banner, _unused_stderr = await proc.communicate()
    except FileNotFoundError:
        return False
    else:
        tool_name = self.tool.encode()
        return tool_name in banner
detect_page_size(data)
Detect page size by looking for spare magic at known page size offsets.
Source code in ofrak/core/yaffs.py
def detect_page_size(data: bytes) -> int:
    """
    Return the first page size whose spare areas start with a known spare magic.

    For every combination of page size (``PAGE_SIZES``), spare magic
    (``SPARE_MAGICS``) and candidate spare size, the magic must be present at
    the start of the spare area of the first three consecutive blocks, where a
    block is ``page_size + spare_size`` bytes long. Returns 0 when no
    combination matches.
    """
    consecutive_blocks = 3
    spare_candidates = (16, 32, 64, 128, 256, 512)
    available = len(data)

    for page_size in PAGE_SIZES:
        for magic in SPARE_MAGICS:
            magic_len = len(magic)
            for spare_size in spare_candidates:
                stride = page_size + spare_size
                # The spare area of block k begins right after that block's page.
                spare_starts = (
                    stride * k + page_size for k in range(consecutive_blocks)
                )
                if all(
                    start + magic_len <= available
                    and data[start : start + magic_len] == magic
                    for start in spare_starts
                ):
                    return page_size
    return 0
detect_spare_size(data, page_size, endian)
Detect spare size by scanning for the next valid object header after the first page.
Searches for a valid header at each 4-byte-aligned offset in the spare region after the first page. When a candidate is found, it is validated by checking that another valid header exists at a later block_size-aligned offset (file objects may have data chunks between headers).
Source code in ofrak/core/yaffs.py
def detect_spare_size(data: bytes, page_size: int, endian: str) -> int:
    """
    Infer the spare size from where the next object header starts.

    Every 4-byte-aligned offset after the first page (skipping the first 4
    bytes of the spare area) is tried as the start of the second block. An
    offset at which ``parse_obj_header`` succeeds becomes the candidate block
    size. The candidate is accepted once another header parses at a later
    multiple of that block size, since the blocks in between may hold file
    data chunks rather than headers.

    Returns ``candidate - page_size`` for the first confirmed candidate, or 0
    when none is confirmed.
    """
    total_len = len(data)

    def has_later_header(block_size: int) -> bool:
        # Multiple 1 is the candidate itself, so confirmation starts at 2.
        for multiple in range(2, NUM_CONFIRM_BLOCKS):
            probe_at = block_size * multiple
            if probe_at + 10 > total_len:
                # Not enough data left for a header; give up on this candidate.
                return False
            if parse_obj_header(data[probe_at:], endian):
                return True
        return False

    first_offset = page_size + 4  # skip past spare magic bytes
    # NOTE(review): range() excludes this bound, so an offset of exactly
    # page_size + MAX_SPARE_SIZE (or len(data) - 10) is never tried — confirm
    # that this is intended.
    stop_offset = min(page_size + MAX_SPARE_SIZE, total_len - 10)

    for candidate in range(first_offset, stop_offset, 4):
        if not parse_obj_header(data[candidate:], endian):
            continue
        if has_later_header(candidate):
            return candidate - page_size
    return 0