Skip to content

data_service.py

ofrak.service.data_service

DataService (DataServiceInterface)

create_root(self, data_id, data) async

Create a root data model with its own data bytes.

Parameters:

Name Type Description Default
data_id bytes

Unique ID for the new data model

required
data bytes

Binary data belonging to the new data model

required

Returns:

Type Description
DataModel

The new data model object

Exceptions:

Type Description
AlreadyExistError

if data_id is already associated with a model

Source code in ofrak/service/data_service.py
async def create_root(self, data_id: DataId, data: bytes) -> DataModel:
    """
    Register a brand-new root data model owning its own bytes.

    :param data_id: Unique ID for the new data model
    :param data: Binary data belonging to the new data model

    :return: The newly created data model

    :raises AlreadyExistError: if data_id is already associated with a model
    """
    if data_id in self._model_store:
        raise AlreadyExistError(f"A model with {data_id.hex()} already exists!")

    # A root model spans all of its own data and is its own root.
    root_model = DataModel(data_id, Range(0, len(data)), data_id)
    self._model_store[data_id] = root_model
    self._roots[data_id] = _DataRoot(root_model, data)

    return root_model

create_mapped(self, data_id, parent_id, range_in_parent) async

Create a new data model which is mapped into another data model. That is, it does not hold its own data, but defines its own data as a subsection of another model's data. The model it maps from (parent_id) may be a root model or another mapped model; if parent_id is another mapped node, the new mapped node created here will be mapped to the same root as parent_id at a range translated to be within parent_id as defined by range_in_parent.

Parameters:

Name Type Description Default
data_id bytes

Unique ID for the new data model

required
parent_id bytes

ID of the data model to map the new model into

required
range_in_parent Range

Range in parent_id which the new model will map

required

Returns:

Type Description
DataModel

The new data model object

Exceptions:

Type Description
AlreadyExistError

if data_id is already associated with a model

NotFoundError

if parent_id is not associated with any known model

Source code in ofrak/service/data_service.py
async def create_mapped(
    self,
    data_id: DataId,
    parent_id: DataId,
    range_in_parent: Range,
) -> DataModel:
    """
    Create a model that does not own data but maps a slice of another model.
    The new model is always registered against the parent's root, with its
    range translated into root coordinates.

    :param data_id: Unique ID for the new data model
    :param parent_id: ID of the data model to map the new model into
    :param range_in_parent: Range in parent_id which the new model will map

    :return: The newly created data model

    :raises AlreadyExistError: if data_id is already associated with a model
    :raises NotFoundError: if parent_id is not associated with any known model
    :raises OutOfBoundError: if the mapped range extends past the parent's end
    """
    if data_id in self._model_store:
        raise AlreadyExistError(f"A model with {data_id.hex()} already exists!")

    parent_model = self._get_by_id(parent_id)
    # Translate the requested parent-relative range into root coordinates.
    range_in_root = range_in_parent.translate(parent_model.range.start)
    if range_in_root.end > parent_model.range.end:
        raise OutOfBoundError(
            f"Cannot map a new node into range {range_in_root} into {parent_model.range} of "
            f"{parent_id.hex()}"
        )

    mapped_model = DataModel(data_id, range_in_root, parent_model.root_id)
    # Register with the root first, then publish in the store.
    self._roots[parent_model.root_id].add_mapped_model(mapped_model)
    self._model_store[data_id] = mapped_model

    return mapped_model

get_by_id(self, data_id) async

Get the data model object associated with the given ID.

Parameters:

Name Type Description Default
data_id bytes

A unique ID for a data model

required

Returns:

Type Description
DataModel

The model associated with data_id

Exceptions:

Type Description
NotFoundError

if data_id is not associated with any known model

Source code in ofrak/service/data_service.py
async def get_by_id(self, data_id: DataId) -> DataModel:
    """
    Look up the data model associated with the given ID.

    :param data_id: A unique ID for a data model

    :return: The model associated with data_id

    :raises NotFoundError: if data_id is not associated with any known model
    """
    model = self._get_by_id(data_id)
    return model

get_by_ids(self, data_ids) async

Get the data model objects associated with the given IDs.

Parameters:

Name Type Description Default
data_ids Iterable[bytes]

Multiple unique IDs for data models

required

Returns:

Type Description
Iterable[ofrak.model.data_model.DataModel]

The models associated with each ID in data_ids, in the same order their IDs were provided

Exceptions:

Type Description
NotFoundError

if any ID in data_ids is not associated with any known model

Source code in ofrak/service/data_service.py
async def get_by_ids(self, data_ids: Iterable[DataId]) -> Iterable[DataModel]:
    """
    Look up several data models at once, preserving the order of the given IDs.

    :param data_ids: Multiple unique IDs for data models

    :return: The models associated with each ID, in the order the IDs were given

    :raises NotFoundError: if any ID in data_ids is not associated with any known model
    """
    return list(map(self._get_by_id, data_ids))

get_data_length(self, data_id) async

Return the length of a single data model.

Parameters:

Name Type Description Default
data_id bytes

A unique ID for a data model

required

Returns:

Type Description
int

The length of the data included in the model

Exceptions:

Type Description
NotFoundError

if data_id is not associated with any known model

Source code in ofrak/service/data_service.py
async def get_data_length(self, data_id: DataId) -> int:
    """
    Report how many bytes a single data model covers.

    :param data_id: A unique ID for a data model

    :return: The length of the data included in the model

    :raises NotFoundError: if data_id is not associated with any known model
    """
    model = self._get_by_id(data_id)
    return model.range.length()

get_data_range_within_root(self, data_id) async

Get the range that a model maps in its root. If the model specified by data_id is itself a root, returns a range covering that whole root (i.e. Range(0, length)).

Parameters:

Name Type Description Default
data_id bytes

A unique ID for a data model

required

Returns:

Type Description
Range

Range that data_id maps in its root

Exceptions:

Type Description
NotFoundError

if data_id is not associated with any known model

Source code in ofrak/service/data_service.py
async def get_data_range_within_root(self, data_id: DataId) -> Range:
    """
    Report the range a model occupies in its root. For a root model this is
    the full extent of its own data, i.e. Range(0, length).

    :param data_id: A unique ID for a data model

    :return: Range that data_id maps in its root

    :raises NotFoundError: if data_id is not associated with any known model
    """
    model = self._get_by_id(data_id)
    return model.range

get_range_within_other(self, data_id, within_data_id) async

Get the range representing the intersection between two data models, assuming they are both mapped into the same root data. Either of data_id or within_data_id may be roots, but they cannot both be roots (unless they are the same).

Parameters:

Name Type Description Default
data_id bytes

A unique ID for a data model

required
within_data_id bytes

A unique ID for a data model

required

Returns:

Type Description
Range

The range where data_id's model intersects within_data_id's model

Exceptions:

Type Description
NotFoundError

if data_id or within_data_id is not associated with any known model

ValueError

if data_id is not mapped into within_data_id or they do not share the same root

Source code in ofrak/service/data_service.py
async def get_range_within_other(self, data_id: DataId, within_data_id: DataId) -> Range:
    """
    Compute where data_id's model falls inside within_data_id's model, assuming
    both are mapped into the same root. At most one of the two may be a root
    (unless they are the same model).

    :param data_id: A unique ID for a data model
    :param within_data_id: A unique ID for a data model

    :return: The range where data_id's model intersects within_data_id's model

    :raises NotFoundError: if data_id or within_data_id is not associated with
        any known model
    :raises ValueError: if data_id is not mapped into within_data_id or they do
        not share the same root
    """
    inner = self._get_by_id(data_id)
    outer = self._get_by_id(within_data_id)

    # A model trivially occupies its entire own range.
    if data_id == within_data_id:
        return Range.from_size(0, inner.range.length())

    if self._is_root(data_id):
        raise ValueError(
            f"{data_id.hex()} is a root, not mapped into {within_data_id.hex()} (a root)!"
        )
    if self._is_root(within_data_id):
        if inner.root_id != outer.id:
            raise ValueError(f"{data_id.hex()} is not mapped into {within_data_id.hex()} (a root)!")
    elif inner.root_id != outer.root_id:
        raise ValueError(
            f"{data_id.hex()} and {within_data_id.hex()} are not mapped into the same root!"
        )

    # Both ranges are in root coordinates; rebase the overlap onto `outer`.
    return outer.range.intersect(inner.range).translate(-outer.range.start)

get_data(self, data_id, data_range=None) async

Get the data (or section of data) of a model. The optional data_range parameter specifies a range within data_id's data to return; if this range actually falls outside the boundaries of data_id's data, an empty bytestring is returned.

Parameters:

Name Type Description Default
data_id bytes

A unique ID for a data model

