entropy.py

ofrak_components.entropy.entropy

DataSummary (ResourceAttributes) dataclass

DataSummary(entropy_samples: bytes, magnitude_samples: bytes)

DataSummaryAnalyzer (Analyzer)

Analyze binary data and return summaries of its structure via the entropy and magnitude of its bytes.

analyze(self, resource, config=None, depth=0) async

Analyze a resource to extract specific ResourceAttributes.

Users should not call this method directly; rather, they should run Resource.run or Resource.analyze.

Parameters:

resource (Resource, required):
    The resource that is being analyzed.

config (default: None):
    Optional config for analyzing. If an implementation provides a default,
    this default will always be used when config would otherwise be None.
    Note that a copy of the default config will be passed, so the default
    config values cannot be modified persistently by a component run.

Returns:

DataSummary:
    The analysis results.
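
Since analyze is not meant to be called directly, here is a minimal usage sketch that requests the attributes through Resource.analyze instead, as the docstring suggests. It assumes an already-initialized OFRAK Resource bound to a hypothetical variable named resource:

from ofrak_components.entropy.entropy import DataSummary

async def summarize(resource):
    # Runs DataSummaryAnalyzer if the attributes are not already cached
    summary = await resource.analyze(DataSummary)
    print(len(summary.entropy_samples), len(summary.magnitude_samples))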

Source code in ofrak_components/entropy/entropy.py
async def analyze(self, resource: Resource, config=None, depth=0) -> DataSummary:
    if depth > self.max_analysis_retries:
        raise RuntimeError(
            f"Analysis process killed more than {self.max_analysis_retries} times. Aborting."
        )

    if not _ENTROPY_SO_DEPENDENCY.is_tool_installed():
        raise ComponentMissingDependencyError(self, _ENTROPY_SO_DEPENDENCY)

    data = await resource.get_data()
    # Run blocking computations in separate processes
    try:
        entropy = await asyncio.get_running_loop().run_in_executor(
            self.pool, sample_entropy, data, resource.get_id()
        )
        magnitude = await asyncio.get_running_loop().run_in_executor(
            self.pool, sample_magnitude, data
        )
        return DataSummary(entropy, magnitude)
    except BrokenProcessPool:
        # If the previous one was aborted, try again with a new pool
        self.pool = ProcessPoolExecutor()
        return await self.analyze(resource, config=config, depth=depth + 1)
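
For readers unfamiliar with the pattern above, here is a minimal standalone sketch (not OFRAK code) of offloading a blocking, CPU-bound function to a ProcessPoolExecutor from async code; cpu_bound is a hypothetical stand-in for sample_entropy:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_bound(data: bytes) -> int:
    # Stand-in for a heavy computation like sample_entropy
    return sum(data)

async def main():
    pool = ProcessPoolExecutor()
    loop = asyncio.get_running_loop()
    # run_in_executor yields an awaitable without blocking the event loop
    result = await loop.run_in_executor(pool, cpu_bound, b"\x01\x02\x03")
    print(result)  # 6

if __name__ == "__main__":
    asyncio.run(main())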

_EntropyCTypesTool (ComponentExternalTool) private

__init__(self) special

Initialize self. See help(type(self)) for accurate signature.

Source code in ofrak_components/entropy/entropy.py
def __init__(self):
    # TODO: Add docs page on building entropy.so.1
    super().__init__("entropy.so.1", None, None, None)

is_tool_installed(self)

Check if a tool is installed by running it with the install_check_arg. This method runs <tool> <install_check_arg>.

Returns:

bool:
    True if the tool command returned zero, False if the tool could not be
    found or returned a non-zero exit code.

Source code in ofrak_components/entropy/entropy.py
def is_tool_installed(self) -> bool:
    return ENTROPY_FUNCTION is not None
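
The override above only reports whether the shared object was successfully loaded. For context, here is a hypothetical sketch of how a binding like ENTROPY_FUNCTION could be set up with ctypes; the exported symbol name, argument types, and C_LOG_TYPE signature are assumptions inferred from the call sites in this module, not the actual OFRAK binding:

import ctypes

# Assumed progress-callback signature: takes an integer percentage
C_LOG_TYPE = ctypes.CFUNCTYPE(None, ctypes.c_int)

try:
    _lib = ctypes.cdll.LoadLibrary("entropy.so.1")
    ENTROPY_FUNCTION = _lib.entropy  # assumed exported symbol name
    ENTROPY_FUNCTION.argtypes = [
        ctypes.c_char_p,    # input data
        ctypes.c_size_t,    # input length
        ctypes.c_char_p,    # mutable output buffer (one byte per window)
        ctypes.c_size_t,    # window size
        C_LOG_TYPE,         # progress callback
    ]
    ENTROPY_FUNCTION.restype = ctypes.c_int
except OSError:
    # Leave unset so is_tool_installed() reports the missing dependency
    ENTROPY_FUNCTION = None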

sample_entropy(data, resource_id, window_size=256, max_samples=1048576)

Return a list of entropy values where each value represents the Shannon entropy of the byte value distribution over a fixed-size, sliding window. If the entropy data is larger than a maximum size, summarize it by periodically sampling it.

Shannon entropy represents how uniform a probability distribution is. Since more uniform implies less predictable (because the probability of any outcome is equally likely in a uniform distribution), a sample with higher entropy is "more random" than one with lower entropy. More here: https://en.wikipedia.org/wiki/Entropy_(information_theory).
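
As a concrete reference, here is a pure-Python version of the per-window computation, using the standard Shannon entropy formula H = -sum(p * log2(p)) over the byte-value distribution; note that the C routine invoked below stores one byte per window, presumably scaling this raw value to fit:

import math
from collections import Counter

def shannon_entropy(window: bytes) -> float:
    counts = Counter(window)
    total = len(window)
    return -sum((c / total) * math.log2(c / total) for c in counts.values())

# A window containing every byte value exactly once is maximally uniform:
# entropy is log2(256) = 8 bits, the maximum for byte data.
assert shannon_entropy(bytes(range(256))) == 8.0
# A constant window is perfectly predictable: entropy is 0.
assert shannon_entropy(b"\x00" * 256) == 0.0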

Source code in ofrak_components/entropy/entropy.py
def sample_entropy(
    data: bytes, resource_id: bytes, window_size=256, max_samples=2**20
) -> bytes:  # pragma: no cover
    """
    Return a list of entropy values where each value represents the Shannon entropy of the byte
    value distribution over a fixed-size, sliding window. If the entropy data is larger than a
    maximum size, summarize it by periodically sampling it.

    Shannon entropy represents how uniform a probability distribution is. Since more uniform
    implies less predictable (because the probability of any outcome is equally likely in a
    uniform distribution), a sample with higher entropy is "more random" than one with lower
    entropy. More here: <https://en.wikipedia.org/wiki/Entropy_(information_theory)>.
    """

    if len(data) < 256:
        return b""

    def log_percent(percent):  # pragma: no cover
        LOGGER.info(f"Entropy calculation {percent}% complete for {resource_id.hex()}")

    # Make the entropy buffer mutable to the external C function
    entropy = ctypes.create_string_buffer(len(data) - window_size)
    errval = ENTROPY_FUNCTION(data, len(data), entropy, window_size, C_LOG_TYPE(log_percent))
    if errval != 0:
        raise ValueError("Bad input to entropy function.")
    result = bytes(entropy.raw)

    if len(result) <= max_samples:
        return result

    # Sample the calculated array if it is too large
    skip = len(result) / max_samples
    return bytes(result[math.floor(i * skip)] for i in range(max_samples))
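
A toy illustration of the sampling step at the end, using hypothetical values: eight entropy bytes reduced to max_samples=4 by stepping through the array at a fractional stride, exactly as in the final two lines above:

import math

result = bytes([10, 20, 30, 40, 50, 60, 70, 80])
max_samples = 4
skip = len(result) / max_samples  # 2.0
sampled = bytes(result[math.floor(i * skip)] for i in range(max_samples))
assert sampled == bytes([10, 30, 50, 70])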