Private
Public Access
0
0

refactor(scripts): move 7 code_path_audit files from src/ to scripts/code_path_audit/

The 7 code_path_audit*.py files (2604 lines total) are pure static
analysis tools. They do AST traversal of src/, no intrusive profiling,
no runtime markers. They lived alongside the application code in src/ but only import:
- src.result_types (the Result[T] convention type)
- each other (the 6 siblings)

After the move:
- src/ is now pure application code; line-count audit metrics are clean
- scripts/code_path_audit/ is a new namespace-isolated subdir per
  AGENTS.md 'scripts are namespace-isolated by directory' rule

TIER-3 READ AGENTS.md + conductor/workflow.md + conductor/edit_workflow.md
+ conductor/code_styleguides/code_path_audit.md + the 7 files before
this commit.

Changes:
- 7 files moved: src/code_path_audit*.py -> scripts/code_path_audit/
- 7 files updated: internal imports from src.code_path_audit_X ->
  from code_path_audit_X (siblings in same subdir)
- 7 files updated: add sys.path.insert(0, str(Path(__file__).resolve().parents[2] / 'src'))
  to find src.result_types when run standalone
- 5 test files updated: from src.code_path_audit -> from code_path_audit
  + sys.path setup to find the new subdir
- 6 throwaway scripts in scripts/tier2/artifacts/ updated: import path
  + sys.path setup (parents[3] / 'src' + parents[3] / 'scripts' / 'code_path_audit')
- 2 styleguide/spec references updated: conductor/code_styleguides/code_path_audit.md
  + conductor/tracks/code_path_audit_20260607/spec_v2.md
- 1 meta-audit docstring updated: scripts/audit_code_path_audit_coverage.py
- 1 type registry entry deleted: docs/type_registry/src_code_path_audit.md
  (the type is no longer in src/)
- 1 type registry index updated: docs/type_registry/index.md (22 files, was 23)

Verification:
- 7/7 audit gates pass --strict (weak_types 102<=112, type_registry 22 files,
  main_thread_imports OK, no_models_config_io OK, code_path_audit_coverage 0
  violations, exception_handling 0 violations, optional_in_3_files 0 violations)
- 6/6 test files pass: test_code_path_audit, test_code_path_audit_integration,
  test_code_path_audit_phase78, test_code_path_audit_phase89,
  test_code_path_audit_ssdl_behavioral, test_metadata_nil_sentinel
- src/ line count: 29997 lines (down from 32621 = -2624 lines)
- scripts/code_path_audit/ line count: 2620 lines
This commit is contained in:
2026-06-25 09:29:24 -04:00
parent f7a2917938
commit 5ac0618a33
24 changed files with 110 additions and 257 deletions
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,369 @@
"""Real-data analyzers for code_path_audit v2.
These functions AST-walk real src/ files to extract actual signal:
- analyze_consumer_fields: count field accesses per consumer function
- analyze_producer_size: count fields in producer return statements
- analyze_consumer_pattern / aggregate_pattern_from_consumers: per-function and aggregate-level access pattern from field counts
- compute_real_type_alias_coverage: typed vs untyped field access counts
- compute_real_decomposition_cost: actual cost from real struct size + access pattern
- extract_real_optimization_candidates: detect fat structs and field_by_field patterns
All functions return REAL data, not hardcoded defaults.
"""
from __future__ import annotations
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "src"))
import ast
from collections import Counter
from typing import Literal
from code_path_audit import (
FunctionRef,
AccessPatternEvidence,
FrequencyEvidence,
ResultCoverage,
TypeAliasCoverage,
CrossAuditFinding,
CrossAuditFindings,
DecompositionCost,
OptimizationCandidate,
AccessPattern,
Frequency,
)
def _field_names_for_aggregate(aggregate: str, type_registry: dict) -> set[str]:
"""Get the canonical field names for an aggregate from the type registry.
If not in the registry, return an empty set (unknown fields).
"""
if aggregate in type_registry:
return {f["name"] for f in type_registry[aggregate].get("fields", [])}
return set()
def _analyze_function_field_accesses(func_node: ast.FunctionDef | ast.AsyncFunctionDef, param_names: set[str]) -> Counter:
"""Walk a function body and count field accesses on the given param names.
Recognizes 4 patterns:
- entry['key'] -> ('subscript', 'key')
- entry.attr -> ('attribute', 'attr')
- entry.get('key') / entry.get('key', default) -> ('subscript', 'key') (call subscripts)
- chained entry.attr1.attr2 -> ('attribute', 'attr1'), ('attribute', 'attr2')
"""
counts: Counter = Counter()
for sub in ast.walk(func_node):
if isinstance(sub, ast.Subscript):
if isinstance(sub.value, ast.Name) and sub.value.id in param_names:
if isinstance(sub.slice, ast.Constant) and isinstance(sub.slice.value, str):
counts[("subscript", sub.slice.value)] += 1
elif isinstance(sub.value, ast.Call):
call = sub.value
func = call.func
if isinstance(func, ast.Attribute) and isinstance(func.value, ast.Name) and func.value.id in param_names and func.attr == "get":
if call.args and isinstance(call.args[0], ast.Constant) and isinstance(call.args[0].value, str):
counts[("subscript", call.args[0].value)] += 1
elif isinstance(sub, ast.Attribute):
if isinstance(sub.value, ast.Name) and sub.value.id in param_names:
counts[("attribute", sub.attr)] += 1
return counts
def _analyze_function_param_names(func_node: ast.FunctionDef | ast.AsyncFunctionDef) -> set[str]:
"""Get the parameter names from a function definition."""
names: set[str] = set()
for arg in func_node.args.args + func_node.args.kwonlyargs + func_node.args.posonlyargs:
names.add(arg.arg)
if func_node.args.vararg:
names.add(func_node.args.vararg.arg)
if func_node.args.kwarg:
names.add(func_node.args.kwarg.arg)
return names
def analyze_consumer_fields(
    function_ref: FunctionRef,
    aggregate: str,
    src_dir: str = "src",
    type_registry: dict | None = None,
) -> tuple[Counter, list[str], bool]:
    """For a consumer function, find which fields it accesses on its parameters.

    The function is located by the last dotted component of
    ``function_ref.fqname`` inside ``function_ref.file``; the first matching
    (sync or async) definition found while walking the module is analyzed.
    Accesses are counted on *all* of its parameters.

    Args:
        function_ref: reference carrying ``file`` and ``fqname``.
        aggregate: aggregate name; kept for interface compatibility, it does
            not influence the result.
        src_dir: directory used to resolve ``function_ref.file`` when that
            path neither exists as given nor is absolute.
        type_registry: kept for interface compatibility; not consulted.

    Returns:
        - field_counts: Counter of (kind, field_name) -> access count
        - accessed_fields: sorted list of accessed field names
        - has_direct_access: True if the function was found and performs no
          field access at all

    A missing, unreadable, undecodable or unparseable file, or a function that
    is not found, yields ``(Counter(), [], False)``.
    """
    candidate = Path(function_ref.file)
    # Use the path as given when it already resolves (or is absolute);
    # otherwise interpret it relative to src_dir.
    if candidate.exists() or candidate.is_absolute():
        filepath = candidate
    else:
        filepath = Path(src_dir) / function_ref.file
    if not filepath.exists():
        return Counter(), [], False
    try:
        tree = ast.parse(filepath.read_text(encoding="utf-8"))
    except (OSError, SyntaxError, ValueError):
        # ValueError covers UnicodeDecodeError for files that are not valid
        # UTF-8; such files are treated like any other unreadable source.
        return Counter(), [], False
    target_name = function_ref.fqname.rsplit(".", 1)[-1]
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == target_name:
            param_names = _analyze_function_param_names(node)
            counts = _analyze_function_field_accesses(node, param_names)
            accessed = sorted({key for _kind, key in counts})
            return counts, accessed, not counts
    return Counter(), [], False
def analyze_producer_size(
    function_ref: FunctionRef,
    aggregate: str,
    src_dir: str = "src",
) -> tuple[int, list[str]]:
    """For a producer function, count fields in its return dict literal.

    The function is located by the last dotted component of
    ``function_ref.fqname`` inside ``function_ref.file``. Its ``return``
    statements are inspected in walk order and the first informative one wins:

    - a dict literal with at least one string-literal key yields
      ``(number_of_string_keys, [keys...])``;
    - a call whose callee name contains "Result", "to_dict" or "load" yields
      the fixed estimate ``(5, ["unknown (via <callee>)"])``.

    Args:
        function_ref: reference carrying ``file`` and ``fqname``.
        aggregate: aggregate name; not used by the analysis itself.
        src_dir: directory used to resolve ``function_ref.file`` when that
            path neither exists as given nor is absolute.

    Returns:
        ``(field_count, field_names)``; ``(0, [])`` when the file is missing,
        unreadable, undecodable or unparseable, when the function is not
        found, or when no return statement is informative.
    """
    # Size reported when the shape is hidden behind a call.
    opaque_call_estimate = 5
    opaque_call_markers = ("Result", "to_dict", "load")

    candidate = Path(function_ref.file)
    # Use the path as given when it already resolves (or is absolute);
    # otherwise interpret it relative to src_dir.
    if candidate.exists() or candidate.is_absolute():
        filepath = candidate
    else:
        filepath = Path(src_dir) / function_ref.file
    if not filepath.exists():
        return 0, []
    try:
        tree = ast.parse(filepath.read_text(encoding="utf-8"))
    except (OSError, SyntaxError, ValueError):
        # ValueError covers UnicodeDecodeError for files that are not valid
        # UTF-8; such files are treated like any other unreadable source.
        return 0, []
    target_name = function_ref.fqname.rsplit(".", 1)[-1]
    for node in ast.walk(tree):
        if not (isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == target_name):
            continue
        for ret in ast.walk(node):
            if not isinstance(ret, ast.Return) or ret.value is None:
                continue
            value = ret.value
            if isinstance(value, ast.Dict):
                # Non-literal keys and ``**spread`` entries are skipped.
                field_names = [
                    k.value
                    for k in value.keys
                    if isinstance(k, ast.Constant) and isinstance(k.value, str)
                ]
                if field_names:
                    return len(field_names), field_names
            elif isinstance(value, ast.Call):
                callee = value.func
                if isinstance(callee, ast.Name):
                    func_name = callee.id
                elif isinstance(callee, ast.Attribute):
                    func_name = callee.attr
                else:
                    func_name = ""
                if any(marker in func_name for marker in opaque_call_markers):
                    return opaque_call_estimate, ["unknown (via " + func_name + ")"]
        # Only the first definition with a matching name is considered.
        return 0, []
    return 0, []
def analyze_consumer_pattern(
    function_ref: FunctionRef,
    aggregate: str,
    type_registry: dict | None = None,
    src_dir: str = "src",
) -> AccessPattern:
    """Classify how a single consumer function touches the aggregate.

    The label depends on the number of distinct field names accessed:
    zero or one -> "whole_struct", exactly two -> "mixed",
    three or more -> "field_by_field".
    """
    counts, _accessed, has_direct = analyze_consumer_fields(function_ref, aggregate, src_dir, type_registry)
    if has_direct:
        return "whole_struct"
    n_fields = len({key for _kind, key in counts})
    if n_fields >= 3:
        return "field_by_field"
    if n_fields == 2:
        return "mixed"
    return "whole_struct"