required
data_range Optional[ofrak_type.range.Range]

An optional range within the model's data to return

None

Returns:

Type Description
bytes

Bytes of data from the model associated with data_id - all bytes by default, a specific slice if data_range is provided, and empty bytes if data_range is provided but is outside the modeled data.

Exceptions:

Type Description
NotFoundError

if data_id is not associated with any known model

Source code in ofrak/service/data_service.py
async def get_data(self, data_id: DataId, data_range: Optional[Range] = None) -> bytes:
    """
    Fetch the bytes of a model, or an optional sub-range of them.

    :param data_id: A unique ID for a data model
    :param data_range: An optional range within the model's data to return

    :return: All of the model's bytes by default; a slice if data_range is
        given; empty bytes if data_range falls outside the modeled data

    :raises NotFoundError: if data_id is not associated with any known model
    """
    model = self._get_by_id(data_id)
    root = self._get_root_by_id(model.root_id)
    if data_range is None:
        # No sub-range requested: return the model's entire slice of the root.
        return root.data[model.range.start : model.range.end]
    # Translate the model-relative range into root coordinates and clip it to
    # the root's bounds; an out-of-bounds request yields an empty slice.
    absolute_range = data_range.translate(model.range.start).intersect(root.model.range)
    return root.data[absolute_range.start : absolute_range.end]

apply_patches(self, patches) async

Modify the data of a number of models, modeled as a list of DataPatch structures each specifying: a target data model (by ID), new data, and a range to overwrite with the new data. The listed patches are applied in order, so that subsequent patches may effectively 'erase' an earlier patch. Patches may resize data if the new data is not the same size as the range it is overwriting. Such patches create additional restrictions:

  1. If patches contains a patch that resizes a range of data, no subsequent patch in patches is allowed to modify that resized range.
  2. Resizing patches are not allowed to overwrite ranges that contain the borders of any data models. For example, if model B maps Range(0, 6) of model A, a patch that resizes Range(4, 10) of model A is not allowed (whether it increases or decreases the size).

Parameters:

Name Type Description Default
patches List[ofrak.model.data_model.DataPatch]

A list of patch data structures to be applied, in order

required

Returns:

Type Description
List[ofrak.model.data_model.DataPatchesResult]

A list of data structures describing all modified ranges of each data model affected by patches

Exceptions:

Type Description
NotFoundError

if any data ID in the patches list is not associated with any known model

PatchOverlapError

if a patch targets a region of data which has already been modified by a patch which resized that region

PatchOverlapError

if a patch would resize a region of data which contains the start or end of one or more data models

Source code in ofrak/service/data_service.py
async def apply_patches(self, patches: List[DataPatch]) -> List[DataPatchesResult]:
    """
    Apply a sequence of data patches, grouped and applied per root model.

    :param patches: A list of patch data structures to be applied, in order

    :return: Data structures describing all modified ranges of each affected model

    :raises NotFoundError: if any data ID in the patches list is not associated
        with any known model
    :raises PatchOverlapError: if a patch targets a region already resized by an
        earlier patch, or would resize a region containing a model boundary
    """
    # Bucket the patches by the root model they ultimately modify.
    grouped: Dict[DataId, List[DataPatch]] = defaultdict(list)
    for patch in patches:
        target = self._get_by_id(patch.data_id)
        grouped[target.root_id].append(patch)

    all_results: List[DataPatchesResult] = []
    for root_id, root_patches in grouped.items():
        all_results += self._apply_patches_to_root(root_id, root_patches)

    return all_results

delete_models(self, data_ids) async

Delete one or more data models. If a root model is deleted, all models mapped into that root are also deleted.

Parameters:

Name Type Description Default
data_ids Iterable[bytes]

Multiple unique IDs for data models

required

Exceptions:

Type Description
NotFoundError

if any ID in data_ids is not associated with any known model

