# entropy.py

`ofrak.core.entropy.entropy`

## DataSummary (ResourceAttributes) dataclass

High-level summary of binary data.
Attributes:

Name | Type | Description |
---|---|---|
entropy_samples | bytes | Shannon entropy of the data. A description of Shannon entropy and how it can be used is here: <https://en.wikipedia.org/wiki/Entropy_(information_theory)>. |
magnitude_samples | bytes | Sample of the binary data to put an upper limit on the displayed byte magnitudes; if the input data is smaller than this upper limit, all bytes are sampled. |
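These attributes are produced by the DataSummaryAnalyzer below and are typically obtained through Resource.analyze. A minimal usage sketch (not from the OFRAK source; import paths are assumed and may vary by OFRAK version):

```python
from ofrak.resource import Resource
from ofrak.core.entropy.entropy import DataSummary


async def print_summary(resource: Resource) -> None:
    # Runs DataSummaryAnalyzer if the attributes are not already cached
    summary = await resource.analyze(DataSummary)
    print(f"{len(summary.entropy_samples)} entropy samples")
    print(f"{len(summary.magnitude_samples)} magnitude bytes")
```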
## DataSummaryAnalyzer (Analyzer)

Analyze binary data and return summaries of its structure via the entropy and magnitude of its bytes.
### analyze(self, resource, config=None, depth=0) async

Analyze a resource to extract specific ResourceAttributes.

Users should not call this method directly; rather, they should run Resource.run or Resource.analyze.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
resource | Resource | The resource that is being analyzed | required |
config | | Optional config for analyzing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. | None |

Returns:

Type | Description |
---|---|
DataSummary | The analysis results |
Source code in ofrak/core/entropy/entropy.py

```python
async def analyze(self, resource: Resource, config=None, depth=0) -> DataSummary:
    if depth > self.max_analysis_retries:
        raise RuntimeError(
            f"Analysis process killed more than {self.max_analysis_retries} times. Aborting."
        )

    data = await resource.get_data()
    # Run blocking computations in separate processes
    try:
        entropy = await asyncio.get_running_loop().run_in_executor(
            self.pool, sample_entropy, data, resource.get_id()
        )
        magnitude = await asyncio.get_running_loop().run_in_executor(
            self.pool, sample_magnitude, data
        )
        return DataSummary(entropy, magnitude)
    except BrokenProcessPool:
        # If the previous one was aborted, try again with a new pool
        self.pool = ProcessPoolExecutor()
        return await self.analyze(resource, config=config, depth=depth + 1)
```
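The depth parameter and the BrokenProcessPool handler above implement a retry: if a worker process is killed mid-computation (for example, by the operating system's OOM killer), the whole pool becomes unusable and must be replaced. A standalone sketch of the same pattern using only the standard library (names here are illustrative, not part of OFRAK):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def cpu_bound_work(data: bytes) -> int:
    # Stand-in for a heavy computation such as sample_entropy
    return sum(data)


async def run_with_retry(data: bytes, max_retries: int = 3) -> int:
    pool = ProcessPoolExecutor()
    for _ in range(max_retries + 1):
        try:
            return await asyncio.get_running_loop().run_in_executor(
                pool, cpu_bound_work, data
            )
        except BrokenProcessPool:
            # A killed worker poisons the whole pool; replace it and retry
            pool = ProcessPoolExecutor()
    raise RuntimeError("Worker process killed too many times. Aborting.")
```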
## sample_entropy(data, resource_id, window_size=256, max_samples=1048576)

Return a list of entropy values where each value represents the Shannon entropy of the byte value distribution over a fixed-size, sliding window. If the entropy data is larger than a maximum size, summarize it by periodically sampling it.

Shannon entropy represents how uniform a probability distribution is. Since more uniform implies less predictable (because the probability of any outcome is equally likely in a uniform distribution), a sample with higher entropy is "more random" than one with lower entropy. More here: <https://en.wikipedia.org/wiki/Entropy_(information_theory)>.
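As a concrete illustration of the definition (this helper is not part of OFRAK), the Shannon entropy of a byte window is H = -Σ p(b) · log2 p(b) over the byte values b present in the window, ranging from 0 bits (all bytes identical) to 8 bits (all 256 values equally likely):

```python
import math
from collections import Counter


def shannon_entropy(window: bytes) -> float:
    """Shannon entropy of a byte string, in bits per byte (0.0 to 8.0)."""
    counts = Counter(window)
    total = len(window)
    return -sum((n / total) * math.log2(n / total) for n in counts.values())


assert shannon_entropy(b"\x00" * 256) == 0.0      # perfectly predictable
assert shannon_entropy(bytes(range(256))) == 8.0  # perfectly uniform
```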
Source code in ofrak/core/entropy/entropy.py

```python
def sample_entropy(
    data: bytes, resource_id: bytes, window_size=256, max_samples=2**20
) -> bytes:  # pragma: no cover
    """
    Return a list of entropy values where each value represents the Shannon entropy of the byte
    value distribution over a fixed-size, sliding window. If the entropy data is larger than a
    maximum size, summarize it by periodically sampling it.

    Shannon entropy represents how uniform a probability distribution is. Since more uniform
    implies less predictable (because the probability of any outcome is equally likely in a
    uniform distribution), a sample with higher entropy is "more random" than one with lower
    entropy. More here: <https://en.wikipedia.org/wiki/Entropy_(information_theory)>.
    """
    if len(data) < 256:
        return b""

    def log_percent(percent):  # pragma: no cover
        LOGGER.info(f"Entropy calculation {percent}% complete for {resource_id.hex()}")

    result = entropy_func(data, window_size, log_percent)

    if len(result) <= max_samples:
        return result

    # Sample the calculated array if it is too large
    skip = len(result) / max_samples
    return bytes(result[math.floor(i * skip)] for i in range(max_samples))
```
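Note that entropy_func is imported from elsewhere in the module and is not shown here. Below is a rough sketch of the contract it appears to satisfy, assuming each window's Shannon entropy (0 to 8 bits) is scaled to one output byte (0 to 255); this naive version recomputes each window from scratch, whereas a practical implementation would update the byte histogram incrementally as the window slides:

```python
import math
from collections import Counter
from typing import Callable


def entropy_func_sketch(
    data: bytes, window_size: int, log_percent: Callable[[int], None]
) -> bytes:
    # One output byte per window position; a window's Shannon entropy in
    # [0.0, 8.0] bits is scaled to a byte in [0, 255]. Illustrative only,
    # not OFRAK's actual implementation.
    num_windows = len(data) - window_size
    results = bytearray()
    for start in range(num_windows):
        counts = Counter(data[start : start + window_size])
        entropy = -sum(
            (n / window_size) * math.log2(n / window_size) for n in counts.values()
        )
        results.append(round(entropy * 255 / 8))
    log_percent(100)  # the real implementation reports progress incrementally
    return bytes(results)
```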