"""SSDL analysis for code_path_audit v2.

Translates per-aggregate findings into SSDL (Spec/Sketch Description
Language) sketches, computes "effective codepaths", and suggests specific
defusing techniques per aggregate.

This is the layer that produces real DEDUCTIONS on codebase organization:
not just "this is a fat struct" but "this branch explosion can be defused
by introducing a nil sentinel here".
"""
from __future__ import annotations

import ast
from pathlib import Path

from src.code_path_audit import (
    AggregateProfile,
    FunctionRef,
)

SSDL_PRIMITIVES: dict[str, str] = {
    "I": "Instruction (single unit of computation)",
    "T": "Terminator (returns/exits)",
    "B": "Branch (conditional fork)",
    "M": "Merge (control flow reconverges)",
    "Q": "State Query (reads persistent state)",
    "S": "State Mutation (writes persistent state)",
    "N": "Nil Sentinel (defuses branches)",
}

# AST node types that each count as one explicit branch point.  The async
# variants are included because async functions are analysed as well.
_BRANCH_NODE_TYPES = (
    ast.If,
    ast.For,
    ast.AsyncFor,
    ast.While,
    ast.With,
    ast.AsyncWith,
    ast.Try,
    ast.ExceptHandler,
)


def _resolve_filepath(fref: FunctionRef, src_dir: str) -> Path | None:
    """Return the existing path for ``fref.file``, or ``None``.

    The path is tried as given first, then relative to ``src_dir``.
    """
    direct = Path(fref.file)
    filepath = direct if direct.exists() else Path(src_dir) / fref.file
    if not filepath.exists():
        return None
    return filepath


def _find_function_node(
    fref: FunctionRef, src_dir: str
) -> ast.FunctionDef | ast.AsyncFunctionDef | None:
    """Parse the file behind ``fref`` and return its function definition node.

    The function is matched on the last dotted component of ``fref.fqname``;
    the first match in ``ast.walk`` order wins.  Returns ``None`` when the
    file is missing, unreadable, not valid UTF-8, not valid Python, or does
    not contain a function of that name.
    """
    filepath = _resolve_filepath(fref, src_dir)
    if filepath is None:
        return None
    try:
        tree = ast.parse(filepath.read_text(encoding="utf-8"))
    except (OSError, SyntaxError, ValueError):
        # ValueError covers UnicodeDecodeError (non-UTF-8 files) and sources
        # that ast.parse rejects outright (e.g. embedded null bytes).
        return None
    func_name = fref.fqname.rsplit(".", 1)[-1]
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.name == func_name:
            return node
    return None


def _count_branch_nodes(func_node: ast.AST) -> int:
    """Count branch points in an already-located function node.

    Each node in ``_BRANCH_NODE_TYPES`` counts once; a boolean operation with
    ``k`` operands counts ``k - 1`` (one per short-circuit point).
    """
    count = 0
    for sub in ast.walk(func_node):
        if isinstance(sub, _BRANCH_NODE_TYPES):
            count += 1
        elif isinstance(sub, ast.BoolOp):
            count += len(sub.values) - 1
    return count


def _is_none_constant(node: ast.AST) -> bool:
    """Return True when ``node`` is the literal ``None``."""
    return isinstance(node, ast.Constant) and node.value is None


def _has_nil_comparison(func_node: ast.AST) -> bool:
    """Return True when any comparison in the function has ``None`` as an operand."""
    for sub in ast.walk(func_node):
        if not isinstance(sub, ast.Compare):
            continue
        # Check the left operand too so `None is x` / `None == x` is detected.
        if any(_is_none_constant(operand) for operand in (sub.left, *sub.comparators)):
            return True
    return False