Source code in ofrak/service/data_service.py
async def delete_models(self, data_ids: Iterable[DataId]) -> None:
    """
    Delete one or more data models. Deleting a root also deletes every model
    mapped into that root. IDs with no known model are silently skipped.

    :param data_ids: Multiple unique IDs for data models
    """
    doomed_roots = dict()
    doomed_mapped = dict()

    # Partition the requested IDs into roots and mapped models.
    for data_id in data_ids:
        try:
            model = self._get_by_id(data_id)
        except NotFoundError:
            continue
        bucket = doomed_mapped if model.is_mapped() else doomed_roots
        bucket[model.id] = model

    for root_model in doomed_roots.values():
        data_root = self._roots[root_model.id]
        for child in data_root.get_children():
            # Children die with their root; drop any duplicate explicit request
            # so it is not deleted twice below.
            doomed_mapped.pop(child.id, None)
            del self._model_store[child.id]

        del self._roots[root_model.id]
        del self._model_store[root_model.id]

    for mapped_model in doomed_mapped.values():
        self._get_root_by_id(mapped_model.root_id).delete_mapped_model(mapped_model)
        del self._model_store[mapped_model.id]

search(self, data_id, query, start=None, end=None, max_matches=None) async

Search for some data in one of the models. The query may be a regex pattern (a return value of re.compile). If the query is a regex pattern, returns a tuple of pairs with both the offset of the match and the contents of the match itself. If the query is plain bytes, a tuple of only the match offsets is returned.

Parameters:

Name Type Description Default
data_id

Data model to search

required
query

Plain bytes to exactly match or a regex pattern to search for

required
start

Start offset in the data model to begin searching

None
end

End offset in the data model to stop searching

None
max_matches

Maximum number of matches to return

None

Returns:

Type Description

A tuple of offsets matching a plain bytes query, or a tuple of (offset, match) pairs for a regex pattern query

Source code in ofrak/service/data_service.py
async def search(self, data_id: DataId, query, start: Optional[int] = None, end: Optional[int] = None, max_matches: Optional[int] = None):
    """
    Search a model's data for plain bytes or a compiled regex pattern.

    :param data_id: Data model to search
    :param query: Plain bytes to exactly match or a compiled regex pattern
    :param start: Start offset in the data model to begin searching
    :param end: End offset in the data model to stop searching
    :param max_matches: Maximum number of matches to return

    :return: A tuple of model-relative offsets for a plain bytes query, or a
        tuple of (offset, match) pairs for a regex pattern query

    :raises NotFoundError: if data_id is not associated with any known model
    """
    model = self._get_by_id(data_id)
    root = self._get_root_by_id(model.root_id)
    # Convert the model-relative search window into absolute offsets in the
    # root's data; `end` is clamped so it cannot extend past the model.
    start = model.range.start if start is None else model.range.start + start
    end = model.range.end if end is None else min(model.range.end, model.range.start + end)
    if isinstance(query, bytes):
        matches = []
        while max_matches is None or len(matches) < max_matches:
            match_offset = root.data.find(query, start, end)
            if match_offset < 0:
                break

            # Report offsets relative to the model, not the root.
            matches.append(match_offset - model.range.start)
            # Advance by one byte only, so overlapping matches are found.
            start = match_offset + 1

        return tuple(matches)
    else:
        query = cast(Pattern, query)
        # finditer yields non-overlapping matches within [start, end).
        match_iterator = query.finditer(root.data, start, end)

        if max_matches is not None:
            match_iterator = itertools.islice(match_iterator, max_matches)
        matches = (
            (match.start() - model.range.start, match.group(0)) for match in match_iterator
        )
        return tuple(matches)

_CompareFirstTuple (tuple, Generic) private

Wrapper for tuple that ensures only the first item in the tuple is checked. Necessary because bisect methods don't have a key function. Helpful for making sorted dictionary-like data structures.

__new__(cls, *args) special staticmethod

Create and return a new object. See help(type) for accurate signature.

Source code in ofrak/service/data_service.py
def __new__(cls, *args):
    """
    Pack the positional arguments themselves into the tuple, so that
    _CompareFirstTuple(a, b) produces (a, b) instead of tuple's usual
    single-iterable constructor.
    """
    packed = super().__new__(cls, args)
    return packed

_DataRoot private

A root data model which may have other data models mapped into it

_shift_grid_axis(axis, shift, merge_func, minimum=None, maximum=None, inclusive=(True, False)) private staticmethod

Shift a range of values in an axis, without affecting the sorted order of the points in the axis. With two exceptions: - If the minimum shifted point is shifted DOWN exactly enough to be equal to the previous point (which has by definition not been shifted), those two points are allowed to merge - If the maximum shifted point is shifted UP exactly enough to be equal to the next point (which has by definition not been shifted), those two points are allowed to merge

At most one of these can happen when shifting. The merge_func parameter handles merging those two points. Since we may be shifting either a row or a column, the merged "points" may be either columns (if shifting rows) or sets of bytes (if shifting columns).

