Skip to content

filesystem.py

ofrak.core.filesystem

BlockDevice (SpecialFileType) dataclass

Block special device file.

CharacterDevice (SpecialFileType) dataclass

Character special device file.

FIFOPipe (SpecialFileType) dataclass

Named pipe.

File (FilesystemEntry)

Stores the data and location of a file within a filesystem or folder's descendant file tree.

FilesystemEntry (ResourceView) dataclass

Handles generic management of any entry stored within a filesystem.

get_name(self)

Get the base name of this filesystem entry (a file or a folder).

Returns:

Type Description
str

The file or folder's base name

Source code in ofrak/core/filesystem.py
def get_name(self) -> str:
    """
    Return the base name of this filesystem entry.

    :return: The file or folder's base name, read from the entry's `Name` attribute
    """
    base_name: str = self.Name
    return base_name

set_stat(self, stat_result) async

Set the stat for the FilesystemEntry. Useful for newly created files where we want to control the full 10-tuple.

Parameters:

Name Type Description Default
stat_result stat_result

os.stat_result object containing all necessary values

required
Source code in ofrak/core/filesystem.py
async def set_stat(self, stat_result: os.stat_result):
    """
    Replace the stat information stored on this `FilesystemEntry`.

    Handy for freshly created entries where the caller wants to dictate the full 10-tuple.
    When the view is backed by a resource, the view's `FilesystemEntry` attributes are added
    to that resource and the resource is saved; otherwise only the in-memory value changes.

    :param stat_result: `os.stat_result` object containing all necessary values
    """
    self.stat = stat_result
    if self.resource is not None:
        attributes_by_type: Dict[Type[ResourceAttributes], ResourceAttributes]
        attributes_by_type = self.get_attributes_instances()
        # Only the attributes belonging to the FilesystemEntry view are pushed to the resource
        self.resource.add_attributes(attributes_by_type[AttributesType[FilesystemEntry]])
        await self.resource.save()

modify_stat_attribute(self, st_stat, stat_value) async

Modify a specific os.stat attribute on the filesystem entry.

Example:

await fs_entry.modify_stat_attribute(stat.ST_MODE, 0o100755)

Parameters:

Name Type Description Default
st_stat int

The st_stat struct member to be modified

required
stat_value int

The new stat value

required
Source code in ofrak/core/filesystem.py
async def modify_stat_attribute(self, st_stat: int, stat_value: int):
    """
    Change a single `os.stat` field of this filesystem entry.

    Example:

    ```python
    await fs_entry.modify_stat_attribute(stat.ST_MODE, 0o100755)
    ```

    :param st_stat: Index of the stat field to be modified (for example `stat.ST_MODE`)
    :param stat_value: The new stat value

    :raises ValueError: if no stat information is set on this entry
    """
    current_stat = self.stat
    if current_stat is None:
        raise ValueError("Cannot modify a stat attribute when stat attributes are not set")
    # The stat result cannot be edited in place: copy the fields, patch one, rebuild
    fields = list(current_stat)  # type: ignore
    fields[st_stat] = stat_value
    await self.set_stat(os.stat_result(tuple(fields)))

modify_xattr_attribute(self, attribute, value) async

Modify the extended file attributes ("xattrs") for the filesystem entry.

Parameters:

Name Type Description Default
attribute str

The name of the extended attribute to set

required
value bytes

The value to store for that extended attribute

required
Source code in ofrak/core/filesystem.py
async def modify_xattr_attribute(self, attribute: str, value: bytes):
    """
    Set one extended file attribute ("xattr") on this filesystem entry.

    The xattr mapping is created on first use. If the view is backed by a resource, that
    resource is saved afterwards.

    :param attribute: name of the extended attribute to set
    :param value: value to store under that name
    """
    # NOTE(review): unlike `set_stat`, nothing here calls `resource.add_attributes(...)` before
    # saving -- confirm that the xattr change actually reaches the resource.
    if self.xattrs is None:
        self.xattrs = {attribute: value}
    else:
        self.xattrs[attribute] = value
    if self.resource is not None:
        await self.resource.save()

get_path(self) async

Get this entry's path, with the FilesystemRoot as the path root.

Returns:

Type Description
str

The full path name, with the FilesystemRoot ancestor as the path root

