entropy.py
ofrak_components.entropy.entropy
DataSummary (ResourceAttributes)
dataclass
DataSummary(entropy_samples: bytes, magnitude_samples: bytes)
DataSummaryAnalyzer (Analyzer)
Analyze binary data and return summaries of its structure via the entropy and magnitude of its bytes.
analyze(self, resource, config=None, depth=0)
async
Analyze a resource to extract specific ResourceAttributes.
Users should not call this method directly; rather, they should run Resource.run or Resource.analyze.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
resource | Resource | The resource that is being analyzed | required |
config | | Optional config for analyzing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. | None |
Returns:

Type | Description |
---|---|
DataSummary | The analysis results |
Source code in ofrak_components/entropy/entropy.py
async def analyze(self, resource: Resource, config=None, depth=0) -> DataSummary:
    if depth > self.max_analysis_retries:
        raise RuntimeError(
            f"Analysis process killed more than {self.max_analysis_retries} times. Aborting."
        )
    if not _ENTROPY_SO_DEPENDENCY.is_tool_installed():
        raise ComponentMissingDependencyError(self, _ENTROPY_SO_DEPENDENCY)
    data = await resource.get_data()
    # Run blocking computations in separate processes
    try:
        entropy = await asyncio.get_running_loop().run_in_executor(
            self.pool, sample_entropy, data, resource.get_id()
        )
        magnitude = await asyncio.get_running_loop().run_in_executor(
            self.pool, sample_magnitude, data
        )
        return DataSummary(entropy, magnitude)
    except BrokenProcessPool:
        # If the previous one was aborted, try again with a new pool
        self.pool = ProcessPoolExecutor()
        return await self.analyze(resource, config=config, depth=depth + 1)
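As noted above, `analyze` is meant to be driven by the framework rather than called directly. A minimal usage sketch, assuming an OFRAK script context (the file path is a placeholder; `Resource.analyze` with the attributes type is what triggers this analyzer):

from ofrak import OFRAK, OFRAKContext
from ofrak_components.entropy.entropy import DataSummary

async def main(ofrak_context: OFRAKContext):
    # The path is a placeholder; any binary file works
    root = await ofrak_context.create_root_resource_from_file("firmware.bin")
    # Runs DataSummaryAnalyzer and returns the DataSummary attributes
    summary = await root.analyze(DataSummary)
    print(f"{len(summary.entropy_samples)} entropy samples")

if __name__ == "__main__":
    OFRAK().run(main)

Note the BrokenProcessPool handler in the source: if a worker process is killed mid-computation (for example, by the OS on a very large resource), the analyzer replaces the pool and retries, giving up after max_analysis_retries attempts.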
_EntropyCTypesTool (ComponentExternalTool)
private
__init__(self)
special
Initialize self. See help(type(self)) for accurate signature.
Source code in ofrak_components/entropy/entropy.py
def __init__(self):
    # TODO: Add docs page on building entropy.so.1
    super().__init__("entropy.so.1", None, None, None)
is_tool_installed(self)
Check if a tool is installed by running it with the `install_check_arg`.

This method runs `<tool> <install_check_arg>`. (This particular implementation overrides that behavior: it simply reports whether the entropy shared object was loaded successfully; see the source below.)
Returns:

Type | Description |
---|---|
bool | True if the tool is installed, False otherwise |
Source code in ofrak_components/entropy/entropy.py
def is_tool_installed(self) -> bool:
    return ENTROPY_FUNCTION is not None
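ENTROPY_FUNCTION is the module-level ctypes binding that this check inspects. A rough sketch of how such a binding is typically set up (the symbol name, callback signature, and error handling below are assumptions for illustration, not the module's actual loading code):

import ctypes

# Progress-logging callback type passed to the C routine; signature assumed
C_LOG_TYPE = ctypes.CFUNCTYPE(None, ctypes.c_int)

try:
    _lib = ctypes.CDLL("entropy.so.1")
    ENTROPY_FUNCTION = _lib.entropy  # hypothetical exported symbol name
except OSError:
    # Shared object not built or not on the loader path;
    # is_tool_installed() will then report the dependency as missing
    ENTROPY_FUNCTION = None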
sample_entropy(data, resource_id, window_size=256, max_samples=1048576)
Return a list of entropy values where each value represents the Shannon entropy of the byte value distribution over a fixed-size, sliding window. If the entropy data is larger than a maximum size, summarize it by periodically sampling it.
Shannon entropy represents how uniform a probability distribution is. Since more uniform implies less predictable (because the probability of any outcome is equally likely in a uniform distribution), a sample with higher entropy is "more random" than one with lower entropy. More here: https://en.wikipedia.org/wiki/Entropy_(information_theory).
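For intuition, here is a pure-Python sketch of the same windowed computation (an illustrative reference only; the component delegates the real work to the compiled entropy.so.1 routine):

import math
from collections import Counter

def window_entropy(window: bytes) -> float:
    """Shannon entropy of one window, in bits per byte (0.0 to 8.0)."""
    counts = Counter(window)
    total = len(window)
    return -sum((n / total) * math.log2(n / total) for n in counts.values())

def sliding_entropy(data: bytes, window_size: int = 256) -> list:
    # One value per window start, matching the C routine's output length
    return [
        window_entropy(data[i : i + window_size])
        for i in range(len(data) - window_size)
    ]

Each value falls in [0, 8] bits per byte; the component's C routine packs each sample into a single byte of the output buffer, which is why sample_entropy returns bytes.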
Source code in ofrak_components/entropy/entropy.py
def sample_entropy(
    data: bytes, resource_id: bytes, window_size=256, max_samples=2**20
) -> bytes:  # pragma: no cover
    """
    Return a list of entropy values where each value represents the Shannon entropy of the byte
    value distribution over a fixed-size, sliding window. If the entropy data is larger than a
    maximum size, summarize it by periodically sampling it.

    Shannon entropy represents how uniform a probability distribution is. Since more uniform
    implies less predictable (because the probability of any outcome is equally likely in a
    uniform distribution), a sample with higher entropy is "more random" than one with lower
    entropy. More here: <https://en.wikipedia.org/wiki/Entropy_(information_theory)>.
    """
    if len(data) < 256:
        return b""

    def log_percent(percent):  # pragma: no cover
        LOGGER.info(f"Entropy calculation {percent}% complete for {resource_id.hex()}")

    # Make the entropy buffer mutable to the external C function
    entropy = ctypes.create_string_buffer(len(data) - window_size)
    errval = ENTROPY_FUNCTION(data, len(data), entropy, window_size, C_LOG_TYPE(log_percent))
    if errval != 0:
        raise ValueError("Bad input to entropy function.")
    result = bytes(entropy.raw)
    if len(result) <= max_samples:
        return result

    # Sample the calculated array if it is too large
    skip = len(result) / max_samples
    return bytes(result[math.floor(i * skip)] for i in range(max_samples))
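The closing downsampling step keeps max_samples evenly spaced values rather than truncating the result. A small worked example of that arithmetic (the input is synthetic, purely for illustration):

import math

result = bytes(range(256)) * 16384  # 4 MiB of hypothetical entropy values
max_samples = 2**20
skip = len(result) / max_samples  # 4194304 / 1048576 == 4.0
sampled = bytes(result[math.floor(i * skip)] for i in range(max_samples))
assert len(sampled) == max_samples  # every 4th value is kept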