Source code in ofrak/service/data_service.py
@staticmethod
def _shift_grid_axis(
    axis: List[_CompareFirstTuple[T]],
    shift: int,
    merge_func: Callable[[T, T], T],
    minimum: Optional[int] = None,
    maximum: Optional[int] = None,
    inclusive: Tuple[bool, bool] = (True, False),
) -> Iterable[T]:
    """
    Shift a range of values in an axis, without affecting the sorted order of the points in
    the axis. With two exceptions:
    - If the minimum shifted point is shifted DOWN exactly enough to be equal to the previous
      point (which has by definition not been shifted), those two points are allowed to merge
    - If the maximum shifted point is shifted UP exactly enough to be equal to the next
      point (which has by definition not been shifted), those two points are allowed to merge

    At most one of these can happen when shifting. The `merge_func` parameter handles merging
    those two points. Since we may be shifting either a row or a column, the merged "points" may
    be either columns (if shifting rows) or sets of bytes (if shifting columns).

    :param axis: sorted list of (key, value) pairs; mutated in place
    :param shift: signed amount added to each key in the selected range
    :param merge_func: combines the two values that collide exactly at a boundary
    :param minimum: lower key bound of the shifted range (unbounded if None)
    :param maximum: upper key bound of the shifted range (unbounded if None)
    :param inclusive: whether (minimum, maximum) are inclusive bounds

    :return: generator yielding the value of every shifted or merged point

    :raises _ShiftBreaksSortError: if the shift would move a boundary point past
        its unshifted neighbor, breaking the sorted order
    """
    # Values merged away at the lower/upper boundary; yielded before/after the
    # main shifted run, respectively.
    pre_yield = None
    post_yield = None

    # Index of the first point inside the shifted range; inclusive[0] controls
    # whether a point exactly at `minimum` is part of that range.
    if minimum is not None:
        if inclusive[0]:
            min_i = _CompareFirstTuple.bisect_left(axis, minimum)
        else:
            min_i = _CompareFirstTuple.bisect_right(axis, minimum)
    else:
        min_i = 0

    if 0 < min_i < (len(axis) - 1):
        post_shift_min = axis[min_i][0] + shift
        if post_shift_min < axis[min_i - 1][0]:
            raise _ShiftBreaksSortError(
                f"shifting {minimum} to {maximum} by {shift} would collide at the lower range!"
            )
        elif post_shift_min == axis[min_i - 1][0]:
            # will merge the lowest val in shifted range into previous
            val1 = axis[min_i - 1][1]
            _, pre_yield = axis.pop(min_i)

    # Index just past the last point of the shifted range; inclusive[1] controls
    # whether a point exactly at `maximum` is part of that range.
    if maximum is not None:
        if inclusive[1]:
            max_i = _CompareFirstTuple.bisect_left(axis, maximum)
        else:
            max_i = _CompareFirstTuple.bisect_right(axis, maximum)
    else:
        max_i = len(axis)

    if 0 < (max_i + 1) < len(axis):
        post_shift_max = axis[max_i][0] + shift
        if post_shift_max > axis[max_i + 1][0]:
            raise _ShiftBreaksSortError(
                f"shifting {minimum} to {maximum} by {shift} would collide at the upper range!"
            )
        elif post_shift_max == axis[max_i + 1][0]:
            # will merge the highest val in shifted range into next
            val1 = axis[max_i + 1][1]
            _, post_yield = axis.pop(max_i)

            # The pop removed an element at max_i, so later indices moved down.
            max_i -= 1

    if pre_yield is not None:
        yield pre_yield
        # Per the docstring, at most one merge happens per call, so val1 here
        # is the lower neighbor's value captured above.
        axis[min_i - 1] = _CompareFirstTuple(post_shift_min, merge_func(val1, pre_yield))

    # Shift every remaining point in [min_i, max_i), yielding each value.
    i = min_i
    while i < max_i:
        old_key, val = axis[i]
        axis[i] = _CompareFirstTuple(old_key + shift, val)
        yield val
        i += 1

    if post_yield is not None:
        yield post_yield
        # NOTE(review): the target index `max_i + 2` (after the pop and the
        # decrement above) is hard to verify from this view alone — confirm it
        # lands on the merged upper neighbor and not one slot past it.
        axis[max_i + 2] = _CompareFirstTuple(post_shift_max, merge_func(val1, post_yield))