Source code in ofrak/core/filesystem.py
async def get_path(self) -> str:
    """
    Build this entry's path, with the `FilesystemRoot` as the path root.

    :return: The full path name, with the `FilesystemRoot` ancestor as the path root
    """
    # Names are gathered starting with this entry and reversed at the end; this presumably
    # relies on `get_ancestors` yielding the nearest ancestor first -- confirm.
    names_leaf_first = [self.get_name()]

    ancestors = await self.resource.get_ancestors(
        r_filter=ResourceFilter(
            tags=(FilesystemEntry, FilesystemRoot),
            tags_condition=ResourceFilterCondition.OR,
        )
    )
    for ancestor in ancestors:
        # The root itself is not part of the path, so stop as soon as it is reached
        if ancestor is None or ancestor.has_tag(FilesystemRoot):
            break
        ancestor_view = await ancestor.view_as(FilesystemEntry)
        names_leaf_first.append(ancestor_view.get_name())

    return os.path.join(*reversed(names_leaf_first))

apply_stat_attrs(self, path)

Set the ownership, file mode, access/modification times, and extended attributes of a path on disk to match the attributes stored on this resource.

Parameters:

Name Type Description Default
path str

Path on disk to set attributes of.

required
Source code in ofrak/core/filesystem.py
def apply_stat_attrs(self, path: str):
    """
    Apply the metadata stored on this resource to a path on disk.

    If stat information is present, the owner/group, the file mode, and the access and
    modification times of `path` are set from it. If extended attributes are present, each
    one is set on `path` as well.

    :param path: Path on disk to set attributes of.
    """
    stat_info = self.stat
    if stat_info:
        os.chown(path, stat_info.st_uid, stat_info.st_gid)
        os.chmod(path, stat_info.st_mode)
        times = (stat_info.st_atime, stat_info.st_mtime)
        os.utime(path, times)
    extended_attributes = self.xattrs
    if extended_attributes:
        for xattr_name, xattr_value in extended_attributes.items():
            xattr.setxattr(path, xattr_name, xattr_value)

FilesystemRoot (ResourceView) dataclass

A resource that contains a filesystem's file tree. All descendant resources are stored in a tree structure that reflects a filesystem's file tree. The methods within this class are intended to be used as utilities when unpacking any filesystem-like resource.

Any resource that contains a file tree should inherit the FilesystemRoot class.

flush_to_disk(self, path=None) async

Writes this FilesystemRoot's FilesystemEntry descendants to directory. If a target path is not provided, the output is written to a temporary directory.

Returns:

Type Description

the root directory containing the flushed filesystem

Source code in ofrak/core/filesystem.py
async def flush_to_disk(
    self,
    path: Optional[str] = None,
) -> str:
    """
    Writes this `FilesystemRoot`'s `FilesystemEntry` descendants to directory. If a target path
    is not provided, the output is written to a temporary directory.

    :param path: directory to write into; when `None`, a new temporary directory is created
    with `tempfile.mkdtemp()`

    :return: the root directory containing the flushed filesystem
    """
    # Function-scope import so this method does not depend on the module's import block
    from collections import deque

    root_path = tempfile.mkdtemp() if path is None else path

    # Breadth-first walk. A deque gives O(1) pops from the front, where `list.pop(0)` is O(n).
    pending = deque(
        await self.resource.get_children_as_view(
            FilesystemEntry, r_filter=ResourceFilter(tags=(FilesystemEntry,))
        )
    )
    while pending:
        entry = pending.popleft()
        if entry.is_folder():
            # Children are queued behind their folder, so the folder itself is always
            # flushed before anything it contains
            pending.extend(
                await entry.resource.get_children_as_view(
                    FilesystemEntry, r_filter=ResourceFilter(tags=(FilesystemEntry,))
                )
            )
        await entry.flush_to_disk(root_path=root_path)

    return root_path

get_entry(self, path) async

Searches this FilesystemRoot's descendants for a filesystem entry with a given path, and returns that entry if found.

Parameters:

Name Type Description Default
path str

the path of the FilesystemEntry to search for, with this FilesystemRoot as the path root

required

Returns:

Type Description

the descendant FilesystemEntry, if found; otherwise, returns None

Source code in ofrak/core/filesystem.py
async def get_entry(self, path: str):
    """
    Searches this `FilesystemRoot`'s descendants for a filesystem entry with a given path,
    and returns that entry if found.

    The path is normalized with `os.path.normpath` before searching, so redundant separators,
    `.` components, and a trailing separator do not prevent a match.

    :param path: the path of the `FilesystemEntry` to search for, with this `FilesystemRoot` as
    the path root

    :return: the descendant `FilesystemEntry`, if found; otherwise, returns `None`
    """
    # Normalize once, and take the base name from the normalized path: the base name of a raw
    # path such as "a/b/" is "", which would make the name filter below match nothing.
    normalized_path = os.path.normpath(path)
    basename = os.path.basename(normalized_path)
    # only searching paths with the same base name should reduce the search space by quite a lot
    descendants = await self.resource.get_descendants_as_view(
        FilesystemEntry,
        r_filter=ResourceFilter(
            attribute_filters=(ResourceAttributeValueFilter(FilesystemEntry.Name, basename),)
        ),
    )

    for d in descendants:
        if await d.get_path() == normalized_path:
            return d
    return None