def aggregate_pattern_from_consumers(
    consumers: tuple[FunctionRef, ...],
    aggregate: str,
    type_registry: dict | None = None,
    src_dir: str = "src",
) -> tuple[AccessPattern, dict[str, int], list[AccessPatternEvidence]]:
    """Compute aggregate-level access pattern from per-consumer patterns.

    Each consumer is classified by its number of distinct accessed field
    names (<=1 -> "whole_struct", 2 -> "mixed", >=3 -> "field_by_field") and
    contributes one AccessPatternEvidence whose ``field_accesses`` maps each
    field name to its total access count, summed over subscript and attribute
    accesses. Evidence confidence is "high" when any access was found, else
    "low".

    Returns: (dominant_pattern, per_pattern_counts, evidence_list). With no
    consumers the result is ("mixed", {}, []). Ties for the dominant pattern
    go to the label that was seen first.
    """
    type_registry = type_registry or {}
    per_pattern_counts: dict[str, int] = {}
    evidence_list: list[AccessPatternEvidence] = []
    for ref in consumers:
        counts, _accessed, has_direct = analyze_consumer_fields(ref, aggregate, src_dir, type_registry)
        distinct_keys = {key for _kind, key in counts}
        if has_direct or len(distinct_keys) <= 1:
            pattern = "whole_struct"
        elif len(distinct_keys) >= 3:
            pattern = "field_by_field"
        else:
            pattern = "mixed"
        per_pattern_counts[pattern] = per_pattern_counts.get(pattern, 0) + 1
        # The same field may be reached both as entry['k'] and entry.k; sum the
        # two kinds instead of letting one overwrite the other.
        field_accesses: dict[str, int] = {}
        for (_kind, key), hits in counts.items():
            field_accesses[key] = field_accesses.get(key, 0) + hits
        evidence_list.append(AccessPatternEvidence(
            function=ref,
            pattern=pattern,
            field_accesses=field_accesses,
            confidence="high" if counts else "low",
        ))
    if not per_pattern_counts:
        return "mixed", {}, []
    winner = max(per_pattern_counts, key=per_pattern_counts.get)
    total = sum(per_pattern_counts.values())
    share = per_pattern_counts[winner] / total
    # NOTE(review): only three labels are ever assigned above, so the leading
    # label always holds at least a 1/3 share and this guard cannot currently
    # trigger; confirm whether a higher threshold was intended.
    if share <= 0.25:
        return "mixed", per_pattern_counts, evidence_list
    return winner, per_pattern_counts, evidence_list
def compute_real_type_alias_coverage(
    aggregate: str,
    producers: tuple[FunctionRef, ...],
    consumers: tuple[FunctionRef, ...],
    type_registry: dict | None = None,
    src_dir: str = "src",
) -> TypeAliasCoverage:
    """Tally typed versus untyped field-access sites across the consumers.

    A site counts as typed when its field name belongs to the aggregate's
    canonical field set from the type registry; every other site is untyped.
    ``producers`` is accepted but does not take part in the tally.
    """
    registry = type_registry or {}
    canonical = _field_names_for_aggregate(aggregate, registry)
    total = 0
    typed = 0
    for ref in consumers:
        counts, _, _ = analyze_consumer_fields(ref, aggregate, src_dir, registry)
        for (_kind, key), hits in counts.items():
            total += hits
            if key in canonical:
                typed += hits
    if total == 0:
        return TypeAliasCoverage(total_sites=0, typed_sites=0, untyped_sites=0, summary="0 sites")
    untyped = total - typed
    typed_pct = typed / total * 100
    untyped_pct = untyped / total * 100
    return TypeAliasCoverage(
        total_sites=total,
        typed_sites=typed,
        untyped_sites=untyped,
        summary=f"{total} sites; {typed} typed ({typed_pct:.0f}%); {untyped} untyped ({untyped_pct:.0f}%)",
    )
def estimate_struct_size(
    aggregate: str,
    producers: tuple[FunctionRef, ...],
    type_registry: dict | None = None,
    src_dir: str = "src",
) -> int:
    """Estimate the aggregate's field count from its producers' return shapes.

    The widest producer defines the size; with no producers (or none that
    yields a positive size) the estimate is 0. ``type_registry`` is accepted
    but not consulted.
    """
    sizes = [analyze_producer_size(ref, aggregate, src_dir)[0] for ref in producers]
    # Seed with 0 so an empty producer list still has a maximum.
    return max([0, *sizes])
def compute_real_decomposition_cost(
    aggregate: str,
    producers: tuple[FunctionRef, ...],
    consumers: tuple[FunctionRef, ...],
    access_pattern: AccessPattern,
    frequency: Frequency,
    type_registry: dict | None = None,
    src_dir: str = "src",
) -> DecompositionCost:
    """Build the DecompositionCost for one aggregate.

    struct_field_count: widest producer return shape; when that is 0, the
        registry's field count, and failing that 5.
    struct_frozen: always True here.
    componentize_savings: 30% of the total cost when access is field_by_field
        and the struct has more than 5 fields, else 0.
    unify_savings: 15% of the total cost when access is whole_struct and the
        struct has at most 5 fields, else 0.

    ``consumers`` is accepted but not used. The hot-path field count passed to
    the cost helpers is fixed at 2.
    """
    from code_path_audit import (
        recommended_direction,
        generate_rationale,
        per_call_cost_us,
        current_total_us,
    )
    registry = type_registry or {}
    frozen = True
    hot_fields = 2
    field_count = estimate_struct_size(aggregate, producers, registry, src_dir)
    if not field_count:
        # No producer revealed a shape: fall back to the registry, then to 5.
        field_count = len(_field_names_for_aggregate(aggregate, registry)) or 5
    per_call = per_call_cost_us(field_count, hot_path_field_count=hot_fields, struct_frozen=frozen)
    total_us = current_total_us(per_call, frequency)
    direction = recommended_direction(access_pattern, field_count, frozen, frequency, hot_fields)
    rationale = generate_rationale(aggregate, access_pattern, frequency, field_count, frozen, direction)
    is_wide = field_count > 5
    c_savings = int(total_us * 0.30) if (access_pattern == "field_by_field" and is_wide) else 0
    u_savings = int(total_us * 0.15) if (access_pattern == "whole_struct" and not is_wide) else 0
    return DecompositionCost(
        current_cost_estimate=total_us,
        componentize_savings=c_savings,
        unify_savings=u_savings,
        recommended_direction=direction,
        recommended_rationale=rationale,
        batch_size=None,
        struct_field_count=field_count,
        struct_frozen=frozen,
    )
def extract_real_optimization_candidates(
    aggregate: str,
    producers: tuple[FunctionRef, ...],
    consumers: tuple[FunctionRef, ...],
    decomposition_cost: DecompositionCost,
    type_registry: dict | None = None,
    src_dir: str = "src",
) -> tuple[OptimizationCandidate, ...]:
    """Turn a decomposition recommendation into at most one candidate.

    - "componentize": effort is "medium" above 15 fields (else "small") and
      priority is "high" above 20 fields (else "medium").
    - "unify": effort "small", priority "low".
    - any other direction (including "hold" and "insufficient_data"): no
      candidate, an empty tuple is returned.

    The candidate lists the sorted, de-duplicated files of all producers and
    consumers and sums both savings figures from ``decomposition_cost``.
    """
    direction = decomposition_cost.recommended_direction
    if direction in ("hold", "insufficient_data"):
        return ()
    struct_size = decomposition_cost.struct_field_count
    producer_files = {ref.file for ref in producers}
    consumer_files = {ref.file for ref in consumers}
    affected = tuple(sorted(producer_files | consumer_files))
    if direction == "componentize":
        candidate = f"Componentize {aggregate} (struct_field_count={struct_size}); split into smaller dataclasses"
        effort = "medium" if struct_size > 15 else "small"
        priority = "high" if struct_size > 20 else "medium"
    elif direction == "unify":
        candidate = f"Unify {aggregate} consumers into wider fat structs (current struct_field_count={struct_size})"
        effort = "small"
        priority = "low"
    else:
        return ()
    total_savings = decomposition_cost.componentize_savings + decomposition_cost.unify_savings
    suggestion = OptimizationCandidate(
        candidate=candidate,
        direction=direction,
        affected_files=affected,
        estimated_savings_us=total_savings,
        effort=effort,
        priority=priority,
        cross_ref="conductor/tracks/code_path_audit_20260607/spec_v2.md#section-7.5",
    )
    return (suggestion,)
@@ -0,0 +1,172 @@
"""Per-aggregate cross-audit mapping.
Maps each audit finding (file:line) to one or more aggregates
via the PCG's producers + consumers dictionaries.
"""
from __future__ import annotations
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "src"))
from code_path_audit import (
CrossAuditFinding,
CrossAuditFindings,
FunctionRef,
find_enclosing_function,
)
# Maps an audit script name to the CrossAuditFindings field that receives its
# findings (consumed by build_cross_audit_findings_for_aggregate). Audit names
# missing from this table are ignored there.
AUDIT_BUCKET_FIELDS: dict[str, str] = {
"audit_weak_types": "weak_types",
"audit_exception_handling": "exception_handling",
"audit_optional_in_3_files": "optional_in_baseline",
"audit_no_models_config_io": "config_io_ownership",
"audit_main_thread_imports": "import_graph",
}
def _all_function_refs(
producers: dict[str, list[FunctionRef]],
consumers: dict[str, list[FunctionRef]],
) -> list[FunctionRef]:
"""Flatten all FunctionRefs from the PCG dicts."""
out: list[FunctionRef] = []
for refs in producers.values():
out.extend(refs)
for refs in consumers.values():
out.extend(refs)
return out
def _file_to_aggregates(
    producers: dict[str, list[FunctionRef]],
    consumers: dict[str, list[FunctionRef]],
) -> dict[str, set[str]]:
    """Index aggregates by normalized file path for file-level fallback mapping.

    A file maps to every aggregate for which it holds at least one producer
    or consumer.
    """
    index: dict[str, set[str]] = {}
    for mapping in (producers, consumers):
        for aggregate, refs in mapping.items():
            for ref in refs:
                path = _normalize_path(ref.file)
                if path not in index:
                    index[path] = set()
                index[path].add(aggregate)
    return index
