
entropy.py

ofrak.core.entropy.entropy

DataSummary (ResourceAttributes) dataclass

High-level summary of binary data.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `entropy_samples` | `bytes` | Shannon entropy of the data, computed over a sliding window. A description of Shannon entropy and how it can be used is here: <https://en.wikipedia.org/wiki/Entropy_(information_theory)>. |
| `magnitude_samples` | `bytes` | Sample of the raw binary data, taken so that the number of displayed byte magnitudes has an upper limit; if the input data is smaller than that limit, every byte is sampled. |
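For orientation, here is a minimal sketch of consuming these attributes once analysis has run. The normalization assumes each entropy sample packs the window's Shannon entropy into a single byte (0-255); the helper name `summarize` is hypothetical.

```python
from ofrak.core.entropy.entropy import DataSummary


def summarize(summary: DataSummary) -> None:
    # Hypothetical helper: each entropy sample is one byte; dividing by 255
    # maps it into [0, 1], assuming per-window entropy is scaled into a byte.
    normalized = [b / 255 for b in summary.entropy_samples]
    if normalized:
        print(f"mean entropy: {sum(normalized) / len(normalized):.2f}")
        print(f"peak entropy: {max(normalized):.2f}")
    # magnitude_samples is a (possibly downsampled) copy of the raw bytes.
    print(f"magnitude samples: {len(summary.magnitude_samples)} bytes")
```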

DataSummaryAnalyzer (Analyzer)

Analyze binary data and return summaries of its structure via the entropy and magnitude of its bytes.

analyze(self, resource, config=None, depth=0) async

Analyze a resource to extract specific ResourceAttributes.

Users should not call this method directly; rather, they should run Resource.run or Resource.analyze.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `resource` | `Resource` | The resource that is being analyzed | required |
| `config` | | Optional config for analyzing. If an implementation provides a default, this default will always be used when `config` would otherwise be `None`. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `DataSummary` | The analysis results |
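
As noted above, this method is not meant to be called directly. Here is a minimal sketch of the intended entry point, assuming an OFRAK `resource` is already in hand inside an async context (setup elided):

```python
from ofrak.core.entropy.entropy import DataSummary

# Resource.analyze runs whichever analyzer outputs the requested attributes
# (DataSummaryAnalyzer here) and returns the resulting DataSummary.
data_summary = await resource.analyze(DataSummary)
print(len(data_summary.entropy_samples), len(data_summary.magnitude_samples))
```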

Source code in ofrak/core/entropy/entropy.py
async def analyze(self, resource: Resource, config=None, depth=0) -> DataSummary:
    if depth > self.max_analysis_retries:
        raise RuntimeError(
            f"Analysis process killed more than {self.max_analysis_retries} times. Aborting."
        )

    data = await resource.get_data()
    # Run blocking computations in separate processes
    try:
        entropy = await asyncio.get_running_loop().run_in_executor(
            self.pool, sample_entropy, data, resource.get_id()
        )
        magnitude = await asyncio.get_running_loop().run_in_executor(
            self.pool, sample_magnitude, data
        )
        return DataSummary(entropy, magnitude)
    except BrokenProcessPool:
        # If the previous one was aborted, try again with a new pool
        self.pool = ProcessPoolExecutor()
        return await self.analyze(resource, config=config, depth=depth + 1)

sample_entropy(data, resource_id, window_size=256, max_samples=1048576)

Return a list of entropy values where each value represents the Shannon entropy of the byte value distribution over a fixed-size, sliding window. If the entropy data is larger than a maximum size, summarize it by periodically sampling it.

Shannon entropy represents how uniform a probability distribution is. Since more uniform implies less predictable (because the probability of any outcome is equally likely in a uniform distribution), a sample with higher entropy is "more random" than one with lower entropy. More here: https://en.wikipedia.org/wiki/Entropy_(information_theory).
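
Concretely, for a window whose byte values occur with probabilities p_0, ..., p_255, the Shannon entropy is H = -Σ p_i · log2(p_i), which ranges from 0 bits (one repeated byte value) up to 8 bits (all 256 values equally likely, as in well-compressed or encrypted data). Since each sample below is stored as a single byte, a natural encoding, and presumably the one used here, scales H/8 into the range 0-255.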

Source code in ofrak/core/entropy/entropy.py
def sample_entropy(
    data: bytes, resource_id: bytes, window_size=256, max_samples=2**20
) -> bytes:  # pragma: no cover
    """
    Return a list of entropy values where each value represents the Shannon entropy of the byte
    value distribution over a fixed-size, sliding window. If the entropy data is larger than a
    maximum size, summarize it by periodically sampling it.

    Shannon entropy represents how uniform a probability distribution is. Since more uniform
    implies less predictable (because the probability of any outcome is equally likely in a
    uniform distribution), a sample with higher entropy is "more random" than one with lower
    entropy. More here: <https://en.wikipedia.org/wiki/Entropy_(information_theory)>.
    """

    if len(data) < 256:
        return b""

    def log_percent(percent):  # pragma: no cover
        LOGGER.info(f"Entropy calculation {percent}% complete for {resource_id.hex()}")

    result = entropy_func(data, window_size, log_percent)

    if len(result) <= max_samples:
        return result

    # Sample the calculated array if it is too large
    skip = len(result) / max_samples
    return bytes(result[math.floor(i * skip)] for i in range(max_samples))
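
The windowed computation itself is delegated to `entropy_func`, a compiled implementation that is not shown here. The following pure-Python sketch illustrates the semantics the sampling logic above assumes: one output byte per window position, with the window's Shannon entropy scaled from 0-8 bits into 0-255. It is an illustrative stand-in, not the actual `entropy_func`.

```python
import math
from collections import Counter


def entropy_func_reference(data: bytes, window_size: int = 256) -> bytes:
    """Hypothetical pure-Python stand-in for the compiled entropy_func."""
    out = bytearray()
    for start in range(max(len(data) - window_size + 1, 0)):
        window = data[start : start + window_size]
        counts = Counter(window)
        # Shannon entropy of the window's byte-value distribution, in bits (0..8)
        h = -sum(
            (c / window_size) * math.log2(c / window_size) for c in counts.values()
        )
        out.append(round(h / 8 * 255))  # scale 0..8 bits into one byte
    return bytes(out)
```

Recounting every window makes this O(n·w); a real implementation would update the byte counts incrementally as the window slides, which is part of why the production version is compiled.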