list_dir(self) async

Enumerates a FilesystemRoot's children, much like os.listdir.

Returns:

Type Description
dict

a dictionary of child entries, with the child's name as the key

Source code in ofrak/core/filesystem.py
async def list_dir(self) -> dict:
    """
    List the direct children of this `FilesystemRoot`, in the spirit of `os.listdir`.

    :return: a dictionary of child entries, with the child's name as the key
    """
    children = await self.resource.get_children_as_view(FilesystemEntry)
    # If two children share a name, the one seen last is the one kept
    return {child.get_name(): child for child in children}

add_folder(self, path, folder_stat_result=None, folder_xattrs=None, tags=(), attributes=()) async

Adds a Folder resource to a FilesystemRoot, creating all parent folders as needed.

Parameters:

Name Type Description Default
path str

the path that will contain the folder to be added

required
folder_stat_result Optional[os.stat_result]

the filesystem attributes associated with the folder

None
folder_xattrs Optional[Dict[str, bytes]]

xattrs for the folder

None
tags Iterable[ofrak.model.tag_model.ResourceTag]

the list of tags to be added to the new resource. The Folder tag is added by default

()
attributes Iterable[ofrak.model.resource_model.ResourceAttributes]

the list of additional attributes to be added to the new folder, the folder's name attribute is added automatically

()

Returns:

Type Description
Folder

the Folder resource that was added to the FilesystemRoot

Exceptions:

Type Description
ValueError

if the path is too short and doesn't actually include any directories

Source code in ofrak/core/filesystem.py
async def add_folder(
    self,
    path: str,
    folder_stat_result: Optional[os.stat_result] = None,
    folder_xattrs: Optional[Dict[str, bytes]] = None,
    tags: Iterable[ResourceTag] = (),
    attributes: Iterable[ResourceAttributes] = (),
) -> Folder:
    """
    Adds a [Folder][ofrak.core.filesystem.Folder] resource to a `FilesystemRoot`, creating all
    parent folders as needed.

    Folders along the path that already exist are reused; every folder that has to be created
    receives the same `folder_stat_result`, `folder_xattrs`, `tags` and `attributes`.

    :param path: the path that will contain the folder to be added
    :param folder_stat_result: the filesystem attributes associated with the folder
    :param folder_xattrs: xattrs for the folder
    :param tags: the list of tags to be added to the new resource. The `Folder` tag is added by
    default
    :param attributes: the list of additional attributes to be added to the new folder, the
    folder's name attribute is added automatically

    :raises ValueError: if the path is too short and doesn't actually include any directories
    (for example `""`, `"."` or `"/"`), or if the resolved parent is not a `Folder`

    :return: the `Folder` resource that was added to the `FilesystemRoot`
    """
    # Normalizes and cleans up paths beginning with "./" and containing "./../" as well as
    # other extraneous separators. Empty and "." components are dropped explicitly:
    # `str.split` never returns an empty list, so without the filter a path such as "", "."
    # or "/" would silently create a folder named "." or "".
    split_dir = [
        component
        for component in os.path.normpath(path).split("/")
        if component not in ("", ".")
    ]
    if not split_dir:
        # Checked before touching the resource tree so that nothing is created on failure
        raise ValueError(f"The path {path} is too short (no directories)")

    parent: Union[FilesystemRoot, Folder] = self
    for directory in split_dir:
        folder_entries = await parent.list_dir()

        if directory in folder_entries:
            parent = await folder_entries[directory].resource.view_as(Folder)
        else:
            new_missing_folder = await parent.resource.create_child_from_view(
                Folder(directory, folder_stat_result, folder_xattrs),
                data=b"",
                additional_tags=tags,
                additional_attributes=attributes,
            )
            parent = await new_missing_folder.view_as(Folder)

    if not isinstance(parent, Folder):
        raise ValueError(
            f"Parent folder {parent} has an unexpected type {type(parent)}. It "
            f"should be a Folder instead."
        )
    return parent