def _consumer_stats(profile: AggregateProfile, src_dir: str) -> list[tuple[int, bool]]:
    """Return ``(branch_count, has_nil_check)`` for each consumer, in order.

    Each consumer's file is parsed once and both metrics are derived from the
    same AST.  Consumers whose function cannot be located yield ``(0, False)``.
    """
    stats: list[tuple[int, bool]] = []
    for fref in profile.consumers:
        node = _find_function_node(fref, src_dir)
        if node is None:
            stats.append((0, False))
        else:
            stats.append((_count_branch_nodes(node), _has_nil_comparison(node)))
    return stats


def _effective_from_stats(stats: list[tuple[int, bool]]) -> int:
    """Sum ``2 ** branch_count`` over pre-computed consumer stats."""
    return sum(2 ** branches for branches, _ in stats)


def _nil_count_from_stats(stats: list[tuple[int, bool]]) -> int:
    """Count the consumers flagged as containing a nil check."""
    return sum(1 for _, has_nil in stats if has_nil)


def compute_effective_codepaths(profile: AggregateProfile, src_dir: str = "src") -> int:
    """Compute the effective codepath count for one aggregate.

    Effective codepaths = sum over all consumer functions of
    2^(branch_count_in_function). This is the combinatoric explosion metric
    (Fleury). High numbers indicate branch-explosion risk; defusing with nil
    sentinels or immediate-mode caches reduces it to ~1.

    Candidate aggregates always yield 0.
    """
    if profile.is_candidate:
        return 0
    return sum(2 ** count_branches_in_function(fref, src_dir) for fref in profile.consumers)


def count_branches_in_function(fref: FunctionRef, src_dir: str = "src") -> int:
    """Count the explicit branch points in a function.

    Counted constructs: ``if``/``elif``, ``for``, ``while``, ``with`` (sync and
    async forms), ``try``, each ``except`` handler, and each extra operand of
    an ``and``/``or`` expression.  Returns 0 when the file or function cannot
    be located or parsed.
    """
    node = _find_function_node(fref, src_dir)
    if node is None:
        return 0
    return _count_branch_nodes(node)


def detect_nil_check_pattern(fref: FunctionRef, src_dir: str = "src") -> bool:
    """Detect if the function compares something against ``None``.

    Covers `is None` / `is not None` / `== None` / `!= None`, with ``None`` on
    either side.  A nil check is a branch that a nil sentinel could defuse.
    Returns False when the file or function cannot be located or parsed.
    """
    node = _find_function_node(fref, src_dir)
    return node is not None and _has_nil_comparison(node)


def compute_field_access_efficiency(profile: AggregateProfile) -> float:
    """Compute field-access efficiency: ratio of typed accesses to total accesses.

    High efficiency (>0.7) means consumers are using the typed fields directly.
    Low efficiency (<0.3) means consumers are using wildcards or the aggregate
    is being passed through without field use (candidate for immediate-mode).

    Candidate aggregates return 1.0; aggregates with no access sites return 0.0.
    """
    if profile.is_candidate:
        return 1.0
    tac = profile.type_alias_coverage
    if tac.total_sites == 0:
        return 0.0
    return tac.typed_sites / tac.total_sites