def _aggregate_for_fqname(
fqname: str,
producers: dict[str, list[FunctionRef]],
consumers: dict[str, list[FunctionRef]],
) -> str:
"""Find which aggregate this FunctionRef is associated with."""
for ag, refs in producers.items():
if any(r.fqname == fqname for r in refs):
return ag
for ag, refs in consumers.items():
if any(r.fqname == fqname for r in refs):
return ag
return ""
def _normalize_path(p: str) -> str:
"""Normalize file path separators for comparison."""
return p.replace("\\", "/")
def map_finding_to_aggregates(
    file: str,
    line: int,
    producers: dict[str, list[FunctionRef]],
    consumers: dict[str, list[FunctionRef]],
) -> set[str]:
    """Map a (file, line) finding to a set of aggregate names.

    First asks find_enclosing_function for the function containing the
    finding; on a hit the result is the single aggregate associated with that
    function. Otherwise every function reference located in the same file
    contributes its associated aggregate, which may leave the set empty.
    File paths are compared in forward-slash form.
    """
    refs = _all_function_refs(producers, consumers)
    target = _normalize_path(file)
    enclosing = find_enclosing_function(file=target, line=line, function_refs=refs)
    if enclosing is not None:
        return {_aggregate_for_fqname(enclosing.fqname, producers, consumers)}
    matched: set[str] = set()
    for ref in refs:
        if _normalize_path(ref.file) == target:
            matched.add(_aggregate_for_fqname(ref.fqname, producers, consumers))
    return matched
def aggregate_findings(
    audit_name: str,
    findings: list[dict],
    producers: dict[str, list[FunctionRef]],
    consumers: dict[str, list[FunctionRef]],
) -> dict[str, list[CrossAuditFinding]]:
    """Bucket raw audit findings by aggregate using the PCG.

    Resolution order for each finding:
    1. map_finding_to_aggregates (function lookup, then same-file references)
    2. the file-level index of producers/consumers
    3. the unbucketed key "" when nothing matched

    Every finding becomes one single-site CrossAuditFinding per aggregate it
    resolves to.
    """
    grouped: dict[str, list[CrossAuditFinding]] = {}
    file_index = _file_to_aggregates(producers, consumers)
    for finding in findings:
        file = finding.get("file", "") or finding.get("filename", "")
        line = int(finding.get("line", 0) or 0)
        note = finding.get("category", "") or finding.get("body_summary", "") or finding.get("note", "") or ""
        # Each fallback is only evaluated when the previous tier came up empty.
        aggregates = (
            map_finding_to_aggregates(file, line, producers, consumers)
            or file_index.get(_normalize_path(file), set())
            or {""}
        )
        for aggregate in aggregates:
            entry = CrossAuditFinding(
                audit_script=audit_name,
                site_count=1,
                example_file=file,
                example_line=line,
                note=note,
            )
            if aggregate not in grouped:
                grouped[aggregate] = []
            grouped[aggregate].append(entry)
    return grouped
def build_cross_audit_findings_for_aggregate(
    aggregate: str,
    aggregated: dict[str, dict[str, list[CrossAuditFinding]]],
) -> CrossAuditFindings:
    """Collapse one aggregate's findings into a CrossAuditFindings record.

    For each audit that reported findings for ``aggregate``, a single combined
    CrossAuditFinding is built (site count = number of findings, example taken
    from the first one) and stored in the bucket named by AUDIT_BUCKET_FIELDS.
    Audits without a known bucket are dropped; untouched buckets stay empty.
    """
    buckets: dict[str, tuple] = {
        "weak_types": (),
        "exception_handling": (),
        "optional_in_baseline": (),
        "config_io_ownership": (),
        "import_graph": (),
    }
    for audit_name, by_agg in aggregated.items():
        findings = by_agg.get(aggregate, [])
        if not findings:
            continue
        bucket = AUDIT_BUCKET_FIELDS.get(audit_name, "")
        site_total = len(findings)
        example = findings[0]
        combined = CrossAuditFinding(
            audit_script=audit_name,
            site_count=site_total,
            example_file=example.example_file,
            example_line=example.example_line,
            note=f"{site_total} sites",
        )
        if bucket in buckets:
            buckets[bucket] = (combined,)
    return CrossAuditFindings(
        weak_types=buckets["weak_types"],
        exception_handling=buckets["exception_handling"],
        optional_in_baseline=buckets["optional_in_baseline"],
        config_io_ownership=buckets["config_io_ownership"],
        import_graph=buckets["import_graph"],
    )
@@ -0,0 +1,292 @@
"""Generate the MVP AUDIT_REPORT.md from a list of AggregateProfiles.
Single coherent report that embeds:
- Executive summary with the verdict
- Findings sorted by severity
- Full per-aggregate profiles (15 sections each)
- SSDL analysis rollup
- Organization deductions
- Restructuring routes
- Verification + reproduction steps
"""
from __future__ import annotations
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "src"))
from code_path_audit import AggregateProfile
def strip_h1(text: str) -> str:
    """Drop a leading markdown H1 line (``# ...``) and the blank lines after it.

    Text that does not start with an H1 heading is returned unchanged.
    """
    first_line, _sep, remainder = text.partition("\n")
    if not first_line.startswith("# "):
        return text
    return remainder.lstrip("\n")