add_file(self, path, data, file_stat_result=None, file_xattrs=None, tags=(), attributes=()) async

Adds a File resource to a FilesystemRoot, creating all parent Folders as needed.

Parameters:

Name Type Description Default
path str

the path that will contain the File to be added

required
data bytes

contents of the file being added

required
file_stat_result Optional[os.stat_result]

the filesystem attributes associated with the file

None
file_xattrs Optional[Dict[str, bytes]]

xattrs for the file

None
tags Iterable[ofrak.model.tag_model.ResourceTag]

the list of tags to be added to the new resource, the File tag is added by default

()
attributes Iterable[ofrak.model.resource_model.ResourceAttributes]

the list of additional attributes to be added to the new File; the file's name attribute is added automatically

()

Returns:

Type Description
Resource

the File resource that was added to the FilesystemRoot

Source code in ofrak/core/filesystem.py
async def add_file(
    self,
    path: str,
    data: bytes,
    file_stat_result: Optional[os.stat_result] = None,
    file_xattrs: Optional[Dict[str, bytes]] = None,
    tags: Iterable[ResourceTag] = (),
    attributes: Iterable[ResourceAttributes] = (),
) -> Resource:
    """
    Adds a [File][ofrak.core.filesystem.File] resource to a `FilesystemRoot`, creating all
    parent [Folders][ofrak.core.filesystem.Folder] as needed.

    A directory part that normalizes to the root itself (`""`, `"."` or `"/"`, as in
    `"file"`, `"./file"` or `"/file"`) places the file directly under this `FilesystemRoot`.

    :param path: the path that will contain the `File` to be added
    :param data: contents of the file being added
    :param file_stat_result: the filesystem attributes associated with the file
    :param file_xattrs: xattrs for the file
    :param tags: the list of tags to be added to the new resource, the File tag is added by
    default
    :param attributes: the list of additional attributes to be added to the new File,
        the file's name attribute is added automatically

    :return: the `File` resource that was added to the `FilesystemRoot`
    """
    filename = os.path.basename(path)
    # Normalize the directory part so that "./file" and "/file" resolve to the root instead
    # of creating a folder literally named "." or "".
    dirname = os.path.normpath(os.path.dirname(path)).strip("/")

    if dirname in ("", "."):
        parent_folder = self
    else:
        parent_folder = await self.get_entry(dirname)
        if parent_folder is None:
            parent_folder = await self.add_folder(dirname)

    new_file = await parent_folder.resource.create_child_from_view(
        File(filename, file_stat_result, file_xattrs),
        data=data,
        additional_tags=tags,
        additional_attributes=attributes,
    )
    return new_file

remove_file(self, path) async

Removes a File resource from a FilesystemRoot.

Parameters:

Name Type Description Default
path str

the path of the file to be removed

required

Returns:

Type Description
None

None

Source code in ofrak/core/filesystem.py
async def remove_file(self, path: str) -> None:
    """
    Removes a [File][ofrak.core.filesystem.File] resource from a `FilesystemRoot`.

    The entry is looked up with `get_entry`, then its resource is deleted and saved.

    :param path: the path of the file to be removed

    :raises FileNotFoundError: if no entry exists at `path`

    :return: None
    """
    file_to_remove = await self.get_entry(path)
    if file_to_remove is None:
        # `get_entry` signals "not found" with None; fail with a clear message instead of an
        # AttributeError on `None.resource`
        raise FileNotFoundError(f"No filesystem entry found at path {path}")
    await file_to_remove.resource.delete()
    await file_to_remove.resource.save()

add_special_file_entry(self, path, special_file_view, tags=(), attributes=()) async

Adds a resource representing a SpecialFileType to a FilesystemRoot, creating all parent Folders as needed.

Some examples of these "special" types are SymbolicLink and BlockDevice.

Parameters:

Name Type Description Default
path str

The path of the FilesystemEntry to be added

required
special_file_view SpecialFileType

A ResourceView, whose type should be a subclass of FilesystemEntry

required
tags Iterable[ofrak.model.tag_model.ResourceTag]

the list of tags to be added to the new resource, the File tag is added by default

()
attributes Iterable[ofrak.model.resource_model.ResourceAttributes]

the list of additional attributes to be added to the new Folder, the file's name attribute is added automatically

()

Returns:

Type Description
Resource

The special FilesystemEntry resource that was added to the FilesystemRoot