def _build_suggestions(profile: AggregateProfile, stats: list[tuple[int, bool]]) -> list[dict]:
    """Build defusing suggestions for a non-candidate profile from consumer stats."""
    suggestions: list[dict] = []
    nil_check_count = _nil_count_from_stats(stats)
    effective = _effective_from_stats(stats)
    branch_count = sum(branches for branches, _ in stats)
    efficiency = compute_field_access_efficiency(profile)
    tac = profile.type_alias_coverage

    if nil_check_count > 0:
        suggestions.append({
            "technique": "Nil Sentinel `[N]`",
            "location": f"{nil_check_count} consumer function{'s' if nil_check_count != 1 else ''} have `is None` / `== None` checks",
            "current_state": f"{nil_check_count} nil-check branches contribute to branch explosion",
            "recommended_change": "Introduce a module-level `NIL_` sentinel whose field accesses return safe defaults. Replace None checks with the sentinel. Collapses 2^branch_count into ~1.",
            "effective_codepaths_before": effective,
            "effective_codepaths_after": max(1, effective - nil_check_count * 2),
        })

    if efficiency < 0.3:
        suggestions.append({
            "technique": "Immediate-Mode Cache `[Q:key] -> [I:FetchCached] -> [T]`",
            "location": f"{profile.name} consumers access {tac.total_sites} sites, only {tac.typed_sites} typed ({efficiency*100:.0f}%)",
            "current_state": "Many consumers use wildcard or defensive access patterns",
            "recommended_change": f"Introduce a `{profile.name.lower()}_cache` keyed lookup. Consumers request by key, get cached value, no field-existence checks. Reduces {tac.total_sites} field-check branches to 1 cache lookup.",
            "effective_codepaths_before": effective,
            # Clamp to the current count: a defusing suggestion must never
            # report more codepaths after the change than before it.
            "effective_codepaths_after": min(effective, max(1, tac.total_sites)),
        })

    if branch_count > 20:
        suggestions.append({
            "technique": "Generational Handles `[I:ResolveHandle] -> [B:Gen matches?] -> [N|safe]`",
            "location": f"{profile.name} consumers have {branch_count} explicit branch points total",
            "current_state": f"Branch explosion: {branch_count} branches = {effective} effective codepaths",
            "recommended_change": "Wrap the aggregate in a generational handle (index + generation). Validation is one comparison; mismatch returns the nil sentinel. Reduces N lifetime branches to 1 handle validation + sentinel return.",
            "effective_codepaths_before": effective,
            "effective_codepaths_after": len(profile.consumers),
        })

    return suggestions


def suggest_defusing_technique(profile: AggregateProfile, src_dir: str = "src") -> list[dict]:
    """Suggest specific SSDL defusing techniques for this aggregate.

    Returns a list of dicts with the keys ``technique``, ``location``,
    ``current_state``, ``recommended_change``, ``effective_codepaths_before``
    and ``effective_codepaths_after``.  Candidate aggregates get an empty list.
    """
    if profile.is_candidate:
        return []
    return _build_suggestions(profile, _consumer_stats(profile, src_dir))


def render_ssdl_sketch(profile: AggregateProfile, src_dir: str = "src") -> str:
    """Render an SSDL sketch of one aggregate's access pattern.

    The sketch shows:
    - Producers (queries that fetch the aggregate)
    - Consumers (instruction sequences that read the aggregate)
    - Branch points (B)
    - Defusing opportunities (N)
    - Effective codepaths metric

    Candidate aggregates get a short placeholder instead.
    """
    if profile.is_candidate:
        return f"## SSDL Sketch for {profile.name}\n\n_(placeholder; candidate aggregate)_\n"

    # Parse every consumer once; all figures below come from these stats.
    stats = _consumer_stats(profile, src_dir)

    lines: list[str] = [f"## SSDL Sketch for `{profile.name}`", ""]
    lines.append("```")
    lines.append(f"[Q:{profile.name} entry-point] -> [Q:PCG lookup]")
    for i, (fref, (b, is_nil)) in enumerate(zip(profile.consumers, stats)):
        nil_marker = "[B:is None?]" if is_nil else "[B:check]"
        nil_defuse = "[N:safe]" if is_nil else ""
        short_name = fref.fqname.rsplit(".", 1)[-1]
        lines.append(f" -> [{i+1}: {short_name}] {nil_marker} (branches={b}) {nil_defuse}")
    lines.append(" -> [T:done]")
    lines.append("```")
    lines.append("")

    effective = _effective_from_stats(stats)
    branches_total = sum(b for b, _ in stats)
    lines.append(f"**Effective codepaths:** {effective} (sum of 2^branches across {len(profile.consumers)} consumers)")
    lines.append(f"**Total branch points:** {branches_total}")
    lines.append(f"**Nil-check functions:** {_nil_count_from_stats(stats)}")
    lines.append("")

    suggestions = _build_suggestions(profile, stats)
    if suggestions:
        lines.append("**Defusing opportunities:**")
        lines.append("")
        for s in suggestions:
            lines.append(f"- **{s['technique']}**: {s['recommended_change']}")
            lines.append(f" - Effective codepaths: {s['effective_codepaths_before']} -> {s['effective_codepaths_after']}")
    else:
        lines.append("**No SSDL defusing opportunities detected** (the aggregate is already well-structured for data-oriented access).")
    lines.append("")
    return "\n".join(lines)


