data_service.py
ofrak.service.data_service
DataService (DataServiceInterface)
create_root(self, data_id, data)
async
Create a root data model with its own data bytes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_id | bytes | Unique ID for the new data model | required |
data | bytes | Binary data belonging to the new data model | required |
Returns:
Type | Description |
---|---|
DataModel | The new data model object |
Exceptions:
Type | Description |
---|---|
AlreadyExistError | if a model with data_id already exists |
Source code in ofrak/service/data_service.py
```python
async def create_root(self, data_id: DataId, data: bytes) -> DataModel:
    if data_id in self._model_store:
        raise AlreadyExistError(f"A model with {data_id.hex()} already exists!")
    new_model = DataModel(data_id, Range(0, len(data)), data_id)
    self._model_store[data_id] = new_model
    self._roots[data_id] = _DataRoot(new_model, data)
    return new_model
```
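For illustration, a minimal usage sketch; it assumes the in-memory `DataService` can be constructed with no arguments, which is not shown on this page:

```python
import asyncio

from ofrak.service.data_service import DataService


async def main():
    service = DataService()  # assumption: no-argument constructor
    root = await service.create_root(b"root-id", b"\x00\x01\x02\x03")
    # The new root model covers all of its own data: Range(0, 4).
    print(root.id, root.range)


asyncio.run(main())
```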
create_mapped(self, data_id, parent_id, range_in_parent)
async
Create a new data model which is mapped into another data model. That is, it does not hold its own data, but defines its own data as a subsection of another model's data. The model it maps from (`parent_id`) may be a root model or another mapped model; if `parent_id` is another mapped node, the new mapped node created here will be mapped to the same root as `parent_id`, at `range_in_parent` translated to lie within `parent_id`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_id | bytes | Unique ID for the new data model | required |
parent_id | bytes | ID of the data model to map the new model into | required |
range_in_parent | Range | Range within parent_id where the new model will be mapped | required |
Returns:
Type | Description |
---|---|
DataModel | The new data model object |
Exceptions:
Type | Description |
---|---|
AlreadyExistError | if a model with data_id already exists |
NotFoundError | if parent_id is not found |
Source code in ofrak/service/data_service.py
```python
async def create_mapped(
    self,
    data_id: DataId,
    parent_id: DataId,
    range_in_parent: Range,
) -> DataModel:
    if data_id in self._model_store:
        raise AlreadyExistError(f"A model with {data_id.hex()} already exists!")
    parent_model = self._get_by_id(parent_id)
    range_in_root = range_in_parent.translate(parent_model.range.start)
    if range_in_root.end > parent_model.range.end:
        raise OutOfBoundError(
            f"Cannot map a new node into range {range_in_root} into {parent_model.range} of "
            f"{parent_id.hex()}"
        )
    new_model = DataModel(data_id, range_in_root, parent_model.root_id)
    self._roots[parent_model.root_id].add_mapped_model(new_model)
    self._model_store[data_id] = new_model
    return new_model
```
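A sketch of nested mapping with hypothetical IDs (same constructor assumption as above); it shows the range translation performed in the source:

```python
import asyncio

from ofrak.service.data_service import DataService
from ofrak_type.range import Range


async def main():
    service = DataService()  # assumption: no-argument constructor
    await service.create_root(b"root", b"0123456789ABCDEF")
    # Map bytes 4..12 of the root.
    child = await service.create_mapped(b"child", b"root", Range(4, 12))
    # Map bytes 0..4 of "child"; since the parent is itself mapped, the new
    # model is stored against the root at the translated range, bytes 4..8.
    grandchild = await service.create_mapped(b"grandchild", b"child", Range(0, 4))
    print(child.range, grandchild.range)


asyncio.run(main())
```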
get_by_id(self, data_id)
async
Get the data model object associated with the given ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_id | bytes | A unique ID for a data model | required |
Returns:
Type | Description |
---|---|
DataModel | The model associated with data_id |
Exceptions:
Type | Description |
---|---|
NotFoundError | if data_id is not found |
Source code in ofrak/service/data_service.py
```python
async def get_by_id(self, data_id: DataId) -> DataModel:
    return self._get_by_id(data_id)
```
get_by_ids(self, data_ids)
async
Get the data model objects associated with the given IDs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_ids | Iterable[bytes] | Multiple unique IDs for data models | required |
Returns:
Type | Description |
---|---|
Iterable[ofrak.model.data_model.DataModel] | The models associated with each ID in data_ids |
Exceptions:
Type | Description |
---|---|
NotFoundError | if any ID in data_ids is not found |
Source code in ofrak/service/data_service.py
```python
async def get_by_ids(self, data_ids: Iterable[DataId]) -> Iterable[DataModel]:
    return [self._get_by_id(data_id) for data_id in data_ids]
```
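A short sketch covering both getters (same constructor assumption as above):

```python
import asyncio

from ofrak.service.data_service import DataService


async def main():
    service = DataService()  # assumption: no-argument constructor
    await service.create_root(b"root", b"hello")
    model = await service.get_by_id(b"root")
    models = await service.get_by_ids([b"root"])  # NotFoundError for unknown IDs
    assert model in list(models)


asyncio.run(main())
```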
get_data_length(self, data_id)
async
Return the length of a single data model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_id | bytes | A unique ID for a data model | required |
Returns:
Type | Description |
---|---|
int | The length of the data included in the model |
Exceptions:
Type | Description |
---|---|
NotFoundError | if data_id is not found |
Source code in ofrak/service/data_service.py
```python
async def get_data_length(self, data_id: DataId) -> int:
    return self._get_by_id(data_id).range.length()
```
get_data_range_within_root(self, data_id)
async
Get the range that a model maps in its root. If the model specified by `data_id` is itself a root, returns a range covering that whole root (i.e. Range(0, length)).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_id | bytes | A unique ID for a data model | required |
Returns:
Type | Description |
---|---|
Range | Range that data_id maps in its root |
Exceptions:
Type | Description |
---|---|
NotFoundError | if data_id is not found |
Source code in ofrak/service/data_service.py
```python
async def get_data_range_within_root(self, data_id: DataId) -> Range:
    return self._get_by_id(data_id).range
```
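A sketch contrasting a root and a mapped model (hypothetical IDs, same constructor assumption as above):

```python
import asyncio

from ofrak.service.data_service import DataService
from ofrak_type.range import Range


async def main():
    service = DataService()  # assumption: no-argument constructor
    await service.create_root(b"root", bytes(16))
    await service.create_mapped(b"child", b"root", Range(4, 12))
    print(await service.get_data_range_within_root(b"root"))   # covers bytes 0..16
    print(await service.get_data_range_within_root(b"child"))  # covers bytes 4..12


asyncio.run(main())
```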
get_range_within_other(self, data_id, within_data_id)
async
Get the range representing the intersection between two data models, assuming they are both mapped into the same root data. Either of `data_id` or `within_data_id` may be roots, but they cannot both be roots (unless they are the same).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_id | bytes | A unique ID for a data model | required |
within_data_id | bytes | A unique ID for a data model | required |
Returns:
Type | Description |
---|---|
Range | The range where data_id's data lies within within_data_id's data |
Exceptions:
Type | Description |
---|---|
NotFoundError | if data_id or within_data_id is not found |
ValueError | if data_id and within_data_id are not mapped into the same root |
Source code in ofrak/service/data_service.py
```python
async def get_range_within_other(self, data_id: DataId, within_data_id: DataId) -> Range:
    model = self._get_by_id(data_id)
    within_model = self._get_by_id(within_data_id)
    if data_id == within_data_id:
        return Range.from_size(0, model.range.length())
    if self._is_root(data_id):
        raise ValueError(
            f"{data_id.hex()} is a root, not mapped into {within_data_id.hex()} (a root)!"
        )
    elif self._is_root(within_data_id) and model.root_id != within_model.id:
        raise ValueError(f"{data_id.hex()} is not mapped into {within_data_id.hex()} (a root)!")
    elif not self._is_root(within_data_id) and model.root_id != within_model.root_id:
        raise ValueError(
            f"{data_id.hex()} and {within_data_id.hex()} are not mapped into the same root!"
        )
    else:
        return within_model.range.intersect(model.range).translate(-within_model.range.start)
```
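A sketch of two mapped siblings whose overlap is returned relative to within_data_id (hypothetical IDs, same constructor assumption as above):

```python
import asyncio

from ofrak.service.data_service import DataService
from ofrak_type.range import Range


async def main():
    service = DataService()  # assumption: no-argument constructor
    await service.create_root(b"root", bytes(16))
    await service.create_mapped(b"a", b"root", Range(4, 12))
    await service.create_mapped(b"b", b"root", Range(8, 16))
    # "a" and "b" overlap in root bytes 8..12; relative to the start of "b",
    # that intersection is bytes 0..4, so Range(0, 4) is returned.
    print(await service.get_range_within_other(b"a", b"b"))


asyncio.run(main())
```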
get_data(self, data_id, data_range=None)
async
Get the data (or a section of the data) of a model. The optional `data_range` parameter specifies a range within `data_id`'s data to return; if this range actually falls outside the boundaries of `data_id`'s data, an empty bytestring is returned.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_id | bytes | A unique ID for a data model | required |
data_range | Optional[ofrak_type.range.Range] | An optional range within the model's data to return | None |
Returns:
Type | Description |
---|---|
bytes | Bytes of data from the model associated with data_id |
Exceptions:
Type | Description |
---|---|
NotFoundError | if data_id is not found |
Source code in ofrak/service/data_service.py
```python
async def get_data(self, data_id: DataId, data_range: Optional[Range] = None) -> bytes:
    model = self._get_by_id(data_id)
    root = self._get_root_by_id(model.root_id)
    if data_range is not None:
        translated_range = data_range.translate(model.range.start).intersect(root.model.range)
        return root.data[translated_range.start : translated_range.end]
    else:
        return root.data[model.range.start : model.range.end]
```
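A sketch reading a mapped model's full view and a sub-range of it (hypothetical IDs, same constructor assumption as above):

```python
import asyncio

from ofrak.service.data_service import DataService
from ofrak_type.range import Range


async def main():
    service = DataService()  # assumption: no-argument constructor
    await service.create_root(b"root", b"0123456789ABCDEF")
    await service.create_mapped(b"child", b"root", Range(4, 12))
    print(await service.get_data(b"child"))               # b"456789AB"
    print(await service.get_data(b"child", Range(0, 4)))  # b"4567"


asyncio.run(main())
```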
apply_patches(self, patches)
async
Modify the data of a number of models, modeled as a list of `DataPatch` structures, each specifying: a target data model (by ID), new data, and a range to overwrite with the new data. The listed patches are applied in order, so that subsequent patches may effectively 'erase' an earlier patch. Patches may resize data if the new data is not the same size as the range it is overwriting. Such patches create additional restrictions:

- If `patches` contains a patch that resizes a range of data, no subsequent patch in `patches` is allowed to modify that resized range.
- Resizing patches are not allowed to overwrite ranges that contain the borders of any data models. For example, if model B maps Range(0, 6) of model A, a patch that resizes Range(4, 10) of model A is not allowed (whether it increases or decreases the size).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
patches | List[ofrak.model.data_model.DataPatch] | A list of patch data structures to be applied, in order | required |
Returns:
Type | Description |
---|---|
List[ofrak.model.data_model.DataPatchesResult] | A list of data structures describing all modified ranges of each data model affected by the patches |
Exceptions:
Type | Description |
---|---|
NotFoundError | if any data ID in the patches is not found |
PatchOverlapError | if a patch targets a region of data which has already been modified by a patch which resized that region |
PatchOverlapError | if a patch would resize a region of data which contains the start or end of one or more data models |
Source code in ofrak/service/data_service.py
```python
async def apply_patches(self, patches: List[DataPatch]) -> List[DataPatchesResult]:
    patches_by_root: Dict[DataId, List[DataPatch]] = defaultdict(list)
    for patch in patches:
        target_data_model = self._get_by_id(patch.data_id)
        patches_by_root[target_data_model.root_id].append(patch)
    results = []
    for root_id, patches_for_root in patches_by_root.items():
        results.extend(self._apply_patches_to_root(root_id, patches_for_root))
    return results
```
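A sketch of a single size-preserving patch. The DataPatch field order shown (range to overwrite, target data ID, new data) is an assumption about ofrak.model.data_model, not confirmed by this page:

```python
import asyncio

from ofrak.model.data_model import DataPatch
from ofrak.service.data_service import DataService
from ofrak_type.range import Range


async def main():
    service = DataService()  # assumption: no-argument constructor
    await service.create_root(b"root", b"0123456789ABCDEF")
    # Assumed field order: (range to overwrite, target data ID, new data).
    # b"WXYZ" is the same size as Range(0, 4), so nothing is resized.
    results = await service.apply_patches([DataPatch(Range(0, 4), b"root", b"WXYZ")])
    print(await service.get_data(b"root"))  # b"WXYZ456789ABCDEF"
    print(results)  # one DataPatchesResult describing the modified range


asyncio.run(main())
```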
delete_models(self, data_ids)
async
Delete one or more data models. If a root model is deleted, all models mapped into that root are also deleted.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_ids | Iterable[bytes] | Multiple unique IDs for data models | required |
Exceptions:
Type | Description |
---|---|
NotFoundError | if any ID in data_ids is not found |
Source code in ofrak/service/data_service.py
```python
async def delete_models(self, data_ids: Iterable[DataId]) -> None:
    roots_to_delete = dict()
    mapped_to_delete = dict()
    for data_id in data_ids:
        try:
            model = self._get_by_id(data_id)
        except NotFoundError:
            continue
        if model.is_mapped():
            mapped_to_delete[model.id] = model
        else:
            roots_to_delete[model.id] = model
    for root_model in roots_to_delete.values():
        root = self._roots[root_model.id]
        for child_model in root.get_children():
            mapped_to_delete.pop(child_model.id, None)
            del self._model_store[child_model.id]
        del self._roots[root_model.id]
        del self._model_store[root_model.id]
    for model in mapped_to_delete.values():
        root = self._get_root_by_id(model.root_id)
        root.delete_mapped_model(model)
        del self._model_store[model.id]
```
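A sketch of cascading deletion (hypothetical IDs, same constructor assumption as above):

```python
import asyncio

from ofrak.service.data_service import DataService
from ofrak_type.range import Range


async def main():
    service = DataService()  # assumption: no-argument constructor
    await service.create_root(b"root", bytes(8))
    await service.create_mapped(b"child", b"root", Range(0, 4))
    # Deleting the root also deletes "child", which is mapped into it.
    await service.delete_models([b"root"])


asyncio.run(main())
```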
search(self, data_id, query, start=None, end=None, max_matches=None)
async
Search for some data in one of the models. The query may be a regex pattern (a return value of `re.compile`). If the query is a regex pattern, returns a tuple of pairs, each with the offset of a match and the matched contents. If the query is plain bytes, a tuple of only the match offsets is returned.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_id | | Data model to search | required |
query | | Plain bytes to exactly match or a regex pattern to search for | required |
start | | Start offset in the data model to begin searching | None |
end | | End offset in the data model to stop searching | None |
max_matches | | Maximum number of matches to return | None |
Returns:
Type | Description |
---|---|
 | A tuple of offsets matching a plain bytes query, or a tuple of (offset, match) pairs for a regex pattern query |
Source code in ofrak/service/data_service.py
```python
async def search(self, data_id, query, start=None, end=None, max_matches=None):
    model = self._get_by_id(data_id)
    root = self._get_root_by_id(model.root_id)
    start = model.range.start if start is None else model.range.start + start
    end = model.range.end if end is None else min(model.range.end, model.range.start + end)
    if isinstance(query, bytes):
        matches = []
        while max_matches is None or len(matches) < max_matches:
            match_offset = root.data.find(query, start, end)
            if match_offset < 0:
                break
            matches.append(match_offset - model.range.start)
            start = match_offset + 1
        return tuple(matches)
    else:
        query = cast(Pattern, query)
        match_iterator = query.finditer(root.data, start, end)
        if max_matches is not None:
            match_iterator = itertools.islice(match_iterator, max_matches)
        matches = (
            (match.start() - model.range.start, match.group(0)) for match in match_iterator
        )
        return tuple(matches)
```
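A sketch of both query forms (hypothetical IDs, same constructor assumption as above):

```python
import asyncio
import re

from ofrak.service.data_service import DataService


async def main():
    service = DataService()  # assumption: no-argument constructor
    await service.create_root(b"root", b"one fish two fish")
    print(await service.search(b"root", b"fish"))                   # (4, 13)
    print(await service.search(b"root", re.compile(rb"\w+ fish")))
    # ((0, b"one fish"), (9, b"two fish"))


asyncio.run(main())
```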
_CompareFirstTuple (tuple, Generic)
private
Wrapper for tuple that ensures only the first item in the tuple is checked.
Necessary because bisect methods don't have a `key` function.
Helpful for making sorted dictionary-like data structures.
__new__(cls, *args)
special
staticmethod
Create and return a new object. See help(type) for accurate signature.
Source code in ofrak/service/data_service.py
```python
def __new__(cls, *args):
    return super().__new__(cls, args)
```
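A hypothetical illustration of the comparison behavior, relying only on the class's stated contract and on the bisect_left classmethod call visible in _shift_grid_axis below:

```python
from ofrak.service.data_service import _CompareFirstTuple  # internal class


class Opaque:
    """Stand-in for an unorderable value, such as a set of data models."""


a = _CompareFirstTuple(8, Opaque())
b = _CompareFirstTuple(8, Opaque())
# Only the first items are compared; a plain tuple would fall through to
# Opaque() < Opaque() on the key tie and raise TypeError.
assert not (a < b)

axis = [_CompareFirstTuple(k, Opaque()) for k in (0, 8, 16)]
print(_CompareFirstTuple.bisect_left(axis, 8))  # 1, as used by _shift_grid_axis
```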
_DataRoot
private
A root data model which may have other data models mapped into it
_shift_grid_axis(axis, shift, merge_func, minimum=None, maximum=None, inclusive=(True, False))
private
staticmethod
Shift a range of values in an axis, without affecting the sorted order of the points in the axis. With two exceptions:

- If the minimum shifted point is shifted DOWN exactly enough to be equal to the previous point (which has by definition not been shifted), those two points are allowed to merge
- If the maximum shifted point is shifted UP exactly enough to be equal to the next point (which has by definition not been shifted), those two points are allowed to merge

At most one of these can happen when shifting. The `merge_func` parameter handles merging those two points. Since we may be shifting either a row or a column, the merged "points" may be either columns (if shifting rows) or sets of bytes (if shifting columns).
Source code in ofrak/service/data_service.py
```python
@staticmethod
def _shift_grid_axis(
    axis: List[_CompareFirstTuple[T]],
    shift: int,
    merge_func: Callable[[T, T], T],
    minimum: Optional[int] = None,
    maximum: Optional[int] = None,
    inclusive: Tuple[bool, bool] = (True, False),
) -> Iterable[T]:
    """
    Shift a range of values in an axis, without affecting the sorted order of the points in
    the axis. With two exceptions:
    - If the minimum shifted point is shifted DOWN exactly enough to be equal to the previous
    point (which has by definition not been shifted), those two points are allowed to merge
    - If the maximum shifted point is shifted UP exactly enough to be equal to the next
    point (which has by definition not been shifted), those two points are allowed to merge
    At most one of these can happen when shifting. The `merge_func` parameter handles merging
    those two points. Since we may be shifting either a row or a column, the merged "points" may
    be either columns (if shifting rows) or sets of bytes (if shifting columns).
    """
    pre_yield = None
    post_yield = None

    if minimum is not None:
        if inclusive[0]:
            min_i = _CompareFirstTuple.bisect_left(axis, minimum)
        else:
            min_i = _CompareFirstTuple.bisect_right(axis, minimum)
    else:
        min_i = 0
    if 0 < min_i < (len(axis) - 1):
        post_shift_min = axis[min_i][0] + shift
        if post_shift_min < axis[min_i - 1][0]:
            raise _ShiftBreaksSortError(
                f"shifting {minimum} to {maximum} by {shift} would collide at the lower range!"
            )
        elif post_shift_min == axis[min_i - 1][0]:
            # will merge the lowest val in shifted range into previous
            val1 = axis[min_i - 1][1]
            _, pre_yield = axis.pop(min_i)

    if maximum is not None:
        if inclusive[1]:
            max_i = _CompareFirstTuple.bisect_left(axis, maximum)
        else:
            max_i = _CompareFirstTuple.bisect_right(axis, maximum)
    else:
        max_i = len(axis)
    if 0 < (max_i + 1) < len(axis):
        post_shift_max = axis[max_i][0] + shift
        if post_shift_max > axis[max_i + 1][0]:
            raise _ShiftBreaksSortError(
                f"shifting {minimum} to {maximum} by {shift} would collide at the upper range!"
            )
        elif post_shift_max == axis[max_i + 1][0]:
            # will merge the highest val in shifted range into next
            val1 = axis[max_i + 1][1]
            _, post_yield = axis.pop(max_i)
            max_i -= 1

    if pre_yield is not None:
        yield pre_yield
        axis[min_i - 1] = _CompareFirstTuple(post_shift_min, merge_func(val1, pre_yield))

    i = min_i
    while i < max_i:
        old_key, val = axis[i]
        axis[i] = _CompareFirstTuple(old_key + shift, val)
        yield val
        i += 1

    if post_yield is not None:
        yield post_yield
        axis[max_i + 2] = _CompareFirstTuple(post_shift_max, merge_func(val1, post_yield))
```