data_service.py

ofrak.service.data_service

DataNode

get_unmapped_ranges(self, sort_by_size=False, bounds=None)

Get a lazy iterator over unmapped ranges; no work is done until the iterator is consumed.

Source code in ofrak/service/data_service.py
def get_unmapped_ranges(
    self, sort_by_size: bool = False, bounds: Optional[Range] = None
) -> Iterable[Range]:
    """Get an iterator over unmapped ranges in zero passes."""
    # Take the Range from each size-ordered entry :: [(size, hash, Range)] -> [Range].
    size_sorted_ranges: Iterable[Range] = map(lambda x: x[2], self._unmapped_sizes)

    # Get range, discard index :: [(idx, Range)] -> [Range].
    unmapped_ranges: Iterable[Range] = map(lambda x: x[1], self._unmapped_ranges)

    # Choose between size-sorted and start-index-sorted.
    unmapped_iterable = size_sorted_ranges if sort_by_size else unmapped_ranges

    def selector(urange: Range, bounds: Optional[Range]) -> Tuple[int, Range]:
        """Intersects ``urange`` with bounds, returning (length, Range)."""
        bounds = bounds if bounds is not None else urange
        start = max(urange.start, bounds.start)
        end = min(urange.end, bounds.end)
        end = max(start, end)
        intersection = Range(start, end)
        return end - start, intersection

    # Fill in ``bounds`` kwarg.
    bounded_selector = functools.partial(selector, bounds=bounds)

    # Get lengths and range intersections.
    lengths_and_ranges = map(bounded_selector, unmapped_iterable)

    # Filter 0-length ranges.
    nontrivial_lengths_and_ranges = filter(lambda x: x[0] > 0, lengths_and_ranges)

    # Take ranges, discard lengths.
    bounded_ranges = map(lambda x: x[1], nontrivial_lengths_and_ranges)
    return bounded_ranges
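
For illustration, a minimal usage sketch (the `node` instance and the bounds are hypothetical; `Range` is the same class used in the source above):

# Hypothetical usage: find the largest unmapped gap inside the first 0x1000
# bytes of a DataNode. Nothing is computed until the iterator is consumed.
bounds = Range(0x0, 0x1000)
gaps = node.get_unmapped_ranges(sort_by_size=True, bounds=bounds)
largest_gap = max(gaps, key=lambda r: r.end - r.start, default=None)
if largest_gap is not None:
    print(f"largest gap: {largest_gap.start:#x}-{largest_gap.end:#x}")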

_pop_size(self, candidate) private

Delete the representations of candidate from self._unmapped_sizes.

Source code in ofrak/service/data_service.py
def _pop_size(self, candidate: Range) -> None:
    """
    Delete the representations of ``candidate`` from
    ``self._unmapped_sizes``.
    """
    candidate_tuple = (candidate.length(), hash(candidate), candidate)
    idx = bisect.bisect_left(self._unmapped_sizes, candidate_tuple)
    del self._unmapped_sizes[idx]

_push_size(self, candidate) private

Add the representations of candidate to self._unmapped_sizes.

Source code in ofrak/service/data_service.py
def _push_size(self, candidate: Range) -> None:
    """
    Add the representations of ``candidate`` to ``self._unmapped_sizes``.
    """
    candidate_tuple = (candidate.length(), hash(candidate), candidate)
    bisect.insort_left(self._unmapped_sizes, candidate_tuple)
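
The `(length, hash, range)` tuples give `bisect` a total order even when two ranges share a length: the hash breaks ties, so `bisect_left` lands on the exact entry to delete. A self-contained sketch of the same pattern, using Python `range` objects as stand-ins for `Range`:

import bisect
from typing import List, Tuple

sizes: List[Tuple[int, int, range]] = []

def push(r: range) -> None:
    bisect.insort_left(sizes, (len(r), hash(r), r))

def pop(r: range) -> None:
    # Locates the exact (size, hash, range) entry, then removes it.
    idx = bisect.bisect_left(sizes, (len(r), hash(r), r))
    del sizes[idx]

push(range(0, 4))
push(range(8, 12))  # same length as range(0, 4); the hash disambiguates
pop(range(0, 4))
assert [entry[2] for entry in sizes] == [range(8, 12)]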

get_children_in_range(self, target_range)

Get all the children that overlap the provided range. If the provided range is empty and there are empty ranges at that location, they are included in the results. The children are not ordered.

Parameters:

    target_range (Range): required

Returns:

    Iterable[DataNode]
Source code in ofrak/service/data_service.py
def get_children_in_range(self, target_range: Range) -> Iterable["DataNode"]:
    """
    Get all the children that overlap the provided range. If the provided range is empty and
    there are empty ranges at that location, they are included in the results. The children
    are not ordered.
    :param target_range: the range to search for overlapping children
    :return: an iterable of the children that overlap ``target_range``
    """
    if len(self._children) == 0:
        return tuple()
    child_index = bisect.bisect_left(self._children, (target_range.start,))
    if not self.is_overlaps_enabled():
        # Since overlaps are not allowed, this is just binary search
        if child_index != 0:
            # The child_index corresponds to the first child that starts at or after the
            # provided range. The child to the left must also be checked if it exists.
            _, left_child = self._children[child_index - 1]
            if target_range.overlaps(left_child.model.range):
                yield left_child

        for i in range(child_index, len(self._children)):
            # Iterate over the children that start at or after the provided range until the
            # first child that starts past the end of the range is found.
            _, child = self._children[i]
            if target_range.end < child.model.range.start:
                break
            if target_range.overlaps(child.model.range) or target_range == child.model.range:
                # The ranges overlap, or they are equal (only possible if both are 0-sized
                # ranges, which is considered an overlap)
                yield child
    else:
        self._children_tree = cast(IntervalTree, self._children_tree)
        if target_range.length() > 0:
            intervals = self._children_tree.overlap(target_range.start, target_range.end)
        else:
            intervals = self._children_tree.at(target_range.start)
        for interval in sorted(intervals):
            if interval.data.model.range.overlaps(target_range):
                yield interval.data
        for i in range(child_index, len(self._children)):
            # Handle the 0-sized children
            _, child = self._children[i]
            if child.model.range.start > target_range.end:
                break
            if child.model.range.length() == 0 and (
                child.model.range.overlaps(target_range) or target_range == child.model.range
            ):
                yield child
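
When overlaps are enabled, the lookup delegates to the `intervaltree` package. Note that `IntervalTree` cannot store zero-length intervals, which is why the source above handles 0-sized children in a separate pass. A standalone sketch of the two query modes (illustrative intervals only):

from intervaltree import IntervalTree

tree = IntervalTree()
tree.addi(0x00, 0x10, "header")   # interval [0x00, 0x10)
tree.addi(0x08, 0x20, "overlay")  # overlaps the first interval

# Non-empty query range: every interval intersecting [0x0C, 0x18).
assert {iv.data for iv in tree.overlap(0x0C, 0x18)} == {"header", "overlay"}

# Empty query range: intervals containing the single point 0x04.
assert {iv.data for iv in tree.at(0x04)} == {"header"}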

get_range_index(self, child_range, within_data_id=None, after_data_id=None, before_data_id=None)

Return the index of an existing node and the enum value describing the relation between child_range and that node.

Source code in ofrak/service/data_service.py
def get_range_index(
    self,
    child_range: Range,
    within_data_id: Optional[bytes] = None,
    after_data_id: Optional[bytes] = None,
    before_data_id: Optional[bytes] = None,
) -> Tuple[int, DataRangePosition]:
    """
    Return the index of an existing node and the enum value describing the
    relation between ``child_range`` and that node.
    """
    if child_range.start < 0 or child_range.end > self.model.range.length():
        raise OutOfBoundError(
            f"The provided {child_range} is outside the bounds of the parent {self}"
        )
    if len(self._children) == 0:
        return 0, DataRangePosition.UNDEFINED

    min_child_index = cast(int, bisect.bisect_left(self._children, (child_range.start,)))
    max_child_index = cast(int, bisect.bisect_left(self._children, (child_range.start + 1,)))
    if min_child_index == max_child_index:
        # There is no existing child that matches the start address of the new child
        if min_child_index > 0:
            # Check the left child for overlaps
            _, left_child_node = self._children[min_child_index - 1]
            left_child_range = left_child_node.model.range
            if left_child_range.end > child_range.end or (
                left_child_range.end == child_range.end and child_range.length() > 0
            ):
                return self._validate_range_index(
                    child_range,
                    min_child_index - 1,
                    DataRangePosition.WITHIN,
                    within_data_id,
                    after_data_id,
                    before_data_id,
                )
            if left_child_node.model.range.end > child_range.start:
                return self._validate_range_index(
                    child_range,
                    min_child_index - 1,
                    DataRangePosition.OVERLAP,
                    within_data_id,
                    after_data_id,
                    before_data_id,
                )

        if min_child_index < len(self._children):
            # Check the right child for overlaps
            _, right_child_node = self._children[min_child_index]
            if right_child_node.model.range.start < child_range.end:
                return self._validate_range_index(
                    child_range,
                    min_child_index,
                    DataRangePosition.OVERLAP,
                    within_data_id,
                    after_data_id,
                    before_data_id,
                )

        return self._validate_range_index(
            child_range,
            min_child_index,
            DataRangePosition.UNMAPPED,
            within_data_id,
            after_data_id,
            before_data_id,
        )

    # At this point, there are one or more existing children that have the same start address
    # as the new child.
    if child_range.length() != 0:
        # If all existing children are zero-sized and the new child is not, it should go at
        # the end.
        all_zero_sized = True
        for i in range(min_child_index, max_child_index):
            _, _child = self._children[i]
            if _child.model.range.length() > 0:
                if not self.is_overlaps_enabled():
                    if _child.model.range.end >= child_range.end:
                        return self._validate_range_index(
                            child_range,
                            i,
                            DataRangePosition.WITHIN,
                            within_data_id,
                            after_data_id,
                            before_data_id,
                        )
                    else:
                        return self._validate_range_index(
                            child_range,
                            i,
                            DataRangePosition.OVERLAP,
                            within_data_id,
                            after_data_id,
                            before_data_id,
                        )
                else:
                    all_zero_sized = False
        if all_zero_sized:
            # If overlaps are forbidden, this branch will always be taken (otherwise it would
            # have returned in the loop above). If overlaps are allowed, there is only one
            # possibility for the new child's index
            return self._validate_range_index(
                child_range,
                max_child_index,
                DataRangePosition.UNMAPPED,
                within_data_id,
                after_data_id,
                before_data_id,
            )

    if (
        max_child_index - min_child_index == 1
        and child_range.length() == 0
        and within_data_id is None
    ):
        # If the single existing child sharing this start address is non-zero-sized, the
        # new 0-sized child should go before it.
        _, _child = self._children[min_child_index]
        if _child.model.range.length() != 0:
            return self._validate_range_index(
                child_range,
                min_child_index,
                DataRangePosition.UNMAPPED,
                within_data_id,
                after_data_id,
                before_data_id,
            )

    # At this point, at least one existing child shares the new child's start position.
    # If overlaps are not allowed, the new child is 0-sized and at least one other child
    # sharing the same start address is 0-sized. A hint is required to determine where
    # the new child should be inserted relative to the other 0-sized child(ren).

    _, min_child_node = self._children[min_child_index]
    if after_data_id is None and before_data_id is None and within_data_id is None:
        raise AmbiguousOrderError(
            f"The {child_range} could got before or after child {min_child_node} but the "
            f"order was not specified"
        )
    elif (
        len(
            list(
                filter(lambda v: v is not None, (within_data_id, after_data_id, before_data_id))
            )
        )
        != 1
    ):
        raise AmbiguousOrderError(
            f"The range {child_range} could go within, before or after multiple "
            f"children. Exactly one within, after or before data id parameter must be included."
        )
    if after_data_id is not None:
        min_previous_child_index = max(min_child_index - 1, 0)
        for previous_child_index in range(min_previous_child_index, max_child_index):
            _, previous_child = self._children[previous_child_index]
            if previous_child.model.id == after_data_id:
                return previous_child_index, DataRangePosition.AFTER
        raise AmbiguousOrderError(
            f"The {child_range} could be go in multiple locations but the order was "
            f"specified with an after data ID that does not exist or is not located "
            f"immediately before one of the child's valid insertion indexes."
        )
    elif before_data_id is not None:
        max_next_child_index = min(max_child_index, len(self._children) - 1)
        for next_child_index in range(min_child_index, max_next_child_index + 1):
            _, next_child = self._children[next_child_index]
            if next_child.model.id == before_data_id:
                return next_child_index, DataRangePosition.BEFORE
        raise AmbiguousOrderError(
            f"The {child_range} could be inserted before or after child {min_child_node} but"
            f" the order was specified with an after data ID that does not exist or is "
            f"located after the provided range."
        )
    elif within_data_id is not None:
        min_previous_child_index = max(min_child_index - 1, 0)
        max_next_child_index = min(max_child_index, len(self._children) - 1)
        for child_index in range(min_previous_child_index, max_next_child_index + 1):
            _, _child = self._children[child_index]
            if _child.model.id == within_data_id and child_range.within(_child.model.range):
                return child_index, DataRangePosition.WITHIN
        raise AmbiguousOrderError(
            f"The {child_range} could be inserted before or after child {min_child_node} but"
            f" the order was specified with an after data ID that does not exist or is "
            f"located after the provided range."
        )
    # Unreachable code
    raise NotImplementedError()
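
The `(child_range.start,)` and `(child_range.start + 1,)` probes work because `self._children` is kept sorted by tuples whose first element is a start address: bisecting with a 1-tuple brackets exactly the group of children that share a start. A standalone sketch of the idiom (illustrative data only):

import bisect

# Children stored as (start, node) tuples, as in DataNode._children.
children = [(0x00, "a"), (0x10, "b"), (0x10, "c"), (0x20, "d")]

start = 0x10
lo = bisect.bisect_left(children, (start,))      # first child with start >= 0x10
hi = bisect.bisect_left(children, (start + 1,))  # first child with start >= 0x11

# children[lo:hi] is exactly the group sharing the start address.
assert [node for _, node in children[lo:hi]] == ["b", "c"]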

DataService (DataServiceInterface)

apply_patches(self, patches=None, moves=None) async

Apply multiple patches at once. The patches must meet the following guidelines:

  • Patches should not overlap each other.
  • Patches that cause a size change cannot affect any of the node's children, and they cannot overlap boundaries of the node, its siblings or its ancestors.
  • There can only be at most two patches between sibling nodes: one after the left child and one before the right child. There may be more patches that resolve to the same position if they are applied at a greater depth.
Source code in ofrak/service/data_service.py
async def apply_patches(
    self,
    patches: Optional[List[DataPatch]] = None,
    moves: Optional[List[DataMove]] = None,
) -> List[DataPatchesResult]:
    """
    Apply multiple patches at once. The patches must meet the following guidelines:

    - Patches should not overlap each other.
    - Patches that cause a size change cannot affect any of the node's children, and they cannot
      overlap boundaries of the node, its siblings or its ancestors.
    - There can only be at most two patches between sibling nodes: one after the left child and
      one before the right child. There may be more patches that resolve to the same position if
      they are applied at a greater depth.
    """
    if patches is None:
        patches = []
    moved_data_ids = set()
    delayed_patches = []
    current_patches = []
    if moves is not None and len(moves) > 0:
        # The moves can only happen once their after/before data IDs have been moved
        move_ids = set()
        moves_by_dependency = defaultdict(list)
        moves_ready: Deque[DataMove] = deque()
        remaining_moves = len(moves)
        for move in moves:
            move_ids.add(move.data_id)

        for move in moves:
            # Figure out which moves depend on which
            if move.after_data_id and move.after_data_id in move_ids:
                moves_by_dependency[move.after_data_id].append(move)
                continue
            elif move.before_data_id and move.before_data_id in move_ids:
                moves_by_dependency[move.before_data_id].append(move)
                continue
            moves_ready.append(move)

        while len(moves_ready) > 0:
            move = moves_ready.popleft()
            remaining_moves -= 1
            move_dependencies = moves_by_dependency[move.data_id]
            for move_dependency in move_dependencies:
                # These moves are now considered ready
                moves_ready.append(move_dependency)
            data_node = self._data_node_store.get(move.data_id)
            if data_node is None:
                raise NotFoundError(f"The data {move.data_id.hex()} does not exist")
            if not data_node.model.is_mapped():
                raise ValueError(f"The data {move.data_id.hex()} is not mapped")
            data = await self.get_data(move.data_id)
            parent_node = data_node.parent
            assert parent_node is not None
            patches.append(
                DataPatch(
                    data_node.model.range,
                    parent_node.model.id,
                    b"\x00" * len(data),
                )
            )
            parent_node.remove(data_node)
            data_node.model = dataclasses.replace(data_node.model, range=move.range)
            parent_node.insert_node(
                data_node, after_data_id=move.after_data_id, before_data_id=move.before_data_id
            )
            current_patches.append(
                DataPatch(
                    Range.from_size(0, move.range.length()),
                    move.data_id,
                    data,
                )
            )
            moved_data_ids.add(move.data_id)
        if remaining_moves != 0:
            # TODO: Make this error message more helpful for debugging. This should be a very
            #  rare case of foot-shooting but also very hard to track.
            raise ValueError("There is a circular dependencies on the moves")

    for patch in patches:
        # Determine if a patch can be applied immediately or if it needs to wait until after
        # the move is completed. This is to avoid issues when a patch applies to data that is
        # sized differently after the patch for the move (going from 0 to x for example).
        if patch.data_id not in moved_data_ids:
            current_patches.append(patch)
        else:
            if patch.range.length() != len(patch.data):
                raise ValueError(
                    "Cannot apply a patch that causes a change in size to a node that is "
                    "being moved"
                )
            delayed_patches.append(patch)

    # Maps ``patch.data_id`` to maps of original data being overwritten by patches.
    overwritten_map: Dict[bytes, Dict[int, bytes]] = {}

    # Save a copy of the patches we applied during this call.
    for patch in current_patches:

        # Maps hashes of ranges to original data within a patch.
        orig_patch_map: Dict[int, bytes] = overwritten_map.get(patch.data_id, {})
        overwritten_data = await self.get_data(patch.data_id, patch.range)
        orig_patch_map[hash(patch.range)] = overwritten_data
        overwritten_map[patch.data_id] = orig_patch_map

    # Add patches and overwritten history to patchlog.
    self._patchlog.append((copy.deepcopy(current_patches), overwritten_map))

    patches_by_root_id: Dict[bytes, List[PatchInfo]] = defaultdict(list)
    for patch in current_patches:
        data_node = self._data_node_store.get(patch.data_id)
        if data_node is None:
            raise NotFoundError(f"The data {patch.data_id.hex()} does not exist")
        if patch.range.end > data_node.model.range.end:
            raise OutOfBoundError(
                f"The patch on {patch.data_id.hex()} at {patch.range} is outside the bound of"
                f" {data_node}"
            )
        patch_data_length = len(patch.data)
        patch_range_length = patch.range.length()
        patch_length_diff = patch_data_length - patch_range_length

        if patch_length_diff != 0:
            # Validate that the patch does not overlap with any child of the targeted node.
            children = list(data_node.get_children_in_range(patch.range))
            for child in children:
                raise PatchOverlapError(f"The {patch} overlaps with child {child}")

        # Build up the information required to sort the patches.
        ancestors_info: Deque[PatchAncestor] = deque()
        ancestors_indexes: Deque[PatchPosition] = deque()
        ancestor_node: Optional[DataNode] = data_node
        ancestor_patch_range = patch.range
        while ancestor_node is not None:
            if patch_length_diff != 0:
                ancestor_children = ancestor_node.get_children_in_range(ancestor_patch_range)
                # Validate that the patch overlaps with at most one child within the ancestor.
                for i, child in enumerate(ancestor_children):
                    # "Overlapping" with 0-sized children is not a problem
                    if i != 0 and child.model.range.length() != 0:
                        raise PatchOverlapError(
                            f"The {patch} overlaps with more than one child within ancestor"
                            f" {ancestor_node}"
                        )
                    if not ancestor_patch_range.within(child.model.range):
                        raise OutOfBoundError(
                            f"The {patch} is not fully within ancestor {ancestor_node}"
                        )
            # Get the index for that patch within the ancestor children to eventually sort them
            patch_index, patch_position = ancestor_node.get_range_index(
                ancestor_patch_range,
                None if len(ancestors_info) == 0 else ancestors_info[0][0].model.id,
                patch.after_data_id if len(ancestors_info) == 0 else None,
                patch.before_data_id if len(ancestors_info) == 0 else None,
            )
            assert patch_position != DataRangePosition.OVERLAP, "Unexpected overlap"
            ancestors_indexes.appendleft((patch_index, patch_position))
            ancestors_info.appendleft((ancestor_node, ancestor_patch_range))

            ancestor_patch_range = ancestor_patch_range.translate(
                ancestor_node.model.range.start
            )
            ancestor_node = ancestor_node.parent

        # Populate the dictionary used to segregate patches that affect different root data
        # nodes
        root_data_node, root_patch_range = ancestors_info[0]
        patches_by_root_id[root_data_node.model.id].append(
            (
                root_patch_range.start,  # Used for sorting
                tuple(ancestors_indexes),  # Used for sorting
                patch,  # The original patch
                root_patch_range,  # The patch range within the root node
                list(ancestors_info),  # The nodes affected by the patch
            )
        )

    results_by_id: Dict[bytes, List[DataPatchResult]] = defaultdict(list)
    for node_data_id, node_data_patches in patches_by_root_id.items():
        try:
            node_data_patches.sort()
        except TypeError:
            offenders = []
            starts_and_ancestor_idxs = [
                (data_patch[0], data_patch[1]) for data_patch in node_data_patches
            ]
            for _, dup_locations in _list_duplicates(starts_and_ancestor_idxs):
                for loc in dup_locations:
                    # We only want to look at the patch for debugging purposes.
                    offenders.append(node_data_patches[loc][2])
            # That means the `DataPatch` instance had to be compared, meaning there are at
            # least 2 patches that are being applied to the same location
            raise PatchOverlapError(f"The patch overlaps another patch. Offenders: {offenders}")
        current_offset = None
        current_patch_info = None

        patch_info_by_node: Dict[
            DataNode, List[Tuple[Range, int, DataRangePosition, int]]
        ] = defaultdict(list)

        root_patches: List[Tuple[Range, bytes]] = []
        # Validate that there are no overlaps and populate the `patches_by_node` dictionary
        for (
            patch_start,
            patch_positions,
            patch,
            root_patch_range,
            patch_ancestors,
        ) in node_data_patches:
            prev_patch_info = current_patch_info
            prev_offset = current_offset
            current_patch_info = (patch_start, patch_positions)
            current_offset = root_patch_range.end
            if prev_patch_info is not None:
                if root_patch_range.start < prev_offset:
                    raise PatchOverlapError("The patch overlaps another patch")
                if current_patch_info == prev_patch_info:
                    # This doesn't get caught by a sort error if the patches are completely
                    # equal
                    raise PatchOverlapError("The patch overlaps another patch")
            size_diff = len(patch.data) - patch.range.length()
            root_patches.append((root_patch_range, patch.data))
            for ((patch_child_index, patch_child_position), (patch_node, patch_range)) in zip(
                patch_positions, patch_ancestors
            ):
                patch_info_by_node[patch_node].append(
                    (patch_range, patch_child_index, patch_child_position, size_diff)
                )
                results_by_id[patch_node.model.id].append(
                    DataPatchResult(
                        patch_range,
                        size_diff,
                        patch_child_index,
                        patch_child_position,
                    )
                )

        # TODO: Deal with alignment
        # Update the root data.
        data = self._data_store[node_data_id]
        data_parts = []
        previous_patch_end = 0
        for patch_range, patch_data in root_patches:
            data_parts.append(data[previous_patch_end : patch_range.start])
            data_parts.append(patch_data)
            previous_patch_end = patch_range.end
        data_parts.append(data[previous_patch_end:])
        self._data_store[node_data_id] = b"".join(data_parts)

        # Translate and resize the nodes affected by the patches
        for node, patch_infos in patch_info_by_node.items():
            # Resize the node first so that the children can be translated into a valid range
            aggregate_size_change = 0
            for _, _, _, size_diff in patch_infos:
                aggregate_size_change += size_diff
            node.resize(aggregate_size_change)
            node.translate_children(patch_infos)

    results = []
    for node_data_id, data_patch_results in results_by_id.items():
        results.append(DataPatchesResult(node_data_id, data_patch_results))

    if len(delayed_patches) > 0:
        # TODO: Merge the results
        await self.apply_patches(delayed_patches)
    return results
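
The root-level write near the end joins the unpatched slices with the patch payloads in a single pass. A standalone sketch of that splice, assuming the patches are already sorted and non-overlapping:

from typing import List, Tuple

def splice(data: bytes, patches: List[Tuple[int, int, bytes]]) -> bytes:
    """Apply sorted, non-overlapping (start, end, payload) patches in one pass."""
    parts = []
    prev_end = 0
    for start, end, payload in patches:
        parts.append(data[prev_end:start])  # untouched bytes before the patch
        parts.append(payload)               # replacement; may change the size
        prev_end = end
    parts.append(data[prev_end:])           # tail after the last patch
    return b"".join(parts)

# A same-size overwrite followed by a pure insertion (zero-length range):
assert splice(b"AAAABBBB", [(0, 4, b"CCCC"), (4, 4, b"--")]) == b"CCCC--BBBB"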

delete_node(self, data_id) async

Delete a node and re-parent its children under the deleted node's parent.

Parameters:

    data_id (bytes): required

Returns:

    None
Source code in ofrak/service/data_service.py
async def delete_node(self, data_id: bytes) -> None:
    data_node_deleting = self._data_node_store[data_id]
    del self._data_node_store[data_id]

    if not data_node_deleting.is_root():
        parent = data_node_deleting.parent
        assert parent is not None
        node_index = parent.get_child_index(data_node_deleting)
        prev_node_index = node_index - 1
        prev_node_id: Optional[bytes]
        if prev_node_index >= 0:
            prev_node_id = parent.get_child(prev_node_index).model.id
        else:
            prev_node_id = None

        parent.remove(data_node_deleting)

        for i in range(data_node_deleting.get_num_children()):
            child = data_node_deleting.get_child(i)
            original_child_range = child.model.range
            new_child_range = Range(
                original_child_range.start + data_node_deleting.model.range.start,
                original_child_range.end + data_node_deleting.model.range.start,
            )
            child.model.range = new_child_range
            parent.insert_node(child, after_data_id=prev_node_id)
            prev_node_id = child.model.id
            child.set_parent(parent)

    else:
        # TODO: Make all children roots?
        raise NotImplementedError
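
Because child ranges are stored relative to their parent, promoting a child one level up only requires adding the deleted node's start offset, as the loop above does. A worked example with hypothetical offsets:

# Hypothetical layout: the deleted node spans [0x40, 0x80) within its parent,
# and its child spans [0x08, 0x10) within the deleted node. After promotion,
# the child's range is re-expressed relative to the parent:
deleted_start = 0x40
child_start, child_end = 0x08, 0x10
assert (child_start + deleted_start, child_end + deleted_start) == (0x48, 0x50)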

merge_siblings(self, new_data_id, merging_data_ids) async

Merge the specified siblings into a new node, which has the same parent as the siblings and takes the children of the siblings.

Parameters:

    new_data_id (bytes): required
    merging_data_ids (Iterable[bytes]): required

Returns:

    None
Source code in ofrak/service/data_service.py
async def merge_siblings(self, new_data_id: bytes, merging_data_ids: Iterable[bytes]) -> None:
    await self.gather_siblings(new_data_id, merging_data_ids)
    nodes_to_merge = [self._data_node_store[data_id] for data_id in merging_data_ids]
    for data_node in nodes_to_merge:
        await self.delete_node(data_node.model.id)

gather_siblings(self, new_data_id, gathering_data_ids) async

Create a new common parent for the specified data_ids. This common parent will be a child of the gathered nodes' existing common parent.

Parameters:

    new_data_id (bytes): required
    gathering_data_ids (Iterable[bytes]): required

Returns:

    None
Source code in ofrak/service/data_service.py
async def gather_siblings(
    self, new_data_id: bytes, gathering_data_ids: Iterable[bytes]
) -> None:
    nodes_to_gather = [self._data_node_store[data_id] for data_id in gathering_data_ids]

    # First check that all nodes share the same parent (and none are root)
    parent_nodes = [data_node.parent for data_node in nodes_to_gather if data_node.parent]
    nodes_share_single_parent = all(
        [
            parent_node is not None and parent_node.model.id == parent_nodes[0].model.id
            for parent_node in parent_nodes
        ]
    )
    if not nodes_share_single_parent:
        raise NonContiguousError(f"Not all nodes in {gathering_data_ids} share the same parent")

    # Next check that the provided nodes are contiguous
    parent_node = parent_nodes[0]
    node_ranges = sorted(
        (data_node.model.range for data_node in nodes_to_gather), key=lambda r: r.start
    )
    prev_end = node_ranges[0].start

    for r in node_ranges:
        if r.start != prev_end:
            raise NonContiguousError(
                f"Nodes {gathering_data_ids} are not contiguous (gap between "
                f"0x{prev_end:x} and 0x{r.start:x})"
            )
        else:
            prev_end = r.end

    # Create the merged data model
    merged_range_begin = node_ranges[0].start
    merged_range_end = max(r.end for r in node_ranges)
    new_range = Range(merged_range_begin, merged_range_end)

    new_model = DataModel(new_data_id, new_range, parent_id=parent_node.model.id)

    new_node = DataNode(new_model, parent_node)
    self._data_node_store[new_data_id] = new_node

    # Remove each gathered node from the parent and this store
    for data_node in nodes_to_gather:
        parent_node.remove(data_node)
        data_node.model.range = Range(
            data_node.model.range.start - merged_range_begin,
            data_node.model.range.end - merged_range_begin,
        )
        new_node.insert_node(data_node)
        data_node.set_parent(new_node)

    parent_node.insert_node(new_node)
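
Once the ranges are sorted by start, contiguity reduces to comparing each range's start against the previous range's end, as above. A standalone sketch of the same check (illustrative (start, end) pairs only):

from typing import List, Tuple

def assert_contiguous(ranges: List[Tuple[int, int]]) -> None:
    """Raise if the (start, end) ranges, once sorted, leave a gap between neighbors."""
    ordered = sorted(ranges, key=lambda r: r[0])
    prev_end = ordered[0][0]
    for start, end in ordered:
        if start != prev_end:
            raise ValueError(f"gap between {prev_end:#x} and {start:#x}")
        prev_end = end

assert_contiguous([(0x10, 0x20), (0x00, 0x10), (0x20, 0x28)])  # no gaps: passes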

create_savepoint(self) async

self._savepoints: Dict[str, int] maps savepoint IDs to indices in self._patchlog.

This function saves the index in self._patchlog, and adds a pair to self._savepoints.

Source code in ofrak/service/data_service.py
async def create_savepoint(self) -> str:
    """
    `self._savepoints: Dict[str, int]` maps savepoint IDs to indices in `self._patchlog`.

    This function saves the index in `self._patchlog`, and adds a pair to `self._savepoints`.
    """
    index = len(self._patchlog)

    # If we've saved this ``index`` before, just return that savepoint ID.
    if index in self._savepoint_idxs:
        return self._savepoint_idxs[index]

    # Otherwise, generate a new one.
    randhash = "%020x" % random.randrange(16**20)
    stamp = datetime.datetime.utcnow().strftime("%m-%d-%Y--%H:%M:%S")
    savepoint_id = f"{randhash}--{stamp}--PATCHLOG_INDEX:{index}"
    self._savepoints[savepoint_id] = index
    self._savepoint_idxs[index] = savepoint_id

    return savepoint_id
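
A usage sketch (hypothetical `service` instance and `some_patch`, inside an async context). Because savepoints are keyed by patchlog index, taking two savepoints with no intervening patches returns the same ID:

sp1 = await service.create_savepoint()
sp2 = await service.create_savepoint()
assert sp1 == sp2  # same patchlog index, same savepoint ID

await service.apply_patches([some_patch])  # advances the patchlog
sp3 = await service.create_savepoint()
assert sp3 != sp1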

get_patches_between_savepoints(self, start, end='') async

Returns a list of lists of patches between the given savepoint strings.

Source code in ofrak/service/data_service.py
async def get_patches_between_savepoints(
    self, start: str, end: str = ""
) -> List[List[DataPatch]]:
    """
    Returns a list of lists of patches between the given savepoint strings.
    """
    if start not in self._savepoints:
        raise ValueError(f"Savepoint `{start}` not found.")
    start_index = self._savepoints.get(start, len(self._patchlog))
    end_index = self._savepoints.get(end, len(self._patchlog))
    return [savepoint[0] for savepoint in self._patchlog[start_index:end_index]]
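
Since savepoints are just indices into `self._patchlog`, retrieval reduces to a list slice over the recorded patch lists. A standalone sketch with hypothetical entries:

# Hypothetical patchlog entries: (patch_list, overwritten_map) pairs, with
# savepoints recorded at patchlog indices 1 and 3.
_patchlog = [(["p0"], {}), (["p1"], {}), (["p2"], {}), (["p3"], {})]
_savepoints = {"sp-a": 1, "sp-b": 3}

start_index = _savepoints["sp-a"]
end_index = _savepoints["sp-b"]
# The patch lists recorded at or after sp-a and before sp-b.
assert [entry[0] for entry in _patchlog[start_index:end_index]] == [["p1"], ["p2"]]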

delete_tree(self, data_id) async

Delete a data node and all of its children.

Parameters:

    data_id (bytes): required

Returns:

    None
Source code in ofrak/service/data_service.py
async def delete_tree(self, data_id: bytes) -> None:
    data_node_deleting = self._data_node_store.get(data_id)
    if data_node_deleting is None:
        # Already deleted
        return

    parent = data_node_deleting.parent
    if parent is not None:
        parent.remove(data_node_deleting)

    def _delete_tree_helper(_data_node: DataNode):
        for _, child in _data_node._children:
            _delete_tree_helper(child)

        del self._data_node_store[_data_node.model.id]

    _delete_tree_helper(data_node_deleting)

_list_duplicates(seq) private

Helper function for better error messages.

Source code in ofrak/service/data_service.py
def _list_duplicates(seq):
    """Helper function for better error messages."""
    tally = defaultdict(list)
    for i, item in enumerate(seq):
        tally[item].append(i)
    return ((key, locs) for key, locs in tally.items() if len(locs) > 1)
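
For example:

# Each duplicated item is reported together with all of its positions.
dupes = dict(_list_duplicates(["a", "b", "a", "c", "b", "a"]))
assert dupes == {"a": [0, 2, 5], "b": [1, 4]}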