def render_ssdl_rollup(profiles: tuple[AggregateProfile, ...], src_dir: str = "src") -> str:
    """Render the SSDL rollup (all aggregates + their defusing opportunities).

    Contains a ranking table of non-candidate aggregates ordered by effective
    codepaths (descending) and the ten suggestions with the largest reduction
    in effective codepaths.
    """
    lines: list[str] = ["# SSDL Analysis Rollup", ""]
    lines.append("Per-aggregate analysis: effective codepaths, branch points, defusing opportunities.")
    lines.append("")

    # Analyse each real aggregate once and reuse the stats for both sections.
    analysed = [
        (p, _consumer_stats(p, src_dir)) for p in profiles if not p.is_candidate
    ]

    lines.append("## Effective codepaths ranking")
    lines.append("")
    lines.append("| Aggregate | Consumers | Total branches | Effective codepaths | Field efficiency |")
    lines.append("|---|---|---|---|---|")
    # sorted() is stable, so ties keep their input order.
    ranked = sorted(analysed, key=lambda item: -_effective_from_stats(item[1]))
    for p, stats in ranked:
        ec = _effective_from_stats(stats)
        tc = sum(b for b, _ in stats)
        eff = compute_field_access_efficiency(p) * 100
        lines.append(f"| `{p.name}` | {len(p.consumers)} | {tc} | {ec} | {eff:.0f}% |")
    lines.append("")

    lines.append("## Defusing recommendations (top 10)")
    lines.append("")
    all_suggestions: list[tuple[AggregateProfile, dict]] = [
        (p, s) for p, stats in analysed for s in _build_suggestions(p, stats)
    ]
    # Largest reduction in effective codepaths first.
    all_suggestions.sort(
        key=lambda ps: -(ps[1]['effective_codepaths_before'] - ps[1]['effective_codepaths_after'])
    )
    if not all_suggestions:
        lines.append("_(no defusing recommendations detected)_\n")
        return "\n".join(lines)

    for p, s in all_suggestions[:10]:
        lines.append(f"### `{p.name}` - {s['technique']}")
        lines.append("")
        lines.append(f"- **Location:** {s['location']}")
        lines.append(f"- **Current state:** {s['current_state']}")
        lines.append(f"- **Recommended change:** {s['recommended_change']}")
        lines.append(f"- **Effective codepaths:** {s['effective_codepaths_before']} -> {s['effective_codepaths_after']}")
        lines.append("")
    return "\n".join(lines)