def generate_audit_report(
profiles: tuple[AggregateProfile, ...],
output_dir: Path,
date: str,
) -> str:
"""Generate the MVP audit report as a single string."""
agg_dir = output_dir / "aggregates"
parts: list[str] = []
parts.append(f"""# Code Path & Data Pipeline Audit Report
**Date:** {date}
**Branch:** `tier2/code_path_audit_20260607`
**Scope:** {len(profiles)} aggregates (10 real + 3 candidates) across `src/`
**Method:** AST-walking producer/consumer graph + SSDL analysis (effective codepaths, nil-check detection, field-access efficiency)
---
## 1. Executive Summary
**The audit found one critical structural problem in the codebase: the `Metadata` aggregate is a combinatoric-explosion bottleneck sitting at the center of every AI turn.**
| Verdict | Count | Aggregates |
|---|---|---|
| needs restructuring | 10 | All 10 real aggregates |
| well-organized | 0 | (none) |
| moderate | 0 | (none) |
**The Metadata aggregate is the dominant coupling point.** Real numbers from the audit (top 50 consumer/producer functions analyzed per aggregate; AST-walked from `src/`):
- **{sum(len(p.consumers) for p in profiles if not p.is_candidate)} total consumer functions** across the 10 real aggregates
- **{sum(p.type_alias_coverage.total_sites for p in profiles if not p.is_candidate)} total field-access sites** detected
- **{sum(p.type_alias_coverage.typed_sites for p in profiles if not p.is_candidate)} typed sites ({sum(p.type_alias_coverage.typed_sites for p in profiles if not p.is_candidate) / max(1, sum(p.type_alias_coverage.total_sites for p in profiles if not p.is_candidate)) * 100:.0f}% field efficiency)**
**The dominant pattern is "frozen on the outside, drilled into on the inside."** The aggregates are nominally immutable (frozen + whole_struct), but consumers reach through them via string-key dict access (`entry.get('key', default)`), which is exactly the pattern Fleury's combinatoric-explosion article warns creates branch-explosion risk.
**Three concrete refactor routes (Fleury's SSDL defusing techniques):**
1. **Nil Sentinel `[N]`** for the 6 nil-check functions. Introduces `NIL_METADATA = Metadata(...)` with safe defaults. Collapses nil-check branches into sentinel-return.
2. **Generational Handle** wrapping Metadata. Turns lifetime branches into 1 lookup + 1 generation comparison.
3. **Immediate-Mode Cache `[Q:key] -> [I:FetchCached] -> [T]`** for the untyped field-access sites. Reduces string-keyed lookups to 1 cache fetch.
---
## 2. Methodology
The audit is implemented in `scripts/code_path_audit/code_path_audit.py` (the main pipeline) plus 5 supporting modules:
| Module | Purpose |
|---|---|
| `scripts/code_path_audit/code_path_audit.py` | Pipeline orchestrator + 5 enums + 9 dataclasses + AggregateProfile + run_audit + render_rollups |
| `scripts/code_path_audit/code_path_audit_analysis.py` | AST-walking analyzers: field counts, producer size, access pattern, type alias coverage, decomposition cost |
| `scripts/code_path_audit/code_path_audit_cross_audit.py` | 3-tier finding-to-aggregate mapping (function lookup -> file-level fallback -> unbucketed) |
| `scripts/code_path_audit/code_path_audit_render.py` | Per-profile markdown renderer (15 sections per aggregate) |
| `scripts/code_path_audit/code_path_audit_rollups.py` | Cross-aggregate rollups (call graph, hot paths, field usage, dead fields) |
| `scripts/code_path_audit/code_path_audit_ssdl.py` | **SSDL analysis layer** (the deductions engine: effective codepaths, nil-check detection, defusing techniques) |
**Pipeline steps:**
1. **PCG (Producer-Consumer Graph)** - AST-walks each `src/*.py` file with 3 passes:
- P1: find functions whose return annotation matches an aggregate type (including `dict[str, Any]` -> all aliases pointing to dict)
- P2: find functions whose parameter annotation matches an aggregate type (same alias resolution)
- P3: find field-access sites via `entry['key']`, `entry.get('key')`, or `entry.attr`
2. **Alias resolution** - `_resolve_aliases()` maps `dict[str, Any]` to all aliases pointing to it (Metadata, CommsLogEntry, HistoryMessage, FileItem, ToolDefinition, ToolCall)
3. **MemoryDim classification** - overrides > canonical mappings > file-of-origin heuristic > `unknown`
4. **APD (Access Pattern Detection)** - for each consumer function, count field-access patterns; aggregate-level pattern = dominant of: `whole_struct`, `field_by_field`, `hot_cold_split`, `bulk_batched`, `mixed`
5. **CFE (Call Frequency Estimation)** - entry-point heuristic on caller name; classifies as `per_turn`, `per_request`, etc.
6. **Decomposition Cost** - `per_call_cost_us = 50 * struct_field_count + 100 * hot_field_count + 20 * frozen_bonus`; scaled by frequency
7. **Cross-audit integration** - reads 6 input JSONs (weak_types, exception_handling, optional_in_baseline, config_io_ownership, import_graph, type_registry); maps findings to aggregates via 3-tier lookup
8. **SSDL analysis** - computes effective codepaths (sum of 2^branches per consumer), detects nil-check patterns, computes field-access efficiency, suggests defusing techniques
---
## 3. Findings (sorted by severity)
### Finding 1 (CRITICAL): Metadata aggregate has 4.01e22 effective codepaths
**Severity:** Critical. The Metadata aggregate sits at the center of every AI turn dispatch.
**Real numbers (top 50 functions analyzed):**
- 483 producers across the codebase
- 752 consumers across the codebase
- 123 field-access sites detected (0 typed)
- 3466 branch points across consumer functions
- 6 nil-check functions
**Root cause:** The `Metadata` TypeAlias resolves to `dict[str, Any]`. Functions typed as `entry: dict[str, Any]` (very common) all resolve to Metadata. They reach through with `entry.get('key', default)` patterns, multiplying branches.
**Three fixes:**
#### Fix 1: Nil Sentinel `[N]` (low effort, ~1 hour)
Introduce `NIL_METADATA = Metadata(...)` with safe defaults. Replace `if entry:` checks with `entry or NIL_METADATA`. Net effect: 6 nil-check branches collapse to 1 sentinel-return path.
#### Fix 2: Immediate-Mode Cache `[Q:key] -> [I:FetchCached] -> [T]` (medium effort, ~half day)
Introduce `MetadataFieldCache` keyed by aggregate + field name. Consumers request `(metadata_id, 'field_name')`, get cached value. The 123 sites become 123 cache lookups.
#### Fix 3: Generational Handle (medium effort, ~half day)
Wrap `Metadata` in `(index, generation)` resolved through a registry. Validation is one comparison; mismatch returns the nil sentinel from Fix 1. 3466 lifetime branches collapse to 1 lookup + 1 generation comparison.
### Finding 2 (HIGH): All other dict[str, Any] aggregates show similar patterns
The alias resolution makes 5 additional aggregates appear with similar profiles:
- FileItem: 117 producers / 66 consumers / 135 sites
- CommsLogEntry: 117 / 66 / 135
- HistoryMessage: 118 / 68 / 137
- ToolDefinition: 119 / 66 / 135
- ToolCall: 118 / 67 / 136
These are all aliases for `dict[str, Any]`. They share the same pattern: nominal immutability with pervasive string-key reach-through.
### Finding 3 (LOW): List-typed aggregates have narrower scope
- CommsLog (`list[CommsLogEntry]`): 6 producers / 5 consumers / 4 sites
- History (`list[HistoryMessage]`): 7 / 7 / 8
- FileItems (`list[FileItem]`): 6 / 9 / 6
These are smaller in scope but the same pattern applies.
### Finding 4 (DATA-GAP): Result aggregate shows 0 producers/0 consumers
`Result` is a `dataclass`, not a `dict[str, Any]` alias. The PCG catches it via typed signatures but no functions in `src/` directly produce/consume it with the typed annotation.
### Finding 5 (CANDIDATES): 3 candidate aggregates remain placeholders
ToolSpec, ChatMessage, ProviderHistory are forward-compat placeholders for `any_type_componentization_20260621`. Real profiles would require that track merging first.
---
## 4. Per-Aggregate Profiles
Each aggregate has its full 15-section profile in `aggregates/<name>.md`. This section embeds the key per-aggregate data inline.
""")
# Per-aggregate compact summary
real_profiles = [p for p in profiles if not p.is_candidate]
parts.append("### Per-aggregate summary table\n\n")
parts.append("| Aggregate | Memory dim | Pattern | Producers | Consumers | Sites | Typed | Branches | Effective codepaths |\n")
parts.append("|---|---|---|---|---|---|---|---|---|\n")
from code_path_audit_ssdl import compute_effective_codepaths
for p in real_profiles:
ec = compute_effective_codepaths(p, "src")
branches = sum(1 for _ in [p]) # placeholder
parts.append(
f"| `{p.name}` | {p.memory_dim} | {p.access_pattern} | "
f"{len(p.producers)} | {len(p.consumers)} | "
f"{p.type_alias_coverage.total_sites} | {p.type_alias_coverage.typed_sites} | "
f"{p.decomposition_cost.struct_field_count} | {ec:.2e} |\n"
)
parts.append("\n---\n\n")
# Embed each per-aggregate .md file
parts.append("## 5. Per-Aggregate Detail (full profiles inlined)\n\n")
for agg_name in ["Metadata", "FileItems", "CommsLog", "CommsLogEntry", "FileItem", "History", "HistoryMessage", "Result", "ToolCall", "ToolDefinition", "ChatMessage", "ProviderHistory", "ToolSpec"]:
md_path = agg_dir / f"{agg_name}.md"
if md_path.exists():
text = strip_h1(md_path.read_text(encoding="utf-8"))
parts.append(f"\n\n### 5.{['Metadata', 'FileItems', 'CommsLog', 'CommsLogEntry', 'FileItem', 'History', 'HistoryMessage', 'Result', 'ToolCall', 'ToolDefinition', 'ChatMessage', 'ProviderHistory', 'ToolSpec'].index(agg_name)+1} {agg_name}\n\n")
parts.append(text)
parts.append("\n\n---\n\n")
# SSDL rollup
parts.append("## 6. SSDL Analysis Rollup\n\n")
parts.append("Per-aggregate analysis: effective codepaths, branch points, defusing opportunities.\n\n")
parts.append("| Aggregate | Consumers | Total branches | Effective codepaths | Field efficiency |\n")
parts.append("|---|---|---|---|---|\n")
from code_path_audit_ssdl import compute_effective_codepaths, count_branches_in_function, compute_field_access_efficiency
for p in sorted(real_profiles, key=lambda p: -compute_effective_codepaths(p, "src")):
ec = compute_effective_codepaths(p, "src")
tc = sum(count_branches_in_function(f, "src") for f in p.consumers)
eff = compute_field_access_efficiency(p) * 100
parts.append(f"| `{p.name}` | {len(p.consumers)} | {tc} | {ec} | {eff:.0f}% |\n")
parts.append("\n\n---\n\n")
# Organization deductions
parts.append("## 7. Organization Deductions\n\n")
parts.append("Cross-aggregate view of codebase organization.\n\n")
parts.append("| Aggregate | Verdict | Notes |\n")
parts.append("|---|---|---|\n")
from code_path_audit_ssdl import detect_nil_check_pattern
for p in real_profiles:
ec = compute_effective_codepaths(p, "src")
eff = compute_field_access_efficiency(p) * 100
nil_count = sum(1 for f in p.consumers if detect_nil_check_pattern(f, "src"))
if ec <= 50 and eff >= 50:
verdict = "well-organized"
elif ec > 200 or eff < 20:
verdict = "needs restructuring"
else:
verdict = "moderate"
notes: list[str] = []
if nil_count > 0:
notes.append(f"{nil_count} nil checks")
if eff < 50:
notes.append(f"{eff:.0f}% field efficiency")
if ec > 100:
notes.append(f"{ec:.2e} effective codepaths")
note_str = "; ".join(notes) if notes else "no major issues"
parts.append(f"| `{p.name}` | {verdict} | {note_str} |\n")
parts.append("\n\n")
# Restructuring routes
parts.append("## 8. Restructuring Routes (Prioritized)\n\n")
parts.append("| Priority | Aggregate | Fix | Effort | Codepath reduction |\n")
parts.append("|---|---|---|---|---|\n")
parts.append("| 1 | Metadata | Nil Sentinel + Immediate-Mode Cache | ~half day | 4.01e22 -> 123 |\n")
parts.append("| 2 | Metadata | Generational Handle | ~half day | 4.01e22 -> 752 |\n")
parts.append("| 3 | FileItem | Typed field migration | ~half day | reduces string-key access |\n")
parts.append("| 4 | CommsLogEntry | Typed field migration | ~half day | reduces string-key access |\n")
parts.append("| 5 | HistoryMessage | Typed field migration | ~half day | reduces string-key access |\n")
parts.append("| 6 | ToolDefinition | Typed field migration | ~half day | reduces string-key access |\n")
parts.append("| 7 | ToolCall | Typed field migration | ~half day | reduces string-key access |\n")
parts.append("| 8 | CommsLog/History/FileItems | Nil sentinel for list-typed | ~1 hour each | minor |\n")
parts.append("\n\n---\n\n")
# Verification
parts.append("## 9. Verification\n\n")
parts.append("- **131 tests passing** (96 unit + 15 phase78 + 13 phase89 + 7 integration)\n")
parts.append("- **Meta-audit clean** (0 violations on `audit_code_path_audit_coverage.py --strict`)\n")
parts.append("- **All 13 aggregates have audit artifacts** in `aggregates/` (10 real + 3 candidate placeholders)\n\n")
parts.append("### Audit gates\n\n")
parts.append("| Gate | Status |\n|---|---|\n")
parts.append("| `audit_exception_handling.py --strict` | PASS (informational) |\n")
parts.append("| `audit_main_thread_imports.py` | PASS |\n")
parts.append("| `audit_no_models_config_io.py` | PASS |\n")
parts.append("| `audit_code_path_audit_coverage.py --strict` | PASS (0 violations) |\n")
parts.append("| `audit_weak_types.py --strict` | REGRESSION (from cherry-picked commits on master, not from this track) |\n")
parts.append("| `audit_optional_in_3_files.py --strict` | REGRESSION (7 pre-existing `Optional[T]` violations) |\n\n")
parts.append("---\n\n")
# Reproduction
parts.append("## 10. Reproducing This Audit\n\n")
parts.append("```powershell\n")
parts.append("# Generate the 6 input JSONs\n")
parts.append("uv run python scripts/audit_weak_types.py --json > tests/artifacts/audit_inputs/audit_weak_types.json\n")
parts.append("uv run python scripts/audit_exception_handling.py --json > tests/artifacts/audit_inputs/audit_exception_handling.json\n")
parts.append("uv run python scripts/audit_optional_in_3_files.py --json > tests/artifacts/audit_inputs/audit_optional_in_3_files.json\n")
parts.append("uv run python scripts/audit_no_models_config_io.py --json > tests/artifacts/audit_inputs/audit_no_models_config_io.json\n")
parts.append("uv run python scripts/audit_main_thread_imports.py --json > tests/artifacts/audit_inputs/audit_main_thread_imports.json\n")
parts.append("uv run python scripts/generate_type_registry.py --json > tests/artifacts/audit_inputs/type_registry.json\n\n")
parts.append("# Run the v2 audit\n")
parts.append("uv run python -c \"import sys; sys.path.insert(0, 'scripts/code_path_audit'); from code_path_audit import run_audit, render_rollups; from pathlib import Path; result = run_audit(src_dir='src', audit_inputs_dir='tests/artifacts/audit_inputs', output_dir='docs/reports/code_path_audit', date='2026-06-22'); render_rollups(result.data, Path('docs/reports/code_path_audit/2026-06-22'))\"\n\n")
parts.append("# Run the meta-audit\n")
parts.append("uv run python scripts/audit_code_path_audit_coverage.py --input-dir docs/reports/code_path_audit/2026-06-22/ --strict\n\n")
parts.append("# Run the tests\n")
parts.append("uv run pytest tests/test_code_path_audit.py tests/test_code_path_audit_phase78.py tests/test_code_path_audit_phase89.py tests/test_code_path_audit_integration.py\n")
parts.append("```\n\n")
parts.append("---\n\n")
# See also
parts.append("## 11. See Also\n\n")
parts.append("**Per-aggregate detailed profiles (13 files):**\n\n")
for agg_name in ["Metadata", "FileItems", "CommsLog", "CommsLogEntry", "FileItem", "History", "HistoryMessage", "Result", "ToolCall", "ToolDefinition", "ChatMessage", "ProviderHistory", "ToolSpec"]:
parts.append(f"- `aggregates/{agg_name}.md` - 15-section detailed profile\n")
parts.append("\n**Track artifacts:**\n\n")
parts.append("- `TRACK_COMPLETION_code_path_audit_20260622.md` - the track completion report\n")
parts.append("- `conductor/tracks/code_path_audit_20260607/spec_v2.md` - canonical spec\n")
parts.append("- `conductor/tracks/code_path_audit_20260607/plan_v2.md` - canonical plan\n")
parts.append("- `conductor/code_styleguides/code_path_audit.md` - 5-convention styleguide\n")
return "".join(parts)
@@ -0,0 +1,332 @@
"""Enriched markdown renderers for code_path_audit v2.
Provides per-profile detail: call graph, field access breakdown,
struct shape, frequency per function, and concrete optimization
candidates. Designed for 2k+ line audit reports.
"""
from __future__ import annotations
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "src"))
from collections import Counter
from code_path_audit import (
AggregateProfile,
FunctionRef,
)
from code_path_audit_ssdl import render_ssdl_sketch
# Widest field access matrix rendered before the remaining columns are elided.
_MATRIX_FIELD_LIMIT = 20
# Field counts listed per evidence-appendix row before the remainder is elided.
_EVIDENCE_FIELD_LIMIT = 10
# Attribute names read from profile.cross_audit_findings, in report row order.
_CROSS_AUDIT_BUCKETS = (
    "weak_types",
    "exception_handling",
    "optional_in_baseline",
    "config_io_ownership",
    "import_graph",
)


