
analyzer.py

ofrak.core.flash.analyzer

Heuristic OFRAK analyzer: infer FlashAttributes for raw NAND dumps tagged as FlashResource.

FlashGeometryHeuristicAnalyzer (Analyzer)

Infers FlashAttributes for a raw NAND dump tagged as FlashResource.

Ranks each standard (data_size, oob_size) geometry that evenly divides the file by running a library of heuristics (Linux MTD large-page OOB, YAFFS2 packed tags, small-page ECC density, exact Linux MTD Hamming verification) plus entropy and OOB-aligned gap tiebreakers.

The returned FlashAttributes describes one data block containing DATA followed by SPARE of the OOB size, so the existing FlashOobResourceUnpacker preserves the per-block spare region verbatim as a FlashSpareAreaResource when unpacking (rather than discarding it).

If no standard geometry evenly divides the file into a power-of-two page count, the analyzer logs a warning and returns no attributes rather than raising, so that other analyzers (e.g. BinwalkAnalyzer) can still run on the same resource.
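The divisibility test described above can be illustrated with plain arithmetic. This is a standalone sketch, not the analyzer's actual code; the 2048+64 geometry and the file size are example values chosen to demonstrate the check (the real candidate list lives in `DEFAULT_GEOMETRIES`).

```python
# Sketch of the geometry check: a candidate (data_size, oob_size) survives
# only if the file splits into a whole, power-of-two number of raw pages.
data_size, oob_size = 2048, 64   # common large-page NAND layout (illustrative)
page = data_size + oob_size      # 2112 bytes per raw page (data + OOB)
file_size = page * 16384         # hypothetical dump: 34,603,008 bytes

assert file_size % page == 0               # evenly divisible by the page size
pages = file_size // page                  # 16384 pages
assert pages > 0 and pages & (pages - 1) == 0   # 16384 == 2**14, a power of two
```

A dump whose size is not a multiple of any standard page size, or whose page count is not a power of two, fails this check for every candidate and triggers the warning path described above.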

analyze(self, resource, config=None) async

Analyze a resource to extract specific ResourceAttributes.

Users should not call this method directly; rather, they should run Resource.run or Resource.analyze.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `resource` | `Resource` | The resource that is being analyzed | *required* |
| `config` | `Optional[ofrak.core.flash.analyzer.FlashGeometryHeuristicAnalyzerConfig]` | Optional config for analyzing. If an implementation provides a default, this default will always be used when `config` would otherwise be `None`. Note that a copy of the default config will be passed, so the default config values cannot be modified persistently by a component run. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `Tuple[ofrak.core.flash.flash.FlashAttributes, ...]` | The analysis results |

Source code in ofrak/core/flash/analyzer.py
async def analyze(
    self,
    resource: Resource,
    config: Optional[FlashGeometryHeuristicAnalyzerConfig] = None,
) -> Tuple[FlashAttributes, ...]:
    config = config or FlashGeometryHeuristicAnalyzerConfig()
    geometries = config.geometries()
    data = await resource.get_data()

    candidates = enumerate_candidates(len(data), geometries)
    if not candidates:
        LOGGER.warning(
            "No standard NAND geometry matches file size with a power-of-2 page count "
            "(file size: %d bytes). Skipping FlashAttributes inference.",
            len(data),
        )
        return ()

    heuristic_results = [
        run_heuristics(data, c, config.scan, config.heuristics) for c in candidates
    ]
    scored = [score_candidate(e, config.weights, geometries) for e in heuristic_results]
    winner = min(scored, key=lambda s: s.sort_key)
    winning_candidate = winner.evidence.candidate

    return (
        FlashAttributes(
            data_block_format=[
                FlashField(FlashFieldType.DATA, winning_candidate.data_size),
                FlashField(FlashFieldType.SPARE, winning_candidate.oob_size),
            ]
        ),
    )

FlashGeometryHeuristicAnalyzerConfig (ComponentConfig) dataclass

Config for FlashGeometryHeuristicAnalyzer.

To "train" the analyzer on new image families, supply a custom heuristics list.

Parameters:

| Name | Description | Default |
|------|-------------|---------|
| `extra_geometries` | Additional `(data_size, oob_size)` pairs to consider beyond `DEFAULT_GEOMETRIES` | *required* |
| `scan` | Tunables for the evidence-collection pass | *required* |
| `weights` | Tiebreaker thresholds for the entropy and gap signals | *required* |
| `heuristics` | Ordered list of heuristics to run against each candidate geometry | *required* |

GeometryEvidence dataclass

Per-geometry signal counts gathered for a candidate.

Parameters:

| Name | Description | Default |
|------|-------------|---------|
| `candidate` | The geometry candidate these counts belong to | *required* |
| `pages_scanned` | Number of pages actually visited during the scan | *required* |
| `heuristic_evidence` | One `HeuristicEvidence` per configured heuristic | *required* |
| `gap_hits` | Count of OOB-sized 0xFF gaps found between non-erased data regions | *required* |
| `mean_data_entropy` | Mean Shannon entropy (bits/byte) across scanned data regions | *required* |
| `mean_oob_entropy` | Mean Shannon entropy (bits/byte) across scanned OOB regions | *required* |

GeometryScore dataclass

Ranking vector for a single candidate.

Precedence when comparing candidates:

  1. higher oob_signal_score (sum of per-heuristic scores)
  2. higher entropy_signal_score (tiebreak: data-minus-OOB entropy delta)
  3. higher gap_signal_score (tiebreak: OOB-aligned 0xFF gaps)
  4. lower preference_index (earlier-listed geometries win)
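The four-level precedence above can be expressed as a single tuple sort key, negating the higher-is-better fields so that `min()` selects the winner (as `analyze` does with `sort_key`). This is a standalone sketch with a stand-in `NamedTuple`; the real `GeometryScore` is the dataclass documented here.

```python
from typing import NamedTuple

class Score(NamedTuple):
    # Stand-in for GeometryScore; field names mirror the docs above.
    oob_signal_score: int
    entropy_signal_score: int
    gap_signal_score: int
    preference_index: int

    @property
    def sort_key(self):
        # Negate the higher-is-better fields so min() picks the best
        # candidate; preference_index stays positive (lower is better).
        return (
            -self.oob_signal_score,
            -self.entropy_signal_score,
            -self.gap_signal_score,
            self.preference_index,
        )

scored = [Score(3, 0, 2, 1), Score(3, 5, 0, 0), Score(1, 9, 9, 0)]
winner = min(scored, key=lambda s: s.sort_key)
# Score(3, 5, 0, 0) wins: it ties Score(3, 0, 2, 1) on oob_signal_score but
# beats it on the entropy tiebreak, and the oob score dominates Score(1, 9, 9, 0).
```

Packing the precedence into a tuple keeps the comparison lexicographic: a later field is only consulted when every earlier field ties.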

Parameters:

| Name | Description | Default |
|------|-------------|---------|
| `evidence` | The evidence the scores were computed from | *required* |
| `oob_signal_score` | Sum of per-heuristic scores | *required* |
| `entropy_signal_score` | Scaled data-minus-OOB entropy delta, or 0 when gated out | *required* |
| `gap_signal_score` | Gap-hit count after noise-floor gating | *required* |
| `preference_index` | Position of the geometry in the configured ordered list | *required* |

ScoringWeights dataclass

Scoring knobs for the tiebreakers that aren't heuristics.

Per-heuristic weight and noise-floor live on each heuristic's HeuristicSpec.

Parameters:

| Name | Description | Default |
|------|-------------|---------|
| `entropy_min_delta_bits` | Minimum data-minus-OOB entropy delta (bits/byte) required before entropy contributes to the tiebreaker | *required* |
| `entropy_min_data_entropy_bits` | Minimum mean data entropy (bits/byte) required before entropy contributes to the tiebreaker | *required* |
| `entropy_tiebreak_scale` | Integer multiplier applied to the entropy delta so it can be compared in the same ranking vector as heuristic hit counts | *required* |
| `gap_relative_min_hit_rate` | Fractional floor on gap hits per page scanned before the gap signal is allowed to contribute | *required* |
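The `gap_relative_min_hit_rate` floor works as a simple proportional gate, mirroring the arithmetic in `score_candidate`. A minimal sketch:

```python
def gated_gap_signal(gap_hits: int, pages_scanned: int,
                     gap_relative_min_hit_rate: float) -> int:
    # Mirror of the gating in score_candidate: the gap signal only
    # contributes once hits clear a floor proportional to pages scanned.
    gap_min_hits = int(pages_scanned * gap_relative_min_hit_rate)
    return gap_hits if gap_hits >= gap_min_hits else 0

# With a 5% floor over 64 scanned pages, the floor is int(64 * 0.05) == 3.
assert gated_gap_signal(2, 64, 0.05) == 0   # below the floor: gated out
assert gated_gap_signal(3, 64, 0.05) == 3   # at the floor: contributes
```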

enumerate_candidates(file_size, geometries)

Filter geometries down to those that evenly divide file_size into a power-of-two number of pages.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `file_size` | `int` | Size of the flash image in bytes | *required* |
| `geometries` | `Sequence[Tuple[int, int]]` | Candidate `(data_size, oob_size)` pairings to consider | *required* |

Returns:

| Type | Description |
|------|-------------|
| `List[ofrak.core.flash.heuristics.GeometryCandidate]` | One `GeometryCandidate` per surviving pairing, in input order |
Source code in ofrak/core/flash/analyzer.py
def enumerate_candidates(
    file_size: int,
    geometries: Sequence[Tuple[int, int]],
) -> List[GeometryCandidate]:
    """
    Filter `geometries` down to those that evenly divide `file_size` into a power-of-two
    number of pages.

    :param file_size: size of the flash image in bytes
    :param geometries: candidate `(data_size, oob_size)` pairings to consider

    :return: one `GeometryCandidate` per surviving pairing, in input order
    """
    out: List[GeometryCandidate] = []
    for data_size, oob_size in geometries:
        total = data_size + oob_size
        if total <= 0 or file_size % total != 0:
            continue
        pages = file_size // total
        if not _is_power_of_two(pages):
            continue
        out.append(GeometryCandidate(data_size=data_size, oob_size=oob_size, num_pages=pages))
    return out
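The private helper `_is_power_of_two` is referenced above but not shown on this page. A plausible implementation, using the standard bit trick, would be:

```python
def _is_power_of_two(n: int) -> bool:
    # n is a power of two iff it is positive and has exactly one set bit;
    # clearing the lowest set bit (n & (n - 1)) then leaves zero.
    return n > 0 and (n & (n - 1)) == 0

assert _is_power_of_two(1) and _is_power_of_two(4096)
assert not _is_power_of_two(0) and not _is_power_of_two(96)
```

This is an assumption about the helper's body, not the verbatim source; any implementation with the same truth table would satisfy `enumerate_candidates`.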

run_heuristics(data, candidate, scan, heuristics=None)

Gather per-heuristic evidence plus the entropy + gap tiebreak signals.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `data` | `bytes` | The full flash image bytes | *required* |
| `candidate` | `GeometryCandidate` | The candidate geometry to evaluate | *required* |
| `scan` | `ScanConfig` | Tunables for the scan pass | *required* |
| `heuristics` | `Optional[Sequence[Union[ofrak.core.flash.heuristics.GlobalHeuristic, ofrak.core.flash.heuristics.PerPageOobHeuristic]]]` | Heuristics to run; defaults to `DEFAULT_HEURISTICS` | `None` |

Returns:

| Type | Description |
|------|-------------|
| `GeometryEvidence` | The aggregated `GeometryEvidence` for `candidate` |
Source code in ofrak/core/flash/analyzer.py
def run_heuristics(
    data: bytes,
    candidate: GeometryCandidate,
    scan: ScanConfig,
    heuristics: Optional[Sequence[Heuristic]] = None,
) -> GeometryEvidence:
    """
    Gather per-heuristic evidence plus the entropy + gap tiebreak signals.

    :param data: the full flash image bytes
    :param candidate: the candidate geometry to evaluate
    :param scan: tunables for the scan pass
    :param heuristics: heuristics to run; defaults to `DEFAULT_HEURISTICS`

    :return: the aggregated `GeometryEvidence` for `candidate`
    """
    if heuristics is None:
        heuristics = DEFAULT_HEURISTICS

    heuristic_evidence = evaluate_heuristics(data, candidate, scan, heuristics)
    gap_hits, mean_data_entropy, mean_oob_entropy, pages_scanned = _scan_entropy_and_gap(
        data, candidate, scan
    )

    return GeometryEvidence(
        candidate=candidate,
        pages_scanned=pages_scanned,
        heuristic_evidence=heuristic_evidence,
        gap_hits=gap_hits,
        mean_data_entropy=mean_data_entropy,
        mean_oob_entropy=mean_oob_entropy,
    )

score_candidate(evidence, weights, geometries)

Combine per-heuristic evidence and entropy/gap tiebreakers into a ranking vector.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `evidence` | `GeometryEvidence` | The `GeometryEvidence` for the candidate | *required* |
| `weights` | `ScoringWeights` | Tiebreaker thresholds for the entropy and gap signals | *required* |
| `geometries` | `Sequence[Tuple[int, int]]` | The ordered list of geometries (used to derive `preference_index`) | *required* |

Returns:

| Type | Description |
|------|-------------|
| `GeometryScore` | The `GeometryScore` for the candidate |

Source code in ofrak/core/flash/analyzer.py
def score_candidate(
    evidence: GeometryEvidence,
    weights: ScoringWeights,
    geometries: Sequence[Tuple[int, int]],
) -> GeometryScore:
    """
    Combine per-heuristic evidence and entropy/gap tiebreakers into a ranking vector.

    :param evidence: the `GeometryEvidence` for the candidate
    :param weights: tiebreaker thresholds for the entropy and gap signals
    :param geometries: the ordered list of geometries (used to derive `preference_index`)

    :return: the `GeometryScore` for the candidate
    """
    oob_signal = sum(ev.score() for ev in evidence.heuristic_evidence)

    gap_min_hits = int(evidence.pages_scanned * weights.gap_relative_min_hit_rate)
    gap_signal = evidence.gap_hits if evidence.gap_hits >= gap_min_hits else 0

    # Entropy is a confirmation signal only.
    has_existing_evidence = oob_signal > 0 or evidence.gap_hits > 0
    entropy_signal = 0
    if (
        has_existing_evidence
        and evidence.mean_data_entropy >= weights.entropy_min_data_entropy_bits
        and evidence.entropy_data_minus_oob >= weights.entropy_min_delta_bits
    ):
        entropy_signal = int(evidence.entropy_data_minus_oob * weights.entropy_tiebreak_scale)

    return GeometryScore(
        evidence=evidence,
        oob_signal_score=oob_signal,
        entropy_signal_score=entropy_signal,
        gap_signal_score=gap_signal,
        preference_index=_geometry_preference_index(
            evidence.candidate.data_size, evidence.candidate.oob_size, geometries
        ),
    )

_shannon_entropy(buf) private

Compute Shannon entropy of buf in bits/byte (0..8).

Source code in ofrak/core/flash/analyzer.py
def _shannon_entropy(buf: bytes) -> float:
    """
    Compute Shannon entropy of `buf` in bits/byte (0..8).
    """
    n = len(buf)
    if n == 0:
        return 0.0
    counts = [0] * 256
    for b in buf:
        counts[b] += 1
    h = 0.0
    for c in counts:
        if c:
            p = c / n
            h -= p * math.log2(p)
    return h
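As a usage sketch of the entropy signal (the function is copied verbatim from the source above so the example is self-contained): an erased 0xFF region scores 0 bits/byte, while a buffer containing each byte value exactly once reaches the 8 bits/byte maximum. This spread is what makes the data-minus-OOB entropy delta a useful tiebreaker.

```python
import math

def _shannon_entropy(buf: bytes) -> float:
    # Copied from the source listing above so this sketch runs standalone.
    n = len(buf)
    if n == 0:
        return 0.0
    counts = [0] * 256
    for b in buf:
        counts[b] += 1
    h = 0.0
    for c in counts:
        if c:
            p = c / n
            h -= p * math.log2(p)
    return h

assert _shannon_entropy(b"\xff" * 64) == 0.0       # erased flash: one symbol
assert _shannon_entropy(bytes(range(256))) == 8.0  # uniform bytes: maximum
```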

_scan_entropy_and_gap(data, candidate, scan) private

Single pass collecting entropy aggregates and page-boundary gap hits.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `data` | `bytes` | The full flash image bytes | *required* |
| `candidate` | `GeometryCandidate` | The candidate geometry being measured | *required* |
| `scan` | `ScanConfig` | Tunables for the scan pass | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Tuple[int, float, float, int]` | `(gap_hits, mean_data_entropy, mean_oob_entropy, pages_scanned)` |

Source code in ofrak/core/flash/analyzer.py
def _scan_entropy_and_gap(
    data: bytes,
    candidate: GeometryCandidate,
    scan: ScanConfig,
) -> Tuple[int, float, float, int]:
    """
    Single pass collecting entropy aggregates and page-boundary gap hits.

    :param data: the full flash image bytes
    :param candidate: the candidate geometry being measured
    :param scan: tunables for the scan pass

    :return: `(gap_hits, mean_data_entropy, mean_oob_entropy, pages_scanned)`
    """
    gap_hits = 0
    gap_budget = scan.gap_sample_count

    file_len = len(data)
    w = ERASED_SENTINEL_WINDOW
    erased_window = b"\xff" * w
    erased_oob = b"\xff" * candidate.oob_size

    pages_available = file_len // candidate.total_chunk_size
    pages_to_scan = min(scan.oob_scan_cap_pages, pages_available)

    entropy_enabled = scan.entropy_enabled and candidate.oob_size > 0
    data_entropy_sum = 0.0
    oob_entropy_sum = 0.0

    pages_scanned = 0
    for page in range(pages_to_scan):
        base = page * candidate.total_chunk_size
        oob_off = base + candidate.data_size
        if oob_off + candidate.oob_size > file_len:
            break

        if entropy_enabled:
            page_data = data[base:oob_off]
            oob = data[oob_off : oob_off + candidate.oob_size]
            data_entropy_sum += _shannon_entropy(page_data)
            oob_entropy_sum += _shannon_entropy(oob)

        pages_scanned += 1

        # Probe this page boundary for an OOB-sized 0xFF gap between non-erased data.
        # Skip page 0 (needs a previous page for context); only run while within the
        # sampling window and the budget isn't exhausted.
        if page == 0 or page > scan.gap_sample_max_scan or gap_budget <= 0:
            continue

        # Skip pages whose data region looks erased (first + last 16 bytes both 0xFF);
        # these would otherwise waste the gap budget.
        if data[base : base + w] == erased_window and data[oob_off - w : oob_off] == erased_window:
            continue
        gap_budget -= 1

        oob_region = data[oob_off : oob_off + candidate.oob_size]
        if oob_region != erased_oob:
            continue
        data_before = data[oob_off - w : oob_off]
        data_after = data[oob_off + candidate.oob_size : oob_off + candidate.oob_size + w]
        if len(data_before) != w or len(data_after) != w:
            continue
        if data_before == erased_window and data_after == erased_window:
            continue
        gap_hits += 1

    mean_data_entropy = (
        data_entropy_sum / pages_scanned if (entropy_enabled and pages_scanned) else 0.0
    )
    mean_oob_entropy = (
        oob_entropy_sum / pages_scanned if (entropy_enabled and pages_scanned) else 0.0
    )
    return gap_hits, mean_data_entropy, mean_oob_entropy, pages_scanned
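The gap probe at the heart of the loop above can be reduced to a small predicate over one page boundary. This is a simplified sketch, not the production scan: the erased-page skip, the sampling window, and the `gap_budget` bookkeeping are deliberately omitted.

```python
ERASED_SENTINEL_WINDOW = 16  # matches the window `w` in the source above
W = ERASED_SENTINEL_WINDOW

def looks_like_gap(data_before: bytes, oob_region: bytes, data_after: bytes) -> bool:
    # Simplified gap probe: an OOB-sized run of 0xFF counts as a hit only
    # when at least one flanking window is non-erased data, mirroring the
    # final checks in _scan_entropy_and_gap.
    erased_window = b"\xff" * W
    if oob_region != b"\xff" * len(oob_region):
        return False  # OOB region itself is not an erased gap
    if len(data_before) != W or len(data_after) != W:
        return False  # not enough context on either side of the boundary
    # Both neighbours erased means the whole region is blank, not a gap.
    return not (data_before == erased_window and data_after == erased_window)

oob = b"\xff" * 64
assert looks_like_gap(b"\x00" * W, oob, b"\xa5" * W)       # data | gap | data
assert not looks_like_gap(b"\xff" * W, oob, b"\xff" * W)   # fully erased region
assert not looks_like_gap(b"\x00" * W, b"\x00" * 64, b"\xa5" * W)  # OOB has data
```

The "both sides erased" exclusion is what distinguishes a genuine inter-data gap, which only appears at OOB boundaries under the correct geometry, from a large erased span that would match any geometry.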