def render_organization_deductions(profiles: tuple[AggregateProfile, ...], src_dir: str = "src") -> str:
    """Render the organization deductions rollup.

    Cross-aggregate view of codebase organization. Based on SSDL principles:
    - Well-organized: few branches, high field efficiency, few effective codepaths
    - Needs restructuring: many branches, low efficiency, branch-explosion risk

    Only non-candidate aggregates are considered.
    """
    lines: list[str] = ["# Organization Deductions", ""]
    lines.append("Cross-aggregate view of codebase organization. Verdicts derived from SSDL analysis:")
    lines.append("- **well-organized**: <=50 effective codepaths AND >=50% field efficiency")
    lines.append("- **moderate**: between the two thresholds")
    lines.append("- **needs restructuring**: >200 effective codepaths OR <20% field efficiency")
    lines.append("")

    real_profiles = [p for p in profiles if not p.is_candidate]

    # One parse pass per consumer; the verdict and route sections reuse it.
    analysed: list[tuple[AggregateProfile, int, int]] = []
    for p in real_profiles:
        stats = _consumer_stats(p, src_dir)
        analysed.append((p, _effective_from_stats(stats), _nil_count_from_stats(stats)))

    lines.append("## Module organization observations")
    lines.append("")
    lines.append("### Files with most cross-aggregate involvement")
    lines.append("")
    file_agg: dict[str, set[str]] = {}
    file_consumers: dict[str, set[str]] = {}
    for p in real_profiles:
        for f in p.producers:
            file_agg.setdefault(f.file, set()).add(p.name)
        for f in p.consumers:
            file_consumers.setdefault(f.file, set()).add(p.name)
    # Use the union of producer and consumer files so that files which only
    # consume aggregates are not silently left out of the coupling tables.
    rows: list[tuple[str, int, int]] = [
        (f, len(file_agg.get(f, ())), len(file_consumers.get(f, ())))
        for f in sorted(file_agg.keys() | file_consumers.keys())
    ]
    # Stable sort: ties stay in alphabetical file order.
    rows.sort(key=lambda r: -(r[1] + r[2]))
    lines.append("| file | aggregates produced | aggregates consumed |")
    lines.append("|---|---|---|")
    for f, pc, cc in rows[:15]:
        lines.append(f"| `{f}` | {pc} | {cc} |")
    lines.append("")

    lines.append("### Files with high coupling (producers + consumers >= 8)")
    lines.append("")
    lines.append("These files are the central nervous system of the codebase. Changes ripple across the most aggregates.")
    lines.append("")
    lines.append("| file | coupling score (producers + consumers) |")
    lines.append("|---|---|")
    for f, pc, cc in rows:
        if (pc + cc) >= 8:
            lines.append(f"| `{f}` | {pc + cc} (high) |")
    lines.append("")

    lines.append("## Per-aggregate organization verdict")
    lines.append("")
    lines.append("| Aggregate | Verdict | Notes |")
    lines.append("|---|---|---|")
    verdict_counts = {"well-organized": 0, "moderate": 0, "needs restructuring": 0}
    for p, ec, nil_count in analysed:
        eff = compute_field_access_efficiency(p) * 100
        if ec <= 50 and eff >= 50:
            verdict = "well-organized"
        elif ec > 200 or eff < 20:
            verdict = "needs restructuring"
        else:
            verdict = "moderate"
        verdict_counts[verdict] += 1
        notes: list[str] = []
        if nil_count > 0:
            notes.append(f"{nil_count} nil checks")
        if eff < 50:
            notes.append(f"{eff:.0f}% field efficiency")
        if ec > 100:
            notes.append(f"{ec} effective codepaths")
        note_str = "; ".join(notes) if notes else "no major issues"
        lines.append(f"| `{p.name}` | {verdict} | {note_str} |")
    lines.append("")
    lines.append(f"**Tally:** {verdict_counts['well-organized']} well-organized, {verdict_counts['moderate']} moderate, {verdict_counts['needs restructuring']} needs restructuring")
    lines.append("")

    lines.append("## Restructuring routes (prioritized)")
    lines.append("")
    priority_routes: list[tuple[AggregateProfile, int, float, int]] = []
    for p, ec, nil_count in analysed:
        eff = compute_field_access_efficiency(p)
        if ec > 100 or eff < 0.3:
            priority_routes.append((p, ec, eff, nil_count))
    priority_routes.sort(key=lambda r: -r[1])
    if priority_routes:
        lines.append("Top restructuring routes (by effective codepath count):")
        lines.append("")
        for i, (p, ec, eff, nil_count) in enumerate(priority_routes[:5], 1):
            lines.append(f"{i}. **`{p.name}`**: {ec} effective codepaths ({eff*100:.0f}% field efficiency)")
            lines.append(f" - Apply nil sentinel to {nil_count} nil-check functions")
            lines.append(f" - Migrate to immediate-mode cache for {p.type_alias_coverage.total_sites} field-access sites")
    else:
        lines.append("_(no high-priority restructuring routes; all aggregates have moderate effective codepath counts)_")
    lines.append("")
    return "\n".join(lines)