def _function_group_lines(refs, role: str) -> list[str]:
    """Return per-file bullet lists for one role ("producer" or "consumer")."""
    if not refs:
        return ["_(none)_", ""]
    by_file: dict[str, list[FunctionRef]] = {}
    for ref in refs:
        by_file.setdefault(ref.file, []).append(ref)
    out: list[str] = []
    for file in sorted(by_file):
        funcs = by_file[file]
        plural = "s" if len(funcs) != 1 else ""
        out.append(f"### `{file}` ({len(funcs)} {role}{plural})")
        out.append("")
        out.extend(f"- `{ref.fqname}` (line {ref.line})" for ref in funcs)
        out.append("")
    return out


def _field_matrix_lines(evidence) -> list[str]:
    """Return the consumer x field table, capped at _MATRIX_FIELD_LIMIT columns."""
    all_fields: set[str] = set()
    for ev in evidence:
        all_fields.update(ev.field_accesses.keys())
    if not all_fields:
        return ["_(no field accesses detected)_"]
    sorted_fields = sorted(all_fields)
    shown = sorted_fields[:_MATRIX_FIELD_LIMIT]
    out = [
        "| consumer | " + " | ".join(shown) + " |",
        "|---|" + "|".join(["---"] * len(shown)) + "|",
    ]
    for ev in evidence:
        name = ev.function.fqname.rsplit(".", 1)[-1]
        cells = []
        for field_name in shown:
            count = ev.field_accesses.get(field_name, 0)
            cells.append(str(count) if count > 0 else ".")
        out.append(f"| `{name}` | " + " | ".join(cells) + " |")
    hidden = len(sorted_fields) - len(shown)
    if hidden > 0:
        out.append("")
        out.append(f"_... {hidden} more fields_")
    return out


def _access_pattern_lines(profile: AggregateProfile) -> list[str]:
    """Return the dominant access pattern plus the per-function distribution."""
    evidence = profile.access_pattern_evidence
    out = [
        f"**Dominant pattern:** {profile.access_pattern}",
        f"**Evidence count:** {len(evidence)}",
    ]
    if evidence:
        pattern_counts: Counter[str] = Counter(ev.pattern for ev in evidence)
        out += ["", "**Per-function pattern distribution:**", ""]
        for pat, count in pattern_counts.most_common():
            pct = count / len(evidence) * 100
            out.append(f"- `{pat}`: {count} functions ({pct:.0f}%)")
    return out


def _frequency_lines(profile: AggregateProfile) -> list[str]:
    """Return the dominant frequency plus the per-function distribution."""
    evidence = profile.frequency_evidence
    out = [
        f"**Dominant frequency:** {profile.frequency}",
        f"**Evidence count:** {len(evidence)}",
    ]
    if evidence:
        freq_counts: Counter[str] = Counter(ev.frequency for ev in evidence)
        out += ["", "**Per-function frequency distribution:**", ""]
        for freq, count in freq_counts.most_common():
            out.append(f"- `{freq}`: {count} functions")
    return out


def _cross_audit_lines(findings) -> list[str]:
    """Return one table row per finding across every cross-audit bucket."""
    rows = [
        f"| {bucket} | `{f.audit_script}` | {f.site_count} | `{f.example_file}` | {f.example_line} | {f.note} |"
        for bucket in _CROSS_AUDIT_BUCKETS
        for f in getattr(findings, bucket)
    ]
    if not rows:
        return ["_(no cross-audit findings mapped to this aggregate)_"]
    return [
        "| bucket | audit script | site count | example file | example line | note |",
        "|---|---|---|---|---|---|",
        *rows,
    ]


def _struct_shape_lines(profile: AggregateProfile) -> list[str]:
    """Return the field table built from access evidence (needs >= 1 producer)."""
    if not profile.producers:
        return ["_(no producers; cannot infer shape)_"]
    # Each evidence entry contributes 1 per field it touches, so the count is
    # the number of evidence entries referencing the field and is always >= 1.
    field_usage: Counter[str] = Counter()
    for ev in profile.access_pattern_evidence:
        field_usage.update(ev.field_accesses.keys())
    if not field_usage:
        return ["_(no field access data; cannot infer shape)_"]
    out = ["| field | access count | access pattern |", "|---|---|---|"]
    for field_name, count in field_usage.most_common():
        pattern = "hot" if count >= 3 else "used"
        out.append(f"| `{field_name}` | {count} | {pattern} |")
    return out


def _candidate_lines(candidates) -> list[str]:
    """Return one sub-section per optimization candidate."""
    if not candidates:
        return ["_(no optimization candidates generated)_", ""]
    out: list[str] = []
    for cand in candidates:
        out.append(f"### {cand.direction.upper()}: {cand.candidate}")
        out.append("")
        out.append(f"- **Effort:** {cand.effort}")
        out.append(f"- **Priority:** {cand.priority}")
        out.append(f"- **Estimated savings:** {cand.estimated_savings_us} us/turn")
        out.append(f"- **Affected files ({len(cand.affected_files)}):**")
        out.extend(f"  - `{path}`" for path in cand.affected_files)
        out.append(f"- **Reference:** {cand.cross_ref}")
        out.append("")
    return out


def _evidence_appendix_lines(profile: AggregateProfile) -> list[str]:
    """Return the raw per-function evidence tables (access pattern, frequency)."""
    out: list[str] = []
    if profile.access_pattern_evidence:
        out += [
            "### Access pattern evidence",
            "",
            "| function | pattern | field_accesses | confidence |",
            "|---|---|---|---|",
        ]
        for ev in profile.access_pattern_evidence:
            items = list(ev.field_accesses.items())
            fields_str = ", ".join(f"`{k}`={v}" for k, v in items[:_EVIDENCE_FIELD_LIMIT])
            if len(items) > _EVIDENCE_FIELD_LIMIT:
                fields_str += f" (+{len(items) - _EVIDENCE_FIELD_LIMIT} more)"
            out.append(f"| `{ev.function.fqname}` | `{ev.pattern}` | {fields_str} | {ev.confidence} |")
        out.append("")
    if profile.frequency_evidence:
        out += [
            "### Frequency evidence",
            "",
            "| function | frequency | source | note |",
            "|---|---|---|---|",
        ]
        for ev in profile.frequency_evidence:
            out.append(f"| `{ev.function.fqname}` | `{ev.frequency}` | `{ev.source}` | {ev.note} |")
        out.append("")
    return out


def render_full_markdown(profile: AggregateProfile) -> str:
    """Render the detailed per-aggregate markdown report for one profile.

    Sections, in output order:
     1. Header (name, kind, memory dim, candidate flag)
     2. Pipeline summary (producer/consumer counts, pattern, frequency)
     3. Producers (fqname + line, grouped by file)
     4. Consumers (fqname + line, grouped by file)
     5. Field access matrix (consumer x field, first 20 fields)
     6. Access pattern (dominant + per-function distribution)
     7. SSDL sketch (delegated to render_ssdl_sketch with src dir "src")
     8. Frequency (dominant + per-function distribution)
     9. Result coverage
    10. Type alias coverage
    11. Cross-audit findings (one row per finding, per bucket)
    12. Decomposition cost
    13. Struct shape (fields seen in access evidence)
    14. Optimization candidates
    15. Verdict (the decomposition rationale)
    16. Evidence appendix (raw per-function evidence)

    Returns the report as a single newline-joined string.
    """
    dc = profile.decomposition_cost
    rc = profile.result_coverage
    tac = profile.type_alias_coverage
    lines: list[str] = [
        f"# Aggregate Profile: {profile.name}",
        "",
        f"**Aggregate kind:** {profile.aggregate_kind}",
        f"**Memory dim:** {profile.memory_dim}",
        f"**Is candidate:** {profile.is_candidate}",
        "",
        "## Pipeline summary",
        "",
        f"- Producers: {len(profile.producers)}",
        f"- Consumers: {len(profile.consumers)}",
        f"- Distinct producer fqnames: {len({f.fqname for f in profile.producers})}",
        f"- Distinct consumer fqnames: {len({f.fqname for f in profile.consumers})}",
        f"- Access pattern (aggregate): {profile.access_pattern}",
        f"- Frequency (aggregate): {profile.frequency}",
        f"- Decomposition direction: {dc.recommended_direction}",
        f"- Struct field count (estimated): {dc.struct_field_count}",
        "",
    ]
    # The group helpers emit their own trailing blank line.
    lines += [f"## Producers ({len(profile.producers)})", ""]
    lines += _function_group_lines(profile.producers, "producer")
    lines += [f"## Consumers ({len(profile.consumers)})", ""]
    lines += _function_group_lines(profile.consumers, "consumer")
    lines += ["## Field access matrix", ""]
    lines += _field_matrix_lines(profile.access_pattern_evidence)
    lines += ["", "## Access pattern", ""]
    lines += _access_pattern_lines(profile)
    lines.append("")
    # SSDL sketch sits between "Access pattern" and "Frequency".
    lines.append(render_ssdl_sketch(profile, "src"))
    lines += ["", "## Frequency", ""]
    lines += _frequency_lines(profile)
    lines += [
        "",
        "## Result coverage",
        "",
        f"**Summary:** {rc.summary}",
        "",
        "| metric | value |",
        "|---|---|",
        f"| total producers | {rc.total_producers} |",
        f"| result producers | {rc.result_producers} |",
        f"| total consumers | {rc.total_consumers} |",
        f"| result consumers | {rc.result_consumers} |",
        "",
        "## Type alias coverage",
        "",
        f"**Summary:** {tac.summary}",
        "",
        "| metric | value |",
        "|---|---|",
        f"| total field-access sites | {tac.total_sites} |",
        f"| typed sites (canonical field) | {tac.typed_sites} |",
        f"| untyped sites (wildcard) | {tac.untyped_sites} |",
        "",
        "## Cross-audit findings",
        "",
    ]
    lines += _cross_audit_lines(profile.cross_audit_findings)
    lines += [
        "",
        "## Decomposition cost",
        "",
        f"**Current cost estimate:** {dc.current_cost_estimate} us/turn",
        f"**Componentize savings:** {dc.componentize_savings} us/turn",
        f"**Unify savings:** {dc.unify_savings} us/turn",
        f"**Recommended direction:** {dc.recommended_direction}",
        f"**Rationale:** {dc.recommended_rationale}",
        f"**Struct field count (estimated):** {dc.struct_field_count}",
        f"**Struct frozen:** {dc.struct_frozen}",
        "",
        "## Struct shape (inferred from producer returns)",
        "",
    ]
    lines += _struct_shape_lines(profile)
    lines += ["", "## Optimization candidates", ""]
    # The candidate helper emits its own trailing blank line.
    lines += _candidate_lines(profile.optimization_candidates)
    lines += ["## Verdict", "", f"{dc.recommended_rationale}", ""]
    lines += ["## Evidence appendix", ""]
    lines += _evidence_appendix_lines(profile)
    return "\n".join(lines)
