filesystem.py
ofrak.core.filesystem
BlockDevice (SpecialFileType)
dataclass
Block special device file.
CharacterDevice (SpecialFileType)
dataclass
Character special device file.
FIFOPipe (SpecialFileType)
dataclass
Named pipe.
File (FilesystemEntry)
Stores the data and location of a file within a filesystem or folder's descendant file tree.
FilesystemEntry (ResourceView)
dataclass
Handles generic management of any entry stored within a filesystem.
get_name(self)
Get the base name of a folder.
Returns:
Type | Description |
---|---|
str |
The file or folder's base name |
Source code in ofrak/core/filesystem.py
def get_name(self) -> str:
"""
Get the base name of a folder.
:return: The file or folder's base name
"""
return self.Name
set_stat(self, stat_result)
async
Set the stat for the FilesystemEntry
. Useful for newly created files where we want to
control the full 10-tuple.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stat_result |
stat_result |
|
required |
Source code in ofrak/core/filesystem.py
async def set_stat(self, stat_result: os.stat_result):
"""
Set the stat for the `FilesystemEntry`. Useful for newly created files where we want to
control the full 10-tuple.
:param stat_result: `os.stat_result` object containing all necessary values
"""
self.stat = stat_result
if self.resource is None:
return
all_view_attrs: Dict[
Type[ResourceAttributes], ResourceAttributes
] = self.get_attributes_instances()
filesystem_attrs = all_view_attrs[AttributesType[FilesystemEntry]]
self.resource.add_attributes(filesystem_attrs)
await self.resource.save()
modify_stat_attribute(self, st_stat, stat_value)
async
Modify a specific os.stat
attribute on the filesystem entry.
Example:
fs_entry.modify_stat_attribute(stat.ST_MODE, 0o100755)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
st_stat |
int |
The |
required |
stat_value |
int |
The new stat value |
required |
Source code in ofrak/core/filesystem.py
async def modify_stat_attribute(self, st_stat: int, stat_value: int):
"""
Modify a specific `os.stat` attribute on the filesystem entry.
Example:
```python
fs_entry.modify_stat_attribute(stat.ST_MODE, 0o100755)
```
:param st_stat: The `st_stat` struct member to be modified
:param stat_value: The new stat value
"""
if self.stat is None:
raise ValueError("Cannot modify a stat attribute when stat attributes are not set")
stat_attributes = list(self.stat) # type: ignore
stat_attributes[st_stat] = stat_value
filesystem_stat_attributes = os.stat_result(tuple(stat_attributes))
await self.set_stat(filesystem_stat_attributes)
modify_xattr_attribute(self, attribute, value)
async
Modify the extended file attributes ("xattrs") for the filesystem entry.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
attribute |
str |
required | |
value |
bytes |
required |
Source code in ofrak/core/filesystem.py
async def modify_xattr_attribute(self, attribute: str, value: bytes):
"""
Modify the extended file attributes ("xattrs") for the filesystem entry.
:param attribute:
:param value:
"""
if self.xattrs is None:
self.xattrs = dict()
self.xattrs[attribute] = value
if self.resource is None:
return
await self.resource.save()
get_path(self)
async
Get a folder's path, with the FilesystemRoot
as the path root.
Returns:
Type | Description |
---|---|
str |
The full path name, with the |
Source code in ofrak/core/filesystem.py
async def get_path(self) -> str:
"""
Get a folder's path, with the `FilesystemRoot` as the path root.
:return: The full path name, with the `FilesystemRoot` ancestor as the path root
"""
path = [self.get_name()]
for a in await self.resource.get_ancestors(
r_filter=ResourceFilter(
tags=(FilesystemEntry, FilesystemRoot),
tags_condition=ResourceFilterCondition.OR,
)
):
if (a is None) or (a.has_tag(FilesystemRoot)):
break
a_view = await a.view_as(FilesystemEntry)
path.append(a_view.get_name())
path.reverse()
return os.path.join(*path)
apply_stat_attrs(self, path)
Set file mode and access times of a path on disk to match the attributes stored on this resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
Path on disk to set attributes of. |
required |
Source code in ofrak/core/filesystem.py
def apply_stat_attrs(self, path: str):
"""
Set file mode and access times of a path on disk to match the attributes stored on this
resource.
:param path: Path on disk to set attributes of.
"""
if self.stat:
os.chown(path, self.stat.st_uid, self.stat.st_gid)
os.chmod(path, self.stat.st_mode)
os.utime(path, (self.stat.st_atime, self.stat.st_mtime))
if self.xattrs:
for attr, value in self.xattrs.items():
xattr.setxattr(path, attr, value)
FilesystemRoot (ResourceView)
dataclass
A resource that contains a filesystem's file tree. All descendant resources are stored in a tree structure that reflects a filesystem's file tree. The methods within this class are intended to be used as utilities when unpacking any filesystem-like resource.
Any resource that contains a file tree should inherit the FilesystemRoot
class.
flush_to_disk(self, path=None)
async
Writes this FilesystemRoot
's FilesystemEntry
descendants to directory. If a target path
is not provided, the output is written to a temporary directory.
Returns:
Type | Description |
---|---|
the root directory containing the flushed filesystem |
Source code in ofrak/core/filesystem.py
async def flush_to_disk(
self,
path: Optional[str] = None,
):
"""
Writes this `FilesystemRoot`'s `FilesystemEntry` descendants to directory. If a target path
is not provided, the output is written to a temporary directory.
:return: the root directory containing the flushed filesystem
"""
if path is None:
root_path = tempfile.mkdtemp()
else:
root_path = path
entries = [
f
for f in await self.resource.get_children_as_view(
FilesystemEntry, r_filter=ResourceFilter(tags=(FilesystemEntry,))
)
]
while len(entries) > 0:
entry = entries.pop(0)
if entry.is_folder():
for child in await entry.resource.get_children_as_view(
FilesystemEntry, r_filter=ResourceFilter(tags=(FilesystemEntry,))
):
entries.append(child)
await entry.flush_to_disk(root_path=root_path)
return root_path
get_entry(self, path)
async
Searches this FilesystemRoot
's descendants for a filesystem entry with a given path,
and returns that entry if found.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
the path of the |
required |
Returns:
Type | Description |
---|---|
the descendant |
Source code in ofrak/core/filesystem.py
async def get_entry(self, path: str):
"""
Searches this `FilesystemRoot`'s descendants for a filesystem entry with a given path,
and returns that entry if found.
:param path: the path of the `FilesystemEntry` to search for, with this `FilesystemRoot` as
the path root
:return: the descendant `FilesystemEntry`, if found; otherwise, returns `None`
"""
basename = os.path.basename(path)
# only searching paths with the same base name should reduce the search space by quite a lot
descendants = await self.resource.get_descendants_as_view(
FilesystemEntry,
r_filter=ResourceFilter(
attribute_filters=(ResourceAttributeValueFilter(FilesystemEntry.Name, basename),)
),
)
for d in descendants:
if await d.get_path() == os.path.normpath(path):
return d
return None
list_dir(self)
async
Enumerates a FilesystemRoot
's children, much like os.listdir
.
Returns:
Type | Description |
---|---|
dict |
a dictionary of child entries, with the child's name as the key |
Source code in ofrak/core/filesystem.py
async def list_dir(self) -> dict:
"""
Enumerates a `FilesystemRoot`'s children, much like `os.listdir`.
:return: a dictionary of child entries, with the child's name as the key
"""
entries = dict()
for c in await self.resource.get_children_as_view(FilesystemEntry):
entries[c.get_name()] = c
return entries
add_folder(self, path, folder_stat_result=None, folder_xattrs=None, tags=(), attributes=())
async
Adds a Folder resource to a FilesystemRoot
, creating all
parent folders as needed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
the path that will contain the folder to be added |
required |
folder_stat_result |
Optional[os.stat_result] |
the filesystem attributes associated with the folder |
None |
folder_xattrs |
Optional[Dict[str, bytes]] |
xattrs for the folder |
None |
tags |
Iterable[ofrak.model.tag_model.ResourceTag] |
the list of tags to be added to the new resource. The |
() |
attributes |
Iterable[ofrak.model.resource_model.ResourceAttributes] |
the list of additional attributes to be added to the new folder, the folder's name attribute is added automatically |
() |
Returns:
Type | Description |
---|---|
Folder |
the |
Exceptions:
Type | Description |
---|---|
ValueError |
if the path is too short and doesn't actually include any directories |
Source code in ofrak/core/filesystem.py
async def add_folder(
self,
path: str,
folder_stat_result: Optional[os.stat_result] = None,
folder_xattrs: Optional[Dict[str, bytes]] = None,
tags: Iterable[ResourceTag] = (),
attributes: Iterable[ResourceAttributes] = (),
) -> Folder:
"""
Adds a [Folder][ofrak.core.filesystem.Folder] resource to a `FilesystemRoot`, creating all
parent folders as needed.
:param path: the path that will contain the folder to be added
:param folder_stat_result: the filesystem attributes associated with the folder
:param folder_xattrs: xattrs for the folder
:param tags: the list of tags to be added to the new resource. The `Folder` tag is added by
default
:param attributes: the list of additional attributes to be added to the new folder, the
folder's name attribute is added automatically
:raises ValueError: if the path is too short and doesn't actually include any directories
:return: the `Folder` resource that was added to the `FilesystemRoot`
"""
# Normalizes and cleans up paths beginning with "./" and containing "./../" as well as
# other extraneous separators
split_dir = os.path.normpath(path).rstrip("/").lstrip("/").split("/")
parent: Union[FilesystemRoot, Folder] = self
for directory in split_dir:
folder_entries = await parent.list_dir()
if directory not in folder_entries.keys():
new_missing_folder = await parent.resource.create_child_from_view(
Folder(directory, folder_stat_result, folder_xattrs),
data=b"",
additional_tags=tags,
additional_attributes=attributes,
)
parent = await new_missing_folder.view_as(Folder)
else:
parent = await folder_entries[directory].resource.view_as(Folder)
if type(parent) is FilesystemRoot:
assert len(split_dir) == 0 # Only case this should happen
raise ValueError(f"The path {path} is too short (no directories)")
if not isinstance(parent, Folder):
raise ValueError(
f"Parent folder {parent} has an unexpected type {type(parent)}. It "
f"should be a Folder instead."
)
return parent
add_file(self, path, data, file_stat_result=None, file_xattrs=None, tags=(), attributes=())
async
Adds a File resource to a FilesystemRoot
, creating all
parent Folders as needed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
the path that will contain the |
required |
data |
bytes |
contents of the file being added |
required |
file_stat_result |
Optional[os.stat_result] |
the filesystem attributes associated with the file |
None |
file_xattrs |
Optional[Dict[str, bytes]] |
xattrs for the file |
None |
tags |
Iterable[ofrak.model.tag_model.ResourceTag] |
the list of tags to be added to the new resource, the File tag is added by default |
() |
attributes |
Iterable[ofrak.model.resource_model.ResourceAttributes] |
the list of additional attributes to be added to the new Folder, the file's name attribute is added automatically |
() |
Returns:
Type | Description |
---|---|
Resource |
the |
Source code in ofrak/core/filesystem.py
async def add_file(
self,
path: str,
data: bytes,
file_stat_result: Optional[os.stat_result] = None,
file_xattrs: Optional[Dict[str, bytes]] = None,
tags: Iterable[ResourceTag] = (),
attributes: Iterable[ResourceAttributes] = (),
) -> Resource:
"""
Adds a [File][ofrak.core.filesystem.File] resource to a `FilesystemRoot`, creating all
parent [Folders][ofrak.core.filesystem.Folder] as needed.
:param path: the path that will contain the `File` to be added
:param data: contents of the file being added
:param file_stat_result: the filesystem attributes associated with the file
:param file_xattrs: xattrs for the file
:param tags: the list of tags to be added to the new resource, the File tag is added by
default
:param attributes: the list of additional attributes to be added to the new Folder,
the file's name attribute is added automatically
:return: the `File` resource that was added to the `FilesystemRoot`
"""
dirname = os.path.dirname(path)
filename = os.path.basename(path)
if dirname == "":
parent_folder = self
else:
parent_folder = await self.get_entry(dirname)
if parent_folder is None:
parent_folder = await self.add_folder(dirname)
new_file = await parent_folder.resource.create_child_from_view(
File(filename, file_stat_result, file_xattrs),
data=data,
additional_tags=tags,
additional_attributes=attributes,
)
return new_file
remove_file(self, path)
async
Removes a File resource from a FilesystemRoot
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
the path of the file to be removed |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in ofrak/core/filesystem.py
async def remove_file(self, path: str) -> None:
"""
Removes a [File][ofrak.core.filesystem.File] resource from a `FilesystemRoot`.
:param path: the path of the file to be removed
:return: None
"""
file_to_remove = await self.get_entry(path)
await file_to_remove.resource.delete()
await file_to_remove.resource.save()
add_special_file_entry(self, path, special_file_view, tags=(), attributes=())
async
Adds a resource representing a SpecialFileType
to a FilesystemRoot
, creating all parent Folders as
needed.
Some examples of these "special" types are SymbolicLink and BlockDevice.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path of the |
required |
special_file_view |
SpecialFileType |
A ResourceView, whose type should be a subclass of |
required |
tags |
Iterable[ofrak.model.tag_model.ResourceTag] |
the list of tags to be added to the new resource, the File tag is added by default |
() |
attributes |
Iterable[ofrak.model.resource_model.ResourceAttributes] |
the list of additional attributes to be added to the new Folder, the file's name attribute is added automatically |
() |
Returns:
Type | Description |
---|---|
Resource |
The special |
Source code in ofrak/core/filesystem.py
async def add_special_file_entry(
self,
path: str,
special_file_view: SpecialFileType,
tags: Iterable[ResourceTag] = (),
attributes: Iterable[ResourceAttributes] = (),
) -> Resource:
"""
Adds a resource representing a [SpecialFileType][ofrak.core.filesystem.SpecialFileType]
to a `FilesystemRoot`, creating all parent [Folders][ofrak.core.filesystem.Folder] as
needed.
Some examples of these "special" types are
[SymbolicLink][ofrak.core.filesystem.SymbolicLink] and
[BlockDevice][ofrak.core.filesystem.BlockDevice].
:param path: The path of the `FilesystemEntry` to be added
:param special_file_view: A ResourceView, whose type should be a subclass of
`FilesystemEntry`
:param tags: the list of tags to be added to the new resource, the File tag is added by
default
:param attributes: the list of additional attributes to be added to the new Folder,
the file's name attribute is added automatically
:return: The special `FilesystemEntry` resource that was added to the `FilesystemRoot`
"""
dirname = os.path.dirname(path)
if dirname == "":
parent_folder = self
else:
parent_folder = await self.get_entry(dirname)
if parent_folder is None:
parent_folder = await self.add_folder(dirname)
new_entry = await parent_folder.resource.create_child_from_view(
special_file_view,
data=b"", # Use empty, non-None data
additional_tags=tags,
additional_attributes=attributes,
)
return new_entry
Folder (FilesystemEntry)
Describes a folder that is stored in a filesystem as a file tree.
All descendant resources are stored in a tree structure that reflects a folder/directory's file tree.
get_entry(self, path)
async
Search a folder for an entry with the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The filesystem path to search for, relative to this folder |
required |
Returns:
Type | Description |
---|---|
Optional[ofrak.core.filesystem.FilesystemEntry] |
The child |
Source code in ofrak/core/filesystem.py
async def get_entry(self, path: str) -> Optional[FilesystemEntry]:
"""
Search a folder for an entry with the given path.
:param path: The filesystem path to search for, relative to this folder
:return: The child `FilesystemEntry` resource that was found. If nothing was found, `None`
is returned
"""
basename = os.path.basename(path)
# only searching paths with the same base name should reduce the search space by quite a lot
descendants = await self.resource.get_descendants_as_view(
FilesystemEntry,
r_filter=ResourceFilter(
attribute_filters=(ResourceAttributeValueFilter(FilesystemEntry.Name, basename),)
),
)
for d in descendants:
descendant_path = await d.get_path()
if descendant_path.split(f"{self.name}/")[-1] == path:
return d
return None
list_dir(self)
async
Enumerate a folder's children, much like os.listdir
.
Returns:
Type | Description |
---|---|
Dict[str, ofrak.core.filesystem.FilesystemEntry] |
A dictionary of child entries, with the child's name as the key |
Source code in ofrak/core/filesystem.py
async def list_dir(self) -> Dict[str, FilesystemEntry]:
"""
Enumerate a folder's children, much like `os.listdir`.
:return: A dictionary of child entries, with the child's name as the key
"""
entries = dict()
for c in await self.resource.get_children_as_view(FilesystemEntry):
entries[c.get_name()] = c
return entries
SpecialFileType (FilesystemEntry)
dataclass
A filesystem entry that is not a simple type, like a file or folder. For example, symbolic links and block devices fall under this category.
SymbolicLink (SpecialFileType)
dataclass
Symbolic link pointing to a (possibly invalid) path in the filesystem. The path pointed to is
invalid if accessing it would raise a FileNotFoundError
.
Attributes:
Name | Type | Description |
---|---|---|
source_path |
str |
File pointed to by the symbolic link |