Source code in ofrak/core/filesystem.py
async def add_special_file_entry(
    self,
    path: str,
    special_file_view: SpecialFileType,
    tags: Iterable[ResourceTag] = (),
    attributes: Iterable[ResourceAttributes] = (),
) -> Resource:
    """
    Adds a resource representing a [SpecialFileType][ofrak.core.filesystem.SpecialFileType]
    to a `FilesystemRoot`, creating all parent [Folders][ofrak.core.filesystem.Folder] as
    needed.

    Some examples of these "special" types are
    [SymbolicLink][ofrak.core.filesystem.SymbolicLink] and
    [BlockDevice][ofrak.core.filesystem.BlockDevice].

    A directory part that normalizes to the root itself (`""`, `"."` or `"/"`) places the
    entry directly under this `FilesystemRoot`.

    :param path: The path of the `FilesystemEntry` to be added
    :param special_file_view: A ResourceView, whose type should be a subclass of
    `FilesystemEntry`
    :param tags: additional tags to be added to the new resource
    :param attributes: additional attributes to be added to the new resource

    :return: The special `FilesystemEntry` resource that was added to the `FilesystemRoot`
    """
    # Normalize the directory part so that "./name" and "/name" resolve to the root instead
    # of creating a folder literally named "." or "".
    dirname = os.path.normpath(os.path.dirname(path)).strip("/")

    if dirname in ("", "."):
        parent_folder = self
    else:
        parent_folder = await self.get_entry(dirname)
        if parent_folder is None:
            parent_folder = await self.add_folder(dirname)

    new_entry = await parent_folder.resource.create_child_from_view(
        special_file_view,
        data=b"",  # Use empty, non-None data
        additional_tags=tags,
        additional_attributes=attributes,
    )
    return new_entry

Folder (FilesystemEntry)

Describes a folder that is stored in a filesystem as a file tree.

All descendant resources are stored in a tree structure that reflects a folder/directory's file tree.

get_entry(self, path) async

Search a folder for an entry with the given path.

Parameters:

Name Type Description Default
path str

The filesystem path to search for, relative to this folder

required

Returns:

Type Description
Optional[ofrak.core.filesystem.FilesystemEntry]

The child FilesystemEntry resource that was found. If nothing was found, None is returned

Source code in ofrak/core/filesystem.py
async def get_entry(self, path: str) -> Optional[FilesystemEntry]:
    """
    Search a folder for an entry with the given path.

    The path is normalized with `os.path.normpath` and resolved against this folder's own
    full path (as returned by `get_path`), so only descendants located exactly at
    `<this folder>/<path>` match.

    :param path: The filesystem path to search for, relative to this folder

    :return: The child `FilesystemEntry` resource that was found. If nothing was found, `None`
    is returned
    """
    normalized_path = os.path.normpath(path)
    basename = os.path.basename(normalized_path)
    # only searching paths with the same base name should reduce the search space by quite a lot
    descendants = await self.resource.get_descendants_as_view(
        FilesystemEntry,
        r_filter=ResourceFilter(
            attribute_filters=(ResourceAttributeValueFilter(FilesystemEntry.Name, basename),)
        ),
    )

    # Compare full paths rather than splitting on this folder's *name*: splitting breaks as
    # soon as "<name>/" also occurs elsewhere in a descendant's path (folder "a" containing
    # "data/x" splits into ["", "dat", "x"]) and can match entries nested under another
    # folder that happens to share the name.
    target_path = os.path.join(await self.get_path(), normalized_path)
    for d in descendants:
        if await d.get_path() == target_path:
            return d
    return None

list_dir(self) async

Enumerate a folder's children, much like os.listdir.

Returns:

Type Description
Dict[str, ofrak.core.filesystem.FilesystemEntry]

A dictionary of child entries, with the child's name as the key

Source code in ofrak/core/filesystem.py
async def list_dir(self) -> Dict[str, FilesystemEntry]:
    """
    List this folder's direct children, in the spirit of `os.listdir`.

    :return: A dictionary of child entries, with the child's name as the key
    """
    children = await self.resource.get_children_as_view(FilesystemEntry)
    # If two children share a name, the one seen last is the one kept
    return {child.get_name(): child for child in children}

SpecialFileType (FilesystemEntry) dataclass

A filesystem entry that is not a simple type, like a file or folder. For example, symbolic links and block devices fall under this category.

SymbolicLink (SpecialFileType) dataclass

Symbolic link pointing to a (possibly invalid) path in the filesystem. The path pointed to is invalid if accessing it would raise a FileNotFoundError.

Attributes:

Name Type Description
source_path str

File pointed to by the symbolic link