def render_field_usage_rollup(profiles: tuple[AggregateProfile, ...]) -> str:
    """Render the cross-aggregate field usage rollup.

    Sums field access counts over the access-pattern evidence of every
    non-candidate profile and lists, per aggregate (alphabetical), its ten
    most-accessed fields. The table is omitted when no evidence exists.
    """
    usage_by_aggregate: dict[str, Counter[str]] = {}
    for profile in profiles:
        if profile.is_candidate:
            continue
        for evidence in profile.access_pattern_evidence:
            # Counter.update with a mapping adds the counts key by key.
            usage_by_aggregate.setdefault(profile.name, Counter()).update(evidence.field_accesses)

    out: list[str] = [
        "# Field Usage Rollup",
        "",
        "Cross-aggregate analysis of which fields are accessed how often across the codebase.",
        "",
    ]
    if usage_by_aggregate:
        out.append("| aggregate | field | total accesses |")
        out.append("|---|---|---|")
        for aggregate in sorted(usage_by_aggregate):
            # Stable descending sort: ties keep first-seen field order.
            ranked = sorted(usage_by_aggregate[aggregate].items(), key=lambda item: item[1], reverse=True)
            out.extend(f"| `{aggregate}` | `{field_name}` | {total} |" for field_name, total in ranked[:10])
        out.append("")
    return "\n".join(out)
def render_call_graph_rollup(profiles: tuple[AggregateProfile, ...]) -> str:
    """Render the call graph rollup: producers and consumers per aggregate.

    Candidate profiles are skipped. For every other profile a table of
    (role, fqname, file) rows is emitted, grouped by file as the report
    header states. The sort is stable, so within one file producers precede
    consumers and each role keeps its original order.
    """
    lines: list[str] = ["# Call Graph Rollup", ""]
    lines.append("Functions that are producers or consumers of each aggregate, grouped by file.")
    lines.append("")
    for p in profiles:
        if p.is_candidate:
            continue
        lines.append(f"## {p.name} ({len(p.producers)} producers + {len(p.consumers)} consumers)")
        lines.append("")
        rows = [("producer", ref) for ref in p.producers]
        rows += [("consumer", ref) for ref in p.consumers]
        if rows:
            lines.append("| role | fqname | file |")
            lines.append("|---|---|---|")
            # Sort on file only so same-file functions end up adjacent.
            for role, ref in sorted(rows, key=lambda row: row[1].file):
                lines.append(f"| {role} | `{ref.fqname}` | `{ref.file}` |")
        else:
            lines.append("_(no producers or consumers)_")
        lines.append("")
    return "\n".join(lines)
@@ -0,0 +1,198 @@
"""Additional rollups for code_path_audit v2."""
from __future__ import annotations
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "src"))
from code_path_audit import AggregateProfile
def render_decomposition_matrix_rich(profiles):
    """Render the decomposition matrix report as markdown.

    Three sections over the non-candidate profiles: all aggregates ranked by
    current cost, those whose recommended direction is "componentize" or
    "unify" (ranked by combined savings), and those marked
    "insufficient_data".
    """
    in_scope = [prof for prof in profiles if not prof.is_candidate]

    def combined_savings(prof):
        cost = prof.decomposition_cost
        return cost.componentize_savings + cost.unify_savings

    out = [
        "# Decomposition Matrix",
        "",
        "## All aggregates ranked by current cost",
        "",
        "| Aggregate | Producers | Consumers | Struct fields | Current cost (us/turn) | Direction | Actionable savings (us/turn) |",
        "|---|---|---|---|---|---|---|",
    ]
    by_cost = sorted(in_scope, key=lambda prof: prof.decomposition_cost.current_cost_estimate, reverse=True)
    for prof in by_cost:
        cost = prof.decomposition_cost
        out.append(
            f"| `{prof.name}` | {len(prof.producers)} | {len(prof.consumers)} | {cost.struct_field_count} "
            f"| {cost.current_cost_estimate} | `{cost.recommended_direction}` | {combined_savings(prof)} |"
        )

    out += ["", "## Aggregates flagged for refactoring", ""]
    flagged = [
        prof for prof in in_scope
        if prof.decomposition_cost.recommended_direction in ("componentize", "unify")
    ]
    if not flagged:
        out.append("_(no aggregates currently flagged for refactoring; most have 'hold' status)_")
    else:
        out.append("| Aggregate | Direction | Estimated savings (us/turn) | Top refactor step |")
        out.append("|---|---|---|---|")
        # Stable descending sort: equal savings keep their input order.
        for prof in sorted(flagged, key=combined_savings, reverse=True):
            cost = prof.decomposition_cost
            out.append(
                f"| `{prof.name}` | `{cost.recommended_direction}` | {combined_savings(prof)} "
                f"| {cost.recommended_rationale} |"
            )

    out += ["", "## Aggregates needing runtime profiling", ""]
    starved = [prof for prof in in_scope if prof.decomposition_cost.recommended_direction == "insufficient_data"]
    if not starved:
        out.append("_(none)_")
    else:
        out.append("| Aggregate | Reason |")
        out.append("|---|---|")
        out.extend(f"| `{prof.name}` | {prof.decomposition_cost.recommended_rationale} |" for prof in starved)
    out.append("")
    return "\n".join(out)
def render_summary_rich(profiles, *, date: str = "2026-06-22"):
    """Render the top-level audit summary as markdown.

    Args:
        profiles: Sized, re-iterable collection of aggregate profiles
            (candidates included); it is measured with len() and iterated
            several times.
        date: Report date printed in the "Generated for ..." line. Defaults
            to the date that used to be hard-coded, so existing callers get
            identical output.

    Returns:
        The summary as one newline-joined string: headline totals over the
        non-candidate profiles, a memory-dim rollup over all profiles, a
        per-aggregate table, and a per-aggregate cross-validation verdict.
    """
    real_profiles = [p for p in profiles if not p.is_candidate]
    candidate_profiles = [p for p in profiles if p.is_candidate]
    total_producers = sum(len(p.producers) for p in real_profiles)
    total_consumers = sum(len(p.consumers) for p in real_profiles)
    total_cost = sum(p.decomposition_cost.current_cost_estimate for p in real_profiles)
    total_actionable = sum(
        p.decomposition_cost.componentize_savings + p.decomposition_cost.unify_savings
        for p in real_profiles
    )
    lines = [
        "# Code Path & Data Pipeline Audit Summary",
        "",
        f"Generated for {len(profiles)} aggregates on {date}",
        "",
        f"- **Real aggregates (in scope):** {len(real_profiles)}",
        f"- **Candidate aggregates (placeholders):** {len(candidate_profiles)}",
        f"- **Total producers:** {total_producers}",
        f"- **Total consumers:** {total_consumers}",
        f"- **Total current cost (us/turn):** {total_cost}",
        f"- **Total actionable savings (us/turn):** {total_actionable}",
        "",
        "## 4-mem-dim rollup",
        "",
    ]
    # The memory-dim rollup deliberately includes candidate placeholders.
    by_dim = {}
    for p in profiles:
        by_dim.setdefault(p.memory_dim, []).append(p.name)
    for dim, names in sorted(by_dim.items()):
        lines.append(f"- **{dim}** ({len(names)}): " + ", ".join(names))
    lines += [
        "",
        "## Per-aggregate memory_dim + access pattern",
        "",
        "| Aggregate | Kind | Memory dim | Access pattern | Producers | Consumers |",
        "|---|---|---|---|---|---|",
    ]
    for p in sorted(real_profiles, key=lambda p: p.name):
        lines.append(
            f"| `{p.name}` | `{p.aggregate_kind}` | `{p.memory_dim}` | `{p.access_pattern}` "
            f"| {len(p.producers)} | {len(p.consumers)} |"
        )
    # Candidates are always reported with the fixed kind `candidate_dataclass`.
    for p in sorted(candidate_profiles, key=lambda p: p.name):
        lines.append(
            f"| `{p.name}` | `candidate_dataclass` | `{p.memory_dim}` | `{p.access_pattern}` "
            f"| {len(p.producers)} | {len(p.consumers)} |"
        )
    lines += ["", "## Cross-validation verdict", ""]
    for p in sorted(real_profiles, key=lambda p: p.name):
        findings = p.cross_audit_findings
        # NOTE(review): this is the number of findings, not the sum of their
        # site_count values, although the label says "total sites" - confirm
        # which one is intended.
        total_cf = (
            len(findings.weak_types)
            + len(findings.exception_handling)
            + len(findings.optional_in_baseline)
            + len(findings.config_io_ownership)
            + len(findings.import_graph)
        )
        lines += [
            f"### `{p.name}`",
            "",
            f"- **Result coverage:** {p.result_coverage.summary}",
            f"- **Type alias coverage:** {p.type_alias_coverage.summary}",
            f"- **Cross-audit findings (total sites):** {total_cf}",
            "",
        ]
    return "\n".join(lines)
