analyzer.py
ofrak.core.flash.analyzer
Heuristic OFRAK analyzer: infer FlashAttributes for raw NAND dumps tagged as FlashResource.
FlashGeometryHeuristicAnalyzer (Analyzer)
Infers FlashAttributes for a raw NAND dump tagged as FlashResource.
Ranks each standard (data_size, oob_size) geometry that evenly divides the file by
running a library of heuristics (Linux MTD large-page OOB, YAFFS2 packed tags, small-page
ECC density, exact Linux MTD Hamming verification) plus entropy and OOB-aligned gap
tiebreakers.
The returned FlashAttributes describes one data block containing DATA
followed by SPARE of the OOB size, so the existing FlashOobResourceUnpacker
preserves the per-block spare region verbatim as a FlashSpareAreaResource when
unpacking (rather than discarding it).
If no standard geometry evenly divides the file into a power-of-two page count, the
analyzer logs a warning and returns no attributes rather than raising, so that other
analyzers (e.g. BinwalkAnalyzer) can still run on the same resource.
analyze(self, resource, config=None)
async
Analyze a resource to extract specific ResourceAttributes.
Users should not call this method directly; rather, they should run Resource.run or Resource.analyze.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| resource | Resource | The resource that is being analyzed | required |
| config | Optional[ofrak.core.flash.analyzer.FlashGeometryHeuristicAnalyzerConfig] | Optional config for analyzing. If an implementation provides a default, this default will always be used when config would otherwise be None. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. | None |
Returns:

| Type | Description |
|---|---|
| Tuple[ofrak.core.flash.flash.FlashAttributes, ...] | The analysis results |
Source code in ofrak/core/flash/analyzer.py
```python
async def analyze(
    self,
    resource: Resource,
    config: Optional[FlashGeometryHeuristicAnalyzerConfig] = None,
) -> Tuple[FlashAttributes, ...]:
    config = config or FlashGeometryHeuristicAnalyzerConfig()
    geometries = config.geometries()
    data = await resource.get_data()
    candidates = enumerate_candidates(len(data), geometries)
    if not candidates:
        LOGGER.warning(
            "No standard NAND geometry matches file size with a power-of-2 page count "
            "(file size: %d bytes). Skipping FlashAttributes inference.",
            len(data),
        )
        return ()
    heuristic_results = [
        run_heuristics(data, c, config.scan, config.heuristics) for c in candidates
    ]
    scored = [score_candidate(e, config.weights, geometries) for e in heuristic_results]
    winner = min(scored, key=lambda s: s.sort_key)
    winning_candidate = winner.evidence.candidate
    return (
        FlashAttributes(
            data_block_format=[
                FlashField(FlashFieldType.DATA, winning_candidate.data_size),
                FlashField(FlashFieldType.SPARE, winning_candidate.oob_size),
            ]
        ),
    )
```
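The returned attributes describe every page as DATA followed by SPARE. A minimal sketch of the page split they imply; the 2048+64 geometry here is illustrative, and `split_pages` is a stand-in for what `FlashOobResourceUnpacker` does with real OFRAK resources:

```python
# Illustrative geometry; the analyzer picks the real one heuristically.
DATA_SIZE, OOB_SIZE = 2048, 64
PAGE_SIZE = DATA_SIZE + OOB_SIZE

def split_pages(dump: bytes):
    # Split a raw dump into (data, spare) pairs, one per page, mirroring a
    # data_block_format of [DATA(DATA_SIZE), SPARE(OOB_SIZE)].
    return [
        (dump[off:off + DATA_SIZE], dump[off + DATA_SIZE:off + PAGE_SIZE])
        for off in range(0, len(dump), PAGE_SIZE)
    ]

dump = (b"\xab" * DATA_SIZE + b"\xff" * OOB_SIZE) * 4   # four-page dump
pages = split_pages(dump)
assert len(pages) == 4
assert all(spare == b"\xff" * OOB_SIZE for _, spare in pages)
```

Because the spare region is modeled as its own field rather than discarded, each page's OOB bytes survive unpacking verbatim.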
FlashGeometryHeuristicAnalyzerConfig (ComponentConfig)
dataclass
Config for FlashGeometryHeuristicAnalyzer.
To "train" the analyzer on new image families, supply a custom heuristics list.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| extra_geometries | | additional `(data_size, oob_size)` geometries to consider | required |
| scan | | tunables for the evidence-collection pass | required |
| weights | | tiebreaker thresholds for the entropy and gap signals | required |
| heuristics | | ordered list of heuristics to run against each candidate geometry | required |
GeometryEvidence
dataclass
Per-geometry signal counts gathered for a candidate.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| candidate | | the geometry candidate these counts belong to | required |
| pages_scanned | | number of pages actually visited during the scan | required |
| heuristic_evidence | | one evidence entry per heuristic run | required |
| gap_hits | | count of OOB-sized 0xFF gaps found between non-erased data regions | required |
| mean_data_entropy | | mean Shannon entropy (bits/byte) across scanned data regions | required |
| mean_oob_entropy | | mean Shannon entropy (bits/byte) across scanned OOB regions | required |
GeometryScore
dataclass
Ranking vector for a single candidate.
Precedence when comparing candidates:

- higher `oob_signal_score` (sum of per-heuristic scores)
- higher `entropy_signal_score` (tiebreak: data-minus-OOB entropy delta)
- higher `gap_signal_score` (tiebreak: OOB-aligned 0xFF gaps)
- lower `preference_index` (earlier-listed geometries win)
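This precedence can be modeled as a lexicographic tuple. Since the `analyze` source above picks the winner with `min(..., key=lambda s: s.sort_key)`, the key presumably negates the higher-is-better components; the sketch below is a hypothetical stand-in consistent with that, not the actual `GeometryScore.sort_key` implementation:

```python
# Hypothetical lexicographic sort key: min() picks the winner, so the
# higher-is-better signals are negated while the lower-is-better
# preference_index is left as-is.
def sort_key(oob_signal_score, entropy_signal_score, gap_signal_score, preference_index):
    return (-oob_signal_score, -entropy_signal_score, -gap_signal_score, preference_index)

# Candidate A wins on the primary heuristic score alone, regardless of
# the later tiebreakers...
a = sort_key(3, 0, 0, 5)
b = sort_key(1, 9, 9, 0)
assert min(a, b) == a

# ...and preference_index only decides exact ties on all three signals.
c = sort_key(2, 4, 1, 0)
d = sort_key(2, 4, 1, 3)
assert min(c, d) == c
```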
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| evidence | | the evidence the scores were computed from | required |
| oob_signal_score | | sum of per-heuristic scores | required |
| entropy_signal_score | | scaled data-minus-OOB entropy delta, or 0 when gated out | required |
| gap_signal_score | | gap-hit count after noise-floor gating | required |
| preference_index | | position of the geometry in the configured ordered list | required |
ScoringWeights
dataclass
Scoring knobs for the tiebreakers that aren't heuristics.
Per-heuristic weight and noise-floor live on each heuristic's HeuristicSpec.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| entropy_min_delta_bits | | minimum data-minus-OOB entropy delta (bits/byte) required before entropy contributes to the tiebreaker | required |
| entropy_min_data_entropy_bits | | minimum mean data entropy (bits/byte) required before entropy contributes to the tiebreaker | required |
| entropy_tiebreak_scale | | integer multiplier applied to the entropy delta so it can be compared in the same ranking vector as heuristic hit counts | required |
| gap_relative_min_hit_rate | | fractional floor on gap hits per page scanned before the gap signal is allowed to contribute | required |
enumerate_candidates(file_size, geometries)
Filter geometries down to those that evenly divide file_size into a power-of-two
number of pages.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| file_size | int | size of the flash image in bytes | required |
| geometries | Sequence[Tuple[int, int]] | candidate `(data_size, oob_size)` pairings to consider | required |
Returns:

| Type | Description |
|---|---|
| List[ofrak.core.flash.heuristics.GeometryCandidate] | one `GeometryCandidate` per surviving pairing, in input order |
Source code in ofrak/core/flash/analyzer.py
```python
def enumerate_candidates(
    file_size: int,
    geometries: Sequence[Tuple[int, int]],
) -> List[GeometryCandidate]:
    """
    Filter `geometries` down to those that evenly divide `file_size` into a power-of-two
    number of pages.
    :param file_size: size of the flash image in bytes
    :param geometries: candidate `(data_size, oob_size)` pairings to consider
    :return: one `GeometryCandidate` per surviving pairing, in input order
    """
    out: List[GeometryCandidate] = []
    for data_size, oob_size in geometries:
        total = data_size + oob_size
        if total <= 0 or file_size % total != 0:
            continue
        pages = file_size // total
        if not _is_power_of_two(pages):
            continue
        out.append(GeometryCandidate(data_size=data_size, oob_size=oob_size, num_pages=pages))
    return out
```
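The same filter can be exercised standalone. A minimal sketch over plain tuples; `surviving_geometries` is an illustrative stand-in for `enumerate_candidates`, and the geometry list is made up for the example rather than taken from the module's defaults:

```python
def is_power_of_two(n: int) -> bool:
    return n > 0 and (n & (n - 1)) == 0

def surviving_geometries(file_size, geometries):
    # Keep (data_size, oob_size) pairs whose page size evenly divides the
    # file into a power-of-two number of pages, preserving input order.
    out = []
    for data_size, oob_size in geometries:
        total = data_size + oob_size
        if total > 0 and file_size % total == 0 and is_power_of_two(file_size // total):
            out.append((data_size, oob_size))
    return out

# A dump of 64 large pages: 64 * (2048 + 64) bytes.
file_size = 64 * 2112
candidates = [
    (512, 16),    # survives: 256 pages, a power of two
    (2048, 64),   # survives: 64 pages
    (2048, 128),  # rejected: 2176 does not divide the file evenly
    (512, 0),     # rejected: 264 pages is not a power of two
]
assert surviving_geometries(file_size, candidates) == [(512, 16), (2048, 64)]
```

Note that nested geometries such as 512+16 and 2048+64 (whose page sizes differ by an exact power of two) often co-survive this filter, which is precisely why the heuristics and tiebreakers are needed afterwards.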
run_heuristics(data, candidate, scan, heuristics=None)
Gather per-heuristic evidence plus the entropy + gap tiebreak signals.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data | bytes | the full flash image bytes | required |
| candidate | GeometryCandidate | the candidate geometry to evaluate | required |
| scan | ScanConfig | tunables for the scan pass | required |
| heuristics | Optional[Sequence[Union[ofrak.core.flash.heuristics.GlobalHeuristic, ofrak.core.flash.heuristics.PerPageOobHeuristic]]] | heuristics to run; defaults to `DEFAULT_HEURISTICS` | None |
Returns:

| Type | Description |
|---|---|
| GeometryEvidence | the aggregated `GeometryEvidence` for `candidate` |
Source code in ofrak/core/flash/analyzer.py
```python
def run_heuristics(
    data: bytes,
    candidate: GeometryCandidate,
    scan: ScanConfig,
    heuristics: Optional[Sequence[Heuristic]] = None,
) -> GeometryEvidence:
    """
    Gather per-heuristic evidence plus the entropy + gap tiebreak signals.
    :param data: the full flash image bytes
    :param candidate: the candidate geometry to evaluate
    :param scan: tunables for the scan pass
    :param heuristics: heuristics to run; defaults to `DEFAULT_HEURISTICS`
    :return: the aggregated `GeometryEvidence` for `candidate`
    """
    if heuristics is None:
        heuristics = DEFAULT_HEURISTICS
    heuristic_evidence = evaluate_heuristics(data, candidate, scan, heuristics)
    gap_hits, mean_data_entropy, mean_oob_entropy, pages_scanned = _scan_entropy_and_gap(
        data, candidate, scan
    )
    return GeometryEvidence(
        candidate=candidate,
        pages_scanned=pages_scanned,
        heuristic_evidence=heuristic_evidence,
        gap_hits=gap_hits,
        mean_data_entropy=mean_data_entropy,
        mean_oob_entropy=mean_oob_entropy,
    )
```
score_candidate(evidence, weights, geometries)
Combine per-heuristic evidence and entropy/gap tiebreakers into a ranking vector.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| evidence | GeometryEvidence | the `GeometryEvidence` for the candidate | required |
| weights | ScoringWeights | tiebreaker thresholds for the entropy and gap signals | required |
| geometries | Sequence[Tuple[int, int]] | the ordered list of geometries (used to derive `preference_index`) | required |
Returns:

| Type | Description |
|---|---|
| GeometryScore | the `GeometryScore` for the candidate |
Source code in ofrak/core/flash/analyzer.py
```python
def score_candidate(
    evidence: GeometryEvidence,
    weights: ScoringWeights,
    geometries: Sequence[Tuple[int, int]],
) -> GeometryScore:
    """
    Combine per-heuristic evidence and entropy/gap tiebreakers into a ranking vector.
    :param evidence: the `GeometryEvidence` for the candidate
    :param weights: tiebreaker thresholds for the entropy and gap signals
    :param geometries: the ordered list of geometries (used to derive `preference_index`)
    :return: the `GeometryScore` for the candidate
    """
    oob_signal = sum(ev.score() for ev in evidence.heuristic_evidence)
    gap_min_hits = int(evidence.pages_scanned * weights.gap_relative_min_hit_rate)
    gap_signal = evidence.gap_hits if evidence.gap_hits >= gap_min_hits else 0
    # Entropy is a confirmation signal only.
    has_existing_evidence = oob_signal > 0 or evidence.gap_hits > 0
    entropy_signal = 0
    if (
        has_existing_evidence
        and evidence.mean_data_entropy >= weights.entropy_min_data_entropy_bits
        and evidence.entropy_data_minus_oob >= weights.entropy_min_delta_bits
    ):
        entropy_signal = int(evidence.entropy_data_minus_oob * weights.entropy_tiebreak_scale)
    return GeometryScore(
        evidence=evidence,
        oob_signal_score=oob_signal,
        entropy_signal_score=entropy_signal,
        gap_signal_score=gap_signal,
        preference_index=_geometry_preference_index(
            evidence.candidate.data_size, evidence.candidate.oob_size, geometries
        ),
    )
```
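A small numeric walk-through of the two gates above; the weight values here are illustrative, not the shipped defaults:

```python
# Gap noise floor: hits below a fraction of the pages scanned are zeroed.
pages_scanned = 1000
gap_relative_min_hit_rate = 0.01          # illustrative value
gap_hits = 7
gap_min_hits = int(pages_scanned * gap_relative_min_hit_rate)   # floor of 10
gap_signal = gap_hits if gap_hits >= gap_min_hits else 0
assert gap_signal == 0                    # 7 hits fall below the 10-hit floor

# Entropy confirms only: it needs prior evidence, sufficiently busy data,
# and a clear data-minus-OOB delta before it contributes a scaled integer.
mean_data_entropy, mean_oob_entropy = 7.5, 4.0
entropy_min_data_entropy_bits, entropy_min_delta_bits = 6.0, 1.0
entropy_tiebreak_scale = 100              # illustrative value
has_existing_evidence = True              # e.g. some heuristic or gap signal
delta = mean_data_entropy - mean_oob_entropy
entropy_signal = 0
if (has_existing_evidence
        and mean_data_entropy >= entropy_min_data_entropy_bits
        and delta >= entropy_min_delta_bits):
    entropy_signal = int(delta * entropy_tiebreak_scale)
assert entropy_signal == 350              # int(3.5 * 100)
```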
_shannon_entropy(buf)
private
Compute Shannon entropy of buf in bits/byte (0..8).
Source code in ofrak/core/flash/analyzer.py
```python
def _shannon_entropy(buf: bytes) -> float:
    """
    Compute Shannon entropy of `buf` in bits/byte (0..8).
    """
    n = len(buf)
    if n == 0:
        return 0.0
    counts = [0] * 256
    for b in buf:
        counts[b] += 1
    h = 0.0
    for c in counts:
        if c:
            p = c / n
            h -= p * math.log2(p)
    return h
```
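For instance, the measure is 0 bits/byte for a constant buffer (such as erased 0xFF flash) and exactly 8 bits/byte when every byte value occurs equally often. A self-contained copy of the computation for illustration:

```python
import math

def shannon_entropy(buf: bytes) -> float:
    # Same computation as _shannon_entropy above: -sum(p * log2(p)) over
    # the byte-value distribution, in bits per byte.
    n = len(buf)
    if n == 0:
        return 0.0
    counts = [0] * 256
    for b in buf:
        counts[b] += 1
    return -sum((c / n) * math.log2(c / n) for c in counts if c)

assert shannon_entropy(b"\xff" * 64) == 0.0          # erased flash: constant
assert shannon_entropy(bytes(range(256))) == 8.0     # uniform: maximal
assert 0.0 < shannon_entropy(b"OFRAK OFRAK!") < 8.0  # text: in between
```

This is why the data-minus-OOB entropy delta is a useful signal: compressed or encrypted data regions sit near 8 bits/byte, while sparse ECC/OOB metadata usually sits well below.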
_scan_entropy_and_gap(data, candidate, scan)
private
Single pass collecting entropy aggregates and page-boundary gap hits.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data | bytes | the full flash image bytes | required |
| candidate | GeometryCandidate | the candidate geometry being measured | required |
| scan | ScanConfig | tunables for the scan pass | required |

Returns:

| Type | Description |
|---|---|
| Tuple[int, float, float, int] | `(gap_hits, mean_data_entropy, mean_oob_entropy, pages_scanned)` |
Source code in ofrak/core/flash/analyzer.py
```python
def _scan_entropy_and_gap(
    data: bytes,
    candidate: GeometryCandidate,
    scan: ScanConfig,
) -> Tuple[int, float, float, int]:
    """
    Single pass collecting entropy aggregates and page-boundary gap hits.
    :param data: the full flash image bytes
    :param candidate: the candidate geometry being measured
    :param scan: tunables for the scan pass
    :return: `(gap_hits, mean_data_entropy, mean_oob_entropy, pages_scanned)`
    """
    gap_hits = 0
    gap_budget = scan.gap_sample_count
    file_len = len(data)
    w = ERASED_SENTINEL_WINDOW
    erased_window = b"\xff" * w
    erased_oob = b"\xff" * candidate.oob_size
    pages_available = file_len // candidate.total_chunk_size
    pages_to_scan = min(scan.oob_scan_cap_pages, pages_available)
    entropy_enabled = scan.entropy_enabled and candidate.oob_size > 0
    data_entropy_sum = 0.0
    oob_entropy_sum = 0.0
    pages_scanned = 0
    for page in range(pages_to_scan):
        base = page * candidate.total_chunk_size
        oob_off = base + candidate.data_size
        if oob_off + candidate.oob_size > file_len:
            break
        if entropy_enabled:
            page_data = data[base:oob_off]
            oob = data[oob_off : oob_off + candidate.oob_size]
            data_entropy_sum += _shannon_entropy(page_data)
            oob_entropy_sum += _shannon_entropy(oob)
        pages_scanned += 1
        # Probe this page boundary for an OOB-sized 0xFF gap between non-erased data.
        # Skip page 0 (needs a previous page for context); only run while within the
        # sampling window and the budget isn't exhausted.
        if page == 0 or page > scan.gap_sample_max_scan or gap_budget <= 0:
            continue
        # Skip pages whose data region looks erased (first + last 16 bytes both 0xFF);
        # these would otherwise waste the gap budget.
        if data[base : base + w] == erased_window and data[oob_off - w : oob_off] == erased_window:
            continue
        gap_budget -= 1
        oob_region = data[oob_off : oob_off + candidate.oob_size]
        if oob_region != erased_oob:
            continue
        data_before = data[oob_off - w : oob_off]
        data_after = data[oob_off + candidate.oob_size : oob_off + candidate.oob_size + w]
        if len(data_before) != w or len(data_after) != w:
            continue
        if data_before == erased_window and data_after == erased_window:
            continue
        gap_hits += 1
    mean_data_entropy = (
        data_entropy_sum / pages_scanned if (entropy_enabled and pages_scanned) else 0.0
    )
    mean_oob_entropy = (
        oob_entropy_sum / pages_scanned if (entropy_enabled and pages_scanned) else 0.0
    )
    return gap_hits, mean_data_entropy, mean_oob_entropy, pages_scanned
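The boundary probe boils down to three checks on the bytes around each page boundary: the OOB region must be all 0xFF, there must be enough context on both sides, and the neighboring data bytes must not both look erased. A minimal standalone sketch over a synthetic two-page image; the window size, geometry, and `is_gap_hit` helper are all chosen for illustration, far smaller than real NAND:

```python
W = 16                       # illustrative erased-sentinel window size
DATA_SIZE, OOB_SIZE = 64, 8  # illustrative toy geometry

def is_gap_hit(data: bytes, page: int) -> bool:
    # A hit means: the OOB region of `page` is all 0xFF while the data
    # bytes just before and just after it are NOT both erased.
    total = DATA_SIZE + OOB_SIZE
    oob_off = page * total + DATA_SIZE
    erased_window = b"\xff" * W
    if data[oob_off:oob_off + OOB_SIZE] != b"\xff" * OOB_SIZE:
        return False
    before = data[oob_off - W:oob_off]
    after = data[oob_off + OOB_SIZE:oob_off + OOB_SIZE + W]
    if len(before) != W or len(after) != W:
        return False  # not enough context at the image edge
    return not (before == erased_window and after == erased_window)

page0 = b"\xab" * DATA_SIZE + b"\xff" * OOB_SIZE   # busy data, erased OOB
page1 = b"\xcd" * DATA_SIZE + b"\x12" * OOB_SIZE   # busy data, programmed OOB
image = page0 + page1
assert is_gap_hit(image, 0)        # 0xFF gap between non-erased data regions
assert not is_gap_hit(image, 1)    # OOB is programmed, so no gap
```

An OOB-sized run of 0xFF sandwiched between non-erased data is exactly what a spare area looks like on pages whose OOB was never programmed, which is why a high hit rate at one geometry's boundaries favors that geometry.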