Private
Public Access
0
0
Files
manual_slop/src/code_path_audit_rollups.py
T
ed 558258cffd feat(audit): rich rollups + per-line indentation fix - 2136 total lines
Added 3 new top-level rollups (hot_paths.md, dead_fields.md,
plus enriched summary.md, candidates.md, decomposition_matrix.md):
- summary.md: per-aggregate memory_dim + access pattern tables,
  full cross-validation verdict per aggregate
- decomposition_matrix.md: all 10 aggregates ranked by current cost,
  flagged-for-refactoring section, insufficient_data section
- candidates.md: ranked optimization candidates with detail per step
- hot_paths.md: top 5 hot consumers per aggregate (by field access count)
- dead_fields.md: fields accessed (per-consumer breakdown)

Total report: 2136 lines (was 1814).
2026-06-22 10:29:01 -04:00

195 lines
9.4 KiB
Python

"""Additional rollups for code_path_audit v2."""
from __future__ import annotations
from src.code_path_audit import AggregateProfile
def render_decomposition_matrix_rich(profiles):
lines = ["# Decomposition Matrix", ""]
lines.append("## All aggregates ranked by current cost")
lines.append("")
lines.append("| Aggregate | Producers | Consumers | Struct fields | Current cost (us/turn) | Direction | Actionable savings (us/turn) |")
lines.append("|---|---|---|---|---|---|---|")
real_profiles = [p for p in profiles if not p.is_candidate]
sorted_profiles = sorted(real_profiles, key=lambda p: p.decomposition_cost.current_cost_estimate, reverse=True)
for p in sorted_profiles:
dc = p.decomposition_cost
actionable = dc.componentize_savings + dc.unify_savings
lines.append(f"| `{p.name}` | {len(p.producers)} | {len(p.consumers)} | {dc.struct_field_count} | {dc.current_cost_estimate} | `{dc.recommended_direction}` | {actionable} |")
lines.append("")
lines.append("## Aggregates flagged for refactoring")
lines.append("")
flaggable = [p for p in real_profiles if p.decomposition_cost.recommended_direction in ("componentize", "unify")]
if flaggable:
lines.append("| Aggregate | Direction | Estimated savings (us/turn) | Top refactor step |")
lines.append("|---|---|---|---|")
for p in sorted(flaggable, key=lambda p: -(p.decomposition_cost.componentize_savings + p.decomposition_cost.unify_savings)):
dc = p.decomposition_cost
savings = dc.componentize_savings + dc.unify_savings
step = p.decomposition_cost.recommended_rationale
lines.append(f"| `{p.name}` | `{dc.recommended_direction}` | {savings} | {step} |")
else:
lines.append("_(no aggregates currently flagged for refactoring; most have 'hold' status)_")
lines.append("")
lines.append("## Aggregates needing runtime profiling")
lines.append("")
insufficient = [p for p in real_profiles if p.decomposition_cost.recommended_direction == "insufficient_data"]
if insufficient:
lines.append("| Aggregate | Reason |")
lines.append("|---|---|")
for p in insufficient:
lines.append(f"| `{p.name}` | {p.decomposition_cost.recommended_rationale} |")
else:
lines.append("_(none)_")
lines.append("")
return "\n".join(lines)
def render_summary_rich(profiles):
lines = ["# Code Path & Data Pipeline Audit Summary", ""]
lines.append("Generated for " + str(len(profiles)) + " aggregates on 2026-06-22")
lines.append("")
real_profiles = [p for p in profiles if not p.is_candidate]
candidate_profiles = [p for p in profiles if p.is_candidate]
lines.append("- **Real aggregates (in scope):** " + str(len(real_profiles)))
lines.append("- **Candidate aggregates (placeholders):** " + str(len(candidate_profiles)))
total_producers = sum(len(p.producers) for p in real_profiles)
total_consumers = sum(len(p.consumers) for p in real_profiles)
total_cost = sum(p.decomposition_cost.current_cost_estimate for p in real_profiles)
total_actionable = sum(p.decomposition_cost.componentize_savings + p.decomposition_cost.unify_savings for p in real_profiles)
lines.append("- **Total producers:** " + str(total_producers))
lines.append("- **Total consumers:** " + str(total_consumers))
lines.append("- **Total current cost (us/turn):** " + str(total_cost))
lines.append("- **Total actionable savings (us/turn):** " + str(total_actionable))
lines.append("")
lines.append("## 4-mem-dim rollup")
lines.append("")
by_dim = {}
for p in profiles:
by_dim.setdefault(p.memory_dim, []).append(p.name)
for dim, names in sorted(by_dim.items()):
lines.append("- **" + dim + "** (" + str(len(names)) + "): " + ", ".join(names))
lines.append("")
lines.append("## Per-aggregate memory_dim + access pattern")
lines.append("")
lines.append("| Aggregate | Kind | Memory dim | Access pattern | Producers | Consumers |")
lines.append("|---|---|---|---|---|---|")
for p in sorted(real_profiles, key=lambda p: p.name):
lines.append(f"| `{p.name}` | `{p.aggregate_kind}` | `{p.memory_dim}` | `{p.access_pattern}` | {len(p.producers)} | {len(p.consumers)} |")
for p in sorted(candidate_profiles, key=lambda p: p.name):
lines.append(f"| `{p.name}` | `candidate_dataclass` | `{p.memory_dim}` | `{p.access_pattern}` | {len(p.producers)} | {len(p.consumers)} |")
lines.append("")
lines.append("## Cross-validation verdict")
lines.append("")
for p in sorted(real_profiles, key=lambda p: p.name):
rc = p.result_coverage
tac = p.type_alias_coverage
total_cf = (
len(p.cross_audit_findings.weak_types)
+ len(p.cross_audit_findings.exception_handling)
+ len(p.cross_audit_findings.optional_in_baseline)
+ len(p.cross_audit_findings.config_io_ownership)
+ len(p.cross_audit_findings.import_graph)
)
lines.append("### `" + p.name + "`")
lines.append("")
lines.append("- **Result coverage:** " + rc.summary)
lines.append("- **Type alias coverage:** " + tac.summary)
lines.append("- **Cross-audit findings (total sites):** " + str(total_cf))
lines.append("")
return "\n".join(lines)
def render_candidates_rich(profiles):
lines = ["# Optimization Candidates", ""]
real_profiles = [p for p in profiles if not p.is_candidate]
all_candidates = []
for p in real_profiles:
for c in p.optimization_candidates:
all_candidates.append((p, c))
all_candidates.sort(key=lambda pc: -pc[1].estimated_savings_us)
lines.append("Total candidates: " + str(len(all_candidates)))
lines.append("")
if all_candidates:
lines.append("## Ranked by estimated savings")
lines.append("")
lines.append("| Rank | Aggregate | Direction | Savings (us/turn) | Effort | Priority | Affected files |")
lines.append("|---|---|---|---|---|---|---|")
for i, (p, c) in enumerate(all_candidates, 1):
lines.append(f"| {i} | `{p.name}` | `{c.direction}` | {c.estimated_savings_us} | `{c.effort}` | `{c.priority}` | {len(c.affected_files)} |")
lines.append("")
lines.append("## Detailed candidate steps")
lines.append("")
for p, c in all_candidates:
lines.append("### " + p.name + ": " + c.candidate)
lines.append("")
lines.append("- **Direction:** `" + c.direction + "`")
lines.append("- **Effort:** `" + c.effort + "`")
lines.append("- **Priority:** `" + c.priority + "`")
lines.append("- **Estimated savings:** " + str(c.estimated_savings_us) + " us/turn")
lines.append("- **Affected files:** " + ", ".join(c.affected_files[:10]))
if len(c.affected_files) > 10:
lines.append(" (+" + str(len(c.affected_files) - 10) + " more)")
lines.append("- **Reference:** " + c.cross_ref)
lines.append("")
else:
lines.append("_(no optimization candidates currently generated)_")
lines.append("")
lines.append("## Candidate placeholder aggregates")
lines.append("")
for p in [x for x in profiles if x.is_candidate]:
lines.append("- `" + p.name + "`: " + p.decomposition_cost.recommended_rationale)
lines.append("")
return "\n".join(lines)
def render_hot_path_rollup(profiles):
lines = ["# Hot Path Analysis", ""]
lines.append("Functions on the per-LLM-turn path (high-frequency consumers).")
lines.append("")
real_profiles = [p for p in profiles if not p.is_candidate]
lines.append("## Per-aggregate hot consumers (top 5 by field access count)")
lines.append("")
for p in real_profiles:
ev = p.access_pattern_evidence
if not ev:
continue
ranked = sorted(ev, key=lambda e: -sum(e.field_accesses.values()))[:5]
if not ranked:
continue
lines.append("### `" + p.name + "`")
lines.append("")
lines.append("| function | pattern | total field accesses |")
lines.append("|---|---|---|")
for e in ranked:
total = sum(e.field_accesses.values())
lines.append(f"| `{e.function.fqname}` | `{e.pattern}` | {total} |")
lines.append("")
return "\n".join(lines)
def render_dead_field_rollup(profiles):
lines = ["# Dead Field Analysis", ""]
lines.append("Fields that appear in producer return shapes but are never read by any consumer.")
lines.append("")
real_profiles = [p for p in profiles if not p.is_candidate]
for p in real_profiles:
read_fields = set()
for ev in p.access_pattern_evidence:
read_fields.update(ev.field_accesses.keys())
if not read_fields:
continue
lines.append("### `" + p.name + "`")
lines.append("")
lines.append("Fields read by at least one consumer: " + str(len(read_fields)))
lines.append("")
field_counts = {}
for ev in p.access_pattern_evidence:
for k, v in ev.field_accesses.items():
field_counts[k] = field_counts.get(k, 0) + v
if len(field_counts) <= 30:
lines.append("| field | read count |")
lines.append("|---|---|")
for f in sorted(field_counts.keys()):
lines.append(f"| `{f}` | {field_counts[f]} |")
lines.append("")
return "\n".join(lines)