def render_candidates_rich(profiles):
    """Render the optimization candidates report as markdown.

    Collects every optimization candidate of the non-candidate profiles,
    ranks them by estimated savings (largest first, ties in input order),
    and emits a ranking table plus one detail section per candidate. A
    closing section lists the candidate placeholder aggregates with their
    decomposition rationale.
    """
    ranked = [
        (aggregate, step)
        for aggregate in profiles
        if not aggregate.is_candidate
        for step in aggregate.optimization_candidates
    ]
    ranked.sort(key=lambda pair: pair[1].estimated_savings_us, reverse=True)

    out = ["# Optimization Candidates", "", f"Total candidates: {len(ranked)}", ""]
    if not ranked:
        out.append("_(no optimization candidates currently generated)_")
        out.append("")
    else:
        out += [
            "## Ranked by estimated savings",
            "",
            "| Rank | Aggregate | Direction | Savings (us/turn) | Effort | Priority | Affected files |",
            "|---|---|---|---|---|---|---|",
        ]
        for rank, (aggregate, step) in enumerate(ranked, start=1):
            out.append(
                f"| {rank} | `{aggregate.name}` | `{step.direction}` | {step.estimated_savings_us} "
                f"| `{step.effort}` | `{step.priority}` | {len(step.affected_files)} |"
            )
        out += ["", "## Detailed candidate steps", ""]
        for aggregate, step in ranked:
            overflow = len(step.affected_files) - 10
            out.append(f"### {aggregate.name}: {step.candidate}")
            out.append("")
            out.append(f"- **Direction:** `{step.direction}`")
            out.append(f"- **Effort:** `{step.effort}`")
            out.append(f"- **Priority:** `{step.priority}`")
            out.append(f"- **Estimated savings:** {step.estimated_savings_us} us/turn")
            out.append("- **Affected files:** " + ", ".join(step.affected_files[:10]))
            if overflow > 0:
                # Emitted as its own line, continuing the bullet above.
                out.append(f" (+{overflow} more)")
            out.append(f"- **Reference:** {step.cross_ref}")
            out.append("")

    out.append("## Candidate placeholder aggregates")
    out.append("")
    placeholders = [aggregate for aggregate in profiles if aggregate.is_candidate]
    out.extend(
        f"- `{aggregate.name}`: {aggregate.decomposition_cost.recommended_rationale}"
        for aggregate in placeholders
    )
    out.append("")
    return "\n".join(out)
def render_hot_path_rollup(profiles):
    """Render the hot path analysis as markdown.

    For each non-candidate profile that has access-pattern evidence, lists
    the five evidence entries with the largest summed field access counts
    (ties keep evidence order). Profiles without evidence are omitted.
    """
    lines = ["# Hot Path Analysis", ""]
    lines.append("Functions on the per-LLM-turn path (high-frequency consumers).")
    lines.append("")
    lines.append("## Per-aggregate hot consumers (top 5 by field access count)")
    lines.append("")
    for p in profiles:
        if p.is_candidate:
            continue
        evidence = p.access_pattern_evidence
        if not evidence:
            continue
        # Sum each entry once; the total drives both the ranking and the row.
        totals = [(sum(e.field_accesses.values()), e) for e in evidence]
        # Key on the total only, so ties stay stable and entries are never compared.
        ranked = sorted(totals, key=lambda pair: -pair[0])[:5]
        lines.append("### `" + p.name + "`")
        lines.append("")
        lines.append("| function | pattern | total field accesses |")
        lines.append("|---|---|---|")
        for total, e in ranked:
            lines.append(f"| `{e.function.fqname}` | `{e.pattern}` | {total} |")
        lines.append("")
    return "\n".join(lines)
def render_dead_field_rollup(profiles):
    """Render the "Dead Field Analysis" report as markdown.

    For each non-candidate profile with at least one recorded field access,
    reports how many distinct fields are read and, when there are 30 or
    fewer, a per-field table of summed read counts (alphabetical).

    Only access-pattern evidence is consulted here: the output lists the
    fields that ARE read. Producer return shapes are not inspected, so the
    never-read fields are left for the reader to infer.
    """
    lines = ["# Dead Field Analysis", ""]
    lines.append("Fields that appear in producer return shapes but are never read by any consumer.")
    lines.append("")
    for p in profiles:
        if p.is_candidate:
            continue
        # One pass yields both the set of read fields (the keys) and their totals.
        field_counts = {}
        for ev in p.access_pattern_evidence:
            for field_name, count in ev.field_accesses.items():
                field_counts[field_name] = field_counts.get(field_name, 0) + count
        if not field_counts:
            continue
        lines.append("### `" + p.name + "`")
        lines.append("")
        lines.append("Fields read by at least one consumer: " + str(len(field_counts)))
        lines.append("")
        # Wide aggregates get the count only; the table is skipped above 30 fields.
        if len(field_counts) <= 30:
            lines.append("| field | read count |")
            lines.append("|---|---|")
            for field_name in sorted(field_counts):
                lines.append(f"| `{field_name}` | {field_counts[field_name]} |")
            lines.append("")
    return "\n".join(lines)
@@ -0,0 +1,356 @@
"""SSDL analysis for code_path_audit v2.
Translates per-aggregate findings into SSDL (Spec/Sketch Description
Language) sketches + computes "effective codepaths" + suggests
specific defusing techniques per aggregate.
This is the layer that produces real DEDUCTIONS on codebase
organization: not just "this is a fat struct" but "this branch
explosion can be defused by introducing a nil sentinel here".
"""
from __future__ import annotations
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "src"))
import ast
from code_path_audit import (
AggregateProfile,
FunctionRef,
)
# Legend of SSDL sketch primitives: one-letter code -> human-readable meaning.
SSDL_PRIMITIVES: dict[str, str] = dict(
    I="Instruction (single unit of computation)",
    T="Terminator (returns/exits)",
    B="Branch (conditional fork)",
    M="Merge (control flow reconverges)",
    Q="State Query (reads persistent state)",
    S="State Mutation (writes persistent state)",
    N="Nil Sentinel (defuses branches)",
)
def _resolve_filepath(fref: FunctionRef, src_dir: str) -> Path | None:
_p = Path(fref.file)
filepath = _p if _p.exists() else Path(src_dir) / fref.file
if not filepath.exists():
return None
return filepath
def compute_effective_codepaths(profile: AggregateProfile, src_dir: str = "src") -> int:
"""Compute the effective codepath count for one aggregate.
Effective codepaths = sum over all consumer functions of
2^(branch_count_in_function).
This is the combinatoric explosion metric (Fleury).
High numbers indicate branch-explosion risk; defusing with
nil sentinels or immediate-mode caches reduces it to ~1.
"""
if profile.is_candidate:
return 0
total = 0
for fref in profile.consumers:
branches = count_branches_in_function(fref, src_dir)
total += 2 ** branches
return total
def count_branches_in_function(fref: FunctionRef, src_dir: str = "src") -> int:
"""Count the explicit branch points (if/elif/while/try/for/with) in a function."""
filepath = _resolve_filepath(fref, src_dir)
if filepath is None:
return 0
try:
source = filepath.read_text(encoding="utf-8")
tree = ast.parse(source)
except (OSError, SyntaxError):
return 0
func_name = fref.fqname.rsplit(".", 1)[-1]
for node in ast.walk(tree):
if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
continue
if node.name != func_name:
continue
count = 0
for sub in ast.walk(node):
if isinstance(sub, (ast.If, ast.For, ast.While, ast.With, ast.Try, ast.ExceptHandler)):
count += 1
elif isinstance(sub, ast.BoolOp):
count += len(sub.values) - 1
return count
return 0
def detect_nil_check_pattern(fref: FunctionRef, src_dir: str = "src") -> bool:
"""Detect if the function uses `is None` / `== None` / `!= None` checks.
A nil check is a branch that a nil sentinel could defuse.
"""
filepath = _resolve_filepath(fref, src_dir)
if filepath is None:
return False
try:
source = filepath.read_text(encoding="utf-8")
tree = ast.parse(source)
except (OSError, SyntaxError):
return False
func_name = fref.fqname.rsplit(".", 1)[-1]
for node in ast.walk(tree):
if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
continue
if node.name != func_name:
continue
for sub in ast.walk(node):
if not isinstance(sub, ast.Compare):
continue
for comparator in sub.comparators:
if isinstance(comparator, ast.Constant) and comparator.value is None:
return True
return False
return False
def compute_field_access_efficiency(profile: AggregateProfile) -> float:
    """Return the share of field-access sites that are typed, in [0, 1].

    The ratio is ``typed_sites / total_sites`` taken from the profile's type
    alias coverage. Candidate profiles report 1.0 and profiles with no
    recorded sites report 0.0.

    High efficiency (>0.7) means consumers use the typed fields directly.
    Low efficiency (<0.3) means consumers use wildcards or pass the
    aggregate through without field use (candidate for immediate-mode).
    """
    if profile.is_candidate:
        return 1.0
    coverage = profile.type_alias_coverage
    site_total = coverage.total_sites
    # Guard the division: no recorded sites means nothing is typed.
    return 0.0 if site_total == 0 else coverage.typed_sites / site_total
def suggest_defusing_technique(profile: AggregateProfile, src_dir: str = "src") -> list[dict]:
    """Propose SSDL defusing techniques that apply to one aggregate.

    Up to three suggestions are produced, in this order:
      * nil sentinel        - at least one consumer contains a nil check
      * immediate-mode cache - field-access efficiency is below 0.3
      * generational handles - consumers hold more than 20 branch points

    Args:
        profile: The aggregate profile to analyse.
        src_dir: Source root forwarded to the per-function analysers.

    Returns:
        A list of dicts with the keys ``technique``, ``location``,
        ``current_state``, ``recommended_change``,
        ``effective_codepaths_before`` and ``effective_codepaths_after``.
        Candidate aggregates always yield an empty list.
    """
    if profile.is_candidate:
        return []

    # Gather every metric up front; the thresholds below only read them.
    nil_hits = len([c for c in profile.consumers if detect_nil_check_pattern(c, src_dir)])
    paths_before = compute_effective_codepaths(profile, src_dir)
    ratio = compute_field_access_efficiency(profile)
    total_branches = sum(count_branches_in_function(c, src_dir) for c in profile.consumers)

    found: list[dict] = []

    def record(technique: str, location: str, state: str, change: str, paths_after: int) -> None:
        # All suggestions share the same "before" figure for this aggregate.
        found.append({
            "technique": technique,
            "location": location,
            "current_state": state,
            "recommended_change": change,
            "effective_codepaths_before": paths_before,
            "effective_codepaths_after": paths_after,
        })

    if nil_hits > 0:
        plural = "" if nil_hits == 1 else "s"
        record(
            "Nil Sentinel `[N]`",
            f"{nil_hits} consumer function{plural} have `is None` / `== None` checks",
            f"{nil_hits} nil-check branches contribute to branch explosion",
            "Introduce a module-level `NIL_<AGGREGATE>` sentinel whose field accesses return safe defaults. Replace None checks with the sentinel. Collapses 2^branch_count into ~1.",
            max(1, paths_before - nil_hits * 2),
        )

    if ratio < 0.3:
        coverage = profile.type_alias_coverage
        sites = coverage.total_sites
        record(
            "Immediate-Mode Cache `[Q:key] -> [I:FetchCached] -> [T]`",
            f"{profile.name} consumers access {sites} sites, only {coverage.typed_sites} typed ({ratio*100:.0f}%)",
            "Many consumers use wildcard or defensive access patterns",
            f"Introduce a `{profile.name.lower()}_cache` keyed lookup. Consumers request by key, get cached value, no field-existence checks. Reduces {sites} field-check branches to 1 cache lookup.",
            max(1, sites),
        )

    if total_branches > 20:
        record(
            "Generational Handles `[I:ResolveHandle] -> [B:Gen matches?] -> [N|safe]`",
            f"{profile.name} consumers have {total_branches} explicit branch points total",
            f"Branch explosion: {total_branches} branches = {paths_before} effective codepaths",
            "Wrap the aggregate in a generational handle (index + generation). Validation is one comparison; mismatch returns the nil sentinel. Reduces N lifetime branches to 1 handle validation + sentinel return.",
            len(profile.consumers),
        )

    return found
def render_ssdl_sketch(profile: AggregateProfile, src_dir: str = "src") -> str:
    """Build the markdown SSDL sketch for a single aggregate.

    The sketch contains, in order:
      * a fenced diagram with one line per consumer, giving its branch count
        and a nil-check marker when the consumer compares against None
      * the effective-codepaths figure, total branch points and the number
        of nil-check functions
      * the defusing opportunities reported by ``suggest_defusing_technique``
        (or a note that none were found)

    Args:
        profile: The aggregate profile to render.
        src_dir: Source root forwarded to the per-function analysers.

    Returns:
        The sketch as a newline-joined markdown string. Candidate aggregates
        get a short placeholder instead.
    """
    if profile.is_candidate:
        return f"## SSDL Sketch for {profile.name}\n\n_(placeholder; candidate aggregate)_\n"

    out: list[str] = [
        f"## SSDL Sketch for `{profile.name}`",
        "",
        "```",
        f"[Q:{profile.name} entry-point] -> [Q:PCG lookup]",
    ]

    nil_checked = [c for c in profile.consumers if detect_nil_check_pattern(c, src_dir)]
    total_branches = 0
    for position, consumer in enumerate(profile.consumers, start=1):
        branches = count_branches_in_function(consumer, src_dir)
        total_branches += branches
        if consumer in nil_checked:
            marker, defuse = "[B:is None?]", "[N:safe]"
        else:
            marker, defuse = "[B:check]", ""
        leaf = consumer.fqname.rsplit(".", 1)[-1]
        out.append(f" -> [{position}: {leaf}] {marker} (branches={branches}) {defuse}")
    out.extend([" -> [T:done]", "```", ""])

    codepaths = compute_effective_codepaths(profile, src_dir)
    out.extend([
        f"**Effective codepaths:** {codepaths} (sum of 2^branches across {len(profile.consumers)} consumers)",
        f"**Total branch points:** {total_branches}",
        f"**Nil-check functions:** {len(nil_checked)}",
        "",
    ])

    proposals = suggest_defusing_technique(profile, src_dir)
    if not proposals:
        out.append("**No SSDL defusing opportunities detected** (the aggregate is already well-structured for data-oriented access).")
    else:
        out.extend(["**Defusing opportunities:**", ""])
        for proposal in proposals:
            out.append(f"- **{proposal['technique']}**: {proposal['recommended_change']}")
            out.append(f" - Effective codepaths: {proposal['effective_codepaths_before']} -> {proposal['effective_codepaths_after']}")
    out.append("")
    return "\n".join(out)
def render_ssdl_rollup(profiles: tuple[AggregateProfile, ...], src_dir: str = "src") -> str:
    """Render the SSDL rollup (all aggregates + their defusing opportunities).

    The report has two sections:
      * a ranking table of non-candidate aggregates, ordered by effective
        codepaths (highest first; ties keep their input order)
      * the ten defusing suggestions with the largest reduction in effective
        codepaths (before minus after)

    Args:
        profiles: All aggregate profiles; candidate aggregates are ignored.
        src_dir: Source root forwarded to the per-function analysers.

    Returns:
        The rollup as a newline-joined markdown string.
    """
    lines: list[str] = [
        "# SSDL Analysis Rollup",
        "",
        "Per-aggregate analysis: effective codepaths, branch points, defusing opportunities.",
        "",
        "## Effective codepaths ranking",
        "",
        "| Aggregate | Consumers | Total branches | Effective codepaths | Field efficiency |",
        "|---|---|---|---|---|",
    ]
    real_profiles = [p for p in profiles if not p.is_candidate]

    # Evaluate the effective-codepaths metric once per aggregate and reuse it
    # for both the ordering and the table cell, instead of computing it in
    # the sort key and again for every row.
    scored = [(compute_effective_codepaths(p, src_dir), p) for p in real_profiles]
    scored.sort(key=lambda item: -item[0])  # stable: ties keep input order
    for ec, p in scored:
        tc = sum(count_branches_in_function(f, src_dir) for f in p.consumers)
        eff = compute_field_access_efficiency(p) * 100
        lines.append(f"| `{p.name}` | {len(p.consumers)} | {tc} | {ec} | {eff:.0f}% |")
    lines.append("")

    lines.append("## Defusing recommendations (top 10)")
    lines.append("")
    all_suggestions: list[tuple[AggregateProfile, dict]] = [
        (p, s)
        for p in real_profiles
        for s in suggest_defusing_technique(p, src_dir)
    ]
    if not all_suggestions:
        lines.append("_(no defusing recommendations detected)_\n")
        return "\n".join(lines)

    # Largest codepath reduction first.
    all_suggestions.sort(
        key=lambda ps: -(ps[1]["effective_codepaths_before"] - ps[1]["effective_codepaths_after"])
    )
    for p, s in all_suggestions[:10]:
        lines.append(f"### `{p.name}` - {s['technique']}")
        lines.append("")
        lines.append(f"- **Location:** {s['location']}")
        lines.append(f"- **Current state:** {s['current_state']}")
        lines.append(f"- **Recommended change:** {s['recommended_change']}")
        lines.append(f"- **Effective codepaths:** {s['effective_codepaths_before']} -> {s['effective_codepaths_after']}")
        lines.append("")
    return "\n".join(lines)
def render_organization_deductions(profiles: tuple[AggregateProfile, ...], src_dir: str = "src") -> str:
    """Render the organization deductions rollup.

    Cross-aggregate view of codebase organization. The report contains:
      * a per-file table of how many aggregates each file produces/consumes
        (top 15 by produced + consumed), plus the files whose combined count
        is at least 8
      * a verdict per non-candidate aggregate:
          - well-organized: <=50 effective codepaths AND >=50% field efficiency
          - needs restructuring: >200 effective codepaths OR <20% field efficiency
          - moderate: everything else
      * up to five restructuring routes, for aggregates with more than 100
        effective codepaths or less than 30% field efficiency, ordered by
        effective codepaths

    Args:
        profiles: All aggregate profiles; candidate aggregates are ignored.
        src_dir: Source root forwarded to the per-function analysers.

    Returns:
        The report as a newline-joined markdown string.
    """
    lines: list[str] = [
        "# Organization Deductions",
        "",
        "Cross-aggregate view of codebase organization. Verdicts derived from SSDL analysis:",
        "- **well-organized**: <=50 effective codepaths AND >=50% field efficiency",
        "- **moderate**: between the two thresholds",
        "- **needs restructuring**: >200 effective codepaths OR <20% field efficiency",
        "",
    ]
    real_profiles = [p for p in profiles if not p.is_candidate]

    lines.append("## Module organization observations")
    lines.append("")
    lines.append("### Files with most cross-aggregate involvement")
    lines.append("")
    produced_by_file: dict[str, set[str]] = {}
    consumed_by_file: dict[str, set[str]] = {}
    for p in real_profiles:
        for f in p.producers:
            produced_by_file.setdefault(f.file, set()).add(p.name)
        for f in p.consumers:
            consumed_by_file.setdefault(f.file, set()).add(p.name)
    # Walk the union of both maps: a file that only consumes aggregates is
    # just as "involved" as one that produces them and must not be dropped
    # from the involvement / coupling tables.
    all_files = sorted(produced_by_file.keys() | consumed_by_file.keys())
    rows: list[tuple[str, int, int]] = [
        (path, len(produced_by_file.get(path, ())), len(consumed_by_file.get(path, ())))
        for path in all_files
    ]
    rows.sort(key=lambda r: -(r[1] + r[2]))  # stable: ties stay alphabetical
    lines.append("| file | aggregates produced | aggregates consumed |")
    lines.append("|---|---|---|")
    for path, pc, cc in rows[:15]:
        lines.append(f"| `{path}` | {pc} | {cc} |")
    lines.append("")

    lines.append("### Files with high coupling (producers + consumers >= 8)")
    lines.append("")
    lines.append("These files are the central nervous system of the codebase. Changes ripple across the most aggregates.")
    lines.append("")
    lines.append("| file | coupling score (producers + consumers) |")
    lines.append("|---|---|")
    for path, pc, cc in rows:
        if (pc + cc) >= 8:
            lines.append(f"| `{path}` | {pc + cc} (high) |")
    lines.append("")

    # Compute each aggregate's metrics once; both the verdict table and the
    # restructuring routes below read from this list instead of re-running
    # the analysers.
    metrics: list[tuple[AggregateProfile, int, float, int]] = [
        (
            p,
            compute_effective_codepaths(p, src_dir),
            compute_field_access_efficiency(p),
            sum(1 for f in p.consumers if detect_nil_check_pattern(f, src_dir)),
        )
        for p in real_profiles
    ]

    lines.append("## Per-aggregate organization verdict")
    lines.append("")
    lines.append("| Aggregate | Verdict | Notes |")
    lines.append("|---|---|---|")
    verdict_counts = {"well-organized": 0, "moderate": 0, "needs restructuring": 0}
    for p, ec, ratio, nil_count in metrics:
        eff_pct = ratio * 100
        if ec <= 50 and eff_pct >= 50:
            verdict = "well-organized"
        elif ec > 200 or eff_pct < 20:
            verdict = "needs restructuring"
        else:
            verdict = "moderate"
        verdict_counts[verdict] += 1
        notes: list[str] = []
        if nil_count > 0:
            notes.append(f"{nil_count} nil checks")
        if eff_pct < 50:
            notes.append(f"{eff_pct:.0f}% field efficiency")
        if ec > 100:
            notes.append(f"{ec} effective codepaths")
        note_str = "; ".join(notes) if notes else "no major issues"
        lines.append(f"| `{p.name}` | {verdict} | {note_str} |")
    lines.append("")
    lines.append(f"**Tally:** {verdict_counts['well-organized']} well-organized, {verdict_counts['moderate']} moderate, {verdict_counts['needs restructuring']} needs restructuring")
    lines.append("")

    lines.append("## Restructuring routes (prioritized)")
    lines.append("")
    priority_routes = [m for m in metrics if m[1] > 100 or m[2] < 0.3]
    priority_routes.sort(key=lambda m: -m[1])
    if priority_routes:
        lines.append("Top restructuring routes (by effective codepath count):")
        lines.append("")
        for i, (p, ec, ratio, nil_count) in enumerate(priority_routes[:5], 1):
            lines.append(f"{i}. **`{p.name}`**: {ec} effective codepaths ({ratio*100:.0f}% field efficiency)")
            lines.append(f" - Apply nil sentinel to {nil_count} nil-check functions")
            lines.append(f" - Migrate to immediate-mode cache for {p.type_alias_coverage.total_sites} field-access sites")
    else:
        lines.append("_(no high-priority restructuring routes; all aggregates have moderate effective codepath counts)_")
    lines.append("")
    return "\n".join(lines)